Browse Source

Connection robustness improvements (#813)

* Clear heartbeat and queue when disconnected

* Give clearer error in case configure fails due to a lost connection. Used to throw 'Packet does not exist'

* If the queue processing error is due to a lost connection, throw it instead of looping endlessly

* In case we send a disconnection event we don't need to also throw

* Catch heartbeat errors

* Also handle invalid state errors

* Handle socket timeouts

* Log heartbeat failures

* Make linter happy

* Transform stream being a singleton prevented reconnection attempts

* Adapt tests to not using singleton

* Aborting already ends the connection
pull/834/head
Henri Bergius 9 months ago
committed by GitHub
parent
commit
bcdda8b751
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 21
      packages/core/src/meshDevice.ts
  2. 8
      packages/core/src/utils/queue.ts
  3. 28
      packages/core/src/utils/transform/toDevice.ts
  4. 5
      packages/transport-deno/src/transport.ts
  5. 2
      packages/transport-http/src/transport.ts
  6. 14
      packages/transport-node-serial/src/transport.test.ts
  7. 5
      packages/transport-node-serial/src/transport.ts
  8. 14
      packages/transport-node/src/transport.test.ts
  9. 59
      packages/transport-node/src/transport.ts
  10. 14
      packages/transport-web-serial/src/transport.test.ts
  11. 8
      packages/transport-web-serial/src/transport.ts

21
packages/core/src/meshDevice.ts

@ -64,6 +64,11 @@ export class MeshDevice {
this.isConfigured = true;
} else if (status === DeviceStatusEnum.DeviceConfiguring) {
this.isConfigured = false;
} else if (status === DeviceStatusEnum.DeviceDisconnected) {
if (this._heartbeatIntervalId !== undefined) {
clearInterval(this._heartbeatIntervalId);
}
this.complete();
}
});
@ -770,7 +775,14 @@ export class MeshDevice {
},
});
return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio));
return this.sendRaw(toBinary(Protobuf.Mesh.ToRadioSchema, toRadio)).catch(
(e) => {
if (this.deviceStatus === DeviceStatusEnum.DeviceDisconnected) {
throw new Error("Device connection lost");
}
throw e;
},
);
}
/**
@ -797,7 +809,12 @@ export class MeshDevice {
clearInterval(this._heartbeatIntervalId);
}
this._heartbeatIntervalId = setInterval(() => {
this.heartbeat();
this.heartbeat().catch((err) => {
this.log.error(
Emitter[Emitter.Ping],
`⚠️ Unable to send heartbeat: ${err.message}`,
);
});
}, interval);
}

8
packages/core/src/utils/queue.ts

@ -117,6 +117,14 @@ export class Queue {
await writer.write(item.data);
item.sent = true;
} catch (error) {
if (
error?.code === "ECONNRESET" ||
error?.code === "ERR_INVALID_STATE"
) {
writer.releaseLock();
this.lock = false;
throw error;
}
console.error(`Error sending packet ${item.id}`, error);
}
}

28
packages/core/src/utils/transform/toDevice.ts

@ -1,16 +1,18 @@
/**
* Pads packets with appropriate framing information before writing to the output stream.
*/
export const toDeviceStream: TransformStream<Uint8Array, Uint8Array> =
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk: Uint8Array, controller): void {
const bufLen = chunk.length;
const header = new Uint8Array([
0x94,
0xc3,
(bufLen >> 8) & 0xff,
bufLen & 0xff,
]);
controller.enqueue(new Uint8Array([...header, ...chunk]));
},
});
export const toDeviceStream: () => TransformStream<Uint8Array, Uint8Array> =
() => {
return new TransformStream<Uint8Array, Uint8Array>({
transform(chunk: Uint8Array, controller): void {
const bufLen = chunk.length;
const header = new Uint8Array([
0x94,
0xc3,
(bufLen >> 8) & 0xff,
bufLen & 0xff,
]);
controller.enqueue(new Uint8Array([...header, ...chunk]));
},
});
};

5
packages/transport-deno/src/transport.ts

