diff --git a/.vscode/settings.json b/.vscode/settings.json index aa09339f..3b4797be 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,4 +6,5 @@ "search.exclude": { "**/i18n/locales/*-*/**": true, }, + "vitest.workspaceConfig": "vitest.config.ts", } diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 5a487415..751f4a6c 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -10,7 +10,12 @@ interface DebugLog { data: string; } -export type DeviceOutput = Packet | DebugLog; +interface StatusEvent { + type: "status"; + data: { status: DeviceStatusEnum; reason?: string }; +} + +export type DeviceOutput = Packet | DebugLog | StatusEvent; export interface Transport { toDevice: WritableStream; @@ -101,6 +106,7 @@ export enum Emitter { RemoveNodeByNum = 32, SetCannedMessages = 33, Disconnect = 34, + ConnectionStatus = 35, } export interface LogEvent { diff --git a/packages/core/src/utils/transform/decodePacket.ts b/packages/core/src/utils/transform/decodePacket.ts index 94f0e54d..d44e52cf 100644 --- a/packages/core/src/utils/transform/decodePacket.ts +++ b/packages/core/src/utils/transform/decodePacket.ts @@ -7,6 +7,19 @@ export const decodePacket = (device: MeshDevice) => new WritableStream({ write(chunk) { switch (chunk.type) { + case "status": { + const { status, reason } = chunk.data as { + status: Types.DeviceStatusEnum; + reason?: string; + }; + + device.updateDeviceStatus(status); + device.log.info( + Types.Emitter[Types.Emitter.ConnectionStatus], + `🔗 ${Types.DeviceStatusEnum[status]} ${reason ? `(${reason})` : ""}`, + ); + break; + } case "debug": { break; } diff --git a/packages/transport-http/src/transport.test.ts b/packages/transport-http/src/transport.test.ts new file mode 100644 index 00000000..f1712964 --- /dev/null +++ b/packages/transport-http/src/transport.test.ts @@ -0,0 +1,229 @@ +import { describe, vi, expect, it, beforeEach, afterEach, type MockInstance } from "vitest"; +import { runTransportContract } from "../../../tests/utils/transportContract"; +import { TransportHTTP } from "./transport"; + +let abortTimeoutSpy: MockInstance | undefined; +beforeEach(() => { + abortTimeoutSpy = vi.spyOn( + globalThis.AbortSignal as unknown as { timeout(ms: number): AbortSignal }, + "timeout", + ).mockImplementation((ms: number) => { + const ctrl = new AbortController(); + const abort = () => + ctrl.abort(new DOMException("Timeout reached", "TimeoutError")); + // Uses setTimeout so vi.useFakeTimers() can fast-forward it + setTimeout(abort, ms); + return ctrl.signal; + }); +}); + +afterEach(() => { + abortTimeoutSpy?.mockRestore(); +}); + +function stubFetch() { + const inbox: Uint8Array[] = []; + let lastWritten: ArrayBuffer | undefined; + + let forceNextReadToHang = false; + let forceNextReadToReturn500 = false; + + function makeAbortAwareHang(signal?: AbortSignal): Promise { + return new Promise((_, reject) => { + const abort = () => reject(new DOMException("Aborted", "AbortError")); + if (signal?.aborted) { + abort(); + return; + } + if (signal) { + signal.addEventListener("abort", abort, { once: true }); + } + }); + } + + const mockFetch = vi.fn(async (url: string, init?: RequestInit) => { + const method = (init?.method ?? "GET").toUpperCase(); + + if (url.includes("/api/v1/toradio") && method === "OPTIONS") { + return { ok: true, status: 204 } as Response; + } + + if (url.includes("/api/v1/toradio") && method === "PUT") { + lastWritten = init?.body as ArrayBuffer; + return { ok: true, status: 200 } as Response; + } + + if (url.includes("/api/v1/fromradio") && method === "GET") { + if (forceNextReadToHang) { + forceNextReadToHang = false; + return makeAbortAwareHang(init?.signal ?? undefined); + } + + if (forceNextReadToReturn500) { + forceNextReadToReturn500 = false; + return { + ok: false, + status: 500, + arrayBuffer: async () => new ArrayBuffer(0), + } as Response; + } + + const next = inbox.shift() ?? new Uint8Array(); + return { + ok: true, + status: 200, + arrayBuffer: async () => next.buffer, + } as Response; + } + + return { ok: true, status: 200 } as Response; + }); + + vi.stubGlobal("fetch", mockFetch); + + return { + pushIncoming: (u8: Uint8Array) => inbox.push(u8), + assertLastWritten: (u8: Uint8Array) => { + const got = new Uint8Array(lastWritten || new ArrayBuffer(0)); + expect(got).toEqual(u8); + }, + forceReadErrorOnce: () => { + forceNextReadToReturn500 = true; + }, + forceReadTimeoutOnce: () => { + forceNextReadToHang = true; + }, + getMock: () => mockFetch, + cleanup: () => vi.unstubAllGlobals(), + }; +} + +async function tickNextTimer() { + try { + await vi.advanceTimersToNextTimerAsync(); + } catch { + await new Promise((r) => setTimeout(r, 5)); + } +} + +describe("TransportHTTP (contract)", () => { + runTransportContract({ + name: "TransportHTTP", + setup: () => { + vi.useFakeTimers(); + }, + teardown: () => { + vi.useRealTimers(); + vi.restoreAllMocks(); + vi.unstubAllGlobals(); + }, + create: async () => { + (globalThis as unknown as { __http: ReturnType }).__http = stubFetch(); + const transport = await TransportHTTP.create("127.0.0.1:80", false); + await tickNextTimer(); + return transport; + }, + pushIncoming: async (bytes) => { + (globalThis as unknown as { __http: ReturnType }).__http.pushIncoming(bytes); + await tickNextTimer(); + }, + assertLastWritten: (bytes) => { + (globalThis as unknown as { __http: ReturnType }).__http.assertLastWritten(bytes); + }, + triggerDisconnect: async () => { + (globalThis as unknown as { __http: ReturnType }).__http.forceReadErrorOnce(); + await tickNextTimer(); + }, + }); +}); + +describe("TransportHTTP (extras)", () => { + let httpStub: ReturnType | undefined; + + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.restoreAllMocks(); + httpStub?.cleanup(); + httpStub = undefined; + }); + + async function createTransport(): Promise { + httpStub = stubFetch(); + const transport = await TransportHTTP.create("127.0.0.1:80", false); + await tickNextTimer(); + return transport; + } + + async function advanceOnePoll() { + await tickNextTimer(); + } + + it("emits DeviceDisconnected with reason 'read-timeout' when GET /fromradio hangs", async () => { + const transport = await createTransport(); + const reader = transport.fromDevice.getReader(); + + httpStub!.forceReadTimeoutOnce(); + + await tickNextTimer(); + await vi.advanceTimersByTimeAsync(8000); + + let sawReadTimeout = false; + for (let i = 0; i < 6; i++) { + const { value } = await reader.read(); + if (value?.type === "status" && value.data.reason === "read-timeout") { + sawReadTimeout = true; + break; + } + } + expect(sawReadTimeout).toBe(true); + + reader.releaseLock(); + await transport.disconnect(); + }); + + it("stops polling after disconnect()", async () => { + const transport = await createTransport(); + + const fetchMock = httpStub!.getMock(); + const callsBeforeDisconnect = fetchMock.mock.calls.length; + + await transport.disconnect(); + + await advanceOnePoll(); + await vi.runOnlyPendingTimersAsync(); + + const callsAfterDisconnect = fetchMock.mock.calls.length; + expect(callsAfterDisconnect).toBe(callsBeforeDisconnect); + }); + + it("emits DeviceDisconnected with reason 'read-timeout' when GET /fromradio hangs", async () => { + const transport = await createTransport(); + const reader = transport.fromDevice.getReader(); + + httpStub!.forceReadTimeoutOnce(); + + await vi.advanceTimersToNextTimerAsync(); + + await vi.advanceTimersByTimeAsync(8000); + + await Promise.resolve(); + await Promise.resolve(); + + let sawReadTimeout = false; + for (let i = 0; i < 6; i++) { + const { value } = await reader.read(); + if (value?.type === "status" && value.data.reason === "read-timeout") { + sawReadTimeout = true; + break; + } + } + expect(sawReadTimeout).toBe(true); + + reader.releaseLock(); + await transport.disconnect(); + }); +}); diff --git a/packages/transport-http/src/transport.ts b/packages/transport-http/src/transport.ts index 42dd6221..dae3baaa 100644 --- a/packages/transport-http/src/transport.ts +++ b/packages/transport-http/src/transport.ts @@ -1,14 +1,49 @@ -import type { Types } from "@meshtastic/core"; +import { Types } from "@meshtastic/core"; +const FETCH_INTERVAL_MS = 3000; +const READ_TIMEOUT_MS = 7000; +const WRITE_TIMEOUT_MS = 4000; + +function toArrayBuffer(uint8array: Uint8Array): ArrayBuffer { + if ( + uint8array.buffer instanceof ArrayBuffer && + uint8array.byteOffset === 0 && + uint8array.byteLength === uint8array.buffer.byteLength + ) { + return uint8array.buffer; + } + return uint8array.slice().buffer; +} + +/** + * Provides HTTP(S) transport for Meshtastic devices. + * + * Implements {@link Types.Transport} using the device's HTTP API. + * Polls `/api/v1/fromradio` for incoming packets and writes to `/api/v1/toradio`. + */ export class TransportHTTP implements Types.Transport { private _toDevice: WritableStream; private _fromDevice: ReadableStream; + private fromDeviceController?: ReadableStreamDefaultController; + private url: string; private receiveBatchRequests: boolean; private fetchInterval: number; private fetching: boolean; private interval: ReturnType | undefined; + private inflightReadController?: AbortController; + + private lastStatus: Types.DeviceStatusEnum = + Types.DeviceStatusEnum.DeviceDisconnected; + private closingByUser = false; + + /** + * Probe the device and return a connected HTTP transport. + * + * @param address Hostname or IP address (with optional port). + * @param tls Use HTTPS if true, HTTP otherwise. + */ public static async create( address: string, tls?: boolean, @@ -17,97 +52,192 @@ export class TransportHTTP implements Types.Transport { await fetch(`${connectionUrl}/api/v1/toradio`, { method: "OPTIONS", }); - await Promise.resolve(); return new TransportHTTP(connectionUrl); } + /** + * Construct a new HTTP transport for the given device URL. + * + * @param url Base URL of the device (`http://host:port` or `https://host:port`). + */ constructor(url: string) { this.url = url; this.receiveBatchRequests = false; - this.fetchInterval = 3000; + this.fetchInterval = FETCH_INTERVAL_MS; this.fetching = false; this._toDevice = new WritableStream({ write: async (chunk) => { - await this.writeToRadio(chunk); + try { + await this.writeToRadio(chunk); + } catch (error) { + if (!this.closingByUser) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error", + ); + } + throw error; + } }, }); - let controller: ReadableStreamDefaultController; - this._fromDevice = new ReadableStream({ start: (ctrl) => { - controller = ctrl; + this.fromDeviceController = ctrl; + this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting); + + // Start polling immediately + void this.safePoll(); + this.interval = setInterval( + () => void this.safePoll(), + this.fetchInterval, + ); + }, + cancel: () => { + if (this.interval) { + clearInterval(this.interval); + } + this.interval = undefined; }, }); - - this.interval = setInterval(async () => { - if (this.fetching) { - // We still have the previous request open - return; - } - this.fetching = true; - try { - await this.readFromRadio(controller); - } catch { - // TODO: Emit disconnection events for certain types of errors - } - this.fetching = false; - }, this.fetchInterval); } - private async readFromRadio( - controller: ReadableStreamDefaultController, - ): Promise { + /** Poll `/api/v1/fromradio` and enqueue incoming packets. */ + private async readFromRadio(): Promise { let readBuffer = new ArrayBuffer(1); + while (readBuffer.byteLength > 0) { - const response = await fetch( - `${this.url}/api/v1/fromradio?all=${ - this.receiveBatchRequests ? "true" : "false" - }`, - { - method: "GET", - headers: { - Accept: "application/x-protobuf", + const inflight = new AbortController(); + this.inflightReadController = inflight; + + const signal = AbortSignal.any([ + inflight.signal, + AbortSignal.timeout(READ_TIMEOUT_MS), + ]); + + try { + const response = await fetch( + `${this.url}/api/v1/fromradio?all=${this.receiveBatchRequests ? "true" : "false"}`, + { + method: "GET", + headers: { Accept: "application/x-protobuf" }, + signal, }, - }, - ); + ); + if (!response.ok) { + throw new Error( + `fromradio ${response.status} ${response.statusText}`, + ); + } + + this.emitStatus(Types.DeviceStatusEnum.DeviceConnected); - readBuffer = await response.arrayBuffer(); + readBuffer = await response.arrayBuffer(); - if (readBuffer.byteLength > 0) { - controller.enqueue({ - type: "packet", - data: new Uint8Array(readBuffer), - }); + if (readBuffer.byteLength > 0) { + this.fromDeviceController?.enqueue({ + type: "packet", + data: new Uint8Array(readBuffer), + }); + } + } finally { + this.inflightReadController = undefined; } } } + /** Write a protobuf-encoded request to `/api/v1/toradio`. */ private async writeToRadio(data: Uint8Array): Promise { - await fetch(`${this.url}/api/v1/toradio`, { - method: "PUT", - headers: { - "Content-Type": "application/x-protobuf", - }, - body: data, - }); + try { + const response = await fetch(`${this.url}/api/v1/toradio`, { + method: "PUT", + headers: { "Content-Type": "application/x-protobuf" }, + body: toArrayBuffer(data), + signal: AbortSignal.timeout(WRITE_TIMEOUT_MS), + }); + if (!response.ok) { + throw new Error(`toradio ${response.status} ${response.statusText}`); + } + } catch (error) { + if (!this.closingByUser) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error", + ); + } + throw error; + } } + /** Writable stream of bytes to the device. */ get toDevice(): WritableStream { return this._toDevice; } + /** Readable stream of {@link Types.DeviceOutput} from the device. */ get fromDevice(): ReadableStream { return this._fromDevice; } + /** + * Stop polling and emit `DeviceDisconnected("user")`. + */ disconnect(): Promise { - this.fetching = false; + this.closingByUser = true; + if (this.interval) { clearInterval(this.interval); } this.interval = undefined; + this.fetching = false; + + try { + this.inflightReadController?.abort(); + } catch {} + this.inflightReadController = undefined; + + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user"); + return Promise.resolve(); } + + private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void { + if (next === this.lastStatus) { + return; + } + this.lastStatus = next; + this.fromDeviceController?.enqueue({ + type: "status", + data: { status: next, reason }, + }); + } + + private isTimeoutOrAbort(err: unknown): boolean { + return ( + (err instanceof DOMException && + (err.name === "AbortError" || err.name === "TimeoutError")) || + (err instanceof Error && + (err.name === "AbortError" || err.name === "TimeoutError")) + ); + } + + private async safePoll(): Promise { + if (this.fetching) { + return; + } + this.fetching = true; + try { + await this.readFromRadio(); + } catch (error) { + if (!this.closingByUser) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + this.isTimeoutOrAbort(error) ? "read-timeout" : "read-error", + ); + } + } finally { + this.fetching = false; + } + } } diff --git a/packages/transport-node-serial/src/transport.test.ts b/packages/transport-node-serial/src/transport.test.ts new file mode 100644 index 00000000..ef4e4a4e --- /dev/null +++ b/packages/transport-node-serial/src/transport.test.ts @@ -0,0 +1,187 @@ +import { describe, vi, expect, beforeEach, afterEach, it } from "vitest"; +import { Duplex } from "node:stream"; +import type { SerialPort } from "serialport"; +import { Types, Utils } from "@meshtastic/core"; +import { runTransportContract } from "../../../tests/utils/transportContract"; +import { TransportNodeSerial } from "./transport"; + +function isStatusEvent( + output: Types.DeviceOutput | undefined, +): output is Extract { + return output !== undefined && output.type === "status"; +} + +class FakeSerialPort extends Duplex { + public lastWritten: Uint8Array | undefined; + + constructor() { + super({ objectMode: false }); + } + + _read() {} + + _write( + chunk: Buffer, + _encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ) { + this.lastWritten = new Uint8Array( + chunk.buffer, + chunk.byteOffset, + chunk.byteLength, + ); + callback(); + } + + pushIncoming(data: Uint8Array) { + const buf = Buffer.from(data.buffer, data.byteOffset, data.byteLength); + this.push(buf); + } + + emitErrorOnce(message = "simulated serial error") { + this.emit("error", new Error(message)); + } + + emitClose() { + this.emit("close"); + } + + close() { + this.destroy(); + this.emit("close"); + } +} + +function stubCoreTransforms() { + const toDevice = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }); + + const fromDeviceFactory = () => + new TransformStream({ + transform(chunk, controller) { + controller.enqueue({ type: "packet", data: chunk }); + }, + }); + + // Utils.toDeviceStream is a getter + vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue( + toDevice as unknown as typeof Utils.toDeviceStream, + ); + + vi.spyOn(Utils, "fromDeviceStream").mockImplementation( + () => + fromDeviceFactory() as unknown as TransformStream< + Uint8Array, + Types.DeviceOutput + >, + ); + + return { + restore: () => vi.restoreAllMocks(), + }; +} + +describe("TransportNodeSerial (contract)", () => { + let transformsStub: { restore: () => void } | undefined; + + beforeEach(() => { + transformsStub = stubCoreTransforms(); + }); + + afterEach(() => { + transformsStub?.restore(); + }); + + runTransportContract({ + name: "TransportNodeSerial", + setup: () => {}, + teardown: () => { + vi.restoreAllMocks(); + }, + create: async () => { + const fakePort = new FakeSerialPort(); + const transport = new TransportNodeSerial( + fakePort as unknown as SerialPort, + ); + await Promise.resolve(); + (globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort = + fakePort; + return transport; + }, + pushIncoming: async (bytes) => { + (globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort.pushIncoming( + bytes, + ); + await Promise.resolve(); + }, + assertLastWritten: (bytes) => { + const port = + (globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort; + expect(port.lastWritten).toBeDefined(); + expect(port.lastWritten).toEqual(bytes); + }, + triggerDisconnect: async () => { + (globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort.emitErrorOnce( + "test-disconnect", + ); + await Promise.resolve(); + }, + }); +}); + +describe("TransportNodeSerial (extras)", () => { + let transformsStub: { restore: () => void } | undefined; + + beforeEach(() => { + transformsStub = stubCoreTransforms(); + }); + + afterEach(() => { + transformsStub?.restore(); + }); + + it("emits DeviceDisconnected with reason 'port-closed' on close event", async () => { + const fakePort = new FakeSerialPort(); + const transport = new TransportNodeSerial( + fakePort as unknown as SerialPort, + ); + const reader = transport.fromDevice.getReader(); + + await Promise.resolve(); + + const first = await reader.read(); + expect(isStatusEvent(first.value)).toBe(true); + if (isStatusEvent(first.value)) { + expect(first.value.data.status).toBe( + Types.DeviceStatusEnum.DeviceConnecting, + ); + } + + const second = await reader.read(); + expect(isStatusEvent(second.value)).toBe(true); + if (isStatusEvent(second.value)) { + expect(second.value.data.status).toBe( + Types.DeviceStatusEnum.DeviceConnected, + ); + } + + fakePort.emitClose(); + await Promise.resolve(); + + let sawClosed = false; + for (let i = 0; i < 6; i++) { + const { value } = await reader.read(); + if (isStatusEvent(value) && value.data.reason === "port-closed") { + sawClosed = true; + break; + } + } + expect(sawClosed).toBe(true); + + reader.releaseLock(); + await transport.disconnect(); + }); +}); diff --git a/packages/transport-node-serial/src/transport.ts b/packages/transport-node-serial/src/transport.ts index 56947bb6..fc5df1a7 100644 --- a/packages/transport-node-serial/src/transport.ts +++ b/packages/transport-node-serial/src/transport.ts @@ -1,17 +1,29 @@ import { Readable, Writable } from "node:stream"; -import type { Types } from "@meshtastic/core"; -import { Utils } from "@meshtastic/core"; +import { Types, Utils } from "@meshtastic/core"; import { SerialPort } from "serialport"; +/** + * Node.js Serial transport for Meshtastic. + * + * Implements {@link Types.Transport} on top of a Node `SerialPort`. + * Use {@link TransportNodeSerial.create} for a convenient factory, or + * `new TransportNodeSerial(port)` if you already have an open port. + */ export class TransportNodeSerial implements Types.Transport { private readonly _toDevice: WritableStream; private readonly _fromDevice: ReadableStream; + private fromDeviceController?: ReadableStreamDefaultController; private port: SerialPort | undefined; + private pipePromise?: Promise; + private abortController: AbortController; + private lastStatus: Types.DeviceStatusEnum = + Types.DeviceStatusEnum.DeviceDisconnected; + private closingByUser = false; /** * Creates and connects a new TransportNode instance. * @param path - Path to the serial device - * @param baudRate - The port number for the TCP connection (defaults to 4403). + * @param baudRate - Baud rate for the serial connection (default is 115200). * @returns A promise that resolves with a connected TransportNode instance. */ public static create( @@ -46,23 +58,78 @@ export class TransportNodeSerial implements Types.Transport { this.port = port; this.port.on("error", (err) => { console.error("Serial port connection error:", err); + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "port-error"); + }); + this.port.on("close", () => { + if (this.closingByUser) { + return; + } + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "port-closed"); }); const fromDeviceSource = Readable.toWeb(port) as ReadableStream; - this._fromDevice = fromDeviceSource.pipeThrough(Utils.fromDeviceStream()); + const transformed = fromDeviceSource.pipeThrough(Utils.fromDeviceStream()); + + this.abortController = new AbortController(); + const controller = this.abortController; + + this._fromDevice = new ReadableStream({ + start: async (ctrl) => { + this.fromDeviceController = ctrl; + + this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting); + this.emitStatus(Types.DeviceStatusEnum.DeviceConnected); + + const reader = transformed.getReader(); + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + ctrl.enqueue(value); + } + ctrl.close(); + } catch (error) { + if (this.closingByUser) { + ctrl.close(); // graceful EOF on user + } else { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "read-error", + ); + ctrl.error( + error instanceof Error ? error : new Error(String(error)), + ); + } + try { + await transformed.cancel(); + } catch {} + } finally { + reader.releaseLock(); + } + }, + }); // Stream for data going FROM the application TO the Meshtastic device. - const toDeviceTransform = Utils.toDeviceStream; - this._toDevice = toDeviceTransform.writable; - - // The readable end of the transform is then piped to the Node.js SerialPort connection. - // A similar assertion is needed here because `Writable.toWeb` also returns - // a generically typed stream (`WritableStream`). - toDeviceTransform.readable - .pipeTo(Writable.toWeb(port) as WritableStream) - .catch((err) => { - console.error("Error piping data to serial port:", err); - this.port.close(err as Error); + this._toDevice = Utils.toDeviceStream.writable; + + this.pipePromise = Utils.toDeviceStream.readable + .pipeTo(Writable.toWeb(port) as WritableStream, { + signal: controller.signal, + }) + .catch((error) => { + if (controller.signal.aborted || this.closingByUser) { + return; + } + console.error("Error piping data to serial port:", error); + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "write-error", + ); + try { + this.port?.close(); + } catch {} }); } @@ -80,9 +147,35 @@ export class TransportNodeSerial implements Types.Transport { return this._fromDevice; } - disconnect() { - this.port.close(); - this.port = undefined; - return Promise.resolve(); + /** + * Disconnect from the serial port and emit `DeviceDisconnected("user")`. + * Safe to call multiple times. + */ + async disconnect() { + try { + this.closingByUser = true; + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user"); + + this.abortController?.abort(); + await this.pipePromise?.catch(() => {}); + + try { + this.port?.close(); + } catch {} + } finally { + this.port = undefined; + this.closingByUser = false; + } + } + + private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void { + if (next === this.lastStatus) { + return; + } + this.lastStatus = next; + this.fromDeviceController?.enqueue({ + type: "status", + data: { status: next, reason }, + }); } } diff --git a/packages/transport-node/src/transport.test.ts b/packages/transport-node/src/transport.test.ts new file mode 100644 index 00000000..0e5f7d79 --- /dev/null +++ b/packages/transport-node/src/transport.test.ts @@ -0,0 +1,186 @@ +import { describe, vi, expect, beforeEach, afterEach, it } from "vitest"; +import { Duplex } from "node:stream"; +import type { Socket } from "node:net"; +import { runTransportContract } from "../../../tests/utils/transportContract"; +import { TransportNode } from "./transport"; +import { Utils, Types } from "@meshtastic/core"; + +function isStatusEvent( + out: Types.DeviceOutput | undefined, +): out is Extract { + return !!out && (out as any).type === "status"; +} + +class FakeSocket extends Duplex { + public lastWritten: Uint8Array | undefined; + + constructor() { + super({ objectMode: false }); + } + + _read() {} + + _write( + chunk: Buffer, + _encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ) { + this.lastWritten = new Uint8Array( + chunk.buffer, + chunk.byteOffset, + chunk.byteLength, + ); + callback(); + } + + pushIncoming(data: Uint8Array) { + const buf = Buffer.from(data.buffer, data.byteOffset, data.byteLength); + this.push(buf); + } + + emitErrorOnce(message = "simulated error") { + this.emit("error", new Error(message)); + } + + emitClose() { + this.emit("close"); + } + + override destroy(error?: Error) { + super.destroy(error); + this.emit("close"); + return this; + } +} + +function stubCoreTransforms() { + const toDevice = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }); + + const fromDeviceFactory = () => + new TransformStream({ + transform(chunk, controller) { + controller.enqueue({ type: "packet", data: chunk }); + }, + }); + + vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue( + toDevice as unknown as typeof Utils.toDeviceStream, + ); + + vi + .spyOn(Utils, "fromDeviceStream") + .mockImplementation( + () => + fromDeviceFactory() as unknown as TransformStream< + Uint8Array, + Types.DeviceOutput + >, + ); + + return { + restore: () => vi.restoreAllMocks(), + }; +} + +describe("TransportNode (contract)", () => { + let transformsStub: { restore: () => void } | undefined; + + beforeEach(() => { + transformsStub = stubCoreTransforms(); + }); + + afterEach(() => { + transformsStub?.restore(); + }); + + runTransportContract({ + name: "TransportNode", + setup: () => {}, + teardown: () => { + vi.restoreAllMocks(); + }, + create: async () => { + const fakeSocket = new FakeSocket(); + const transport = new TransportNode(fakeSocket as unknown as Socket); + await Promise.resolve(); + (globalThis as unknown as { __nodeSock: FakeSocket }).__nodeSock = + fakeSocket; + return transport; + }, + pushIncoming: async (bytes) => { + (globalThis as unknown as { __nodeSock: FakeSocket }).__nodeSock.pushIncoming( + bytes, + ); + await Promise.resolve(); + }, + assertLastWritten: (bytes) => { + const sock = (globalThis as unknown as { __nodeSock: FakeSocket }) + .__nodeSock; + expect(sock.lastWritten).toBeDefined(); + expect(sock.lastWritten).toEqual(bytes); + }, + triggerDisconnect: async () => { + (globalThis as unknown as { __nodeSock: FakeSocket }).__nodeSock.emitErrorOnce( + "test-disconnect", + ); + await Promise.resolve(); + }, + }); +}); + +describe("TransportNode (extras)", () => { + let transformsStub: { restore: () => void } | undefined; + + beforeEach(() => { + transformsStub = stubCoreTransforms(); + }); + + afterEach(() => { + transformsStub?.restore(); + }); + + it("emits DeviceDisconnected with reason 'socket-closed' on close event", async () => { + const fakeSocket = new FakeSocket(); + const transport = new TransportNode(fakeSocket as unknown as Socket); + const reader = transport.fromDevice.getReader(); + + await Promise.resolve(); + + const first = await reader.read(); + expect(isStatusEvent(first.value)).toBe(true); + if (isStatusEvent(first.value)) { + expect(first.value.data.status).toBe( + Types.DeviceStatusEnum.DeviceConnecting, + ); + } + + const second = await reader.read(); + expect(isStatusEvent(second.value)).toBe(true); + if (isStatusEvent(second.value)) { + expect(second.value.data.status).toBe( + Types.DeviceStatusEnum.DeviceConnected, + ); + } + + fakeSocket.emitClose(); + await Promise.resolve(); + + let sawClosed = false; + for (let i = 0; i < 6; i++) { + const { value } = await reader.read(); + if (isStatusEvent(value) && value.data.reason === "socket-closed") { + sawClosed = true; + break; + } + } + expect(sawClosed).toBe(true); + + reader.releaseLock(); + await transport.disconnect(); + }); + +}); diff --git a/packages/transport-node/src/transport.ts b/packages/transport-node/src/transport.ts index 1a98828d..03713d0b 100644 --- a/packages/transport-node/src/transport.ts +++ b/packages/transport-node/src/transport.ts @@ -1,12 +1,25 @@ import { Socket } from "node:net"; import { Readable, Writable } from "node:stream"; -import type { Types } from "@meshtastic/core"; -import { Utils } from "@meshtastic/core"; +import { Types, Utils } from "@meshtastic/core"; +/** + * Node.js TCP transport for Meshtastic. + * + * Implements {@link Types.Transport} on top of a Node `net.Socket`. + * Use {@link TransportNode.create} to open a new connection, or + * construct directly with an existing socket. + */ export class TransportNode implements Types.Transport { private readonly _toDevice: WritableStream; private readonly _fromDevice: ReadableStream; + private fromDeviceController?: ReadableStreamDefaultController; private socket: Socket | undefined; + private pipePromise?: Promise; + private abortController: AbortController; + private lastStatus: Types.DeviceStatusEnum = + Types.DeviceStatusEnum.DeviceDisconnected; + + private closingByUser = false; /** * Creates and connects a new TransportNode instance. @@ -38,47 +51,131 @@ export class TransportNode implements Types.Transport { */ constructor(connection: Socket) { this.socket = connection; + this.socket.on("error", (err) => { console.error("Socket connection error:", err); + if (!this.closingByUser) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "socket-error", + ); + } + }); + + this.socket.on("close", () => { + if (this.closingByUser) { + return; // suppress close-derived disconnect in user flow + } + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "socket-closed", + ); }); + this.abortController = new AbortController(); + const abortController = this.abortController; + const fromDeviceSource = Readable.toWeb( connection, ) as ReadableStream; - this._fromDevice = fromDeviceSource.pipeThrough(Utils.fromDeviceStream()); + const transformed = fromDeviceSource.pipeThrough(Utils.fromDeviceStream()); + + this._fromDevice = new ReadableStream({ + start: async (ctrl) => { + this.fromDeviceController = ctrl; + + this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting); + this.emitStatus(Types.DeviceStatusEnum.DeviceConnected); + + const reader = transformed.getReader(); + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + ctrl.enqueue(value); + } + ctrl.close(); + } catch (error) { + if (this.closingByUser) { + ctrl.close(); + } else { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "read-error", + ); + + ctrl.error( + error instanceof Error ? error : new Error(String(error)), + ); + } + try { + await transformed.cancel(); + } catch {} + } finally { + reader.releaseLock(); + } + }, + }); // Stream for data going FROM the application TO the Meshtastic device. const toDeviceTransform = Utils.toDeviceStream; this._toDevice = toDeviceTransform.writable; - // The readable end of the transform is then piped to the Node.js socket. - // A similar assertion is needed here because `Writable.toWeb` also returns - // a generically typed stream (`WritableStream`). - toDeviceTransform.readable - .pipeTo(Writable.toWeb(connection) as WritableStream) + this.pipePromise = toDeviceTransform.readable + .pipeTo(Writable.toWeb(connection) as WritableStream, { + signal: abortController.signal, + }) .catch((err) => { + if (abortController.signal.aborted) { + return; + } console.error("Error piping data to socket:", err); - this.socket.destroy(err as Error); + const error = err instanceof Error ? err : new Error(String(err)); + this.socket?.destroy(error); }); } - /** - * The WritableStream to send data to the Meshtastic device. - */ + /** WritableStream to send data to the device. */ public get toDevice(): WritableStream { return this._toDevice; } - /** - * The ReadableStream to receive data from the Meshtastic device. - */ + /** ReadableStream to receive data from the device. */ public get fromDevice(): ReadableStream { return this._fromDevice; } - disconnect() { - this.socket.destroy(); - this.socket = undefined; - return Promise.resolve(); + /** + * Disconnect from the TCP socket and emit `DeviceDisconnected("user")`. + * Safe to call multiple times. + */ + async disconnect(): Promise { + try { + this.closingByUser = true; + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user"); + + this.abortController.abort(); + if (this.pipePromise) { + await this.pipePromise; + } + + this.socket?.destroy(); + } finally { + this.socket = undefined; + this.closingByUser = false; + } + } + + private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void { + if (next === this.lastStatus) { + return; + } + this.lastStatus = next; + this.fromDeviceController?.enqueue({ + type: "status", + data: { status: next, reason }, + }); } } diff --git a/packages/transport-web-bluetooth/src/transport.test.ts b/packages/transport-web-bluetooth/src/transport.test.ts new file mode 100644 index 00000000..9d9d0e63 --- /dev/null +++ b/packages/transport-web-bluetooth/src/transport.test.ts @@ -0,0 +1,166 @@ +import { describe, vi, expect, beforeEach, afterEach } from "vitest"; +import { runTransportContract } from "../../../tests/utils/transportContract"; +import { TransportWebBluetooth } from "./transport"; + +class MiniEmitter { + private listeners = new Map void>>(); + addEventListener(type: string, listener: (e: Event) => void) { + if (!this.listeners.has(type)) this.listeners.set(type, new Set()); + this.listeners.get(type)!.add(listener); + } + removeEventListener(type: string, listener: (e: Event) => void) { + this.listeners.get(type)?.delete(listener); + } + dispatchEvent(event: Event) { + this.listeners.get(event.type)?.forEach((l) => l(event)); + } +} + +function stubWebBluetooth() { + const incomingQueue: Uint8Array[] = []; + let lastWritten: Uint8Array | undefined; + + // fromRadioCharacteristic: read bytes from queue, one buffer per read + const fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic = { + async readValue() { + const next = incomingQueue.shift() ?? new Uint8Array(); + return new DataView( + next.buffer, + next.byteOffset, + next.byteLength, + ) as unknown as DataView; + }, + addEventListener() {}, + removeEventListener() {}, + } as unknown as BluetoothRemoteGATTCharacteristic; + + // characteristicvaluechanged event plumbing (fromNumCharacteristic) + const charEmitter = new MiniEmitter(); + + const fromNumCharacteristic: BluetoothRemoteGATTCharacteristic = { + async startNotifications() { + return this; + }, + addEventListener(type: string, listener: (e: Event) => void) { + charEmitter.addEventListener(type, listener); + }, + removeEventListener(type: string, listener: (e: Event) => void) { + charEmitter.removeEventListener(type, listener); + }, + } as unknown as BluetoothRemoteGATTCharacteristic; + + const toRadioCharacteristic: BluetoothRemoteGATTCharacteristic = { + async writeValue(bufferSource: BufferSource) { + const u8 = + bufferSource instanceof ArrayBuffer + ? new Uint8Array(bufferSource) + : new Uint8Array( + bufferSource.buffer, + bufferSource.byteOffset, + bufferSource.byteLength, + ); + lastWritten = new Uint8Array(u8); + }, + } as unknown as BluetoothRemoteGATTCharacteristic; + + // Primary service returns our three characteristics by UUID + const primaryService: BluetoothRemoteGATTService = { + async getCharacteristic(uuid: string) { + if (uuid === TransportWebBluetooth.ToRadioUuid) return toRadioCharacteristic; + if (uuid === TransportWebBluetooth.FromRadioUuid) return fromRadioCharacteristic; + if (uuid === TransportWebBluetooth.FromNumUuid) return fromNumCharacteristic; + throw new Error("Unknown characteristic: " + uuid); + }, + } as unknown as BluetoothRemoteGATTService; + + // Device-level emitter to deliver gattserverdisconnected + const deviceEmitter = new MiniEmitter(); + + // GATT server with readonly connected + let isConnected = true; + const gattServer: BluetoothRemoteGATTServer = { + get connected() { + return isConnected; + }, + async connect() { + isConnected = true; + return gattServer; + }, + disconnect() { + isConnected = false; + deviceEmitter.dispatchEvent(new Event("gattserverdisconnected")); + }, + async getPrimaryService() { + return primaryService; + }, + device: { + addEventListener: (...args: Parameters) => + deviceEmitter.addEventListener(args[0] as string, args[1] as (e: Event) => void), + removeEventListener: (...args: Parameters) => + deviceEmitter.removeEventListener(args[0] as string, args[1] as (e: Event) => void), + } as unknown as BluetoothDevice, + } as unknown as BluetoothRemoteGATTServer; + + const fakeDevice: BluetoothDevice = { + async watchAdvertisements() {}, + gatt: gattServer, + } as unknown as BluetoothDevice; + + const fakeNavigator = { + bluetooth: { + async requestDevice() { + return fakeDevice; + }, + }, + }; + + vi.stubGlobal("navigator", Object.assign({}, globalThis.navigator, fakeNavigator)); + + // helper actions for tests/contract + return { + pushIncoming: (u8: Uint8Array) => { + incomingQueue.push(u8); + charEmitter.dispatchEvent(new Event("characteristicvaluechanged")); + }, + assertLastWritten: (u8: Uint8Array) => { + expect(lastWritten).toBeDefined(); + expect(lastWritten).toEqual(u8); + }, + // simulate underlying link drop (OS-level disconnect) + triggerGattDisconnect: () => { + isConnected = false; + deviceEmitter.dispatchEvent(new Event("gattserverdisconnected")); + }, + cleanup: () => { + vi.unstubAllGlobals(); + }, + }; +} + +describe("TransportWebBluetooth (contract)", () => { + runTransportContract({ + name: "TransportWebBluetooth", + setup: () => {}, + teardown: () => { + (globalThis as unknown as { __ble?: ReturnType }).__ble?.cleanup(); + (globalThis as unknown as { __ble?: ReturnType }).__ble = undefined; + vi.restoreAllMocks(); + vi.unstubAllGlobals(); + }, + create: async () => { + (globalThis as unknown as { __ble: ReturnType }).__ble = stubWebBluetooth(); + return await TransportWebBluetooth.create(); + }, + pushIncoming: async (bytes) => { + (globalThis as unknown as { __ble: ReturnType }).__ble.pushIncoming(bytes); + await Promise.resolve(); + }, + assertLastWritten: (bytes) => { + (globalThis as unknown as { __ble: ReturnType }).__ble.assertLastWritten(bytes); + }, + triggerDisconnect: async () => { + (globalThis as unknown as { __ble: ReturnType }).__ble.triggerGattDisconnect(); + await Promise.resolve(); + }, + }); +}); diff --git a/packages/transport-web-bluetooth/src/transport.ts b/packages/transport-web-bluetooth/src/transport.ts index 05cf094c..00c0e0c4 100644 --- a/packages/transport-web-bluetooth/src/transport.ts +++ b/packages/transport-web-bluetooth/src/transport.ts @@ -1,21 +1,63 @@ -import type { Types } from "@meshtastic/core"; +import { Types } from "@meshtastic/core"; +function toArrayBuffer(uint8array: Uint8Array): ArrayBuffer { + if ( + uint8array.buffer instanceof ArrayBuffer && + uint8array.byteOffset === 0 && + uint8array.byteLength === uint8array.buffer.byteLength + ) { + return uint8array.buffer; + } + return uint8array.slice().buffer; +} + +/** + * Provides Web Bluetooth transport for Meshtastic devices. + * + * Implements the {@link Types.Transport} contract using the Web Bluetooth API. + * Use {@link TransportWebBluetooth.create} or {@link TransportWebBluetooth.createFromDevice} + * to construct an instance. + */ export class TransportWebBluetooth implements Types.Transport { private _toDevice: WritableStream; private _fromDevice: ReadableStream; - private _fromDeviceController?: ReadableStreamDefaultController; - private _isFirstWrite = true; + private fromDeviceController?: ReadableStreamDefaultController; private toRadioCharacteristic: BluetoothRemoteGATTCharacteristic; private fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic; private fromNumCharacteristic: BluetoothRemoteGATTCharacteristic; private gattServer: BluetoothRemoteGATTServer; + private lastStatus: Types.DeviceStatusEnum = + Types.DeviceStatusEnum.DeviceDisconnected; + + private closingByUser = false; + private reading = false; + /** UUID for the "toRadio" write characteristic. */ static ToRadioUuid = "f75c76d2-129e-4dad-a1dd-7866124401e7"; + /** UUID for the "fromRadio" read characteristic. */ static FromRadioUuid = "2c55e69e-4993-11ed-b878-0242ac120002"; + /** UUID for the "fromNum" notification characteristic. */ static FromNumUuid = "ed9da18c-a800-4f66-a670-aa7547e34453"; + /** UUID for the Meshtastic GATT service. */ static ServiceUuid = "6ba1b218-15a8-461f-9fa8-5dcae273eafd"; + private onGattDisconnected = () => { + if (this.closingByUser) { + return; + } + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "gatt-disconnected", + ); + }; + private onFromNumChanged = () => { + void this.readFromRadio(); + }; + + /** + * Prompts the user to select a Bluetooth device, connects it, and returns a transport. + */ public static async create(): Promise { const device = await navigator.bluetooth.requestDevice({ filters: [{ services: [TransportWebBluetooth.ServiceUuid] }], @@ -23,17 +65,25 @@ export class TransportWebBluetooth implements Types.Transport { return await TransportWebBluetooth.prepareConnection(device); } + /** + * Creates a transport from an existing, user-provided {@link BluetoothDevice}. + */ public static async createFromDevice( device: BluetoothDevice, ): Promise { return await TransportWebBluetooth.prepareConnection(device); } + /** + * Prepares and connects to a {@link BluetoothDevice}, resolving its GATT server + * and characteristics, then returning a transport. + * + * @throws if required services or characteristics are missing. + */ public static async prepareConnection( device: BluetoothDevice, ): Promise { const gattServer = await device.gatt?.connect(); - if (!gattServer) { throw new Error("Failed to connect to GATT server"); } @@ -41,7 +91,6 @@ export class TransportWebBluetooth implements Types.Transport { const service = await gattServer.getPrimaryService( TransportWebBluetooth.ServiceUuid, ); - const toRadioCharacteristic = await service.getCharacteristic( TransportWebBluetooth.ToRadioUuid, ); @@ -60,8 +109,6 @@ export class TransportWebBluetooth implements Types.Transport { throw new Error("Failed to find required characteristics"); } - console.log("Connected to device", device.name); - return new TransportWebBluetooth( toRadioCharacteristic, fromRadioCharacteristic, @@ -70,6 +117,10 @@ export class TransportWebBluetooth implements Types.Transport { ); } + /** + * Create a transport from resolved GATT characteristics and server. + * Prefer using the static factory methods instead. + */ constructor( toRadioCharacteristic: BluetoothRemoteGATTCharacteristic, fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic, @@ -81,66 +132,135 @@ export class TransportWebBluetooth implements Types.Transport { this.fromNumCharacteristic = fromNumCharacteristic; this.gattServer = gattServer; - this._fromDevice = new ReadableStream({ - start: (ctrl) => { - this._fromDeviceController = ctrl; - }, - }); + this._fromDevice = new ReadableStream({ + start: async (ctrl) => { + this.fromDeviceController = ctrl; + this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting); - this._toDevice = new WritableStream({ - write: async (chunk) => { - await this.toRadioCharacteristic.writeValue(chunk); + this.gattServer.device.addEventListener( + "gattserverdisconnected", + this.onGattDisconnected, + ); - if (this._isFirstWrite && this._fromDeviceController) { - this._isFirstWrite = false; - - setTimeout(() => { - // biome-ignore lint/style/noNonNullAssertion: we know this will be set - this.readFromRadio(this._fromDeviceController!); - }, 50); + try { + await this.fromNumCharacteristic.startNotifications(); + this.fromNumCharacteristic.addEventListener( + "characteristicvaluechanged", + this.onFromNumChanged, + ); + this.emitStatus(Types.DeviceStatusEnum.DeviceConnected); + // prime once in case data already queued + void this.readFromRadio(); + } catch { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "notify-failed", + ); + this.gattServer.device.removeEventListener( + "gattserverdisconnected", + this.onGattDisconnected, + ); } }, }); - this.fromNumCharacteristic.addEventListener( - "characteristicvaluechanged", - () => { - if (this._fromDeviceController) { - this.readFromRadio(this._fromDeviceController); + this._toDevice = new WritableStream({ + write: async (chunk) => { + try { + const ab = toArrayBuffer(chunk); + await this.toRadioCharacteristic.writeValue(ab); + void this.readFromRadio(); // ensure we read any response + } catch (error) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "write-error", + ); + throw error; } }, - ); - - this.fromNumCharacteristic.startNotifications(); + }); } + /** Writable stream of bytes to the device. */ get toDevice(): WritableStream { return this._toDevice; } + /** Readable stream of {@link Types.DeviceOutput} from the device. */ get fromDevice(): ReadableStream { return this._fromDevice; } - protected async readFromRadio( - controller: ReadableStreamDefaultController, - ): Promise { - let hasMoreData = true; - while (hasMoreData && this.fromRadioCharacteristic) { - const value = await this.fromRadioCharacteristic.readValue(); - if (value.byteLength === 0) { - hasMoreData = false; - continue; + /** + * Closes the GATT connection and emits `DeviceDisconnected("user")`. + */ + disconnect(): Promise { + try { + this.closingByUser = true; + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user"); + try { + this.fromNumCharacteristic.stopNotifications?.(); + } catch {} + this.fromNumCharacteristic.removeEventListener( + "characteristicvaluechanged", + this.onFromNumChanged, + ); + this.gattServer.device.removeEventListener( + "gattserverdisconnected", + this.onGattDisconnected, + ); + + this.gattServer.disconnect(); + } finally { + this.closingByUser = false; + } + return Promise.resolve(); + } + + private async readFromRadio(): Promise { + if (this.reading) { + return; + } + this.reading = true; + + try { + let hasMoreData = true; + while (hasMoreData && this.fromRadioCharacteristic) { + const value = await this.fromRadioCharacteristic.readValue(); + if (value.byteLength === 0) { + hasMoreData = false; + continue; + } + this.enqueue({ + type: "packet", + data: new Uint8Array(value.buffer), + }); + } + } catch (error) { + if (!this.closingByUser) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "read-error", + ); } - controller.enqueue({ - type: "packet", - data: new Uint8Array(value.buffer), - }); + throw error; + } finally { + this.reading = false; } } - disconnect(): Promise { - this.gattServer.disconnect(); - return Promise.resolve(); + private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void { + if (next === this.lastStatus) { + return; + } + this.lastStatus = next; + this.fromDeviceController?.enqueue({ + type: "status", + data: { status: next, reason }, + }); + } + + private enqueue(output: Types.DeviceOutput): void { + this.fromDeviceController?.enqueue(output); } } diff --git a/packages/transport-web-serial/src/transport.test.ts b/packages/transport-web-serial/src/transport.test.ts new file mode 100644 index 00000000..0a724a15 --- /dev/null +++ b/packages/transport-web-serial/src/transport.test.ts @@ -0,0 +1,225 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { TransportWebSerial } from "./transport"; +import { Types, Utils } from "@meshtastic/core"; +import { runTransportContract } from "../../../tests/utils/transportContract"; + +function stubCoreTransforms() { + const toDevice = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + }, + }); + + // maps raw bytes -> DeviceOutput.packet + const fromDeviceFactory = () => + new TransformStream({ + transform(chunk, controller) { + controller.enqueue({ type: "packet", data: chunk }); + }, + }); + + const restoreTo = vi + .spyOn(Utils, "toDeviceStream", "get") + .mockReturnValue(toDevice as unknown as typeof Utils.toDeviceStream); + + const restoreFrom = vi + .spyOn(Utils, "fromDeviceStream") + .mockImplementation( + () => + fromDeviceFactory() as unknown as TransformStream< + Uint8Array, + Types.DeviceOutput + >, + ); + + return { + restore: () => { + restoreTo.mockRestore(); + restoreFrom.mockRestore(); + }, + }; +} + +function stubNavigatorSerial() { + type SerialDisconnectHandler = (ev: { port?: any }) => void; + const handlers = new Set(); + + const serialStub = { + addEventListener: (type: string, handler: EventListenerOrEventListenerObject) => { + if (type === "disconnect") handlers.add(handler as any as SerialDisconnectHandler); + }, + removeEventListener: (type: string, handler: EventListenerOrEventListenerObject) => { + if (type === "disconnect") handlers.delete(handler as any as SerialDisconnectHandler); + }, + dispatchDisconnect(port: any) { + for (const h of handlers) h({ port }); + }, + requestPort: vi.fn(async () => new FakeSerialPort()), + }; + + const nav: any = (globalThis as any).navigator ?? {}; + const hadNavigator = !!(globalThis as any).navigator; + const originalSerial = nav.serial; + + if (!hadNavigator) { + Object.defineProperty(globalThis as any, "navigator", { + value: nav, + configurable: true, + writable: false, + }); + } + + Object.defineProperty(nav, "serial", { + value: serialStub, + configurable: true, + enumerable: true, + writable: true, + }); + + return { + serialStub, + restore: () => { + if (hadNavigator) { + if (originalSerial === undefined) { + delete (globalThis as any).navigator.serial; + } else { + Object.defineProperty((globalThis as any).navigator, "serial", { + value: originalSerial, + configurable: true, + enumerable: true, + writable: true, + }); + } + } else { + delete (globalThis as any).navigator; + } + }, + }; +} + +class FakeSerialPort { + readable: ReadableStream; + writable: WritableStream; + lastWritten?: Uint8Array; + + private _readController!: ReadableStreamDefaultController; + + constructor() { + this.readable = new ReadableStream({ + start: (controller) => { + this._readController = controller; + }, + }); + + this.writable = new WritableStream({ + write: async (chunk) => { + this.lastWritten = chunk; + }, + }); + } + + open(_options?: { baudRate?: number }): Promise { + return Promise.resolve(); + } + + close(): Promise { + try { + this._readController.close(); + } catch {} + return Promise.resolve(); + } + + pushIncoming(bytes: Uint8Array) { + this._readController.enqueue(bytes); + } +} + +describe("TransportWebSerial (contract)", () => { + let transforms: { restore(): void } | undefined; + let navSerial: { serialStub: any; restore(): void } | undefined; + + beforeEach(() => { + transforms = stubCoreTransforms(); + navSerial = stubNavigatorSerial(); + }); + + afterEach(() => { + transforms?.restore(); + navSerial?.restore(); + vi.restoreAllMocks(); + }); + + runTransportContract({ + name: "TransportWebSerial", + setup: () => {}, + teardown: () => {}, + create: async () => { + const fake = new FakeSerialPort(); + const transport = await TransportWebSerial.createFromPort(fake as any); + (globalThis as any).__ws = { fake, serial: navSerial!.serialStub }; + await Promise.resolve(); + return transport; + }, + pushIncoming: async (bytes) => { + (globalThis as any).__ws.fake.pushIncoming(bytes); + await Promise.resolve(); + }, + assertLastWritten: (bytes) => { + expect((globalThis as any).__ws.fake.lastWritten).toEqual(bytes); + }, + triggerDisconnect: async () => { + (globalThis as any).__ws.serial.dispatchDisconnect( + (globalThis as any).__ws.fake, + ); + await Promise.resolve(); + }, + }); +}); + +describe("TransportWebSerial (extras)", () => { + let transforms: { restore(): void } | undefined; + let navSerial: { serialStub: any; restore(): void } | undefined; + + beforeEach(() => { + transforms = stubCoreTransforms(); + navSerial = stubNavigatorSerial(); + }); + + afterEach(() => { + transforms?.restore(); + navSerial?.restore(); + vi.restoreAllMocks(); + }); + + it("emits DeviceDisconnected('serial-disconnected') on OS disconnect event", async () => { + const fake = new FakeSerialPort(); + const transport = await TransportWebSerial.createFromPort(fake as any); + (globalThis as any).__ws = { fake, serial: navSerial!.serialStub }; + + const reader = transport.fromDevice.getReader(); + + // drain statuses until connected + for (let i = 0; i < 3; i++) { + const { value } = await reader.read(); + if (!value || value.type !== "status") break; + if (value.data.status === Types.DeviceStatusEnum.DeviceConnected) break; + } + + // fire OS-level disconnect + navSerial!.serialStub.dispatchDisconnect(fake as any); + await Promise.resolve(); + + let saw = false; + for (let i = 0; i < 6; i++) { + const { value } = await reader.read(); + if (value?.type === "status" && value.data.reason === "serial-disconnected") { + saw = true; + break; + } + } + expect(saw).toBe(true); + + reader.releaseLock(); + await transport.disconnect(); + }); +}); diff --git a/packages/transport-web-serial/src/transport.ts b/packages/transport-web-serial/src/transport.ts index 69aa35a1..949e4d5c 100644 --- a/packages/transport-web-serial/src/transport.ts +++ b/packages/transport-web-serial/src/transport.ts @@ -1,72 +1,165 @@ -import type { Types } from "@meshtastic/core"; -import { Utils } from "@meshtastic/core"; +import { Types, Utils } from "@meshtastic/core"; +/** + * Provides Web Serial transport for Meshtastic devices. + * + * Implements the {@link Types.Transport} contract using the Web Serial API. + * Use {@link TransportWebSerial.create} or {@link TransportWebSerial.createFromPort} + * to construct an instance. + */ export class TransportWebSerial implements Types.Transport { private _toDevice: WritableStream; private _fromDevice: ReadableStream; + private fromDeviceController?: ReadableStreamDefaultController; private connection: SerialPort; private pipePromise: Promise | null = null; private abortController: AbortController; + private portReadable: ReadableStream; + private lastStatus: Types.DeviceStatusEnum = + Types.DeviceStatusEnum.DeviceDisconnected; + private closingByUser = false; + + /** + * Create a new TransportWebSerial instance using a serial port. + */ public static async create(baudRate?: number): Promise { const port = await navigator.serial.requestPort(); await port.open({ baudRate: baudRate || 115200 }); return new TransportWebSerial(port); } + /** + * Creates a new TransportWebSerial instance from an existing, provided {@link SerialPort}. + * Opens it if not already open. + */ public static async createFromPort( port: SerialPort, baudRate?: number, ): Promise { - await port.open({ baudRate: baudRate || 115200 }); + if (!port.readable || !port.writable) { + await port.open({ baudRate: baudRate || 115200 }); + } return new TransportWebSerial(port); } + /** + * Constructs a transport around a given {@link SerialPort}. + * @throws If the port lacks readable or writable streams. + */ constructor(connection: SerialPort) { if (!connection.readable || !connection.writable) { throw new Error("Stream not accessible"); } this.connection = connection; + this.portReadable = connection.readable; this.abortController = new AbortController(); + const abortController = this.abortController; // Set up the pipe with abort signal for clean cancellation - this.pipePromise = Utils.toDeviceStream.readable.pipeTo( - connection.writable, - { signal: this.abortController.signal }, - ); + this.pipePromise = Utils.toDeviceStream.readable + .pipeTo(connection.writable, { signal: this.abortController.signal }) + .catch((err) => { + // Ignore expected rejection when we cancel it via the AbortController. + if (abortController.signal.aborted) { + return; + } + console.error("Error piping data to serial port:", err); + this.connection.close().catch(() => {}); + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "write-error", + ); + }); this._toDevice = Utils.toDeviceStream.writable; - this._fromDevice = connection.readable.pipeThrough( - Utils.fromDeviceStream(), - ); + + // Wrap + capture controller to inject status packets + this._fromDevice = new ReadableStream({ + start: async (ctrl) => { + this.fromDeviceController = ctrl; + + this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting); + + const transformed = this.portReadable.pipeThrough( + Utils.fromDeviceStream(), + ); + const reader = transformed.getReader(); + + const onOsDisconnect = (ev: Event) => { + const { port } = ev as unknown as { port?: SerialPort }; + if (port && port === this.connection) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "serial-disconnected", + ); + } + }; + navigator.serial.addEventListener("disconnect", onOsDisconnect); + + this.emitStatus(Types.DeviceStatusEnum.DeviceConnected); + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + ctrl.enqueue(value); + } + ctrl.close(); + } catch (error) { + if (!this.closingByUser) { + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "read-error", + ); + } + ctrl.error(error instanceof Error ? error : new Error(String(error))); + try { + await transformed.cancel(); + } catch {} + } finally { + reader.releaseLock(); + navigator.serial.removeEventListener("disconnect", onOsDisconnect); + } + }, + }); } - get toDevice(): WritableStream { + /** Writable stream of bytes to the device. */ + public get toDevice(): WritableStream { return this._toDevice; } - get fromDevice(): ReadableStream { + /** Readable stream of {@link Types.DeviceOutput} from the device. */ + public get fromDevice(): ReadableStream { return this._fromDevice; } + private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void { + if (next === this.lastStatus) { + return; + } + this.lastStatus = next; + this.fromDeviceController?.enqueue({ + type: "status", + data: { status: next, reason }, + }); + } + /** - * Safely disconnects the serial port, following best practices from - * https://github.com/WICG/serial/. Cancels any active pipe - * operations and only closes the port after streams are unlocked. + * Closes the serial port and emits `DeviceDisconnected("user")`. */ - async disconnect() { + public async disconnect(): Promise { try { - this.abortController.abort(); + this.closingByUser = true; + // Stop outbound piping + this.abortController.abort(); if (this.pipePromise) { - try { - await this.pipePromise; - } catch (error) { - if (error instanceof Error && error.name !== "AbortError") { - throw error; - } - } + await this.pipePromise; } // Cancel any remaining streams @@ -82,6 +175,9 @@ export class TransportWebSerial implements Types.Transport { } catch (error) { // If we can't close cleanly, let the browser handle cleanup console.warn("Could not cleanly disconnect serial port:", error); + } finally { + this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user"); + this.closingByUser = false; } } @@ -89,14 +185,43 @@ export class TransportWebSerial implements Types.Transport { * Reconnects the transport by creating a new AbortController and re-establishing * the pipe connection. Only call this after disconnect() or if the connection failed. */ - async reconnect() { - // Create a new AbortController for the new connection - this.abortController = new AbortController(); + public async reconnect() { + this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting, "reconnect"); + + try { + if (!this.connection.readable || !this.connection.writable) { + throw new Error("Stream not accessible"); + } + this.portReadable = this.connection.readable; - // Re-establish the pipe connection - this.pipePromise = Utils.toDeviceStream.readable.pipeTo( - this.connection.writable, - { signal: this.abortController.signal }, - ); + // Create a new AbortController for the new connection + this.abortController = new AbortController(); + const abortController = this.abortController; + + // Re-establish the pipe connection + this.pipePromise = Utils.toDeviceStream.readable + .pipeTo(this.connection.writable, { + signal: this.abortController.signal, + }) + .catch((error) => { + if (abortController.signal.aborted) { + return; + } + console.error("Error piping data to serial port (reconnect):", error); + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "write-error", + ); + }); + + this.emitStatus(Types.DeviceStatusEnum.DeviceConnected, "reconnected"); + } catch (error) { + // Couldn’t re-pipe + this.emitStatus( + Types.DeviceStatusEnum.DeviceDisconnected, + "reconnect-failed", + ); + throw error; + } } } diff --git a/packages/web/vitest.config.ts b/packages/web/vitest.config.ts index e793d36b..c561d19c 100644 --- a/packages/web/vitest.config.ts +++ b/packages/web/vitest.config.ts @@ -1,23 +1,27 @@ import path from "node:path"; -import process from "node:process"; +import { fileURLToPath } from "node:url"; import react from "@vitejs/plugin-react"; import { enableMapSet } from "immer"; import { defineProject } from "vitest/config"; enableMapSet(); + +const __filename = fileURLToPath(import.meta.url); +const __dirname = path.dirname(__filename); +const pkgRoot = __dirname; +const srcDir = path.resolve(pkgRoot, "src"); +const publicDir = path.resolve(pkgRoot, "public"); + export default defineProject({ plugins: [react()], resolve: { alias: { - "@app": path.resolve(process.cwd(), "./packages/web/src"), - "@public": path.resolve(process.cwd(), "./packages/web/public"), - "@core": path.resolve(process.cwd(), "./packages/web/src/core"), - "@pages": path.resolve(process.cwd(), "./packages/web/src/pages"), - "@components": path.resolve( - process.cwd(), - "./packages/web/src/components", - ), - "@layouts": path.resolve(process.cwd(), "./packages/web/src/layouts"), + "@app": srcDir, + "@public": publicDir, + "@core": path.resolve(srcDir, "core"), + "@pages": path.resolve(srcDir, "pages"), + "@components": path.resolve(srcDir, "components"), + "@layouts": path.resolve(srcDir, "layouts"), }, }, test: { @@ -26,8 +30,7 @@ export default defineProject({ mockReset: true, clearMocks: true, restoreMocks: true, - root: path.resolve(process.cwd(), "./packages/web/src"), - include: ["**/*.{test,spec}.{ts,tsx}"], - setupFiles: ["./src/tests/setup.ts"], + include: ["src/**/*.{test,spec}.{ts,tsx}"], + setupFiles: [path.resolve(srcDir, "tests/setup.ts")], }, }); diff --git a/tests/utils/transportContract.ts b/tests/utils/transportContract.ts new file mode 100644 index 00000000..bafdb92f --- /dev/null +++ b/tests/utils/transportContract.ts @@ -0,0 +1,122 @@ +import { Types } from "@meshtastic/core"; +import { describe, expect, it } from "vitest"; + +export interface TransportContract { + name: string; + create: () => Promise; + setup?: () => void | Promise; + teardown?: () => void | Promise; + pushIncoming?: (bytes: Uint8Array) => void | Promise; + assertLastWritten?: (bytes: Uint8Array) => void; + triggerDisconnect?: () => void | Promise; +} + +async function readUntilType( + reader: ReadableStreamDefaultReader, + expectedType: Types.DeviceOutput["type"], + maxReads = 20, +): Promise { + for (let i = 0; i < maxReads; i++) { + const { value, done } = await reader.read(); + if (done) { + break; + } + if (value && value.type === expectedType) { + return value; + } + } + throw new Error( + `Did not receive a '${expectedType}' event within ${maxReads} reads`, + ); +} + +export function runTransportContract(contract: TransportContract) { + describe(contract.name, () => { + it("reads packets from fromDevice", async () => { + await contract.setup?.(); + const transport = await contract.create(); + + const reader = transport.fromDevice.getReader(); + const sampleBytes = new Uint8Array([0x01, 0x02, 0x03]); + + await contract.pushIncoming?.(sampleBytes); + + const packetEvent = await readUntilType(reader, "packet"); + expect("data" in packetEvent ? packetEvent.data : undefined).toEqual( + sampleBytes, + ); + + reader.releaseLock(); + await contract.teardown?.(); + }); + + it("writes bytes to toDevice", async () => { + await contract.setup?.(); + const transport = await contract.create(); + + const writer = transport.toDevice.getWriter(); + const outgoingBytes = new Uint8Array([0xaa, 0xbb]); + await writer.write(outgoingBytes); + await writer.close(); + + contract.assertLastWritten?.(outgoingBytes); + await contract.teardown?.(); + }); + + it("disconnect() emits DeviceDisconnected('user')", async () => { + await contract.setup?.(); + const transport = await contract.create(); + + const reader = transport.fromDevice.getReader(); + + // Trigger user disconnect + await transport.disconnect(); + + // Read a few events and assert we eventually see the user disconnect. + let sawUser = false; + for (let i = 0; i < 10; i++) { + const { value } = await reader.read(); + if ( + value && + value.type === "status" && + value.data.status === Types.DeviceStatusEnum.DeviceDisconnected && + value.data.reason === "user" + ) { + sawUser = true; + break; + } + } + expect(sawUser).toBe(true); + + reader.releaseLock(); + await contract.teardown?.(); + }); + + it("emits DeviceDisconnected when the underlying link drops", async () => { + await contract.setup?.(); + const transport = await contract.create(); + + const reader = transport.fromDevice.getReader(); + + await contract.triggerDisconnect?.(); + + // As above, read a few events and assert we eventually see "disconnected" + let sawDrop = false; + for (let i = 0; i < 10; i++) { + const { value } = await reader.read(); + if ( + value && + value.type === "status" && + value.data.status === Types.DeviceStatusEnum.DeviceDisconnected + ) { + sawDrop = true; + break; + } + } + expect(sawDrop).toBe(true); + + reader.releaseLock(); + await contract.teardown?.(); + }); + }); +}