diff --git a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts index 386f6d31..d0a39569 100644 --- a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts +++ b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts @@ -203,6 +203,7 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => { // Shared port/board discovery for the server bind(BoardDiscovery).toSelf().inSingletonScope(); + bind(BackendApplicationContribution).toService(BoardDiscovery); // Core service -> `verify` and `upload`. Singleton per BE, each FE connection gets its proxy. bind(ConnectionContainerModule).toConstantValue( @@ -338,10 +339,10 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => { bind(ILogger) .toDynamicValue((ctx) => { const parentLogger = ctx.container.get(ILogger); - return parentLogger.child('discovery'); + return parentLogger.child('discovery-log'); // TODO: revert }) .inSingletonScope() - .whenTargetNamed('discovery'); + .whenTargetNamed('discovery-log'); // TODO: revert // Logger for the CLI config service. From the CLI config (FS path aware), we make a URI-aware app config. bind(ILogger) diff --git a/arduino-ide-extension/src/node/board-discovery.ts b/arduino-ide-extension/src/node/board-discovery.ts index 9a83df31..f3a2eac4 100644 --- a/arduino-ide-extension/src/node/board-discovery.ts +++ b/arduino-ide-extension/src/node/board-discovery.ts @@ -1,8 +1,8 @@ -import { injectable, inject, postConstruct, named } from '@theia/core/shared/inversify'; +import { injectable, inject, named } from '@theia/core/shared/inversify'; import { ClientDuplexStream } from '@grpc/grpc-js'; import { ILogger } from '@theia/core/lib/common/logger'; import { deepClone } from '@theia/core/lib/common/objects'; -import { CoreClientAware, CoreClientProvider } from './core-client-provider'; +import { CoreClientAware } from './core-client-provider'; import { BoardListWatchRequest, BoardListWatchResponse, @@ -14,6 +14,19 @@ import { AvailablePorts, AttachedBoardsChangeEvent, } from '../common/protocol'; +import { Emitter } from '@theia/core/lib/common/event'; +import { DisposableCollection } from '@theia/core/lib/common/disposable'; +import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol'; +import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb'; +import { v4 } from 'uuid'; +import { ServiceError } from './service-error'; +import { BackendApplicationContribution } from '@theia/core/lib/node'; + +type Duplex = ClientDuplexStream; +interface StreamWrapper extends Disposable { + readonly stream: Duplex; + readonly uuid: string; // For logging only +} /** * Singleton service for tracking the available ports and board and broadcasting the @@ -21,24 +34,27 @@ import { * Unlike other services, this is not connection scoped. */ @injectable() -export class BoardDiscovery extends CoreClientAware { +export class BoardDiscovery + extends CoreClientAware + implements BackendApplicationContribution +{ @inject(ILogger) - @named('discovery') - protected discoveryLogger: ILogger; + @named('discovery-log') + private readonly logger: ILogger; @inject(NotificationServiceServer) - protected readonly notificationService: NotificationServiceServer; + private readonly notificationService: NotificationServiceServer; // Used to know if the board watch process is already running to avoid // starting it multiple times private watching: boolean; - - protected boardWatchDuplex: - | ClientDuplexStream - | undefined; + private wrapper: StreamWrapper | undefined; + private readonly onStreamDidEndEmitter = new Emitter(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization. + private readonly onStreamDidCancelEmitter = new Emitter(); // when the watcher is canceled by the IDE2 + private readonly toDisposeOnStopWatch = new DisposableCollection(); /** - * Keys are the `address` of the ports. \ + * Keys are the `address` of the ports. * The `protocol` is ignored because the board detach event does not carry the protocol information, * just the address. * ```json @@ -48,62 +64,153 @@ export class BoardDiscovery extends CoreClientAware { * } * ``` */ - protected _state: AvailablePorts = {}; + private _state: AvailablePorts = {}; get state(): AvailablePorts { return this._state; } - @postConstruct() - protected async init(): Promise { - this.coreClient.then((client) => this.startBoardListWatch(client)); - this.onClientDidRefresh((client) => - this.stopBoardListWatch(client).then(() => - this.startBoardListWatch(client) - ) - ); + onStart(): void { + this.start(); + this.onClientDidRefresh(() => this.start()); } - stopBoardListWatch(coreClient: CoreClientProvider.Client): Promise { - return new Promise((resolve, reject) => { - if (!this.boardWatchDuplex) { - return resolve(); - } + onStop(): void { + this.stop(); + } - const { instance } = coreClient; - const req = new BoardListWatchRequest(); - req.setInstance(instance); - try { - this.boardWatchDuplex.write(req.setInterrupt(true), resolve); - } catch (e) { - this.discoveryLogger.error(e); - resolve(); + stop(): Promise { + this.logger.info('>>> Stopping boards watcher...'); + return new Promise((resolve, reject) => { + const timeout = this.timeout(BoardDiscovery.StopWatchTimeout, reject); + const toDispose = new DisposableCollection(); + toDispose.pushAll([ + timeout, + this.onStreamDidEndEmitter.event(() => { + this.logger.info( + `<<< Received the end event from the stream. Boards watcher has been successfully stopped.` + ); + this.watching = false; + toDispose.dispose(); + resolve(); + }), + this.onStreamDidCancelEmitter.event(() => { + this.logger.info( + `<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.` + ); + this.watching = false; + toDispose.dispose(); + resolve(); + }), + ]); + this.logger.info('Canceling boards watcher...'); + this.toDisposeOnStopWatch.dispose(); + }); + } + + private timeout( + after: number, + onTimeout: (error: Error) => void + ): Disposable { + const timer = setTimeout( + () => onTimeout(new Error(`Timed out after ${after} ms.`)), + after + ); + return Disposable.create(() => clearTimeout(timer)); + } + + private async write( + req: BoardListWatchRequest, + duplex: Duplex + ): Promise { + return new Promise((resolve, reject) => { + this.logger.info(`>>> Writing ${this.toJson(req)} to the stream...`); + if ( + !duplex.write(req, (err: Error | undefined) => { + if (err) { + this.logger.error( + `<<< Error ocurred while writing to the stream.`, + err + ); + reject(err); + return; + } + }) + ) { + duplex.once('drain', () => { + this.logger.info( + `<<< Board list watch request has been successfully written to the stream after the handling backpressure.` + ); + resolve(); + }); + } else { + process.nextTick(() => { + this.logger.info( + `<<< Board list watch request has been successfully written to the stream.` + ); + resolve(); + }); } }); } - startBoardListWatch(coreClient: CoreClientProvider.Client): void { + private async createWrapper( + client: ArduinoCoreServiceClient + ): Promise { + if (this.wrapper) { + throw new Error(`Duplex was already set.`); + } + const stream = client + .boardListWatch() + .on('end', () => this.onStreamDidEndEmitter.fire()) + .on('error', (error) => { + if (ServiceError.isCancel(error)) { + this.onStreamDidCancelEmitter.fire(); + } else { + this.logger.error( + 'Unexpected error occurred during the boards discovery.', + error + ); + // TODO: terminate? restart? reject? + } + }); + const wrapper = { + stream, + uuid: v4(), + dispose: () => { + // Cancelling the stream will kill the discovery `builtin:mdns-discovery process`. + // The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI. + stream.cancel(); + this.wrapper = undefined; + }, + }; + this.toDisposeOnStopWatch.pushAll([wrapper]); + return wrapper; + } + + private toJson(arg: BoardListWatchRequest | BoardListWatchResponse): string { + let object: Record | undefined = undefined; + if (arg instanceof BoardListWatchRequest) { + object = BoardListWatchRequest.toObject(false, arg); + } else if (arg instanceof BoardListWatchResponse) { + object = BoardListWatchResponse.toObject(false, arg); + } else { + throw new Error(`Unhandled object type: ${arg}`); + } + return JSON.stringify(object); + } + + async start(): Promise { if (this.watching) { // We want to avoid starting the board list watch process multiple // times to meet unforeseen consequences return; } - this.watching = true; - const { client, instance } = coreClient; - const req = new BoardListWatchRequest(); - req.setInstance(instance); - this.boardWatchDuplex = client.boardListWatch(); - this.boardWatchDuplex.on('end', () => { - this.watching = false; - console.info('board watch ended'); - }); - this.boardWatchDuplex.on('close', () => { - this.watching = false; - console.info('board watch ended'); - }); - this.boardWatchDuplex.on('data', (resp: BoardListWatchResponse) => { + const { client, instance } = await this.coreClient; + const wrapper = await this.createWrapper(client); + wrapper.stream.on('data', async (resp: BoardListWatchResponse) => { + this.logger.info('onData', this.toJson(resp)); if (resp.getEventType() === 'quit') { - this.watching = false; - console.info('board watch ended'); + await this.stop(); return; } @@ -135,7 +242,9 @@ export class BoardDiscovery extends CoreClientAware { // protocols. const portID = `${address}|${protocol}`; const label = (detectedPort as any).getPort().getLabel(); - const protocolLabel = (detectedPort as any).getPort().getProtocolLabel(); + const protocolLabel = (detectedPort as any) + .getPort() + .getProtocolLabel(); const port = { id: portID, address, @@ -155,8 +264,10 @@ export class BoardDiscovery extends CoreClientAware { if (eventType === 'add') { if (newState[portID]) { const [, knownBoards] = newState[portID]; - console.warn( - `Port '${Port.toString(port)}' was already available. Known boards before override: ${JSON.stringify( + this.logger.warn( + `Port '${Port.toString( + port + )}' was already available. Known boards before override: ${JSON.stringify( knownBoards )}` ); @@ -164,7 +275,9 @@ export class BoardDiscovery extends CoreClientAware { newState[portID] = [port, boards]; } else if (eventType === 'remove') { if (!newState[portID]) { - console.warn(`Port '${Port.toString(port)}' was not available. Skipping`); + this.logger.warn( + `Port '${Port.toString(port)}' was not available. Skipping` + ); return; } delete newState[portID]; @@ -189,7 +302,11 @@ export class BoardDiscovery extends CoreClientAware { this.notificationService.notifyAttachedBoardsDidChange(event); } }); - this.boardWatchDuplex.write(req); + await this.write( + new BoardListWatchRequest().setInstance(instance), + wrapper.stream + ); + this.watching = true; } getAttachedBoards(state: AvailablePorts = this.state): Board[] { @@ -210,3 +327,6 @@ export class BoardDiscovery extends CoreClientAware { return availablePorts; } } +export namespace BoardDiscovery { + export const StopWatchTimeout = 10_000; +} diff --git a/arduino-ide-extension/src/node/boards-service-impl.ts b/arduino-ide-extension/src/node/boards-service-impl.ts index 98cdcb7f..ae88b4f3 100644 --- a/arduino-ide-extension/src/node/boards-service-impl.ts +++ b/arduino-ide-extension/src/node/boards-service-impl.ts @@ -414,7 +414,7 @@ export class BoardsServiceImpl console.info('>>> Starting boards package installation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.platformInstall(req); resp.on( @@ -426,7 +426,7 @@ export class BoardsServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', (error) => { @@ -465,7 +465,7 @@ export class BoardsServiceImpl console.info('>>> Starting boards package uninstallation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.platformUninstall(req); resp.on( @@ -477,7 +477,7 @@ export class BoardsServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); diff --git a/arduino-ide-extension/src/node/library-service-impl.ts b/arduino-ide-extension/src/node/library-service-impl.ts index 42ad456c..e0d3be56 100644 --- a/arduino-ide-extension/src/node/library-service-impl.ts +++ b/arduino-ide-extension/src/node/library-service-impl.ts @@ -269,7 +269,7 @@ export class LibraryServiceImpl console.info('>>> Starting library package installation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.libraryInstall(req); resp.on( @@ -281,7 +281,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', (error) => { @@ -323,7 +323,7 @@ export class LibraryServiceImpl } // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.zipLibraryInstall(req); resp.on( @@ -335,7 +335,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); @@ -358,7 +358,7 @@ export class LibraryServiceImpl console.info('>>> Starting library package uninstallation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.libraryUninstall(req); resp.on( @@ -370,7 +370,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); diff --git a/arduino-ide-extension/src/node/service-error.ts b/arduino-ide-extension/src/node/service-error.ts index 3abbbc0b..a56cf13e 100644 --- a/arduino-ide-extension/src/node/service-error.ts +++ b/arduino-ide-extension/src/node/service-error.ts @@ -2,6 +2,9 @@ import { Metadata, StatusObject } from '@grpc/grpc-js'; export type ServiceError = StatusObject & Error; export namespace ServiceError { + export function isCancel(arg: unknown): arg is ServiceError & { code: 1 } { + return is(arg) && arg.code === 1; // https://grpc.github.io/grpc/core/md_doc_statuscodes.html + } export function is(arg: unknown): arg is ServiceError { return arg instanceof Error && isStatusObjet(arg); }