"""Translate readings from a serial-attached control panel into X11 key presses.

The script listens on a serial port for newline-terminated ``<pin>*<value>``
messages, tracks the last value seen for every pin and, for pins that have a
binding, launches ``xdotool`` to press a key.  While the port is open it also
writes CPU and memory readings back over the same serial link once a second.
"""
import argparse
import asyncio
import os
import subprocess
import traceback

import psutil
import serial_asyncio

# Control map (control number - pin id - keys sent); the control numbering is
# presumably the physical position on the panel -- TODO confirm.
#   5 - 101  - a / d
#   4 - (unused)
#   3 - 104  - 1 2 3 4 5
#   2 - 103  - q / e
#   1 - 102  - w / s
#   up button - 1001 - Return

# NOTE(review): any non-empty value enables debug output, including
# DEBUGIO=0 or DEBUGIO=false, because the raw string is used for truthiness.
DEBUGIO = os.environ.get("DEBUGIO", False)


def _press(cmd):
    """Print *cmd* when DEBUGIO is set, then launch it without waiting for it."""
    if DEBUGIO:
        print(cmd)
    subprocess.Popen(cmd)


class MOVEMENT:
    """Direction of change between two consecutive readings of a pin."""

    RIGHT = 1
    NONE = 0
    LEFT = -1


class Pin:
    """Last known value of one input pin plus the direction of its last change."""

    def __init__(self, id, initValue):
        self.id = id
        self.value = initValue
        self.movement = MOVEMENT.NONE

    def updateValue(self, value):
        """Record a new reading and return ``(pin, value, movement)``.

        A reading lower than the stored value is MOVEMENT.LEFT, a higher one
        is MOVEMENT.RIGHT.  A reading equal to the stored value reports
        MOVEMENT.NONE and resets the stored value to 0.
        """
        if self.value == value:
            # NOTE(review): the reset means the next reading is compared
            # against 0 rather than against this one -- confirm it is intended.
            self.value = 0
            self.movement = MOVEMENT.NONE
            return (self, self.value, MOVEMENT.NONE)
        self.movement = MOVEMENT.LEFT if value < self.value else MOVEMENT.RIGHT
        self.value = value
        return (self, value, self.movement)

    def __call__(self):
        """Return the current state as ``(pin, value, movement)``."""
        return (self, self.value, self.movement)


class ENTER:
    """Binding for pin 1001: presses Return on every reading."""

    def __init__(self):
        self.pin = 1001
        self.enter = "xdotool key Return".split()

    def __call__(self, data):
        # The movement in *data* is ignored: every message triggers the key.
        _press(self.enter)


class AD:
    """Binding for pin 101: LEFT presses ``a``, RIGHT presses ``d``."""

    def __init__(self):
        self.pin = 101
        self.a = "xdotool key a".split()
        self.d = "xdotool key d".split()

    def __call__(self, data):
        movement = data[2]
        if movement == MOVEMENT.LEFT:
            _press(self.a)
        if movement == MOVEMENT.RIGHT:
            _press(self.d)


class QE:
    """Binding for pin 103: LEFT presses ``e``, RIGHT presses ``q``."""

    def __init__(self):
        self.pin = 103
        self.e = "xdotool key e".split()
        self.q = "xdotool key q".split()

    def __call__(self, data):
        # NOTE(review): LEFT maps to the second letter here, unlike AD --
        # kept as in the original; confirm against the hardware.
        movement = data[2]
        if movement == MOVEMENT.LEFT:
            _press(self.e)
        if movement == MOVEMENT.RIGHT:
            _press(self.q)


class WS:
    """Binding for pin 102: LEFT presses ``s``, RIGHT presses ``w``."""

    def __init__(self):
        self.pin = 102
        self.w = "xdotool key w".split()
        self.s = "xdotool key s".split()

    def __call__(self, data):
        movement = data[2]
        if movement == MOVEMENT.LEFT:
            _press(self.s)
        if movement == MOVEMENT.RIGHT:
            _press(self.w)


class MODE:
    """Binding for pin 104: steps through the number keys 1..5."""

    def __init__(self):
        self.pin = 104
        self.min_key = 1
        self.max_key = 5
        self.current_key = 1

    def __call__(self, data):
        movement = data[2]
        if movement == MOVEMENT.LEFT:
            self.current_key -= 1
        if movement == MOVEMENT.RIGHT:
            self.current_key += 1
        # Clamp to the [min_key, max_key] range.
        self.current_key = max(self.min_key, min(self.max_key, self.current_key))
        # The current key is sent on every call, including MOVEMENT.NONE.
        _press(f"xdotool key {self.current_key}".split())


