"""Translate `pin*value` lines read from a serial port into xdotool key presses.

Each complete line received on the serial port has the form ``<pin>*<value>``
(both integers).  The last value seen for every pin is remembered, the
direction of the change is derived from it, and the handler bound to that pin
(if any) is invoked to emit a key press through ``xdotool``.
"""
import asyncio
import os
import subprocess
import argparse
import traceback

# Control-to-pin mapping (numbering as in the original notes):
#   5 - pin 101  - keys a / d
#   4 - xxx (no pin noted)
#   3 - pin 104  - keys 1 2 3 4 5
#   2 - pin 103  - "hz" (original note; no handler is bound to this pin)
#   1 - pin 102  - keys w / s
#   upButton     - pin 1001

# Any non-empty DEBUGIO environment value (even "0") enables debug output.
DEBUGIO = os.environ.get("DEBUGIO", False)


def _run(cmd):
    """Start `cmd` (an argv list) without waiting, echoing it when DEBUGIO is set."""
    if DEBUGIO:
        print(cmd)
    # Popen rather than run(): must not block the event loop on xdotool.
    subprocess.Popen(cmd)


class MOVEMENT:
    """Direction of a value change between two consecutive readings of a pin."""

    RIGHT = 1
    NONE = 0
    LEFT = -1


class Pin:
    """Last known value of one input pin plus the direction of its last change."""

    def __init__(self, id, initValue):
        self.id = id
        self.value = initValue
        self.movement = MOVEMENT.NONE

    def updateValue(self, value):
        """Record a new reading and return ``(self, value, movement)``.

        A lower reading than the stored one is LEFT, a higher one is RIGHT.
        """
        if self.value == value:
            # NOTE(review): a repeated reading resets the stored value to 0
            # instead of keeping it.  Preserved as-is; confirm this is intended.
            self.value = 0
            self.movement = MOVEMENT.NONE
            return (self, self.value, MOVEMENT.NONE)
        self.movement = MOVEMENT.LEFT if value < self.value else MOVEMENT.RIGHT
        self.value = value
        return (self, value, self.movement)

    def __call__(self):
        """Return the current state as ``(self, value, movement)``."""
        return (self, self.value, self.movement)


class ENTER:
    """Handler that sends Return on every event it receives."""

    def __init__(self):
        self.pin = 1001
        self.enter = "xdotool key Return".split()

    def __call__(self, data):
        _run(self.enter)


class AD:
    """Handler that sends ``a`` on LEFT movement and ``d`` on RIGHT movement."""

    def __init__(self):
        self.pin = 101
        self.a = "xdotool key a".split()
        self.d = "xdotool key d".split()

    def __call__(self, data):
        movement = data[2]
        if movement == MOVEMENT.LEFT:
            _run(self.a)
        if movement == MOVEMENT.RIGHT:
            _run(self.d)


class WS:
    """Handler that sends ``s`` on LEFT movement and ``w`` on RIGHT movement."""

    def __init__(self):
        # Was 101 (copy of AD); this handler is bound to pin 102.
        self.pin = 102
        self.w = "xdotool key w".split()
        self.s = "xdotool key s".split()

    def __call__(self, data):
        movement = data[2]
        if movement == MOVEMENT.LEFT:
            _run(self.s)
        if movement == MOVEMENT.RIGHT:
            _run(self.w)


class MODE:
    """Handler that steps a key number within [min_key, max_key] and sends it.

    The current key is sent on every event, including events with no movement.
    """

    def __init__(self):
        self.pin = 104
        self.min_key = 1
        self.max_key = 5
        self.current_key = 1

    def __call__(self, data):
        movement = data[2]
        if movement == MOVEMENT.LEFT:
            self.current_key -= 1
        if movement == MOVEMENT.RIGHT:
            self.current_key += 1
        # Clamp to the allowed key range.
        self.current_key = max(self.min_key, min(self.max_key, self.current_key))
        _run(f"xdotool key {self.current_key}".split())


class Listener(asyncio.Protocol):
    """Serial protocol that parses ``pin*value`` lines and dispatches handlers."""

    # Pin id -> handler; handler instances are shared by all Listener objects.
    binds = {
        101: AD(),
        104: MODE(),
        102: WS(),
        1001: ENTER(),
    }

    def __init__(self):
        super().__init__()
        self.buffer = b""  # bytes of the trailing, not yet terminated line
        self.store = {}    # pin id -> Pin

    def connection_made(self, transport):
        self.transport = transport
        print("Connected")

    def data_received(self, data):
        """Append `data` to the buffer and process every complete line in it."""
        self.buffer += data
        # Everything after the last newline stays buffered for the next chunk.
        *complete, self.buffer = self.buffer.split(b"\n")
        for raw in complete:
            self._handle_line(raw)

    def _handle_line(self, raw):
        """Parse one raw line, update the pin store and dispatch its handler."""
        try:
            # Decoding happens inside the try so that a garbage byte on the
            # wire rejects only this line, not the rest of the chunk.
            line = raw.decode().replace("\r", "")
            if DEBUGIO:
                print(line)
            pin, value = line.split("*")
            pin = int(pin)
            value = int(value)
        except ValueError:  # includes UnicodeDecodeError and bad unpacking
            if DEBUGIO:
                traceback.print_exc()
            print("Cannot parse")
            return

        if DEBUGIO:
            print(pin, value)
        if pin not in self.store:
            self.store[pin] = Pin(pin, value)
        else:
            self.store[pin].updateValue(value)

        try:
            self.useX(self.store[pin]())
        except Exception:
            # Best effort: a failing handler (e.g. xdotool not installed) must
            # not stop the processing of subsequent lines.
            if DEBUGIO:
                traceback.print_exc()
            print(f"Handler for pin {pin} failed")

    def useX(self, pin):
        """Invoke the handler bound to the pin of a ``(Pin, value, movement)`` tuple."""
        handler = self.binds.get(pin[0].id)
        if handler is not None:
            handler(pin)


def main():
    """Parse the command line, open the serial port and run until interrupted."""
    # Imported here because only the entry point needs the serial dependency;
    # the classes above stay importable without it.
    import serial_asyncio

    os.environ['DISPLAY'] = ':0'
    parser = argparse.ArgumentParser()
    parser.add_argument("--serial", type=str, default="/dev/serial0")
    parser.add_argument("--baudrate", type=int, default=9600)
    args = parser.parse_args()

    # get_event_loop() without a running loop is deprecated; create one explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        coro = serial_asyncio.create_serial_connection(
            loop, Listener, args.serial, baudrate=args.baudrate
        )
        transport, protocol = loop.run_until_complete(coro)
        loop.run_forever()
    finally:
        loop.close()


if __name__ == "__main__":
    main()