Skip to content
Snippets Groups Projects
Commit 2f39322b authored by Midras Lappe's avatar Midras Lappe
Browse files
parents ca0128f8 d8672b34
No related branches found
No related tags found
No related merge requests found
import asyncio
import random
import json
from socket import socket
import websockets
#from ActionSpace import ActionSpace
import logging
......@@ -13,143 +10,75 @@ class Gym:
host = None
port = None
player = None
families = None
types = None
protocols = None
logger = None
# Create a TCP/IP socket
sock = None
last_recieved_JSON = None
loop = None
maxaction = 5
#websocket = None
def __init__(self, host ='localhost', port =9080, player ="player1"):
self.host = host
self.port = port
self.player = player
#self.loop = loop
random.seed()
# self.families = self.get_constants('AF_')
# self.types = self.get_constants('SOCK_')
# self.protocols = self.get_constants('IPPROTO_')
self.logger = logging.getLogger('websockets')
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.StreamHandler())
print('New gym instance IP:', host, 'Port:', port, 'Playername:', player)
# Call reset function of environment and returns observation
# Call reset function of environment and returns observation, reward, done, info
# Send data to reset enviroment
async def reset(self, delta):
message = json.dumps([['reset'], delta], sort_keys=True, indent=4)
data = []
#print('send_action:',message)
await self.send_websocket(message, data)
test = json.loads(data[0])
# print(test)
# print(test[0])
# print(test[1])
reward = test[3]
observation = test[0:3]
done = test[4]
info = 0
# print(data)
# print(data[0][0])
all_information = json.loads(data[0])
reward = all_information[3]
observation = all_information[0:3]
done = all_information[4]
info = all_information[5]
return observation, reward, done, info
# send action to env returns observation, reward, done, info
# Obeservation is an array with the following information:
# [speed, steering-angle, [length of all sensors (starting straight and then clockwise around the car)]]
# creates JSON object
async def step(self, action, delta):
global maxaction
# Test ob die Action im richtigen Bereich ist!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
test_message = []
if action.find("accelerate")!=-1:#(action>=0) and (action < maxaction):
test_message.append( "accelerate")
#message = json.dumps({'action': action, 'delta': delta}, sort_keys=True, indent=4)
#message = json.dumps([action], sort_keys=True, indent=4)
#message2 = str(action)
#hier data aufteilen!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#test = json.loads(data)
#print(test)
#x = data.split(',')
#print(x)
message = []
if action.find("accelerate")!=-1:
message.append( "accelerate")
if action.find("left")!= -1:
test_message.append( "steer_left")
message.append( "steer_left")
if action.find("right")!= -1:
test_message.append("steer_right")
message.append("steer_right")
if action.find("brake")!= -1:
test_message.append("brake")
message.append("brake")
if action.find("reverse")!= -1:
test_message.append("reverse")
message.append("reverse")
if action.find("screen_on")!= -1:
test_message.append("screen_on")
message.append("screen_on")
if action.find("screen_off")!= -1:
test_message.append("screen_off")
else:
pass
#print("Action is not a valid Step")
message.append("screen_off")
message = json.dumps([test_message,delta], sort_keys=True, indent=4)
#message = json.dumps([action], sort_keys=True, indent=4)
message = json.dumps([message,delta], sort_keys=True, indent=4)
data = []
#print('send_action:', message)
await self.send_websocket(message, data)
#print("awaited send_action")
test = json.loads(data[0])
# print(test)
# print(test[0])
# print(test[1])
reward = test[3]
observation = test[0:3]
done = test[4]
info = test[5]
# print(data)
# print(data[0][0])
all_information = json.loads(data[0])
reward = all_information[3]
observation = all_information[0:3]
done = all_information[4]
info = all_information[5]
return observation, reward, done, info
""" Returns a random action of action space
"""
# Gibt jetzt eine Action zurück als Zahlenwert
def get_sample_action(self):
global maxaction
action = random.randrange(0, maxaction)
return action
# socket_echo_client_easy.py
# Wird jetzt nicht mehr benutzt !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# async def send_action(self, next_action):
# message = '["' + next_action.name + '"]'
# data = []
# print('send_action:',message)
# await self.send_websocket(message, data)
# print("awaited send_action")
# print(data)
# return data
async def connect_websocket(self):
print("creating Websocket ")
uri = "ws://localhost:9080"
self.websocket = await websockets.connect(uri)
# async with websockets.connect(uri) as websocket:
# print("Websocket created ")
# self.websocket = websocket
async def send_websocket(self, message, data):
async def send_websocket(self, message, data):
await self.websocket.send(message)
# print(f"> {name}")
data.append(await self.websocket.recv())
#print(f"< {data}")
#print(data)
async def close_websocket(self):
await self.websocket.close()
# Summary of the Project
Open_Ai_Gym_Autobahn_Auffahren is a project that makes it possible for a python-agent to play a godot game.\
This project shows an example for that with a car trying to enter a highway.\
It has a complete Godot-Enviroment with an example bot in python.\
# Installation Tutorial
## Godot
First you need to install Godot on your Computer.\
You can find the Link [here](https://godotengine.org/download).
## Python
[Here](https://www.python.org/downloads/) is one way to install Python on your PC.\
You also need the following packages: [websockets](https://pypi.org/project/websocket-client/), [Pillow](https://pypi.org/project/Pillow/).\
Please notice that the websockets package is not supportet in Spyder Enviroment.
## Repository
Download all the files to convinient folder.
# Setup
There are a few variables you can change in Godot
## World
1. human_player: If set to true, you can drive with wasd and brake with space. Otherwise it waits for a client to connect.
2. learn_with_images: Also returns a 60x80 screenshot of the game in every step, if set to true.
3. show_game: If set to false, it won't render the game, but still sends all the information to the client.
## Car
1. brake_str: Can be used to reduce the brake-strength of the car.
2. NUMBER_OF_SENSORS: Determines the number of sensors around the car (Always start at the front and goes clockwise).
3. max_range_sensor: Range of the sensors in pixels (~31 m)
4. range_per_step_sensor: Accuracy of the sensors in pixels (~0.31 m)
# Running the Project
## Godot
1. Start Godot
2. Click Scan to a a new project and select the folder with the downloaded files
3. Now doubleclick the project
4. Run the Project by pressing F5
## Python
1. After starting the Project in Godot you can run your own Python programm or the example progamm (main_test) from the files
___
Created by Midras Lappe and Theo Brockmann
# -*- coding: utf-8 -*-
"""
Created on Tue Jan 12 17:17:49 2021
@author: theob
"""
import asyncio
import Gym_new
import time
import nest_asyncio
import show_image as si
async def main():
image_mode = 'L'
env = Gym_new.Gym()
await env.connect_websocket()
# Resets the enviroment
observation, reward, done, info = await env.reset("",0.01)
print(observation)
print(reward)
print(done)
# Makes 1 step only driving straight
observation, reward, done, info = await env.step("accelerate,screen_on", 0.17)
print(observation)
# Obeservation is an array with the following information:
# [speed, steering-angle, [length of all sensors (starting straight and then clockwise around the car)]]
print(reward)
print(done)
try:
# Test show Image
si.show_image(image_mode,info['thumbnail']['width'],info['thumbnail']['height'],info['thumbnail']['image'])
except Exception as e:
print("failed showing image: " + str(e))
if __name__ == "__main__":
nest_asyncio.apply()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment