Browse Source

Add transport status events (#790)

* Transport status events

Add symbol docs
Emit transport status events
Transport test suite

* Review fixes

* Remove core dependency

* HTTP transport use AbortSignal, error handling in TransportNode

* Improve stream handling

* Update packages/transport-web-serial/src/transport.ts

Co-authored-by: Copilot <[email protected]>

* Fix linting

---------

Co-authored-by: philon- <[email protected]>
Co-authored-by: Copilot <[email protected]>
pull/802/head
Jeremy Gallant 9 months ago
committed by GitHub
parent
commit
d7e32e9b03
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      .vscode/settings.json
  2. 8
      packages/core/src/types.ts
  3. 13
      packages/core/src/utils/transform/decodePacket.ts
  4. 229
      packages/transport-http/src/transport.test.ts
  5. 226
      packages/transport-http/src/transport.ts
  6. 187
      packages/transport-node-serial/src/transport.test.ts
  7. 131
      packages/transport-node-serial/src/transport.ts
  8. 186
      packages/transport-node/src/transport.test.ts
  9. 135
      packages/transport-node/src/transport.ts
  10. 166
      packages/transport-web-bluetooth/src/transport.test.ts
  11. 212
      packages/transport-web-bluetooth/src/transport.ts
  12. 225
      packages/transport-web-serial/src/transport.test.ts
  13. 189
      packages/transport-web-serial/src/transport.ts
  14. 29
      packages/web/vitest.config.ts
  15. 122
      tests/utils/transportContract.ts

1
.vscode/settings.json

@ -6,4 +6,5 @@
"search.exclude": { "search.exclude": {
"**/i18n/locales/*-*/**": true, "**/i18n/locales/*-*/**": true,
}, },
"vitest.workspaceConfig": "vitest.config.ts",
} }

8
packages/core/src/types.ts

