Skip to content
Snippets Groups Projects
Select Git revision
  • 2023ws
  • 2024ws default
  • 2022ws
  • 2021ws
  • 2020ws
  • 2018ws
  • 2019ws
  • 2017ws
  • 2016ws
9 results

hanoi-2.c

Blame
  • measure.py 3.64 KiB
    #
    # Example of how to profile a Python app with multiple processes
    # by logging events and opening the resulting trace file in chrome://tracing.
    #
    
    # pip install multiprocessing_logging
    
    from functools import wraps
    import json
    import logging
    from multiprocessing import Pool
    import os
    import random
    import time
    import threading
    import serial
    import datetime
    
    from multiprocessing_logging import install_mp_handler
    
    # Root logger setup: we want to be able to log from multiple processes.
    # basicConfig sets the root level to DEBUG; install_mp_handler() comes from
    # the third-party multiprocessing_logging package (presumably it wraps the
    # root logger's handlers for multi-process use -- confirm in its docs).
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger()
    install_mp_handler()
    
    # Separate "profiling" logger that only stores events into a file.
    prof_logger = logging.getLogger("profiling")
    # Do not propagate to the root logger; we want to store this separately.
    prof_logger.propagate = False
    # Mode "w+" truncates any existing profiling.log when this handler is created,
    # i.e. every time this module is executed or imported.
    handler = logging.FileHandler("profiling.log", "w+")
    # Write only the bare message so the file holds exactly what was logged,
    # without level or logger-name prefixes.
    handler.setFormatter(logging.Formatter("%(message)s"))
    prof_logger.addHandler(handler)
    # Apply the same multi-process treatment to the profiling logger.
    install_mp_handler(prof_logger)
    
    
    def log_profile(category: str = None):
        """Build a decorator that logs a begin and an end event around each call.

        Every call of the decorated function emits two JSON strings through
        ``prof_logger.debug``: a ``"B"`` (begin) event before the call and an
        ``"E"`` (end) event after it. Both carry the function name, the current
        process id, the current thread ident and ``category``.

        The end event is emitted from a ``finally`` clause, so it is written even
        when the wrapped function raises. Without it the trace would hold a
        begin event that never gets closed.

        Args:
            category: Value stored under the ``"cat"`` key of every event; may be
                ``None``.

        Returns:
            A decorator. The wrapper it produces returns whatever the wrapped
            function returns and lets its exceptions propagate unchanged.
        """

        def decorator(f):
            @wraps(f)
            def wrapper(*args, **kwargs):
                # format compatible with chrome://tracing
                # info: https://www.gamasutra.com/view/news/176420/Indepth_Using_Chrometracing_to_view_your_inline_profiling_data.php
                base_info = {
                    "name": f.__name__,
                    "pid": os.getpid(),
                    "tid": threading.current_thread().ident,
                    "cat": category,
                }

                def log_event(**fields):
                    # one JSON object per logged message
                    prof_logger.debug(json.dumps(fields))

                def time_usec():
                    # wall-clock time in whole microseconds
                    return int(round(1e6 * time.time()))

                start_time = time_usec()
                log_event(ph="B", ts=start_time, **base_info)

                try:
                    return f(*args, **kwargs)
                finally:
                    # Always close the event, even if f raised.
                    end_time = time_usec()
                    duration = end_time - start_time
                    # TODO: duration could possibly be computed afterwards (if we can pair the events correctly)
                    log_event(ph="E", ts=end_time, duration=duration, **base_info)

            return wrapper

        return decorator
    
    
    def convert_log_to_trace(log_file, trace_file):
        """Convert a log of one-JSON-object-per-line events into a trace file.

        The resulting file contains a single JSON object of the form
        ``{"traceEvents": [...]}`` holding the events in log order.

        Args:
            log_file: Path of the text file to read; each non-blank line must be
                a JSON document.
            trace_file: Path of the JSON file to write (overwritten if present).

        Raises:
            OSError: If either file cannot be opened.
            json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        # Read and parse the whole log before opening the output, so a missing or
        # malformed log does not leave an empty, truncated trace file behind.
        with open(log_file, "rt") as log:
            # Blank lines carry no event and would make json.loads fail.
            events = [json.loads(line) for line in log if line.strip()]

        with open(trace_file, "wt") as trace:
            json.dump({"traceEvents": events}, trace)
    
    
    @log_profile("compute")
    def read(connection: serial.Serial):
        """Read four lines from ``connection`` and parse each one as a float.

        The parsed values are discarded; the conversion only serves as a check,
        because ``float`` raises ``ValueError`` for text that is not numeric.
        """
        for _ in range(4):
            float(convert(connection.readline()))
    
    
    @log_profile("write")
    def write(connection: serial.Serial):
        """Pass the integer ``1`` to ``connection.write``; the result is ignored."""
        # NOTE(review): pyserial documents write() as taking bytes-like data.
        # Confirm what is actually transmitted for an int argument and whether
        # that matches what the device expects.
        payload = 1
        connection.write(payload)
    
    
    @log_profile("offset")
    def offset(connection: serial.Serial):
        """Read one line from ``connection`` and translate it into an offset.

        Returns:
            ``0`` when the line parses to the integer 1, otherwise ``4``.

        Raises:
            ValueError: If the cleaned line is not an integer literal.
        """
        raw_line = connection.readline()
        flag = int(convert(raw_line))
        if flag == 1.0:
            return 0
        return 4
    
    
    @log_profile("write_data")
    def write_data():
        """Print a "writing data" notice to stdout, then sleep for 10 ms."""
        pause_seconds = 10e-3
        print("writing data")
        time.sleep(pause_seconds)
    
    
    def convert(data) -> str:
        """Turn one received line into plain text without its line terminator.

        Bytes-like input is decoded as ASCII (undecodable bytes are replaced
        rather than raising); any other input is passed through ``str``.
        Trailing CR and LF characters are then removed, so both CR+LF and
        LF-only terminated lines are handled.

        This replaces the previous approach of editing the ``repr`` of the bytes
        object, which left a literal backslash-n in the result for LF-only
        lines and did not strip terminators from ``str`` input at all.

        Args:
            data: The raw line, typically the ``bytes`` returned by a serial
                ``readline`` call.

        Returns:
            The decoded text; an empty string for empty input.
        """
        if isinstance(data, (bytes, bytearray)):
            text = data.decode("ascii", errors="replace")
        else:
            text = str(data)
        return text.rstrip("\r\n")
    
    
    @log_profile("get_data")
    def get_data(con1: serial.Serial, con2: serial.Serial):
        """Run one write / offset / read cycle on ``con1`` and then on ``con2``.

        A ``TypeError`` or ``ValueError`` raised by any step ends the whole pass
        (remaining steps and connections are skipped) and is reported on stdout
        instead of being propagated.
        """
        steps = (write, offset, read)
        try:
            for port in (con1, con2):
                for step in steps:
                    step(port)
        except (TypeError, ValueError):
            # may occur if no data was read over serial
            print("Didn't receive data from arduino")
    
    
    @log_profile("loop")
    def loop(con1: serial.Serial, con2: serial.Serial, delta_time: float = 30):
        """Poll both connections for ``delta_time`` seconds, then write the data.

        ``get_data`` is called repeatedly until ``delta_time`` seconds have
        elapsed, after which ``write_data`` is called once. The deadline is only
        checked between ``get_data`` calls, so the last call may overrun it.

        Elapsed time is measured with ``time.monotonic`` so that adjustments of
        the system clock cannot shorten or stretch the polling window.

        Args:
            con1: First serial connection passed to ``get_data``.
            con2: Second serial connection passed to ``get_data``.
            delta_time: Length of the polling window in seconds (default 30).
        """
        started = time.monotonic()
        while time.monotonic() - started < delta_time:
            get_data(con1, con2)
        write_data()
    
    
    def main() -> None:
        """Open /dev/ttyACM0 and /dev/ttyACM1 and run ``loop`` 100 times.

        Both ports are opened at 9600 baud with ``timeout=3`` and are closed
        again when the function exits.
        """
        baudrate, timeout = 9600, 3
        with serial.Serial("/dev/ttyACM0", baudrate, timeout=timeout) as con1:
            with serial.Serial("/dev/ttyACM1", baudrate, timeout=timeout) as con2:
                for _ in range(100):
                    loop(con1, con2)
    
    
    # Convert the raw event log into a JSON trace that chrome://tracing can load.
    # NOTE(review): nothing in the visible file calls main(), and the FileHandler
    # above opens profiling.log in "w+" mode (truncating it) on every run. As
    # written, this conversion therefore appears to always see an empty log --
    # confirm whether main() is meant to run before this line.
    convert_log_to_trace("profiling.log", "profiling_trace.json")