Skip to content
Snippets Groups Projects
Select Git revision
  • 2d552d0103ca68beb2993ef8e85c63a572b36fe0
  • main default protected
2 results

race_managment.py

Blame
  • race_managment.py 5.44 KiB
    # -*- coding: utf-8 -*-
    """
    Created on Tue May  2 15:01:44 2023
    
    @author: Jessica Dreyer, Sebastian Böttger
    """
    #%% imports
    import socket
    from datetime import datetime
    from localization import Localizer
    from driving import Driving_Analyser
    from grafic import Line_Plotter, Line_Plotter_Time, Track_Plotter, Dashboard
    
    #%% constant values
    HOST = ''                   # Symbolic name meaning all available interfaces
    PORT = 50007                # Arbitrary non-privileged port
    CHAR_RECIVING = 31          # Number of chars witch one message contains
    THRESHOLD_LEFT = 0.05       # Value to detect a left turn of the robot
    THRESHOLD_RIGHT = -0.05     # Value to detect a right turn of the robot
    BLOCK_SIZE_DA = 10          # Number of iterations to analyse the sektion type
    S = 0 # Value for a straight part of the track
    R = 1 # Value for a right turn
    L = 2 # Value for a left turn
    
    #%% Functions
    def main_loop():
        print("TO EXIT THE PROGRAMM, CLOSE THE DASHBOARDWINDOW!!!")
        try:
            # create objektes
            driving_analyser = Driving_Analyser(THRESHOLD_LEFT, THRESHOLD_RIGHT, BLOCK_SIZE_DA)
            localizer = Localizer(Localizer.get_set_of_section_3(), False)
            
            # line plotter with time
            lp_v = Line_Plotter_Time("Linear speed", "s", "mm/s", 1000)
            lp_w = Line_Plotter_Time("Rotation speed", "s", "rad/s", 1000)
            
            # trackplotter
            background, track = Track_Plotter.get_set_of_trackparts_3()
            tp = Track_Plotter(background, track)
            
            # dashboard
            dashboard = Dashboard()
            dashboard.update_window()
            
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind((HOST, PORT))
                s.listen(1)
                conn, addr = s.accept()
                with conn:
                    print('Connected by', addr)
                    while True:
                        dashboard.update_window()
                        # reciving data
                        data = conn.recv(CHAR_RECIVING)
                        
                        # check if data countions something
                        if len(data) > 0:
                            
                            # pars data to v w and t
                            v,w,t = pars_data(data)
                            
                            # update odometrie
                            driving_analyser.calculate_odometrie_continues(w, v, t)
                            driving_analyser.calculate_driven_distance_on_section(v, t)
                            
                            # update line plotter
                            lp_v.update(v,t)
                            lp_w.update(w,t)
                            
                            # get section type
                            section_type,_,_,_ = driving_analyser.detect_section_type(w)
                            
                            # if section type is None driving_analyser still count to decide which type it is
                            if(section_type is not None):
                                #print(get_section_type_formated(section_type))
                                # update localizer position
                                localizer.update_position(section_type)
                                update_dashboard(dashboard, localizer, driving_analyser)
                                
                                if(localizer.enter_new_section):
                                    driving_analyser.reset_driven_distance_on_section()
                                    section = localizer.localize()
                                    #print(section)
                                    if len(section) == 1:
                                        print(section.iloc[0].s_name)
                                        tp.select_trackpart(section.iloc[0].s_name)
    
        except Exception as e:
            print("An error occured")
            print(e)
        finally:
            try:
                dashboard.window.destroy()
            except Exception:
                pass 
            lp_w.close()
            lp_v.close()
            tp.close()
            
    
    def pars_data(data):
        data = data.decode("utf-8")
        w = float(data[3:10])
        if data[2] == "-":
            w *= -1
        v = float(data[12:21])
        t = float(data[23:])
        return (v,w,t)
    
    def get_section_type_formated(section_type_int):
        section_type_string = ""
        if section_type_int == S:
            section_type_string = "straight"
        elif section_type_int == R:
            section_type_string = "right"
        elif section_type_int == L:
            section_type_string = "left"
        return section_type_string
    
    def get_last_sections_type_formated(last_sections_type_int):
        last_sections_type = ""
        for section_type in last_sections_type_int:
            last_sections_type += get_section_type_formated(section_type) + ', '
        return last_sections_type
    
    def update_dashboard(dashboard, localizer, driving_analyser):
        dashboard.set_current_section_type(get_section_type_formated(localizer.current_section_type))
        dashboard.set_last_sections_type(get_last_sections_type_formated(localizer.last_sections_type))
        dashboard.set_enter_new_section("Yes" if localizer.enter_new_section else "No")
        dashboard.set_last_known_section("None" if localizer.last_known_section is None else localizer.last_known_section.s_name)
        dashboard.set_count_fail_loc(localizer.count_fail_loc)
        dashboard.set_x_position(driving_analyser.x_old)
        dashboard.set_y_position(driving_analyser.y_old)
        dashboard.set_driven_distance_on_section(driving_analyser.driven_distance_on_section)
        #dashboard.update_window()
    
    #%% main script for race controle
    if __name__ == '__main__':
        main_loop()