Browse Source

Transport disconnect method (#753)

* Prevent reads from piling up

* Implement disconnect() method for all transports
pull/752/head
Henri Bergius 11 months ago
committed by GitHub
parent
commit
0a7b653ec8
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      packages/core/src/meshDevice.ts
  2. 1
      packages/core/src/types.ts
  3. 12
      packages/transport-deno/src/transport.ts
  4. 26
      packages/transport-http/src/transport.ts
  5. 12
      packages/transport-node/src/transport.ts
  6. 9
      packages/transport-web-bluetooth/src/transport.ts
  7. 7
      packages/transport-web-serial/src/transport.ts

1
packages/core/src/meshDevice.ts

@ -818,6 +818,7 @@ export class MeshDevice {
this.complete();
await this.transport.toDevice.close();
await this.transport.disconnect();
}
/**

1
packages/core/src/types.ts

@ -15,6 +15,7 @@ export type DeviceOutput = Packet | DebugLog;
export interface Transport {
toDevice: WritableStream<Uint8Array>;
fromDevice: ReadableStream<DeviceOutput>;
disconnect(): Promise<void>;
}
export interface QueueItem {

12
packages/transport-deno/src/transport.ts

@ -4,6 +4,7 @@ import { Utils } from "@meshtastic/core";
export class TransportDeno implements Types.Transport {
private _toDevice: WritableStream<Uint8Array>;
private _fromDevice: ReadableStream<Types.DeviceOutput>;
private connection: Deno.Conn | undefined;
public static async create(hostname: string): Promise<TransportDeno> {
const connection = await Deno.connect({
@ -14,10 +15,11 @@ export class TransportDeno implements Types.Transport {
}
constructor(connection: Deno.Conn) {
Utils.toDeviceStream.readable.pipeTo(connection.writable);
this.connection = connection;
Utils.toDeviceStream.readable.pipeTo(this.connection.writable);
this._toDevice = Utils.toDeviceStream.writable;
this._fromDevice = connection.readable.pipeThrough(
this._fromDevice = this.connection.readable.pipeThrough(
Utils.fromDeviceStream(),
);
}
@ -29,4 +31,10 @@ export class TransportDeno implements Types.Transport {
get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice;
}
disconnect(): Promise<void> {
this.connection.close();
this.connection = undefined;
return Promise.resolve();
}
}

26
packages/transport-http/src/transport.ts

@ -6,6 +6,8 @@ export class TransportHTTP implements Types.Transport {
private url: string;
private receiveBatchRequests: boolean;
private fetchInterval: number;
private fetching: boolean;
private interval: number | undefined;
public static async create(
address: string,
@ -23,6 +25,7 @@ export class TransportHTTP implements Types.Transport {
this.url = url;
this.receiveBatchRequests = false;
this.fetchInterval = 3000;
this.fetching = false;
this._toDevice = new WritableStream<Uint8Array>({
write: async (chunk) => {
@ -38,8 +41,18 @@ export class TransportHTTP implements Types.Transport {
},
});
setInterval(async () => {
await this.readFromRadio(controller);
this.interval = setInterval(async () => {
if (this.fetching) {
// We still have the previous request open
return;
}
this.fetching = true;
try {
await this.readFromRadio(controller);
} catch (e) {
// TODO: Emit disconnection events for certain types of errors
}
this.fetching = false;
}, this.fetchInterval);
}
@ -88,4 +101,13 @@ export class TransportHTTP implements Types.Transport {
get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice;
}
disconnect() : Promise<void> {
this.fetching = false;
if (this.interval) {
clearInterval(this.interval);
}
this.interval = undefined;
return Promise.resolve();
}
}

12
packages/transport-node/src/transport.ts

@ -6,6 +6,7 @@ import { Utils } from "@meshtastic/core";
export class TransportNode implements Types.Transport {
private readonly _toDevice: WritableStream<Uint8Array>;
private readonly _fromDevice: ReadableStream<Types.DeviceOutput>;
private socket: Socket | undefined;
/**
* Creates and connects a new TransportNode instance.
@ -36,7 +37,8 @@ export class TransportNode implements Types.Transport {
* @param connection - An active Node.js net.Socket connection.
*/
constructor(connection: Socket) {
connection.on("error", (err) => {
this.socket = connection;
this.socket.on("error", (err) => {
console.error("Socket connection error:", err);
});
@ -56,7 +58,7 @@ export class TransportNode implements Types.Transport {
.pipeTo(Writable.toWeb(connection) as WritableStream<Uint8Array>)
.catch((err) => {
console.error("Error piping data to socket:", err);
connection.destroy(err as Error);
this.socket.destroy(err as Error);
});
}
@ -73,4 +75,10 @@ export class TransportNode implements Types.Transport {
public get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice;
}
disconnect() {
this.socket.destroy();
this.socket = undefined;
return Promise.resolve();
}
}

9
packages/transport-web-bluetooth/src/transport.ts

@ -9,6 +9,7 @@ export class TransportWebBluetooth implements Types.Transport {
private toRadioCharacteristic: BluetoothRemoteGATTCharacteristic;
private fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic;
private fromNumCharacteristic: BluetoothRemoteGATTCharacteristic;
private gattServer: BluetoothRemoteGATTServer;
static ToRadioUuid = "f75c76d2-129e-4dad-a1dd-7866124401e7";
static FromRadioUuid = "2c55e69e-4993-11ed-b878-0242ac120002";
@ -65,6 +66,7 @@ export class TransportWebBluetooth implements Types.Transport {
toRadioCharacteristic,
fromRadioCharacteristic,
fromNumCharacteristic,
gattServer,
);
}
@ -72,10 +74,12 @@ export class TransportWebBluetooth implements Types.Transport {
toRadioCharacteristic: BluetoothRemoteGATTCharacteristic,
fromRadioCharacteristic: BluetoothRemoteGATTCharacteristic,
fromNumCharacteristic: BluetoothRemoteGATTCharacteristic,
gattServer: BluetoothRemoteGATTServer,
) {
this.toRadioCharacteristic = toRadioCharacteristic;
this.fromRadioCharacteristic = fromRadioCharacteristic;
this.fromNumCharacteristic = fromNumCharacteristic;
this.gattServer = gattServer;
this._fromDevice = new ReadableStream({
start: (ctrl) => {
@ -133,4 +137,9 @@ export class TransportWebBluetooth implements Types.Transport {
});
}
}
disconnect() : Promise<void> {
this.gattServer.disconnect();
return Promise.resolve();
}
}

7
packages/transport-web-serial/src/transport.ts

@ -4,6 +4,7 @@ import { Utils } from "@meshtastic/core";
export class TransportWebSerial implements Types.Transport {
private _toDevice: WritableStream<Uint8Array>;
private _fromDevice: ReadableStream<Types.DeviceOutput>;
private connection: SerialPort;
public static async create(baudRate?: number): Promise<TransportWebSerial> {
const port = await navigator.serial.requestPort();
@ -24,6 +25,8 @@ export class TransportWebSerial implements Types.Transport {
throw new Error("Stream not accessible");
}
this.connection = connection;
Utils.toDeviceStream.readable.pipeTo(connection.writable);
this._toDevice = Utils.toDeviceStream.writable;
@ -39,4 +42,8 @@ export class TransportWebSerial implements Types.Transport {
get fromDevice(): ReadableStream<Types.DeviceOutput> {
return this._fromDevice;
}
disconnect() {
return this.connection.close();
}
}

Loading…
Cancel
Save