@ -10,7 +10,12 @@ interface DebugLog {
data: string; data: string;
} }
export type DeviceOutput = Packet | DebugLog; interface StatusEvent {
type: "status";
data: { status: DeviceStatusEnum; reason?: string };
}
export type DeviceOutput = Packet | DebugLog | StatusEvent;
export interface Transport { export interface Transport {
toDevice: WritableStream<Uint8Array>; toDevice: WritableStream<Uint8Array>;
@ -101,6 +106,7 @@ export enum Emitter {
RemoveNodeByNum = 32, RemoveNodeByNum = 32,
SetCannedMessages = 33, SetCannedMessages = 33,
Disconnect = 34, Disconnect = 34,
ConnectionStatus = 35,
} }
export interface LogEvent { export interface LogEvent {

13
packages/core/src/utils/transform/decodePacket.ts

@ -7,6 +7,19 @@ export const decodePacket = (device: MeshDevice) =>
new WritableStream<DeviceOutput>({ new WritableStream<DeviceOutput>({
write(chunk) { write(chunk) {
switch (chunk.type) { switch (chunk.type) {
case "status": {
const { status, reason } = chunk.data as {
status: Types.DeviceStatusEnum;
reason?: string;
};
device.updateDeviceStatus(status);
device.log.info(
Types.Emitter[Types.Emitter.ConnectionStatus],
`🔗 ${Types.DeviceStatusEnum[status]} ${reason ? `(${reason})` : ""}`,
);
break;
}
case "debug": { case "debug": {
break; break;
} }

229
packages/transport-http/src/transport.test.ts

@ -0,0 +1,229 @@
import { describe, vi, expect, it, beforeEach, afterEach, type MockInstance } from "vitest";
import { runTransportContract } from "../../../tests/utils/transportContract";
import { TransportHTTP } from "./transport";
let abortTimeoutSpy: MockInstance | undefined;
beforeEach(() => {
abortTimeoutSpy = vi.spyOn(
globalThis.AbortSignal as unknown as { timeout(ms: number): AbortSignal },
"timeout",
).mockImplementation((ms: number) => {
const ctrl = new AbortController();
const abort = () =>
ctrl.abort(new DOMException("Timeout reached", "TimeoutError"));
// Uses setTimeout so vi.useFakeTimers() can fast-forward it
setTimeout(abort, ms);
return ctrl.signal;
});
});
afterEach(() => {
abortTimeoutSpy?.mockRestore();
});
function stubFetch() {
const inbox: Uint8Array[] = [];
let lastWritten: ArrayBuffer | undefined;
let forceNextReadToHang = false;
let forceNextReadToReturn500 = false;
function makeAbortAwareHang(signal?: AbortSignal): Promise<Response> {
return new Promise((_, reject) => {
const abort = () => reject(new DOMException("Aborted", "AbortError"));
if (signal?.aborted) {
abort();
return;
}
if (signal) {
signal.addEventListener("abort", abort, { once: true });
}
});
}
const mockFetch = vi.fn(async (url: string, init?: RequestInit) => {
const method = (init?.method ?? "GET").toUpperCase();
if (url.includes("/api/v1/toradio") && method === "OPTIONS") {
return { ok: true, status: 204 } as Response;
}
if (url.includes("/api/v1/toradio") && method === "PUT") {
lastWritten = init?.body as ArrayBuffer;
return { ok: true, status: 200 } as Response;
}
if (url.includes("/api/v1/fromradio") && method === "GET") {
if (forceNextReadToHang) {
forceNextReadToHang = false;
return makeAbortAwareHang(init?.signal ?? undefined);
}
if (forceNextReadToReturn500) {
forceNextReadToReturn500 = false;
return {
ok: false,
status: 500,
arrayBuffer: async () => new ArrayBuffer(0),
} as Response;
}
const next = inbox.shift() ?? new Uint8Array();
return {
ok: true,
status: 200,
arrayBuffer: async () => next.buffer,
} as Response;
}
return { ok: true, status: 200 } as Response;
});
vi.stubGlobal("fetch", mockFetch);
return {
pushIncoming: (u8: Uint8Array) => inbox.push(u8),
assertLastWritten: (u8: Uint8Array) => {
const got = new Uint8Array(lastWritten || new ArrayBuffer(0));
expect(got).toEqual(u8);
},
forceReadErrorOnce: () => {
forceNextReadToReturn500 = true;
},
forceReadTimeoutOnce: () => {
forceNextReadToHang = true;
},
getMock: () => mockFetch,
cleanup: () => vi.unstubAllGlobals(),
};
}
async function tickNextTimer() {
try {
await vi.advanceTimersToNextTimerAsync();
} catch {
await new Promise((r) => setTimeout(r, 5));
}
}
describe("TransportHTTP (contract)", () => {
runTransportContract({
name: "TransportHTTP",
setup: () => {
vi.useFakeTimers();
},
teardown: () => {
vi.useRealTimers();
vi.restoreAllMocks();
vi.unstubAllGlobals();
},
create: async () => {
(globalThis as unknown as { __http: ReturnType<typeof stubFetch> }).__http = stubFetch();
const transport = await TransportHTTP.create("127.0.0.1:80", false);
await tickNextTimer();
return transport;
},
pushIncoming: async (bytes) => {
(globalThis as unknown as { __http: ReturnType<typeof stubFetch> }).__http.pushIncoming(bytes);
await tickNextTimer();
},
assertLastWritten: (bytes) => {
(globalThis as unknown as { __http: ReturnType<typeof stubFetch> }).__http.assertLastWritten(bytes);
},
triggerDisconnect: async () => {
(globalThis as unknown as { __http: ReturnType<typeof stubFetch> }).__http.forceReadErrorOnce();
await tickNextTimer();
},
});
});
describe("TransportHTTP (extras)", () => {
let httpStub: ReturnType<typeof stubFetch> | undefined;
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
httpStub?.cleanup();
httpStub = undefined;
});
async function createTransport(): Promise<TransportHTTP> {
httpStub = stubFetch();
const transport = await TransportHTTP.create("127.0.0.1:80", false);
await tickNextTimer();
return transport;
}
async function advanceOnePoll() {
await tickNextTimer();
}
it("emits DeviceDisconnected with reason 'read-timeout' when GET /fromradio hangs", async () => {
const transport = await createTransport();
const reader = transport.fromDevice.getReader();
httpStub!.forceReadTimeoutOnce();
await tickNextTimer();
await vi.advanceTimersByTimeAsync(8000);
let sawReadTimeout = false;
for (let i = 0; i < 6; i++) {
const { value } = await reader.read();
if (value?.type === "status" && value.data.reason === "read-timeout") {
sawReadTimeout = true;
break;
}
}
expect(sawReadTimeout).toBe(true);
reader.releaseLock();
await transport.disconnect();
});
it("stops polling after disconnect()", async () => {
const transport = await createTransport();
const fetchMock = httpStub!.getMock();
const callsBeforeDisconnect = fetchMock.mock.calls.length;
await transport.disconnect();
await advanceOnePoll();
await vi.runOnlyPendingTimersAsync();
const callsAfterDisconnect = fetchMock.mock.calls.length;
expect(callsAfterDisconnect).toBe(callsBeforeDisconnect);
});
it("emits DeviceDisconnected with reason 'read-timeout' when GET /fromradio hangs", async () => {
const transport = await createTransport();
const reader = transport.fromDevice.getReader();
httpStub!.forceReadTimeoutOnce();
await vi.advanceTimersToNextTimerAsync();
await vi.advanceTimersByTimeAsync(8000);
await Promise.resolve();
await Promise.resolve();
let sawReadTimeout = false;
for (let i = 0; i < 6; i++) {
const { value } = await reader.read();
if (value?.type === "status" && value.data.reason === "read-timeout") {
sawReadTimeout = true;
break;
}
}
expect(sawReadTimeout).toBe(true);
reader.releaseLock();
await transport.disconnect();
});
});

226
packages/transport-http/src/transport.ts

@ -1,14 +1,49 @@
import type { Types } from "@meshtastic/core"; import { Types } from "@meshtastic/core";
const FETCH_INTERVAL_MS = 3000;
const READ_TIMEOUT_MS = 7000;
const WRITE_TIMEOUT_MS = 4000;
function toArrayBuffer(uint8array: Uint8Array): ArrayBuffer {
if (
uint8array.buffer instanceof ArrayBuffer &&
uint8array.byteOffset === 0 &&
uint8array.byteLength === uint8array.buffer.byteLength
) {
return uint8array.buffer;
}
return uint8array.slice().buffer;
}
/**
* Provides HTTP(S) transport for Meshtastic devices.
*
* Implements {@link Types.Transport} using the device's HTTP API.
* Polls `/api/v1/fromradio` for incoming packets and writes to `/api/v1/toradio`.
*/
export class TransportHTTP implements Types.Transport { export class TransportHTTP implements Types.Transport {
private _toDevice: WritableStream<Uint8Array>; private _toDevice: WritableStream<Uint8Array>;
private _fromDevice: ReadableStream<Types.DeviceOutput>; private _fromDevice: ReadableStream<Types.DeviceOutput>;
private fromDeviceController?: ReadableStreamDefaultController<Types.DeviceOutput>;
private url: string; private url: string;
private receiveBatchRequests: boolean; private receiveBatchRequests: boolean;
private fetchInterval: number; private fetchInterval: number;
private fetching: boolean; private fetching: boolean;
private interval: ReturnType<typeof setInterval> | undefined; private interval: ReturnType<typeof setInterval> | undefined;
private inflightReadController?: AbortController;
private lastStatus: Types.DeviceStatusEnum =
Types.DeviceStatusEnum.DeviceDisconnected;
private closingByUser = false;
/**
* Probe the device and return a connected HTTP transport.
*
* @param address Hostname or IP address (with optional port).
* @param tls Use HTTPS if true, HTTP otherwise.
*/
public static async create( public static async create(
address: string, address: string,
tls?: boolean, tls?: boolean,
@ -17,97 +52,192 @@ export class TransportHTTP implements Types.Transport {
await fetch(`${connectionUrl}/api/v1/toradio`, { await fetch(`${connectionUrl}/api/v1/toradio`, {
method: "OPTIONS", method: "OPTIONS",
}); });
await Promise.resolve();
return new TransportHTTP(connectionUrl); return new TransportHTTP(connectionUrl);
} }
/**
* Construct a new HTTP transport for the given device URL.
*
* @param url Base URL of the device (`http://host:port` or `https://host:port`).
*/
constructor(url: string) { constructor(url: string) {
this.url = url; this.url = url;
this.receiveBatchRequests = false; this.receiveBatchRequests = false;
this.fetchInterval = 3000; this.fetchInterval = FETCH_INTERVAL_MS;
this.fetching = false; this.fetching = false;
this._toDevice = new WritableStream<Uint8Array>({ this._toDevice = new WritableStream<Uint8Array>({
write: async (chunk) => { write: async (chunk) => {
await this.writeToRadio(chunk); try {
await this.writeToRadio(chunk);
} catch (error) {
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
);
}
throw error;
}
}, },
}); });
let controller: ReadableStreamDefaultController<Types.DeviceOutput>;
this._fromDevice = new ReadableStream<Types.DeviceOutput>({ this._fromDevice = new ReadableStream<Types.DeviceOutput>({
start: (ctrl) => { start: (ctrl) => {
controller = ctrl; this.fromDeviceController = ctrl;
this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting);
// Start polling immediately
void this.safePoll();
this.interval = setInterval(
() => void this.safePoll(),
this.fetchInterval,
);
},
cancel: () => {
if (this.interval) {
clearInterval(this.interval);
}
this.interval = undefined;
}, },
}); });
this.interval = setInterval(async () => {
if (this.fetching) {
// We still have the previous request open
return;
}
this.fetching = true;
try {
await this.readFromRadio(controller);
} catch {
// TODO: Emit disconnection events for certain types of errors
}
this.fetching = false;
}, this.fetchInterval);
} }
private async readFromRadio( /** Poll `/api/v1/fromradio` and enqueue incoming packets. */
controller: ReadableStreamDefaultController<Types.DeviceOutput>, private async readFromRadio(): Promise<void> {
): Promise<void> {
let readBuffer = new ArrayBuffer(1); let readBuffer = new ArrayBuffer(1);
while (readBuffer.byteLength > 0) { while (readBuffer.byteLength > 0) {
const response = await fetch( const inflight = new AbortController();
`${this.url}/api/v1/fromradio?all=${ this.inflightReadController = inflight;
this.receiveBatchRequests ? "true" : "false"
}`, const signal = AbortSignal.any([
{ inflight.signal,
method: "GET", AbortSignal.timeout(READ_TIMEOUT_MS),
headers: { ]);
Accept: "application/x-protobuf",
try {
const response = await fetch(
`${this.url}/api/v1/fromradio?all=${this.receiveBatchRequests ? "true" : "false"}`,
{
method: "GET",
headers: { Accept: "application/x-protobuf" },
signal,
}, },
}, );
); if (!response.ok) {
throw new Error(
`fromradio ${response.status} ${response.statusText}`,
);
}
this.emitStatus(Types.DeviceStatusEnum.DeviceConnected);
readBuffer = await response.arrayBuffer(); readBuffer = await response.arrayBuffer();
if (readBuffer.byteLength > 0) { if (readBuffer.byteLength > 0) {
controller.enqueue({ this.fromDeviceController?.enqueue({
type: "packet", type: "packet",
data: new Uint8Array(readBuffer), data: new Uint8Array(readBuffer),
}); });
}
} finally {
this.inflightReadController = undefined;
} }
} }
} }
/** Write a protobuf-encoded request to `/api/v1/toradio`. */
private async writeToRadio(data: Uint8Array): Promise<void> { private async writeToRadio(data: Uint8Array): Promise<void> {
await fetch(`${this.url}/api/v1/toradio`, { try {
method: "PUT", const response = await fetch(`${this.url}/api/v1/toradio`, {
headers: { method: "PUT",
"Content-Type": "application/x-protobuf", headers: { "Content-Type": "application/x-protobuf" },
}, body: toArrayBuffer(data),
body: data, signal: AbortSignal.timeout(WRITE_TIMEOUT_MS),
}); });
if (!response.ok) {
throw new Error(`toradio ${response.status} ${response.statusText}`);
}
} catch (error) {
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
this.isTimeoutOrAbort(error) ? "write-timeout" : "write-error",
);
}
throw error;
}
} }
/** Writable stream of bytes to the device. */
get toDevice(): WritableStream<Uint8Array> { get toDevice(): WritableStream<Uint8Array> {
return this._toDevice; return this._toDevice;
} }
/** Readable stream of {@link Types.DeviceOutput} from the device. */
get fromDevice(): ReadableStream<Types.DeviceOutput> { get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice; return this._fromDevice;
} }
/**
* Stop polling and emit `DeviceDisconnected("user")`.
*/
disconnect(): Promise<void> { disconnect(): Promise<void> {
this.fetching = false; this.closingByUser = true;
if (this.interval) { if (this.interval) {
clearInterval(this.interval); clearInterval(this.interval);
} }
this.interval = undefined; this.interval = undefined;
this.fetching = false;
try {
this.inflightReadController?.abort();
} catch {}
this.inflightReadController = undefined;
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user");
return Promise.resolve(); return Promise.resolve();
} }
private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void {
if (next === this.lastStatus) {
return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
}
private isTimeoutOrAbort(err: unknown): boolean {
return (
(err instanceof DOMException &&
(err.name === "AbortError" || err.name === "TimeoutError")) ||
(err instanceof Error &&
(err.name === "AbortError" || err.name === "TimeoutError"))
);
}
private async safePoll(): Promise<void> {
if (this.fetching) {
return;
}
this.fetching = true;
try {
await this.readFromRadio();
} catch (error) {
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
this.isTimeoutOrAbort(error) ? "read-timeout" : "read-error",
);
}
} finally {
this.fetching = false;
}
}
} }

187
packages/transport-node-serial/src/transport.test.ts

@ -0,0 +1,187 @@
import { describe, vi, expect, beforeEach, afterEach, it } from "vitest";
import { Duplex } from "node:stream";
import type { SerialPort } from "serialport";
import { Types, Utils } from "@meshtastic/core";
import { runTransportContract } from "../../../tests/utils/transportContract";
import { TransportNodeSerial } from "./transport";
function isStatusEvent(
output: Types.DeviceOutput | undefined,
): output is Extract<Types.DeviceOutput, { type: "status" }> {
return output !== undefined && output.type === "status";
}
class FakeSerialPort extends Duplex {
public lastWritten: Uint8Array | undefined;
constructor() {
super({ objectMode: false });
}
_read() {}
_write(
chunk: Buffer,
_encoding: BufferEncoding,
callback: (error?: Error | null) => void,
) {
this.lastWritten = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength,
);
callback();
}
pushIncoming(data: Uint8Array) {
const buf = Buffer.from(data.buffer, data.byteOffset, data.byteLength);
this.push(buf);
}
emitErrorOnce(message = "simulated serial error") {
this.emit("error", new Error(message));
}
emitClose() {
this.emit("close");
}
close() {
this.destroy();
this.emit("close");
}
}
function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
transform(chunk, controller) {
controller.enqueue({ type: "packet", data: chunk });
},
});
// Utils.toDeviceStream is a getter
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
toDevice as unknown as typeof Utils.toDeviceStream,
);
vi.spyOn(Utils, "fromDeviceStream").mockImplementation(
() =>
fromDeviceFactory() as unknown as TransformStream<
Uint8Array,
Types.DeviceOutput
>,
);
return {
restore: () => vi.restoreAllMocks(),
};
}
describe("TransportNodeSerial (contract)", () => {
let transformsStub: { restore: () => void } | undefined;
beforeEach(() => {
transformsStub = stubCoreTransforms();
});
afterEach(() => {
transformsStub?.restore();
});
runTransportContract({
name: "TransportNodeSerial",
setup: () => {},
teardown: () => {
vi.restoreAllMocks();
},
create: async () => {
const fakePort = new FakeSerialPort();
const transport = new TransportNodeSerial(
fakePort as unknown as SerialPort,
);
await Promise.resolve();
(globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort =
fakePort;
return transport;
},
pushIncoming: async (bytes) => {
(globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort.pushIncoming(
bytes,
);
await Promise.resolve();
},
assertLastWritten: (bytes) => {
const port =
(globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort;
expect(port.lastWritten).toBeDefined();
expect(port.lastWritten).toEqual(bytes);
},
triggerDisconnect: async () => {
(globalThis as unknown as { __fakePort: FakeSerialPort }).__fakePort.emitErrorOnce(
"test-disconnect",
);
await Promise.resolve();
},
});
});
describe("TransportNodeSerial (extras)", () => {
let transformsStub: { restore: () => void } | undefined;
beforeEach(() => {
transformsStub = stubCoreTransforms();
});
afterEach(() => {
transformsStub?.restore();
});
it("emits DeviceDisconnected with reason 'port-closed' on close event", async () => {
const fakePort = new FakeSerialPort();
const transport = new TransportNodeSerial(
fakePort as unknown as SerialPort,
);
const reader = transport.fromDevice.getReader();
await Promise.resolve();
const first = await reader.read();
expect(isStatusEvent(first.value)).toBe(true);
if (isStatusEvent(first.value)) {
expect(first.value.data.status).toBe(
Types.DeviceStatusEnum.DeviceConnecting,
);
}
const second = await reader.read();
expect(isStatusEvent(second.value)).toBe(true);
if (isStatusEvent(second.value)) {
expect(second.value.data.status).toBe(
Types.DeviceStatusEnum.DeviceConnected,
);
}
fakePort.emitClose();
await Promise.resolve();
let sawClosed = false;
for (let i = 0; i < 6; i++) {
const { value } = await reader.read();
if (isStatusEvent(value) && value.data.reason === "port-closed") {
sawClosed = true;
break;
}
}
expect(sawClosed).toBe(true);
reader.releaseLock();
await transport.disconnect();
});
});

131
packages/transport-node-serial/src/transport.ts

@ -1,17 +1,29 @@
import { Readable, Writable } from "node:stream"; import { Readable, Writable } from "node:stream";
import type { Types } from "@meshtastic/core"; import { Types, Utils } from "@meshtastic/core";
import { Utils } from "@meshtastic/core";
import { SerialPort } from "serialport"; import { SerialPort } from "serialport";
/**
* Node.js Serial transport for Meshtastic.
*
* Implements {@link Types.Transport} on top of a Node `SerialPort`.
* Use {@link TransportNodeSerial.create} for a convenient factory, or
* `new TransportNodeSerial(port)` if you already have an open port.
*/
export class TransportNodeSerial implements Types.Transport { export class TransportNodeSerial implements Types.Transport {
private readonly _toDevice: WritableStream<Uint8Array>; private readonly _toDevice: WritableStream<Uint8Array>;
private readonly _fromDevice: ReadableStream<Types.DeviceOutput>; private readonly _fromDevice: ReadableStream<Types.DeviceOutput>;
private fromDeviceController?: ReadableStreamDefaultController<Types.DeviceOutput>;
private port: SerialPort | undefined; private port: SerialPort | undefined;
private pipePromise?: Promise<void>;
private abortController: AbortController;
private lastStatus: Types.DeviceStatusEnum =
Types.DeviceStatusEnum.DeviceDisconnected;
private closingByUser = false;
/** /**
* Creates and connects a new TransportNode instance. * Creates and connects a new TransportNode instance.
* @param path - Path to the serial device * @param path - Path to the serial device
* @param baudRate - The port number for the TCP connection (defaults to 4403). * @param baudRate - Baud rate for the serial connection (default is 115200).
* @returns A promise that resolves with a connected TransportNode instance. * @returns A promise that resolves with a connected TransportNode instance.
*/ */
public static create( public static create(
@ -46,23 +58,78 @@ export class TransportNodeSerial implements Types.Transport {
this.port = port; this.port = port;
this.port.on("error", (err) => { this.port.on("error", (err) => {
console.error("Serial port connection error:", err); console.error("Serial port connection error:", err);
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "port-error");
});
this.port.on("close", () => {
if (this.closingByUser) {
return;
}
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "port-closed");
}); });
const fromDeviceSource = Readable.toWeb(port) as ReadableStream<Uint8Array>; const fromDeviceSource = Readable.toWeb(port) as ReadableStream<Uint8Array>;
this._fromDevice = fromDeviceSource.pipeThrough(Utils.fromDeviceStream()); const transformed = fromDeviceSource.pipeThrough(Utils.fromDeviceStream());
this.abortController = new AbortController();
const controller = this.abortController;
this._fromDevice = new ReadableStream<Types.DeviceOutput>({
start: async (ctrl) => {
this.fromDeviceController = ctrl;
this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting);
this.emitStatus(Types.DeviceStatusEnum.DeviceConnected);
const reader = transformed.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
ctrl.enqueue(value);
}
ctrl.close();
} catch (error) {
if (this.closingByUser) {
ctrl.close(); // graceful EOF on user
} else {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"read-error",
);
ctrl.error(
error instanceof Error ? error : new Error(String(error)),
);
}
try {
await transformed.cancel();
} catch {}
} finally {
reader.releaseLock();
}
},
});
// Stream for data going FROM the application TO the Meshtastic device. // Stream for data going FROM the application TO the Meshtastic device.
const toDeviceTransform = Utils.toDeviceStream; this._toDevice = Utils.toDeviceStream.writable;
this._toDevice = toDeviceTransform.writable;
this.pipePromise = Utils.toDeviceStream.readable
// The readable end of the transform is then piped to the Node.js SerialPort connection. .pipeTo(Writable.toWeb(port) as WritableStream<Uint8Array>, {
// A similar assertion is needed here because `Writable.toWeb` also returns signal: controller.signal,
// a generically typed stream (`WritableStream<any>`). })
toDeviceTransform.readable .catch((error) => {
.pipeTo(Writable.toWeb(port) as WritableStream<Uint8Array>) if (controller.signal.aborted || this.closingByUser) {
.catch((err) => { return;
console.error("Error piping data to serial port:", err); }
this.port.close(err as Error); console.error("Error piping data to serial port:", error);
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"write-error",
);
try {
this.port?.close();
} catch {}
}); });
} }
@ -80,9 +147,35 @@ export class TransportNodeSerial implements Types.Transport {
return this._fromDevice; return this._fromDevice;
} }
disconnect() { /**
this.port.close(); * Disconnect from the serial port and emit `DeviceDisconnected("user")`.
this.port = undefined; * Safe to call multiple times.
return Promise.resolve(); */
async disconnect() {
try {
this.closingByUser = true;
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user");
this.abortController?.abort();
await this.pipePromise?.catch(() => {});
try {
this.port?.close();
} catch {}
} finally {
this.port = undefined;
this.closingByUser = false;
}
}
private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void {
if (next === this.lastStatus) {
return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
} }
} }

