"""Mesh center: device listeners, FastAPI web server and MongoDB storage.

Run as a script; see the argument parser at the bottom of the file for the
available command-line options.
"""
# sys imports
import argparse
import importlib
import traceback
import sys, os
import asyncio
from time import time
from typing import List, Dict
import copy

from logger import logger

# mesh
from mesht_device import MeshtDevice, USER_SCHEMA
from mesht_models import _wait_for_config_complete, PUB_CH
from mesht_models import NOT_CONNECTED, WAIT_CONFIG, AVAILABLE, ERR, RECONNECT
from protobufs_extra.telemetry_proto import *
from protobufs_extra.position_proto import *
from protobufs_extra.routing_proto import *
import pb
from protobuf_decoder.protobuf_decoder import Parser

# fs imports
from fastapi import FastAPI, HTTPException, WebSocket
from fastapi.responses import HTMLResponse
from contextlib import asynccontextmanager

# mongo
from pymongo import AsyncMongoClient
from dbService import DbService

# other
from botManager import BotManager
from utils import isInt, generate_random_string, md5hash


class MeshArgsParse:
    """Base class that stores the parsed command-line arguments."""

    def __init__(self, args):
        self.args = args


class MeshMultiListener(MeshArgsParse):
    """Create the devices listed in the mesh config and read packets from them."""

    def __init__(self, args):
        self.PUB_CH = PUB_CH
        # Explicit base call (not super()): MeshCenter initialises every mixin
        # itself, and a cooperative super() chain would run the sibling
        # constructors a second time.
        MeshArgsParse.__init__(self, args)
        self.devices: List[MeshtDevice] = []
        self.devicesUuidHashToUuid = {}
        self.defaultDeviceUUID = ""
        self.defaultDeviceUUIDHash = ""
        self.readConfig(args.change_workdir, args.mesh_config)

    # Example of the mesh config file:
    # [
    #   {"uuid": "692daca8-08e4-48b5-917b-a65a0f72cc12", "transport": "tcp",
    #    "address": "192.168.1.1:1234", "alive_pool_seconds": 60},
    #   {"uuid": "5e432648-6c04-4892-af98-f32382931645", "transport": "ble",
    #    "adapter": "hci0", "address": "00:00:00:00:00:00"},
    #   {"uuid": "0a8aa564-2f91-4e4d-a102-0aae57881bab", "transport": "serial",
    #    "port": "/dev/tty0USB", "baudrate": 115200},
    #   {"uuid": "c64ab2a4-5305-4c70-8776-83c74a5de796", "transport": "ws",
    #    "port": "60000"}
    # ]

    def _buildDevice(self, device_config):
        """Return a MeshtDevice for one config entry, or None for an unknown transport.

        Transport modules are imported lazily so that only the transports
        actually present in the config have to be importable.
        """
        kind = device_config["transport"]
        uuid = device_config["uuid"]
        if kind == "serial":
            logger.info("Found serial transport")
            from transport_serial import SerialTransport
            transport = SerialTransport(port=device_config["port"],
                                        baudrate=device_config["baudrate"])
            return MeshtDevice(transport, uuid)
        if kind == "tcp":
            logger.info("Found tcp transport")
            from transport_tcp import TCPTransport
            ip, port = device_config["address"].split(":")
            transport = TCPTransport(
                ip, int(port),
                alive_pool_connect=device_config.get("alive_pool_seconds", 60))
            return MeshtDevice(transport, uuid)
        if kind == "ble":
            logger.info("Found ble transport")
            from transport_ble import BLETransport
            transport = BLETransport(device_config["address"], device_config["adapter"])
            return MeshtDevice(transport, uuid)
        if kind == "ws":
            logger.info("Found ws transport")
            from transport_ws import WSTransport
            transport = WSTransport(device_config["port"], uuid)
            # NOTE(review): meaning of the positional flags is defined by
            # MeshtDevice, which is not visible here - confirm.
            return MeshtDevice(transport, uuid, True, False, True)
        return None

    # does not need to be async
    def readConfig(self, change_dir=True, path="./config/mesh.json"):
        """Load the mesh config and populate ``self.devices``.

        :param change_dir: when true, ``path`` is resolved relative to the
            directory containing this file instead of the working directory.
        :param path: location of the JSON config (a list of device entries).
        :raises Exception: when an entry has no ``uuid`` or no ``transport``.
        """
        from json import load

        if change_dir:
            path = f"{os.path.dirname(os.path.abspath(__file__))}/{path}"
        with open(path, "r") as config:
            self.json_config = load(config)

        for device_config in self.json_config:
            if device_config.get("uuid", "") == "":
                raise Exception("missed uuid section in ", device_config)
            if device_config.get("transport", "") == "":
                raise Exception("missed transport section in ", device_config)
            device = self._buildDevice(device_config)
            if device is None:
                # Previously skipped without any trace, which hid config typos.
                logger.error(f"Unknown transport '{device_config['transport']}' "
                             f"for device {device_config['uuid']}, skip")
                continue
            self.devices.append(device)

        # set default mesh: the first config entry (an empty config keeps "")
        if self.json_config:
            self.defaultDeviceUUID = self.json_config[0]["uuid"]
            self.defaultDeviceUUIDHash = md5hash(self.defaultDeviceUUID)
        for device in self.devices:
            uuid_hash = md5hash(device.device_uuid)
            logger.info(f"{uuid_hash} {device}")
            self.devicesUuidHashToUuid[uuid_hash] = device.device_uuid

    async def meshListener(self, device: MeshtDevice, queue: asyncio.Queue):
        """Keep ``device`` connected and forward everything it receives to ``queue``.

        The loop reconnects after any error and stops only when the task is
        cancelled. Nothing runs when mesh is disabled via ``--disable-mesh``
        and not re-enabled via ``--enable-mesh``.
        """
        run = not self.args.disable_mesh or self.args.enable_mesh
        while run:
            try:
                await device.start()
                device.state = WAIT_CONFIG
                logger.info(str(device), " wait config")
                init_data = await _wait_for_config_complete(device)
                for from_radio in init_data:
                    await queue.put(from_radio)
                logger.info(str(device), " available")
                device.state = AVAILABLE
                while True:
                    from_radio, _ = await device.recv()
                    if not device.test_client:
                        await queue.put(from_radio)
                    # NOTE(review): assumed to be updated for every received
                    # packet, test clients included - confirm.
                    device.last_packet_catch = time()
            except asyncio.exceptions.CancelledError:
                logger.info(str(device), " kill device")
                run = False
            except Exception:
                # Not a bare except: KeyboardInterrupt/SystemExit must not be
                # turned into a reconnect attempt.
                logger.error(str(device), " has connect error")
                device.state = ERR
                traceback.print_exc()
                await asyncio.sleep(1)
                logger.info(str(device), " device will reconnect")
                device.state = RECONNECT
            finally:
                await device.close()


