You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

254 lines
9.3 KiB

#fs imports
from fastapi import FastAPI, HTTPException, WebSocket
from fastapi.responses import HTMLResponse
from uvicorn import run as UR
from contextlib import asynccontextmanager
#sys imports
import argparse
import traceback
import sys
import asyncio
from time import time
from typing import List, Dict
import random
#msh imports
from transport_serial import SerialTransport
from mesht_device import MeshtDevice
from mesht_models import *
from mesht_models import _wait_for_config_complete
class Servlet:
    """FastAPI application bridging a mesh device to HTTP and WebSocket clients.

    A background worker (``meshWorker``) reads packets from ``self.device``,
    caches the decoded channels, nodes and messages on the instance, and
    pushes every update to all connected WebSocket clients.  REST routes
    under ``/mesh/api`` expose the same cached state.
    """

    app: FastAPI
    transport = None
    device: MeshtDevice
    # Timestamp (time.time()) of the worker's last pass through its read loop.
    pulse = 0
    # The containers below are created per instance in __init__; declaring
    # them as mutable class attributes would share them between instances.
    channels: Dict[int, Channel]
    nodes: Dict[int, Node]
    message: List[Message]
    msh_state = NOT_CONNECTED
    ws_clients: List[WebSocket]
    # Last modification time (time.time()) per kind of cached data.
    lstChange: Dict[str, float]

    def __init__(self, args):
        """Build the app, the transport/device pair and register all routes.

        Args:
            args: parsed command-line namespace; reads ``transport``,
                ``serial_port``, ``serial_baudrate`` here and ``host`` /
                ``port`` later in :meth:`run`.
        """
        self.args = args
        self.channels = {}
        self.nodes = {}
        self.message = []
        self.ws_clients = []
        now = time()
        self.lstChange = {
            "channels": now,
            "messages": now,
            "nodes": now,
            "state": now,
        }
        # Strong reference to the background worker task (see lifespan).
        self._worker_task = None

        self.app = FastAPI(lifespan=self.lifespan)
        if args.transport == "serial":
            self.transport = SerialTransport(
                port=args.serial_port, baudrate=args.serial_baudrate
            )
        self.device = MeshtDevice(self.transport)
        self.buildRoutes()
        self.buildWS()

    def run(self):
        """Serve the application with uvicorn on the configured host/port."""
        UR(self.app, host=self.args.host, port=self.args.port)  # TODO: reload=True

    @asynccontextmanager
    async def lifespan(self, app: FastAPI):
        """Start the mesh worker on startup and stop it on shutdown."""
        print("server is started, now connect to mesh")
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self._worker_task = asyncio.create_task(self.meshWorker())
        try:
            yield
        finally:
            worker, self._worker_task = self._worker_task, None
            worker.cancel()
            try:
                await worker
            except asyncio.CancelledError:
                # The worker was cancelled outside its own handler
                # (e.g. during the reconnect sleep); nothing left to do.
                pass
            except Exception:
                traceback.print_exc()
            print("server is shutdown now")

    async def ws_update(self, data):
        """Send ``data`` (tagged as WS_TYPE_NEW) to every WebSocket client.

        A failure on one client is logged and does not stop delivery to the
        others.  Note that ``data`` is modified in place: its ``"type"`` key
        is set to ``WS_TYPE_NEW``.
        """
        data.update({"type": WS_TYPE_NEW})
        # Iterate over a snapshot: clients connect/disconnect while we await.
        for client in list(self.ws_clients):
            try:
                await client.send_json(data)
            except Exception:
                # Not a bare except: task cancellation must propagate.
                traceback.print_exc()

    async def _set_state(self, state):
        """Record a new mesh connection state and announce it to clients."""
        self.msh_state = state
        self.lstChange["state"] = time()
        await self.ws_update({"event": WS_EVENT_STATE, "data": state})

    async def _broadcast_change(self, event, data, change_key):
        """Announce a cache update and stamp ``lstChange[change_key]``."""
        await self.ws_update({"type": WS_TYPE_NEW, "event": event, "data": data})
        self.lstChange[change_key] = time()

    async def _store_message(self, from_radio):
        """Decode ``from_radio`` as a Message, cache it and announce it."""
        msg = Message(from_radio)
        self.message.append(msg)
        await self._broadcast_change(WS_EVENT_MESSAGE, msg.__dict__, "messages")

    async def meshWorker(self):
        """Connect to the device and process packets until cancelled.

        On any error the state goes ERR -> RECONNECT, the device is closed
        and a new connection attempt is made after one second.  Cancellation
        ends the loop; the device is closed on every exit path.
        """
        run = True
        while run:
            try:
                await self.device.start()
                await self._set_state(WAIT_CONFIG)
                init_data = await _wait_for_config_complete(self.device)
                for from_radio in init_data:  # todo check dubs
                    await self.meshCollector(from_radio)
                await self._set_state(AVAILABLE)
                await self.ws_update(
                    {"event": WS_EVENT_MYID, "data": self.device.nid}
                )
                while True:
                    self.pulse = time()
                    from_radio, _ = await self.device.recv()
                    await self.meshCollector(from_radio)
            except asyncio.CancelledError:
                # Server shutdown: leave the reconnect loop and return.
                print("Web close")
                run = False
            except Exception:
                print("Mesh worker is has error, reconnect to mesh device after seconds...")
                traceback.print_exc()
                await self._set_state(ERR)
                await self._set_state(RECONNECT)
                await asyncio.sleep(1)
            finally:
                try:
                    await self.device.close()
                except Exception:
                    # A failing close (e.g. the port vanished) must not
                    # terminate the reconnect loop.
                    traceback.print_exc()

    async def meshCollector(self, from_radio):
        """Classify one packet, update the matching cache and notify clients.

        The packet is tested against Channel, Node and Message in that
        order; the first match wins.  Packets that match nothing are
        ignored, and parsing errors are logged rather than raised.
        """
        try:
            print(from_radio)
            if Channel.isThis(from_radio):
                print("Found channel packet")
                channel = Channel(from_radio)
                self.channels[channel.index] = channel
                await self._broadcast_change(
                    WS_EVENT_CHANNEL, channel.__dict__, "channels"
                )
            elif Node.isThis(from_radio):
                print("Found node packet")
                node = Node(from_radio)
                self.nodes[node.num] = node
                await self._broadcast_change(
                    WS_EVENT_NODE, {"id": node.num, "name": str(node)}, "nodes"
                )
            elif Message.isThis(from_radio):
                print("Found message packet")
                await self._store_message(from_radio)
        except Exception:
            print("cannot parse packet")
            traceback.print_exc()

    def buildWS(self):
        """Register the WebSocket endpoint and its HTML debug page."""
        # Local imports, matching the debug-page import below.
        from fastapi import WebSocketDisconnect
        from mesh_ws_debug import page

        @self.app.get("/mesh/wsdebug", response_class=HTMLResponse)
        async def wsdebug():
            return page

        async def ws_hello(websocket: WebSocket):
            """Send the full cached state to a freshly connected client."""
            print("send hello")

            async def send_init(event, data):
                await websocket.send_json(
                    {"type": WS_TYPE_INIT, "event": event, "data": data}
                )

            try:
                await send_init(WS_EVENT_MYID, self.device.nid)
                # TODO: also send WS_EVENT_MYNODE with this device's own node.
                for channel in self.channels.values():
                    await send_init(WS_EVENT_CHANNEL, channel.__dict__)
                await send_init(WS_EVENT_STATE, self.msh_state)
                for node in self.nodes.values():
                    await send_init(
                        WS_EVENT_NODE, {"id": node.num, "name": str(node)}
                    )
                for message in self.message:
                    await send_init(WS_EVENT_MESSAGE, message.__dict__)
            except Exception:
                traceback.print_exc()

        @self.app.websocket("/mesh/ws")
        async def ws_endpoint(websocket: WebSocket):
            await websocket.accept()
            self.ws_clients.append(websocket)
            try:
                # Send the whole cached state first, then just log whatever
                # the client sends.
                await ws_hello(websocket)
                while True:
                    msg = await websocket.receive_json()
                    print(msg)
            except WebSocketDisconnect:
                # Normal client departure, not an error.
                print("ws client disconnected")
            except Exception:
                traceback.print_exc()
            finally:
                if websocket in self.ws_clients:
                    self.ws_clients.remove(websocket)
                try:
                    await websocket.close()
                except Exception:
                    # Best effort: the socket is usually already closed here.
                    pass

    def buildRoutes(self):
        """Register the REST routes under ``/mesh/api``."""
        route = "/mesh/api"

        @self.app.get(route)
        async def root():
            return {"status": self.msh_state}

        @self.app.get(route + "/nodes")
        async def getNodeList():
            return [
                {"id": node.num, "name": str(node)}
                for node in self.nodes.values()
            ]

        @self.app.get(route + "/nodes/{id}")
        async def getNodeInfo(id: int):
            # `id` must match the `{id}` placeholder in the route path.
            node = self.nodes.get(id)
            if node is None:
                raise HTTPException(404)
            return node.__dict__

        @self.app.get(route + "/channels")
        async def getChannelList():
            return list(self.channels.values())

        @self.app.get(route + "/messages")  # todo before after
        async def getMessages():
            return self.message

        # curl -X POST -d '{"txt":"test", "node_id":"xxxxxxxxxx"}' -H "Content-Type: application/json" http://127.0.0.1:8868/mesh/api/messages
        @self.app.post(route + "/messages")
        async def postMessage(msg: NewMessage):
            # Synthetic packet describing our own outgoing message, so it
            # shows up in the message list and on the WebSocket feed.
            packet = {
                "from": self.device.nid,
                "to": PUB_CH,
                "decoded": {"payload": msg.txt.encode()},
                "id": random.randint(1, 0x7FFFFFFF),
            }
            # The destination / channel are written into the packet itself,
            # next to "from" and the default "to"; setting them on the outer
            # dict left the packet's "to" at PUB_CH even for direct messages.
            # NOTE(review): assumes Message reads them from "packet" - confirm.
            if msg.node_id:
                await self.device.sendMsgToDM(msg.txt, msg.node_id)
                packet["to"] = msg.node_id
            else:
                packet["channel"] = msg.channel_id
                await self.device.sendMsgToChannel(msg.txt, msg.channel_id)
            await self._store_message({"packet": packet})
def build_arg_parser():
    """Return the command-line parser for the mesh web service.

    ``--port`` and ``--serial-baudrate`` are parsed as integers so that
    values given on the command line have the same type as the defaults.
    """
    parser = argparse.ArgumentParser()
    # msh
    parser.add_argument("--transport", default="serial")
    parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309D56C1")  # todo
    parser.add_argument("--serial-baudrate", type=int, default=115200)
    # srv
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8868)
    return parser


if __name__ == "__main__":
    args = build_arg_parser().parse_args()
    if args.transport == "serial":
        Servlet(args).run()
    else:
        print("Exists only serial transport")
        sys.exit(1)