Skip to content
Snippets Groups Projects
Select Git revision
  • 6227a46d38e60760fc0830401f20a1b2fdabd98a
  • master default protected
  • v3-modify-mail
  • snyk-fix-207483a1e839c807f95a55077e86527d
  • translations_3b5aa4f3c755059914cfa23d7d2edcde_ru
  • translations_6e4a5e377a3e50f17e6402264fdbfcc6_ru
  • translations_3b5aa4f3c755059914cfa23d7d2edcde_fa_IR
  • translations_en-yml--master_fa_IR
  • snyk-fix-7d634f2eb65555f41bf06d6af930e812
  • translations_en-yml--master_ar
  • translations_3b5aa4f3c755059914cfa23d7d2edcde_el
  • jfederico-patch-1
  • v2
  • v3
  • v1
  • release-3.1.0.2
  • release-3.1.0.1
  • release-3.1.0
  • release-2.14.8.4
  • release-3.0.9.1
  • release-3.0.9
  • release-3.0.8.1
  • release-2.14.8.3
  • release-3.0.8
  • release-3.0.7.1
  • release-2.14.8.2
  • release-3.0.7
  • release-3.0.6.1
  • release-3.0.6
  • release-3.0.5.4
  • release-3.0.5.3
  • release-2.14.8.1
  • release-3.0.5.2
  • release-3.0.5.1
  • release-3.0.5
35 results

external_controller.rb