class MeshApi(MeshArgsParse):
    """FastAPI application plus the background tasks started with it."""

    app: FastAPI
    tasks: List
    context: str = "/api"

    # Seconds to wait for cancelled background tasks during shutdown.
    SHUTDOWN_TIMEOUT = 5

    def __init__(self, args):
        # Explicit base call, see MeshMultiListener.__init__.
        MeshArgsParse.__init__(self, args)
        self.app = FastAPI(lifespan=self.lifespan)
        self.tasks = []
        self.listeners = []  # [{"func": func, "args": []}]
        from authManager import AuthManager
        self.authManager = AuthManager(args)

    @asynccontextmanager
    async def lifespan(self, app: FastAPI):
        """Start listeners and background tasks with the app; stop them on shutdown."""
        logger.info("web started")
        # The event loop keeps only weak references to tasks, so they are
        # collected here to stop them being garbage-collected mid-run.
        running = []
        logger.info("create mesh listeners")
        for listener in self.listeners:
            running.append(asyncio.create_task(listener["func"](*listener["args"])))
        logger.info("now create bg tasks")
        for task in self.tasks:
            running.append(asyncio.create_task(task()))
        try:
            yield
        finally:
            logger.info("kill web server")
            for job in running:
                job.cancel()
            if running:
                # Bounded wait so a task that ignores cancellation cannot
                # block the shutdown forever.
                await asyncio.wait(running, timeout=self.SHUTDOWN_TIMEOUT)

    def run(self):
        """Serve the app with uvicorn (blocks until the server stops)."""
        import uvicorn
        # int(): the port is a str when it comes from an untyped CLI option.
        uvicorn.run(self.app, host=self.args.web_host, port=int(self.args.web_port))