186
packages/transport-node/src/transport.test.ts

@ -0,0 +1,186 @@
import { describe, vi, expect, beforeEach, afterEach, it } from "vitest";
import { Duplex } from "node:stream";
import type { Socket } from "node:net";
import { runTransportContract } from "../../../tests/utils/transportContract";
import { TransportNode } from "./transport";
import { Utils, Types } from "@meshtastic/core";
function isStatusEvent(
out: Types.DeviceOutput | undefined,
): out is Extract<Types.DeviceOutput, { type: "status" }> {
return !!out && (out as any).type === "status";
}
class FakeSocket extends Duplex {
public lastWritten: Uint8Array | undefined;
constructor() {
super({ objectMode: false });
}
_read() {}
_write(
chunk: Buffer,
_encoding: BufferEncoding,
callback: (error?: Error | null) => void,
) {
this.lastWritten = new Uint8Array(
chunk.buffer,
chunk.byteOffset,
chunk.byteLength,
);
callback();
}
pushIncoming(data: Uint8Array) {
const buf = Buffer.from(data.buffer, data.byteOffset, data.byteLength);
this.push(buf);
}
emitErrorOnce(message = "simulated error") {
this.emit("error", new Error(message));
}
emitClose() {
this.emit("close");
}
override destroy(error?: Error) {
super.destroy(error);
this.emit("close");
return this;
}
}
function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
transform(chunk, controller) {
controller.enqueue({ type: "packet", data: chunk });
},
});
vi.spyOn(Utils, "toDeviceStream", "get").mockReturnValue(
toDevice as unknown as typeof Utils.toDeviceStream,
);
vi
.spyOn(Utils, "fromDeviceStream")
.mockImplementation(
() =>
fromDeviceFactory() as unknown as TransformStream<
Uint8Array,
Types.DeviceOutput
>,
);
return {
restore: () => vi.restoreAllMocks(),
};
}
describe("TransportNode (contract)", () => {
let transformsStub: { restore: () => void } | undefined;
beforeEach(() => {
transformsStub = stubCoreTransforms();
});
afterEach(() => {
transformsStub?.restore();
});
runTransportContract({
name: "TransportNode",
setup: () => {},
teardown: () => {
vi.restoreAllMocks();
},
create: async () => {
const fakeSocket = new FakeSocket();
const transport = new TransportNode(fakeSocket as unknown as Socket);
await Promise.resolve();
(globalThis as unknown as { __nodeSock: FakeSocket }).__nodeSock =
fakeSocket;
return transport;
},
pushIncoming: async (bytes) => {
(globalThis as unknown as { __nodeSock: FakeSocket }).__nodeSock.pushIncoming(
bytes,
);
await Promise.resolve();
},
assertLastWritten: (bytes) => {
const sock = (globalThis as unknown as { __nodeSock: FakeSocket })
.__nodeSock;
expect(sock.lastWritten).toBeDefined();
expect(sock.lastWritten).toEqual(bytes);
},
triggerDisconnect: async () => {
(globalThis as unknown as { __nodeSock: FakeSocket }).__nodeSock.emitErrorOnce(
"test-disconnect",
);
await Promise.resolve();
},
});
});
describe("TransportNode (extras)", () => {
let transformsStub: { restore: () => void } | undefined;
beforeEach(() => {
transformsStub = stubCoreTransforms();
});
afterEach(() => {
transformsStub?.restore();
});
it("emits DeviceDisconnected with reason 'socket-closed' on close event", async () => {
const fakeSocket = new FakeSocket();
const transport = new TransportNode(fakeSocket as unknown as Socket);
const reader = transport.fromDevice.getReader();
await Promise.resolve();
const first = await reader.read();
expect(isStatusEvent(first.value)).toBe(true);
if (isStatusEvent(first.value)) {
expect(first.value.data.status).toBe(
Types.DeviceStatusEnum.DeviceConnecting,
);
}
const second = await reader.read();
expect(isStatusEvent(second.value)).toBe(true);
if (isStatusEvent(second.value)) {
expect(second.value.data.status).toBe(
Types.DeviceStatusEnum.DeviceConnected,
);
}
fakeSocket.emitClose();
await Promise.resolve();
let sawClosed = false;
for (let i = 0; i < 6; i++) {
const { value } = await reader.read();
if (isStatusEvent(value) && value.data.reason === "socket-closed") {
sawClosed = true;
break;
}
}
expect(sawClosed).toBe(true);
reader.releaseLock();
await transport.disconnect();
});
});