Blame
  • carla_environment.py 11.71 KiB
    """ Carla.org environment"""
    import glob
    import os
    import random
    import sys
    import math
    import weakref
    
    import numpy as np
    import pygame
    import time
    
    from steering_wheel import ACTION_SPACE, Controller, IDLE
    
    # find carla module
    try:
        CARLA_PATH='/media/armin/Games/carla/PythonAPI/carla/dist/carla-*%d.%d-%s.egg'
        sys.path.append(glob.glob(CARLA_PATH % (
            sys.version_info.major,
            sys.version_info.minor,
            'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
    except IndexError:
        pass
    
    import carla
    
    
    class Camera:
        """ Add camera sensor to the carla world """
        sensor = None
        surface = None
        display = None
        width = None
        height = None
        surface = None
        camera_type = None
    
        def __init__(self, world, camera_type='rgb', width=1280, height=720):
            self.width = width
            self.height = height
            self.camera_type = camera_type
            self.display = pygame.display.set_mode(
                (width, height),
                pygame.HWSURFACE | pygame.DOUBLEBUF)
            self.display.fill((0,0,0))
            camera_blueprint = world.world.get_blueprint_library().find('sensor.camera.' + camera_type)
            camera_blueprint.set_attribute('image_size_x', f'{self.width}')
            camera_blueprint.set_attribute('image_size_y', f'{self.height}')
            camera_blueprint.set_attribute('fov', '95')
            camera_transform = carla.Transform(carla.Location(x=-7.0, z=5.2), carla.Rotation(pitch=-20.0, yaw=0, roll=0.0))
            self.sensor = world.world.spawn_actor(camera_blueprint, camera_transform, attach_to=world.player)
            self.sensor.listen(lambda data: self.process_img(data))
        
        def process_img(self, data):
            """ Callback for rgb-camera sensor. """
            if self.camera_type == 'semantic_segmentation':
                data.convert(carla.ColorConverter.CityScapesPalette)
            array = np.frombuffer(data.raw_data, dtype=np.dtype("uint8"))
            array = np.reshape(array, (self.height, self.width, 4))
            array = array[:, :, :3]
            array = array[:, :, ::-1]
            self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
        
        def on_update(self):
            """ Update the pygame display. """
            if self.surface is not None:
                self.display.blit(self.surface, (0, 0))
                pygame.display.flip()
    
    class CollisionSensor:
        sensor = None
        collision = None
    
        def __init__(self, world, parent):
            bp = world.get_blueprint_library().find('sensor.other.collision')
            self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent)
            weak_self = weakref.ref(self)
            self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
            
        def _on_collision(weak_self, event):
            self = weak_self()
            if not self:
                return
            # print(event.other_actor)
            self.collision = event
            impulse = event.normal_impulse
            intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2)
            # print(intensity)
    
    class ObstacleSensor:
        sensor = None
        parent = None
        ZONES = 4
        obstacle = []
    
        def __init__(self, world, parent):
            self.parent = parent
            bp = world.get_blueprint_library().find('sensor.other.obstacle')
            bp.set_attribute('distance', '10')
            bp.set_attribute('hit_radius', '3')
            print(bp.get_attribute('hit_radius'))
            self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent, attachment_type=carla.AttachmentType.Rigid)
            weak_self = weakref.ref(self)
            self.sensor.listen(lambda event: ObstacleSensor._on_event(weak_self, event))
            
        def _on_event(weak_self, event):
            self = weak_self()
            if not self:
                return
            self.collision = event
            if not event.other_actor.type_id == 'static.road':
                print(self.parent.get_location())
                print(self.parent.get_velocity())
                print(event.other_actor.type_id)
                print(event.other_actor.transform)
                print(event.distance)
    
    
    class LidarSensor:
        sensor = None
        data = None
    
        def __init__(self, world, parent):
            bp = world.get_blueprint_library().find('sensor.lidar.ray_cast')
            bp.set_attribute('rotation_frequency', '10')
            bp.set_attribute('channels', '1')
            bp.set_attribute('points_per_second', '100')
            bp.set_attribute('range', '7')
            bp.set_attribute('sensor_tick', '0.1')
            bp.set_attribute('upper_fov', '0')
            bp.set_attribute('lower_fov', '0')
            weak_self = weakref.ref(self)
            self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent)
            self.sensor.listen(lambda event: LidarSensor._on_data(weak_self, event))
        
        def _on_data(weak_self, event):
            self = weak_self()
            if not self:
                return
            for m in event: print(m.point)
            if len(event) > 0:
                print(event.horizontal_angle)
    
    class World:
        """ Wrapper for the carla environment, incl. player/vehicle """
        player = None
        collision_sensor = None
        obstacle_sensor = None
        lidar_sensor = None
        world = None
        blueprint_library = None
        spawn_points = None
        actors = []
        was_big_reward = False
    
        def __init__(self, world):
            self.world = world
            settings = self.world.get_settings()
            settings.fixed_delta_seconds = 0.1
            settings.synchronous_mode = True
            self.world.apply_settings(settings)
            self.blueprint_library = self.world.get_blueprint_library()
            self.spawn_points = self.world.get_map().get_spawn_points()
            self.reset()
        
        def reset(self):
            """ Remove and create new player/vehicle. """
            self.destroy()
            time.sleep(0.5)
            self.spawn_on_sidewalk()
    
        def observation(self):
            if self.player is not None:
                pos = self.player.get_transform()
                vel = self.player.get_velocity()
                return [pos.location.x, pos.location.y, vel.x, vel.y, pos.rotation.yaw]
            else:
                return [0,0,0]
    
        def spawn_player(self, transform):
            """ Add a vehicle to the world. """
            while self.player is None:
                blueprint = random.choice(self.blueprint_library.filter('model3'))
                position = transform
                self.player = self.world.try_spawn_actor(blueprint, position)
                start_location = self.player.get_location()
                print(str(start_location))
                self.collision_sensor = CollisionSensor(self.world, self.player)
                self.obstacle_sensor = ObstacleSensor(self.world, self.player)
        
        def spawn_on_sidewalk(self):
                position = carla.Transform(carla.Location(x=-13, y=57.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0))
                self.spawn_player(position)
                self.spawn_actors(self.sidewalk_parking_cars_transforms())
    
    
        def sidewalk_parking_cars_transforms(self):
            transforms = []
            transforms.append(carla.Transform(carla.Location(x=-19.2, y=54.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
            transforms.append(carla.Transform(carla.Location(x=-14.3, y=54.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
            transforms.append(carla.Transform(carla.Location(x=-37.3, y=53.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
            return transforms
    
        def spawn_actors(self, transforms):
            blueprint = random.choice(self.blueprint_library.filter('etron'))
            mass = carla.VehiclePhysicsControl(mass=900000.0)
            for transform in transforms:
                actor = self.world.try_spawn_actor(blueprint, transform)
                actor.apply_physics_control(mass)
                if actor is not None:
                    self.actors.append(actor)
    
        def destroy(self):
            """ Remove vehicle from the world. """
            if self.player is not None:
                self.player.destroy()
                self.player = None
            if self.actors is not None:
                for actor in self.actors:
                    actor.destroy()
                self.actors = []
            if self.collision_sensor is not None:
                self.collision_sensor.sensor.destroy()
                self.collision_sensor = None
            if self.obstacle_sensor is not None:
                self.obstacle_sensor.sensor.destroy()
                self.obstacle_sensor = None
    
    
        def step(self, action):
            """ Apply controls to vehicle. """
            controls = Controller.action_to_controls(action)
            c = carla.VehicleControl(throttle=controls[0], steer=controls[1], brake=controls[2], reverse=controls[3])
            self.player.apply_control(c)
            self.world.tick()
            reward = self.reward(action)
            self.collision_sensor.collision = None
            return reward
    
        def reward(self, action):
            target = carla.Transform(carla.Location(x=-25.534311, y=54.460903, z=0.112781), carla.Rotation(pitch=0.000000, yaw=-175.922913, roll=-6.221135))
            pos = self.player.get_transform()
            pos_diff = math.sqrt((target.location.x - pos.location.x)**2 + (target.location.y - pos.location.y)**2)
            rot_diff = (target.rotation.yaw - pos.rotation.yaw)
            velocity = self.player.get_velocity()
            v = math.sqrt((velocity.x)**2 + (velocity.y)**2)
            
            r = ((-pos_diff / 10) + 1) / 5 
            
            if v < 0.01:
                r -= 0.5
            if v > 15.0:
                r -= 10
            
            at_final_position = False
            if pos_diff < 1.8:
                if self.was_big_reward == False:
                    r += 50
                    self.was_big_reward = True
                if abs(rot_diff) < 5 and v < 0.01:
                    at_final_position = True
            else:
                if self.was_big_reward == True:
                    self.was_big_reward = False
                    r -= 50
            
            done = False
            if at_final_position :
                print("Success")
                r += 100
                done = True
            if self.collision_sensor.collision is not None:
                r -= 250
                done = True
            if action == IDLE:
                r -= 0.3
            return self.observation(), r, done, None
    
    class ActionSpace:
        n = ACTION_SPACE
    
    class ObservationSpace:
        shape = [5]
        
    class CarlaEnvironment:
        action_space = ActionSpace
        observation_space = ObservationSpace
        world = None
        client = None
        camera = None
        allow_render = False
    
        def __init__(self, host="127.0.0.1", port=2000, render=False):
            pygame.init()
            self.client = carla.Client(host, port)
            self.client.set_timeout(5.0)
            time.sleep(1.0)
            self.client.load_world('Town07')
            time.sleep(1.0)
            self.world = World(self.client.get_world())
            if render:
                self.allow_render = True
                self.camera = Camera(self.world, camera_type='semantic_segmentation')
                # self.camera = Camera(self.world, camera_type='rgb')
        
        def reset(self):
            self.world.reset()
            return self.world.observation()
    
        def render(self):
            if  self.allow_render:
                self.camera.on_update()
    
        def step(self, action, render=False):
            if render:
                self.render()
            return self.world.step(action)
        
        def __del__(self):
            if self.camera is not None:
                self.camera.sensor.destroy()
            if self.world is not None:
                self.world.destroy()
    
    if __name__ == "__main__":
        LOCALHOST="127.0.0.1"
        SERVER="192.168.188.20"
        env = CarlaEnvironment(host=LOCALHOST, render=True)
        clock = pygame.time.Clock()
        ctrl = Controller()
        cumulated_reward = 0
        while ctrl.is_running():
            clock.tick(5)
            ctrl.on_update()
            obs, reward, done, _ = env.step(ctrl.get_action(), render=True)
            print(str(reward) + ' ' + str(done))
            cumulated_reward += reward
            if done:
                break
        env.world.destroy()
        pygame.quit()
        print(cumulated_reward)