You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

99 lines
2.9 KiB

import pydantic
from time import time
import logging
logger = logging.getLogger(__name__)
# "to" value of a public message: Message.isDm reports a direct message
# for any packet whose "to" differs from this value.
PUB_CH = 0xFFFFFFFF
# Key under which a received dict carries its packet (read by Message.__init__).
PACKET = "packet"
# State codes. NOTE(review): presumably the device connection lifecycle;
# the code that sets and reads them is not in this section - confirm.
NOT_CONNECTED = 0
WAIT_CONFIG = 1
AVAILABLE = 2
ERR = 3
RECONNECT = 4
# WS_EVENT_* codes. NOTE(review): presumably tag the kind of object carried
# by a websocket event (channel, node, state, message, own id) - verify
# against the sender, which is not visible here.
WS_EVENT_CHANNEL = 0
WS_EVENT_NODE = 1
WS_EVENT_STATE = 2
WS_EVENT_MESSAGE = 3
WS_EVENT_MYID = 4
# Disabled code, kept for reference.
#WS_EVENT_MYNODE = 5
# WS_TYPE_* codes. NOTE(review): usage is not visible in this section.
WS_TYPE_INIT = 0
WS_TYPE_NEW = 1
WS_TYPE_FRONTEND = 2
async def _wait_for_config_complete(device, extraInfo=False):
    """Collect everything *device* sends until its configuration is complete.

    Args:
        device: object exposing a ``skip_init`` attribute and an awaitable
            ``recv()`` that returns a 2-tuple whose first item is the
            received object (the second item is ignored here).
        extraInfo: when true, log every received object followed by a
            ``---`` separator line.

    Returns:
        An empty list when ``device.skip_init`` is truthy. Otherwise the list
        of all received objects, in arrival order, ending with the first dict
        whose ``"config_complete_id"`` is not ``None``.
    """
    if device.skip_init:
        # Logging formats the message as ``msg % args``; the device therefore
        # has to be an argument of a message that contains a placeholder.
        logger.info("%s init config skip", device)
        return []

    logger.info("wait config")
    packets = []
    while True:
        from_radio, _ = await device.recv()
        packets.append(from_radio)
        if extraInfo:
            logger.info(from_radio)
            logger.info("---")
        # Only dict items can carry the completion marker; anything else is
        # just collected.
        if isinstance(from_radio, dict) and from_radio.get("config_complete_id") is not None:
            logger.info("config catching")
            return packets
class Channel:
    """Channel entry built from a received dict that has a ``"channel"`` key.

    Attributes:
        index: ``fr["channel"]["index"]``, or 0 when that key is absent.
        name: ``fr["channel"]["settings"]["name"]``, or ``"Channel <index>"``
            when the settings carry no name.
    """

    def __init__(self, fr):
        """Read the index and name out of ``fr["channel"]``.

        Raises:
            KeyError: if ``"channel"`` or its ``"settings"`` entry is missing;
                call :meth:`isThis` first to rule that out.
        """
        channel = fr["channel"]
        self.index = channel.get("index", 0)
        self.name = channel["settings"].get("name", f"Channel {self.index}")

    @staticmethod
    def isThis(fr):
        """Return True when *fr* is a dict holding a channel with non-empty settings."""
        if not isinstance(fr, dict) or "channel" not in fr:
            return False
        settings = fr["channel"].get("settings")
        # A missing, None or empty settings entry all mean "not a usable
        # channel"; none of them should raise.
        return isinstance(settings, dict) and len(settings) > 0
