11 changed files with 163 additions and 6 deletions
Binary file not shown.
Binary file not shown.
@ -0,0 +1,3 @@ |
|||
#!/bin/bash |
|||
cd /home/pipboy/Pipboy3Kmark4/pipboyIO/legacy |
|||
sudo -u pipboy python3 serialX.py |
|||
@ -0,0 +1 @@ |
|||
pyserial-asyncio |
|||
@ -0,0 +1,150 @@ |
|||
import asyncio |
|||
import serial_asyncio |
|||
import os, subprocess |
|||
import argparse |
|||
|
|||
#5 - 101 - a d |
|||
#4 - xxx |
|||
#3 - 104 - 1 2 3 4 5 |
|||
#2 - 103 - hz |
|||
#1 - 102 - w s |
|||
#upButton = 1001 |
|||
|
|||
class MOVEMENT: |
|||
RIGHT = 1 |
|||
NONE = 0 |
|||
LEFT = -1 |
|||
|
|||
class Pin: |
|||
def __init__(self, id, initValue): |
|||
self.id = id |
|||
self.value = initValue |
|||
self.movement = MOVEMENT.NONE |
|||
|
|||
def updateValue(self, value): #(Pin.class, value, movement) |
|||
if self.value == value: |
|||
self.value = 0 |
|||
self.movement = MOVEMENT.NONE |
|||
return (self, self.value, MOVEMENT.NONE) |
|||
else: |
|||
self.movement = MOVEMENT.LEFT if value < self.value else MOVEMENT.RIGHT |
|||
self.value = value |
|||
return (self, value, self.movement) |
|||
|
|||
def __call__(self): |
|||
return (self, self.value, self.movement) |
|||
|
|||
|
|||
class ENTER: |
|||
def __init__(self): |
|||
self.pin = 1001 |
|||
self.enter = "xdotool key Return".split() |
|||
def __call__(self, data): |
|||
print(self.enter) |
|||
subprocess.Popen(self.enter) |
|||
|
|||
class AD: |
|||
def __init__(self): |
|||
self.pin = 101 |
|||
self.a = "xdotool key a".split() |
|||
self.d = "xdotool key d".split() |
|||
|
|||
def __call__(self, data): |
|||
if data[2] == MOVEMENT.LEFT: |
|||
print(self.a) |
|||
subprocess.Popen(self.a) |
|||
if data[2] == MOVEMENT.RIGHT: |
|||
print(self.d) |
|||
subprocess.Popen(self.d) |
|||
|
|||
class WS: |
|||
def __init__(self): |
|||
self.pin = 101 |
|||
self.w = "xdotool key w".split() |
|||
self.s = "xdotool key s".split() |
|||
|
|||
def __call__(self, data): |
|||
if data[2] == MOVEMENT.LEFT: |
|||
print(self.s) |
|||
subprocess.Popen(self.s) |
|||
if data[2] == MOVEMENT.RIGHT: |
|||
print(self.w) |
|||
subprocess.Popen(self.w) |
|||
|
|||
class MODE: |
|||
def __init__(self): |
|||
self.pin = 104 |
|||
self.min_key = 1 |
|||
self.max_key = 5 |
|||
self.current_key = 1 |
|||
|
|||
def __call__(self, data): |
|||
if data[2] == MOVEMENT.LEFT: |
|||
self.current_key -= 1 |
|||
if data[2] == MOVEMENT.RIGHT: |
|||
self.current_key += 1 |
|||
|
|||
if (self.current_key < self.min_key): |
|||
self.current_key = self.min_key |
|||
|
|||
if (self.current_key > self.max_key): |
|||
self.current_key = self.max_key |
|||
|
|||
cmd = f"xdotool key {self.current_key}".split() |
|||
print(cmd) |
|||
subprocess.Popen(cmd) |
|||
|
|||
class Listener(asyncio.Protocol): |
|||
binds = { |
|||
101: AD(), |
|||
104: MODE(), |
|||
102: WS(), |
|||
1001: ENTER() |
|||
} |
|||
|
|||
def __init__(self): |
|||
super().__init__() |
|||
self.buffer = b"" |
|||
self.store = {} |
|||
|
|||
def connection_made(self, transport): |
|||
self.transport = transport |
|||
print("Connected") |
|||
|
|||
def data_received(self, data): |
|||
self.buffer += data |
|||
lines = self.buffer.split(b'\n') |
|||
self.buffer = lines[-1] |
|||
|
|||
for line in lines[:-1]: |
|||
try: |
|||
pin, value = line.split("*") |
|||
pin = int(pin) |
|||
value = int(value) |
|||
|
|||
if pin not in self.store: |
|||
self.store[pin] = Pin(pin, value) |
|||
else: |
|||
self.store[pin].updateValue(value) |
|||
|
|||
self.useX(self.store[pin]()) |
|||
except: |
|||
print("Cannot parse") |
|||
|
|||
def useX(self, pin): |
|||
if pin[0].id in self.binds: |
|||
self.binds[pin[0]](pin) |
|||
|
|||
if __name__ == "__main__": |
|||
os.environ['DISPLAY'] = ':0' |
|||
|
|||
parser = argparse.ArgumentParser() |
|||
parser.add_argument("--serial", type=str, default="/dev/serial0") |
|||
parser.add_argument("--baudrate", type=int, default=9600) |
|||
args = parser.parse_args() |
|||
|
|||
loop = asyncio.get_event_loop() |
|||
coro = serial_asyncio.create_serial_connection(loop, Listener, args.serial, baudrate = args.baudrate) |
|||
transport, protocol = loop.run_until_complete(coro) |
|||
loop.run_forever() |
|||
loop.close() |
|||
@ -1,3 +0,0 @@ |
|||
#!/bin/bash |
|||
cd /home/pipboy/Pipboy3Kmark4/pipboyIO |
|||
sudo -u pipboy python3 serialX.py |
|||
Loading…
Reference in new issue