Skip to content
Snippets Groups Projects
Commit 15d394cc authored by Frederic Aust's avatar Frederic Aust
Browse files

Initial Init der Python Dateien fuer die Steuerung des Batte Pongs

parent 7a22b627
Branches
Tags release-2.7.14
1 merge request!6Binding all settings to game
from enum import Enum
class ActionSpace(Enum):
nothing=0
up=1
down=2
training=3
\ No newline at end of file
import asyncio
import random
from socket import socket
import json
import websockets
from ActionSpace import ActionSpace
import logging
class Gym:
host = None
port = None
player = None
families = None
types = None
protocols = None
logger = None
# Create a TCP/IP socket
sock = None
last_recieved_JSON = None
websocket_server = None
def __init__(self, host, port, player):
self.host = host
self.port = port
self.player = player
random.seed()
self.families = self.get_constants('AF_')
self.types = self.get_constants('SOCK_')
self.protocols = self.get_constants('IPPROTO_')
self.logger = logging.getLogger('websockets')
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.StreamHandler())
print('New gym instance IP:%s Port:%s Playername:%S', host, port, player)
# Call reset function of environment and returns observation
async def reset(self):
message = '["'+self.player+'","start_game"]'
data = []
print('send_reset:', message)
await self.send_websocket(message, data)
print("awaited send_reset")
#print(data)
return data
# Connects the player and returns observation
async def connect_player(self):
message = '["'+self.player+'","connect_player"]'
data = []
print('connect_player: ', message)
await self.send_websocket(message, data)
print("awaited send_connect_player")
#print(data)
return data
# TPC - creates env
def open(self, server_ip=None, port=None):
if server_ip is not None:
self.host = server_ip
if port is not None:
self.port = port
# connect to server
self.create_socket(self.host, self.port)
# TCP - closes the env
def close(self):
# send close to server
# close socket
self.close_socket("Closing environment")
pass
# send action to env returns observation, reward, done, info
# creates JSON object
async def step(self, action):
if isinstance(action, ActionSpace):
# action is part of ActionSpace
print("go step")
data = await self.send_action(action)
#print('step done', data)
return data
else:
print("Action is not a valid Step")
""" Returns a random action of action space
"""
def get_sample_action(self):
item = random.randrange(0, len(ActionSpace))
return ActionSpace(item)
# socket_echo_client_easy.py
def get_constants(self, prefix):
"""Create a dictionary mapping socket module
constants to their names.
"""
return {
getattr(socket, n): n
for n in dir(socket)
if n.startswith(prefix)
}
def create_socket(self, server_ip=None, port=None):
if server_ip is not None:
self.host = server_ip
if port is not None:
self.port = port
# Create a TCP/IP socket
self.sock = socket.create_connection(self.host, self.port)
print('Family :', self.families[self.sock.family])
print('Type :', self.types[self.sock.type])
print('Protocol:', self.protocols[self.sock.proto])
async def send_action(self, next_action):
message = '["'+self.player+'","' +self.player+"_"+ next_action.name + '"]'
data = []
print('send_action:',message)
await self.send_websocket(message, data)
#print("awaited send_action")
#print(data)
return data
async def send_websocket(self, message, data):
uri = "ws://localhost:9080"
async with websockets.connect(uri) as websocket:
await websocket.send(message)
# print(f"> {name}")
recieved_values = await websocket.recv()
data.append(json.loads(recieved_values))
#print(f"< {data}")
#print(data)
async def connect_to_trainer(self):
self.websocket_server = websockets.serve(self.get_training, "localhost", 8765)
async def get_training(self):
recieved_values = await self.websocket_server.recv()
print(f"<{recieved_values}")
return json.loads(recieved_values)
def send_tcp(self, message):
try:
# Send data
# test message
# message = b'This is the message. It will be repeated.'
print('sending {!r}'.format(message))
self.sock.sendall(message)
amount_received = 0
amount_expected = len(message)
data = None
while amount_received < amount_expected:
data = self.sock.recv(16)
amount_received += len(data)
#print('received {!r}'.format(data))
return data
except Exception as e:
self.close_socket("closing socket because of exception {}".format(e.args[-1]))
# TCP - Closing the socket
def close_socket(self, message):
print("Closing Socket")
print("Msg: " + message)
self.sock.close()
class Observation():
# picture of the game
picture = None
# X-Y Coordinates
player_pos = None
# X-Y Coordinates
enemy_pos = None
# 2D-Vector
player_direction = None
# 2D-Vector
enemy_direction = None
# Given observation will be interpreted
def __init__(self, observation_tcp):
pass
\ No newline at end of file
import asyncio
import show_image as si
from ActionSpace import ActionSpace
from Gym import Gym
async def main():
print("Hello World!")
gym = Gym("localhost", 10000, "player_one")
# data = await gym.step(ActionSpace.UP)
# print('main data', data)
recieved_values = await gym.connect_player()
data = recieved_values[0]
recieved_values = await gym.reset()
data = recieved_values[0]
print('main data', data)
image_mode = 'RGB'
try:
# Test show Image
si.show_image(image_mode, data['info']['screenshot']['width'], data['info']['screenshot']['height'],
data['info']['screenshot']['image'])
except Exception as e:
print("failed showing image: " + str(e))
resetted = True
while (True):
if data['done']:
recieved_values = await gym.reset()
data = recieved_values[0]
if resetted:
resetted = False
player_one_y = data['observation']['PlayerOne']['Y']
ball_y = data['observation']['ball']['position']['Y']
if (player_one_y < ball_y):
recieved_values = await gym.step(ActionSpace.down)
data = recieved_values[0]
print('main data', data)
if (player_one_y > ball_y):
recieved_values = await gym.step(ActionSpace.up)
data = recieved_values[0]
print('main data', data)
if (player_one_y == ball_y):
recieved_values = await gym.step(ActionSpace.nothing)
data = recieved_values[0]
print('main data', data)
try:
# Test show Image
si.show_image(image_mode, data['info']['screenshot']['width'],
data['info']['screenshot']['height'],
data['info']['screenshot']['image'])
except Exception as e:
print("failed showing image: " + str(e))
print('done')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio
from ActionSpace import ActionSpace
from Gym import Gym
async def main():
print("Hello World!")
gym = Gym("localhost", 9080, "player_two")
recieved_values= await gym.connect_player()
data = recieved_values[0]
print('main data', data)
while(True):
player_two_y=data['observation']['PlayerTwo']['Y']
ball_y = data['observation']['ball']['position']['Y']
if (player_two_y <ball_y):
recieved_values = await gym.step(ActionSpace.down)
data = recieved_values[0]
print('main data', data)
if (player_two_y >ball_y):
recieved_values = await gym.step(ActionSpace.up)
data = recieved_values[0]
print('main data', data)
if (player_two_y ==ball_y):
recieved_values = await gym.step(ActionSpace.nothing)
data = recieved_values[0]
print('main data', data)
print('done')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio
import show_image as si
from ActionSpace import ActionSpace
from Gym import Gym
async def main():
print("Hello World!")
gym = Gym("localhost", 9080, "player_one")
recieved_values = await gym.connect_player()
data = recieved_values[0]
print('main data', data)
while (True):
player_two_y = data['observation']['PlayerTwo']['Y']
ball_y = data['observation']['ball']['position']['Y']
recieved_values = await gym.step(ActionSpace.training)
data = recieved_values[0]
try:
# Test show Image
si.show_image("L", data['info']['screenshot']['width'],
data['info']['screenshot']['height'],
data['info']['screenshot']['image'])
except Exception as e:
print("failed showing image: " + str(e))
print('main data', data)
print('done')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import numpy as np
import base64
from PIL import Image
import zlib as zl
def show_image(format, width, height, image):
image_decoded = base64.b64decode(image)
print(len(image_decoded))
print(type(image_decoded))
#image_data = ... # byte values of the image
image = Image.frombytes(format, (width,height),image_decoded,'raw')
#image.show()
# Save to file
#a=np.asarray(image)
#im = Image.fromarray(a)
#image.save("your_file.jpeg")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment