""" Carla.org environment""" import glob import os import random import sys import math import weakref import numpy as np import pygame import time from steering_wheel import ACTION_SPACE, Controller, IDLE try: CARLA_PATH='/media/armin/Games/carla/PythonAPI/carla/dist/carla-*%d.%d-%s.egg' CARLA_PATH='/opt/carla-simulator/PythonAPI/carla/dist/carla-*%d.%d-%s.egg' sys.path.append(glob.glob(CARLA_PATH % ( sys.version_info.major, sys.version_info.minor, 'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0]) except IndexError: pass try: import carla except: print('Could not import carla, you will not be able to crate a client!') print('Contiuing...') class Camera: """ Add camera sensor to the carla world """ sensor = None surface = None display = None width = None height = None surface = None camera_type = None def __init__(self, world, camera_type='rgb', width=1280, height=720): self.width = width self.height = height self.camera_type = camera_type self.display = pygame.display.set_mode( (width, height), pygame.HWSURFACE | pygame.DOUBLEBUF) self.display.fill((0,0,0)) camera_blueprint = world.world.get_blueprint_library().find('sensor.camera.' + camera_type) camera_blueprint.set_attribute('image_size_x', f'{self.width}') camera_blueprint.set_attribute('image_size_y', f'{self.height}') camera_blueprint.set_attribute('fov', '95') camera_transform = carla.Transform(carla.Location(x=-7.0, z=5.2), carla.Rotation(pitch=-20.0, yaw=0, roll=0.0)) self.sensor = world.world.spawn_actor(camera_blueprint, camera_transform, attach_to=world.player) self.sensor.listen(lambda data: self.process_img(data)) def process_img(self, data): """ Callback for rgb-camera sensor. """ if self.camera_type == 'semantic_segmentation': data.convert(carla.ColorConverter.CityScapesPalette) array = np.frombuffer(data.raw_data, dtype=np.dtype("uint8")) array = np.reshape(array, (self.height, self.width, 4)) array = array[:, :, :3] array = array[:, :, ::-1] self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) def on_update(self): """ Update the pygame display. """ if self.surface is not None: self.display.blit(self.surface, (0, 0)) pygame.display.flip() class CollisionSensor: sensor = None collision = None intensity = 0 def __init__(self, world, parent): bp = world.get_blueprint_library().find('sensor.other.collision') self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent) weak_self = weakref.ref(self) self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event)) def _on_collision(weak_self, event): self = weak_self() if not self: return self.collision = event impulse = event.normal_impulse self.intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2) class ObstacleSensor: sensor = None parent = None obstacle = [] def __init__(self, world, parent): self.parent = parent bp = world.get_blueprint_library().find('sensor.other.obstacle') bp.set_attribute('distance', '10') bp.set_attribute('hit_radius', '5') self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent, attachment_type=carla.AttachmentType.Rigid) weak_self = weakref.ref(self) self.sensor.listen(lambda event: ObstacleSensor._on_event(weak_self, event)) def _on_event(weak_self, event): self = weak_self() if not self: return self.collision = event actor_id = event.other_actor.type_id if not actor_id == 'static.road' and not actor_id == 'static.roadline' and not actor_id == 'static.sidewalk' and False: print(self.parent.get_transform()) print(event.transform) print(self.parent.get_velocity()) print(event.other_actor.type_id) print(str(event.distance)) class LidarSensor: sensor = None data = None def __init__(self, world, parent): bp = world.get_blueprint_library().find('sensor.lidar.ray_cast') bp.set_attribute('rotation_frequency', '10') bp.set_attribute('channels', '1') bp.set_attribute('points_per_second', '100') bp.set_attribute('range', '7') bp.set_attribute('sensor_tick', '0.1') bp.set_attribute('upper_fov', '0') bp.set_attribute('lower_fov', '0') weak_self = weakref.ref(self) self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent) self.sensor.listen(lambda event: LidarSensor._on_data(weak_self, event)) def _on_data(weak_self, event): self = weak_self() if not self: return for m in event: if False: print(m.point) if len(event) > 0: if False: print(event.horizontal_angle) class World: player = None collision_sensor = None obstacle_sensor = None lidar_sensor = None world = None blueprint_library = None spawn_points = None actors = [] was_big_reward = False def __init__(self, world): self.world = world settings = self.world.get_settings() settings.fixed_delta_seconds = 0.1 settings.synchronous_mode = True self.world.apply_settings(settings) self.blueprint_library = self.world.get_blueprint_library() self.spawn_points = self.world.get_map().get_spawn_points() self.reset() def reset(self): self.destroy() time.sleep(0.5) self.spawn_on_sidewalk() def observation(self): if self.player is not None: pos = self.player.get_transform() vel = self.player.get_velocity() return [pos.location.x, pos.location.y, vel.x, vel.y, (pos.rotation.yaw/180)] else: return [0,0,0,0,0] def spawn_player(self, transform): while self.player is None: blueprint = random.choice(self.blueprint_library.filter('model3')) position = transform self.player = self.world.try_spawn_actor(blueprint, position) self.collision_sensor = CollisionSensor(self.world, self.player) self.obstacle_sensor = ObstacleSensor(self.world, self.player) def spawn_on_sidewalk(self, with_obstacles=False): position = carla.Transform(carla.Location(x=-13, y=58.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-190, roll=0.0)) self.spawn_player(position) if with_obstacles: self.spawn_actors(self.sidewalk_parking_cars_transforms()) def sidewalk_parking_cars_transforms(self): transforms = [] transforms.append(carla.Transform(carla.Location(x=-19.2, y=54.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0))) transforms.append(carla.Transform(carla.Location(x=-14.3, y=54.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0))) transforms.append(carla.Transform(carla.Location(x=-37.3, y=53.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0))) return transforms def spawn_actors(self, transforms): blueprint = random.choice(self.blueprint_library.filter('etron')) mass = carla.VehiclePhysicsControl(mass=900000.0) for transform in transforms: actor = self.world.try_spawn_actor(blueprint, transform) actor.apply_physics_control(mass) if actor is not None: self.actors.append(actor) def destroy(self): if self.player is not None: self.player.destroy() self.player = None if self.actors is not None: for actor in self.actors: actor.destroy() self.actors = [] if self.collision_sensor is not None: self.collision_sensor.sensor.destroy() self.collision_sensor = None if self.obstacle_sensor is not None: self.obstacle_sensor.sensor.destroy() self.obstacle_sensor = None def step(self, action): controls = Controller.action_to_controls(action) c = carla.VehicleControl(throttle=controls[0], steer=controls[1], brake=controls[2], reverse=controls[3]) self.player.apply_control(c) self.world.tick() reward = self.reward() self.collision_sensor.collision = None return reward def reward(self): x, y, vx, vy, yaw = self.observation() target = carla.Transform( \ carla.Location(x=-25.534311, y=54.460903, z=0.112781), \ carla.Rotation(pitch=0.000000, yaw=-175.922913, roll=-6.221135)) v = math.sqrt((vx)**2 + (vy)**2) pos_diff = math.sqrt((target.location.x - x)**2 + (target.location.y - y)**2) target_yaw = (-175.922913) / 180 yaw_dif = math.sqrt((target_yaw - yaw)**2) r = -0.1 r += -0.01 * pos_diff LEARN_FINAL_ORIENTATION = True if LEARN_FINAL_ORIENTATION: r += -0.01 * yaw_dif INSENTIVE_MOVING = True if v < 0.01 and INSENTIVE_MOVING: r -= 0.02 done = False if pos_diff < 1.1 and v < 0.01: done = True r += 150 if abs(yaw_dif) < 2: r+= 50 if self.collision_sensor.collision is not None: r -= 100 done = True if pos_diff > 15: done = True r -= 100 return self.observation(), r, done, None class ActionSpace: n = ACTION_SPACE class ObservationSpace: shape = [5] class CarlaEnvironment: action_space = ActionSpace observation_space = ObservationSpace world = None client = None camera = None allow_render = False manual = False def __init__(self, host="127.0.0.1", port=2000, render=False, manual=False): pygame.init() self.client = carla.Client(host, port) self.client.set_timeout(3.0) self.client.load_world('Town07') time.sleep(1.0) self.world = World(self.client.get_world()) if render: self.allow_render = True self.camera = Camera(self.world, camera_type='semantic_segmentation') # self.camera = Camera(self.world, camera_type='rgb') if manual: self.manual = False self.cntrl = Controller() def reset(self): self.world.reset() return self.world.observation() def render(self): if self.allow_render: self.camera.on_update() def step(self, action, render=False): if render: self.render() if self.manual: action = self.cntrl.get_action() return self.world.step(action) def __del__(self): if self.camera is not None: self.camera.sensor.destroy() if self.world is not None: self.world.destroy() if __name__ == "__main__": LOCALHOST="127.0.0.1" SERVER="192.168.188.20" env = CarlaEnvironment(host=LOCALHOST, render=True) clock = pygame.time.Clock() ctrl = Controller() cumulated_reward = 0 while ctrl.is_running(): clock.tick(5) ctrl.on_update() obs, reward, done, _ = env.step(ctrl.get_action(), render=True) cumulated_reward += reward if done: break env.world.destroy() pygame.quit() print(cumulated_reward)