Signed-off-by: Akos Kitta <a.kitta@arduino.cc>
This commit is contained in:
Akos Kitta 2022-07-13 14:29:23 +02:00 committed by Akos Kitta
parent aea550fe33
commit a0038315da
2 changed files with 81 additions and 47 deletions

View File

@ -14,13 +14,14 @@ import {
AvailablePorts, AvailablePorts,
AttachedBoardsChangeEvent, AttachedBoardsChangeEvent,
} from '../common/protocol'; } from '../common/protocol';
import { Emitter } from '@theia/core/lib/common/event'; import { Emitter, Event } from '@theia/core/lib/common/event';
import { DisposableCollection } from '@theia/core/lib/common/disposable'; import { DisposableCollection } from '@theia/core/lib/common/disposable';
import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol'; import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb'; import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb';
import { v4 } from 'uuid'; import { v4 } from 'uuid';
import { ServiceError } from './service-error'; import { ServiceError } from './service-error';
import { BackendApplicationContribution } from '@theia/core/lib/node'; import { BackendApplicationContribution } from '@theia/core/lib/node';
import { Deferred } from '@theia/core/lib/common/promise-util';
type Duplex = ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>; type Duplex = ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>;
interface StreamWrapper extends Disposable { interface StreamWrapper extends Disposable {
@ -30,7 +31,8 @@ interface StreamWrapper extends Disposable {
/** /**
* Singleton service for tracking the available ports and board and broadcasting the * Singleton service for tracking the available ports and board and broadcasting the
* changes to all connected frontend instances. \ * changes to all connected frontend instances.
*
* Unlike other services, this is not connection scoped. * Unlike other services, this is not connection scoped.
*/ */
@injectable() @injectable()
@ -45,9 +47,8 @@ export class BoardDiscovery
@inject(NotificationServiceServer) @inject(NotificationServiceServer)
private readonly notificationService: NotificationServiceServer; private readonly notificationService: NotificationServiceServer;
// Used to know if the board watch process is already running to avoid private watching: Deferred<void> | undefined;
// starting it multiple times private stopping: Deferred<void> | undefined;
private watching: boolean;
private wrapper: StreamWrapper | undefined; private wrapper: StreamWrapper | undefined;
private readonly onStreamDidEndEmitter = new Emitter<void>(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization. private readonly onStreamDidEndEmitter = new Emitter<void>(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization.
private readonly onStreamDidCancelEmitter = new Emitter<void>(); // when the watcher is canceled by the IDE2 private readonly onStreamDidCancelEmitter = new Emitter<void>(); // when the watcher is canceled by the IDE2
@ -55,6 +56,7 @@ export class BoardDiscovery
/** /**
* Keys are the `address` of the ports. * Keys are the `address` of the ports.
*
* The `protocol` is ignored because the board detach event does not carry the protocol information, * The `protocol` is ignored because the board detach event does not carry the protocol information,
* just the address. * just the address.
* ```json * ```json
@ -64,46 +66,57 @@ export class BoardDiscovery
* } * }
* ``` * ```
*/ */
private _state: AvailablePorts = {}; private _availablePorts: AvailablePorts = {};
get state(): AvailablePorts { get availablePorts(): AvailablePorts {
return this._state; return this._availablePorts;
} }
onStart(): void { onStart(): void {
this.start(); this.start();
this.onClientDidRefresh(() => this.start()); this.onClientDidRefresh(() => this.restart());
}
private async restart(): Promise<void> {
this.logger.info('restarting before stop');
await this.stop();
this.logger.info('restarting after stop');
return this.start();
} }
onStop(): void { onStop(): void {
this.stop(); this.stop();
} }
stop(): Promise<void> { async stop(restart = false): Promise<void> {
this.logger.info('stop');
if (this.stopping) {
this.logger.info('stop already stopping');
return this.stopping.promise;
}
if (!this.watching) {
return;
}
this.stopping = new Deferred();
this.logger.info('>>> Stopping boards watcher...'); this.logger.info('>>> Stopping boards watcher...');
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
const timeout = this.createTimeout( const timeout = this.createTimeout(10_000, reject);
BoardDiscovery.StopWatchTimeout,
reject
);
const toDispose = new DisposableCollection(); const toDispose = new DisposableCollection();
const waitForEvent = (event: Event<unknown>) =>
event(() => {
this.logger.info('stop received event: either end or cancel');
toDispose.dispose();
this.stopping?.resolve();
this.stopping = undefined;
this.logger.info('stop stopped');
resolve();
if (restart) {
this.start();
}
});
toDispose.pushAll([ toDispose.pushAll([
timeout, timeout,
this.onStreamDidEndEmitter.event(() => { waitForEvent(this.onStreamDidEndEmitter.event),
this.logger.info( waitForEvent(this.onStreamDidCancelEmitter.event),
`<<< Received the end event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
toDispose.dispose();
resolve();
}),
this.onStreamDidCancelEmitter.event(() => {
this.logger.info(
`<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
toDispose.dispose();
resolve();
}),
]); ]);
this.logger.info('Canceling boards watcher...'); this.logger.info('Canceling boards watcher...');
this.toDisposeOnStopWatch.dispose(); this.toDisposeOnStopWatch.dispose();
@ -149,9 +162,14 @@ export class BoardDiscovery
} }
const stream = client const stream = client
.boardListWatch() .boardListWatch()
.on('end', () => this.onStreamDidEndEmitter.fire()) .on('end', () => {
this.logger.info('received end');
this.onStreamDidEndEmitter.fire();
})
.on('error', (error) => { .on('error', (error) => {
this.logger.info('error received');
if (ServiceError.isCancel(error)) { if (ServiceError.isCancel(error)) {
this.logger.info('cancel error received!');
this.onStreamDidCancelEmitter.fire(); this.onStreamDidCancelEmitter.fire();
} else { } else {
this.logger.error( this.logger.error(
@ -165,13 +183,21 @@ export class BoardDiscovery
stream, stream,
uuid: v4(), uuid: v4(),
dispose: () => { dispose: () => {
this.logger.info('disposing requesting cancel');
// Cancelling the stream will kill the discovery `builtin:mdns-discovery process`. // Cancelling the stream will kill the discovery `builtin:mdns-discovery process`.
// The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI. // The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI.
stream.cancel(); stream.cancel();
this.logger.info('disposing canceled');
this.wrapper = undefined; this.wrapper = undefined;
}, },
}; };
this.toDisposeOnStopWatch.pushAll([wrapper]); this.toDisposeOnStopWatch.pushAll([
wrapper,
Disposable.create(() => {
this.watching?.reject(new Error(`Stopping watcher.`));
this.watching = undefined;
}),
]);
return wrapper; return wrapper;
} }
@ -188,17 +214,25 @@ export class BoardDiscovery
} }
async start(): Promise<void> { async start(): Promise<void> {
if (this.watching) { this.logger.info('start');
// We want to avoid starting the board list watch process multiple if (this.stopping) {
// times to meet unforeseen consequences this.logger.info('start is stopping wait');
return; await this.stopping.promise;
this.logger.info('start stopped');
} }
if (this.watching) {
this.logger.info('start already watching');
return this.watching.promise;
}
this.watching = new Deferred();
this.logger.info('start new deferred');
const { client, instance } = await this.coreClient; const { client, instance } = await this.coreClient;
const wrapper = await this.createWrapper(client); const wrapper = await this.createWrapper(client);
wrapper.stream.on('data', async (resp: BoardListWatchResponse) => { wrapper.stream.on('data', async (resp: BoardListWatchResponse) => {
this.logger.info('onData', this.toJson(resp)); this.logger.info('onData', this.toJson(resp));
if (resp.getEventType() === 'quit') { if (resp.getEventType() === 'quit') {
await this.stop(); this.logger.info('quit received');
this.stop();
return; return;
} }
@ -217,8 +251,8 @@ export class BoardDiscovery
throw new Error(`Unexpected event type: '${resp.getEventType()}'`); throw new Error(`Unexpected event type: '${resp.getEventType()}'`);
} }
const oldState = deepClone(this._state); const oldState = deepClone(this._availablePorts);
const newState = deepClone(this._state); const newState = deepClone(this._availablePorts);
const address = (detectedPort as any).getPort().getAddress(); const address = (detectedPort as any).getPort().getAddress();
const protocol = (detectedPort as any).getPort().getProtocol(); const protocol = (detectedPort as any).getPort().getProtocol();
@ -286,18 +320,21 @@ export class BoardDiscovery
}, },
}; };
this._state = newState; this._availablePorts = newState;
this.notificationService.notifyAttachedBoardsDidChange(event); this.notificationService.notifyAttachedBoardsDidChange(event);
} }
}); });
this.logger.info('start request start watch');
await this.requestStartWatch( await this.requestStartWatch(
new BoardListWatchRequest().setInstance(instance), new BoardListWatchRequest().setInstance(instance),
wrapper.stream wrapper.stream
); );
this.watching = true; this.logger.info('start requested start watch');
this.watching.resolve();
this.logger.info('start resolved watching');
} }
getAttachedBoards(state: AvailablePorts = this.state): Board[] { getAttachedBoards(state: AvailablePorts = this.availablePorts): Board[] {
const attachedBoards: Board[] = []; const attachedBoards: Board[] = [];
for (const portID of Object.keys(state)) { for (const portID of Object.keys(state)) {
const [, boards] = state[portID]; const [, boards] = state[portID];
@ -306,7 +343,7 @@ export class BoardDiscovery
return attachedBoards; return attachedBoards;
} }
getAvailablePorts(state: AvailablePorts = this.state): Port[] { getAvailablePorts(state: AvailablePorts = this.availablePorts): Port[] {
const availablePorts: Port[] = []; const availablePorts: Port[] = [];
for (const portID of Object.keys(state)) { for (const portID of Object.keys(state)) {
const [port] = state[portID]; const [port] = state[portID];
@ -315,6 +352,3 @@ export class BoardDiscovery
return availablePorts; return availablePorts;
} }
} }
export namespace BoardDiscovery {
export const StopWatchTimeout = 10_000;
}

View File

@ -60,7 +60,7 @@ export class BoardsServiceImpl
protected readonly boardDiscovery: BoardDiscovery; protected readonly boardDiscovery: BoardDiscovery;
async getState(): Promise<AvailablePorts> { async getState(): Promise<AvailablePorts> {
return this.boardDiscovery.state; return this.boardDiscovery.availablePorts;
} }
async getAttachedBoards(): Promise<Board[]> { async getAttachedBoards(): Promise<Board[]> {