Skip to content
Snippets Groups Projects
Select Git revision
  • 0ba264323975d1dc4dd71499afc9d06ee4ce3d5e
  • master default
2 results

pgslides.sty

Blame
  • Forked from Peter Gerwinski / hp
    Source project has a limited visibility.
    carla_environment.py 11.97 KiB
    """ Carla.org environment"""
    import glob
    import os
    import random
    import sys
    import math
    import weakref
    
    import numpy as np
    import pygame
    import time
    
    from steering_wheel import ACTION_SPACE, Controller, IDLE
    
    # find carla module
    try:
        CARLA_PATH='/media/armin/Games/carla/PythonAPI/carla/dist/carla-*%d.%d-%s.egg'
        CARLA_PATH='/opt/carla-simulator/PythonAPI/carla/dist/carla-*%d.%d-%s.egg'
        sys.path.append(glob.glob(CARLA_PATH % (
            sys.version_info.major,
            sys.version_info.minor,
            'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
    except IndexError:
        pass
    
    import carla
    
    
    class Camera:
        """ Add camera sensor to the carla world """
        sensor = None
        surface = None
        display = None
        width = None
        height = None
        surface = None
        camera_type = None
    
        def __init__(self, world, camera_type='rgb', width=1280, height=720):
            self.width = width
            self.height = height
            self.camera_type = camera_type
            self.display = pygame.display.set_mode(
                (width, height),
                pygame.HWSURFACE | pygame.DOUBLEBUF)
            self.display.fill((0,0,0))
            camera_blueprint = world.world.get_blueprint_library().find('sensor.camera.' + camera_type)
            camera_blueprint.set_attribute('image_size_x', f'{self.width}')
            camera_blueprint.set_attribute('image_size_y', f'{self.height}')
            camera_blueprint.set_attribute('fov', '95')
            camera_transform = carla.Transform(carla.Location(x=-7.0, z=5.2), carla.Rotation(pitch=-20.0, yaw=0, roll=0.0))
            self.sensor = world.world.spawn_actor(camera_blueprint, camera_transform, attach_to=world.player)
            self.sensor.listen(lambda data: self.process_img(data))
        
        def process_img(self, data):
            """ Callback for rgb-camera sensor. """
            if self.camera_type == 'semantic_segmentation':
                data.convert(carla.ColorConverter.CityScapesPalette)
            array = np.frombuffer(data.raw_data, dtype=np.dtype("uint8"))
            array = np.reshape(array, (self.height, self.width, 4))
            array = array[:, :, :3]
            array = array[:, :, ::-1]
            self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
        
        def on_update(self):
            """ Update the pygame display. """
            if self.surface is not None:
                self.display.blit(self.surface, (0, 0))
                pygame.display.flip()
    
    class CollisionSensor:
        sensor = None
        collision = None
    
        def __init__(self, world, parent):
            bp = world.get_blueprint_library().find('sensor.other.collision')
            self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent)
            weak_self = weakref.ref(self)
            self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
            
        def _on_collision(weak_self, event):
            self = weak_self()
            if not self:
                return
            # print(event.other_actor)
            self.collision = event
            impulse = event.normal_impulse
            intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2)
            # print(intensity)
    
    class ObstacleSensor:
        sensor = None
        parent = None
        ZONES = 4
        obstacle = []
    
        def __init__(self, world, parent):
            self.parent = parent
            bp = world.get_blueprint_library().find('sensor.other.obstacle')
            bp.set_attribute('distance', '10')
            bp.set_attribute('hit_radius', '5')
            # print(bp.get_attribute('hit_radius'))
            position = carla.Transform(carla.Location(x=-2, y=0, z=0.0), carla.Rotation(pitch=0.0, yaw=0, roll=0.0))
            self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent, attachment_type=carla.AttachmentType.Rigid)
            weak_self = weakref.ref(self)
            self.sensor.listen(lambda event: ObstacleSensor._on_event(weak_self, event))
            
        def _on_event(weak_self, event):
            self = weak_self()
            if not self:
                return
            self.collision = event
            if not event.other_actor.type_id == 'static.road' and not event.other_actor.type_id == 'static.roadline' and not event.other_actor.type_id == 'static.sidewalk' and False:
                print(self.parent.get_transform())
                print(event.transform)
                print(self.parent.get_velocity())
                print(event.other_actor.type_id)
                print(str(event.distance))
    
    
    class LidarSensor:
        sensor = None
        data = None
    
        def __init__(self, world, parent):
            bp = world.get_blueprint_library().find('sensor.lidar.ray_cast')
            bp.set_attribute('rotation_frequency', '10')
            bp.set_attribute('channels', '1')
            bp.set_attribute('points_per_second', '100')
            bp.set_attribute('range', '7')
            bp.set_attribute('sensor_tick', '0.1')
            bp.set_attribute('upper_fov', '0')
            bp.set_attribute('lower_fov', '0')
            weak_self = weakref.ref(self)
            self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent)
            self.sensor.listen(lambda event: LidarSensor._on_data(weak_self, event))
        
        def _on_data(weak_self, event):
            self = weak_self()
            if not self:
                return
            for m in event: print(m.point)
            if len(event) > 0:
                print(event.horizontal_angle)
    
    class World:
        """ Wrapper for the carla environment, incl. player/vehicle """
        player = None
        collision_sensor = None
        obstacle_sensor = None
        lidar_sensor = None
        world = None
        blueprint_library = None
        spawn_points = None
        actors = []
        was_big_reward = False
    
        def __init__(self, world):
            self.world = world
            settings = self.world.get_settings()
            settings.fixed_delta_seconds = 0.1
            settings.synchronous_mode = True
            self.world.apply_settings(settings)
            self.blueprint_library = self.world.get_blueprint_library()
            self.spawn_points = self.world.get_map().get_spawn_points()
            self.reset()
        
        def reset(self):
            """ Remove and create new player/vehicle. """
            self.destroy()
            time.sleep(0.5)
            self.spawn_on_sidewalk()
    
        def observation(self):
            if self.player is not None:
                pos = self.player.get_transform()
                vel = self.player.get_velocity()
                return [pos.location.x, pos.location.y, vel.x, vel.y, (pos.rotation.yaw/180)]
            else:
                return [0,0,0,0,0]
    
        def spawn_player(self, transform):
            """ Add a vehicle to the world. """
            while self.player is None:
                blueprint = random.choice(self.blueprint_library.filter('model3'))
                position = transform
                self.player = self.world.try_spawn_actor(blueprint, position)
                start_location = self.player.get_location()
                self.collision_sensor = CollisionSensor(self.world, self.player)
                self.obstacle_sensor = ObstacleSensor(self.world, self.player)
        
        def spawn_on_sidewalk(self, with_obstacles=False):
                position = carla.Transform(carla.Location(x=-13, y=58.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-190, roll=0.0))
                self.spawn_player(position)
                if with_obstacles:
                    self.spawn_actors(self.sidewalk_parking_cars_transforms())
    
    
        def sidewalk_parking_cars_transforms(self):
            transforms = []
            transforms.append(carla.Transform(carla.Location(x=-19.2, y=54.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
            transforms.append(carla.Transform(carla.Location(x=-14.3, y=54.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
            transforms.append(carla.Transform(carla.Location(x=-37.3, y=53.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
            return transforms
    
        def spawn_actors(self, transforms):
            blueprint = random.choice(self.blueprint_library.filter('etron'))
            mass = carla.VehiclePhysicsControl(mass=900000.0)
            for transform in transforms:
                actor = self.world.try_spawn_actor(blueprint, transform)
                actor.apply_physics_control(mass)
                if actor is not None:
                    self.actors.append(actor)
    
        def destroy(self):
            """ Remove vehicle from the world. """
            if self.player is not None:
                self.player.destroy()
                self.player = None
            if self.actors is not None:
                for actor in self.actors:
                    actor.destroy()
                self.actors = []
            if self.collision_sensor is not None:
                self.collision_sensor.sensor.destroy()
                self.collision_sensor = None
            if self.obstacle_sensor is not None:
                self.obstacle_sensor.sensor.destroy()
                self.obstacle_sensor = None
    
    
        def step(self, action):
            """ Apply controls to vehicle. """
            controls = Controller.action_to_controls(action)
            c = carla.VehicleControl(throttle=controls[0], steer=controls[1], brake=controls[2], reverse=controls[3])
            self.player.apply_control(c)
            self.world.tick()
            reward = self.reward(action)
            self.collision_sensor.collision = None
            return reward
    
        def reward(self, action):
            x, y, vx, vy, yaw = self.observation()
            
            target = carla.Transform( \
                        carla.Location(x=-25.534311, y=54.460903, z=0.112781), \
                        carla.Rotation(pitch=0.000000, yaw=-175.922913, roll=-6.221135))
            
            pos_diff = math.sqrt((target.location.x - x)**2 + (target.location.y - y)**2)
    
            v = math.sqrt((vx)**2 + (vy)**2)
            
            r = -0.1
            r += -0.01 * pos_diff
            
            LEARN_FINAL_ORIENTATION = False
            if LEARN_FINAL_ORIENTATION:
                target_yaw = (-175.922913) / 180
                yaw_dif = math.sqrt((target_yaw**2) - (yaw**2))
                r += -0.01 * yaw_dif
    
            INSENTIVE_MOVING = False
            if v < 0.01 and INSENTIVE_MOVING:
                r -= 0.02
    
            done = False
            if pos_diff < 1.5:
                done = True
                r += 100
            
            if self.collision_sensor.collision is not None:
                r -= 100
                done = True
            
            if pos_diff > 15:
                done = True
                r -= 100
            return self.observation(), r, done, None
    
    class ActionSpace:
        n = ACTION_SPACE
    
    class ObservationSpace:
        shape = [5]
        
    class CarlaEnvironment:
        action_space = ActionSpace
        observation_space = ObservationSpace
        world = None
        client = None
        camera = None
        allow_render = False
        manual = False
    
        def __init__(self, host="127.0.0.1", port=2000, render=False, manual=False):
            pygame.init()
            self.client = carla.Client(host, port)
            self.client.set_timeout(3.0)
            self.client.load_world('Town07')
            time.sleep(1.0)
            self.world = World(self.client.get_world())
            if render:
                self.allow_render = True
                self.camera = Camera(self.world, camera_type='semantic_segmentation')
                # self.camera = Camera(self.world, camera_type='rgb')
            if manual:
                self.manual = False
                self.cntrl = Controller()
        
        def reset(self):
            self.world.reset()
            return self.world.observation()
    
        def render(self):
            if  self.allow_render:
                self.camera.on_update()
    
        def step(self, action, render=False):
            if render:
                self.render()
            if self.manual:
                action = self.cntrl.get_action()
            return self.world.step(action)
        
        def __del__(self):
            if self.camera is not None:
                self.camera.sensor.destroy()
            if self.world is not None:
                self.world.destroy()
    
    if __name__ == "__main__":
        LOCALHOST="127.0.0.1"
        SERVER="192.168.188.20"
        env = CarlaEnvironment(host=LOCALHOST, render=True)
        clock = pygame.time.Clock()
        ctrl = Controller()
        cumulated_reward = 0
        while ctrl.is_running():
            clock.tick(5)
            ctrl.on_update()
            obs, reward, done, _ = env.step(ctrl.get_action(), render=True)
            # print(str(reward) + ' ' + str(done))
            cumulated_reward += reward
            if done:
                break
        env.world.destroy()
        pygame.quit()
        print(cumulated_reward)