Moved some interfaces

This commit is contained in:
Silvano Cerza 2022-03-04 18:00:34 +01:00 committed by Alberto Iannaccone
parent 750796d3a0
commit 116b3d5984
2 changed files with 328 additions and 5 deletions

View File

@ -86,11 +86,7 @@ import { ArduinoLocalizationContribution } from './arduino-localization-contribu
import { LocalizationContribution } from '@theia/core/lib/node/i18n/localization-contribution';
import { MonitorManagerProxyImpl } from './monitor-manager-proxy-impl';
import { MonitorManager } from './monitor-manager';
import {
MonitorManagerProxy,
MonitorManagerProxyClient,
MonitorManagerProxyPath,
} from '../common/monitor-manager-proxy';
import { MonitorManagerProxy, MonitorManagerProxyClient, MonitorManagerProxyPath } from '../common/protocol/monitor-service';
export default new ContainerModule((bind, unbind, isBound, rebind) => {
bind(BackendApplication).toSelf().inSingletonScope();

View File

@ -0,0 +1,327 @@
import { ClientDuplexStream } from "@grpc/grpc-js";
import { Disposable, Emitter, ILogger } from "@theia/core";
import { inject, named } from "@theia/core/shared/inversify";
import { Board, Port, Status, MonitorSettings } from "../common/protocol";
import { MonitorPortConfiguration, MonitorPortSetting, MonitorRequest, MonitorResponse } from "./cli-protocol/cc/arduino/cli/commands/v1/monitor_pb";
import { CoreClientAware } from "./core-client-provider";
import { WebSocketProvider } from "./web-socket/web-socket-provider";
import { Port as gRPCPort } from 'arduino-ide-extension/src/node/cli-protocol/cc/arduino/cli/commands/v1/port_pb'
import WebSocketProviderImpl from "./web-socket/web-socket-provider-impl";
export class MonitorService extends CoreClientAware implements Disposable {
// Bidirectional gRPC stream used to receive and send data from the running
// pluggable monitor managed by the Arduino CLI.
protected duplex: ClientDuplexStream<MonitorRequest, MonitorResponse> | null;
// Settings used by the currently running pluggable monitor.
// They can be freely modified while running.
protected settings: MonitorSettings;
// List of messages received from the running pluggable monitor.
// These are flushed from time to time to the frontend.
protected messages: string[] = [];
// Handles messages received from the frontend via websocket.
protected onMessageReceived?: Disposable;
// Sends messages to the frontend from time to time.
protected flushMessagesInterval?: NodeJS.Timeout;
// Triggered each time the number of clients connected
// to the this service WebSocket changes.
protected onWSClientsNumberChanged?: Disposable;
// Used to notify that the monitor is being disposed
protected readonly onDisposeEmitter = new Emitter<void>();
readonly onDispose = this.onDisposeEmitter.event;
protected readonly webSocketProvider: WebSocketProvider = new WebSocketProviderImpl();
constructor(
@inject(ILogger)
@named("monitor-service")
protected readonly logger: ILogger,
private readonly board: Board,
private readonly port: Port,
) {
super();
this.onWSClientsNumberChanged = this.webSocketProvider.onClientsNumberChanged(async (clients: number) => {
if (clients === 0) {
// There are no more clients that want to receive
// data from this monitor, we can freely close
// and dispose it.
this.dispose();
}
});
}
getWebsocketAddress(): number {
return this.webSocketProvider.getAddress().port;
}
dispose(): void {
this.stop();
this.onDisposeEmitter.fire();
}
/**
* isStarted is used to know if the currently running pluggable monitor is started.
* @returns true if pluggable monitor communication duplex is open,
* false in all other cases.
*/
isStarted(): boolean {
return !!this.duplex;
}
/**
* Start and connects a monitor using currently set board and port.
* If a monitor is already started or board fqbn, port address and/or protocol
* are missing nothing happens.
* @returns a status to verify connection has been established.
*/
async start(): Promise<Status> {
if (this.duplex) {
return Status.ALREADY_CONNECTED;
}
if (!this.board?.fqbn || !this.port?.address || !this.port?.protocol) {
return Status.CONFIG_MISSING
}
this.logger.info("starting monitor");
const coreClient = await this.coreClient();
const { client, instance } = coreClient;
this.duplex = client.monitor()
this.duplex
.on('close', () => {
this.logger.info(`monitor to ${this.port?.address} using ${this.port?.protocol} closed by client`)
})
.on('end', () => {
this.logger.info(`monitor to ${this.port?.address} using ${this.port?.protocol} closed by server`)
})
.on('error', (err: Error) => {
this.logger.error(err);
// TODO
// this.theiaFEClient?.notifyError()
})
.on('data', ((res: MonitorResponse) => {
if (res.getError()) {
// TODO: Maybe disconnect
this.logger.error(res.getError());
return;
}
const data = res.getRxData()
const message =
typeof data === 'string' ? data : new TextDecoder('utf8').decode(data);
this.messages.push(...splitLines(message))
}).bind(this));
const req = new MonitorRequest();
req.setInstance(instance);
if (this.board?.fqbn) {
req.setFqbn(this.board.fqbn)
}
if (this.port?.address && this.port?.protocol) {
const port = new gRPCPort()
port.setAddress(this.port.address);
port.setProtocol(this.port.protocol);
req.setPort(port);
}
const config = new MonitorPortConfiguration();
for (const id in this.settings) {
const s = new MonitorPortSetting();
s.setSettingId(id);
s.setValue(this.settings[id].selectedValue);
config.addSettings(s);
}
req.setPortConfiguration(config)
const connect = new Promise<Status>(resolve => {
if (this.duplex?.write(req)) {
this.startMessagesHandlers();
this.logger.info(`started monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve(Status.OK);
}
this.logger.warn(`failed starting monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve(Status.NOT_CONNECTED);
});
const connectTimeout = new Promise<Status>(resolve => {
setTimeout(async () => {
this.logger.warn(`timeout starting monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve(Status.NOT_CONNECTED);
}, 1000);
});
// Try opening a monitor connection with a timeout
return await Promise.race([
connect,
connectTimeout,
])
}
/**
* Pauses the currently running monitor, it still closes the gRPC connection
* with the underlying monitor process but it doesn't stop the message handlers
* currently running.
* This is mainly used to handle upload when to the board/port combination
* the monitor is listening to.
* @returns
*/
async pause(): Promise<void> {
return new Promise(resolve => {
if (!this.duplex) {
this.logger.warn(`monitor to ${this.port?.address} using ${this.port?.protocol} already stopped`)
return resolve();
}
// It's enough to close the connection with the client
// to stop the monitor process
this.duplex.cancel();
this.duplex = null;
this.logger.info(`stopped monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve();
})
}
/**
* Stop the monitor currently running
*/
async stop(): Promise<void> {
return this.pause().finally(
this.stopMessagesHandlers
);
}
/**
* Send a message to the running monitor, a well behaved monitor
* will then send that message to the board.
* We MUST NEVER send a message that wasn't a user's input to the board.
* @param message string sent to running monitor
* @returns a status to verify message has been sent.
*/
async send(message: string): Promise<Status> {
if (!this.duplex) {
return Status.NOT_CONNECTED;
}
const coreClient = await this.coreClient();
const { instance } = coreClient;
const req = new MonitorRequest();
req.setInstance(instance);
req.setTxData(new TextEncoder().encode(message));
return new Promise<Status>(resolve => {
if (this.duplex) {
this.duplex?.write(req, () => {
resolve(Status.OK);
});
return;
}
this.stop().then(() => resolve(Status.NOT_CONNECTED));
})
}
/**
* Set monitor settings, if there is a running monitor they'll be sent
* to it, otherwise they'll be used when starting one.
* Only values in settings parameter will be change, other values won't
* be changed in any way.
* @param settings map of monitor settings to change
* @returns a status to verify settings have been sent.
*/
async changeSettings(settings: MonitorSettings): Promise<Status> {
const config = new MonitorPortConfiguration();
for (const id in settings) {
const s = new MonitorPortSetting();
s.setSettingId(id);
s.setValue(settings[id].selectedValue);
config.addSettings(s);
this.settings[id] = settings[id];
}
if (!this.duplex) {
return Status.NOT_CONNECTED;
}
const coreClient = await this.coreClient();
const { instance } = coreClient;
const req = new MonitorRequest();
req.setInstance(instance);
req.setPortConfiguration(config)
this.duplex.write(req);
return Status.OK
}
/**
* Starts the necessary handlers to send and receive
* messages to and from the frontend and the running monitor
*/
private startMessagesHandlers(): void {
if (!this.flushMessagesInterval) {
const flushMessagesToFrontend = () => {
if (this.messages.length) {
this.webSocketProvider.sendMessage(JSON.stringify(this.messages));
this.messages = [];
}
};
this.flushMessagesInterval = setInterval(flushMessagesToFrontend, 32);
}
if (!this.onMessageReceived) {
this.onMessageReceived = this.webSocketProvider.onMessageReceived(
(msg: string) => {
const message: SerialPlotter.Protocol.Message = JSON.parse(msg);
switch (message.command) {
case SerialPlotter.Protocol.Command.PLOTTER_SEND_MESSAGE:
this.send(message.data);
break;
case SerialPlotter.Protocol.Command.PLOTTER_SET_BAUDRATE:
this.theiaFEClient?.notifyBaudRateChanged(
parseInt(message.data, 10) as SerialConfig.BaudRate
);
break;
case SerialPlotter.Protocol.Command.PLOTTER_SET_LINE_ENDING:
this.theiaFEClient?.notifyLineEndingChanged(message.data);
break;
case SerialPlotter.Protocol.Command.PLOTTER_SET_INTERPOLATE:
this.theiaFEClient?.notifyInterpolateChanged(message.data);
break;
default:
break;
}
}
)
}
}
/**
* Stops the necessary handlers to send and receive messages to
* and from the frontend and the running monitor
*/
private stopMessagesHandlers(): void {
if (this.flushMessagesInterval) {
clearInterval(this.flushMessagesInterval);
this.flushMessagesInterval = undefined;
}
if (this.onMessageReceived) {
this.onMessageReceived.dispose();
this.onMessageReceived = undefined;
}
}
}
/**
* Splits a string into an array without removing newline char.
* @param s string to split into lines
* @returns an lines array
*/
function splitLines(s: string): string[] {
return s.split(/(?<=\n)/);
}