class MongoDriver(MeshArgsParse):
    """MongoDB client and persistence of received packets."""

    def __init__(self, args):
        # Explicit base call, see MeshMultiListener.__init__.
        MeshArgsParse.__init__(self, args)
        if args.mongo_url:
            self.dbClient = AsyncMongoClient(args.mongo_url)
        elif args.mongo_host and args.mongo_port:
            self.dbClient = AsyncMongoClient(args.mongo_host, int(args.mongo_port))
        else:
            logger.error("Unknown mongo client")
            sys.exit(1)
        self.dbStore = self.dbClient[self.args.mongo_db]
        self.dbService = DbService(self.dbStore, self)
        from tileManager import TileManager
        self.tileManager = TileManager(self)

    def _decodePayload(self, v):
        """Decode ``v["payload"]`` in place according to ``v["portnum"]``.

        Decoding is best effort: a failure is logged and the record is left
        without a decoded field.
        """
        portnum = v.get("portnum", 0)
        try:
            if portnum == 1:  # text
                v["decoded_payload"] = v["payload"].decode()
            elif portnum == 67:  # telemetry
                v["decoded_payload_object"] = pb.decode(v["payload"], TELEMETRY_SCHEME)
            elif portnum == 3:  # pos
                v["decoded_payload_object"] = pb.decode(v["payload"], POSITION_SCHEME)
            elif portnum == 5:  # routing
                v["decoded_payload_object"] = pb.decode(v["payload"], ROUTING_SCHEME)
            elif portnum == 70:  # traceroute
                v["decoded_payload_object"] = pb.decode(v["payload"], ROUTE_DISCOVERY_SCHEME)
            elif portnum == 4:  # nodeinfo
                v["decoded_payload_object"] = pb.decode(v["payload"], USER_SCHEMA)
            else:
                self.tryParseProtobuf(v)
        except Exception:
            # portnum is an int: the former `"..." + portnum` raised TypeError
            # here and aborted the whole save.
            logger.error(f"Cannot decode protobuf: {portnum}")
            self.tryParseProtobuf(v)
            traceback.print_exc()

    async def dbSaveRadio(self, new_from_radio):
        """Store every section of a received message in the collection named after its key.

        The input is deep-copied, so the caller's object is never modified.
        Each stored document gets a ``ts`` timestamp and the message's
        ``device_uuid``; nested ``decoded`` and ``user`` dicts are flattened
        into the document. Insert errors are logged, not raised.
        """
        from_radio = copy.deepcopy(new_from_radio)
        for k, v in from_radio.items():
            if k == "device_uuid":
                continue
            if isinstance(v, dict):
                v["ts"] = time()
            else:
                v = {"data": v, "ts": time()}
            if "decoded" in v:
                v.update(v["decoded"])
                del v["decoded"]
            if "payload" in v:
                self._decodePayload(v)
            if "user" in v:
                v.update(v["user"])
                del v["user"]
            v["device_uuid"] = from_radio["device_uuid"]
            try:
                await self.dbStore[k].insert_one(v)
            except Exception as e:
                logger.error("Cannot save packet to db", str(e), v)

    def tryParseProtobuf(self, v):
        """Best-effort schema-less parse of ``v["payload"]``, used for debug output only."""
        try:
            to_parse = ' '.join(f'{byte:02x}' for byte in v["payload"])
            res = Parser().parse(to_parse)
            logger.debug(v.get("portnum", 0), res.to_dict())
        except Exception as e:
            # Still best effort, but no longer completely silent.
            logger.debug(f"Cannot parse payload as raw protobuf: {e}")