@ -16,9 +16,10 @@ export class TransportDeno implements Types.Transport {
constructor(connection: Deno.Conn) {
this.connection = connection;
Utils.toDeviceStream.readable.pipeTo(this.connection.writable);
const toDeviceStream = Utils.toDeviceStream();
toDeviceStream.readable.pipeTo(this.connection.writable);
this._toDevice = Utils.toDeviceStream.writable;
this._toDevice = toDeviceStream.writable;
this._fromDevice = this.connection.readable.pipeThrough(
Utils.fromDeviceStream(),
);

2
packages/transport-http/src/transport.ts

@ -76,6 +76,7 @@ export class TransportHTTP implements Types.Transport {
Types.DeviceStatusEnum.DeviceDisconnected,
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
);
return;
}
throw error;
}
@ -165,6 +166,7 @@ export class TransportHTTP implements Types.Transport {
Types.DeviceStatusEnum.DeviceDisconnected,
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
);
return;
}
throw error;
}

14
packages/transport-node-serial/src/transport.test.ts

@ -53,11 +53,12 @@ class FakeSerialPort extends Duplex {
}
function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const toDevice = () =>
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
@ -67,8 +68,9 @@ function stubCoreTransforms() {
});
// Utils.toDeviceStream is a getter
const transform = Utils.toDeviceStream;
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
toDevice as unknown as typeof Utils.toDeviceStream,
toDevice as unknown as typeof transform,
);
vi.spyOn(Utils, "fromDeviceStream").mockImplementation(

5
packages/transport-node-serial/src/transport.ts

@ -112,9 +112,10 @@ export class TransportNodeSerial implements Types.Transport {
});
// Stream for data going FROM the application TO the Meshtastic device.
this._toDevice = Utils.toDeviceStream.writable;
const toDeviceTransform = Utils.toDeviceStream();
this._toDevice = toDeviceTransform.writable;
this.pipePromise = Utils.toDeviceStream.readable
this.pipePromise = toDeviceTransform.readable
.pipeTo(Writable.toWeb(port) as WritableStream<Uint8Array>, {
signal: controller.signal,
})

14
packages/transport-node/src/transport.test.ts

