You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

219 lines
6.6 KiB

#sys imports
import argparse
import traceback
import sys
import asyncio
from time import time
from typing import List, Dict
import copy
from logger import logger
#mesh
from mesht_device import MeshtDevice
from mesht_models import _wait_for_config_complete
from mesht_models import NOT_CONNECTED, WAIT_CONFIG, AVAILABLE, ERR, RECONNECT
#fs imports
from fastapi import FastAPI, HTTPException, WebSocket
from fastapi.responses import HTMLResponse
from contextlib import asynccontextmanager
#mongo
from pymongo import AsyncMongoClient
#other
from botManager import BotManager
def isInt(any):
    """Return True when ``int(any)`` succeeds, False otherwise.

    Only the exceptions ``int()`` raises for unconvertible input are treated
    as "not an int"; anything else (e.g. KeyboardInterrupt) propagates.
    """
    try:
        int(any)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (None, list, ...)
        # ValueError: malformed text or NaN
        # OverflowError: float infinity
        return False
    return True
class MeshArgsParse:
    """Common base that keeps the parsed command-line namespace on ``self.args``."""

    def __init__(self, args):
        """Store *args* unchanged so subclasses can read their options from it."""
        self.args = args
class MeshListener(MeshArgsParse):
    """Owns the mesh device and forwards everything it receives into a queue."""

    def __init__(self, args):
        """Build the transport and device selected by ``args.transport``.

        Only the ``"serial"`` transport is supported; any other value logs an
        error and terminates the process with exit status 1.
        """
        super().__init__(args)
        self.meshState = NOT_CONNECTED
        if args.transport == "serial":
            # Imported lazily so the serial dependency is only needed when used.
            from transport_serial import SerialTransport
            self.transport = SerialTransport(port=args.serial_port, baudrate=args.serial_baudrate)
            self.device = MeshtDevice(self.transport)
        else:
            logger.error("Unknown mesh transport")
            sys.exit(1)

    #task
    async def meshWorker(self, queue: asyncio.Queue):
        """Run the connect / receive / reconnect loop until the task is cancelled.

        Each pass starts the device, waits for the initial configuration,
        pushes those initial messages and then every received message onto
        *queue*. On an error the state is set to ERR, the traceback is printed,
        and after a one second pause the loop reconnects. The device is closed
        at the end of every pass, including on cancellation.
        """
        logger.info("Start mesh queue listener")
        run = True
        while run:
            try:
                await self.device.start()
                self.meshState = WAIT_CONFIG
                self.init_data = await _wait_for_config_complete(self.device)
                for from_radio in self.init_data:
                    await queue.put(from_radio)
                self.meshState = AVAILABLE
                while True:
                    from_radio, _ = await self.device.recv()
                    #logger.debug(from_radio)
                    await queue.put(from_radio)
            except asyncio.exceptions.CancelledError:
                logger.info("Kill mesh device")
                run = False
            except Exception:
                # Not a bare ``except``: GeneratorExit / KeyboardInterrupt /
                # SystemExit must propagate instead of triggering a reconnect.
                self.meshState = ERR
                traceback.print_exc()
                await asyncio.sleep(1)
                self.meshState = RECONNECT
            finally:
                await self.device.close()
class MeshApi(MeshArgsParse):
    """FastAPI application wrapper that also hosts the background tasks."""

    app: FastAPI
    # Zero-argument callables returning coroutines; started when the web app starts.
    tasks: List

    def __init__(self, args):
        """Create the FastAPI app wired to :meth:`lifespan` and an empty task list."""
        super().__init__(args)
        self.app = FastAPI(lifespan=self.lifespan)
        self.tasks = []

    @asynccontextmanager
    async def lifespan(self, app: FastAPI):
        """Start every registered background task and stop them on shutdown.

        The created tasks are kept in a list because the event loop only holds
        weak references to tasks; an unreferenced task may be garbage-collected
        before it finishes. On shutdown each task is cancelled and awaited so
        its cleanup code runs before the server exits.
        """
        logger.info("web started, now create bg tasks")
        running = [asyncio.create_task(task()) for task in self.tasks]
        try:
            yield
        finally:
            logger.info("kill web server")
            for bg_task in running:
                bg_task.cancel()
            # return_exceptions=True: one failing task must not hide the others.
            await asyncio.gather(*running, return_exceptions=True)

    def run(self):
        """Serve the app with uvicorn on ``args.web_host`` / ``args.web_port`` (blocking)."""
        import uvicorn
        uvicorn.run(self.app, host=self.args.web_host, port=self.args.web_port)
