Commit ce80ee20 authored by Lennard's avatar Lennard

Add measure.py

parent 2c28e091
@@ -3,3 +3,5 @@ out/*
 CHANGELOG.md
 data/data
 logs/log
+profiling.log
+profiling_trace.json
\ No newline at end of file
measure.py (new file)

#
# Example of how to profile a Python app with multiple processes
# by logging events and opening the resulting trace file in chrome://tracing.
#
# pip install multiprocessing_logging
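#
# Each profiled call ends up as a pair of Chrome-trace events: one "B" (begin)
# and one "E" (end) JSON line in profiling.log. Roughly what two such lines
# look like (all field values below are illustrative only):
#
#   {"ph": "B", "ts": 1690000000000000, "name": "read", "pid": 4242, "tid": 139700000000000, "cat": "compute"}
#   {"ph": "E", "ts": 1690000000012345, "duration": 12345, "name": "read", "pid": 4242, "tid": 139700000000000, "cat": "compute"}
#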
from functools import wraps
import json
import logging
from multiprocessing import Pool
import os
import random
import time
import threading
import serial
import datetime
from multiprocessing_logging import install_mp_handler
# we want to be able to log from multiple processes
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
install_mp_handler()
# separate logger that only stores events into a file
prof_logger = logging.getLogger("profiling")
# do not propagate to the root logger; we want to store these events separately
prof_logger.propagate = False
handler = logging.FileHandler("profiling.log", "w+")
handler.setFormatter(logging.Formatter("%(message)s"))
prof_logger.addHandler(handler)
install_mp_handler(prof_logger)
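# install_mp_handler wraps a logger's handlers (the root logger above, prof_logger
# here) so that records emitted in worker processes are forwarded to the parent
# process and written by a single handler, instead of several processes writing
# to profiling.log at once.
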
def log_profile(category: str = None):
    """Decorator that logs a chrome://tracing begin/end event around each call."""

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # format compatible with chrome://tracing
            # info: https://www.gamasutra.com/view/news/176420/Indepth_Using_Chrometracing_to_view_your_inline_profiling_data.php
            base_info = {
                "name": f.__name__,
                "pid": os.getpid(),
                "tid": threading.current_thread().ident,
                "cat": category,
            }

            def log_event(**kwargs):
                prof_logger.debug(json.dumps(kwargs))

            def time_usec():
                return int(round(1e6 * time.time()))

            start_time = time_usec()
            log_event(ph="B", ts=start_time, **base_info)
            result = f(*args, **kwargs)
            end_time = time_usec()
            duration = end_time - start_time
            # TODO: duration could possibly be computed afterwards (if we can pair the events correctly)
            log_event(ph="E", ts=end_time, duration=duration, **base_info)
            return result

        return wrapper

    return decorator
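# The decorator also covers the "multiple processes" case from the header: work
# submitted to a multiprocessing.Pool can be profiled the same way, because
# install_mp_handler() above routes the workers' events into profiling.log.
# Minimal sketch, kept as a comment so it does not run with the measurement
# script; the worker name, category and timings are illustrative only:
#
#   @log_profile("worker")
#   def busy(n):
#       time.sleep(0.01 * n)
#       return n
#
#   with Pool(4) as pool:
#       pool.map(busy, range(8))
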
def convert_log_to_trace(log_file, trace_file):
    with open(trace_file, "wt") as outfile, open(log_file, "rt") as infile:
        events = [json.loads(line) for line in infile]
        json.dump({"traceEvents": events}, outfile)
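# To view the result, open chrome://tracing in a Chromium-based browser and load
# the profiling_trace.json file written by convert_log_to_trace().
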
@log_profile("compute")
def read(connection: serial.Serial):
for _ in range(4):
recv1 = connection.readline()
float(convert(recv1))
@log_profile("write")
def write(connection: serial.Serial):
connection.write(1)
@log_profile("offset")
def offset(connection: serial.Serial):
return 0 if int(convert(connection.readline())) == 1.0 else 4
@log_profile("write_data")
def write_data():
print("writing data")
time.sleep(10e-3)
def convert(data) -> str:
    # turn the repr of a bytes line read from the serial port into a plain string
    return str(data).replace("b'", "").replace("'", "").replace("\\r\\n", "")

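# Example (illustrative value): a raw serial line b'23.5\r\n' becomes the string
# "23.5", which read() and offset() then parse as numbers.
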
@log_profile("get_data")
def get_data(con1: serial.Serial, con2: serial.Serial):
try:
for connection in [con1, con2]:
write(connection)
offset(connection)
read(connection)
except (TypeError, ValueError):
# may occur if no data was read over serial
print("Didn't receive data from arduino")
@log_profile("loop")
def loop(con1: serial.Serial, con2: serial.Serial):
last_write = time.time()
delta_time = 30
while time.time() - last_write < delta_time:
get_data(con1, con2)
write_data()
def main() -> None:
with serial.Serial("/dev/ttyACM0", 9600, timeout=3) as con1, serial.Serial("/dev/ttyACM1", 9600, timeout=3) as con2:
for _ in range(100):
loop(con1, con2)
convert_log_to_trace("profiling.log", "profiling_trace.json")
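
if __name__ == "__main__":
    # assumed entry point (not part of the original hunk above)
    main()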