@ -54,11 +54,12 @@ class FakeSocket extends Duplex {
}
function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const toDevice = () =>
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
@ -67,8 +68,9 @@ function stubCoreTransforms() {
},
});
const transform = Utils.toDeviceStream;
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
toDevice as unknown as typeof Utils.toDeviceStream,
toDevice as unknown as typeof transform,
);
vi.spyOn(Utils, "fromDeviceStream").mockImplementation(

59
packages/transport-node/src/transport.ts

@ -20,28 +20,36 @@ export class TransportNode implements Types.Transport {
Types.DeviceStatusEnum.DeviceDisconnected;
private closingByUser = false;
private errored = false;
/**
* Creates and connects a new TransportNode instance.
* @param hostname - The IP address or hostname of the Meshtastic device.
* @param port - The port number for the TCP connection (defaults to 4403).
* @param timeout - TCP socket timeout in milliseconds (defaults to 60000).
* @returns A promise that resolves with a connected TransportNode instance.
*/
public static create(hostname: string, port = 4403): Promise<TransportNode> {
public static create(
hostname: string,
port = 4403,
timeout = 60000,
): Promise<TransportNode> {
return new Promise((resolve, reject) => {
const socket = new Socket();
const onError = (err: Error) => {
socket.destroy();
socket.removeAllListeners();
reject(err);
};
socket.once("error", onError);
socket.connect(port, hostname, () => {
socket.once("ready", () => {
socket.removeListener("error", onError);
resolve(new TransportNode(socket));
});
socket.setTimeout(timeout);
socket.connect(port, hostname);
});
}
@ -52,8 +60,10 @@ export class TransportNode implements Types.Transport {
constructor(connection: Socket) {
this.socket = connection;
this.socket.on("error", (err) => {
console.error("Socket connection error:", err);
this.socket.on("error", () => {
this.errored = true;
this.socket?.removeAllListeners();
this.socket?.destroy();
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
@ -62,6 +72,24 @@ export class TransportNode implements Types.Transport {
}
});
this.socket.on("end", () => {
if (this.closingByUser) {
return; // suppress close-derived disconnect in user flow
}
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "socket-end");
this.socket?.removeAllListeners();
this.socket?.destroy();
});
this.socket.on("timeout", () => {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"socket-timeout",
);
this.socket?.removeAllListeners();
this.socket?.destroy();
});
this.socket.on("close", () => {
if (this.closingByUser) {
return; // suppress close-derived disconnect in user flow
@ -98,7 +126,7 @@ export class TransportNode implements Types.Transport {
}
ctrl.close();
} catch (error) {
if (this.closingByUser) {
if (this.closingByUser || this.errored) {
ctrl.close();
} else {
this.emitStatus(
@ -120,7 +148,7 @@ export class TransportNode implements Types.Transport {
});
// Stream for data going FROM the application TO the Meshtastic device.
const toDeviceTransform = Utils.toDeviceStream;
const toDeviceTransform = Utils.toDeviceStream();
this._toDevice = toDeviceTransform.writable;
this.pipePromise = toDeviceTransform.readable
@ -128,10 +156,9 @@ export class TransportNode implements Types.Transport {
signal: abortController.signal,
})
.catch((err) => {
if (abortController.signal.aborted) {
if (abortController.signal.aborted || this.socket?.destroyed) {
return;
}
console.error("Error piping data to socket:", err);
const error = err instanceof Error ? err : new Error(String(err));
this.socket?.destroy(error);
});
@ -160,11 +187,11 @@ export class TransportNode implements Types.Transport {
if (this.pipePromise) {
await this.pipePromise;
}
this.socket?.destroy();
} finally {
this.socket = undefined;
this.closingByUser = false;
this.errored = false;
}
}
@ -173,9 +200,13 @@ export class TransportNode implements Types.Transport {
return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
try {
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
} catch (e) {
console.error("Enqueue fail", e);
}
}
}

14
packages/transport-web-serial/src/transport.test.ts

@ -4,11 +4,12 @@ import { runTransportContract } from "../../../tests/utils/transportContract.ts"
import { TransportWebSerial } from "./transport.ts";
function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const toDevice = () =>
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
// maps raw bytes -> DeviceOutput.packet
const fromDeviceFactory = () =>
@ -18,9 +19,10 @@ function stubCoreTransforms() {
},
});
const transform = Utils.toDeviceStream;
const restoreTo = vi
.spyOn(Utils, "toDeviceStream", "get")
.mockReturnValue(toDevice as unknown as typeof Utils.toDeviceStream);
.mockReturnValue(toDevice as unknown as typeof transform);
const restoreFrom = vi
.spyOn(Utils, "fromDeviceStream")

8
packages/transport-web-serial/src/transport.ts

@ -58,7 +58,8 @@ export class TransportWebSerial implements Types.Transport {
const abortController = this.abortController;
// Set up the pipe with abort signal for clean cancellation
this.pipePromise = Utils.toDeviceStream.readable
const toDeviceTransform = Utils.toDeviceStream();
this.pipePromise = toDeviceTransform.readable
.pipeTo(connection.writable, { signal: this.abortController.signal })
.catch((err) => {
// Ignore expected rejection when we cancel it via the AbortController.
@ -73,7 +74,7 @@ export class TransportWebSerial implements Types.Transport {
);
});
this._toDevice = Utils.toDeviceStream.writable;
this._toDevice = toDeviceTransform.writable;
// Wrap + capture controller to inject status packets
this._fromDevice = new ReadableStream<Types.DeviceOutput>({
@ -199,7 +200,8 @@ export class TransportWebSerial implements Types.Transport {
const abortController = this.abortController;
// Re-establish the pipe connection
this.pipePromise = Utils.toDeviceStream.readable
const toDeviceTransform = Utils.toDeviceStream();
this.pipePromise = toDeviceTransform.readable
.pipeTo(this.connection.writable, {
signal: this.abortController.signal,
})

Loading…
Cancel
Save