class MongoDriver(MeshArgsParse):
    """MongoDB persistence: stores each top-level message key in its own collection."""

    def __init__(self, args):
        """Open the Mongo client from ``args.mongo_url`` or host/port.

        ``args.mongo_url`` takes precedence; otherwise ``args.mongo_host`` and
        ``args.mongo_port`` are used. If neither is available an error is
        logged and the process exits with status 1. The database named by
        ``args.mongo_db`` is kept on ``self.dbStore``.
        """
        super().__init__(args)
        if args.mongo_url:
            self.dbClient = AsyncMongoClient(args.mongo_url)
        elif args.mongo_host and args.mongo_port:
            self.dbClient = AsyncMongoClient(args.mongo_host, args.mongo_port)
        else:
            logger.error("Unknown mongo client")
            sys.exit(1)
        self.dbStore = self.dbClient[self.args.mongo_db]
        #self.dbCollection = self.dbStore.from_radio

    async def dbSaveRadio(self, new_from_radio):
        """Flatten one received message and insert it into MongoDB.

        *new_from_radio* is a mapping; each key names the target collection.
        The input is deep-copied first, so the caller's object is never
        mutated. For each value:

        * a non-dict value is wrapped as ``{"data": value, "ts": now}``;
        * a dict gets a ``"ts"`` timestamp, its ``"decoded"`` and ``"user"``
          sub-dicts are merged into the top level, and ``"payload"`` is
          additionally stored as text under ``"decoded_payload"`` when it can
          be decoded.
        """
        from_radio = copy.deepcopy(new_from_radio)
        for k, v in from_radio.items():
            if not isinstance(v, dict):
                v = {"data": v, "ts": time()}
            else:
                v["ts"] = time()
                if "decoded" in v:
                    v.update(v["decoded"])
                    del v["decoded"]
                if "payload" in v:
                    try:
                        v["decoded_payload"] = v["payload"].decode()
                    except (AttributeError, UnicodeError):
                        # Best effort: payload is not bytes-like or not valid
                        # text, so only the raw payload is stored.
                        pass
                if "user" in v:
                    v.update(v["user"])
                    del v["user"]
            await self.dbStore[k].insert_one(v)

    # NOTE: disabled queue listener, kept for reference.
    # async def dbQueueListener(self, queue: asyncio.Queue):
    #     logger.info("Start db queue listener")
    #     run = True
    #     while run:
    #         try:
    #             from_radio = await queue.get()
    #             if from_radio is None:
    #                 continue
    #             await self.dbSaveRadio(from_radio)
    #         except asyncio.exceptions.CancelledError:
    #             logger.info("Kill db listener")
    #             run = False
    #         except:
    #             traceback.print_exc()
class MeshCenter(MeshListener, MeshApi, MongoDriver, MeshArgsParse):
    """Application object combining mesh listener, web API, Mongo storage and bot."""

    # Messages received from the mesh device, consumed by queueHandler().
    queue: asyncio.Queue = asyncio.Queue()

    def __init__(self, args):
        """Initialise every base once, register background tasks, create the bot.

        Each base class already chains with ``super().__init__(args)``, so one
        cooperative call walks the whole MRO. Calling the base initializers
        one by one would run that chain repeatedly and create duplicate
        FastAPI apps and Mongo clients.
        """
        super().__init__(args)
        # Strong references to per-message tasks; the loop only keeps weak ones.
        self._bgTasks = set()
        self.buildBackgroundTasks()
        self.bot = BotManager(self)

    def _spawnTask(self, coro):
        """Schedule *coro* as a task and keep it referenced until it finishes."""
        task = asyncio.create_task(coro)
        self._bgTasks.add(task)
        task.add_done_callback(self._onTaskDone)
        return task

    def _onTaskDone(self, task):
        """Drop a finished task and print the traceback of any failure."""
        self._bgTasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            traceback.print_exception(type(exc), exc, exc.__traceback__)

    async def queueHandler(self):
        """Yield every truthy item taken from ``self.queue`` until cancelled.

        Errors are printed and the loop continues; cancellation ends the
        generator.
        """
        logger.info("Start queue handler")
        run = True
        while run:
            try:
                from_radio = await self.queue.get()
                if from_radio:
                    yield from_radio
            except asyncio.exceptions.CancelledError:
                run = False
            except Exception:
                # Not a bare ``except``: GeneratorExit raised at the ``yield``
                # when the generator is closed must not be swallowed.
                traceback.print_exc()

    def buildBackgroundTasks(self):
        """Register the mesh reader and the queue consumer in ``self.tasks``."""
        #input queue
        async def mL():
            await self.meshWorker(self.queue)
        self.tasks.append(mL)

        #output
        async def handlerTask():
            async for from_radio in self.queueHandler():
                #logger.debug(from_radio)
                # Store and bot handling run concurrently so a slow one does
                # not block the queue.
                self._spawnTask(self.dbSaveRadio(from_radio))
                self._spawnTask(self.bot.handleMessage(from_radio))
        self.tasks.append(handlerTask)
if __name__ == "__main__":
    # Command-line entry point: parse options, build the application, serve.
    parser = argparse.ArgumentParser()
    #mesh
    parser.add_argument("--transport", default="serial")
    parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141")
    # type=int: without it a value given on the command line arrives as str,
    # while the default is an int.
    parser.add_argument("--serial-baudrate", type=int, default=115200)
    #fastapi
    parser.add_argument("--web-host", default="0.0.0.0")
    parser.add_argument("--web-port", type=int, default=8680)
    #mongodb
    parser.add_argument("--mongo-url")
    parser.add_argument("--mongo-host", default="127.0.0.1")
    parser.add_argument("--mongo-port", type=int, default=27017)
    parser.add_argument("--mongo-db", default="meshtastic")
    a = MeshCenter(parser.parse_args())
    a.run()