135
packages/transport-node/src/transport.ts

@ -1,12 +1,25 @@
import { Socket } from "node:net"; import { Socket } from "node:net";
import { Readable, Writable } from "node:stream"; import { Readable, Writable } from "node:stream";
import type { Types } from "@meshtastic/core"; import { Types, Utils } from "@meshtastic/core";
import { Utils } from "@meshtastic/core";
/**
* Node.js TCP transport for Meshtastic.
*
* Implements {@link Types.Transport} on top of a Node `net.Socket`.
* Use {@link TransportNode.create} to open a new connection, or
* construct directly with an existing socket.
*/
export class TransportNode implements Types.Transport { export class TransportNode implements Types.Transport {
private readonly _toDevice: WritableStream<Uint8Array>; private readonly _toDevice: WritableStream<Uint8Array>;
private readonly _fromDevice: ReadableStream<Types.DeviceOutput>; private readonly _fromDevice: ReadableStream<Types.DeviceOutput>;
private fromDeviceController?: ReadableStreamDefaultController<Types.DeviceOutput>;
private socket: Socket | undefined; private socket: Socket | undefined;
private pipePromise?: Promise<void>;
private abortController: AbortController;
private lastStatus: Types.DeviceStatusEnum =
Types.DeviceStatusEnum.DeviceDisconnected;
private closingByUser = false;
/** /**
* Creates and connects a new TransportNode instance. * Creates and connects a new TransportNode instance.
@ -38,47 +51,131 @@ export class TransportNode implements Types.Transport {
*/ */
constructor(connection: Socket) { constructor(connection: Socket) {
this.socket = connection; this.socket = connection;
this.socket.on("error", (err) => { this.socket.on("error", (err) => {
console.error("Socket connection error:", err); console.error("Socket connection error:", err);
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"socket-error",
);
}
});
this.socket.on("close", () => {
if (this.closingByUser) {
return; // suppress close-derived disconnect in user flow
}
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"socket-closed",
);
}); });
this.abortController = new AbortController();
const abortController = this.abortController;
const fromDeviceSource = Readable.toWeb( const fromDeviceSource = Readable.toWeb(
connection, connection,
) as ReadableStream<Uint8Array>; ) as ReadableStream<Uint8Array>;
this._fromDevice = fromDeviceSource.pipeThrough(Utils.fromDeviceStream()); const transformed = fromDeviceSource.pipeThrough(Utils.fromDeviceStream());
this._fromDevice = new ReadableStream<Types.DeviceOutput>({
start: async (ctrl) => {
this.fromDeviceController = ctrl;
this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting);
this.emitStatus(Types.DeviceStatusEnum.DeviceConnected);
const reader = transformed.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
ctrl.enqueue(value);
}
ctrl.close();
} catch (error) {
if (this.closingByUser) {
ctrl.close();
} else {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"read-error",
);
ctrl.error(
error instanceof Error ? error : new Error(String(error)),
);
}
try {
await transformed.cancel();
} catch {}
} finally {
reader.releaseLock();
}
},
});
// Stream for data going FROM the application TO the Meshtastic device. // Stream for data going FROM the application TO the Meshtastic device.
const toDeviceTransform = Utils.toDeviceStream; const toDeviceTransform = Utils.toDeviceStream;
this._toDevice = toDeviceTransform.writable; this._toDevice = toDeviceTransform.writable;
// The readable end of the transform is then piped to the Node.js socket. this.pipePromise = toDeviceTransform.readable
// A similar assertion is needed here because `Writable.toWeb` also returns .pipeTo(Writable.toWeb(connection) as WritableStream<Uint8Array>, {
// a generically typed stream (`WritableStream<any>`). signal: abortController.signal,
toDeviceTransform.readable })
.pipeTo(Writable.toWeb(connection) as WritableStream<Uint8Array>)
.catch((err) => { .catch((err) => {
if (abortController.signal.aborted) {
return;
}
console.error("Error piping data to socket:", err); console.error("Error piping data to socket:", err);
this.socket.destroy(err as Error); const error = err instanceof Error ? err : new Error(String(err));
this.socket?.destroy(error);
}); });
} }
/** /** WritableStream to send data to the device. */
* The WritableStream to send data to the Meshtastic device.
*/
public get toDevice(): WritableStream<Uint8Array> { public get toDevice(): WritableStream<Uint8Array> {
return this._toDevice; return this._toDevice;
} }
/** /** ReadableStream to receive data from the device. */
* The ReadableStream to receive data from the Meshtastic device.
*/
public get fromDevice(): ReadableStream<Types.DeviceOutput> { public get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice; return this._fromDevice;
} }
disconnect() { /**
this.socket.destroy(); * Disconnect from the TCP socket and emit `DeviceDisconnected("user")`.
this.socket = undefined; * Safe to call multiple times.
return Promise.resolve(); */
async disconnect(): Promise<void> {
try {
this.closingByUser = true;
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user");
this.abortController.abort();
if (this.pipePromise) {
await this.pipePromise;
}
this.socket?.destroy();
} finally {
this.socket = undefined;
this.closingByUser = false;
}
}
private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void {
if (next === this.lastStatus) {
return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
} }
} }

