You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

274 lines
9.6 KiB

#sys imports
import argparse
import traceback
import sys, os
import asyncio
from time import time
from typing import List, Dict
import copy
from logger import logger
#mesh
from mesht_device import MeshtDevice
from mesht_models import _wait_for_config_complete, PUB_CH
from mesht_models import NOT_CONNECTED, WAIT_CONFIG, AVAILABLE, ERR, RECONNECT
from protobufs_extra.telemetry_proto import *
from protobufs_extra.position_proto import *
import pb
#fs imports
from fastapi import FastAPI, HTTPException, WebSocket
from fastapi.responses import HTMLResponse
from contextlib import asynccontextmanager
#mongo
from pymongo import AsyncMongoClient
from dbService import DbService
#other
from botManager import BotManager
from utils import isInt, generate_random_string
class MeshArgsParse:
def __init__(self, args):
self.args = args
class MeshListener(MeshArgsParse):
def __init__(self, args):
super().__init__(args)
self.meshState = NOT_CONNECTED
self.PUB_CH = PUB_CH
self.last_packet_catch = time()
if args.transport == "serial":
from transport_serial import SerialTransport
self.transport = SerialTransport(port = args.serial_port, baudrate = args.serial_baudrate)
elif args.transport == "ble":
from transport_ble import BLETransport
self.transport = BLETransport(args.ble_mesh_mac, args.ble_adapter)
elif args.transport == "tcp":
from transport_tcp import TCPTransport
ip, port = args.tcp_address.split(":")
self.transport = TCPTransport(ip, int(port), alive_pool_connect=self.args.serial_alive_pool_seconds)
else:
logger.error("Unknown mesh transport")
sys.exit(1)
self.device = MeshtDevice(self.transport)
#task
async def meshWorker(self, queue: asyncio.Queue):
logger.info("Start mesh queue listener")
run = not self.args.disable_mesh or self.args.enable_mesh
while run:
try:
await self.device.start()
self.meshState = WAIT_CONFIG
logger.info("Mesh state: wait config")
self.init_data = await _wait_for_config_complete(self.device)
for from_radio in self.init_data:
await queue.put(from_radio)
logger.info("Mesh state: available")
self.meshState = AVAILABLE
while True:
from_radio, _ = await self.device.recv()
#logger.debug(from_radio)
await queue.put(from_radio)
self.last_packet_catch = time()
except asyncio.exceptions.CancelledError:
logger.info("Kill mesh device")
run = False
except:
logger.error("Mesh state: error")
self.meshState = ERR
traceback.print_exc()
await asyncio.sleep(1)
logger.info("Mesh state: reconnect")
self.meshState = RECONNECT
finally:
await self.device.close()
class MeshApi(MeshArgsParse):
app: FastAPI
tasks: List
context: str = "/api"
def __init__(self, args):
super().__init__(args)
self.app = FastAPI(lifespan=self.lifespan)
self.tasks = []
from authManager import AuthManager
self.authManager = AuthManager(args)
@asynccontextmanager
async def lifespan(self, app: FastAPI):
logger.info("web started, now create bg tasks")
for task in self.tasks:
asyncio.create_task(task())
yield
logger.info("kill web server")
def run(self):
import uvicorn
uvicorn.run(self.app, host=self.args.web_host, port = self.args.web_port)
class MongoDriver(MeshArgsParse):
def __init__(self, args):
super().__init__(args)
if args.mongo_url:
self.dbClient = AsyncMongoClient(args.mongo_url)
elif args.mongo_host and args.mongo_port:
self.dbClient = AsyncMongoClient(args.mongo_host, args.mongo_port)
else:
logger.error("Unknown mongo client")
sys.exit(1)
self.dbStore = self.dbClient[self.args.mongo_db]
self.dbService = DbService(self.dbStore)
async def dbSaveRadio(self, new_from_radio):
'''try:
anyJson = from_radio["packet"]
except:
logger.debug(from_radio)
return'''
#logger.debug(from_radio)
#logger.debug(len(list(from_radio.keys())))
from_radio = copy.deepcopy(new_from_radio)
for k, v in from_radio.items():
if type(v) != dict:
v = {"data": v, "ts": time()}
else:
v["ts"] = time()
if "decoded" in v:
v.update(v["decoded"])
del v["decoded"]
if "payload" in v:
try:
v["decoded_payload"] = v["payload"].decode()
except:
try:
if v.get("portnum", 0) == 67:
v["decoded_payload_object"] = pb.decode(v["payload"], TELEMETRY_SCHEME)
elif v.get("portnum", 0) == 3:
print(pb.decode(v["payload"], POSITION_SCHEME))
except:
traceback.print_exc()
pass
if "user" in v:
v.update(v["user"])
del v["user"]
await self.dbStore[k].insert_one(v)
'''async def dbQueueListener(self, queue: asyncio.Queue):
logger.info("Start db queue listener")
run = True
while run:
try:
from_radio = await queue.get()
if from_radio is None:
continue
await self.dbSaveRadio(from_radio)
except asyncio.exceptions.CancelledError:
logger.info("Kill db listener")
run = False
except:
traceback.print_exc()'''
class MeshCenter(MeshListener, MeshApi, MongoDriver, MeshArgsParse):
queue: asyncio.Queue = asyncio.Queue()
def __init__(self, args):
MeshListener.__init__(self, args)
MeshArgsParse.__init__(self, args)
MeshApi.__init__(self, args)
MongoDriver.__init__(self, args)
self.buildBackgroundTasks()
self.bot = BotManager(self)
self.extensionLoader(['webExtensions'])
async def queueHandler(self):
logger.info("Start queue handler")
run = True
while run:
try:
from_radio = await self.queue.get()
if from_radio:
yield from_radio
except asyncio.exceptions.CancelledError:
run = False
except:
traceback.print_exc()
def buildBackgroundTasks(self):
#input queue
async def mL():
await self.meshWorker(self.queue)
self.tasks.append(mL)
#output
async def handlerTask():
async for from_radio in self.queueHandler():
#logger.debug(from_radio)
asyncio.create_task(self.dbSaveRadio(from_radio))
asyncio.create_task(self.bot.handleMessage(from_radio))
self.tasks.append(handlerTask)
self.tasks.append(self.authManager.storeCleaner)
def extensionLoader(self, search_paths = []):
logger.info("Search fastapiExt")
os.chdir(os.path.dirname(os.path.abspath(__file__)))
if type(search_paths) == str:
search_paths = [search_paths]
for path in search_paths:
logger.info(f"Try found extensions in {path}")
if not os.path.exists(path) or not os.path.isdir(path):
logger.info(f"Directory is not exists or not directory, skip")
continue
sys.path.insert(0, path)
for extension in os.listdir(path):
extension, ext = os.path.splitext(extension)
if ext != ".py":
continue
logger.info(f"Found web ext: {extension}")
__import__(extension).WebExtension(self)
sys.path.pop(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
#mesh
parser.add_argument("--transport", default="tcp")
parser.add_argument("--disable-mesh", action="store_true", default=True)
parser.add_argument("--enable-mesh", action="store_true", default=False, help="Need to run in docker if git is bullshit updates")
#serial transport
parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141")
parser.add_argument("--serial-baudrate", default=115200)
parser.add_argument("--serial-alive-pool-seconds", default=60)
#ble transport
parser.add_argument("--ble-adapter", default=None)
parser.add_argument("--ble-mesh-mac", default="22AC1D28-5345-465E-2E82-18CDE5857A45")
#tcp trasponse
parser.add_argument("--tcp-address", default="192.168.3.26:8886")
#fastapi
parser.add_argument("--web-host", default="0.0.0.0")
parser.add_argument("--web-port", default=8680)
parser.add_argument("--web-salt", default=generate_random_string(32))
parser.add_argument("--web-auth-enable", default=False, action="store_true")
#mongodb
parser.add_argument("--mongo-url")
parser.add_argument("--mongo-host", default="192.168.3.2")
parser.add_argument("--mongo-port", default=27017)
parser.add_argument("--mongo-db", default="meshtastic")
a = MeshCenter(parser.parse_args())
a.run()