commit
083f362f25
8 changed files with 1295 additions and 0 deletions
@ -0,0 +1,2 @@ |
|||
testenv/ |
|||
__pycache__/ |
|||
@ -0,0 +1,19 @@ |
|||
services: |
|||
mongodb: |
|||
image: docker.pblr-nyk.pro/mongo:8.2.4 |
|||
container_name: mongodb-0 |
|||
ports: |
|||
- 27017:27017 |
|||
volumes: |
|||
- ./testenv/mongodb:/data/db |
|||
|
|||
mongoExpress: |
|||
image: docker.pblr-nyk.pro/mongo-express |
|||
container_name: mongoExpress-0 |
|||
ports: |
|||
- 8081:8081 |
|||
environment: |
|||
- ME_CONFIG_MONGODB_URL=mongodb://mongodb:27017 |
|||
depends_on: |
|||
- mongodb |
|||
|
|||
@ -0,0 +1,300 @@ |
|||
import asyncio |
|||
import random |
|||
import pb |
|||
import logging |
|||
logger = logging.getLogger(__name__) |
|||
from mesht_models import * |
|||
|
|||
|
|||
DATA_SCHEMA = [ |
|||
("varint", "portnum", 1), |
|||
("bytes", "payload", 2), |
|||
("bool", "want_response", 3), |
|||
("fixed32", "dest", 4), |
|||
("fixed32", "source", 5), |
|||
("fixed32", "request_id", 6), |
|||
("fixed32", "reply_id", 7), |
|||
("fixed32", "emoji", 8), |
|||
("uint32", "bitfield", 9), |
|||
] |
|||
|
|||
MESHPACKET_SCHEMA = [ |
|||
("fixed32", "from", 1), |
|||
("fixed32", "to", 2), |
|||
("uint32", "channel", 3), |
|||
("oneof", "payload_variant", [ |
|||
(DATA_SCHEMA, "decoded", 4), |
|||
("bytes", "encrypted", 5), |
|||
]), |
|||
("fixed32", "id", 6), |
|||
("fixed32", "rx_time", 7), |
|||
("float", "rx_snr", 8), |
|||
("uint32", "hop_limit", 9), |
|||
("bool", "want_ack", 10), |
|||
("int32", "priority", 11), |
|||
("int32", "rx_rssi", 12), |
|||
("bool", "via_mqtt", 14), |
|||
("uint32", "hop_start", 15), |
|||
("bytes", "public_key", 16), |
|||
("bool", "pki_encrypted", 17), |
|||
("uint32", "next_hop", 18), |
|||
("uint32", "relay_node", 19), |
|||
("uint32", "tx_after", 20), |
|||
] |
|||
|
|||
CHANNEL_SETTINGS_SCHEMA = [ |
|||
("uint32", "channel_num", 1), |
|||
("string", "name", 3), |
|||
] |
|||
|
|||
CHANNEL_SCHEMA = [ |
|||
("int32", "index", 1), |
|||
(CHANNEL_SETTINGS_SCHEMA, "settings", 2), |
|||
("int32", "role", 3), |
|||
] |
|||
|
|||
USER_SCHEMA = [ |
|||
("string", "id", 1), |
|||
("string", "long_name", 2), |
|||
("string", "short_name", 3), |
|||
("int32", "hw_model", 5), |
|||
("bool", "is_licensed", 6), |
|||
("int32", "role", 7), |
|||
("bytes", "public_key", 8), |
|||
] |
|||
|
|||
DEVICEMETRICS_SCHEMA = [ |
|||
("uint32", "battery_level", 1), |
|||
("float", "voltage", 2), |
|||
] |
|||
|
|||
MYNODEINFO_SCHEMA = [ |
|||
("uint32", "my_node_num", 1), |
|||
] |
|||
|
|||
NODEINFO_SCHEMA = [ |
|||
("uint32", "num", 1), |
|||
(USER_SCHEMA, "user", 2), |
|||
("float", "snr", 4), |
|||
("fixed32", "last_heard", 5), |
|||
(DEVICEMETRICS_SCHEMA, "device_metrics", 6), |
|||
("uint32", "hops_away", 9), |
|||
] |
|||
|
|||
LORACONFIG_SCHEMA = [ |
|||
("bool", "use_preset", 1), |
|||
("int32", "modem_preset", 2), |
|||
("int32", "region", 7), |
|||
] |
|||
|
|||
CONFIG_SCHEMA = [ |
|||
("oneof", "variant", [ |
|||
(LORACONFIG_SCHEMA, "lora", 6), |
|||
]), |
|||
] |
|||
|
|||
FROMRADIO_SCHEMA = [ |
|||
("uint32", "id", 1), |
|||
("oneof", "payload_variant", [ |
|||
(MESHPACKET_SCHEMA, "packet", 2), |
|||
(MYNODEINFO_SCHEMA, "my_info", 3), |
|||
(NODEINFO_SCHEMA, "node_info", 4), |
|||
(CONFIG_SCHEMA, "config", 5), |
|||
("uint32", "config_complete_id", 7), |
|||
(CHANNEL_SCHEMA, "channel", 10), |
|||
]), |
|||
] |
|||
|
|||
TORADIO_SCHEMA = [ |
|||
(MESHPACKET_SCHEMA, "packet", 1), |
|||
("uint32", "want_config_id", 3), |
|||
] |
|||
|
|||
PRESET_NAMES = { |
|||
0: "LongFast", |
|||
1: "LongSlow", |
|||
2: "VeryLongSlow", |
|||
3: "MediumSlow", |
|||
4: "MediumFast", |
|||
5: "ShortSlow", |
|||
6: "ShortFast", |
|||
7: "LongModerate", |
|||
8: "ShortTurbo", |
|||
} |
|||
|
|||
REGION_NAMES = { |
|||
0: "UNSET", |
|||
1: "US", |
|||
2: "EU_433", |
|||
3: "EU_868", |
|||
4: "CN", |
|||
5: "JP", |
|||
6: "ANZ", |
|||
7: "KR", |
|||
8: "TW", |
|||
9: "RU", |
|||
10: "IN", |
|||
11: "NZ_865", |
|||
12: "TH", |
|||
13: "LORA_24", |
|||
14: "UA_433", |
|||
15: "UA_868", |
|||
16: "MY_433", |
|||
17: "MY_919", |
|||
18: "SG_923", |
|||
19: "PH_433", |
|||
20: "PH_868", |
|||
21: "PH_915", |
|||
22: "ANZ_433", |
|||
23: "KZ_433", |
|||
24: "KZ_863", |
|||
25: "NP_865", |
|||
26: "BR_902", |
|||
} |
|||
|
|||
PORTNUMS = { |
|||
0: "UNKNOWN_APP", |
|||
1: "TEXT_MESSAGE_APP", |
|||
2: "REMOTE_HARDWARE_APP", |
|||
3: "POSITION_APP", |
|||
4: "NODEINFO_APP", |
|||
5: "ROUTING_APP", |
|||
6: "ADMIN_APP", |
|||
7: "TEXT_MESSAGE_COMPRESSED_APP", |
|||
8: "WAYPOINT_APP", |
|||
9: "AUDIO_APP", |
|||
10: "DETECTION_SENSOR_APP", |
|||
11: "ALERT_APP", |
|||
32: "REPLY_APP", |
|||
33: "IP_TUNNEL_APP", |
|||
34: "PAXCOUNTER_APP", |
|||
64: "SERIAL_APP", |
|||
65: "STORE_FORWARD_APP", |
|||
66: "RANGE_TEST_APP", |
|||
67: "TELEMETRY_APP", |
|||
68: "ZPS_APP", |
|||
69: "SIMULATOR_APP", |
|||
70: "TRACEROUTE_APP", |
|||
71: "NEIGHBORINFO_APP", |
|||
72: "ATAK_PLUGIN", |
|||
73: "MAP_REPORT_APP", |
|||
74: "POWERSTRESS_APP", |
|||
76: "RETICULUM_TUNNEL_APP", |
|||
256: "PRIVATE_APP", |
|||
257: "ATAK_FORWARDER", |
|||
511: "MAX", |
|||
} |
|||
NAMES_TO_PORTNUMS = {v: k for k, v in PORTNUMS.items()} |
|||
|
|||
|
|||
class Channel: |
|||
def __init__(self, index, name, role): |
|||
self.index = int(index) |
|||
self.name = name |
|||
self.role = int(role or 0) |
|||
|
|||
|
|||
class MeshtDevice: |
|||
def __init__(self, transport): |
|||
self.transport = transport |
|||
self.channels = [] |
|||
self.lora_config = None |
|||
# Track local node number from MyNodeInfo |
|||
self.my_node_id = "00000000" |
|||
self.nid = None |
|||
|
|||
async def start(self): |
|||
await self.transport.start() |
|||
nonce = random.randint(1, 1_000_000_000) |
|||
logger.debug("MeshtDevice.start: sending want_config_id nonce=%s", nonce) |
|||
await self.transport.send(pb.encode({"want_config_id": nonce}, TORADIO_SCHEMA)) |
|||
|
|||
async def close(self): |
|||
return await self.transport.close() |
|||
|
|||
async def sendMsgToChannel(self, text, channel_index = 0): |
|||
return await self.send_text(text, channel_index= channel_index) |
|||
|
|||
async def sendMsgToDM(self, text, num): |
|||
return await self.send_text(text, num) |
|||
|
|||
async def send_text(self, text, num = 0xFFFFFFFF, channel_index = 0): |
|||
data = { |
|||
"portnum": NAMES_TO_PORTNUMS["TEXT_MESSAGE_APP"], |
|||
"payload": text.encode("utf-8"), |
|||
"want_response": False, |
|||
} |
|||
meshpacket = { |
|||
"id": random.randint(1, 0x7FFFFFFF), |
|||
#https://github.com/meshtastic/python/blob/5cc0dae3947cd72f5a05d079a93751fc924afac6/meshtastic/mesh_interface.py#L935 |
|||
"to": num, #3148365392,#0xFFFFFFFF, |
|||
"channel": int(channel_index), |
|||
"want_ack": True, |
|||
"decoded": data, |
|||
} |
|||
payload = pb.encode({"packet": meshpacket}, TORADIO_SCHEMA) |
|||
await self.transport.send(payload) |
|||
# Return the meshpacket details for logging by callers |
|||
return meshpacket |
|||
|
|||
async def recv(self): |
|||
data = await self.transport.recv() |
|||
fr = pb.decode(data, FROMRADIO_SCHEMA) |
|||
logger.debug(f"FromRadio: {fr}") |
|||
self._maybe_store_channel(fr) |
|||
self._maybe_store_lora_config(fr) |
|||
self._maybe_store_my_node(fr) |
|||
return fr, data |
|||
|
|||
def get_channel_index(self, name): |
|||
for ch in self.channels: |
|||
if ch.name == name: |
|||
return ch.index |
|||
return None |
|||
|
|||
def get_channels(self): |
|||
return list(self.channels) |
|||
|
|||
def _maybe_store_my_node(self, from_radio): |
|||
mi = from_radio.get("my_info") |
|||
if not mi: |
|||
return |
|||
|
|||
self.nid = mi.get("my_node_num", 0) |
|||
self.my_node_id = f"{self.nid & 0xFFFFFFFF:08x}" |
|||
|
|||
def _maybe_store_channel(self, from_radio): |
|||
if not isinstance(from_radio, dict): |
|||
return |
|||
ch = from_radio.get("channel") |
|||
if not isinstance(ch, dict): |
|||
return |
|||
# protobuf default for 0 means 'missing' -> treat as index 0 |
|||
idx = int(ch.get("index") or 0) |
|||
name = (ch.get("settings") or {}).get("name") or f"Channel {idx}" |
|||
role = int(ch.get("role") or 0) |
|||
|
|||
for i, existing in enumerate(self.channels): |
|||
if existing.index == idx: |
|||
if role == 0: |
|||
# Remove if disabled |
|||
self.channels.pop(i) |
|||
return |
|||
existing.name = name |
|||
existing.role = role |
|||
break |
|||
else: |
|||
if role != 0: |
|||
self.channels.append(Channel(idx, name, role)) |
|||
|
|||
self.channels.sort(key=lambda c: c.index) |
|||
|
|||
def _maybe_store_lora_config(self, from_radio): |
|||
if not isinstance(from_radio, dict): |
|||
return |
|||
cfg = from_radio.get("config") |
|||
if not isinstance(cfg, dict): |
|||
return |
|||
lora = cfg.get("lora") |
|||
if isinstance(lora, dict): |
|||
self.lora_config = lora |
|||
@ -0,0 +1,95 @@ |
|||
import pydantic |
|||
from time import time |
|||
|
|||
import logging |
|||
logger = logging.getLogger(__name__) |
|||
|
|||
PUB_CH = 0xFFFFFFFF |
|||
PACKET = "packet" |
|||
|
|||
NOT_CONNECTED = 0 |
|||
WAIT_CONFIG = 1 |
|||
AVAILABLE = 2 |
|||
ERR = 3 |
|||
RECONNECT = 4 |
|||
|
|||
WS_EVENT_CHANNEL = 0 |
|||
WS_EVENT_NODE = 1 |
|||
WS_EVENT_STATE = 2 |
|||
WS_EVENT_MESSAGE = 3 |
|||
WS_EVENT_MYID = 4 |
|||
#WS_EVENT_MYNODE = 5 |
|||
|
|||
WS_TYPE_INIT = 0 |
|||
WS_TYPE_NEW = 1 |
|||
WS_TYPE_FRONTEND = 2 |
|||
|
|||
async def _wait_for_config_complete(device, extraInfo = False): |
|||
logger.info("wait config") |
|||
packets = [] |
|||
while True: |
|||
from_radio, _ = await device.recv() |
|||
packets.append(from_radio) |
|||
if extraInfo: |
|||
logger.info(from_radio) |
|||
logger.info("---") |
|||
if isinstance(from_radio, dict) and from_radio.get("config_complete_id") is not None: |
|||
logger.info("config catching") |
|||
return packets |
|||
|
|||
class Channel: |
|||
def __init__(self, fr): |
|||
self.index = fr["channel"].get("index", 0) |
|||
self.name = fr["channel"]["settings"].get("name", f"Channel {self.index}") |
|||
|
|||
@staticmethod |
|||
def isThis(fr): |
|||
return isinstance(fr, dict) and "channel" in fr and fr["channel"].get("settings", {}).keys().__len__() > 0 |
|||
|
|||
class Node: #aka node info |
|||
def __init__(self, fr): |
|||
self.num = fr["node_info"]["num"] |
|||
self.user = {} |
|||
self.user["id"] = fr["node_info"].get("user", {}).get("id", None) |
|||
self.user["short_name"] = fr["node_info"].get("user", {}).get("short_name", None) |
|||
self.user["long_name"] = fr["node_info"].get("user", {}).get("long_name", None) |
|||
self.snr = fr["node_info"].get("snr", None) |
|||
self.last_heard = fr["node_info"].get("last_heard", None) |
|||
self.hops_away = fr["node_info"].get("hops_away", None) |
|||
|
|||
@staticmethod |
|||
def isThis(fr): |
|||
return isinstance(fr, dict) and "node_info" in fr |
|||
|
|||
def __str__(self): |
|||
return f"({self.user.get('short_name', None)}) {self.user.get('long_name', None)}" |
|||
|
|||
class NewMessage(pydantic.BaseModel): |
|||
channel_id: int = 0 |
|||
node_id: int = 0 |
|||
txt: str |
|||
|
|||
class Message: |
|||
def __init__(self, fr): |
|||
self.fr0m = fr[PACKET]["from"] |
|||
self.to = fr[PACKET]["to"] |
|||
self.msg = fr[PACKET]["decoded"]["payload"].decode("utf-8") |
|||
self.id = fr[PACKET]["id"] |
|||
self.rx_time = fr[PACKET].get("rx_time", 0) |
|||
self.rx_snr = fr[PACKET].get("rx_snr", None) |
|||
self.hop_limit = fr[PACKET].get("hop_limit", None) |
|||
self.rx_rssi = fr[PACKET].get("rx_rssi", None) |
|||
self.channel = fr[PACKET].get("channel", 0) |
|||
self.append_time = time() |
|||
|
|||
@property |
|||
def isDm(self): |
|||
return self.to != PUB_CH |
|||
|
|||
@property |
|||
def isPub(self): |
|||
return not self.isDm |
|||
|
|||
@staticmethod |
|||
def isThis(fr):#1: "TEXT_MESSAGE_APP", |
|||
return isinstance(fr, dict) and "packet" in fr and "decoded" in fr["packet"] and fr["packet"]["decoded"]["portnum"] == 1 |
|||
@ -0,0 +1,554 @@ |
|||
""" |
|||
https://github.com/allanrbo/pb.py |
|||
""" |
|||
|
|||
import struct |
|||
|
|||
|
|||
_MAX_VARINT_BYTES = 10 # uint64 max |
|||
_MAX_NESTING_DEPTH = 100 |
|||
|
|||
|
|||
def _read_varint(b, i): |
|||
shift = 0 |
|||
n = 0 |
|||
read = 0 |
|||
blen = len(b) |
|||
while True: |
|||
if i >= blen: |
|||
raise ValueError("truncated varint") |
|||
c = b[i] |
|||
i += 1 |
|||
read += 1 |
|||
n |= (c & 0x7F) << shift |
|||
if not (c & 0x80): |
|||
break |
|||
if read >= _MAX_VARINT_BYTES: |
|||
raise ValueError("varint too long") |
|||
shift += 7 |
|||
return n, i |
|||
|
|||
|
|||
def _write_varint(n): |
|||
if n < 0: |
|||
raise ValueError("varint requires non-negative integer; use 'sint' for signed") |
|||
out = bytearray() |
|||
while True: |
|||
to_write = n & 0x7F |
|||
n >>= 7 |
|||
if n: |
|||
out.append(to_write | 0x80) |
|||
else: |
|||
out.append(to_write) |
|||
break |
|||
return bytes(out) |
|||
|
|||
|
|||
def zigzag_encode(n): |
|||
return (n << 1) ^ (n >> 63) |
|||
|
|||
|
|||
def zigzag_decode(n): |
|||
return (n >> 1) ^ -(n & 1) |
|||
|
|||
|
|||
def _zz32(n): |
|||
# zigzag for 32-bit signed |
|||
n = int(n) |
|||
if n < -0x80000000 or n > 0x7FFFFFFF: |
|||
raise ValueError("sint32 out of range") |
|||
v = (n << 1) ^ (n >> 31) |
|||
return v & 0xFFFFFFFF |
|||
|
|||
|
|||
def _i32_from_uvarint(v): |
|||
v &= 0xFFFFFFFF |
|||
return v - 0x100000000 if v >= 0x80000000 else v |
|||
|
|||
|
|||
def _i64_from_uvarint(v): |
|||
return v - 0x10000000000000000 if v >= 0x8000000000000000 else v |
|||
|
|||
|
|||
def _i32_to_uvarint(n): |
|||
# represent signed 32-bit as 64-bit varint two's complement |
|||
n = int(n) |
|||
if n < 0: |
|||
u = (n & 0xFFFFFFFF) | 0xFFFFFFFF00000000 |
|||
else: |
|||
u = n & 0xFFFFFFFF |
|||
return u & 0xFFFFFFFFFFFFFFFF |
|||
|
|||
|
|||
def _read_fixed32(b, i): |
|||
if i + 4 > len(b): |
|||
raise ValueError("truncated fixed32") |
|||
v = struct.unpack_from("<I", b, i)[0] |
|||
return v, i + 4 |
|||
|
|||
|
|||
def _write_fixed32(v): |
|||
return struct.pack("<I", v) |
|||
|
|||
|
|||
def _read_fixed64(b, i): |
|||
if i + 8 > len(b): |
|||
raise ValueError("truncated fixed64") |
|||
v = struct.unpack_from("<Q", b, i)[0] |
|||
return v, i + 8 |
|||
|
|||
|
|||
def _write_fixed64(v): |
|||
return struct.pack("<Q", v) |
|||
|
|||
|
|||
def _write_sfixed32(v): |
|||
return struct.pack("<i", int(v)) |
|||
|
|||
|
|||
def _write_sfixed64(v): |
|||
return struct.pack("<q", int(v)) |
|||
|
|||
|
|||
_ScalarTypes = { |
|||
# integer families |
|||
"varint", "uint64", "uint32", "int64", "int32", "sint", "sint64", "sint32", |
|||
# signed fixed-width integers |
|||
"sfixed32", "sfixed64", |
|||
# fixed-width integer/float |
|||
"fixed32", "fixed64", "float", "double", |
|||
# misc |
|||
"bool", "string", "bytes", |
|||
} |
|||
|
|||
|
|||
def _normalize_schema(schema): |
|||
if not schema: |
|||
return {"fields": {}, "names": {}} |
|||
# Pass through if already normalized |
|||
if isinstance(schema, dict) and "fields" in schema and "names" in schema: |
|||
return schema |
|||
fields, names = {}, {} |
|||
for item in schema: |
|||
if not isinstance(item, tuple): |
|||
raise TypeError("schema must be list of tuples") |
|||
|
|||
# oneof group: ("oneof", group_name, [ alternatives ]) |
|||
if len(item) == 3 and item[0] == "oneof": |
|||
_, group_name, alts = item |
|||
if not isinstance(alts, (list, tuple)): |
|||
raise ValueError("oneof alternatives must be a list") |
|||
# Normalize each alternative and mark with oneof group |
|||
for alt in alts: |
|||
if not isinstance(alt, tuple): |
|||
raise TypeError("oneof alternative must be a tuple") |
|||
# Allow both 3-tuple and 4-tuple forms for alternatives |
|||
if len(alt) == 3: |
|||
typ_or_schema, name, fid = alt |
|||
if isinstance(typ_or_schema, (list, tuple)): |
|||
fs = {"kind": "message", "schema": _normalize_schema(typ_or_schema), "name": name, "oneof": group_name} |
|||
else: |
|||
typ = typ_or_schema |
|||
if typ not in _ScalarTypes: |
|||
raise ValueError(f"unknown scalar type {typ}") |
|||
fs = {"kind": "scalar", "type": typ, "name": name, "oneof": group_name} |
|||
fields[int(fid)] = fs |
|||
names[name] = int(fid) |
|||
elif len(alt) == 4: |
|||
kind, name, fid, spec = alt |
|||
if kind == "message": |
|||
fs = {"kind": "message", "schema": _normalize_schema(spec), "name": name, "oneof": group_name} |
|||
elif kind == "scalar": |
|||
if spec not in _ScalarTypes: |
|||
raise ValueError(f"unknown scalar type {spec}") |
|||
fs = {"kind": "scalar", "type": spec, "name": name, "oneof": group_name} |
|||
else: |
|||
raise ValueError("oneof alternatives must be scalar or message singletons") |
|||
fields[int(fid)] = fs |
|||
names[name] = int(fid) |
|||
else: |
|||
raise ValueError("unsupported tuple length in oneof alternative") |
|||
continue |
|||
|
|||
if len(item) == 3: |
|||
typ_or_schema, name, fid = item |
|||
# Message singletons: first element is a subschema list |
|||
if isinstance(typ_or_schema, (list, tuple)): |
|||
fields[int(fid)] = {"kind": "message", "schema": _normalize_schema(typ_or_schema), "name": name} |
|||
else: |
|||
typ = typ_or_schema |
|||
if typ not in _ScalarTypes: |
|||
raise ValueError(f"unknown scalar type {typ}") |
|||
fields[int(fid)] = {"kind": "scalar", "type": typ, "name": name} |
|||
names[name] = int(fid) |
|||
elif len(item) == 4: |
|||
kind, name, fid, spec = item |
|||
if kind == "packed": |
|||
if spec not in {"varint", "uint64", "uint32", "int64", "int32", "sint", "sint64", "sint32", "fixed32", "fixed64", "sfixed32", "sfixed64", "float", "double", "bool"}: |
|||
raise ValueError(f"unsupported packed elem {spec}") |
|||
fields[int(fid)] = {"kind": "packed", "type": spec, "name": name} |
|||
elif kind == "message": |
|||
fields[int(fid)] = {"kind": "message", "schema": _normalize_schema(spec), "name": name} |
|||
elif kind == "repeated": |
|||
# spec can be scalar type or subschema list for repeated message |
|||
if isinstance(spec, (list, tuple)): |
|||
fields[int(fid)] = {"kind": "repeated", "schema": _normalize_schema(spec), "name": name} |
|||
else: |
|||
if spec not in _ScalarTypes: |
|||
raise ValueError(f"unknown scalar type {spec}") |
|||
fields[int(fid)] = {"kind": "repeated", "type": spec, "name": name} |
|||
elif kind == "scalar": |
|||
# Optional synonym: ("scalar", name, id, type) |
|||
if spec not in _ScalarTypes: |
|||
raise ValueError(f"unknown scalar type {spec}") |
|||
fields[int(fid)] = {"kind": "scalar", "type": spec, "name": name} |
|||
else: |
|||
raise ValueError(f"unknown field kind {kind}") |
|||
names[name] = int(fid) |
|||
else: |
|||
raise ValueError("unsupported tuple length in schema") |
|||
|
|||
return {"fields": fields, "names": names} |
|||
|
|||
|
|||
def _encode_packed(vals, elem_type): |
|||
out = bytearray() |
|||
for v in vals: |
|||
if elem_type in {"varint", "uint64"}: |
|||
out += _write_varint(int(v)) |
|||
elif elem_type == "uint32": |
|||
out += _write_varint(int(v) & 0xFFFFFFFF) |
|||
elif elem_type == "int64": |
|||
out += _write_varint(int(v) & 0xFFFFFFFFFFFFFFFF) |
|||
elif elem_type == "int32": |
|||
out += _write_varint(_i32_to_uvarint(v)) |
|||
elif elem_type == "sint": |
|||
out += _write_varint(zigzag_encode(int(v))) |
|||
elif elem_type == "sint64": |
|||
out += _write_varint(zigzag_encode(int(v))) |
|||
elif elem_type == "sint32": |
|||
out += _write_varint(_zz32(v)) |
|||
elif elem_type == "fixed32": |
|||
out += _write_fixed32(int(v)) |
|||
elif elem_type == "sfixed32": |
|||
out += _write_sfixed32(v) |
|||
elif elem_type == "fixed64": |
|||
out += _write_fixed64(int(v)) |
|||
elif elem_type == "sfixed64": |
|||
out += _write_sfixed64(v) |
|||
elif elem_type == "float": |
|||
out += struct.pack("<f", float(v)) |
|||
elif elem_type == "double": |
|||
out += struct.pack("<d", float(v)) |
|||
elif elem_type == "bool": |
|||
out += _write_varint(1 if bool(v) else 0) |
|||
else: |
|||
raise ValueError("unsupported packed elem {}".format(elem_type)) |
|||
|
|||
return bytes(out) |
|||
|
|||
|
|||
def _write_key(field, wt): |
|||
return _write_varint((field << 3) | wt) |
|||
|
|||
|
|||
# Encoder (proto3 binary) |
|||
def encode(values, schema=None): |
|||
ns = _normalize_schema(schema) |
|||
out = bytearray() |
|||
|
|||
def _ld_write(field, payload): |
|||
out.extend(_write_key(field, 2)) |
|||
out.extend(_write_varint(len(payload))) |
|||
out.extend(payload) |
|||
|
|||
def _var_write(field, val): |
|||
out.extend(_write_key(field, 0)) |
|||
out.extend(_write_varint(val)) |
|||
|
|||
# Pre-scan to enforce oneof constraint: at most one set per group |
|||
groups = {} |
|||
for key in values.keys(): |
|||
if isinstance(key, int): |
|||
field = key |
|||
else: |
|||
field = ns["names"].get(key) |
|||
if field is None: |
|||
continue |
|||
spec = ns["fields"].get(field) |
|||
if not spec: |
|||
continue |
|||
grp = spec.get("oneof") if isinstance(spec, dict) else None |
|||
if grp: |
|||
groups.setdefault(grp, []).append(key) |
|||
|
|||
for grp, ks in groups.items(): |
|||
if len(ks) > 1: |
|||
raise ValueError(f"oneof group '{grp}' has multiple fields set: {ks}") |
|||
|
|||
# Emit fields in caller-provided order |
|||
for key, val in values.items(): |
|||
# Allow using field names; map to number if needed |
|||
if isinstance(key, int): |
|||
field = key |
|||
else: |
|||
field = ns["names"].get(key) |
|||
if field is None: |
|||
raise KeyError(f"unknown field name '{key}' in schema") |
|||
spec = ns["fields"].get(field) |
|||
if not spec: |
|||
raise KeyError(f"field {field} not defined in schema") |
|||
|
|||
if spec["kind"] == "packed": |
|||
seq = val if isinstance(val, list) else [val] |
|||
packed = _encode_packed(seq, spec["type"]) |
|||
_ld_write(field, packed) |
|||
continue |
|||
|
|||
# For repeated, the value must be a list; otherwise wrap singletons |
|||
is_repeated = (spec["kind"] == "repeated") |
|||
vals = val if isinstance(val, list) else ([val] if is_repeated else [val]) |
|||
for v in vals: |
|||
t = spec.get("type") if spec else None |
|||
if spec["kind"] == "scalar" and t in {"varint", "uint64"}: |
|||
_var_write(field, int(v)) |
|||
elif spec["kind"] == "scalar" and t == "uint32": |
|||
_var_write(field, int(v) & 0xFFFFFFFF) |
|||
elif spec["kind"] == "scalar" and t == "int64": |
|||
_var_write(field, int(v) & 0xFFFFFFFFFFFFFFFF) |
|||
elif spec["kind"] == "scalar" and t == "int32": |
|||
u = _i32_to_uvarint(v) |
|||
_var_write(field, u) |
|||
elif spec["kind"] == "scalar" and t in {"sint", "sint64"}: |
|||
_var_write(field, zigzag_encode(int(v))) |
|||
elif spec["kind"] == "scalar" and t == "sint32": |
|||
zz = _zz32(v) |
|||
_var_write(field, zz) |
|||
elif spec["kind"] == "scalar" and t == "fixed32": |
|||
out += _write_key(field, 5) |
|||
out += _write_fixed32(int(v)) |
|||
elif spec["kind"] == "scalar" and t == "sfixed32": |
|||
out += _write_key(field, 5) |
|||
out += _write_sfixed32(v) |
|||
elif spec["kind"] == "scalar" and t == "fixed64": |
|||
out += _write_key(field, 1) |
|||
out += _write_fixed64(int(v)) |
|||
elif spec["kind"] == "scalar" and t == "sfixed64": |
|||
out += _write_key(field, 1) |
|||
out += _write_sfixed64(v) |
|||
elif spec["kind"] == "scalar" and t == "float": |
|||
out += _write_key(field, 5) |
|||
out += struct.pack("<f", float(v)) |
|||
elif spec["kind"] == "scalar" and t == "double": |
|||
out += _write_key(field, 1) |
|||
out += struct.pack("<d", float(v)) |
|||
elif spec["kind"] == "scalar" and t == "bool": |
|||
_var_write(field, 1 if bool(v) else 0) |
|||
elif spec["kind"] == "scalar" and t == "string": |
|||
bs = v.encode("utf-8") |
|||
_ld_write(field, bs) |
|||
elif spec["kind"] == "scalar" and t == "bytes": |
|||
bs = bytes(v) |
|||
_ld_write(field, bs) |
|||
elif spec["kind"] == "message": |
|||
inner = encode(v, spec["schema"]) |
|||
_ld_write(field, inner) |
|||
elif spec["kind"] == "repeated" and "type" in spec: |
|||
# Proto3 default: pack eligible numeric types into one segment |
|||
if spec["type"] in {"varint", "uint64", "uint32", "int64", "int32", "sint", "sint64", "sint32", "fixed32", "fixed64", "sfixed32", "sfixed64", "float", "double", "bool"}: |
|||
packed = _encode_packed(vals, spec["type"]) # use prepared list |
|||
_ld_write(field, packed) |
|||
break # emitted entire list as one segment |
|||
else: |
|||
# Non-packable types (string/bytes) emit one tag per element |
|||
if spec["type"] == "string": |
|||
bs = v.encode("utf-8") |
|||
_ld_write(field, bs) |
|||
elif spec["type"] == "bytes": |
|||
bs = bytes(v) |
|||
_ld_write(field, bs) |
|||
else: |
|||
raise ValueError("unsupported repeated scalar type {}".format(spec["type"])) |
|||
elif spec["kind"] == "repeated" and "schema" in spec: |
|||
# repeated message: each v is a dict for one message instance |
|||
inner = encode(v, spec["schema"]) |
|||
_ld_write(field, inner) |
|||
else: |
|||
raise ValueError("unsupported type for field {}".format(field)) |
|||
|
|||
return bytes(out) |
|||
|
|||
def isInt(any): |
|||
try: |
|||
int(any) |
|||
return True |
|||
except: |
|||
return False |
|||
|
|||
def decode(buf, schema=None, _depth=0, _max_depth=_MAX_NESTING_DEPTH): |
|||
if _depth > _max_depth: |
|||
raise ValueError("message nesting too deep") |
|||
ns = _normalize_schema(schema) |
|||
if isinstance(buf, bytearray): |
|||
b = buf |
|||
else: |
|||
b = memoryview(buf).toreadonly() |
|||
out = {} |
|||
oneof_seen = {} |
|||
i = 0 |
|||
n = len(b) |
|||
while i < n: |
|||
key, i = _read_varint(b, i) |
|||
field = key >> 3 |
|||
wt = key & 7 |
|||
|
|||
spec = ns["fields"].get(field) |
|||
|
|||
if wt == 0: |
|||
v, i = _read_varint(b, i) |
|||
if spec: |
|||
t = spec.get("type") |
|||
if t in {"sint", "sint64"}: |
|||
v = zigzag_decode(v) |
|||
elif t == "sint32": |
|||
v = _i32_from_uvarint(zigzag_decode(v)) |
|||
elif t == "int64": |
|||
v = _i64_from_uvarint(v) |
|||
elif t == "int32": |
|||
v = _i32_from_uvarint(v) |
|||
elif t == "uint32": |
|||
v &= 0xFFFFFFFF |
|||
elif t in {"uint64", "varint"}: |
|||
pass |
|||
elif t == "bool": |
|||
v = bool(v) |
|||
elif wt == 1: |
|||
v, i = _read_fixed64(b, i) |
|||
if spec: |
|||
tt = spec.get("type") |
|||
if tt == "double": |
|||
v = struct.unpack("<d", struct.pack("<Q", v))[0] |
|||
elif tt == "sfixed64": |
|||
v = v - 0x10000000000000000 if v >= 0x8000000000000000 else v |
|||
elif wt == 2: |
|||
length, i = _read_varint(b, i) |
|||
if i + length > n: |
|||
raise ValueError("truncated length-delimited field") |
|||
chunk = bytes(b[i:i+length]) |
|||
i += length |
|||
t = spec.get("type") if spec else None |
|||
if spec and spec.get("kind") == "message": |
|||
v = decode(chunk, spec["schema"], _depth=_depth+1, _max_depth=_max_depth) |
|||
elif spec and spec.get("kind") == "repeated" and "type" in spec and t in {"varint", "uint64", "uint32", "int64", "int32", "sint", "sint64", "sint32", "fixed32", "fixed64", "sfixed32", "sfixed64", "float", "double", "bool"}: |
|||
# Proto3 default: repeated numeric are packed; flatten into list |
|||
v = _decode_packed(chunk, t) |
|||
elif spec and spec.get("kind") == "repeated" and "schema" in spec: |
|||
v = decode(chunk, spec["schema"], _depth=_depth+1, _max_depth=_max_depth) |
|||
elif spec and spec.get("kind") == "packed": |
|||
v = _decode_packed(chunk, t) |
|||
elif t == "string": |
|||
v = chunk.decode("utf-8") |
|||
elif t == "bytes" or spec is None: |
|||
v = chunk |
|||
else: |
|||
v = chunk |
|||
elif wt == 5: |
|||
v, i = _read_fixed32(b, i) |
|||
if spec: |
|||
tt = spec.get("type") |
|||
if tt == "float": |
|||
v = struct.unpack("<f", struct.pack("<I", v))[0] |
|||
elif tt == "sfixed32": |
|||
v = v - 0x100000000 if v >= 0x80000000 else v |
|||
else: |
|||
raise ValueError("unsupported wire type {}".format(wt)) |
|||
|
|||
out_key = (spec.get("name") if spec else field) |
|||
if isInt(out_key): |
|||
out_key = f"unk_proto_{out_key}" |
|||
|
|||
if spec: |
|||
is_repeated = spec.get("kind") in {"repeated", "packed"} |
|||
if is_repeated: |
|||
# Repeated scalar (numeric packed yields list 'v'), or repeated message (dict 'v') |
|||
seq = v if isinstance(v, list) else [v] |
|||
out.setdefault(out_key, []).extend(seq) |
|||
else: |
|||
# Singleton (scalar or message) — store as value, not list |
|||
grp = spec.get("oneof") |
|||
if grp: |
|||
prev = oneof_seen.get(grp) |
|||
if prev and prev in out: |
|||
try: |
|||
del out[prev] |
|||
except Exception: |
|||
out.pop(prev, None) |
|||
oneof_seen[grp] = out_key |
|||
out[out_key] = v |
|||
else: |
|||
# Unknown field: if first occurrence, store scalar; if multiple, upgrade to list |
|||
if out_key in out: |
|||
prev = out[out_key] |
|||
if isinstance(prev, list): |
|||
prev.append(v) |
|||
else: |
|||
out[out_key] = [prev, v] |
|||
else: |
|||
out[out_key] = v |
|||
|
|||
return out |
|||
|
|||
|
|||
# Packed repeated element decode |
|||
def _decode_packed(chunk, elem_type): |
|||
b = memoryview(chunk).toreadonly() |
|||
i = 0 |
|||
out = [] |
|||
while i < len(b): |
|||
if elem_type in {"varint", "uint64"}: |
|||
v, i = _read_varint(b, i) |
|||
out.append(v) |
|||
elif elem_type == "uint32": |
|||
v, i = _read_varint(b, i) |
|||
out.append(v & 0xFFFFFFFF) |
|||
elif elem_type == "int64": |
|||
v, i = _read_varint(b, i) |
|||
out.append(_i64_from_uvarint(v)) |
|||
elif elem_type == "int32": |
|||
v, i = _read_varint(b, i) |
|||
out.append(_i32_from_uvarint(v)) |
|||
elif elem_type == "sint": |
|||
v, i = _read_varint(b, i) |
|||
out.append(zigzag_decode(v)) |
|||
elif elem_type == "sint64": |
|||
v, i = _read_varint(b, i) |
|||
out.append(zigzag_decode(v)) |
|||
elif elem_type == "sint32": |
|||
v, i = _read_varint(b, i) |
|||
out.append(_i32_from_uvarint(zigzag_decode(v))) |
|||
elif elem_type == "fixed32": |
|||
v, i = _read_fixed32(b, i) |
|||
out.append(v) |
|||
elif elem_type == "sfixed32": |
|||
v, i = _read_fixed32(b, i) |
|||
v = v - 0x100000000 if v >= 0x80000000 else v |
|||
out.append(v) |
|||
elif elem_type == "fixed64": |
|||
v, i = _read_fixed64(b, i) |
|||
out.append(v) |
|||
elif elem_type == "sfixed64": |
|||
v, i = _read_fixed64(b, i) |
|||
v = v - 0x10000000000000000 if v >= 0x8000000000000000 else v |
|||
out.append(v) |
|||
elif elem_type == "float": |
|||
v, i = _read_fixed32(b, i) |
|||
out.append(struct.unpack("<f", struct.pack("<I", v))[0]) |
|||
elif elem_type == "double": |
|||
v, i = _read_fixed64(b, i) |
|||
out.append(struct.unpack("<d", struct.pack("<Q", v))[0]) |
|||
elif elem_type == "bool": |
|||
v, i = _read_varint(b, i) |
|||
out.append(bool(v)) |
|||
else: |
|||
raise ValueError("unsupported packed elem {}".format(elem_type)) |
|||
return out |
|||
@ -0,0 +1,4 @@ |
|||
pyserial-asyncio |
|||
uvicorn[standard] |
|||
fastapi |
|||
pymongo |
|||
@ -0,0 +1,203 @@ |
|||
#sys imports |
|||
import argparse |
|||
import traceback |
|||
import sys |
|||
import asyncio |
|||
from time import time |
|||
from typing import List, Dict |
|||
|
|||
class logger: |
|||
@staticmethod |
|||
def info(t): |
|||
print("[INFO]", t) |
|||
|
|||
@staticmethod |
|||
def error(t): |
|||
print("[ERROR]", t) |
|||
|
|||
@staticmethod |
|||
def debug(t): |
|||
print("[DEBUG]", t) |
|||
|
|||
#mesh |
|||
from mesht_device import MeshtDevice |
|||
from mesht_models import _wait_for_config_complete |
|||
from mesht_models import NOT_CONNECTED, WAIT_CONFIG, AVAILABLE, ERR, RECONNECT |
|||
|
|||
#fs imports |
|||
from fastapi import FastAPI, HTTPException, WebSocket |
|||
from fastapi.responses import HTMLResponse |
|||
from contextlib import asynccontextmanager |
|||
|
|||
#mongo |
|||
from pymongo import AsyncMongoClient |
|||
|
|||
def isInt(any): |
|||
try: |
|||
int(any) |
|||
return True |
|||
except: |
|||
return False |
|||
|
|||
|
|||
class MeshArgsParse: |
|||
def __init__(self, args): |
|||
self.args = args |
|||
|
|||
class MeshListener(MeshArgsParse): |
|||
def __init__(self, args): |
|||
super().__init__(args) |
|||
self.meshState = NOT_CONNECTED |
|||
|
|||
if args.transport == "serial": |
|||
from transport_serial import SerialTransport |
|||
|
|||
self.transport = SerialTransport(port = args.serial_port, baudrate = args.serial_baudrate) |
|||
self.device = MeshtDevice(self.transport) |
|||
else: |
|||
logger.error("Unknown mesh transport") |
|||
sys.exit(1) |
|||
|
|||
#task |
|||
async def meshWorker(self, queue: asyncio.Queue): |
|||
logger.info("Start mesh queue listener") |
|||
run = True |
|||
while run: |
|||
try: |
|||
await self.device.start() |
|||
self.meshState = WAIT_CONFIG |
|||
self.init_data = await _wait_for_config_complete(self.device) |
|||
for from_radio in self.init_data: |
|||
await queue.put(from_radio) |
|||
|
|||
self.meshState = AVAILABLE |
|||
while True: |
|||
from_radio, _ = await self.device.recv() |
|||
logger.debug(from_radio) |
|||
await queue.put(from_radio) |
|||
except asyncio.exceptions.CancelledError: |
|||
logger.info("Kill mesh device") |
|||
run = False |
|||
except: |
|||
self.meshState = ERR |
|||
traceback.print_exc() |
|||
await asyncio.sleep(1) |
|||
self.meshState = RECONNECT |
|||
finally: |
|||
await self.device.close() |
|||
|
|||
class MeshApi(MeshArgsParse): |
|||
app: FastAPI |
|||
tasks: List |
|||
def __init__(self, args): |
|||
super().__init__(args) |
|||
self.app = FastAPI(lifespan=self.lifespan) |
|||
self.tasks = [] |
|||
|
|||
@asynccontextmanager |
|||
async def lifespan(self, app: FastAPI): |
|||
logger.info("web started, now create bg tasks") |
|||
for task in self.tasks: |
|||
asyncio.create_task(task()) |
|||
yield |
|||
logger.info("kill web server") |
|||
|
|||
def run(self): |
|||
import uvicorn |
|||
uvicorn.run(self.app, host=self.args.web_host, port = self.args.web_port) |
|||
|
|||
class MongoDriver(MeshArgsParse): |
|||
def __init__(self, args): |
|||
super().__init__(args) |
|||
if args.mongo_url: |
|||
self.dbClient = AsyncMongoClient(args.mongo_url) |
|||
elif args.mongo_host and args.mongo_port: |
|||
self.dbClient = AsyncMongoClient(args.mongo_host, args.mongo_port) |
|||
else: |
|||
logger.error("Unknown mongo client") |
|||
sys.exit(1) |
|||
self.dbStore = self.dbClient[self.args.mongo_db] |
|||
#self.dbCollection = self.dbStore.from_radio |
|||
|
|||
async def dbSaveRadio(self, from_radio): |
|||
'''try: |
|||
anyJson = from_radio["packet"] |
|||
except: |
|||
logger.debug(from_radio) |
|||
return''' |
|||
|
|||
#logger.debug(from_radio) |
|||
#logger.debug(len(list(from_radio.keys()))) |
|||
for k, v in from_radio.items(): |
|||
if type(v) != dict: |
|||
v = {"data": v, "ts": time()} |
|||
else: |
|||
v["ts"] = time() |
|||
if "decoded" in v: |
|||
v.update(v["decoded"]) |
|||
del v["decoded"] |
|||
|
|||
if "payload" in v: |
|||
try: |
|||
v["decoded_payload"] = v["payload"].decode() |
|||
except: |
|||
pass |
|||
|
|||
if "user" in v: |
|||
v.update(v["user"]) |
|||
del v["user"] |
|||
|
|||
await self.dbStore[k].insert_one(v) |
|||
|
|||
async def dbQueueListener(self, queue: asyncio.Queue): |
|||
logger.info("Start db queue listener") |
|||
run = True |
|||
while run: |
|||
try: |
|||
from_radio = await queue.get() |
|||
if from_radio is None: |
|||
continue |
|||
|
|||
await self.dbSaveRadio(from_radio) |
|||
except asyncio.exceptions.CancelledError: |
|||
logger.info("Kill db listener") |
|||
run = False |
|||
except: |
|||
traceback.print_exc() |
|||
|
|||
class MeshCenter(MeshListener, MeshApi, MongoDriver, MeshArgsParse): |
|||
queue: asyncio.Queue = asyncio.Queue() |
|||
|
|||
def __init__(self, args): |
|||
MeshListener.__init__(self, args) |
|||
MeshArgsParse.__init__(self, args) |
|||
MeshApi.__init__(self, args) |
|||
MongoDriver.__init__(self, args) |
|||
self.buildBackgroundTasks() |
|||
|
|||
def buildBackgroundTasks(self): |
|||
async def mL(): |
|||
await self.meshWorker(self.queue) |
|||
self.tasks.append(mL) |
|||
|
|||
async def dbL(): |
|||
await self.dbQueueListener(self.queue) |
|||
self.tasks.append(dbL) |
|||
|
|||
if __name__ == "__main__": |
|||
parser = argparse.ArgumentParser() |
|||
#mesh |
|||
parser.add_argument("--transport", default="serial") |
|||
parser.add_argument("--serial-port", default="/dev/tty.usbmodemD0CF1309DC141") |
|||
parser.add_argument("--serial-baudrate", default=115200) |
|||
#fastapi |
|||
parser.add_argument("--web-host", default="0.0.0.0") |
|||
parser.add_argument("--web-port", default=8680) |
|||
#mongodb |
|||
parser.add_argument("--mongo-url") |
|||
parser.add_argument("--mongo-host", default="127.0.0.1") |
|||
parser.add_argument("--mongo-port", default=27017) |
|||
parser.add_argument("--mongo-db", default="meshtastic") |
|||
|
|||
a = MeshCenter(parser.parse_args()) |
|||
a.run() |
|||
@ -0,0 +1,118 @@ |
|||
import asyncio |
|||
import serial_asyncio |
|||
import logging |
|||
logger = logging.getLogger(__name__) |
|||
|
|||
MAGIC0 = 0x94 |
|||
MAGIC1 = 0xC3 |
|||
READ_CHUNK = 4096 |
|||
|
|||
|
|||
def encode_frame(payload): |
|||
n = len(payload) |
|||
header = bytes([MAGIC0, MAGIC1, (n >> 8) & 0xFF, n & 0xFF]) |
|||
return header + payload |
|||
|
|||
|
|||
class SerialTransport: |
|||
def __init__(self, port, baudrate=115200): |
|||
self.port = port |
|||
self.baudrate = baudrate |
|||
self.reader = None |
|||
self.writer = None |
|||
self._in_q = asyncio.Queue() |
|||
self._buf = bytearray() |
|||
self._reader_task = None |
|||
self._error = None |
|||
|
|||
async def start(self): |
|||
# Reset state to avoid stale sentinels/data from prior sessions |
|||
self._in_q = asyncio.Queue() |
|||
self._buf = bytearray() |
|||
self._error = None |
|||
self.reader, self.writer = await serial_asyncio.open_serial_connection( |
|||
url=self.port, baudrate=self.baudrate |
|||
) |
|||
self._reader_task = asyncio.create_task(self._reader_loop(), name="serial-reader") |
|||
logger.debug("SerialTransport.start: opened %s @ %s", self.port, self.baudrate) |
|||
|
|||
async def _reader_loop(self): |
|||
assert self.reader is not None |
|||
r = self.reader |
|||
try: |
|||
while True: |
|||
data = await r.read(READ_CHUNK) |
|||
if not data: |
|||
await asyncio.sleep(0) |
|||
continue |
|||
self._buf.extend(data) |
|||
while True: |
|||
start = -1 |
|||
for i in range(len(self._buf) - 1): |
|||
if self._buf[i] == MAGIC0 and self._buf[i + 1] == MAGIC1: |
|||
start = i |
|||
break |
|||
if start == -1: |
|||
if len(self._buf) > 1: |
|||
self._buf[:] = self._buf[-1:] |
|||
break |
|||
if start > 0: |
|||
del self._buf[:start] |
|||
if len(self._buf) < 4: |
|||
break |
|||
length = (self._buf[2] << 8) | self._buf[3] |
|||
total = 4 + length |
|||
if len(self._buf) < total: |
|||
break |
|||
payload = bytes(self._buf[4:total]) |
|||
await self._in_q.put(payload) |
|||
del self._buf[:total] |
|||
except asyncio.CancelledError: |
|||
pass |
|||
except Exception as e: |
|||
# Record error and notify receivers by placing a sentinel |
|||
self._error = e |
|||
logger.error("serial read error: %s", e) |
|||
try: |
|||
self._in_q.put_nowait(None) |
|||
except Exception: |
|||
pass |
|||
|
|||
async def send(self, payload): |
|||
if self.writer is None: |
|||
return |
|||
if not isinstance(payload, (bytes, bytearray)): |
|||
raise TypeError("payload must be bytes") |
|||
try: |
|||
self.writer.write(encode_frame(payload)) |
|||
await self.writer.drain() |
|||
except Exception as e: |
|||
# Treat as an error condition; surface to receiver path |
|||
self._error = e |
|||
logger.error("serial write error: %s", e) |
|||
try: |
|||
self._in_q.put_nowait(None) |
|||
except Exception: |
|||
pass |
|||
|
|||
async def recv(self): |
|||
item = await self._in_q.get() |
|||
if item is None: |
|||
raise ConnectionError(self._error or "serial transport error") |
|||
return item |
|||
|
|||
async def close(self): |
|||
if self._reader_task is not None: |
|||
self._reader_task.cancel() |
|||
try: |
|||
await self._reader_task |
|||
except Exception: |
|||
pass |
|||
self._reader_task = None |
|||
if self.writer is not None: |
|||
try: |
|||
self.writer.close() |
|||
except Exception: |
|||
pass |
|||
self.writer = None |
|||
self.reader = None |
|||
Loading…
Reference in new issue