166
packages/transport-web-bluetooth/src/transport.test.ts

@ -0,0 +1,166 @@
import { describe, vi, expect, beforeEach, afterEach } from "vitest";
import { runTransportContract } from "../../../tests/utils/transportContract";
import { TransportWebBluetooth } from "./transport";
class MiniEmitter {
private listeners = new Map<string, Set<(e: Event) => void>>();
addEventListener(type: string, listener: (e: Event) => void) {
if (!this.listeners.has(type)) this.listeners.set(type, new Set());
this.listeners.get(type)!.add(listener);
}
removeEventListener(type: string, listener: (e: Event) => void) {
this.listeners.get(type)?.delete(listener);
}
dispatchEvent(event: Event) {
this.listeners.get(event.type)?.forEach((l) => l(event));
}
}
function stubWebBluetooth() {
const incomingQueue: Uint8Array[] = [];
let lastWritten: Uint8Array | undefined;
// fromRadioCharacteristic: read bytes from queue, one buffer per read
const fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic = {
async readValue() {
const next = incomingQueue.shift() ?? new Uint8Array();
return new DataView(
next.buffer,
next.byteOffset,
next.byteLength,
) as unknown as DataView;
},
addEventListener() {},
removeEventListener() {},
} as unknown as BluetoothRemoteGATTCharacteristic;
// characteristicvaluechanged event plumbing (fromNumCharacteristic)
const charEmitter = new MiniEmitter();
const fromNumCharacteristic: BluetoothRemoteGATTCharacteristic = {
async startNotifications() {
return this;
},
addEventListener(type: string, listener: (e: Event) => void) {
charEmitter.addEventListener(type, listener);
},
removeEventListener(type: string, listener: (e: Event) => void) {
charEmitter.removeEventListener(type, listener);
},
} as unknown as BluetoothRemoteGATTCharacteristic;
const toRadioCharacteristic: BluetoothRemoteGATTCharacteristic = {
async writeValue(bufferSource: BufferSource) {
const u8 =
bufferSource instanceof ArrayBuffer
? new Uint8Array(bufferSource)
: new Uint8Array(
bufferSource.buffer,
bufferSource.byteOffset,
bufferSource.byteLength,
);
lastWritten = new Uint8Array(u8);
},
} as unknown as BluetoothRemoteGATTCharacteristic;
// Primary service returns our three characteristics by UUID
const primaryService: BluetoothRemoteGATTService = {
async getCharacteristic(uuid: string) {
if (uuid === TransportWebBluetooth.ToRadioUuid) return toRadioCharacteristic;
if (uuid === TransportWebBluetooth.FromRadioUuid) return fromRadioCharacteristic;
if (uuid === TransportWebBluetooth.FromNumUuid) return fromNumCharacteristic;
throw new Error("Unknown characteristic: " + uuid);
},
} as unknown as BluetoothRemoteGATTService;
// Device-level emitter to deliver gattserverdisconnected
const deviceEmitter = new MiniEmitter();
// GATT server with readonly connected
let isConnected = true;
const gattServer: BluetoothRemoteGATTServer = {
get connected() {
return isConnected;
},
async connect() {
isConnected = true;
return gattServer;
},
disconnect() {
isConnected = false;
deviceEmitter.dispatchEvent(new Event("gattserverdisconnected"));
},
async getPrimaryService() {
return primaryService;
},
device: {
addEventListener: (...args: Parameters<EventTarget["addEventListener"]>) =>
deviceEmitter.addEventListener(args[0] as string, args[1] as (e: Event) => void),
removeEventListener: (...args: Parameters<EventTarget["removeEventListener"]>) =>
deviceEmitter.removeEventListener(args[0] as string, args[1] as (e: Event) => void),
} as unknown as BluetoothDevice,
} as unknown as BluetoothRemoteGATTServer;
const fakeDevice: BluetoothDevice = {
async watchAdvertisements() {},
gatt: gattServer,
} as unknown as BluetoothDevice;
const fakeNavigator = {
bluetooth: {
async requestDevice() {
return fakeDevice;
},
},
};
vi.stubGlobal("navigator", Object.assign({}, globalThis.navigator, fakeNavigator));
// helper actions for tests/contract
return {
pushIncoming: (u8: Uint8Array) => {
incomingQueue.push(u8);
charEmitter.dispatchEvent(new Event("characteristicvaluechanged"));
},
assertLastWritten: (u8: Uint8Array) => {
expect(lastWritten).toBeDefined();
expect(lastWritten).toEqual(u8);
},
// simulate underlying link drop (OS-level disconnect)
triggerGattDisconnect: () => {
isConnected = false;
deviceEmitter.dispatchEvent(new Event("gattserverdisconnected"));
},
cleanup: () => {
vi.unstubAllGlobals();
},
};
}
describe("TransportWebBluetooth (contract)", () => {
runTransportContract({
name: "TransportWebBluetooth",
setup: () => {},
teardown: () => {
(globalThis as unknown as { __ble?: ReturnType<typeof stubWebBluetooth> }).__ble?.cleanup();
(globalThis as unknown as { __ble?: ReturnType<typeof stubWebBluetooth> }).__ble = undefined;
vi.restoreAllMocks();
vi.unstubAllGlobals();
},
create: async () => {
(globalThis as unknown as { __ble: ReturnType<typeof stubWebBluetooth> }).__ble = stubWebBluetooth();
return await TransportWebBluetooth.create();
},
pushIncoming: async (bytes) => {
(globalThis as unknown as { __ble: ReturnType<typeof stubWebBluetooth> }).__ble.pushIncoming(bytes);
await Promise.resolve();
},
assertLastWritten: (bytes) => {
(globalThis as unknown as { __ble: ReturnType<typeof stubWebBluetooth> }).__ble.assertLastWritten(bytes);
},
triggerDisconnect: async () => {
(globalThis as unknown as { __ble: ReturnType<typeof stubWebBluetooth> }).__ble.triggerGattDisconnect();
await Promise.resolve();
},
});
});

212
packages/transport-web-bluetooth/src/transport.ts

