diff --git a/.gitignore b/.gitignore index 33bc7e5..e143fbf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ testenv/ -__pycache__/ \ No newline at end of file +__pycache__/ +docker-compose.yaml \ No newline at end of file diff --git a/logger.py b/logger.py index 3c6e98b..2b93622 100644 --- a/logger.py +++ b/logger.py @@ -1,12 +1,12 @@ class logger: @staticmethod - def info(t): - print("[INFO]", t) + def info(*t): + print("[INFO]", " ".join([str(s) for s in t])) @staticmethod - def error(t): - print("[ERROR]", t) + def error(*t): + print("[ERROR]", " ".join([str(s) for s in t])) @staticmethod - def debug(t): - print("[DEBUG]", t) \ No newline at end of file + def debug(*t): + print("[DEBUG]", " ".join([str(s) for s in t])) \ No newline at end of file diff --git a/service.py b/service.py index b520bc7..106949e 100644 --- a/service.py +++ b/service.py @@ -45,16 +45,19 @@ class MeshListener(MeshArgsParse): if args.transport == "serial": from transport_serial import SerialTransport - self.transport = SerialTransport(port = args.serial_port, baudrate = args.serial_baudrate) - self.device = MeshtDevice(self.transport) elif args.transport == "ble": from transport_ble import BLETransport self.transport = BLETransport(args.ble_mesh_mac, args.ble_adapter) - self.device = MeshtDevice(self.transport) + elif args.transport == "tcp": + from transport_tcp import TCPTransport + ip, port = args.tcp_address.split(":") + self.transport = TCPTransport(ip, int(port)) else: logger.error("Unknown mesh transport") sys.exit(1) + + self.device = MeshtDevice(self.transport) #task async def meshWorker(self, queue: asyncio.Queue): @@ -231,7 +234,7 @@ class MeshCenter(MeshListener, MeshApi, MongoDriver, MeshArgsParse): if __name__ == "__main__": parser = argparse.ArgumentParser() #mesh - parser.add_argument("--transport", default="ble") + parser.add_argument("--transport", default="tcp") parser.add_argument("--disable-mesh", action="store_true", default=False) #serial transport parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141") @@ -239,6 +242,8 @@ if __name__ == "__main__": #ble transport parser.add_argument("--ble-adapter", default=None) parser.add_argument("--ble-mesh-mac", default="22AC1D28-5345-465E-2E82-18CDE5857A45") + #tcp trasponse + parser.add_argument("--tcp-address", default="192.168.3.26:8886") #fastapi parser.add_argument("--web-host", default="0.0.0.0") parser.add_argument("--web-port", default=8680) diff --git a/transport_tcp.py b/transport_tcp.py new file mode 100644 index 0000000..2878212 --- /dev/null +++ b/transport_tcp.py @@ -0,0 +1,156 @@ +import asyncio +import logging +from asyncio.streams import StreamReader, StreamWriter +from time import time + +from logger import logger + +MAGIC0 = 0x94 +MAGIC1 = 0xC3 +READ_CHUNK = 4096 + + +def encode_frame(payload): + n = len(payload) + header = bytes([MAGIC0, MAGIC1, (n >> 8) & 0xFF, n & 0xFF]) + return header + payload + + +class TCPTransport: + def __init__(self, host, port=4403): + self.host = host + self.port = int(port) + self.reader: StreamReader = None + self.writer = None + self._recv_q = asyncio.Queue() + self._buf = bytearray() + self._reader_task = None + self._error = None + self._closing = False + self.socket_start = time() + + async def start(self): + self._closing = False + self._error = None + self._buf = bytearray() + self._recv_q = asyncio.Queue() + try: + #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + #sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + reader, writer = await asyncio.open_connection(self.host, self.port) + self.socket_start = time() + except Exception as e: + self._error = e + raise + self.reader = reader + self.writer = writer + self._reader_task = asyncio.create_task(self._reader_loop(), name="tcp-reader") + self._alive_task = asyncio.create_task(self._alive_loop(), name="alive-loop") + logger.debug("TCPTransport.start: connected to %s:%s", self.host, self.port) + + async def _alive_loop(self): + try: + while True: + if self.writer: + #logger.debug("Send alive", int(time() - self.socket_start)) + self.writer.write(b"0") + await self.writer.drain() + await asyncio.sleep(1) + except asyncio.CancelledError: + pass + except: + logger.error("Tcp socket is down, close connection") + await self.close() + + async def _reader_loop(self): + assert self.reader is not None + r = self.reader + try: + while True: + data = await r.read(READ_CHUNK) + if not data: + if self._closing: + return + self._error = ConnectionError("tcp connection closed") + logger.error("tcp read error: connection closed") + try: + self._recv_q.put_nowait(None) + except Exception: + pass + return + self._buf.extend(data) + while True: + start = -1 + for i in range(len(self._buf) - 1): + if self._buf[i] == MAGIC0 and self._buf[i + 1] == MAGIC1: + start = i + break + if start == -1: + if len(self._buf) > 1: + self._buf[:] = self._buf[-1:] + break + if start > 0: + del self._buf[:start] + if len(self._buf) < 4: + break + length = (self._buf[2] << 8) | self._buf[3] + total = 4 + length + if len(self._buf) < total: + break + payload = bytes(self._buf[4:total]) + await self._recv_q.put(payload) + del self._buf[:total] + except asyncio.CancelledError: + pass + except Exception as e: + if self._closing: + return + self._error = e + logger.error("tcp read error: %s", e) + try: + self._recv_q.put_nowait(None) + except Exception: + pass + + async def send(self, payload): + if self.writer is None: + return + if not isinstance(payload, (bytes, bytearray)): + raise TypeError("payload must be bytes") + try: + frame = encode_frame(bytes(payload)) + self.writer.write(frame) + await self.writer.drain() + except Exception as e: + if self._closing: + return + self._error = e + logger.error("tcp write error: %s", e) + try: + self._recv_q.put_nowait(None) + except Exception: + pass + + async def recv(self): + item = await self._recv_q.get() + if item is None: + raise ConnectionError(self._error or "tcp transport error") + return item + + async def close(self): + self._closing = True + if self._reader_task is not None: + self._reader_task.cancel() + try: + await self._reader_task + except Exception: + pass + self._reader_task = None + if self.writer is not None: + try: + self.writer.close() + await self.writer.wait_closed() + except Exception: + pass + self.writer = None + self.reader = None \ No newline at end of file