You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
99 lines
2.9 KiB
99 lines
2.9 KiB
import pydantic
|
|
from time import time
|
|
|
|
import logging
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Value of a message's "to" field when it is not a direct message.
PUB_CH = 0xFFFFFFFF
# Key under which a received frame stores its packet.
PACKET = "packet"

# Status codes 0..4.
# NOTE(review): presumably the device connection life cycle -- confirm with callers.
NOT_CONNECTED, WAIT_CONFIG, AVAILABLE, ERR, RECONNECT = range(5)

# Event kinds 0..4.
# NOTE(review): presumably tags for events pushed over a websocket -- confirm.
(
    WS_EVENT_CHANNEL,
    WS_EVENT_NODE,
    WS_EVENT_STATE,
    WS_EVENT_MESSAGE,
    WS_EVENT_MYID,
) = range(5)
# WS_EVENT_MYNODE = 5  (kept disabled)

# Type codes 0..2.
# NOTE(review): presumably websocket message/client types -- confirm.
WS_TYPE_INIT, WS_TYPE_NEW, WS_TYPE_FRONTEND = range(3)
|
|
|
|
async def _wait_for_config_complete(device, extraInfo=False):
    """Collect frames from *device* until its configuration is reported complete.

    Args:
        device: Object exposing a ``skip_init`` attribute and an awaitable
            ``recv()`` whose result unpacks into two values; only the first
            one (the frame) is used here.
        extraInfo: When true, log every received frame followed by ``---``.

    Returns:
        An empty list when ``device.skip_init`` is truthy. Otherwise every
        frame received, in arrival order, up to and including the first dict
        frame whose ``config_complete_id`` is not ``None``.
    """
    if device.skip_init:
        # logging treats the first argument as the %-format string, so the
        # text goes first and the device is supplied as a lazy argument.
        logger.info("%s init config skip", device)
        return []

    logger.info("wait config")
    packets = []
    while True:
        from_radio, _ = await device.recv()
        packets.append(from_radio)
        if extraInfo:
            logger.info(from_radio)
            logger.info("---")
        # Non-dict frames are collected but can never end the wait.
        if isinstance(from_radio, dict) and from_radio.get("config_complete_id") is not None:
            logger.info("config catching")
            return packets
|
|
|
|
class Channel:
    """Index and display name of one channel, read from a received frame."""

    def __init__(self, fr):
        """Build the channel from a frame accepted by :meth:`isThis`.

        Args:
            fr: Dict with a ``"channel"`` entry that holds a ``"settings"``
                dict and, optionally, an ``"index"`` (defaults to 0).

        Raises:
            KeyError: If ``"channel"`` or its ``"settings"`` entry is missing.
        """
        channel = fr["channel"]
        self.index = channel.get("index", 0)
        # Channels without a configured name fall back to "Channel <index>".
        self.name = channel["settings"].get("name", f"Channel {self.index}")

    @staticmethod
    def isThis(fr):
        """Return True when *fr* is a dict carrying a channel with non-empty settings.

        Malformed input (non-dict frame, non-dict ``"channel"`` or
        ``"settings"``) yields ``False`` instead of raising.
        """
        if not isinstance(fr, dict):
            return False
        channel = fr.get("channel")
        if not isinstance(channel, dict):
            return False
        settings = channel.get("settings")
        return isinstance(settings, dict) and len(settings) > 0
|
|
|
|
class Node:  # aka node info
    """Snapshot of the ``node_info`` entry of a received frame."""

    def __init__(self, fr):
        """Copy the node fields out of ``fr["node_info"]``.

        ``num`` is required; every other field falls back to ``None`` when
        absent, including the three entries of ``self.user``.
        """
        info = fr["node_info"]
        self.num = info["num"]

        user = info.get("user", {})
        self.user = {
            field: user.get(field, None)
            for field in ("id", "short_name", "long_name")
        }

        self.snr = info.get("snr", None)
        self.last_heard = info.get("last_heard", None)
        self.hops_away = info.get("hops_away", None)

    @staticmethod
    def isThis(fr):
        """Return True when *fr* is a dict that has a ``"node_info"`` key."""
        if not isinstance(fr, dict):
            return False
        return "node_info" in fr

    def __str__(self):
        """Render the node as ``(<short_name>) <long_name>``."""
        short_name = self.user.get("short_name", None)
        long_name = self.user.get("long_name", None)
        return f"({short_name}) {long_name}"
|
|
|
|
class NewMessage(pydantic.BaseModel):
    """Pydantic model with two integer ids and a required text field.

    NOTE(review): presumably the payload describing a text message to send --
    confirm against the code that consumes this model.
    """

    # Defaults to 0 when omitted.
    channel_id: int = 0
    # Defaults to 0 when omitted.
    node_id: int = 0
    # Required: no default is declared.
    txt: str
|
|
|
|
class Message:
    """Text message extracted from the packet of a received frame."""

    # Port number 1 is "TEXT_MESSAGE_APP".
    _TEXT_MESSAGE_PORTNUM = 1

    def __init__(self, fr):
        """Read the message fields out of ``fr[PACKET]``.

        Args:
            fr: Frame accepted by :meth:`isThis`. Its packet must provide
                ``"from"``, ``"to"``, ``"id"`` and ``"decoded"["payload"]``;
                the payload must offer ``.decode`` (e.g. ``bytes``).

        Raises:
            KeyError: If one of the required packet entries is missing.
            UnicodeDecodeError: If the payload is not valid UTF-8.
        """
        packet = fr[PACKET]
        self.fr0m = packet["from"]
        self.to = packet["to"]
        self.msg = packet["decoded"]["payload"].decode("utf-8")
        self.id = packet["id"]
        # Optional reception metadata; absent entries keep these defaults.
        self.rx_time = packet.get("rx_time", 0)
        self.rx_snr = packet.get("rx_snr", None)
        self.hop_limit = packet.get("hop_limit", None)
        self.rx_rssi = packet.get("rx_rssi", None)
        self.channel = packet.get("channel", 0)
        # Local clock reading taken when this object is created.
        self.append_time = time()

    @property
    def isDm(self):
        """True when the message is addressed to anything other than ``PUB_CH``."""
        return self.to != PUB_CH

    @property
    def isPub(self):
        """True when the message is addressed to ``PUB_CH``."""
        return not self.isDm

    @staticmethod
    def isThis(fr):
        """Return True when *fr* carries a decoded packet with port number 1.

        A frame whose packet or decoded part is missing, is not a dict, or
        has no ``"portnum"`` entry yields ``False`` instead of raising.
        """
        if not isinstance(fr, dict):
            return False
        packet = fr.get("packet")
        if not isinstance(packet, dict):
            return False
        decoded = packet.get("decoded")
        if not isinstance(decoded, dict):
            return False
        return decoded.get("portnum") == Message._TEXT_MESSAGE_PORTNUM
|