@ -1,21 +1,63 @@
import type { Types } from "@meshtastic/core"; import { Types } from "@meshtastic/core";
function toArrayBuffer(uint8array: Uint8Array): ArrayBuffer {
if (
uint8array.buffer instanceof ArrayBuffer &&
uint8array.byteOffset === 0 &&
uint8array.byteLength === uint8array.buffer.byteLength
) {
return uint8array.buffer;
}
return uint8array.slice().buffer;
}
/**
* Provides Web Bluetooth transport for Meshtastic devices.
*
* Implements the {@link Types.Transport} contract using the Web Bluetooth API.
* Use {@link TransportWebBluetooth.create} or {@link TransportWebBluetooth.createFromDevice}
* to construct an instance.
*/
export class TransportWebBluetooth implements Types.Transport { export class TransportWebBluetooth implements Types.Transport {
private _toDevice: WritableStream<Uint8Array>; private _toDevice: WritableStream<Uint8Array>;
private _fromDevice: ReadableStream<Types.DeviceOutput>; private _fromDevice: ReadableStream<Types.DeviceOutput>;
private _fromDeviceController?: ReadableStreamDefaultController<Types.DeviceOutput>; private fromDeviceController?: ReadableStreamDefaultController<Types.DeviceOutput>;
private _isFirstWrite = true;
private toRadioCharacteristic: BluetoothRemoteGATTCharacteristic; private toRadioCharacteristic: BluetoothRemoteGATTCharacteristic;
private fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic; private fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic;
private fromNumCharacteristic: BluetoothRemoteGATTCharacteristic; private fromNumCharacteristic: BluetoothRemoteGATTCharacteristic;
private gattServer: BluetoothRemoteGATTServer; private gattServer: BluetoothRemoteGATTServer;
private lastStatus: Types.DeviceStatusEnum =
Types.DeviceStatusEnum.DeviceDisconnected;
private closingByUser = false;
private reading = false;
/** UUID for the "toRadio" write characteristic. */
static ToRadioUuid = "f75c76d2-129e-4dad-a1dd-7866124401e7"; static ToRadioUuid = "f75c76d2-129e-4dad-a1dd-7866124401e7";
/** UUID for the "fromRadio" read characteristic. */
static FromRadioUuid = "2c55e69e-4993-11ed-b878-0242ac120002"; static FromRadioUuid = "2c55e69e-4993-11ed-b878-0242ac120002";
/** UUID for the "fromNum" notification characteristic. */
static FromNumUuid = "ed9da18c-a800-4f66-a670-aa7547e34453"; static FromNumUuid = "ed9da18c-a800-4f66-a670-aa7547e34453";
/** UUID for the Meshtastic GATT service. */
static ServiceUuid = "6ba1b218-15a8-461f-9fa8-5dcae273eafd"; static ServiceUuid = "6ba1b218-15a8-461f-9fa8-5dcae273eafd";
private onGattDisconnected = () => {
if (this.closingByUser) {
return;
}
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"gatt-disconnected",
);
};
private onFromNumChanged = () => {
void this.readFromRadio();
};
/**
* Prompts the user to select a Bluetooth device, connects it, and returns a transport.
*/
public static async create(): Promise<TransportWebBluetooth> { public static async create(): Promise<TransportWebBluetooth> {
const device = await navigator.bluetooth.requestDevice({ const device = await navigator.bluetooth.requestDevice({
filters: [{ services: [TransportWebBluetooth.ServiceUuid] }], filters: [{ services: [TransportWebBluetooth.ServiceUuid] }],
@ -23,17 +65,25 @@ export class TransportWebBluetooth implements Types.Transport {
return await TransportWebBluetooth.prepareConnection(device); return await TransportWebBluetooth.prepareConnection(device);
} }
/**
* Creates a transport from an existing, user-provided {@link BluetoothDevice}.
*/
public static async createFromDevice( public static async createFromDevice(
device: BluetoothDevice, device: BluetoothDevice,
): Promise<TransportWebBluetooth> { ): Promise<TransportWebBluetooth> {
return await TransportWebBluetooth.prepareConnection(device); return await TransportWebBluetooth.prepareConnection(device);
} }
/**
* Prepares and connects to a {@link BluetoothDevice}, resolving its GATT server
* and characteristics, then returning a transport.
*
* @throws if required services or characteristics are missing.
*/
public static async prepareConnection( public static async prepareConnection(
device: BluetoothDevice, device: BluetoothDevice,
): Promise<TransportWebBluetooth> { ): Promise<TransportWebBluetooth> {
const gattServer = await device.gatt?.connect(); const gattServer = await device.gatt?.connect();
if (!gattServer) { if (!gattServer) {
throw new Error("Failed to connect to GATT server"); throw new Error("Failed to connect to GATT server");
} }
@ -41,7 +91,6 @@ export class TransportWebBluetooth implements Types.Transport {
const service = await gattServer.getPrimaryService( const service = await gattServer.getPrimaryService(
TransportWebBluetooth.ServiceUuid, TransportWebBluetooth.ServiceUuid,
); );
const toRadioCharacteristic = await service.getCharacteristic( const toRadioCharacteristic = await service.getCharacteristic(
TransportWebBluetooth.ToRadioUuid, TransportWebBluetooth.ToRadioUuid,
); );
@ -60,8 +109,6 @@ export class TransportWebBluetooth implements Types.Transport {
throw new Error("Failed to find required characteristics"); throw new Error("Failed to find required characteristics");
} }
console.log("Connected to device", device.name);
return new TransportWebBluetooth( return new TransportWebBluetooth(
toRadioCharacteristic, toRadioCharacteristic,
fromRadioCharacteristic, fromRadioCharacteristic,
@ -70,6 +117,10 @@ export class TransportWebBluetooth implements Types.Transport {
); );
} }
/**
* Create a transport from resolved GATT characteristics and server.
* Prefer using the static factory methods instead.
*/
constructor( constructor(
toRadioCharacteristic: BluetoothRemoteGATTCharacteristic, toRadioCharacteristic: BluetoothRemoteGATTCharacteristic,
fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic, fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic,
@ -81,66 +132,135 @@ export class TransportWebBluetooth implements Types.Transport {
this.fromNumCharacteristic = fromNumCharacteristic; this.fromNumCharacteristic = fromNumCharacteristic;
this.gattServer = gattServer; this.gattServer = gattServer;
this._fromDevice = new ReadableStream({ this._fromDevice = new ReadableStream<Types.DeviceOutput>({
start: (ctrl) => { start: async (ctrl) => {
this._fromDeviceController = ctrl; this.fromDeviceController = ctrl;
}, this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting);
});
this._toDevice = new WritableStream({ this.gattServer.device.addEventListener(
write: async (chunk) => { "gattserverdisconnected",
await this.toRadioCharacteristic.writeValue(chunk); this.onGattDisconnected,
);
if (this._isFirstWrite && this._fromDeviceController) { try {
this._isFirstWrite = false; await this.fromNumCharacteristic.startNotifications();
this.fromNumCharacteristic.addEventListener(
setTimeout(() => { "characteristicvaluechanged",
// biome-ignore lint/style/noNonNullAssertion: we know this will be set this.onFromNumChanged,
this.readFromRadio(this._fromDeviceController!); );
}, 50); this.emitStatus(Types.DeviceStatusEnum.DeviceConnected);
// prime once in case data already queued
void this.readFromRadio();
} catch {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"notify-failed",
);
this.gattServer.device.removeEventListener(
"gattserverdisconnected",
this.onGattDisconnected,
);
} }
}, },
}); });
this.fromNumCharacteristic.addEventListener( this._toDevice = new WritableStream<Uint8Array>({
"characteristicvaluechanged", write: async (chunk) => {
() => { try {
if (this._fromDeviceController) { const ab = toArrayBuffer(chunk);
this.readFromRadio(this._fromDeviceController); await this.toRadioCharacteristic.writeValue(ab);
void this.readFromRadio(); // ensure we read any response
} catch (error) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"write-error",
);
throw error;
} }
}, },
); });
this.fromNumCharacteristic.startNotifications();
} }
/** Writable stream of bytes to the device. */
get toDevice(): WritableStream<Uint8Array> { get toDevice(): WritableStream<Uint8Array> {
return this._toDevice; return this._toDevice;
} }
/** Readable stream of {@link Types.DeviceOutput} from the device. */
get fromDevice(): ReadableStream<Types.DeviceOutput> { get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice; return this._fromDevice;
} }
protected async readFromRadio( /**
controller: ReadableStreamDefaultController<Types.DeviceOutput>, * Closes the GATT connection and emits `DeviceDisconnected("user")`.
): Promise<void> { */
let hasMoreData = true; disconnect(): Promise<void> {
while (hasMoreData && this.fromRadioCharacteristic) { try {
const value = await this.fromRadioCharacteristic.readValue(); this.closingByUser = true;
if (value.byteLength === 0) { this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user");
hasMoreData = false; try {
continue; this.fromNumCharacteristic.stopNotifications?.();
} catch {}
this.fromNumCharacteristic.removeEventListener(
"characteristicvaluechanged",
this.onFromNumChanged,
);
this.gattServer.device.removeEventListener(
"gattserverdisconnected",
this.onGattDisconnected,
);
this.gattServer.disconnect();
} finally {
this.closingByUser = false;
}
return Promise.resolve();
}
private async readFromRadio(): Promise<void> {
if (this.reading) {
return;
}
this.reading = true;
try {
let hasMoreData = true;
while (hasMoreData && this.fromRadioCharacteristic) {
const value = await this.fromRadioCharacteristic.readValue();
if (value.byteLength === 0) {
hasMoreData = false;
continue;
}
this.enqueue({
type: "packet",
data: new Uint8Array(value.buffer),
});
}
} catch (error) {
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"read-error",
);
} }
controller.enqueue({ throw error;
type: "packet", } finally {
data: new Uint8Array(value.buffer), this.reading = false;
});
} }
} }
disconnect(): Promise<void> { private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void {
this.gattServer.disconnect(); if (next === this.lastStatus) {
return Promise.resolve(); return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
}
private enqueue(output: Types.DeviceOutput): void {
this.fromDeviceController?.enqueue(output);
} }
} }

225
packages/transport-web-serial/src/transport.test.ts

