Source code for jaclearn.rl.simulator.controller

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : controller.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 04/23/2017
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import tkinter as tk
import threading
import time

from PIL import Image, ImageTk


[docs] class Controller(object):
[docs] def __init__(self, title='Controller', fps=100): self.records = dict() self.last_key = None self.tk_root = None self.tk_canv = None self.tk_image = None self.__lock = threading.Lock() self.__title = title self.__current_lock = threading.Lock() self.__current = None self.__fps = fps self.__stop_event = threading.Event()
[docs] def update_title(self, title): assert self.tk_root is None self.__title = title
[docs] def update(self, img): with self.__current_lock: self.__current = img
[docs] def get_last_key(self): while True: self.__lock.acquire() res = self.last_key if res is None: self.__lock.release() time.sleep(0.1) else: self.last_key = None self.__lock.release() return res
[docs] def test_key(self, key): return self.records.get(key, False)
[docs] def mainloop(self): while True: if self.__current is None: time.sleep(1) if self.__stop_event.is_set(): return else: break self.__create_tk() self.__update_image_thread() self.tk_root.mainloop()
[docs] def quit(self): self.__stop_event.set() if self.tk_root is not None: self.tk_root.quit()
def __create_tk(self): self.tk_root = tk.Tk() self.tk_root.title(self.__title) self.__update_image() self.tk_canv = tk.Label(self.tk_root, image=self.tk_image, bd=0) self.tk_canv.pack(side=tk.TOP, expand=tk.YES, fill=tk.BOTH) self.tk_canv.focus_set() self.tk_canv.bind("<KeyPress>", self.__press) self.tk_canv.bind("<KeyRelease>", self.__release) def __update_image(self): with self.__current_lock: img = Image.fromarray(self.__current) imgtk = ImageTk.PhotoImage(img) self.tk_image = imgtk def __update_image_thread(self): self.__update_image() self.tk_canv.config(image=self.tk_image) self.tk_root.update_idletasks() self.tk_root.after(int(1000 / self.__fps), self.__update_image_thread) def __press(self, event): with self.__lock: self.records[event.keysym_num] = True self.last_key = event.keysym_num def __release(self, event): with self.__lock: self.records[event.keysym_num] = False