diff --git a/.gitignore b/.gitignore index e143fbf..e0a2f57 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ testenv/ __pycache__/ -docker-compose.yaml \ No newline at end of file +docker-compose.yaml +config/ \ No newline at end of file diff --git a/config.example.json b/config.example.json new file mode 100644 index 0000000..3a40266 --- /dev/null +++ b/config.example.json @@ -0,0 +1,6 @@ +[ + {"uuid": "692daca8-08e4-48b5-917b-a65a0f72cc12", "transport": "tcp", "address": "192.168.1.1:1234", "alive_pool_seconds": 60}, + {"uuid": "5e432648-6c04-4892-af98-f32382931645", "transport": "ble", "adapter":"hci0", "address": "00:00:00:00:00:00"}, + {"uuid": "0a8aa564-2f91-4e4d-a102-0aae57881bab", "transport": "serial", "port": "/dev/tty0USB", "baudrate": 115200}, + {"uuid": "c64ab2a4-5305-4c70-8776-83c74a5de796", "transport": "ws", "port": "60000"} +] \ No newline at end of file diff --git a/mesht_device.py b/mesht_device.py index 2ac1b66..470abe8 100644 --- a/mesht_device.py +++ b/mesht_device.py @@ -220,7 +220,7 @@ class Channel: class MeshtDevice: - def __init__(self, transport): + def __init__(self, transport, device_uuid, skip_init = False): self.transport = transport self.channels = [] self.lora_config = None @@ -228,6 +228,8 @@ class MeshtDevice: self.my_node_id = "00000000" self.my_node_id_dec = 0 self.nid = None + self.device_uuid = device_uuid + self.skip_init = skip_init async def start(self): await self.transport.start() @@ -266,6 +268,7 @@ class MeshtDevice: async def recv(self): data = await self.transport.recv() fr = pb.decode(data, FROMRADIO_SCHEMA) + fr["device_uuid"] = self.device_uuid logger.debug(f"FromRadio: {fr}") self._maybe_store_channel(fr) self._maybe_store_lora_config(fr) diff --git a/mesht_models.py b/mesht_models.py index 8a41ff5..46d6536 100644 --- a/mesht_models.py +++ b/mesht_models.py @@ -25,6 +25,10 @@ WS_TYPE_NEW = 1 WS_TYPE_FRONTEND = 2 async def _wait_for_config_complete(device, extraInfo = False): + if device.skip_init: + logger.info(str(device), "init config skip") + return [] + logger.info("wait config") packets = [] while True: diff --git a/service.py b/service.py index 760242c..4024ae9 100644 --- a/service.py +++ b/service.py @@ -35,59 +35,86 @@ class MeshArgsParse: def __init__(self, args): self.args = args -class MeshListener(MeshArgsParse): +class MeshMultiListener(MeshArgsParse): def __init__(self, args): - super().__init__(args) - self.meshState = NOT_CONNECTED self.PUB_CH = PUB_CH - self.last_packet_catch = time() + super().__init__(args) + self.devices:List[MeshtDevice] = [] + self.defaultDeviceUUID = "" + self.readConfig(args.change_workdir, args.mesh_config) - if args.transport == "serial": - from transport_serial import SerialTransport - self.transport = SerialTransport(port = args.serial_port, baudrate = args.serial_baudrate) - elif args.transport == "ble": - from transport_ble import BLETransport - self.transport = BLETransport(args.ble_mesh_mac, args.ble_adapter) - elif args.transport == "tcp": - from transport_tcp import TCPTransport - ip, port = args.tcp_address.split(":") - self.transport = TCPTransport(ip, int(port), alive_pool_connect=self.args.serial_alive_pool_seconds) - else: - logger.error("Unknown mesh transport") - sys.exit(1) - - self.device = MeshtDevice(self.transport) + ''' + [ + {"uuid": "692daca8-08e4-48b5-917b-a65a0f72cc12", "transport": "tcp", "address": "192.168.1.1:1234", "alive_pool_seconds": 60}, + {"uuid": "5e432648-6c04-4892-af98-f32382931645", "transport": "ble", "adapter":"hci0", "address": "00:00:00:00:00:00"}, + {"uuid": "0a8aa564-2f91-4e4d-a102-0aae57881bab", "transport": "serial", "port": "/dev/tty0USB", "baudrate": 115200}, + {"uuid", "c64ab2a4-5305-4c70-8776-83c74a5de796", "transport": "ws", "port": "60000"} + ] + ''' + #not need async + def readConfig(self, change_dir = True, path = "./config/mesh.json"): + from json import load + if change_dir: + path = f"{os.path.dirname(os.path.abspath(__file__))}/{path}" + + with open(path, "r") as config: + self.json_config = load(config) - #task - async def meshWorker(self, queue: asyncio.Queue): - logger.info("Start mesh queue listener") + for device_config in self.json_config: + if device_config.get("uuid", "") == "": + raise Exception("missed uuid section in ", device_config) + if device_config.get("transport", "") == "": + raise Exception("missed uuid section in ", device_config) + + if device_config["transport"] == "serial": + logger.info("Found serial transport") + from transport_serial import SerialTransport + transport = SerialTransport(port = device_config["port"], baudrate = device_config["baudrate"]) + self.devices.append(MeshtDevice(transport, device_config['uuid'])) + elif device_config["transport"] == "tcp": + logger.info("Found tcp transport") + from transport_tcp import TCPTransport + ip, port = device_config["address"].split(":") + transport = TCPTransport(ip, int(port), alive_pool_connect=device_config.get("alive_pool_seconds", 60)) + self.devices.append(MeshtDevice(transport, device_config['uuid'])) + elif device_config["transport"] == "ble": + logger.info("Found ble transport") + from transport_ble import BLETransport + transport = BLETransport(device_config["address"], device_config["adapter"]) + self.devices.append(MeshtDevice(transport, device_config['uuid'])) + elif device_config["transport"] == "ws": + raise Exception("ws transport not impl") + + #set default mesh + self.defaultDeviceUUID = self.json_config[0]["uuid"] + + async def meshListener(self, device: MeshtDevice, queue: asyncio.Queue): run = not self.args.disable_mesh or self.args.enable_mesh while run: try: - await self.device.start() - self.meshState = WAIT_CONFIG - logger.info("Mesh state: wait config") - self.init_data = await _wait_for_config_complete(self.device) - for from_radio in self.init_data: + await device.start() + device.state = WAIT_CONFIG + logger.info(str(device), " wait config") + init_data = await _wait_for_config_complete(device) + for from_radio in init_data: await queue.put(from_radio) - logger.info("Mesh state: available") - self.meshState = AVAILABLE + logger.info(str(device), " available") + device.state = AVAILABLE while True: - from_radio, _ = await self.device.recv() - #logger.debug(from_radio) + from_radio, _ = await device.recv() await queue.put(from_radio) - self.last_packet_catch = time() + device.last_packet_catch = time() except asyncio.exceptions.CancelledError: - logger.info("Kill mesh device") + logger.info(str(device), " kill device") run = False except: - logger.error("Mesh state: error") - self.meshState = ERR + logger.error(str(device), " has connect error") + device.state = ERR traceback.print_exc() await asyncio.sleep(1) - logger.info("Mesh state: reconnect") - self.meshState = RECONNECT + logger.info(str(device), " device will reconnect") + device.state = RECONNECT finally: await self.device.close() @@ -131,16 +158,11 @@ class MongoDriver(MeshArgsParse): self.tileManager = TileManager(self) async def dbSaveRadio(self, new_from_radio): - '''try: - anyJson = from_radio["packet"] - except: - logger.debug(from_radio) - return''' - - #logger.debug(from_radio) - #logger.debug(len(list(from_radio.keys()))) from_radio = copy.deepcopy(new_from_radio) for k, v in from_radio.items(): + if k == "device_uuid": + continue + if type(v) != dict: v = {"data": v, "ts": time()} else: @@ -173,29 +195,14 @@ class MongoDriver(MeshArgsParse): v.update(v["user"]) del v["user"] + v["device_uuid"] = from_radio["device_uuid"] await self.dbStore[k].insert_one(v) - '''async def dbQueueListener(self, queue: asyncio.Queue): - logger.info("Start db queue listener") - run = True - while run: - try: - from_radio = await queue.get() - if from_radio is None: - continue - - await self.dbSaveRadio(from_radio) - except asyncio.exceptions.CancelledError: - logger.info("Kill db listener") - run = False - except: - traceback.print_exc()''' - -class MeshCenter(MeshListener, MeshApi, MongoDriver, MeshArgsParse): +class MeshCenter(MeshMultiListener, MeshApi, MongoDriver, MeshArgsParse): queue: asyncio.Queue = asyncio.Queue() def __init__(self, args): - MeshListener.__init__(self, args) + MeshMultiListener.__init__(self, args) MeshArgsParse.__init__(self, args) MeshApi.__init__(self, args) MongoDriver.__init__(self, args) @@ -217,12 +224,12 @@ class MeshCenter(MeshListener, MeshApi, MongoDriver, MeshArgsParse): except: traceback.print_exc() - def buildBackgroundTasks(self): #input queue - async def mL(): - await self.meshWorker(self.queue) - self.tasks.append(mL) + for device in self.devices: + async def mL(): + await self.meshListener(device, self.queue) + self.tasks.append(mL) #output async def handlerTask(): @@ -258,9 +265,12 @@ class MeshCenter(MeshListener, MeshApi, MongoDriver, MeshArgsParse): if __name__ == "__main__": parser = argparse.ArgumentParser() #mesh - parser.add_argument("--transport", default="tcp") + parser.add_argument("--change-workdir", default=True, action="store_true") + parser.add_argument("--mesh-config", default="./config/mesh.json") parser.add_argument("--disable-mesh", action="store_true", default=True) parser.add_argument("--enable-mesh", action="store_true", default=False, help="Need to run in docker if git is bullshit updates") + ''' + parser.add_argument("--transport", default="tcp") #serial transport parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141") parser.add_argument("--serial-baudrate", default=115200) @@ -269,7 +279,7 @@ if __name__ == "__main__": parser.add_argument("--ble-adapter", default=None) parser.add_argument("--ble-mesh-mac", default="22AC1D28-5345-465E-2E82-18CDE5857A45") #tcp trasponse - parser.add_argument("--tcp-address", default="192.168.3.26:8886") + parser.add_argument("--tcp-address", default="192.168.3.26:8886")''' #fastapi parser.add_argument("--web-host", default="0.0.0.0") parser.add_argument("--web-port", default=8680)