@ -0,0 +1,225 @@
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { TransportWebSerial } from "./transport";
import { Types, Utils } from "@meshtastic/core";
import { runTransportContract } from "../../../tests/utils/transportContract";
function stubCoreTransforms() {
const toDevice = new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
controller.enqueue(chunk);
},
});
// maps raw bytes -> DeviceOutput.packet
const fromDeviceFactory = () =>
new TransformStream<Uint8Array, Types.DeviceOutput>({
transform(chunk, controller) {
controller.enqueue({ type: "packet", data: chunk });
},
});
const restoreTo = vi
.spyOn(Utils, "toDeviceStream", "get")
.mockReturnValue(toDevice as unknown as typeof Utils.toDeviceStream);
const restoreFrom = vi
.spyOn(Utils, "fromDeviceStream")
.mockImplementation(
() =>
fromDeviceFactory() as unknown as TransformStream<
Uint8Array,
Types.DeviceOutput
>,
);
return {
restore: () => {
restoreTo.mockRestore();
restoreFrom.mockRestore();
},
};
}
function stubNavigatorSerial() {
type SerialDisconnectHandler = (ev: { port?: any }) => void;
const handlers = new Set<SerialDisconnectHandler>();
const serialStub = {
addEventListener: (type: string, handler: EventListenerOrEventListenerObject) => {
if (type === "disconnect") handlers.add(handler as any as SerialDisconnectHandler);
},
removeEventListener: (type: string, handler: EventListenerOrEventListenerObject) => {
if (type === "disconnect") handlers.delete(handler as any as SerialDisconnectHandler);
},
dispatchDisconnect(port: any) {
for (const h of handlers) h({ port });
},
requestPort: vi.fn(async () => new FakeSerialPort()),
};
const nav: any = (globalThis as any).navigator ?? {};
const hadNavigator = !!(globalThis as any).navigator;
const originalSerial = nav.serial;
if (!hadNavigator) {
Object.defineProperty(globalThis as any, "navigator", {
value: nav,
configurable: true,
writable: false,
});
}
Object.defineProperty(nav, "serial", {
value: serialStub,
configurable: true,
enumerable: true,
writable: true,
});
return {
serialStub,
restore: () => {
if (hadNavigator) {
if (originalSerial === undefined) {
delete (globalThis as any).navigator.serial;
} else {
Object.defineProperty((globalThis as any).navigator, "serial", {
value: originalSerial,
configurable: true,
enumerable: true,
writable: true,
});
}
} else {
delete (globalThis as any).navigator;
}
},
};
}
class FakeSerialPort {
readable: ReadableStream<Uint8Array>;
writable: WritableStream<Uint8Array>;
lastWritten?: Uint8Array;
private _readController!: ReadableStreamDefaultController<Uint8Array>;
constructor() {
this.readable = new ReadableStream<Uint8Array>({
start: (controller) => {
this._readController = controller;
},
});
this.writable = new WritableStream<Uint8Array>({
write: async (chunk) => {
this.lastWritten = chunk;
},
});
}
open(_options?: { baudRate?: number }): Promise<void> {
return Promise.resolve();
}
close(): Promise<void> {
try {
this._readController.close();
} catch {}
return Promise.resolve();
}
pushIncoming(bytes: Uint8Array) {
this._readController.enqueue(bytes);
}
}
describe("TransportWebSerial (contract)", () => {
let transforms: { restore(): void } | undefined;
let navSerial: { serialStub: any; restore(): void } | undefined;
beforeEach(() => {
transforms = stubCoreTransforms();
navSerial = stubNavigatorSerial();
});
afterEach(() => {
transforms?.restore();
navSerial?.restore();
vi.restoreAllMocks();
});
runTransportContract({
name: "TransportWebSerial",
setup: () => {},
teardown: () => {},
create: async () => {
const fake = new FakeSerialPort();
const transport = await TransportWebSerial.createFromPort(fake as any);
(globalThis as any).__ws = { fake, serial: navSerial!.serialStub };
await Promise.resolve();
return transport;
},
pushIncoming: async (bytes) => {
(globalThis as any).__ws.fake.pushIncoming(bytes);
await Promise.resolve();
},
assertLastWritten: (bytes) => {
expect((globalThis as any).__ws.fake.lastWritten).toEqual(bytes);
},
triggerDisconnect: async () => {
(globalThis as any).__ws.serial.dispatchDisconnect(
(globalThis as any).__ws.fake,
);
await Promise.resolve();
},
});
});
describe("TransportWebSerial (extras)", () => {
let transforms: { restore(): void } | undefined;
let navSerial: { serialStub: any; restore(): void } | undefined;
beforeEach(() => {
transforms = stubCoreTransforms();
navSerial = stubNavigatorSerial();
});
afterEach(() => {
transforms?.restore();
navSerial?.restore();
vi.restoreAllMocks();
});
it("emits DeviceDisconnected('serial-disconnected') on OS disconnect event", async () => {
const fake = new FakeSerialPort();
const transport = await TransportWebSerial.createFromPort(fake as any);
(globalThis as any).__ws = { fake, serial: navSerial!.serialStub };
const reader = transport.fromDevice.getReader();
// drain statuses until connected
for (let i = 0; i < 3; i++) {
const { value } = await reader.read();
if (!value || value.type !== "status") break;
if (value.data.status === Types.DeviceStatusEnum.DeviceConnected) break;
}
// fire OS-level disconnect
navSerial!.serialStub.dispatchDisconnect(fake as any);
await Promise.resolve();
let saw = false;
for (let i = 0; i < 6; i++) {
const { value } = await reader.read();
if (value?.type === "status" && value.data.reason === "serial-disconnected") {
saw = true;
break;
}
}
expect(saw).toBe(true);
reader.releaseLock();
await transport.disconnect();
});
});

189
packages/transport-web-serial/src/transport.ts

@ -1,72 +1,165 @@
import type { Types } from "@meshtastic/core"; import { Types, Utils } from "@meshtastic/core";
import { Utils } from "@meshtastic/core";
/**
* Provides Web Serial transport for Meshtastic devices.
*
* Implements the {@link Types.Transport} contract using the Web Serial API.
* Use {@link TransportWebSerial.create} or {@link TransportWebSerial.createFromPort}
* to construct an instance.
*/
export class TransportWebSerial implements Types.Transport { export class TransportWebSerial implements Types.Transport {
private _toDevice: WritableStream<Uint8Array>; private _toDevice: WritableStream<Uint8Array>;
private _fromDevice: ReadableStream<Types.DeviceOutput>; private _fromDevice: ReadableStream<Types.DeviceOutput>;
private fromDeviceController?: ReadableStreamDefaultController<Types.DeviceOutput>;
private connection: SerialPort; private connection: SerialPort;
private pipePromise: Promise<void> | null = null; private pipePromise: Promise<void> | null = null;
private abortController: AbortController; private abortController: AbortController;
private portReadable: ReadableStream<Uint8Array>;
private lastStatus: Types.DeviceStatusEnum =
Types.DeviceStatusEnum.DeviceDisconnected;
private closingByUser = false;
/**
* Create a new TransportWebSerial instance using a serial port.
*/
public static async create(baudRate?: number): Promise<TransportWebSerial> { public static async create(baudRate?: number): Promise<TransportWebSerial> {
const port = await navigator.serial.requestPort(); const port = await navigator.serial.requestPort();
await port.open({ baudRate: baudRate || 115200 }); await port.open({ baudRate: baudRate || 115200 });
return new TransportWebSerial(port); return new TransportWebSerial(port);
} }
/**
* Creates a new TransportWebSerial instance from an existing, provided {@link SerialPort}.
* Opens it if not already open.
*/
public static async createFromPort( public static async createFromPort(
port: SerialPort, port: SerialPort,
baudRate?: number, baudRate?: number,
): Promise<TransportWebSerial> { ): Promise<TransportWebSerial> {
await port.open({ baudRate: baudRate || 115200 }); if (!port.readable || !port.writable) {
await port.open({ baudRate: baudRate || 115200 });
}
return new TransportWebSerial(port); return new TransportWebSerial(port);
} }
/**
* Constructs a transport around a given {@link SerialPort}.
* @throws If the port lacks readable or writable streams.
*/
constructor(connection: SerialPort) { constructor(connection: SerialPort) {
if (!connection.readable || !connection.writable) { if (!connection.readable || !connection.writable) {
throw new Error("Stream not accessible"); throw new Error("Stream not accessible");
} }
this.connection = connection; this.connection = connection;
this.portReadable = connection.readable;
this.abortController = new AbortController(); this.abortController = new AbortController();
const abortController = this.abortController;
// Set up the pipe with abort signal for clean cancellation // Set up the pipe with abort signal for clean cancellation
this.pipePromise = Utils.toDeviceStream.readable.pipeTo( this.pipePromise = Utils.toDeviceStream.readable
connection.writable, .pipeTo(connection.writable, { signal: this.abortController.signal })
{ signal: this.abortController.signal }, .catch((err) => {
); // Ignore expected rejection when we cancel it via the AbortController.
if (abortController.signal.aborted) {
return;
}
console.error("Error piping data to serial port:", err);
this.connection.close().catch(() => {});
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"write-error",
);
});
this._toDevice = Utils.toDeviceStream.writable; this._toDevice = Utils.toDeviceStream.writable;
this._fromDevice = connection.readable.pipeThrough(
Utils.fromDeviceStream(), // Wrap + capture controller to inject status packets
); this._fromDevice = new ReadableStream<Types.DeviceOutput>({
start: async (ctrl) => {
this.fromDeviceController = ctrl;
this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting);
const transformed = this.portReadable.pipeThrough(
Utils.fromDeviceStream(),
);
const reader = transformed.getReader();
const onOsDisconnect = (ev: Event) => {
const { port } = ev as unknown as { port?: SerialPort };
if (port && port === this.connection) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"serial-disconnected",
);
}
};
navigator.serial.addEventListener("disconnect", onOsDisconnect);
this.emitStatus(Types.DeviceStatusEnum.DeviceConnected);
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
ctrl.enqueue(value);
}
ctrl.close();
} catch (error) {
if (!this.closingByUser) {
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"read-error",
);
}
ctrl.error(error instanceof Error ? error : new Error(String(error)));
try {
await transformed.cancel();
} catch {}
} finally {
reader.releaseLock();
navigator.serial.removeEventListener("disconnect", onOsDisconnect);
}
},
});
} }
get toDevice(): WritableStream<Uint8Array> { /** Writable stream of bytes to the device. */
public get toDevice(): WritableStream<Uint8Array> {
return this._toDevice; return this._toDevice;
} }
get fromDevice(): ReadableStream<Types.DeviceOutput> { /** Readable stream of {@link Types.DeviceOutput} from the device. */
public get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice; return this._fromDevice;
} }
private emitStatus(next: Types.DeviceStatusEnum, reason?: string): void {
if (next === this.lastStatus) {
return;
}
this.lastStatus = next;
this.fromDeviceController?.enqueue({
type: "status",
data: { status: next, reason },
});
}
/** /**
* Safely disconnects the serial port, following best practices from * Closes the serial port and emits `DeviceDisconnected("user")`.
* https://github.com/WICG/serial/. Cancels any active pipe
* operations and only closes the port after streams are unlocked.
*/ */
async disconnect() { public async disconnect(): Promise<void> {
try { try {
this.abortController.abort(); this.closingByUser = true;
// Stop outbound piping
this.abortController.abort();
if (this.pipePromise) { if (this.pipePromise) {
try { await this.pipePromise;
await this.pipePromise;
} catch (error) {
if (error instanceof Error && error.name !== "AbortError") {
throw error;
}
}
} }
// Cancel any remaining streams // Cancel any remaining streams
@ -82,6 +175,9 @@ export class TransportWebSerial implements Types.Transport {
} catch (error) { } catch (error) {
// If we can't close cleanly, let the browser handle cleanup // If we can't close cleanly, let the browser handle cleanup
console.warn("Could not cleanly disconnect serial port:", error); console.warn("Could not cleanly disconnect serial port:", error);
} finally {
this.emitStatus(Types.DeviceStatusEnum.DeviceDisconnected, "user");
this.closingByUser = false;
} }
} }
@ -89,14 +185,43 @@ export class TransportWebSerial implements Types.Transport {
* Reconnects the transport by creating a new AbortController and re-establishing * Reconnects the transport by creating a new AbortController and re-establishing
* the pipe connection. Only call this after disconnect() or if the connection failed. * the pipe connection. Only call this after disconnect() or if the connection failed.
*/ */
async reconnect() { public async reconnect() {
// Create a new AbortController for the new connection this.emitStatus(Types.DeviceStatusEnum.DeviceConnecting, "reconnect");
this.abortController = new AbortController();
try {
if (!this.connection.readable || !this.connection.writable) {
throw new Error("Stream not accessible");
}
this.portReadable = this.connection.readable;
// Re-establish the pipe connection // Create a new AbortController for the new connection
this.pipePromise = Utils.toDeviceStream.readable.pipeTo( this.abortController = new AbortController();
this.connection.writable, const abortController = this.abortController;
{ signal: this.abortController.signal },
); // Re-establish the pipe connection
this.pipePromise = Utils.toDeviceStream.readable
.pipeTo(this.connection.writable, {
signal: this.abortController.signal,
})
.catch((error) => {
if (abortController.signal.aborted) {
return;
}
console.error("Error piping data to serial port (reconnect):", error);
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"write-error",
);
});
this.emitStatus(Types.DeviceStatusEnum.DeviceConnected, "reconnected");
} catch (error) {
// Couldn’t re-pipe
this.emitStatus(
Types.DeviceStatusEnum.DeviceDisconnected,
"reconnect-failed",
);
throw error;
}
} }
} }

