You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

229 lines
6.2 KiB

import asyncio
import serial_asyncio
import os, subprocess
import argparse
import traceback
import psutil
#5 - 101 - a d
#4 - xxx
#3 - 104 - 1 2 3 4 5
#2 - 103 - q e (hz)
#1 - 102 - w s
#upButton = 1001
# Debug switch: when the DEBUGIO environment variable holds any non-empty
# value, received lines and the xdotool commands being launched are printed.
# Unset, it falls back to False.
DEBUGIO = os.getenv("DEBUGIO", False)
class MOVEMENT:
    """Direction constants describing how a pin value changed.

    LEFT means the new value is lower than the previous one, RIGHT means it
    is higher (or otherwise not lower), and NONE means no change was seen.
    """

    LEFT, NONE, RIGHT = -1, 0, 1
class Pin:
    """Last known state of one input pin.

    Attributes:
        id: numeric pin identifier.
        value: most recently stored value.
        movement: MOVEMENT constant derived from the last update.
    """

    def __init__(self, id, initValue):
        self.id, self.value = id, initValue
        self.movement = MOVEMENT.NONE

    def updateValue(self, value):
        """Store a new reading and return ``(pin, value, movement)``.

        A reading equal to the stored one resets the stored value to 0 and
        reports MOVEMENT.NONE; otherwise the direction is LEFT when the new
        reading is lower than the stored one and RIGHT when it is not.
        """
        if self.value == value:
            # NOTE(review): a repeated reading zeroes the stored value rather
            # than keeping it, so the next reading is compared against 0.
            # Kept as-is; confirm this is intended.
            self.value = 0
            self.movement = MOVEMENT.NONE
            return (self, self.value, MOVEMENT.NONE)

        if value < self.value:
            direction = MOVEMENT.LEFT
        else:
            direction = MOVEMENT.RIGHT
        self.movement = direction
        self.value = value
        return self()

    def __call__(self):
        """Return the current state as ``(pin, value, movement)``."""
        snapshot = (self, self.value, self.movement)
        return snapshot
class ENTER:
    """Binding for pin 1001: every event presses the Return key via xdotool."""

    def __init__(self):
        self.enter = "xdotool key Return".split()
        self.pin = 1001

    def __call__(self, data):
        """Launch the Return key press; ``data`` is accepted but not inspected."""
        command = self.enter
        if DEBUGIO:
            print(command)
        # Fire-and-forget: the child process is not waited on.
        subprocess.Popen(command)
class AD:
    """Binding for pin 101: LEFT presses 'a', RIGHT presses 'd'."""

    def __init__(self):
        self.pin = 101
        self.a = "xdotool key a".split()
        self.d = "xdotool key d".split()

    def __call__(self, data):
        """Press the key matching the movement in ``data[2]``; ignore others."""
        direction = data[2]
        if direction == MOVEMENT.LEFT:
            command = self.a
        elif direction == MOVEMENT.RIGHT:
            command = self.d
        else:
            # No movement: nothing to send.
            return
        if DEBUGIO:
            print(command)
        subprocess.Popen(command)
class QE:
    """Binding for pin 103: LEFT presses 'e', RIGHT presses 'q'."""

    def __init__(self):
        self.pin = 103
        self.e = "xdotool key e".split()
        self.q = "xdotool key q".split()

    def __call__(self, data):
        """Press the key matching the movement in ``data[2]``; ignore others."""
        direction = data[2]
        if direction == MOVEMENT.LEFT:
            command = self.e
        elif direction == MOVEMENT.RIGHT:
            command = self.q
        else:
            # No movement: nothing to send.
            return
        if DEBUGIO:
            print(command)
        subprocess.Popen(command)
class WS:
    """Binding for pin 102: LEFT presses 's', RIGHT presses 'w'."""

    def __init__(self):
        self.pin = 102
        self.w = "xdotool key w".split()
        self.s = "xdotool key s".split()

    def __call__(self, data):
        """Press the key matching the movement in ``data[2]``; ignore others."""
        direction = data[2]
        if direction == MOVEMENT.LEFT:
            command = self.s
        elif direction == MOVEMENT.RIGHT:
            command = self.w
        else:
            # No movement: nothing to send.
            return
        if DEBUGIO:
            print(command)
        subprocess.Popen(command)
class MODE:
    """Binding for pin 104: steps a number key between min_key and max_key.

    LEFT decrements the current key and RIGHT increments it; the result is
    kept inside [min_key, max_key] and that number key is pressed on every
    event, including events that carry no movement.
    """

    def __init__(self):
        self.pin = 104
        self.min_key, self.max_key = 1, 5
        self.current_key = 1

    def __call__(self, data):
        """Apply the movement in ``data[2]`` and press the resulting key."""
        direction = data[2]
        if direction == MOVEMENT.LEFT:
            self.current_key -= 1
        if direction == MOVEMENT.RIGHT:
            self.current_key += 1

        # Raise to the lower bound first, then cap at the upper bound.
        self.current_key = min(max(self.current_key, self.min_key), self.max_key)

        cmd = f"xdotool key {self.current_key}".split()
        if DEBUGIO:
            print(cmd)
        subprocess.Popen(cmd)
