another way to cancel the discovery.

Signed-off-by: Akos Kitta <a.kitta@arduino.cc>
This commit is contained in:
Akos Kitta 2022-07-12 15:59:03 +02:00 committed by Akos Kitta
parent 431c3bdf2b
commit d8be8888ef
5 changed files with 191 additions and 67 deletions

View File

@ -203,6 +203,7 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => {
// Shared port/board discovery for the server
bind(BoardDiscovery).toSelf().inSingletonScope();
bind(BackendApplicationContribution).toService(BoardDiscovery);
// Core service -> `verify` and `upload`. Singleton per BE, each FE connection gets its proxy.
bind(ConnectionContainerModule).toConstantValue(
@ -338,10 +339,10 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => {
bind(ILogger)
.toDynamicValue((ctx) => {
const parentLogger = ctx.container.get<ILogger>(ILogger);
return parentLogger.child('discovery');
return parentLogger.child('discovery-log'); // TODO: revert
})
.inSingletonScope()
.whenTargetNamed('discovery');
.whenTargetNamed('discovery-log'); // TODO: revert
// Logger for the CLI config service. From the CLI config (FS path aware), we make a URI-aware app config.
bind(ILogger)

View File

@ -1,8 +1,8 @@
import { injectable, inject, postConstruct, named } from '@theia/core/shared/inversify';
import { injectable, inject, named } from '@theia/core/shared/inversify';
import { ClientDuplexStream } from '@grpc/grpc-js';
import { ILogger } from '@theia/core/lib/common/logger';
import { deepClone } from '@theia/core/lib/common/objects';
import { CoreClientAware, CoreClientProvider } from './core-client-provider';
import { CoreClientAware } from './core-client-provider';
import {
BoardListWatchRequest,
BoardListWatchResponse,
@ -14,6 +14,19 @@ import {
AvailablePorts,
AttachedBoardsChangeEvent,
} from '../common/protocol';
import { Emitter } from '@theia/core/lib/common/event';
import { DisposableCollection } from '@theia/core/lib/common/disposable';
import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb';
import { v4 } from 'uuid';
import { ServiceError } from './service-error';
import { BackendApplicationContribution } from '@theia/core/lib/node';
type Duplex = ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>;
interface StreamWrapper extends Disposable {
readonly stream: Duplex;
readonly uuid: string; // For logging only
}
/**
* Singleton service for tracking the available ports and board and broadcasting the
@ -21,24 +34,27 @@ import {
* Unlike other services, this is not connection scoped.
*/
@injectable()
export class BoardDiscovery extends CoreClientAware {
export class BoardDiscovery
extends CoreClientAware
implements BackendApplicationContribution
{
@inject(ILogger)
@named('discovery')
protected discoveryLogger: ILogger;
@named('discovery-log')
private readonly logger: ILogger;
@inject(NotificationServiceServer)
protected readonly notificationService: NotificationServiceServer;
private readonly notificationService: NotificationServiceServer;
// Used to know if the board watch process is already running to avoid
// starting it multiple times
private watching: boolean;
protected boardWatchDuplex:
| ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>
| undefined;
private wrapper: StreamWrapper | undefined;
private readonly onStreamDidEndEmitter = new Emitter<void>(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization.
private readonly onStreamDidCancelEmitter = new Emitter<void>(); // when the watcher is canceled by the IDE2
private readonly toDisposeOnStopWatch = new DisposableCollection();
/**
* Keys are the `address` of the ports. \
* Keys are the `address` of the ports.
* The `protocol` is ignored because the board detach event does not carry the protocol information,
* just the address.
* ```json
@ -48,62 +64,153 @@ export class BoardDiscovery extends CoreClientAware {
* }
* ```
*/
protected _state: AvailablePorts = {};
private _state: AvailablePorts = {};
get state(): AvailablePorts {
return this._state;
}
@postConstruct()
protected async init(): Promise<void> {
this.coreClient.then((client) => this.startBoardListWatch(client));
this.onClientDidRefresh((client) =>
this.stopBoardListWatch(client).then(() =>
this.startBoardListWatch(client)
)
);
onStart(): void {
this.start();
this.onClientDidRefresh(() => this.start());
}
stopBoardListWatch(coreClient: CoreClientProvider.Client): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.boardWatchDuplex) {
return resolve();
}
onStop(): void {
this.stop();
}
const { instance } = coreClient;
const req = new BoardListWatchRequest();
req.setInstance(instance);
try {
this.boardWatchDuplex.write(req.setInterrupt(true), resolve);
} catch (e) {
this.discoveryLogger.error(e);
resolve();
stop(): Promise<void> {
this.logger.info('>>> Stopping boards watcher...');
return new Promise<void>((resolve, reject) => {
const timeout = this.timeout(BoardDiscovery.StopWatchTimeout, reject);
const toDispose = new DisposableCollection();
toDispose.pushAll([
timeout,
this.onStreamDidEndEmitter.event(() => {
this.logger.info(
`<<< Received the end event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
toDispose.dispose();
resolve();
}),
this.onStreamDidCancelEmitter.event(() => {
this.logger.info(
`<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
toDispose.dispose();
resolve();
}),
]);
this.logger.info('Canceling boards watcher...');
this.toDisposeOnStopWatch.dispose();
});
}
private timeout(
after: number,
onTimeout: (error: Error) => void
): Disposable {
const timer = setTimeout(
() => onTimeout(new Error(`Timed out after ${after} ms.`)),
after
);
return Disposable.create(() => clearTimeout(timer));
}
private async write(
req: BoardListWatchRequest,
duplex: Duplex
): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.logger.info(`>>> Writing ${this.toJson(req)} to the stream...`);
if (
!duplex.write(req, (err: Error | undefined) => {
if (err) {
this.logger.error(
`<<< Error ocurred while writing to the stream.`,
err
);
reject(err);
return;
}
})
) {
duplex.once('drain', () => {
this.logger.info(
`<<< Board list watch request has been successfully written to the stream after the handling backpressure.`
);
resolve();
});
} else {
process.nextTick(() => {
this.logger.info(
`<<< Board list watch request has been successfully written to the stream.`
);
resolve();
});
}
});
}
startBoardListWatch(coreClient: CoreClientProvider.Client): void {
private async createWrapper(
client: ArduinoCoreServiceClient
): Promise<StreamWrapper> {
if (this.wrapper) {
throw new Error(`Duplex was already set.`);
}
const stream = client
.boardListWatch()
.on('end', () => this.onStreamDidEndEmitter.fire())
.on('error', (error) => {
if (ServiceError.isCancel(error)) {
this.onStreamDidCancelEmitter.fire();
} else {
this.logger.error(
'Unexpected error occurred during the boards discovery.',
error
);
// TODO: terminate? restart? reject?
}
});
const wrapper = {
stream,
uuid: v4(),
dispose: () => {
// Cancelling the stream will kill the discovery `builtin:mdns-discovery process`.
// The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI.
stream.cancel();
this.wrapper = undefined;
},
};
this.toDisposeOnStopWatch.pushAll([wrapper]);
return wrapper;
}
private toJson(arg: BoardListWatchRequest | BoardListWatchResponse): string {
let object: Record<string, unknown> | undefined = undefined;
if (arg instanceof BoardListWatchRequest) {
object = BoardListWatchRequest.toObject(false, arg);
} else if (arg instanceof BoardListWatchResponse) {
object = BoardListWatchResponse.toObject(false, arg);
} else {
throw new Error(`Unhandled object type: ${arg}`);
}
return JSON.stringify(object);
}
async start(): Promise<void> {
if (this.watching) {
// We want to avoid starting the board list watch process multiple
// times to meet unforeseen consequences
return;
}
this.watching = true;
const { client, instance } = coreClient;
const req = new BoardListWatchRequest();
req.setInstance(instance);
this.boardWatchDuplex = client.boardListWatch();
this.boardWatchDuplex.on('end', () => {
this.watching = false;
console.info('board watch ended');
});
this.boardWatchDuplex.on('close', () => {
this.watching = false;
console.info('board watch ended');
});
this.boardWatchDuplex.on('data', (resp: BoardListWatchResponse) => {
const { client, instance } = await this.coreClient;
const wrapper = await this.createWrapper(client);
wrapper.stream.on('data', async (resp: BoardListWatchResponse) => {
this.logger.info('onData', this.toJson(resp));
if (resp.getEventType() === 'quit') {
this.watching = false;
console.info('board watch ended');
await this.stop();
return;
}
@ -135,7 +242,9 @@ export class BoardDiscovery extends CoreClientAware {
// protocols.
const portID = `${address}|${protocol}`;
const label = (detectedPort as any).getPort().getLabel();
const protocolLabel = (detectedPort as any).getPort().getProtocolLabel();
const protocolLabel = (detectedPort as any)
.getPort()
.getProtocolLabel();
const port = {
id: portID,
address,
@ -155,8 +264,10 @@ export class BoardDiscovery extends CoreClientAware {
if (eventType === 'add') {
if (newState[portID]) {
const [, knownBoards] = newState[portID];
console.warn(
`Port '${Port.toString(port)}' was already available. Known boards before override: ${JSON.stringify(
this.logger.warn(
`Port '${Port.toString(
port
)}' was already available. Known boards before override: ${JSON.stringify(
knownBoards
)}`
);
@ -164,7 +275,9 @@ export class BoardDiscovery extends CoreClientAware {
newState[portID] = [port, boards];
} else if (eventType === 'remove') {
if (!newState[portID]) {
console.warn(`Port '${Port.toString(port)}' was not available. Skipping`);
this.logger.warn(
`Port '${Port.toString(port)}' was not available. Skipping`
);
return;
}
delete newState[portID];
@ -189,7 +302,11 @@ export class BoardDiscovery extends CoreClientAware {
this.notificationService.notifyAttachedBoardsDidChange(event);
}
});
this.boardWatchDuplex.write(req);
await this.write(
new BoardListWatchRequest().setInstance(instance),
wrapper.stream
);
this.watching = true;
}
getAttachedBoards(state: AvailablePorts = this.state): Board[] {
@ -210,3 +327,6 @@ export class BoardDiscovery extends CoreClientAware {
return availablePorts;
}
}
export namespace BoardDiscovery {
export const StopWatchTimeout = 10_000;
}

View File

@ -414,7 +414,7 @@ export class BoardsServiceImpl
console.info('>>> Starting boards package installation...', item);
// stop the board discovery
await this.boardDiscovery.stopBoardListWatch(coreClient);
await this.boardDiscovery.stop();
const resp = client.platformInstall(req);
resp.on(
@ -426,7 +426,7 @@ export class BoardsServiceImpl
);
await new Promise<void>((resolve, reject) => {
resp.on('end', () => {
this.boardDiscovery.startBoardListWatch(coreClient);
this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here.
resolve();
});
resp.on('error', (error) => {
@ -465,7 +465,7 @@ export class BoardsServiceImpl
console.info('>>> Starting boards package uninstallation...', item);
// stop the board discovery
await this.boardDiscovery.stopBoardListWatch(coreClient);
await this.boardDiscovery.stop();
const resp = client.platformUninstall(req);
resp.on(
@ -477,7 +477,7 @@ export class BoardsServiceImpl
);
await new Promise<void>((resolve, reject) => {
resp.on('end', () => {
this.boardDiscovery.startBoardListWatch(coreClient);
this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here.
resolve();
});
resp.on('error', reject);

View File

@ -269,7 +269,7 @@ export class LibraryServiceImpl
console.info('>>> Starting library package installation...', item);
// stop the board discovery
await this.boardDiscovery.stopBoardListWatch(coreClient);
await this.boardDiscovery.stop();
const resp = client.libraryInstall(req);
resp.on(
@ -281,7 +281,7 @@ export class LibraryServiceImpl
);
await new Promise<void>((resolve, reject) => {
resp.on('end', () => {
this.boardDiscovery.startBoardListWatch(coreClient);
this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here.
resolve();
});
resp.on('error', (error) => {
@ -323,7 +323,7 @@ export class LibraryServiceImpl
}
// stop the board discovery
await this.boardDiscovery.stopBoardListWatch(coreClient);
await this.boardDiscovery.stop();
const resp = client.zipLibraryInstall(req);
resp.on(
@ -335,7 +335,7 @@ export class LibraryServiceImpl
);
await new Promise<void>((resolve, reject) => {
resp.on('end', () => {
this.boardDiscovery.startBoardListWatch(coreClient);
this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here.
resolve();
});
resp.on('error', reject);
@ -358,7 +358,7 @@ export class LibraryServiceImpl
console.info('>>> Starting library package uninstallation...', item);
// stop the board discovery
await this.boardDiscovery.stopBoardListWatch(coreClient);
await this.boardDiscovery.stop();
const resp = client.libraryUninstall(req);
resp.on(
@ -370,7 +370,7 @@ export class LibraryServiceImpl
);
await new Promise<void>((resolve, reject) => {
resp.on('end', () => {
this.boardDiscovery.startBoardListWatch(coreClient);
this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here.
resolve();
});
resp.on('error', reject);

View File

@ -2,6 +2,9 @@ import { Metadata, StatusObject } from '@grpc/grpc-js';
export type ServiceError = StatusObject & Error;
export namespace ServiceError {
export function isCancel(arg: unknown): arg is ServiceError & { code: 1 } {
return is(arg) && arg.code === 1; // https://grpc.github.io/grpc/core/md_doc_statuscodes.html
}
export function is(arg: unknown): arg is ServiceError {
return arg instanceof Error && isStatusObjet(arg);
}