29
packages/web/vitest.config.ts

@ -1,23 +1,27 @@
import path from "node:path"; import path from "node:path";
import process from "node:process"; import { fileURLToPath } from "node:url";
import react from "@vitejs/plugin-react"; import react from "@vitejs/plugin-react";
import { enableMapSet } from "immer"; import { enableMapSet } from "immer";
import { defineProject } from "vitest/config"; import { defineProject } from "vitest/config";
enableMapSet(); enableMapSet();
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const pkgRoot = __dirname;
const srcDir = path.resolve(pkgRoot, "src");
const publicDir = path.resolve(pkgRoot, "public");
export default defineProject({ export default defineProject({
plugins: [react()], plugins: [react()],
resolve: { resolve: {
alias: { alias: {
"@app": path.resolve(process.cwd(), "./packages/web/src"), "@app": srcDir,
"@public": path.resolve(process.cwd(), "./packages/web/public"), "@public": publicDir,
"@core": path.resolve(process.cwd(), "./packages/web/src/core"), "@core": path.resolve(srcDir, "core"),
"@pages": path.resolve(process.cwd(), "./packages/web/src/pages"), "@pages": path.resolve(srcDir, "pages"),
"@components": path.resolve( "@components": path.resolve(srcDir, "components"),
process.cwd(), "@layouts": path.resolve(srcDir, "layouts"),
"./packages/web/src/components",
),
"@layouts": path.resolve(process.cwd(), "./packages/web/src/layouts"),
}, },
}, },
test: { test: {
@ -26,8 +30,7 @@ export default defineProject({
mockReset: true, mockReset: true,
clearMocks: true, clearMocks: true,
restoreMocks: true, restoreMocks: true,
root: path.resolve(process.cwd(), "./packages/web/src"), include: ["src/**/*.{test,spec}.{ts,tsx}"],
include: ["**/*.{test,spec}.{ts,tsx}"], setupFiles: [path.resolve(srcDir, "tests/setup.ts")],
setupFiles: ["./src/tests/setup.ts"],
}, },
}); });

122
tests/utils/transportContract.ts

@ -0,0 +1,122 @@
import { Types } from "@meshtastic/core";
import { describe, expect, it } from "vitest";
export interface TransportContract {
name: string;
create: () => Promise<Types.Transport>;
setup?: () => void | Promise<void>;
teardown?: () => void | Promise<void>;
pushIncoming?: (bytes: Uint8Array) => void | Promise<void>;
assertLastWritten?: (bytes: Uint8Array) => void;
triggerDisconnect?: () => void | Promise<void>;
}
async function readUntilType(
reader: ReadableStreamDefaultReader<Types.DeviceOutput>,
expectedType: Types.DeviceOutput["type"],
maxReads = 20,
): Promise<Types.DeviceOutput> {
for (let i = 0; i < maxReads; i++) {
const { value, done } = await reader.read();
if (done) {
break;
}
if (value && value.type === expectedType) {
return value;
}
}
throw new Error(
`Did not receive a '${expectedType}' event within ${maxReads} reads`,
);
}
export function runTransportContract(contract: TransportContract) {
describe(contract.name, () => {
it("reads packets from fromDevice", async () => {
await contract.setup?.();
const transport = await contract.create();
const reader = transport.fromDevice.getReader();
const sampleBytes = new Uint8Array([0x01, 0x02, 0x03]);
await contract.pushIncoming?.(sampleBytes);
const packetEvent = await readUntilType(reader, "packet");
expect("data" in packetEvent ? packetEvent.data : undefined).toEqual(
sampleBytes,
);
reader.releaseLock();
await contract.teardown?.();
});
it("writes bytes to toDevice", async () => {
await contract.setup?.();
const transport = await contract.create();
const writer = transport.toDevice.getWriter();
const outgoingBytes = new Uint8Array([0xaa, 0xbb]);
await writer.write(outgoingBytes);
await writer.close();
contract.assertLastWritten?.(outgoingBytes);
await contract.teardown?.();
});
it("disconnect() emits DeviceDisconnected('user')", async () => {
await contract.setup?.();
const transport = await contract.create();
const reader = transport.fromDevice.getReader();
// Trigger user disconnect
await transport.disconnect();
// Read a few events and assert we eventually see the user disconnect.
let sawUser = false;
for (let i = 0; i < 10; i++) {
const { value } = await reader.read();
if (
value &&
value.type === "status" &&
value.data.status === Types.DeviceStatusEnum.DeviceDisconnected &&
value.data.reason === "user"
) {
sawUser = true;
break;
}
}
expect(sawUser).toBe(true);
reader.releaseLock();
await contract.teardown?.();
});
it("emits DeviceDisconnected when the underlying link drops", async () => {
await contract.setup?.();
const transport = await contract.create();
const reader = transport.fromDevice.getReader();
await contract.triggerDisconnect?.();
// As above, read a few events and assert we eventually see "disconnected"
let sawDrop = false;
for (let i = 0; i < 10; i++) {
const { value } = await reader.read();
if (
value &&
value.type === "status" &&
value.data.status === Types.DeviceStatusEnum.DeviceDisconnected
) {
sawDrop = true;
break;
}
}
expect(sawDrop).toBe(true);
reader.releaseLock();
await contract.teardown?.();
});
});
}
Loading…
Cancel
Save