import logging
from time import time

import pydantic

logger = logging.getLogger(__name__)

# Destination value that marks a public (non-direct) message; see Message.isDm.
PUB_CH = 0xFFFFFFFF
# Key under which a received frame carries its packet dict.
PACKET = "packet"

# Connection state codes.
NOT_CONNECTED = 0
WAIT_CONFIG = 1
AVAILABLE = 2
ERR = 3
RECONNECT = 4

# Websocket event codes.
WS_EVENT_CHANNEL = 0
WS_EVENT_NODE = 1
WS_EVENT_STATE = 2
WS_EVENT_MESSAGE = 3
WS_EVENT_MYID = 4
#WS_EVENT_MYNODE = 5

# Websocket message type codes.
WS_TYPE_INIT = 0
WS_TYPE_NEW = 1
WS_TYPE_FRONTEND = 2

# Port number identifying a text message ("TEXT_MESSAGE_APP").
_TEXT_MESSAGE_PORTNUM = 1


async def _wait_for_config_complete(device, extraInfo=False):
    """Collect frames from *device* until the config-complete frame arrives.

    Args:
        device: Object exposing a ``skip_init`` attribute and an awaitable
            ``recv()`` that yields a 2-tuple whose first item is the frame.
        extraInfo: When true, log every received frame.

    Returns:
        The list of frames received, including the config-complete frame.
        An empty list when ``device.skip_init`` is set.
    """
    if device.skip_init:
        # Lazy %-formatting: the device is the argument, not the format string.
        logger.info("%s init config skip", device)
        return []

    logger.info("wait config")
    packets = []
    while True:
        from_radio, _ = await device.recv()
        packets.append(from_radio)
        if extraInfo:
            logger.info(from_radio)
            logger.info("---")
        if (
            isinstance(from_radio, dict)
            and from_radio.get("config_complete_id") is not None
        ):
            logger.info("config catching")
            return packets


class Channel:
    """Channel description extracted from a frame with a ``"channel"`` entry."""

    def __init__(self, fr):
        """Read the channel index and name from frame *fr*.

        The index defaults to 0 and the name to ``"Channel <index>"``.
        """
        channel = fr["channel"]
        self.index = channel.get("index", 0)
        self.name = channel["settings"].get("name", f"Channel {self.index}")

    @staticmethod
    def isThis(fr):
        """Return True if *fr* is a dict holding a channel with non-empty settings."""
        if not isinstance(fr, dict) or "channel" not in fr:
            return False
        # Truthiness covers both a missing and an empty settings mapping.
        return bool(fr["channel"].get("settings"))


class Node:  # aka node info
    """Node information extracted from a frame with a ``"node_info"`` entry."""

    def __init__(self, fr):
        """Read node number, user identity and link fields from frame *fr*.

        Only ``num`` is required; every other field falls back to ``None``.
        """
        info = fr["node_info"]
        # A missing (or explicitly None) user yields all-None identity fields.
        user = info.get("user") or {}

        self.num = info["num"]
        self.user = {
            "id": user.get("id", None),
            "short_name": user.get("short_name", None),
            "long_name": user.get("long_name", None),
        }
        self.snr = info.get("snr", None)
        self.last_heard = info.get("last_heard", None)
        self.hops_away = info.get("hops_away", None)

    @staticmethod
    def isThis(fr):
        """Return True if *fr* is a dict carrying a ``"node_info"`` entry."""
        return isinstance(fr, dict) and "node_info" in fr

    def __str__(self):
        return (
            f"({self.user.get('short_name', None)}) "
            f"{self.user.get('long_name', None)}"
        )


class NewMessage(pydantic.BaseModel):
    """Validated payload for a new message: target ids plus the text."""

    channel_id: int = 0
    node_id: int = 0
    txt: str


class Message:
    """Text message extracted from a frame's packet."""

    def __init__(self, fr):
        """Read sender, destination, text and reception fields from frame *fr*.

        The payload is expected to be bytes and is decoded as UTF-8.
        """
        packet = fr[PACKET]
        self.fr0m = packet["from"]
        self.to = packet["to"]
        self.msg = packet["decoded"]["payload"].decode("utf-8")
        self.id = packet["id"]
        self.rx_time = packet.get("rx_time", 0)
        self.rx_snr = packet.get("rx_snr", None)
        self.hop_limit = packet.get("hop_limit", None)
        self.rx_rssi = packet.get("rx_rssi", None)
        self.channel = packet.get("channel", 0)
        # Local time at which this object was created.
        self.append_time = time()

    @property
    def isDm(self):
        """True when the destination is anything other than ``PUB_CH``."""
        return self.to != PUB_CH

    @property
    def isPub(self):
        """True when the destination is ``PUB_CH``."""
        return not self.isDm

    @staticmethod
    def isThis(fr):
        """Return True if *fr* carries a decoded packet on the text-message port.

        Frames that lack a packet, a decoded section or a port number
        yield False instead of raising.
        """
        if not isinstance(fr, dict):
            return False
        packet = fr.get(PACKET)
        if not isinstance(packet, dict):
            return False
        decoded = packet.get("decoded")
        if not isinstance(decoded, dict):
            return False
        return decoded.get("portnum") == _TEXT_MESSAGE_PORTNUM