From 116b3d598417d15120d5fb055d3cddc775579688 Mon Sep 17 00:00:00 2001 From: Silvano Cerza Date: Fri, 4 Mar 2022 18:00:34 +0100 Subject: [PATCH] Moved some interfaces --- .../src/node/arduino-ide-backend-module.ts | 6 +- .../src/node/monitor-service.ts | 327 ++++++++++++++++++ 2 files changed, 328 insertions(+), 5 deletions(-) create mode 100644 arduino-ide-extension/src/node/monitor-service.ts diff --git a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts index 2e5e4b17..6b3f585c 100644 --- a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts +++ b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts @@ -86,11 +86,7 @@ import { ArduinoLocalizationContribution } from './arduino-localization-contribu import { LocalizationContribution } from '@theia/core/lib/node/i18n/localization-contribution'; import { MonitorManagerProxyImpl } from './monitor-manager-proxy-impl'; import { MonitorManager } from './monitor-manager'; -import { - MonitorManagerProxy, - MonitorManagerProxyClient, - MonitorManagerProxyPath, -} from '../common/monitor-manager-proxy'; +import { MonitorManagerProxy, MonitorManagerProxyClient, MonitorManagerProxyPath } from '../common/protocol/monitor-service'; export default new ContainerModule((bind, unbind, isBound, rebind) => { bind(BackendApplication).toSelf().inSingletonScope(); diff --git a/arduino-ide-extension/src/node/monitor-service.ts b/arduino-ide-extension/src/node/monitor-service.ts new file mode 100644 index 00000000..cbb408f0 --- /dev/null +++ b/arduino-ide-extension/src/node/monitor-service.ts @@ -0,0 +1,327 @@ +import { ClientDuplexStream } from "@grpc/grpc-js"; +import { Disposable, Emitter, ILogger } from "@theia/core"; +import { inject, named } from "@theia/core/shared/inversify"; +import { Board, Port, Status, MonitorSettings } from "../common/protocol"; +import { MonitorPortConfiguration, MonitorPortSetting, MonitorRequest, MonitorResponse } from "./cli-protocol/cc/arduino/cli/commands/v1/monitor_pb"; +import { CoreClientAware } from "./core-client-provider"; +import { WebSocketProvider } from "./web-socket/web-socket-provider"; +import { Port as gRPCPort } from 'arduino-ide-extension/src/node/cli-protocol/cc/arduino/cli/commands/v1/port_pb' +import WebSocketProviderImpl from "./web-socket/web-socket-provider-impl"; + +export class MonitorService extends CoreClientAware implements Disposable { + // Bidirectional gRPC stream used to receive and send data from the running + // pluggable monitor managed by the Arduino CLI. + protected duplex: ClientDuplexStream | null; + + // Settings used by the currently running pluggable monitor. + // They can be freely modified while running. + protected settings: MonitorSettings; + + // List of messages received from the running pluggable monitor. + // These are flushed from time to time to the frontend. + protected messages: string[] = []; + + // Handles messages received from the frontend via websocket. + protected onMessageReceived?: Disposable; + + // Sends messages to the frontend from time to time. + protected flushMessagesInterval?: NodeJS.Timeout; + + // Triggered each time the number of clients connected + // to the this service WebSocket changes. + protected onWSClientsNumberChanged?: Disposable; + + // Used to notify that the monitor is being disposed + protected readonly onDisposeEmitter = new Emitter(); + readonly onDispose = this.onDisposeEmitter.event; + + protected readonly webSocketProvider: WebSocketProvider = new WebSocketProviderImpl(); + + constructor( + @inject(ILogger) + @named("monitor-service") + protected readonly logger: ILogger, + + private readonly board: Board, + private readonly port: Port, + ) { + super(); + + this.onWSClientsNumberChanged = this.webSocketProvider.onClientsNumberChanged(async (clients: number) => { + if (clients === 0) { + // There are no more clients that want to receive + // data from this monitor, we can freely close + // and dispose it. + this.dispose(); + } + }); + } + + getWebsocketAddress(): number { + return this.webSocketProvider.getAddress().port; + } + + dispose(): void { + this.stop(); + this.onDisposeEmitter.fire(); + } + + /** + * isStarted is used to know if the currently running pluggable monitor is started. + * @returns true if pluggable monitor communication duplex is open, + * false in all other cases. + */ + isStarted(): boolean { + return !!this.duplex; + } + + /** + * Start and connects a monitor using currently set board and port. + * If a monitor is already started or board fqbn, port address and/or protocol + * are missing nothing happens. + * @returns a status to verify connection has been established. + */ + async start(): Promise { + if (this.duplex) { + return Status.ALREADY_CONNECTED; + } + + if (!this.board?.fqbn || !this.port?.address || !this.port?.protocol) { + return Status.CONFIG_MISSING + } + + this.logger.info("starting monitor"); + const coreClient = await this.coreClient(); + const { client, instance } = coreClient; + + this.duplex = client.monitor() + this.duplex + .on('close', () => { + this.logger.info(`monitor to ${this.port?.address} using ${this.port?.protocol} closed by client`) + }) + .on('end', () => { + this.logger.info(`monitor to ${this.port?.address} using ${this.port?.protocol} closed by server`) + }) + .on('error', (err: Error) => { + this.logger.error(err); + // TODO + // this.theiaFEClient?.notifyError() + }) + .on('data', ((res: MonitorResponse) => { + if (res.getError()) { + // TODO: Maybe disconnect + this.logger.error(res.getError()); + return; + } + const data = res.getRxData() + const message = + typeof data === 'string' ? data : new TextDecoder('utf8').decode(data); + this.messages.push(...splitLines(message)) + }).bind(this)); + + const req = new MonitorRequest(); + req.setInstance(instance); + if (this.board?.fqbn) { + req.setFqbn(this.board.fqbn) + } + if (this.port?.address && this.port?.protocol) { + const port = new gRPCPort() + port.setAddress(this.port.address); + port.setProtocol(this.port.protocol); + req.setPort(port); + } + const config = new MonitorPortConfiguration(); + for (const id in this.settings) { + const s = new MonitorPortSetting(); + s.setSettingId(id); + s.setValue(this.settings[id].selectedValue); + config.addSettings(s); + } + req.setPortConfiguration(config) + + const connect = new Promise(resolve => { + if (this.duplex?.write(req)) { + this.startMessagesHandlers(); + this.logger.info(`started monitor to ${this.port?.address} using ${this.port?.protocol}`) + resolve(Status.OK); + } + this.logger.warn(`failed starting monitor to ${this.port?.address} using ${this.port?.protocol}`) + resolve(Status.NOT_CONNECTED); + }); + + const connectTimeout = new Promise(resolve => { + setTimeout(async () => { + this.logger.warn(`timeout starting monitor to ${this.port?.address} using ${this.port?.protocol}`) + resolve(Status.NOT_CONNECTED); + }, 1000); + }); + // Try opening a monitor connection with a timeout + return await Promise.race([ + connect, + connectTimeout, + ]) + } + + /** + * Pauses the currently running monitor, it still closes the gRPC connection + * with the underlying monitor process but it doesn't stop the message handlers + * currently running. + * This is mainly used to handle upload when to the board/port combination + * the monitor is listening to. + * @returns + */ + async pause(): Promise { + return new Promise(resolve => { + if (!this.duplex) { + this.logger.warn(`monitor to ${this.port?.address} using ${this.port?.protocol} already stopped`) + return resolve(); + } + // It's enough to close the connection with the client + // to stop the monitor process + this.duplex.cancel(); + this.duplex = null; + this.logger.info(`stopped monitor to ${this.port?.address} using ${this.port?.protocol}`) + resolve(); + }) + } + + /** + * Stop the monitor currently running + */ + async stop(): Promise { + return this.pause().finally( + this.stopMessagesHandlers + ); + } + + /** + * Send a message to the running monitor, a well behaved monitor + * will then send that message to the board. + * We MUST NEVER send a message that wasn't a user's input to the board. + * @param message string sent to running monitor + * @returns a status to verify message has been sent. + */ + async send(message: string): Promise { + if (!this.duplex) { + return Status.NOT_CONNECTED; + } + const coreClient = await this.coreClient(); + const { instance } = coreClient; + + const req = new MonitorRequest(); + req.setInstance(instance); + req.setTxData(new TextEncoder().encode(message)); + return new Promise(resolve => { + if (this.duplex) { + this.duplex?.write(req, () => { + resolve(Status.OK); + }); + return; + } + this.stop().then(() => resolve(Status.NOT_CONNECTED)); + }) + } + + /** + * Set monitor settings, if there is a running monitor they'll be sent + * to it, otherwise they'll be used when starting one. + * Only values in settings parameter will be change, other values won't + * be changed in any way. + * @param settings map of monitor settings to change + * @returns a status to verify settings have been sent. + */ + async changeSettings(settings: MonitorSettings): Promise { + const config = new MonitorPortConfiguration(); + for (const id in settings) { + const s = new MonitorPortSetting(); + s.setSettingId(id); + s.setValue(settings[id].selectedValue); + config.addSettings(s); + this.settings[id] = settings[id]; + } + + if (!this.duplex) { + return Status.NOT_CONNECTED; + } + const coreClient = await this.coreClient(); + const { instance } = coreClient; + + const req = new MonitorRequest(); + req.setInstance(instance); + req.setPortConfiguration(config) + this.duplex.write(req); + return Status.OK + } + + /** + * Starts the necessary handlers to send and receive + * messages to and from the frontend and the running monitor + */ + private startMessagesHandlers(): void { + if (!this.flushMessagesInterval) { + const flushMessagesToFrontend = () => { + if (this.messages.length) { + this.webSocketProvider.sendMessage(JSON.stringify(this.messages)); + this.messages = []; + } + }; + this.flushMessagesInterval = setInterval(flushMessagesToFrontend, 32); + } + + if (!this.onMessageReceived) { + this.onMessageReceived = this.webSocketProvider.onMessageReceived( + (msg: string) => { + const message: SerialPlotter.Protocol.Message = JSON.parse(msg); + + switch (message.command) { + case SerialPlotter.Protocol.Command.PLOTTER_SEND_MESSAGE: + this.send(message.data); + break; + + case SerialPlotter.Protocol.Command.PLOTTER_SET_BAUDRATE: + this.theiaFEClient?.notifyBaudRateChanged( + parseInt(message.data, 10) as SerialConfig.BaudRate + ); + break; + + case SerialPlotter.Protocol.Command.PLOTTER_SET_LINE_ENDING: + this.theiaFEClient?.notifyLineEndingChanged(message.data); + break; + + case SerialPlotter.Protocol.Command.PLOTTER_SET_INTERPOLATE: + this.theiaFEClient?.notifyInterpolateChanged(message.data); + break; + + default: + break; + } + } + ) + } + } + + /** + * Stops the necessary handlers to send and receive messages to + * and from the frontend and the running monitor + */ + private stopMessagesHandlers(): void { + if (this.flushMessagesInterval) { + clearInterval(this.flushMessagesInterval); + this.flushMessagesInterval = undefined; + } + if (this.onMessageReceived) { + this.onMessageReceived.dispose(); + this.onMessageReceived = undefined; + } + } + +} + +/** + * Splits a string into an array without removing newline char. + * @param s string to split into lines + * @returns an lines array + */ +function splitLines(s: string): string[] { + return s.split(/(?<=\n)/); +}