class Listener(asyncio.Protocol):
    """Serial protocol that parses pin messages and reports system load."""

    # Pin id -> handler.  Class-level, so handler state (e.g. MODE.current_key)
    # is shared by every Listener instance.
    binds = {
        101: AD(),
        104: MODE(),
        102: WS(),
        1001: ENTER(),
        103: QE(),
    }

    def __init__(self):
        super().__init__()
        self.is_connected = False
        self.buffer = b""  # bytes received after the last complete line
        self.store = {}  # pin id -> Pin
        self.tasks = []  # background reporting tasks
        self.transport = None  # set in connection_made()

    def connection_made(self, transport):
        """Remember the transport and start the periodic reporting tasks."""
        self.transport = transport
        self.is_connected = True
        self.tasks.append(asyncio.create_task(self.cpuCheck()))
        self.tasks.append(asyncio.create_task(self.memCheck()))
        print("Connected")

    def connection_lost(self, exc):
        """Mark the link as down and cancel the reporting tasks."""
        self.is_connected = False
        while self.tasks:
            print("Try kill tasks")
            self.tasks.pop().cancel()

    def _send_reading(self, display_id, value):
        """Write ``"<display_id> <value> 0\\n"`` if the transport is usable."""
        if self.transport and not self.transport.is_closing():
            msg = f"{display_id} {value} 0\n"
            self.transport.write(msg.encode())

    async def cpuCheck(self):
        """Send the CPU usage percentage once a second while connected."""
        DISPLAY_ID = 10001
        try:
            while self.is_connected:
                cpu_percent = round(psutil.cpu_percent(interval=None), 1)
                self._send_reading(DISPLAY_ID, cpu_percent)
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            # Cancellation from connection_lost() is the normal shutdown path:
            # let it propagate instead of reporting it as an error.
            raise
        except Exception:
            traceback.print_exc()

    async def memCheck(self):
        """Send the memory usage reading once a second while connected."""
        DISPLAY_ID = 10002
        try:
            while self.is_connected:
                mem = psutil.virtual_memory()
                # NOTE(review): memory is sent as percent / 10 while CPU is
                # sent as a plain percent -- confirm the display expects this.
                mem_percent = round(mem.percent / 10, 1)
                self._send_reading(DISPLAY_ID, mem_percent)
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            raise
        except Exception:
            traceback.print_exc()

    def data_received(self, data):
        """Buffer incoming bytes and handle every complete ``pin*value`` line.

        The trailing partial line stays in ``self.buffer`` until its newline
        arrives.  A line that cannot be decoded or parsed is skipped (and
        reported when DEBUGIO is set) without affecting the lines after it.
        """
        self.buffer += data
        lines = self.buffer.split(b"\n")
        self.buffer = lines.pop()  # keep the unfinished tail for next time
        for raw in lines:
            try:
                # Decoding happens inside the try so that line noise on the
                # serial link cannot abort processing of the whole chunk.
                line = raw.decode().replace("\r", "")
                if DEBUGIO:
                    print(line)
                pin, value = line.split("*")
                pin = int(pin)
                value = int(value)
                if DEBUGIO:
                    print(pin, value)
                if pin not in self.store:
                    self.store[pin] = Pin(pin, value)
                else:
                    self.store[pin].updateValue(value)
                self.useX(self.store[pin]())
            except Exception:
                # Best effort: one bad line (or a failing handler) must not
                # take the listener down.
                if DEBUGIO:
                    traceback.print_exc()
                    print("Cannot parse")

    def useX(self, pin):
        """Dispatch a ``(pin, value, movement)`` tuple to its bound handler."""
        if pin[0].id in self.binds:
            self.binds[pin[0].id](pin)


def main():
    """Parse the command line, open the serial port and run until stopped."""
    os.environ['DISPLAY'] = ':0'

    parser = argparse.ArgumentParser()
    parser.add_argument("--serial", type=str, default="/dev/serial0")
    parser.add_argument("--baudrate", type=int, default=9600)
    args = parser.parse_args()

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated on current Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        coro = serial_asyncio.create_serial_connection(
            loop, Listener, args.serial, baudrate=args.baudrate
        )
        loop.run_until_complete(coro)
        loop.run_forever()
    finally:
        # Reached on Ctrl+C or on a failed connection as well.
        loop.close()


if __name__ == "__main__":
    main()