class Node:  # aka node info
    """Node record built from a received dict that has a ``"node_info"`` key.

    Attributes:
        num: ``node_info["num"]`` (required).
        user: dict with the keys ``id``, ``short_name`` and ``long_name``,
            each ``None`` when the source carries no such value.
        snr, last_heard, hops_away: copied from ``node_info``; ``None`` when
            absent.
    """

    def __init__(self, fr):
        """Copy the fields of ``fr["node_info"]`` onto the instance.

        Raises:
            KeyError: if ``"node_info"`` or its ``"num"`` entry is missing.
        """
        info = fr["node_info"]
        self.num = info["num"]

        user = info.get("user", {})
        self.user = {
            key: user.get(key)
            for key in ("id", "short_name", "long_name")
        }

        self.snr = info.get("snr")
        self.last_heard = info.get("last_heard")
        self.hops_away = info.get("hops_away")

    @staticmethod
    def isThis(fr):
        """Return True when *fr* is a dict containing a ``"node_info"`` key."""
        if not isinstance(fr, dict):
            return False
        return "node_info" in fr

    def __str__(self):
        """Render the node as ``(<short_name>) <long_name>``."""
        short_name = self.user.get("short_name")
        long_name = self.user.get("long_name")
        return f"({short_name}) {long_name}"
class NewMessage(pydantic.BaseModel):
    # Pydantic model with two optional integer ids and one required text.
    # NOTE(review): presumably the body of an outgoing-message request; the
    # code that consumes it is not visible in this section - confirm.
    # Documented with comments rather than a docstring so the generated
    # schema stays unchanged.

    # Channel id; 0 when the caller omits it.
    channel_id: int = 0

    # Node id; 0 when the caller omits it.
    node_id: int = 0

    # Message text; the only required field.
    txt: str
class Message:
    """Text message parsed from a received dict that carries a packet.

    Attributes:
        fr0m: the packet's ``"from"`` value (required).
        to: the packet's ``"to"`` value (required).
        msg: ``decoded["payload"]`` decoded as UTF-8.
        id: the packet's ``"id"`` value (required).
        rx_time: ``"rx_time"`` from the packet, 0 when absent.
        rx_snr, hop_limit, rx_rssi: copied from the packet, ``None`` when
            absent.
        channel: ``"channel"`` from the packet, 0 when absent.
        append_time: ``time()`` at the moment the instance was created.
    """

    # Port number of text packets (1: "TEXT_MESSAGE_APP").
    _TEXT_MESSAGE_APP = 1

    def __init__(self, fr):
        """Copy the fields of ``fr[PACKET]`` onto the instance.

        Raises:
            KeyError: if the packet, ``"from"``, ``"to"``, ``"id"``,
                ``"decoded"`` or its ``"payload"`` is missing.
            UnicodeDecodeError: if the payload is not valid UTF-8.
        """
        packet = fr[PACKET]
        self.fr0m = packet["from"]
        self.to = packet["to"]
        # NOTE(review): strict decoding - a malformed payload raises here.
        self.msg = packet["decoded"]["payload"].decode("utf-8")
        self.id = packet["id"]
        self.rx_time = packet.get("rx_time", 0)
        self.rx_snr = packet.get("rx_snr", None)
        self.hop_limit = packet.get("hop_limit", None)
        self.rx_rssi = packet.get("rx_rssi", None)
        self.channel = packet.get("channel", 0)
        self.append_time = time()

    @property
    def isDm(self):
        """True when the message is addressed to anything other than ``PUB_CH``."""
        return self.to != PUB_CH

    @property
    def isPub(self):
        """True when the message is addressed to ``PUB_CH``."""
        return not self.isDm

    @staticmethod
    def isThis(fr):
        """Return True when *fr* is a dict whose decoded packet is a text message.

        A decoded section that is not a dict, or that carries no ``"portnum"``,
        yields False instead of raising.
        """
        if not isinstance(fr, dict) or "packet" not in fr:
            return False
        packet = fr["packet"]
        if "decoded" not in packet:
            return False
        decoded = packet["decoded"]
        if not isinstance(decoded, dict):
            return False
        # .get(): "portnum" may be absent from an otherwise decoded packet.
        return decoded.get("portnum") == Message._TEXT_MESSAGE_APP