class MeshCenter(MeshMultiListener, MeshApi, MongoDriver, MeshArgsParse):
    """Application object combining the mesh listeners, web API and database."""

    queue: asyncio.Queue

    def __init__(self, args):
        # One queue per instance instead of a class attribute that is created
        # at import time and shared by every instance.
        self.queue = asyncio.Queue()
        # Strong references to in-flight per-packet tasks (see _spawn).
        self._pendingTasks = set()
        # Every mixin is initialised exactly once, in this order. Each of them
        # calls MeshArgsParse.__init__ itself, so it is not repeated here.
        MeshMultiListener.__init__(self, args)
        MeshApi.__init__(self, args)
        MongoDriver.__init__(self, args)
        self.buildBackgroundTasks()
        self.bot = BotManager(self)
        self.extensionLoader(['webExtensions'])

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and hold a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._pendingTasks.add(task)
        task.add_done_callback(self._pendingTasks.discard)
        return task

    async def queueHandler(self):
        """Async generator yielding each non-empty message taken from the queue.

        Ends when the surrounding task is cancelled; any other error is
        printed and the loop continues.
        """
        logger.info("Start queue handler")
        run = True
        while run:
            try:
                from_radio = await self.queue.get()
                if from_radio:
                    yield from_radio
            except asyncio.exceptions.CancelledError:
                run = False
            except Exception:
                # A bare except would also swallow GeneratorExit and break
                # closing of this generator.
                traceback.print_exc()

    def buildBackgroundTasks(self):
        """Register the device listeners and the background tasks run by ``lifespan``."""
        # input queue
        # [{"func": func, "args": []}]
        for device in self.devices:
            logger.debug("Append mesh listener", device)
            self.listeners.append({"func": self.meshListener, "args": [device, self.queue]})

        # output
        async def handlerTask():
            async for from_radio in self.queueHandler():
                self._spawn(self.dbSaveRadio(from_radio))
                self._spawn(self.bot.handleMessage(from_radio))

        self.tasks.append(handlerTask)
        self.tasks.append(self.authManager.storeCleaner)

    def extensionLoader(self, search_paths=None):
        """Import every ``*.py`` module in the given directories and call its ``WebExtension(self)``.

        :param search_paths: a directory or a list of directories, relative to
            the directory of this file (the working directory is changed to
            it). ``None`` means no directories.
        """
        logger.info("Search fastapiExt")
        os.chdir(os.path.dirname(os.path.abspath(__file__)))
        if search_paths is None:
            search_paths = []
        elif isinstance(search_paths, str):
            search_paths = [search_paths]
        for path in search_paths:
            logger.info(f"Try found extensions in {path}")
            if not os.path.isdir(path):
                logger.info("Directory is not exists or not directory, skip")
                continue
            sys.path.insert(0, path)
            try:
                for entry in os.listdir(path):
                    extension, ext = os.path.splitext(entry)
                    if ext != ".py":
                        continue
                    logger.info(f"Found web ext: {extension}")
                    importlib.import_module(extension).WebExtension(self)
            finally:
                # Restore sys.path even when an extension fails to import, and
                # remove exactly the entry added above rather than index 0.
                try:
                    sys.path.remove(path)
                except ValueError:
                    pass


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # mesh
    parser.add_argument("--change-workdir", default=True, action="store_true")
    # --change-workdir is on by default, so an explicit off switch is needed.
    parser.add_argument("--no-change-workdir", dest="change_workdir", action="store_false",
                        help="Resolve --mesh-config relative to the current directory")
    parser.add_argument("--mesh-config", default="./config/mesh.json")
    parser.add_argument("--disable-mesh", action="store_true", default=False)
    parser.add_argument("--enable-mesh", action="store_true", default=False,
                        help="Need to run in docker if git is bullshit updates")
    # fastapi
    parser.add_argument("--web-host", default="0.0.0.0")
    parser.add_argument("--web-port", default=8680, type=int)
    parser.add_argument("--web-salt", default=generate_random_string(32))
    parser.add_argument("--web-auth-enable", default=False, action="store_true")
    # mongodb
    parser.add_argument("--mongo-url")
    parser.add_argument("--mongo-host", default="192.168.3.2")
    parser.add_argument("--mongo-port", default=27017, type=int)
    parser.add_argument("--mongo-db", default="meshtastic")

    a = MeshCenter(parser.parse_args())
    a.run()