import os
import random
import ctypes
import time
import h5py
import numpy as np
import pyglet
from pyglet.gl import *
from GramophoneTools.LinMaze import Rule
from GramophoneTools.LinMaze.Tools.Stopwatch import Stopwatch
from GramophoneTools.LinMaze.Tools.filehandler import select_file
from GramophoneTools import Comms
import GramophoneTools
class LinMazeError(Exception):
"""
Generic Exception for LinMaze related errors.
"""
pass
[docs]class VRWindow(pyglet.window.Window):
'''
A pyglet window that can display VR on a given screen.
:param session: The session that is played in this window.
:type session: Session
:param screen_number: Which monitor the window should display on.
:type screen_number: int
:param mirrored: True if the contents of this window should be
mirrored horizontally. False by default.
:type mirrored: bool
:param fullscreen: True if the window should be fullscreen. True by default.
:type fullscreen: bool
'''
def __init__(self, session, screen_number,
mirrored=False, fullscreen=True):
self.session = session
self.mirrored = mirrored
disp = pyglet.window.Display()
screens = disp.get_screens()
screen_number -= 1
super().__init__(self.session.level.screen_width,
self.session.level.screen_height,
screen=screens[screen_number],
resizable=False, vsync=True,
fullscreen=fullscreen, visible=False)
self.set_mouse_visible(False)
self.set_caption('LinMaze - Monitor #' + str(screen_number + 1))
dir = os.path.dirname(__file__)
icon = pyglet.image.load(dir+'\\res\\icon.png')
self.set_icon(icon)
# OpenGL init
glMatrixMode(GL_PROJECTION)
glLoadIdentity()
glOrtho(0, self.session.level.screen_width, 0,
self.session.level.screen_height, -1, 1)
glMatrixMode(GL_MODELVIEW)
glDisable(GL_DEPTH_TEST)
glClearColor(0.0, 0.0, 0.0, 0.0)
glEnable(GL_TEXTURE_2D)
glTexParameterf(GL_TEXTURE_2D, GL_TEXTURE_WRAP_S, GL_CLAMP_TO_EDGE)
glTexParameterf(GL_TEXTURE_2D, GL_TEXTURE_WRAP_T, GL_CLAMP_TO_EDGE)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MAG_FILTER, GL_NEAREST)
glTexParameteri(GL_TEXTURE_2D, GL_TEXTURE_MIN_FILTER, GL_NEAREST)
[docs] def on_key_press(self, symbol, modifiers):
if symbol == pyglet.window.key.SPACE:
if self.session.paused:
self.session.unpause()
else:
self.session.pause()
if symbol == pyglet.window.key.ESCAPE:
pyglet.app.exit()
for rule in self.session.level.rules:
# Check keypress rules
if type(rule) is Rule.KeyPressRule:
rule.check(symbol)
if modifiers & pyglet.window.key.MOD_CTRL:
if symbol == pyglet.window.key._1:
target = int(not self.session.last_read['DO-1'])
self.session.gramophone.write_output(1, target)
if symbol == pyglet.window.key._2:
target = int(not self.session.last_read['DO-2'])
self.session.gramophone.write_output(2, target)
if symbol == pyglet.window.key._3:
target = int(not self.session.last_read['DO-3'])
self.session.gramophone.write_output(3, target)
if symbol == pyglet.window.key._4:
target = int(not self.session.last_read['DO-4'])
self.session.gramophone.write_output(4, target)
if symbol == pyglet.window.key.LEFT:
self.session.manual_vel += 5
if symbol == pyglet.window.key.RIGHT:
self.session.manual_vel -= 5
[docs] def on_close(self):
pyglet.app.exit()
[docs] def on_draw(self):
self.switch_to()
# Clean canvas
glClear(GL_COLOR_BUFFER_BIT)
# Dispay all vr units
for vru in self.session.vr_units:
sy = self.session.level.screen_height
sx = vru["length"]
glLoadIdentity()
glColor3f(1.0, 1.0, 1.0)
glBindTexture(GL_TEXTURE_2D, vru["texture_id"])
if self.mirrored:
glTranslatef((-1) * vru["position"] +
self.session.level.screen_width -
vru["length"], 0, 0)
glBegin(GL_QUADS)
glTexCoord2f(0, 0)
glVertex2i(sx, 0)
glTexCoord2f(1, 0)
glVertex2i(0, 0)
glTexCoord2f(1, 1)
glVertex2i(0, sy)
glTexCoord2f(0, 1)
glVertex2i(sx, sy)
else:
glTranslatef(vru["position"], 0, 0)
glBegin(GL_QUADS)
glTexCoord2f(1, 0)
glVertex2i(sx, 0)
glTexCoord2f(0, 0)
glVertex2i(0, 0)
glTexCoord2f(0, 1)
glVertex2i(0, sy)
glTexCoord2f(1, 1)
glVertex2i(sx, sy)
glEnd()
if self.session.offset_arrow:
glLoadIdentity()
glColor3f(1.0, 0.0, 0.0)
glTranslatef(self.session.level.zone_offset, 0, 0)
glBindTexture(GL_TEXTURE_2D, 0)
glBegin(GL_TRIANGLES)
glVertex2i(-50, 0)
glVertex2i(50, 0)
glVertex2i(0, 150)
glEnd()
glFlush()
[docs]class VRLog(object):
"""
A logger for LinMaze Sessions.
:param session: The session that should be logged.
:type session: Session
"""
def __init__(self, session):
self.session = session
# Make headers
self.zone_types = list(
set([zone.zone_type for zone in session.level.zones]))
self.vrl = h5py.File(session.filename, "w")
self.vrl.attrs['level_name'] = session.level.name
self.vrl.attrs['start_time'] = session.start_time
self.vrl.attrs['start_time_hr'] = session.start_time_hr
self.vrl.attrs['runtime_limit'] = str(session.runtime_limit)
self.vrl.attrs['screen_width'] = session.level.screen_width
self.vrl.attrs['screen_height'] = session.level.screen_height
self.vrl.attrs['zone_offset'] = session.level.zone_offset
self.vrl.attrs['transition_width'] = session.level.transition_width
self.vrl.attrs['RGB'] = session.level.rgb
self.vrl.attrs['left_monitor'] = str(session.left_monitor)
self.vrl.attrs['right_monitor'] = str(session.right_monitor)
self.vrl.attrs['device_serial'] = str(session.gramophone_serial)
self.vrl.attrs['velocity_ratio'] = session.vel_ratio
self.vrl.attrs['software_version'] = GramophoneTools.__version__
self.vrl.create_dataset("time", (0,),
maxshape=(None,), dtype=np.float64)
self.vrl.create_dataset("g_time", (0,),
maxshape=(None,), dtype=np.uint64)
self.vrl.create_dataset("velocity", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("position", (0,),
maxshape=(None,), dtype=np.uint64)
self.vrl.create_dataset("teleport", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("paused", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("input_1", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("input_2", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("output_1", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("output_2", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("output_3", (0,),
maxshape=(None,), dtype=np.int8)
self.vrl.create_dataset("output_4", (0,),
maxshape=(None,), dtype=np.int8)
zone_count = len(session.level.zones)
self.vrl.create_dataset("zone", (0, zone_count), maxshape=(
None, zone_count), dtype=np.int8)
for zone_type in self.zone_types:
self.vrl.create_dataset(
"zone_types/" + zone_type, (0,),
maxshape=(None,),
dtype=np.int8)
# Make log lists
self.time_record = []
self.g_time_record = []
self.vel_record = []
self.pos_record = []
self.teleport_record = []
self.pause_record = []
self.input_record_1 = []
self.input_record_2 = []
self.output_record_1 = []
self.output_record_2 = []
self.output_record_3 = []
self.output_record_4 = []
self.zone_id_records = np.empty((0, zone_count), dtype=bool)
self.zone_type_records = {zt: [] for zt in self.zone_types}
[docs] def make_entry(self, vel, g_time, in_1, in_2, out_1, out_2, out_3, out_4):
'''
Makes an entry in all the session logs.
:param vel: The velocity that sould be logged for this entry.
:type vel: int
:param g_time: The internal clock value of the Gramophone.
:type g_time: int
:param in_1: The state of digital input 1.
:type in_1: int
:param in_2: The state of digital input 2.
:type in_2: int
'''
self.time_record.append(self.session.runtime.value())
self.g_time_record.append(g_time)
self.vel_record.append(-vel)
self.pos_record.append(self.session.virtual_position)
for zone_type in self.zone_types:
self.zone_type_records[zone_type].append(
int(self.session.current_zone.zone_type == zone_type))
zone_row = [int(self.session.current_zone.zone_id == zone.zone_id)
for zone in self.session.level.zones]
self.zone_id_records = np.append(
self.zone_id_records, [zone_row], axis=0)
self.teleport_record.append(int(self.session.teleported))
self.session.teleported = False
self.input_record_1.append(in_1)
self.input_record_2.append(in_2)
self.output_record_1.append(out_1)
self.output_record_2.append(out_2)
self.output_record_3.append(out_3)
self.output_record_4.append(out_4)
self.pause_record.append(int(self.session.paused))
if len(self.time_record) >= 600:
self.flush_all()
[docs] def flush_all(self):
''' Writes all temporary data to file. '''
self.flush(self.time_record, 'time')
self.flush(self.g_time_record, 'g_time')
self.flush(self.vel_record, 'velocity')
self.flush(self.pos_record, 'position')
self.flush(self.teleport_record, 'teleport')
self.flush(self.pause_record, 'paused')
self.flush(self.input_record_1, 'input_1')
self.flush(self.input_record_2, 'input_2')
self.flush(self.output_record_1, 'output_1')
self.flush(self.output_record_2, 'output_2')
self.flush(self.output_record_3, 'output_3')
self.flush(self.output_record_4, 'output_4')
for zone_type in self.zone_types:
self.flush(
self.zone_type_records[zone_type], "zone_types/" + zone_type)
self.flush(self.zone_id_records, 'zone')
[docs] def flush(self, record, fieldname):
'''
Writes the record list into the given field and clears it.
:param record: The record that should be written to file.
:type record: list or np.ndarray
:param fieldname: The name of the field in the HDF5 file this record should
be written into.
:type fieldname: str
'''
if type(record) is np.ndarray:
new_count = record.shape[0] - self.vrl[fieldname].shape[0]
self.vrl[fieldname].resize(record.shape[0], axis=0)
self.vrl[fieldname][-new_count:] = record[-new_count:]
else:
self.vrl[fieldname].resize(
self.vrl[fieldname].shape[0] + len(record), axis=0)
self.vrl[fieldname][-len(record):] = record
del record[:]
[docs] def close(self):
''' Record the time and close the log. '''
end_time = time.time()
self.vrl.attrs['end_time'] = end_time
self.vrl.attrs['end_time_hr'] = time.strftime(
"%Y.%m.%d - %H:%M:%S", time.localtime(end_time))
self.vrl.close()
[docs]class Session(object):
'''
A play/simulation session for a LinMaze Level.
:param level: The LinMaze Level that will be played in this Session.
:type level: Level
:param vel_ratio: The velocity read from the Gramophone is multiplied
with this. 1 by default.
:type vel_ratio: float
:param runtime_limit: How long should the simulation run in minutes.
Set to None to run infinately. None by default
:type runtime_limit: float or None
:param left_monitor: The number of the monitor to the right of the animal. Set to
None to disable this monitor. 1 by default.
:type left_monitor: int or None
:param right_monitor: The number of the monitor to the right of the animal. Set to
None to disable this monitor. None by default.
:type right_monitor: int or None
:param gramophone_serial: The serial of the Gramophone used for the simulation.
Set to None to find a Gramophone automatically. None by default.
:type gramophone_serial: int or None
:param fullscreen: Should the simulation run in fullscreen mode? True by default.
:type fullscreen: bool
:param offset_arrow: Should the zone_offset of the Level be shown as a red arrow on screen?
False by default.
:type offset_arrow: bool
:param skip_save: Should the saving of a log be skipped for this Session? False by default.
:type skip_save: bool
'''
def __init__(self, level, vel_ratio=1, runtime_limit=None,
left_monitor=1, right_monitor=None, gramophone_serial=None,
fullscreen=True, offset_arrow=False, skip_save=False):
self.level = level
self.vel_ratio = vel_ratio
self.runtime_limit = runtime_limit
self.left_monitor = left_monitor
self.right_monitor = right_monitor
self.gramophone_serial = gramophone_serial
self.offset_arrow = offset_arrow
self.skip_save = skip_save
self.manual_vel = 0 # For controlling movement with keyboard
grams = Comms.find_devices()
if grams:
if self.gramophone_serial is None:
print('\nNo Gramophone specified. Using the first one.')
self.gramophone_serial = list(grams)[0]
self.gramophone = grams[self.gramophone_serial]
else:
raise(LinMazeError('No gramophones connected.'))
self.vr_units = []
self.runtime = Stopwatch()
self.position = level.zone_offset - 1
self.current_zone = self.level.zones[0]
self.paused = False
self.teleported = False
self.last_position = 0
# Render the level if it wasn't pre rendered
if not level.rendered:
level.render()
# Set session for all events
for key in self.level.events:
self.level.events[key].set_session(self)
# Make the window
if left_monitor is not None:
left_window = VRWindow(
self, self.left_monitor, mirrored=False, fullscreen=fullscreen)
if right_monitor is not None:
right_window = VRWindow(
self, self.right_monitor, mirrored=True, fullscreen=fullscreen)
def main_loop(dt):
'''
Commands executed at each frame refresh.
:param dt: Time since last reftesh is seconds. Passed by the pyglet clock.
:type dt: float
'''
# print('FPS:', 1/dt)s
params = {}
try:
for key, val in self.gramophone.read_linmaze_params().items():
params[self.gramophone.parameters[key].name] = val
self.last_read = params
except Comms.GramophoneError as err:
print("Communication ERROR:", err)
print("Using the previously read values.")
params = self.last_read
velocity = round(
self.vel_ratio*(params['ENCPOS'] - self.last_position)/14400)
velocity += self.manual_vel
self.last_position = params['ENCPOS']
if self.paused:
self.movement(0)
else:
self.movement(velocity)
self.check_zone()
if not self.skip_save:
self.log.make_entry(velocity, params['TIME'],
params['DI-1'], params['DI-2'],
params['DO-1'], params['DO-2'],
params['DO-3'], params['DO-4'])
self.check_rules(velocity, params['DI-1'], params['DI-2'])
if self.runtime_limit is not None and\
self.runtime.value() >= self.runtime_limit * 60:
pyglet.app.exit()
# Make an OpenGL texture from every frame's texture
texture_ids = []
textures = [frame.texture for frame in self.level.frames]
textures.append(self.level.dummy_frame.texture)
for tex in textures:
# from PIL import Image
# Image.fromarray(tex).show()
sy = tex.shape[0]
sx = tex.shape[1]
# print('X', tex.shape[1], 'Y', tex.shape[0], 'C', tex.shape[2])
tid = GLuint(0)
glGenTextures(1, ctypes.byref(tid))
glPixelStorei(GL_UNPACK_ALIGNMENT, 1)
glBindTexture(GL_TEXTURE_2D, tid)
glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB, sx, sy,
0, GL_RGB, GL_UNSIGNED_BYTE, tex.ctypes.data)
glGenerateMipmap(GL_TEXTURE_2D)
texture_ids.append(tid)
# Make VR units
for i, frame in enumerate(self.level.frames):
self.vr_units.append(
{"length": frame.width - self.level.transition_width,
"texture_id": texture_ids[i],
"position": self.level.zones[i].begin})
# VR unit of dummy frame
self.vr_units.append(
{"length": self.level.dummy_frame.width,
"texture_id": texture_ids[-1],
"position": self.level.zones[-1].end})
# Calculate level length
self.virtual_length = 0
for vru in self.vr_units:
self.virtual_length += vru["length"]
# Connect Gramophone, and reset outputs to 0
self.gramophone.write_output(1, 0)
self.gramophone.write_output(2, 0)
self.gramophone.write_output(3, 0)
self.gramophone.write_output(4, 0)
self.gramophone.write_analog(0)
# Save start date and time
self.start_time = time.time()
self.start_time_hr = time.strftime(
"%Y.%m.%d - %H:%M:%S", time.localtime(self.start_time))
default_filename = time.strftime(
"%Y-%m-%d %H_%M", time.localtime(self.start_time)) +\
' (' + self.level.name + ')'
# Set up VR logger
self.filename = None
if not self.skip_save:
self.filename = select_file(defaultextension='.vrl',
filetypes=[('VR Log', '.vrl')],
title='Save log for this session',
initialdir=os.getcwd(),
initialfile=default_filename)
self.folder = os.path.dirname(os.path.realpath(self.filename))
self.log = VRLog(self)
# Show the window
if left_monitor is not None:
left_window.set_visible()
if right_monitor is not None:
right_window.set_visible()
# Reset runtime & display start message
self.runtime.reset()
print("Starting VR session \n Level: " + self.level.name +
"\n Date & Time: " + self.start_time_hr +
"\n Log file: " + str(self.filename) + "\n")
if self.runtime_limit is not None:
finish_time = self.start_time + self.runtime_limit * 60
print("Training is limited to", self.runtime_limit,
'minutes. It will automatically end at',
time.strftime("%H:%M", time.localtime(finish_time)), '\n')
# Reset gramophone reisters
self.gramophone.reset_time()
self.gramophone.reset_position()
# Reset all Rule delay timers
for rule in [rule for rule in self.level.rules if hasattr(rule, 'delay_timer')]:
rule.delay_timer.reset()
# Schedule main loop
pyglet.clock.schedule(main_loop)
# pyglet.clock.set_fps_limit(30)
# pyglet.clock.schedule_interval(main_loop, 0.005)
# Main app run
pyglet.app.run()
# After main app is closed
# Reset outputs to 0 and disconnect the Gramophone
self.gramophone.stop_burst(1)
self.gramophone.stop_burst(2)
self.gramophone.stop_burst(3)
self.gramophone.stop_burst(4)
self.gramophone.write_output(1, 0)
self.gramophone.write_output(2, 0)
self.gramophone.write_output(3, 0)
self.gramophone.write_output(4, 0)
self.gramophone.write_analog(0)
# Save all remaining data
if not self.skip_save:
self.log.flush_all()
self.log.close()
[docs] def movement(self, vel):
'''
Move on the map with given velocity.
:param vel: Distance to move in pixels.
:type vel: int
'''
# Base movement (used to calculate others, loops around)
self.position += vel
self.position %= -self.virtual_length + self.level.screen_width
# Image movement
img_move = self.position - self.vr_units[0]["position"]
for vru in self.vr_units:
vru["position"] += img_move
# Virtual movement (position of the "character")
limit = self.virtual_length - self.level.screen_width \
- self.level.zone_offset
if -self.position > limit:
self.virtual_position = -self.position - \
(self.virtual_length - self.level.screen_width -
self.level.zone_offset)
else:
self.virtual_position = -self.position + self.level.zone_offset
[docs] def pause(self, position=None):
'''
Pauses the level at the given position.
:param position: Where should the simulation pause on the Level in pixels.
Set to None to pause at current position. None by default.
:type position: int or None
'''
if not self.paused:
if position is not None:
self.teleport(position)
self.paused = True
[docs] def unpause(self, position=None):
'''
Unpauses the level at the given position.
:param position: Where should the simulation unpause on the Level in pixels.
Set to None to unpause at current position. None by default.
:type position: int or None
'''
if self.paused:
if position is not None:
self.teleport(position)
# for zr in self.zone_rules:
# zr.delay_timer.reset()
self.paused = False
[docs] def teleport(self, target_pos):
'''
Teleports to the given position.
:param target_pos: Where should the teleportation land in pixels.
:type position: int
'''
self.position = -(target_pos - self.level.zone_offset)
self.teleported = True
[docs] def random_teleport(self, target_zone_types):
'''
Teleports to the middle of a random zone with one of the given zone types.
:param target_zone_types: list of possible landing zone types.
:type target_zone_types: [str]
'''
zone_selection = [
zone for zone in self.level.zones
if zone.zone_type in target_zone_types]
# and zone.zone_type != self.current_zone.zone_type
target_zone = random.choice(zone_selection)
middle_of_target = (target_zone.begin + target_zone.end) // 2
self.teleport(middle_of_target)
[docs] def check_zone(self):
''' Updates the current zone. '''
self.current_zone = [zone for zone in self.level.zones
if zone.check(self.virtual_position)][0]
[docs] def check_rules(self, vel, in_1, in_2):
'''
Checks all the rules of the Level.
:param vel: The current velocity (for velocity based rules).
:type vel: int
:param in_1: The state of input 1
:type in_1: int
:param in_2: The state of input 2
:type in_2: int
'''
for rule in self.level.rules:
# Check zone rules
if type(rule) is Rule.ZoneRule:
rule.check(self.current_zone.zone_type)
# Check speed and velocity rules
if type(rule) in [Rule.SpeedRule, Rule.VelocityRule, Rule.SmoothVelocityRule]:
rule.check(vel)
# Check input rules
if type(rule) is Rule.InputRule:
rule.check(1, in_1)
rule.check(2, in_2)