commit 083f362f258d3266b1e9b8ccee71bd4db73c0690 Author: gsd Date: Sun Feb 8 16:37:45 2026 +0300 init fekalis diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..33bc7e5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +testenv/ +__pycache__/ \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..8a2b0f7 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,19 @@ +services: + mongodb: + image: docker.pblr-nyk.pro/mongo:8.2.4 + container_name: mongodb-0 + ports: + - 27017:27017 + volumes: + - ./testenv/mongodb:/data/db + + mongoExpress: + image: docker.pblr-nyk.pro/mongo-express + container_name: mongoExpress-0 + ports: + - 8081:8081 + environment: + - ME_CONFIG_MONGODB_URL=mongodb://mongodb:27017 + depends_on: + - mongodb + \ No newline at end of file diff --git a/mesht_device.py b/mesht_device.py new file mode 100644 index 0000000..56a2501 --- /dev/null +++ b/mesht_device.py @@ -0,0 +1,300 @@ +import asyncio +import random +import pb +import logging +logger = logging.getLogger(__name__) +from mesht_models import * + + +DATA_SCHEMA = [ + ("varint", "portnum", 1), + ("bytes", "payload", 2), + ("bool", "want_response", 3), + ("fixed32", "dest", 4), + ("fixed32", "source", 5), + ("fixed32", "request_id", 6), + ("fixed32", "reply_id", 7), + ("fixed32", "emoji", 8), + ("uint32", "bitfield", 9), +] + +MESHPACKET_SCHEMA = [ + ("fixed32", "from", 1), + ("fixed32", "to", 2), + ("uint32", "channel", 3), + ("oneof", "payload_variant", [ + (DATA_SCHEMA, "decoded", 4), + ("bytes", "encrypted", 5), + ]), + ("fixed32", "id", 6), + ("fixed32", "rx_time", 7), + ("float", "rx_snr", 8), + ("uint32", "hop_limit", 9), + ("bool", "want_ack", 10), + ("int32", "priority", 11), + ("int32", "rx_rssi", 12), + ("bool", "via_mqtt", 14), + ("uint32", "hop_start", 15), + ("bytes", "public_key", 16), + ("bool", "pki_encrypted", 17), + ("uint32", "next_hop", 18), + ("uint32", "relay_node", 19), + ("uint32", "tx_after", 20), +] + +CHANNEL_SETTINGS_SCHEMA = [ + ("uint32", "channel_num", 1), + ("string", "name", 3), +] + +CHANNEL_SCHEMA = [ + ("int32", "index", 1), + (CHANNEL_SETTINGS_SCHEMA, "settings", 2), + ("int32", "role", 3), +] + +USER_SCHEMA = [ + ("string", "id", 1), + ("string", "long_name", 2), + ("string", "short_name", 3), + ("int32", "hw_model", 5), + ("bool", "is_licensed", 6), + ("int32", "role", 7), + ("bytes", "public_key", 8), +] + +DEVICEMETRICS_SCHEMA = [ + ("uint32", "battery_level", 1), + ("float", "voltage", 2), +] + +MYNODEINFO_SCHEMA = [ + ("uint32", "my_node_num", 1), +] + +NODEINFO_SCHEMA = [ + ("uint32", "num", 1), + (USER_SCHEMA, "user", 2), + ("float", "snr", 4), + ("fixed32", "last_heard", 5), + (DEVICEMETRICS_SCHEMA, "device_metrics", 6), + ("uint32", "hops_away", 9), +] + +LORACONFIG_SCHEMA = [ + ("bool", "use_preset", 1), + ("int32", "modem_preset", 2), + ("int32", "region", 7), +] + +CONFIG_SCHEMA = [ + ("oneof", "variant", [ + (LORACONFIG_SCHEMA, "lora", 6), + ]), +] + +FROMRADIO_SCHEMA = [ + ("uint32", "id", 1), + ("oneof", "payload_variant", [ + (MESHPACKET_SCHEMA, "packet", 2), + (MYNODEINFO_SCHEMA, "my_info", 3), + (NODEINFO_SCHEMA, "node_info", 4), + (CONFIG_SCHEMA, "config", 5), + ("uint32", "config_complete_id", 7), + (CHANNEL_SCHEMA, "channel", 10), + ]), +] + +TORADIO_SCHEMA = [ + (MESHPACKET_SCHEMA, "packet", 1), + ("uint32", "want_config_id", 3), +] + +PRESET_NAMES = { + 0: "LongFast", + 1: "LongSlow", + 2: "VeryLongSlow", + 3: "MediumSlow", + 4: "MediumFast", + 5: "ShortSlow", + 6: "ShortFast", + 7: "LongModerate", + 8: "ShortTurbo", +} + +REGION_NAMES = { + 0: "UNSET", + 1: "US", + 2: "EU_433", + 3: "EU_868", + 4: "CN", + 5: "JP", + 6: "ANZ", + 7: "KR", + 8: "TW", + 9: "RU", + 10: "IN", + 11: "NZ_865", + 12: "TH", + 13: "LORA_24", + 14: "UA_433", + 15: "UA_868", + 16: "MY_433", + 17: "MY_919", + 18: "SG_923", + 19: "PH_433", + 20: "PH_868", + 21: "PH_915", + 22: "ANZ_433", + 23: "KZ_433", + 24: "KZ_863", + 25: "NP_865", + 26: "BR_902", +} + +PORTNUMS = { + 0: "UNKNOWN_APP", + 1: "TEXT_MESSAGE_APP", + 2: "REMOTE_HARDWARE_APP", + 3: "POSITION_APP", + 4: "NODEINFO_APP", + 5: "ROUTING_APP", + 6: "ADMIN_APP", + 7: "TEXT_MESSAGE_COMPRESSED_APP", + 8: "WAYPOINT_APP", + 9: "AUDIO_APP", + 10: "DETECTION_SENSOR_APP", + 11: "ALERT_APP", + 32: "REPLY_APP", + 33: "IP_TUNNEL_APP", + 34: "PAXCOUNTER_APP", + 64: "SERIAL_APP", + 65: "STORE_FORWARD_APP", + 66: "RANGE_TEST_APP", + 67: "TELEMETRY_APP", + 68: "ZPS_APP", + 69: "SIMULATOR_APP", + 70: "TRACEROUTE_APP", + 71: "NEIGHBORINFO_APP", + 72: "ATAK_PLUGIN", + 73: "MAP_REPORT_APP", + 74: "POWERSTRESS_APP", + 76: "RETICULUM_TUNNEL_APP", + 256: "PRIVATE_APP", + 257: "ATAK_FORWARDER", + 511: "MAX", +} +NAMES_TO_PORTNUMS = {v: k for k, v in PORTNUMS.items()} + + +class Channel: + def __init__(self, index, name, role): + self.index = int(index) + self.name = name + self.role = int(role or 0) + + +class MeshtDevice: + def __init__(self, transport): + self.transport = transport + self.channels = [] + self.lora_config = None + # Track local node number from MyNodeInfo + self.my_node_id = "00000000" + self.nid = None + + async def start(self): + await self.transport.start() + nonce = random.randint(1, 1_000_000_000) + logger.debug("MeshtDevice.start: sending want_config_id nonce=%s", nonce) + await self.transport.send(pb.encode({"want_config_id": nonce}, TORADIO_SCHEMA)) + + async def close(self): + return await self.transport.close() + + async def sendMsgToChannel(self, text, channel_index = 0): + return await self.send_text(text, channel_index= channel_index) + + async def sendMsgToDM(self, text, num): + return await self.send_text(text, num) + + async def send_text(self, text, num = 0xFFFFFFFF, channel_index = 0): + data = { + "portnum": NAMES_TO_PORTNUMS["TEXT_MESSAGE_APP"], + "payload": text.encode("utf-8"), + "want_response": False, + } + meshpacket = { + "id": random.randint(1, 0x7FFFFFFF), + #https://github.com/meshtastic/python/blob/5cc0dae3947cd72f5a05d079a93751fc924afac6/meshtastic/mesh_interface.py#L935 + "to": num, #3148365392,#0xFFFFFFFF, + "channel": int(channel_index), + "want_ack": True, + "decoded": data, + } + payload = pb.encode({"packet": meshpacket}, TORADIO_SCHEMA) + await self.transport.send(payload) + # Return the meshpacket details for logging by callers + return meshpacket + + async def recv(self): + data = await self.transport.recv() + fr = pb.decode(data, FROMRADIO_SCHEMA) + logger.debug(f"FromRadio: {fr}") + self._maybe_store_channel(fr) + self._maybe_store_lora_config(fr) + self._maybe_store_my_node(fr) + return fr, data + + def get_channel_index(self, name): + for ch in self.channels: + if ch.name == name: + return ch.index + return None + + def get_channels(self): + return list(self.channels) + + def _maybe_store_my_node(self, from_radio): + mi = from_radio.get("my_info") + if not mi: + return + + self.nid = mi.get("my_node_num", 0) + self.my_node_id = f"{self.nid & 0xFFFFFFFF:08x}" + + def _maybe_store_channel(self, from_radio): + if not isinstance(from_radio, dict): + return + ch = from_radio.get("channel") + if not isinstance(ch, dict): + return + # protobuf default for 0 means 'missing' -> treat as index 0 + idx = int(ch.get("index") or 0) + name = (ch.get("settings") or {}).get("name") or f"Channel {idx}" + role = int(ch.get("role") or 0) + + for i, existing in enumerate(self.channels): + if existing.index == idx: + if role == 0: + # Remove if disabled + self.channels.pop(i) + return + existing.name = name + existing.role = role + break + else: + if role != 0: + self.channels.append(Channel(idx, name, role)) + + self.channels.sort(key=lambda c: c.index) + + def _maybe_store_lora_config(self, from_radio): + if not isinstance(from_radio, dict): + return + cfg = from_radio.get("config") + if not isinstance(cfg, dict): + return + lora = cfg.get("lora") + if isinstance(lora, dict): + self.lora_config = lora \ No newline at end of file diff --git a/mesht_models.py b/mesht_models.py new file mode 100644 index 0000000..8a41ff5 --- /dev/null +++ b/mesht_models.py @@ -0,0 +1,95 @@ +import pydantic +from time import time + +import logging +logger = logging.getLogger(__name__) + +PUB_CH = 0xFFFFFFFF +PACKET = "packet" + +NOT_CONNECTED = 0 +WAIT_CONFIG = 1 +AVAILABLE = 2 +ERR = 3 +RECONNECT = 4 + +WS_EVENT_CHANNEL = 0 +WS_EVENT_NODE = 1 +WS_EVENT_STATE = 2 +WS_EVENT_MESSAGE = 3 +WS_EVENT_MYID = 4 +#WS_EVENT_MYNODE = 5 + +WS_TYPE_INIT = 0 +WS_TYPE_NEW = 1 +WS_TYPE_FRONTEND = 2 + +async def _wait_for_config_complete(device, extraInfo = False): + logger.info("wait config") + packets = [] + while True: + from_radio, _ = await device.recv() + packets.append(from_radio) + if extraInfo: + logger.info(from_radio) + logger.info("---") + if isinstance(from_radio, dict) and from_radio.get("config_complete_id") is not None: + logger.info("config catching") + return packets + +class Channel: + def __init__(self, fr): + self.index = fr["channel"].get("index", 0) + self.name = fr["channel"]["settings"].get("name", f"Channel {self.index}") + + @staticmethod + def isThis(fr): + return isinstance(fr, dict) and "channel" in fr and fr["channel"].get("settings", {}).keys().__len__() > 0 + +class Node: #aka node info + def __init__(self, fr): + self.num = fr["node_info"]["num"] + self.user = {} + self.user["id"] = fr["node_info"].get("user", {}).get("id", None) + self.user["short_name"] = fr["node_info"].get("user", {}).get("short_name", None) + self.user["long_name"] = fr["node_info"].get("user", {}).get("long_name", None) + self.snr = fr["node_info"].get("snr", None) + self.last_heard = fr["node_info"].get("last_heard", None) + self.hops_away = fr["node_info"].get("hops_away", None) + + @staticmethod + def isThis(fr): + return isinstance(fr, dict) and "node_info" in fr + + def __str__(self): + return f"({self.user.get('short_name', None)}) {self.user.get('long_name', None)}" + +class NewMessage(pydantic.BaseModel): + channel_id: int = 0 + node_id: int = 0 + txt: str + +class Message: + def __init__(self, fr): + self.fr0m = fr[PACKET]["from"] + self.to = fr[PACKET]["to"] + self.msg = fr[PACKET]["decoded"]["payload"].decode("utf-8") + self.id = fr[PACKET]["id"] + self.rx_time = fr[PACKET].get("rx_time", 0) + self.rx_snr = fr[PACKET].get("rx_snr", None) + self.hop_limit = fr[PACKET].get("hop_limit", None) + self.rx_rssi = fr[PACKET].get("rx_rssi", None) + self.channel = fr[PACKET].get("channel", 0) + self.append_time = time() + + @property + def isDm(self): + return self.to != PUB_CH + + @property + def isPub(self): + return not self.isDm + + @staticmethod + def isThis(fr):#1: "TEXT_MESSAGE_APP", + return isinstance(fr, dict) and "packet" in fr and "decoded" in fr["packet"] and fr["packet"]["decoded"]["portnum"] == 1 \ No newline at end of file diff --git a/pb.py b/pb.py new file mode 100644 index 0000000..975dede --- /dev/null +++ b/pb.py @@ -0,0 +1,554 @@ +""" +https://github.com/allanrbo/pb.py +""" + +import struct + + +_MAX_VARINT_BYTES = 10 # uint64 max +_MAX_NESTING_DEPTH = 100 + + +def _read_varint(b, i): + shift = 0 + n = 0 + read = 0 + blen = len(b) + while True: + if i >= blen: + raise ValueError("truncated varint") + c = b[i] + i += 1 + read += 1 + n |= (c & 0x7F) << shift + if not (c & 0x80): + break + if read >= _MAX_VARINT_BYTES: + raise ValueError("varint too long") + shift += 7 + return n, i + + +def _write_varint(n): + if n < 0: + raise ValueError("varint requires non-negative integer; use 'sint' for signed") + out = bytearray() + while True: + to_write = n & 0x7F + n >>= 7 + if n: + out.append(to_write | 0x80) + else: + out.append(to_write) + break + return bytes(out) + + +def zigzag_encode(n): + return (n << 1) ^ (n >> 63) + + +def zigzag_decode(n): + return (n >> 1) ^ -(n & 1) + + +def _zz32(n): + # zigzag for 32-bit signed + n = int(n) + if n < -0x80000000 or n > 0x7FFFFFFF: + raise ValueError("sint32 out of range") + v = (n << 1) ^ (n >> 31) + return v & 0xFFFFFFFF + + +def _i32_from_uvarint(v): + v &= 0xFFFFFFFF + return v - 0x100000000 if v >= 0x80000000 else v + + +def _i64_from_uvarint(v): + return v - 0x10000000000000000 if v >= 0x8000000000000000 else v + + +def _i32_to_uvarint(n): + # represent signed 32-bit as 64-bit varint two's complement + n = int(n) + if n < 0: + u = (n & 0xFFFFFFFF) | 0xFFFFFFFF00000000 + else: + u = n & 0xFFFFFFFF + return u & 0xFFFFFFFFFFFFFFFF + + +def _read_fixed32(b, i): + if i + 4 > len(b): + raise ValueError("truncated fixed32") + v = struct.unpack_from(" len(b): + raise ValueError("truncated fixed64") + v = struct.unpack_from(" 1: + raise ValueError(f"oneof group '{grp}' has multiple fields set: {ks}") + + # Emit fields in caller-provided order + for key, val in values.items(): + # Allow using field names; map to number if needed + if isinstance(key, int): + field = key + else: + field = ns["names"].get(key) + if field is None: + raise KeyError(f"unknown field name '{key}' in schema") + spec = ns["fields"].get(field) + if not spec: + raise KeyError(f"field {field} not defined in schema") + + if spec["kind"] == "packed": + seq = val if isinstance(val, list) else [val] + packed = _encode_packed(seq, spec["type"]) + _ld_write(field, packed) + continue + + # For repeated, the value must be a list; otherwise wrap singletons + is_repeated = (spec["kind"] == "repeated") + vals = val if isinstance(val, list) else ([val] if is_repeated else [val]) + for v in vals: + t = spec.get("type") if spec else None + if spec["kind"] == "scalar" and t in {"varint", "uint64"}: + _var_write(field, int(v)) + elif spec["kind"] == "scalar" and t == "uint32": + _var_write(field, int(v) & 0xFFFFFFFF) + elif spec["kind"] == "scalar" and t == "int64": + _var_write(field, int(v) & 0xFFFFFFFFFFFFFFFF) + elif spec["kind"] == "scalar" and t == "int32": + u = _i32_to_uvarint(v) + _var_write(field, u) + elif spec["kind"] == "scalar" and t in {"sint", "sint64"}: + _var_write(field, zigzag_encode(int(v))) + elif spec["kind"] == "scalar" and t == "sint32": + zz = _zz32(v) + _var_write(field, zz) + elif spec["kind"] == "scalar" and t == "fixed32": + out += _write_key(field, 5) + out += _write_fixed32(int(v)) + elif spec["kind"] == "scalar" and t == "sfixed32": + out += _write_key(field, 5) + out += _write_sfixed32(v) + elif spec["kind"] == "scalar" and t == "fixed64": + out += _write_key(field, 1) + out += _write_fixed64(int(v)) + elif spec["kind"] == "scalar" and t == "sfixed64": + out += _write_key(field, 1) + out += _write_sfixed64(v) + elif spec["kind"] == "scalar" and t == "float": + out += _write_key(field, 5) + out += struct.pack(" _max_depth: + raise ValueError("message nesting too deep") + ns = _normalize_schema(schema) + if isinstance(buf, bytearray): + b = buf + else: + b = memoryview(buf).toreadonly() + out = {} + oneof_seen = {} + i = 0 + n = len(b) + while i < n: + key, i = _read_varint(b, i) + field = key >> 3 + wt = key & 7 + + spec = ns["fields"].get(field) + + if wt == 0: + v, i = _read_varint(b, i) + if spec: + t = spec.get("type") + if t in {"sint", "sint64"}: + v = zigzag_decode(v) + elif t == "sint32": + v = _i32_from_uvarint(zigzag_decode(v)) + elif t == "int64": + v = _i64_from_uvarint(v) + elif t == "int32": + v = _i32_from_uvarint(v) + elif t == "uint32": + v &= 0xFFFFFFFF + elif t in {"uint64", "varint"}: + pass + elif t == "bool": + v = bool(v) + elif wt == 1: + v, i = _read_fixed64(b, i) + if spec: + tt = spec.get("type") + if tt == "double": + v = struct.unpack("= 0x8000000000000000 else v + elif wt == 2: + length, i = _read_varint(b, i) + if i + length > n: + raise ValueError("truncated length-delimited field") + chunk = bytes(b[i:i+length]) + i += length + t = spec.get("type") if spec else None + if spec and spec.get("kind") == "message": + v = decode(chunk, spec["schema"], _depth=_depth+1, _max_depth=_max_depth) + elif spec and spec.get("kind") == "repeated" and "type" in spec and t in {"varint", "uint64", "uint32", "int64", "int32", "sint", "sint64", "sint32", "fixed32", "fixed64", "sfixed32", "sfixed64", "float", "double", "bool"}: + # Proto3 default: repeated numeric are packed; flatten into list + v = _decode_packed(chunk, t) + elif spec and spec.get("kind") == "repeated" and "schema" in spec: + v = decode(chunk, spec["schema"], _depth=_depth+1, _max_depth=_max_depth) + elif spec and spec.get("kind") == "packed": + v = _decode_packed(chunk, t) + elif t == "string": + v = chunk.decode("utf-8") + elif t == "bytes" or spec is None: + v = chunk + else: + v = chunk + elif wt == 5: + v, i = _read_fixed32(b, i) + if spec: + tt = spec.get("type") + if tt == "float": + v = struct.unpack("= 0x80000000 else v + else: + raise ValueError("unsupported wire type {}".format(wt)) + + out_key = (spec.get("name") if spec else field) + if isInt(out_key): + out_key = f"unk_proto_{out_key}" + + if spec: + is_repeated = spec.get("kind") in {"repeated", "packed"} + if is_repeated: + # Repeated scalar (numeric packed yields list 'v'), or repeated message (dict 'v') + seq = v if isinstance(v, list) else [v] + out.setdefault(out_key, []).extend(seq) + else: + # Singleton (scalar or message) — store as value, not list + grp = spec.get("oneof") + if grp: + prev = oneof_seen.get(grp) + if prev and prev in out: + try: + del out[prev] + except Exception: + out.pop(prev, None) + oneof_seen[grp] = out_key + out[out_key] = v + else: + # Unknown field: if first occurrence, store scalar; if multiple, upgrade to list + if out_key in out: + prev = out[out_key] + if isinstance(prev, list): + prev.append(v) + else: + out[out_key] = [prev, v] + else: + out[out_key] = v + + return out + + +# Packed repeated element decode +def _decode_packed(chunk, elem_type): + b = memoryview(chunk).toreadonly() + i = 0 + out = [] + while i < len(b): + if elem_type in {"varint", "uint64"}: + v, i = _read_varint(b, i) + out.append(v) + elif elem_type == "uint32": + v, i = _read_varint(b, i) + out.append(v & 0xFFFFFFFF) + elif elem_type == "int64": + v, i = _read_varint(b, i) + out.append(_i64_from_uvarint(v)) + elif elem_type == "int32": + v, i = _read_varint(b, i) + out.append(_i32_from_uvarint(v)) + elif elem_type == "sint": + v, i = _read_varint(b, i) + out.append(zigzag_decode(v)) + elif elem_type == "sint64": + v, i = _read_varint(b, i) + out.append(zigzag_decode(v)) + elif elem_type == "sint32": + v, i = _read_varint(b, i) + out.append(_i32_from_uvarint(zigzag_decode(v))) + elif elem_type == "fixed32": + v, i = _read_fixed32(b, i) + out.append(v) + elif elem_type == "sfixed32": + v, i = _read_fixed32(b, i) + v = v - 0x100000000 if v >= 0x80000000 else v + out.append(v) + elif elem_type == "fixed64": + v, i = _read_fixed64(b, i) + out.append(v) + elif elem_type == "sfixed64": + v, i = _read_fixed64(b, i) + v = v - 0x10000000000000000 if v >= 0x8000000000000000 else v + out.append(v) + elif elem_type == "float": + v, i = _read_fixed32(b, i) + out.append(struct.unpack("> 8) & 0xFF, n & 0xFF]) + return header + payload + + +class SerialTransport: + def __init__(self, port, baudrate=115200): + self.port = port + self.baudrate = baudrate + self.reader = None + self.writer = None + self._in_q = asyncio.Queue() + self._buf = bytearray() + self._reader_task = None + self._error = None + + async def start(self): + # Reset state to avoid stale sentinels/data from prior sessions + self._in_q = asyncio.Queue() + self._buf = bytearray() + self._error = None + self.reader, self.writer = await serial_asyncio.open_serial_connection( + url=self.port, baudrate=self.baudrate + ) + self._reader_task = asyncio.create_task(self._reader_loop(), name="serial-reader") + logger.debug("SerialTransport.start: opened %s @ %s", self.port, self.baudrate) + + async def _reader_loop(self): + assert self.reader is not None + r = self.reader + try: + while True: + data = await r.read(READ_CHUNK) + if not data: + await asyncio.sleep(0) + continue + self._buf.extend(data) + while True: + start = -1 + for i in range(len(self._buf) - 1): + if self._buf[i] == MAGIC0 and self._buf[i + 1] == MAGIC1: + start = i + break + if start == -1: + if len(self._buf) > 1: + self._buf[:] = self._buf[-1:] + break + if start > 0: + del self._buf[:start] + if len(self._buf) < 4: + break + length = (self._buf[2] << 8) | self._buf[3] + total = 4 + length + if len(self._buf) < total: + break + payload = bytes(self._buf[4:total]) + await self._in_q.put(payload) + del self._buf[:total] + except asyncio.CancelledError: + pass + except Exception as e: + # Record error and notify receivers by placing a sentinel + self._error = e + logger.error("serial read error: %s", e) + try: + self._in_q.put_nowait(None) + except Exception: + pass + + async def send(self, payload): + if self.writer is None: + return + if not isinstance(payload, (bytes, bytearray)): + raise TypeError("payload must be bytes") + try: + self.writer.write(encode_frame(payload)) + await self.writer.drain() + except Exception as e: + # Treat as an error condition; surface to receiver path + self._error = e + logger.error("serial write error: %s", e) + try: + self._in_q.put_nowait(None) + except Exception: + pass + + async def recv(self): + item = await self._in_q.get() + if item is None: + raise ConnectionError(self._error or "serial transport error") + return item + + async def close(self): + if self._reader_task is not None: + self._reader_task.cancel() + try: + await self._reader_task + except Exception: + pass + self._reader_task = None + if self.writer is not None: + try: + self.writer.close() + except Exception: + pass + self.writer = None + self.reader = None \ No newline at end of file