class Listener(asyncio.Protocol):
    """Serial protocol that turns device lines into key presses.

    Incoming data is split on newlines; each complete ``<pin>*<value>`` line
    updates the Pin state for that pin and is dispatched to the handler
    bound to it. While connected, CPU and memory load figures are written
    back over the same transport once per second.
    """

    # Handler objects keyed by pin id. They live on the class, so all
    # Listener instances share the same handler state (e.g. MODE's key).
    binds = {
        101: AD(),
        104: MODE(),
        102: WS(),
        1001: ENTER(),
        103: QE()
    }

    def __init__(self):
        super().__init__()
        self.is_connected = False
        # Set in connection_made; initialised here so the reporter loops can
        # test it safely even if they were started early.
        self.transport = None
        self.buffer = b""   # bytes received after the last newline
        self.store = {}     # pin id -> Pin
        self.tasks = []     # background reporter tasks

    def connection_made(self, transport):
        """Remember the transport and start the CPU/memory reporter tasks."""
        self.transport = transport
        self.is_connected = True
        self.tasks.append(asyncio.create_task(self.cpuCheck()))
        self.tasks.append(asyncio.create_task(self.memCheck()))
        print("Connected")

    def connection_lost(self, exc):
        """Mark the link as down and cancel every reporter task."""
        self.is_connected = False
        while self.tasks:
            print("Try kill tasks")
            self.tasks.pop().cancel()

    async def _reportLoop(self, display_id, sample):
        """Write ``"<display_id> <sample()> 0\\n"`` once per second while connected.

        Cancellation is re-raised so that connection_lost() can stop the task
        without a traceback being printed; any other error is printed and
        ends the loop.
        """
        try:
            while self.is_connected:
                reading = sample()
                if self.transport and not self.transport.is_closing():
                    msg = f"{display_id} {reading} 0\n"
                    self.transport.write(msg.encode())
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            # Normal shutdown path: let the task finish as cancelled.
            raise
        except Exception:
            traceback.print_exc()

    async def cpuCheck(self):
        """Report CPU utilisation (percent, one decimal) to display 10001."""
        DISPLAY_ID = 10001
        await self._reportLoop(
            DISPLAY_ID,
            lambda: round(psutil.cpu_percent(interval=None), 1),
        )

    async def memCheck(self):
        """Report memory utilisation to display 10002.

        The value sent is ``virtual_memory().percent / 10`` rounded to one
        decimal. NOTE(review): presumably the display expects a 0-10 scale;
        confirm against the device firmware.
        """
        DISPLAY_ID = 10002
        await self._reportLoop(
            DISPLAY_ID,
            lambda: round(psutil.virtual_memory().percent / 10, 1),
        )

    def data_received(self, data):
        """Buffer incoming bytes and process every complete line.

        Each line is expected to look like ``<pin>*<value>`` with both parts
        integers. The first line seen for a pin creates its Pin; later lines
        update it. The handler is invoked in both cases. Any failure while
        decoding, parsing or dispatching a line is reported as
        "Cannot parse" and the remaining lines are still processed.
        """
        self.buffer += data
        lines = self.buffer.split(b'\n')
        # The final element is the (possibly empty) unfinished tail.
        self.buffer = lines.pop()
        for raw in lines:
            try:
                # Decoding happens inside the try so that a corrupted byte on
                # the serial link cannot abort the rest of this chunk.
                line = raw.decode().replace("\r", "")
                if DEBUGIO:
                    print(line)
                pin, value = line.split("*")
                pin = int(pin)
                value = int(value)
                if DEBUGIO:
                    print(pin, value)
                if pin not in self.store:
                    self.store[pin] = Pin(pin, value)
                else:
                    self.store[pin].updateValue(value)
                self.useX(self.store[pin]())
            except Exception:
                # Deliberately broad: handler failures are best-effort too.
                if DEBUGIO:
                    traceback.print_exc()
                print("Cannot parse")

    def useX(self, pin):
        """Call the handler bound to ``pin[0].id``, if there is one.

        ``pin`` is the ``(Pin, value, movement)`` tuple produced by Pin.
        """
        handler = self.binds.get(pin[0].id)
        if handler is not None:
            handler(pin)
if __name__ == "__main__":
    # xdotool needs an X display; force the first local one.
    os.environ['DISPLAY'] = ':0'

    parser = argparse.ArgumentParser()
    parser.add_argument("--serial", type=str, default="/dev/serial0")
    parser.add_argument("--baudrate", type=int, default=9600)
    args = parser.parse_args()

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and no longer creates one on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        coro = serial_asyncio.create_serial_connection(
            loop, Listener, args.serial, baudrate=args.baudrate
        )
        transport, protocol = loop.run_until_complete(coro)
        loop.run_forever()
    finally:
        # Runs on Ctrl-C or a failed connection too, so the loop is always
        # released; the original exception still propagates.
        loop.close()