diff --git a/packages/core/src/meshDevice.ts b/packages/core/src/meshDevice.ts index 1ba7a13c..0be2b224 100755 --- a/packages/core/src/meshDevice.ts +++ b/packages/core/src/meshDevice.ts @@ -64,6 +64,11 @@ export class MeshDevice { this.isConfigured = true; } else if (status === DeviceStatusEnum.DeviceConfiguring) { this.isConfigured = false; + } else if (status === DeviceStatusEnum.DeviceDisconnected) { + if (this._heartbeatIntervalId !== undefined) { + clearInterval(this._heartbeatIntervalId); + } + this.complete(); } }); @@ -770,7 +775,14 @@ export class MeshDevice { }, }); - return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio)); + return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio)).catch( + (e) => { + if (this.deviceStatus === DeviceStatusEnum.DeviceDisconnected) { + throw new Error("Device connection lost"); + } + throw e; + }, + ); } /** @@ -797,7 +809,12 @@ export class MeshDevice { clearInterval(this._heartbeatIntervalId); } this._heartbeatIntervalId = setInterval(() => { - this.heartbeat(); + this.heartbeat().catch((err) => { + this.log.error( + Emitter[Emitter.Ping], + `⚠️ Unable to send heartbeat: ${err.message}`, + ); + }); }, interval); } diff --git a/packages/core/src/utils/queue.ts b/packages/core/src/utils/queue.ts index 55f23282..65bff569 100644 --- a/packages/core/src/utils/queue.ts +++ b/packages/core/src/utils/queue.ts @@ -117,6 +117,14 @@ export class Queue { await writer.write(item.data); item.sent = true; } catch (error) { + if ( + error?.code === "ECONNRESET" || + error?.code === "ERR_INVALID_STATE" + ) { + writer.releaseLock(); + this.lock = false; + throw error; + } console.error(`Error sending packet ${item.id}`, error); } } diff --git a/packages/core/src/utils/transform/toDevice.ts b/packages/core/src/utils/transform/toDevice.ts index 05350413..7f6a3932 100644 --- a/packages/core/src/utils/transform/toDevice.ts +++ b/packages/core/src/utils/transform/toDevice.ts @@ -1,16 +1,18 @@ /** * Pads packets with appropriate framing information before writing to the output stream. */ -export const toDeviceStream: TransformStream = - new TransformStream({ - transform(chunk: Uint8Array, controller): void { - const bufLen = chunk.length; - const header = new Uint8Array([ - 0x94, - 0xc3, - (bufLen >> 8) & 0xff, - bufLen & 0xff, - ]); - controller.enqueue(new Uint8Array([...header, ...chunk])); - }, - }); +export const toDeviceStream: () => TransformStream = + () => { + return new TransformStream({ + transform(chunk: Uint8Array, controller): void { + const bufLen = chunk.length; + const header = new Uint8Array([ + 0x94, + 0xc3, + (bufLen >> 8) & 0xff, + bufLen & 0xff, + ]); + controller.enqueue(new Uint8Array([...header, ...chunk])); + }, + }); + }; diff --git a/packages/transport-deno/src/transport.ts b/packages/transport-deno/src/transport.ts index d5e41aa6..eec90695 100644 --- a/packages/transport-deno/src/transport.ts +++ b/packages/transport-deno/src/transport.ts @@ -16,9 +16,10 @@ export class TransportDeno implements Types.Transport { constructor(connection: Deno.Conn) { this.connection = connection; - Utils.toDeviceStream.readable.pipeTo(this.connection.writable); + const toDeviceStream = Utils.toDeviceStream(); + toDeviceStream.readable.pipeTo(this.connection.writable); - this._toDevice = Utils.toDeviceStream.writable; + this._toDevice = toDeviceStream.writable; this._fromDevice = this.connection.readable.pipeThrough( Utils.fromDeviceStream(), ); diff --git a/packages/transport-http/src/transport.ts b/packages/transport-http/src/transport.ts index dae3baaa..3bdf57db 100644 --- a/packages/transport-http/src/transport.ts +++ b/packages/transport-http/src/transport.ts @@ -76,6 +76,7 @@ export class TransportHTTP implements Types.Transport { Types.DeviceStatusEnum.DeviceDisconnected, this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error", ); + return; } throw error; } @@ -165,6 +166,7 @@ export class TransportHTTP implements Types.Transport { Types.DeviceStatusEnum.DeviceDisconnected, this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error", ); + return; } throw error; } diff --git a/packages/transport-node-serial/src/transport.test.ts b/packages/transport-node-serial/src/transport.test.ts index 3345bbd0..d6290853 100644 --- a/packages/transport-node-serial/src/transport.test.ts +++ b/packages/transport-node-serial/src/transport.test.ts @@ -53,11 +53,12 @@ class FakeSerialPort extends Duplex { } function stubCoreTransforms() { - const toDevice = new TransformStream({ - transform(chunk, controller) { - controller.enqueue(chunk); - }, - }); + const toDevice = () => + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }); const fromDeviceFactory = () => new TransformStream({ @@ -67,8 +68,9 @@ function stubCoreTransforms() { }); // Utils.toDeviceStream is a getter + const transform = Utils.toDeviceStream; vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue( - toDevice as unknown as typeof Utils.toDeviceStream, + toDevice as unknown as typeof transform, ); vi.spyOn(Utils, "fromDeviceStream").mockImplementation( diff --git a/packages/transport-node-serial/src/transport.ts b/packages/transport-node-serial/src/transport.ts index fc5df1a7..8bc0dd52 100644 --- a/packages/transport-node-serial/src/transport.ts +++ b/packages/transport-node-serial/src/transport.ts @@ -112,9 +112,10 @@ export class TransportNodeSerial implements Types.Transport { }); // Stream for data going FROM the application TO the Meshtastic device. - this._toDevice = Utils.toDeviceStream.writable; + const toDeviceTransform = Utils.toDeviceStream(); + this._toDevice = toDeviceTransform.writable; - this.pipePromise = Utils.toDeviceStream.readable + this.pipePromise = toDeviceTransform.readable .pipeTo(Writable.toWeb(port) as WritableStream, { signal: controller.signal, }) diff --git a/packages/transport-node/src/transport.test.ts b/packages/transport-node/src/transport.test.ts index ae9d5feb..03ab4a49 100644 --- a/packages/transport-node/src/transport.test.ts +++ b/packages/transport-node/src/transport.test.ts @@ -54,11 +54,12 @@ class FakeSocket extends Duplex { } function stubCoreTransforms() { - const toDevice = new TransformStream({ - transform(chunk, controller) { - controller.enqueue(chunk); - }, - }); + const toDevice = () => + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }); const fromDeviceFactory = () => new TransformStream({ @@ -67,8 +68,9 @@ function stubCoreTransforms() { }, }); + const transform = Utils.toDeviceStream; vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue( - toDevice as unknown as typeof Utils.toDeviceStream, + toDevice as unknown as typeof transform, ); vi.spyOn(Utils, "fromDeviceStream").mockImplementation( diff --git a/packages/transport-node/src/transport.ts b/packages/transport-node/src/transport.ts index 03713d0b..9d41167c 100644 --- a/packages/transport-node/src/transport.ts +++ b/packages/transport-node/src/transport.ts @@ -20,28 +20,36 @@ export class TransportNode implements Types.Transport { Types.DeviceStatusEnum.DeviceDisconnected; private closingByUser = false; + private errored = false; /** * Creates and connects a new TransportNode instance. * @param hostname - The IP address or hostname of the Meshtastic device. * @param port - The port number for the TCP connection (defaults to 4403). + * @param timeout - TCP socket timeout in milliseconds (defaults to 60000). * @returns A promise that resolves with a connected TransportNode instance. */ - public static create(hostname: string, port = 4403): Promise { + public static create( + hostname: string, + port = 4403, + timeout = 60000, + ): Promise { return new Promise((resolve, reject) => { const socket = new Socket(); const onError = (err: Error) => { socket.destroy(); + socket.removeAllListeners(); reject(err); }; socket.once("error", onError); - - socket.connect(port, hostname, () => { + socket.once("ready", () => { socket.removeListener("error", onError); resolve(new TransportNode(socket)); }); + socket.setTimeout(timeout); + socket.connect(port, hostname); }); } @@ -52,8 +60,10 @@ export class TransportNode implements Types.Transport { constructor(connection: Socket) { this.socket = connection; - this.socket.on("error", (err) => { - console.error("Socket connection error:", err); + this.socket.on("error", () => { + this.errored = true; + this.socket?.removeAllListeners(); + this.socket?.destroy(); if (!this.closingByUser) { this.emitStatus( Types.DeviceStatusEnum.DeviceDisconnected, @@ -62,6 +72,24 @@ export class TransportNode implements Types.Transport { } }); + this.socket.on("end", () => { + if (this.closingByUser) { + return; // suppress close-derived disconnect in user flow + } + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "socket-end"); + this.socket?.removeAllListeners(); + this.socket?.destroy(); + }); + + this.socket.on("timeout", () => { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "socket-timeout", + ); + this.socket?.removeAllListeners(); + this.socket?.destroy(); + }); + this.socket.on("close", () => { if (this.closingByUser) { return; // suppress close-derived disconnect in user flow @@ -98,7 +126,7 @@ export class TransportNode implements Types.Transport { } ctrl.close(); } catch (error) { - if (this.closingByUser) { + if (this.closingByUser || this.errored) { ctrl.close(); } else { this.emitStatus( @@ -120,7 +148,7 @@ export class TransportNode implements Types.Transport { }); // Stream for data going FROM the application TO the Meshtastic device. - const toDeviceTransform = Utils.toDeviceStream; + const toDeviceTransform = Utils.toDeviceStream(); this._toDevice = toDeviceTransform.writable; this.pipePromise = toDeviceTransform.readable @@ -128,10 +156,9 @@ export class TransportNode implements Types.Transport { signal: abortController.signal, }) .catch((err) => { - if (abortController.signal.aborted) { + if (abortController.signal.aborted || this.socket?.destroyed) { return; } - console.error("Error piping data to socket:", err); const error = err instanceof Error ? err : new Error(String(err)); this.socket?.destroy(error); }); @@ -160,11 +187,11 @@ export class TransportNode implements Types.Transport { if (this.pipePromise) { await this.pipePromise; } - this.socket?.destroy(); } finally { this.socket = undefined; this.closingByUser = false; + this.errored = false; } } @@ -173,9 +200,13 @@ export class TransportNode implements Types.Transport { return; } this.lastStatus = next; - this.fromDeviceController?.enqueue({ - type: "status", - data: { status: next, reason }, - }); + try { + this.fromDeviceController?.enqueue({ + type: "status", + data: { status: next, reason }, + }); + } catch (e) { + console.error("Enqueue fail", e); + } } } diff --git a/packages/transport-web-serial/src/transport.test.ts b/packages/transport-web-serial/src/transport.test.ts index dbf4ef71..403a3f38 100644 --- a/packages/transport-web-serial/src/transport.test.ts +++ b/packages/transport-web-serial/src/transport.test.ts @@ -4,11 +4,12 @@ import { runTransportContract } from "../../../tests/utils/transportContract.ts" import { TransportWebSerial } from "./transport.ts"; function stubCoreTransforms() { - const toDevice = new TransformStream({ - transform(chunk, controller) { - controller.enqueue(chunk); - }, - }); + const toDevice = () => + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }); // maps raw bytes -> DeviceOutput.packet const fromDeviceFactory = () => @@ -18,9 +19,10 @@ function stubCoreTransforms() { }, }); + const transform = Utils.toDeviceStream; const restoreTo = vi .spyOn(Utils, "toDeviceStream", "get") - .mockReturnValue(toDevice as unknown as typeof Utils.toDeviceStream); + .mockReturnValue(toDevice as unknown as typeof transform); const restoreFrom = vi .spyOn(Utils, "fromDeviceStream") diff --git a/packages/transport-web-serial/src/transport.ts b/packages/transport-web-serial/src/transport.ts index 949e4d5c..ff81aee1 100644 --- a/packages/transport-web-serial/src/transport.ts +++ b/packages/transport-web-serial/src/transport.ts @@ -58,7 +58,8 @@ export class TransportWebSerial implements Types.Transport { const abortController = this.abortController; // Set up the pipe with abort signal for clean cancellation - this.pipePromise = Utils.toDeviceStream.readable + const toDeviceTransform = Utils.toDeviceStream(); + this.pipePromise = toDeviceTransform.readable .pipeTo(connection.writable, { signal: this.abortController.signal }) .catch((err) => { // Ignore expected rejection when we cancel it via the AbortController. @@ -73,7 +74,7 @@ export class TransportWebSerial implements Types.Transport { ); }); - this._toDevice = Utils.toDeviceStream.writable; + this._toDevice = toDeviceTransform.writable; // Wrap + capture controller to inject status packets this._fromDevice = new ReadableStream({ @@ -199,7 +200,8 @@ export class TransportWebSerial implements Types.Transport { const abortController = this.abortController; // Re-establish the pipe connection - this.pipePromise = Utils.toDeviceStream.readable + const toDeviceTransform = Utils.toDeviceStream(); + this.pipePromise = toDeviceTransform.readable .pipeTo(this.connection.writable, { signal: this.abortController.signal, })