Select Git revision
external_controller.rb
carla_environment.py 11.71 KiB
""" Carla.org environment"""
import glob
import os
import random
import sys
import math
import weakref
import numpy as np
import pygame
import time
from steering_wheel import ACTION_SPACE, Controller, IDLE
# find carla module
try:
CARLA_PATH='/media/armin/Games/carla/PythonAPI/carla/dist/carla-*%d.%d-%s.egg'
sys.path.append(glob.glob(CARLA_PATH % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
import carla
class Camera:
""" Add camera sensor to the carla world """
sensor = None
surface = None
display = None
width = None
height = None
surface = None
camera_type = None
def __init__(self, world, camera_type='rgb', width=1280, height=720):
self.width = width
self.height = height
self.camera_type = camera_type
self.display = pygame.display.set_mode(
(width, height),
pygame.HWSURFACE | pygame.DOUBLEBUF)
self.display.fill((0,0,0))
camera_blueprint = world.world.get_blueprint_library().find('sensor.camera.' + camera_type)
camera_blueprint.set_attribute('image_size_x', f'{self.width}')
camera_blueprint.set_attribute('image_size_y', f'{self.height}')
camera_blueprint.set_attribute('fov', '95')
camera_transform = carla.Transform(carla.Location(x=-7.0, z=5.2), carla.Rotation(pitch=-20.0, yaw=0, roll=0.0))
self.sensor = world.world.spawn_actor(camera_blueprint, camera_transform, attach_to=world.player)
self.sensor.listen(lambda data: self.process_img(data))
def process_img(self, data):
""" Callback for rgb-camera sensor. """
if self.camera_type == 'semantic_segmentation':
data.convert(carla.ColorConverter.CityScapesPalette)
array = np.frombuffer(data.raw_data, dtype=np.dtype("uint8"))
array = np.reshape(array, (self.height, self.width, 4))
array = array[:, :, :3]
array = array[:, :, ::-1]
self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
def on_update(self):
""" Update the pygame display. """
if self.surface is not None:
self.display.blit(self.surface, (0, 0))
pygame.display.flip()
class CollisionSensor:
sensor = None
collision = None
def __init__(self, world, parent):
bp = world.get_blueprint_library().find('sensor.other.collision')
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent)
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
def _on_collision(weak_self, event):
self = weak_self()
if not self:
return
# print(event.other_actor)
self.collision = event
impulse = event.normal_impulse
intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2)
# print(intensity)
class ObstacleSensor:
sensor = None
parent = None
ZONES = 4
obstacle = []
def __init__(self, world, parent):
self.parent = parent
bp = world.get_blueprint_library().find('sensor.other.obstacle')
bp.set_attribute('distance', '10')
bp.set_attribute('hit_radius', '3')
print(bp.get_attribute('hit_radius'))
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent, attachment_type=carla.AttachmentType.Rigid)
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: ObstacleSensor._on_event(weak_self, event))
def _on_event(weak_self, event):
self = weak_self()
if not self:
return
self.collision = event
if not event.other_actor.type_id == 'static.road':
print(self.parent.get_location())
print(self.parent.get_velocity())
print(event.other_actor.type_id)
print(event.other_actor.transform)
print(event.distance)
class LidarSensor:
sensor = None
data = None
def __init__(self, world, parent):
bp = world.get_blueprint_library().find('sensor.lidar.ray_cast')
bp.set_attribute('rotation_frequency', '10')
bp.set_attribute('channels', '1')
bp.set_attribute('points_per_second', '100')
bp.set_attribute('range', '7')
bp.set_attribute('sensor_tick', '0.1')
bp.set_attribute('upper_fov', '0')
bp.set_attribute('lower_fov', '0')
weak_self = weakref.ref(self)
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=parent)
self.sensor.listen(lambda event: LidarSensor._on_data(weak_self, event))
def _on_data(weak_self, event):
self = weak_self()
if not self:
return
for m in event: print(m.point)
if len(event) > 0:
print(event.horizontal_angle)
class World:
""" Wrapper for the carla environment, incl. player/vehicle """
player = None
collision_sensor = None
obstacle_sensor = None
lidar_sensor = None
world = None
blueprint_library = None
spawn_points = None
actors = []
was_big_reward = False
def __init__(self, world):
self.world = world
settings = self.world.get_settings()
settings.fixed_delta_seconds = 0.1
settings.synchronous_mode = True
self.world.apply_settings(settings)
self.blueprint_library = self.world.get_blueprint_library()
self.spawn_points = self.world.get_map().get_spawn_points()
self.reset()
def reset(self):
""" Remove and create new player/vehicle. """
self.destroy()
time.sleep(0.5)
self.spawn_on_sidewalk()
def observation(self):
if self.player is not None:
pos = self.player.get_transform()
vel = self.player.get_velocity()
return [pos.location.x, pos.location.y, vel.x, vel.y, pos.rotation.yaw]
else:
return [0,0,0]
def spawn_player(self, transform):
""" Add a vehicle to the world. """
while self.player is None:
blueprint = random.choice(self.blueprint_library.filter('model3'))
position = transform
self.player = self.world.try_spawn_actor(blueprint, position)
start_location = self.player.get_location()
print(str(start_location))
self.collision_sensor = CollisionSensor(self.world, self.player)
self.obstacle_sensor = ObstacleSensor(self.world, self.player)
def spawn_on_sidewalk(self):
position = carla.Transform(carla.Location(x=-13, y=57.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0))
self.spawn_player(position)
self.spawn_actors(self.sidewalk_parking_cars_transforms())
def sidewalk_parking_cars_transforms(self):
transforms = []
transforms.append(carla.Transform(carla.Location(x=-19.2, y=54.5, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
transforms.append(carla.Transform(carla.Location(x=-14.3, y=54.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
transforms.append(carla.Transform(carla.Location(x=-37.3, y=53.7, z=0.2), carla.Rotation(pitch=0.0, yaw=-180, roll=0.0)))
return transforms
def spawn_actors(self, transforms):
blueprint = random.choice(self.blueprint_library.filter('etron'))
mass = carla.VehiclePhysicsControl(mass=900000.0)
for transform in transforms:
actor = self.world.try_spawn_actor(blueprint, transform)
actor.apply_physics_control(mass)
if actor is not None:
self.actors.append(actor)
def destroy(self):
""" Remove vehicle from the world. """
if self.player is not None:
self.player.destroy()
self.player = None
if self.actors is not None:
for actor in self.actors:
actor.destroy()
self.actors = []
if self.collision_sensor is not None:
self.collision_sensor.sensor.destroy()
self.collision_sensor = None
if self.obstacle_sensor is not None:
self.obstacle_sensor.sensor.destroy()
self.obstacle_sensor = None
def step(self, action):
""" Apply controls to vehicle. """
controls = Controller.action_to_controls(action)
c = carla.VehicleControl(throttle=controls[0], steer=controls[1], brake=controls[2], reverse=controls[3])
self.player.apply_control(c)
self.world.tick()
reward = self.reward(action)
self.collision_sensor.collision = None
return reward
def reward(self, action):
target = carla.Transform(carla.Location(x=-25.534311, y=54.460903, z=0.112781), carla.Rotation(pitch=0.000000, yaw=-175.922913, roll=-6.221135))
pos = self.player.get_transform()
pos_diff = math.sqrt((target.location.x - pos.location.x)**2 + (target.location.y - pos.location.y)**2)
rot_diff = (target.rotation.yaw - pos.rotation.yaw)
velocity = self.player.get_velocity()
v = math.sqrt((velocity.x)**2 + (velocity.y)**2)
r = ((-pos_diff / 10) + 1) / 5
if v < 0.01:
r -= 0.5
if v > 15.0:
r -= 10
at_final_position = False
if pos_diff < 1.8:
if self.was_big_reward == False:
r += 50
self.was_big_reward = True
if abs(rot_diff) < 5 and v < 0.01:
at_final_position = True
else:
if self.was_big_reward == True:
self.was_big_reward = False
r -= 50
done = False
if at_final_position :
print("Success")
r += 100
done = True
if self.collision_sensor.collision is not None:
r -= 250
done = True
if action == IDLE:
r -= 0.3
return self.observation(), r, done, None
class ActionSpace:
n = ACTION_SPACE
class ObservationSpace:
shape = [5]
class CarlaEnvironment:
action_space = ActionSpace
observation_space = ObservationSpace
world = None
client = None
camera = None
allow_render = False
def __init__(self, host="127.0.0.1", port=2000, render=False):
pygame.init()
self.client = carla.Client(host, port)
self.client.set_timeout(5.0)
time.sleep(1.0)
self.client.load_world('Town07')
time.sleep(1.0)
self.world = World(self.client.get_world())
if render:
self.allow_render = True
self.camera = Camera(self.world, camera_type='semantic_segmentation')
# self.camera = Camera(self.world, camera_type='rgb')
def reset(self):
self.world.reset()
return self.world.observation()
def render(self):
if self.allow_render:
self.camera.on_update()
def step(self, action, render=False):
if render:
self.render()
return self.world.step(action)
def __del__(self):
if self.camera is not None:
self.camera.sensor.destroy()
if self.world is not None:
self.world.destroy()
if __name__ == "__main__":
LOCALHOST="127.0.0.1"
SERVER="192.168.188.20"
env = CarlaEnvironment(host=LOCALHOST, render=True)
clock = pygame.time.Clock()
ctrl = Controller()
cumulated_reward = 0
while ctrl.is_running():
clock.tick(5)
ctrl.on_update()
obs, reward, done, _ = env.step(ctrl.get_action(), render=True)
print(str(reward) + ' ' + str(done))
cumulated_reward += reward
if done:
break
env.world.destroy()
pygame.quit()
print(cumulated_reward)