fix monitor connection

This commit is contained in:
Alberto Iannaccone 2022-04-08 15:42:52 +02:00
parent fbe8fb421a
commit eff960bb7f
5 changed files with 737 additions and 643 deletions

View File

@ -1,103 +1,123 @@
import { Emitter, MessageService } from "@theia/core"; import { Emitter, MessageService } from '@theia/core';
import { inject, injectable } from "@theia/core/shared/inversify"; import { inject, injectable } from '@theia/core/shared/inversify';
import { Board, Port } from "../common/protocol"; import { Board, Port } from '../common/protocol';
import { Monitor, MonitorManagerProxyClient, MonitorManagerProxyFactory, MonitorSettings } from "../common/protocol/monitor-service"; import {
Monitor,
MonitorManagerProxyClient,
MonitorManagerProxyFactory,
MonitorSettings,
} from '../common/protocol/monitor-service';
@injectable() @injectable()
export class MonitorManagerProxyClientImpl implements MonitorManagerProxyClient { export class MonitorManagerProxyClientImpl
// When pluggable monitor messages are received from the backend implements MonitorManagerProxyClient
// this event is triggered. {
// Ideally a frontend component is connected to this event // When pluggable monitor messages are received from the backend
// to update the UI. // this event is triggered.
protected readonly onMessagesReceivedEmitter = new Emitter<{ messages: string[] }>(); // Ideally a frontend component is connected to this event
readonly onMessagesReceived = this.onMessagesReceivedEmitter.event; // to update the UI.
protected readonly onMessagesReceivedEmitter = new Emitter<{
messages: string[];
}>();
readonly onMessagesReceived = this.onMessagesReceivedEmitter.event;
// WebSocket used to handle pluggable monitor communication between protected readonly onWSConnectionChangedEmitter = new Emitter<boolean>();
// frontend and backend. readonly onWSConnectionChanged = this.onWSConnectionChangedEmitter.event;
private webSocket?: WebSocket;
private wsPort?: number;
getWebSocketPort(): number | undefined { // WebSocket used to handle pluggable monitor communication between
return this.wsPort; // frontend and backend.
private webSocket?: WebSocket;
private wsPort?: number;
getWebSocketPort(): number | undefined {
return this.wsPort;
}
constructor(
@inject(MessageService)
protected messageService: MessageService,
// This is necessary to call the backend methods from the frontend
@inject(MonitorManagerProxyFactory)
protected server: MonitorManagerProxyFactory
) {}
/**
* Connects a localhost WebSocket using the specified port.
* @param addressPort port of the WebSocket
*/
connect(addressPort: number): void {
if (this.webSocket) {
return;
}
try {
this.webSocket = new WebSocket(`ws://localhost:${addressPort}`);
this.onWSConnectionChangedEmitter.fire(true);
} catch {
this.messageService.error('Unable to connect to websocket');
return;
} }
constructor( this.webSocket.onmessage = (res) => {
@inject(MessageService) const messages = JSON.parse(res.data);
protected messageService: MessageService, this.onMessagesReceivedEmitter.fire({ messages });
};
this.wsPort = addressPort;
}
// This is necessary to call the backend methods from the frontend /**
@inject(MonitorManagerProxyFactory) * Disconnects the WebSocket if connected.
protected server: MonitorManagerProxyFactory */
) { disconnect(): void {
try {
this.webSocket?.close();
this.webSocket = undefined;
this.onWSConnectionChangedEmitter.fire(false);
} catch {
this.messageService.error('Unable to close websocket');
}
}
async isWSConnected(): Promise<boolean> {
return !!this.webSocket;
}
async startMonitor(
board: Board,
port: Port,
settings?: MonitorSettings
): Promise<void> {
return this.server().startMonitor(board, port, settings);
}
getCurrentSettings(board: Board, port: Port): MonitorSettings {
return this.server().getCurrentSettings(board, port);
}
send(message: string): void {
if (!this.webSocket) {
return;
} }
/** this.webSocket.send(
* Connects a localhost WebSocket using the specified port. JSON.stringify({
* @param addressPort port of the WebSocket command: Monitor.Command.SEND_MESSAGE,
*/ data: message,
connect(addressPort: number): void { })
if (this.webSocket) { );
return; }
}
try {
this.webSocket = new WebSocket(`ws://localhost:${addressPort}`);
} catch {
this.messageService.error('Unable to connect to websocket');
return;
}
this.webSocket.onmessage = (res) => { changeSettings(settings: MonitorSettings): void {
const messages = JSON.parse(res.data); if (!this.webSocket) {
this.onMessagesReceivedEmitter.fire({ messages }); return;
}
this.wsPort = addressPort;
} }
/** this.webSocket.send(
* Disconnects the WebSocket if connected. JSON.stringify({
*/ command: Monitor.Command.CHANGE_SETTINGS,
disconnect(): void { // TODO: This might be wrong, verify if it works
try { data: settings,
this.webSocket?.close(); })
this.webSocket = undefined; );
} catch { }
this.messageService.error('Unable to close websocket');
}
}
async isWSConnected(): Promise<boolean> {
return !!this.webSocket;
}
async startMonitor(board: Board, port: Port, settings?: MonitorSettings): Promise<void> {
return this.server().startMonitor(board, port, settings);
}
getCurrentSettings(board: Board, port: Port): MonitorSettings {
return this.server().getCurrentSettings(board, port);
}
send(message: string): void {
if (!this.webSocket) {
return;
}
this.webSocket.send(JSON.stringify({
command: Monitor.Command.SEND_MESSAGE,
data: message,
}));
}
changeSettings(settings: MonitorSettings): void {
if (!this.webSocket) {
return;
}
this.webSocket.send(JSON.stringify({
command: Monitor.Command.CHANGE_SETTINGS,
// TODO: This might be wrong, verify if it works
data: settings,
}));
}
} }

View File

@ -5,6 +5,7 @@ import { isOSX } from '@theia/core/lib/common/os';
import { DisposableCollection, nls } from '@theia/core/lib/common'; import { DisposableCollection, nls } from '@theia/core/lib/common';
import { MonitorManagerProxyClient } from '../../../common/protocol'; import { MonitorManagerProxyClient } from '../../../common/protocol';
import { BoardsServiceProvider } from '../../boards/boards-service-provider'; import { BoardsServiceProvider } from '../../boards/boards-service-provider';
import { timeout } from '@theia/core/lib/common/promise-util';
export namespace SerialMonitorSendInput { export namespace SerialMonitorSendInput {
export interface Props { export interface Props {
@ -27,16 +28,33 @@ export class SerialMonitorSendInput extends React.Component<
constructor(props: Readonly<SerialMonitorSendInput.Props>) { constructor(props: Readonly<SerialMonitorSendInput.Props>) {
super(props); super(props);
this.state = { text: '', connected: false }; this.state = { text: '', connected: true };
this.onChange = this.onChange.bind(this); this.onChange = this.onChange.bind(this);
this.onSend = this.onSend.bind(this); this.onSend = this.onSend.bind(this);
this.onKeyDown = this.onKeyDown.bind(this); this.onKeyDown = this.onKeyDown.bind(this);
} }
componentDidMount(): void { componentDidMount(): void {
this.props.monitorManagerProxy.isWSConnected().then((connected) => { this.setState({ connected: true });
this.setState({ connected });
const checkWSConnection = new Promise<boolean>((resolve) => {
this.props.monitorManagerProxy.onWSConnectionChanged((connected) => {
this.setState({ connected });
resolve(true);
});
}); });
const checkWSTimeout = timeout(1000).then(() => false);
Promise.race<boolean>([checkWSConnection, checkWSTimeout]).then(
async (resolved) => {
if (!resolved) {
const connected =
await this.props.monitorManagerProxy.isWSConnected();
this.setState({ connected });
}
}
);
} }
componentWillUnmount(): void { componentWillUnmount(): void {
@ -49,7 +67,7 @@ export class SerialMonitorSendInput extends React.Component<
<input <input
ref={this.setRef} ref={this.setRef}
type="text" type="text"
className={`theia-input ${this.state.connected ? '' : 'warning'}`} className={`theia-input ${this.shouldShowWarning() ? 'warning' : ''}`}
placeholder={this.placeholder} placeholder={this.placeholder}
value={this.state.text} value={this.state.text}
onChange={this.onChange} onChange={this.onChange}
@ -58,16 +76,22 @@ export class SerialMonitorSendInput extends React.Component<
); );
} }
protected get placeholder(): string { protected shouldShowWarning(): boolean {
const board = this.props.boardsServiceProvider.boardsConfig.selectedBoard; const board = this.props.boardsServiceProvider.boardsConfig.selectedBoard;
const port = this.props.boardsServiceProvider.boardsConfig.selectedPort; const port = this.props.boardsServiceProvider.boardsConfig.selectedPort;
if (!this.state.connected || !board || !port) { return !this.state.connected || !board || !port;
}
protected get placeholder(): string {
if (this.shouldShowWarning()) {
return nls.localize( return nls.localize(
'arduino/serial/notConnected', 'arduino/serial/notConnected',
'Not connected. Select a board and a port to connect automatically.' 'Not connected. Select a board and a port to connect automatically.'
); );
} }
const board = this.props.boardsServiceProvider.boardsConfig.selectedBoard;
const port = this.props.boardsServiceProvider.boardsConfig.selectedPort;
return nls.localize( return nls.localize(
'arduino/serial/message', 'arduino/serial/message',
"Message ({0} + Enter to send message to '{1}' on '{2}')", "Message ({0} + Enter to send message to '{1}' on '{2}')",

View File

@ -1,4 +1,4 @@
import { Event, JsonRpcServer } from "@theia/core"; import { Event, JsonRpcServer } from '@theia/core';
import { Board, Port } from './boards-service'; import { Board, Port } from './boards-service';
export const MonitorManagerProxyFactory = Symbol('MonitorManagerProxyFactory'); export const MonitorManagerProxyFactory = Symbol('MonitorManagerProxyFactory');
@ -6,68 +6,82 @@ export type MonitorManagerProxyFactory = () => MonitorManagerProxy;
export const MonitorManagerProxyPath = '/services/monitor-manager-proxy'; export const MonitorManagerProxyPath = '/services/monitor-manager-proxy';
export const MonitorManagerProxy = Symbol('MonitorManagerProxy'); export const MonitorManagerProxy = Symbol('MonitorManagerProxy');
export interface MonitorManagerProxy extends JsonRpcServer<MonitorManagerProxyClient> { export interface MonitorManagerProxy
startMonitor(board: Board, port: Port, settings?: MonitorSettings): Promise<void>; extends JsonRpcServer<MonitorManagerProxyClient> {
changeMonitorSettings(board: Board, port: Port, settings: MonitorSettings): Promise<void>; startMonitor(
stopMonitor(board: Board, port: Port): Promise<void>; board: Board,
getCurrentSettings(board: Board, port: Port): MonitorSettings; port: Port,
settings?: MonitorSettings
): Promise<void>;
changeMonitorSettings(
board: Board,
port: Port,
settings: MonitorSettings
): Promise<void>;
stopMonitor(board: Board, port: Port): Promise<void>;
getCurrentSettings(board: Board, port: Port): MonitorSettings;
} }
export const MonitorManagerProxyClient = Symbol('MonitorManagerProxyClient'); export const MonitorManagerProxyClient = Symbol('MonitorManagerProxyClient');
export interface MonitorManagerProxyClient { export interface MonitorManagerProxyClient {
onMessagesReceived: Event<{ messages: string[] }>; onMessagesReceived: Event<{ messages: string[] }>;
connect(addressPort: number): void; onWSConnectionChanged: Event<boolean>;
disconnect(): void; connect(addressPort: number): void;
getWebSocketPort(): number | undefined; disconnect(): void;
isWSConnected(): Promise<boolean>; getWebSocketPort(): number | undefined;
startMonitor(board: Board, port: Port, settings?: MonitorSettings): Promise<void>; isWSConnected(): Promise<boolean>;
getCurrentSettings(board: Board, port: Port): MonitorSettings; startMonitor(
send(message: string): void; board: Board,
changeSettings(settings: MonitorSettings): void port: Port,
settings?: MonitorSettings
): Promise<void>;
getCurrentSettings(board: Board, port: Port): MonitorSettings;
send(message: string): void;
changeSettings(settings: MonitorSettings): void;
} }
export interface MonitorSetting { export interface MonitorSetting {
// The setting identifier // The setting identifier
readonly id: string; readonly id: string;
// A human-readable label of the setting (to be displayed on the GUI) // A human-readable label of the setting (to be displayed on the GUI)
readonly label: string; readonly label: string;
// The setting type (at the moment only "enum" is avaiable) // The setting type (at the moment only "enum" is avaiable)
readonly type: string; readonly type: string;
// The values allowed on "enum" types // The values allowed on "enum" types
readonly values: string[]; readonly values: string[];
// The selected value // The selected value
selectedValue: string; selectedValue: string;
} }
export type MonitorSettings = Record<string, MonitorSetting>; export type MonitorSettings = Record<string, MonitorSetting>;
export namespace Monitor { export namespace Monitor {
export enum Command { export enum Command {
SEND_MESSAGE = 'MONITOR_SEND_MESSAGE', SEND_MESSAGE = 'MONITOR_SEND_MESSAGE',
CHANGE_SETTINGS = 'MONITOR_CHANGE_SETTINGS', CHANGE_SETTINGS = 'MONITOR_CHANGE_SETTINGS',
} }
export type Message = { export type Message = {
command: Monitor.Command, command: Monitor.Command;
data: string; data: string;
} };
} }
export interface Status { } export interface Status {}
export type OK = Status; export type OK = Status;
export interface ErrorStatus extends Status { export interface ErrorStatus extends Status {
readonly message: string; readonly message: string;
} }
export namespace Status { export namespace Status {
export function isOK(status: Status & { message?: string }): status is OK { export function isOK(status: Status & { message?: string }): status is OK {
return !!status && typeof status.message !== 'string'; return !!status && typeof status.message !== 'string';
} }
export const OK: OK = {}; export const OK: OK = {};
export const NOT_CONNECTED: ErrorStatus = { message: 'Not connected.' }; export const NOT_CONNECTED: ErrorStatus = { message: 'Not connected.' };
export const ALREADY_CONNECTED: ErrorStatus = { export const ALREADY_CONNECTED: ErrorStatus = {
message: 'Already connected.', message: 'Already connected.',
}; };
export const CONFIG_MISSING: ErrorStatus = { export const CONFIG_MISSING: ErrorStatus = {
message: 'Serial Config missing.', message: 'Serial Config missing.',
}; };
} }

View File

@ -1,8 +1,8 @@
import { ILogger } from "@theia/core"; import { ILogger } from '@theia/core';
import { inject, injectable, named } from "@theia/core/shared/inversify"; import { inject, injectable, named } from '@theia/core/shared/inversify';
import { Board, Port, Status, MonitorSettings } from "../common/protocol"; import { Board, Port, Status, MonitorSettings } from '../common/protocol';
import { CoreClientAware } from "./core-client-provider"; import { CoreClientAware } from './core-client-provider';
import { MonitorService } from "./monitor-service"; import { MonitorService } from './monitor-service';
type MonitorID = string; type MonitorID = string;
@ -10,191 +10,194 @@ export const MonitorManagerName = 'monitor-manager';
@injectable() @injectable()
export class MonitorManager extends CoreClientAware { export class MonitorManager extends CoreClientAware {
// Map of monitor services that manage the running pluggable monitors. // Map of monitor services that manage the running pluggable monitors.
// Each service handles the lifetime of one, and only one, monitor. // Each service handles the lifetime of one, and only one, monitor.
// If either the board or port managed changes a new service must // If either the board or port managed changes a new service must
// be started. // be started.
private monitorServices = new Map<MonitorID, MonitorService>(); private monitorServices = new Map<MonitorID, MonitorService>();
constructor( constructor(
@inject(ILogger) @inject(ILogger)
@named(MonitorManagerName) @named(MonitorManagerName)
protected readonly logger: ILogger, protected readonly logger: ILogger
) { ) {
super(); super();
} }
/** /**
* Used to know if a monitor is started * Used to know if a monitor is started
* @param board board connected to port * @param board board connected to port
* @param port port to monitor * @param port port to monitor
* @returns true if the monitor is currently monitoring the board/port * @returns true if the monitor is currently monitoring the board/port
* combination specifed, false in all other cases. * combination specifed, false in all other cases.
*/ */
isStarted(board: Board, port: Port): boolean { isStarted(board: Board, port: Port): boolean {
const monitorID = this.monitorID(board, port); const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID); const monitor = this.monitorServices.get(monitorID);
if (monitor) { if (monitor) {
return monitor.isStarted(); return monitor.isStarted();
}
return false;
} }
return false;
}
/** /**
* Start a pluggable monitor that receives and sends messages * Start a pluggable monitor that receives and sends messages
* to the specified board and port combination. * to the specified board and port combination.
* @param board board connected to port * @param board board connected to port
* @param port port to monitor * @param port port to monitor
* @returns a Status object to know if the process has been * @returns a Status object to know if the process has been
* started or if there have been errors. * started or if there have been errors.
*/ */
async startMonitor(board: Board, port: Port): Promise<Status> { async startMonitor(board: Board, port: Port): Promise<Status> {
const monitorID = this.monitorID(board, port); const monitorID = this.monitorID(board, port);
let monitor = this.monitorServices.get(monitorID); let monitor = this.monitorServices.get(monitorID);
if (!monitor) { if (!monitor) {
monitor = this.createMonitor(board, port) monitor = this.createMonitor(board, port);
}
return await monitor.start();
} }
return await monitor.start();
}
/** /**
* Stop a pluggable monitor connected to the specified board/port * Stop a pluggable monitor connected to the specified board/port
* combination. It's a noop if monitor is not running. * combination. It's a noop if monitor is not running.
* @param board board connected to port * @param board board connected to port
* @param port port monitored * @param port port monitored
*/ */
async stopMonitor(board: Board, port: Port): Promise<void> { async stopMonitor(board: Board, port: Port): Promise<void> {
const monitorID = this.monitorID(board, port); const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID); const monitor = this.monitorServices.get(monitorID);
if (!monitor) { if (!monitor) {
// There's no monitor to stop, bail // There's no monitor to stop, bail
return; return;
}
return await monitor.stop();
} }
return await monitor.stop();
}
/** /**
* Returns the port of the WebSocket used by the MonitorService * Returns the port of the WebSocket used by the MonitorService
* that is handling the board/port combination * that is handling the board/port combination
* @param board board connected to port * @param board board connected to port
* @param port port to monitor * @param port port to monitor
* @returns port of the MonitorService's WebSocket * @returns port of the MonitorService's WebSocket
*/ */
getWebsocketAddressPort(board: Board, port: Port): number { getWebsocketAddressPort(board: Board, port: Port): number {
const monitorID = this.monitorID(board, port); const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID); const monitor = this.monitorServices.get(monitorID);
if (!monitor) { if (!monitor) {
return -1; return -1;
}
return monitor.getWebsocketAddressPort();
} }
return monitor.getWebsocketAddressPort();
}
/** /**
* Notifies the monitor service of that board/port combination * Notifies the monitor service of that board/port combination
* that an upload process started on that exact board/port combination. * that an upload process started on that exact board/port combination.
* This must be done so that we can stop the monitor for the time being * This must be done so that we can stop the monitor for the time being
* until the upload process finished. * until the upload process finished.
* @param board board connected to port * @param board board connected to port
* @param port port to monitor * @param port port to monitor
*/ */
async notifyUploadStarted(board?: Board, port?: Port): Promise<void> { async notifyUploadStarted(board?: Board, port?: Port): Promise<void> {
if (!board || !port) { if (!board || !port) {
// We have no way of knowing which monitor // We have no way of knowing which monitor
// to retrieve if we don't have this information. // to retrieve if we don't have this information.
return; return;
}
const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID);
if (!monitor) {
// There's no monitor running there, bail
return;
}
return await monitor.pause();
} }
const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID);
if (!monitor) {
// There's no monitor running there, bail
return;
}
return await monitor.pause();
}
/** /**
* Notifies the monitor service of that board/port combination * Notifies the monitor service of that board/port combination
* that an upload process started on that exact board/port combination. * that an upload process started on that exact board/port combination.
* @param board board connected to port * @param board board connected to port
* @param port port to monitor * @param port port to monitor
* @returns a Status object to know if the process has been * @returns a Status object to know if the process has been
* started or if there have been errors. * started or if there have been errors.
*/ */
async notifyUploadFinished(board?: Board, port?: Port): Promise<Status> { async notifyUploadFinished(board?: Board, port?: Port): Promise<Status> {
if (!board || !port) { if (!board || !port) {
// We have no way of knowing which monitor // We have no way of knowing which monitor
// to retrieve if we don't have this information. // to retrieve if we don't have this information.
return Status.NOT_CONNECTED; return Status.NOT_CONNECTED;
}
const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID);
if (!monitor) {
// There's no monitor running there, bail
return Status.NOT_CONNECTED;
}
return await monitor.start();
} }
const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID);
if (!monitor) {
// There's no monitor running there, bail
return Status.NOT_CONNECTED;
}
return await monitor.start();
}
/** /**
* Changes the settings of a pluggable monitor even if it's running. * Changes the settings of a pluggable monitor even if it's running.
* If monitor is not running they're going to be used as soon as it's started. * If monitor is not running they're going to be used as soon as it's started.
* @param board board connected to port * @param board board connected to port
* @param port port to monitor * @param port port to monitor
* @param settings monitor settings to change * @param settings monitor settings to change
*/ */
changeMonitorSettings(board: Board, port: Port, settings: MonitorSettings) { changeMonitorSettings(board: Board, port: Port, settings: MonitorSettings) {
const monitorID = this.monitorID(board, port); const monitorID = this.monitorID(board, port);
let monitor = this.monitorServices.get(monitorID); let monitor = this.monitorServices.get(monitorID);
if (!monitor) { if (!monitor) {
monitor = this.createMonitor(board, port) monitor = this.createMonitor(board, port);
monitor.changeSettings(settings); monitor.changeSettings(settings);
}
} }
}
/** /**
* Returns the settings currently used by the pluggable monitor * Returns the settings currently used by the pluggable monitor
* that's communicating with the specified board/port combination. * that's communicating with the specified board/port combination.
* @param board board connected to port * @param board board connected to port
* @param port port monitored * @param port port monitored
* @returns map of current monitor settings * @returns map of current monitor settings
*/ */
currentMonitorSettings(board: Board, port: Port): MonitorSettings { currentMonitorSettings(board: Board, port: Port): MonitorSettings {
const monitorID = this.monitorID(board, port); const monitorID = this.monitorID(board, port);
const monitor = this.monitorServices.get(monitorID); const monitor = this.monitorServices.get(monitorID);
if (!monitor) { if (!monitor) {
return {}; return {};
}
return monitor.currentSettings();
} }
return monitor.currentSettings();
}
/** /**
* Creates a MonitorService that handles the lifetime and the * Creates a MonitorService that handles the lifetime and the
* communication via WebSocket with the frontend. * communication via WebSocket with the frontend.
* @param board board connected to specified port * @param board board connected to specified port
* @param port port to monitor * @param port port to monitor
* @returns a new instance of MonitorService ready to use. * @returns a new instance of MonitorService ready to use.
*/ */
private createMonitor(board: Board, port: Port): MonitorService { private createMonitor(board: Board, port: Port): MonitorService {
const monitorID = this.monitorID(board, port); const monitorID = this.monitorID(board, port);
const monitor = new MonitorService( const monitor = new MonitorService(
this.logger, this.logger,
board, board,
port, port,
this.coreClientProvider, this.coreClientProvider
); );
monitor.onDispose((() => { this.monitorServices.set(monitorID, monitor);
this.monitorServices.delete(monitorID); monitor.onDispose(
}).bind(this)); (() => {
return monitor this.monitorServices.delete(monitorID);
} }).bind(this)
);
return monitor;
}
/** /**
* Utility function to create a unique ID for a monitor service. * Utility function to create a unique ID for a monitor service.
* @param board * @param board
* @param port * @param port
* @returns a unique monitor ID * @returns a unique monitor ID
*/ */
private monitorID(board: Board, port: Port): MonitorID { private monitorID(board: Board, port: Port): MonitorID {
return `${board.fqbn}-${port.address}-${port.protocol}`; return `${board.fqbn}-${port.address}-${port.protocol}`;
} }
} }

View File

@ -1,365 +1,398 @@
import { ClientDuplexStream } from "@grpc/grpc-js"; import { ClientDuplexStream } from '@grpc/grpc-js';
import { Disposable, Emitter, ILogger } from "@theia/core"; import { Disposable, Emitter, ILogger } from '@theia/core';
import { inject, named } from "@theia/core/shared/inversify"; import { inject, named } from '@theia/core/shared/inversify';
import { Board, Port, Status, MonitorSettings, Monitor } from "../common/protocol"; import {
import { EnumerateMonitorPortSettingsRequest, EnumerateMonitorPortSettingsResponse, MonitorPortConfiguration, MonitorPortSetting, MonitorRequest, MonitorResponse } from "./cli-protocol/cc/arduino/cli/commands/v1/monitor_pb"; Board,
import { CoreClientAware, CoreClientProvider } from "./core-client-provider"; Port,
import { WebSocketProvider } from "./web-socket/web-socket-provider"; Status,
import { Port as gRPCPort } from 'arduino-ide-extension/src/node/cli-protocol/cc/arduino/cli/commands/v1/port_pb' MonitorSettings,
import WebSocketProviderImpl from "./web-socket/web-socket-provider-impl"; Monitor,
} from '../common/protocol';
import {
EnumerateMonitorPortSettingsRequest,
EnumerateMonitorPortSettingsResponse,
MonitorPortConfiguration,
MonitorPortSetting,
MonitorRequest,
MonitorResponse,
} from './cli-protocol/cc/arduino/cli/commands/v1/monitor_pb';
import { CoreClientAware, CoreClientProvider } from './core-client-provider';
import { WebSocketProvider } from './web-socket/web-socket-provider';
import { Port as gRPCPort } from 'arduino-ide-extension/src/node/cli-protocol/cc/arduino/cli/commands/v1/port_pb';
import WebSocketProviderImpl from './web-socket/web-socket-provider-impl';
export const MonitorServiceName = 'monitor-service'; export const MonitorServiceName = 'monitor-service';
export class MonitorService extends CoreClientAware implements Disposable { export class MonitorService extends CoreClientAware implements Disposable {
// Bidirectional gRPC stream used to receive and send data from the running // Bidirectional gRPC stream used to receive and send data from the running
// pluggable monitor managed by the Arduino CLI. // pluggable monitor managed by the Arduino CLI.
protected duplex: ClientDuplexStream<MonitorRequest, MonitorResponse> | null; protected duplex: ClientDuplexStream<MonitorRequest, MonitorResponse> | null;
// Settings used by the currently running pluggable monitor. // Settings used by the currently running pluggable monitor.
// They can be freely modified while running. // They can be freely modified while running.
protected settings: MonitorSettings; protected settings: MonitorSettings;
// List of messages received from the running pluggable monitor. // List of messages received from the running pluggable monitor.
// These are flushed from time to time to the frontend. // These are flushed from time to time to the frontend.
protected messages: string[] = []; protected messages: string[] = [];
// Handles messages received from the frontend via websocket. // Handles messages received from the frontend via websocket.
protected onMessageReceived?: Disposable; protected onMessageReceived?: Disposable;
// Sends messages to the frontend from time to time. // Sends messages to the frontend from time to time.
protected flushMessagesInterval?: NodeJS.Timeout; protected flushMessagesInterval?: NodeJS.Timeout;
// Triggered each time the number of clients connected // Triggered each time the number of clients connected
// to the this service WebSocket changes. // to the this service WebSocket changes.
protected onWSClientsNumberChanged?: Disposable; protected onWSClientsNumberChanged?: Disposable;
// Used to notify that the monitor is being disposed // Used to notify that the monitor is being disposed
protected readonly onDisposeEmitter = new Emitter<void>(); protected readonly onDisposeEmitter = new Emitter<void>();
readonly onDispose = this.onDisposeEmitter.event; readonly onDispose = this.onDisposeEmitter.event;
protected readonly webSocketProvider: WebSocketProvider = new WebSocketProviderImpl(); protected readonly webSocketProvider: WebSocketProvider =
new WebSocketProviderImpl();
constructor( constructor(
@inject(ILogger) @inject(ILogger)
@named(MonitorServiceName) @named(MonitorServiceName)
protected readonly logger: ILogger, protected readonly logger: ILogger,
private readonly board: Board, private readonly board: Board,
private readonly port: Port, private readonly port: Port,
protected readonly coreClientProvider: CoreClientProvider, protected readonly coreClientProvider: CoreClientProvider
) { ) {
super(); super();
this.onWSClientsNumberChanged = this.webSocketProvider.onClientsNumberChanged(async (clients: number) => { this.onWSClientsNumberChanged =
if (clients === 0) { this.webSocketProvider.onClientsNumberChanged(async (clients: number) => {
// There are no more clients that want to receive if (clients === 0) {
// data from this monitor, we can freely close // There are no more clients that want to receive
// and dispose it. // data from this monitor, we can freely close
this.dispose(); // and dispose it.
} this.dispose();
}); }
});
// Sets default settings for this monitor // Sets default settings for this monitor
this.portMonitorSettings(port.protocol, board.fqbn!).then( this.portMonitorSettings(port.protocol, board.fqbn!).then(
settings => this.settings = settings (settings) => (this.settings = settings)
);
}
getWebsocketAddressPort(): number {
return this.webSocketProvider.getAddress().port;
}
dispose(): void {
this.stop();
this.onDisposeEmitter.fire();
}
/**
* isStarted is used to know if the currently running pluggable monitor is started.
* @returns true if pluggable monitor communication duplex is open,
* false in all other cases.
*/
isStarted(): boolean {
return !!this.duplex;
}
/**
* Start and connects a monitor using currently set board and port.
* If a monitor is already started or board fqbn, port address and/or protocol
* are missing nothing happens.
* @returns a status to verify connection has been established.
*/
async start(): Promise<Status> {
if (this.duplex) {
return Status.ALREADY_CONNECTED;
}
if (!this.board?.fqbn || !this.port?.address || !this.port?.protocol) {
return Status.CONFIG_MISSING;
}
this.logger.info('starting monitor');
await this.coreClientProvider.initialized;
const coreClient = await this.coreClient();
const { client, instance } = coreClient;
this.duplex = client.monitor();
this.duplex
.on('close', () => {
this.logger.info(
`monitor to ${this.port?.address} using ${this.port?.protocol} closed by client`
); );
} })
.on('end', () => {
getWebsocketAddressPort(): number { this.logger.info(
return this.webSocketProvider.getAddress().port; `monitor to ${this.port?.address} using ${this.port?.protocol} closed by server`
}
dispose(): void {
this.stop();
this.onDisposeEmitter.fire();
}
/**
* isStarted is used to know if the currently running pluggable monitor is started.
* @returns true if pluggable monitor communication duplex is open,
* false in all other cases.
*/
isStarted(): boolean {
return !!this.duplex;
}
/**
* Start and connects a monitor using currently set board and port.
* If a monitor is already started or board fqbn, port address and/or protocol
* are missing nothing happens.
* @returns a status to verify connection has been established.
*/
async start(): Promise<Status> {
if (this.duplex) {
return Status.ALREADY_CONNECTED;
}
if (!this.board?.fqbn || !this.port?.address || !this.port?.protocol) {
return Status.CONFIG_MISSING
}
this.logger.info("starting monitor");
await this.coreClientProvider.initialized;
const coreClient = await this.coreClient();
const { client, instance } = coreClient;
this.duplex = client.monitor()
this.duplex
.on('close', () => {
this.logger.info(`monitor to ${this.port?.address} using ${this.port?.protocol} closed by client`)
})
.on('end', () => {
this.logger.info(`monitor to ${this.port?.address} using ${this.port?.protocol} closed by server`)
})
.on('error', (err: Error) => {
this.logger.error(err);
// TODO
// this.theiaFEClient?.notifyError()
})
.on('data', ((res: MonitorResponse) => {
if (res.getError()) {
// TODO: Maybe disconnect
this.logger.error(res.getError());
return;
}
const data = res.getRxData()
const message =
typeof data === 'string' ? data : new TextDecoder('utf8').decode(data);
this.messages.push(...splitLines(message))
}).bind(this));
const req = new MonitorRequest();
req.setInstance(instance);
if (this.board?.fqbn) {
req.setFqbn(this.board.fqbn)
}
if (this.port?.address && this.port?.protocol) {
const port = new gRPCPort()
port.setAddress(this.port.address);
port.setProtocol(this.port.protocol);
req.setPort(port);
}
const config = new MonitorPortConfiguration();
for (const id in this.settings) {
const s = new MonitorPortSetting();
s.setSettingId(id);
s.setValue(this.settings[id].selectedValue);
config.addSettings(s);
}
req.setPortConfiguration(config)
const connect = new Promise<Status>(resolve => {
if (this.duplex?.write(req)) {
this.startMessagesHandlers();
this.logger.info(`started monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve(Status.OK);
return;
}
this.logger.warn(`failed starting monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve(Status.NOT_CONNECTED);
});
const connectTimeout = new Promise<Status>(resolve => {
setTimeout(async () => {
this.logger.warn(`timeout starting monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve(Status.NOT_CONNECTED);
}, 1000);
});
// Try opening a monitor connection with a timeout
return await Promise.race([
connect,
connectTimeout,
])
}
/**
* Pauses the currently running monitor, it still closes the gRPC connection
* with the underlying monitor process but it doesn't stop the message handlers
* currently running.
* This is mainly used to handle upload when to the board/port combination
* the monitor is listening to.
* @returns
*/
async pause(): Promise<void> {
return new Promise(resolve => {
if (!this.duplex) {
this.logger.warn(`monitor to ${this.port?.address} using ${this.port?.protocol} already stopped`)
return resolve();
}
// It's enough to close the connection with the client
// to stop the monitor process
this.duplex.cancel();
this.duplex = null;
this.logger.info(`stopped monitor to ${this.port?.address} using ${this.port?.protocol}`)
resolve();
})
}
/**
* Stop the monitor currently running
*/
async stop(): Promise<void> {
return this.pause().finally(
this.stopMessagesHandlers
); );
})
.on('error', (err: Error) => {
this.logger.error(err);
// TODO
// this.theiaFEClient?.notifyError()
})
.on(
'data',
((res: MonitorResponse) => {
if (res.getError()) {
// TODO: Maybe disconnect
this.logger.error(res.getError());
return;
}
const data = res.getRxData();
const message =
typeof data === 'string'
? data
: new TextDecoder('utf8').decode(data);
this.messages.push(...splitLines(message));
}).bind(this)
);
const req = new MonitorRequest();
req.setInstance(instance);
if (this.board?.fqbn) {
req.setFqbn(this.board.fqbn);
} }
if (this.port?.address && this.port?.protocol) {
/** const port = new gRPCPort();
* Send a message to the running monitor, a well behaved monitor port.setAddress(this.port.address);
* will then send that message to the board. port.setProtocol(this.port.protocol);
* We MUST NEVER send a message that wasn't a user's input to the board. req.setPort(port);
* @param message string sent to running monitor
* @returns a status to verify message has been sent.
*/
async send(message: string): Promise<Status> {
if (!this.duplex) {
return Status.NOT_CONNECTED;
}
await this.coreClientProvider.initialized;
const coreClient = await this.coreClient();
const { instance } = coreClient;
const req = new MonitorRequest();
req.setInstance(instance);
req.setTxData(new TextEncoder().encode(message));
return new Promise<Status>(resolve => {
if (this.duplex) {
this.duplex?.write(req, () => {
resolve(Status.OK);
});
return;
}
this.stop().then(() => resolve(Status.NOT_CONNECTED));
})
} }
const config = new MonitorPortConfiguration();
/** for (const id in this.settings) {
* const s = new MonitorPortSetting();
* @returns map of current monitor settings s.setSettingId(id);
*/ s.setValue(this.settings[id].selectedValue);
currentSettings(): MonitorSettings { config.addSettings(s);
return this.settings;
} }
req.setPortConfiguration(config);
/** const connect = new Promise<Status>((resolve) => {
* Returns the possible configurations used to connect a monitor if (this.duplex?.write(req)) {
* to the board specified by fqbn using the specified protocol this.startMessagesHandlers();
* @param protocol the protocol of the monitor we want get settings for this.logger.info(
* @param fqbn the fqbn of the board we want to monitor `started monitor to ${this.port?.address} using ${this.port?.protocol}`
* @returns a map of all the settings supported by the monitor );
*/ resolve(Status.OK);
private async portMonitorSettings(protocol: string, fqbn: string): Promise<MonitorSettings> { return;
await this.coreClientProvider.initialized; }
const coreClient = await this.coreClient(); this.logger.warn(
const { client, instance } = coreClient; `failed starting monitor to ${this.port?.address} using ${this.port?.protocol}`
const req = new EnumerateMonitorPortSettingsRequest(); );
req.setInstance(instance); resolve(Status.NOT_CONNECTED);
req.setPortProtocol(protocol); });
req.setFqbn(fqbn);
const res = await new Promise<EnumerateMonitorPortSettingsResponse>((resolve, reject) => { const connectTimeout = new Promise<Status>((resolve) => {
client.enumerateMonitorPortSettings(req, (err, resp) => { setTimeout(async () => {
if (!!err) { this.logger.warn(
reject(err) `timeout starting monitor to ${this.port?.address} using ${this.port?.protocol}`
} );
resolve(resp) resolve(Status.NOT_CONNECTED);
}) }, 1000);
});
// Try opening a monitor connection with a timeout
return await Promise.race([connect, connectTimeout]);
}
/**
* Pauses the currently running monitor, it still closes the gRPC connection
* with the underlying monitor process but it doesn't stop the message handlers
* currently running.
* This is mainly used to handle upload when to the board/port combination
* the monitor is listening to.
* @returns
*/
async pause(): Promise<void> {
return new Promise(async (resolve) => {
if (!this.duplex) {
this.logger.warn(
`monitor to ${this.port?.address} using ${this.port?.protocol} already stopped`
);
return resolve();
}
// It's enough to close the connection with the client
// to stop the monitor process
this.duplex.end();
this.duplex = null;
this.logger.info(
`stopped monitor to ${this.port?.address} using ${this.port?.protocol}`
);
resolve();
});
}
/**
* Stop the monitor currently running
*/
async stop(): Promise<void> {
return this.pause().finally(this.stopMessagesHandlers.bind(this));
}
/**
* Send a message to the running monitor, a well behaved monitor
* will then send that message to the board.
* We MUST NEVER send a message that wasn't a user's input to the board.
* @param message string sent to running monitor
* @returns a status to verify message has been sent.
*/
async send(message: string): Promise<Status> {
if (!this.duplex) {
return Status.NOT_CONNECTED;
}
await this.coreClientProvider.initialized;
const coreClient = await this.coreClient();
const { instance } = coreClient;
const req = new MonitorRequest();
req.setInstance(instance);
req.setTxData(new TextEncoder().encode(message));
return new Promise<Status>((resolve) => {
if (this.duplex) {
this.duplex?.write(req, () => {
resolve(Status.OK);
}); });
return;
}
this.stop().then(() => resolve(Status.NOT_CONNECTED));
});
}
let settings: MonitorSettings = {}; /**
for (const iterator of res.getSettingsList()) { *
settings[iterator.getSettingId()] = { * @returns map of current monitor settings
'id': iterator.getSettingId(), */
'label': iterator.getLabel(), currentSettings(): MonitorSettings {
'type': iterator.getType(), return this.settings;
'values': iterator.getEnumValuesList(), }
'selectedValue': iterator.getValue(),
} /**
} * Returns the possible configurations used to connect a monitor
return settings; * to the board specified by fqbn using the specified protocol
* @param protocol the protocol of the monitor we want get settings for
* @param fqbn the fqbn of the board we want to monitor
* @returns a map of all the settings supported by the monitor
*/
private async portMonitorSettings(
protocol: string,
fqbn: string
): Promise<MonitorSettings> {
await this.coreClientProvider.initialized;
const coreClient = await this.coreClient();
const { client, instance } = coreClient;
const req = new EnumerateMonitorPortSettingsRequest();
req.setInstance(instance);
req.setPortProtocol(protocol);
req.setFqbn(fqbn);
const res = await new Promise<EnumerateMonitorPortSettingsResponse>(
(resolve, reject) => {
client.enumerateMonitorPortSettings(req, (err, resp) => {
if (!!err) {
reject(err);
}
resolve(resp);
});
}
);
const settings: MonitorSettings = {};
for (const iterator of res.getSettingsList()) {
settings[iterator.getSettingId()] = {
id: iterator.getSettingId(),
label: iterator.getLabel(),
type: iterator.getType(),
values: iterator.getEnumValuesList(),
selectedValue: iterator.getValue(),
};
}
return settings;
}
/**
* Set monitor settings, if there is a running monitor they'll be sent
* to it, otherwise they'll be used when starting one.
* Only values in settings parameter will be change, other values won't
* be changed in any way.
* @param settings map of monitor settings to change
* @returns a status to verify settings have been sent.
*/
async changeSettings(settings: MonitorSettings): Promise<Status> {
const config = new MonitorPortConfiguration();
for (const id in settings) {
const s = new MonitorPortSetting();
s.setSettingId(id);
s.setValue(settings[id].selectedValue);
config.addSettings(s);
this.settings[id] = settings[id];
} }
/** if (!this.duplex) {
* Set monitor settings, if there is a running monitor they'll be sent return Status.NOT_CONNECTED;
* to it, otherwise they'll be used when starting one. }
* Only values in settings parameter will be change, other values won't await this.coreClientProvider.initialized;
* be changed in any way. const coreClient = await this.coreClient();
* @param settings map of monitor settings to change const { instance } = coreClient;
* @returns a status to verify settings have been sent.
*/
async changeSettings(settings: MonitorSettings): Promise<Status> {
const config = new MonitorPortConfiguration();
for (const id in settings) {
const s = new MonitorPortSetting();
s.setSettingId(id);
s.setValue(settings[id].selectedValue);
config.addSettings(s);
this.settings[id] = settings[id];
}
if (!this.duplex) { const req = new MonitorRequest();
return Status.NOT_CONNECTED; req.setInstance(instance);
} req.setPortConfiguration(config);
await this.coreClientProvider.initialized; this.duplex.write(req);
const coreClient = await this.coreClient(); return Status.OK;
const { instance } = coreClient; }
const req = new MonitorRequest(); /**
req.setInstance(instance); * Starts the necessary handlers to send and receive
req.setPortConfiguration(config) * messages to and from the frontend and the running monitor
this.duplex.write(req); */
return Status.OK private startMessagesHandlers(): void {
if (!this.flushMessagesInterval) {
const flushMessagesToFrontend = () => {
if (this.messages.length) {
this.webSocketProvider.sendMessage(JSON.stringify(this.messages));
this.messages = [];
}
};
this.flushMessagesInterval = setInterval(flushMessagesToFrontend, 32);
} }
/** if (!this.onMessageReceived) {
* Starts the necessary handlers to send and receive this.onMessageReceived = this.webSocketProvider.onMessageReceived(
* messages to and from the frontend and the running monitor (msg: string) => {
*/ const message: Monitor.Message = JSON.parse(msg);
private startMessagesHandlers(): void {
if (!this.flushMessagesInterval) {
const flushMessagesToFrontend = () => {
if (this.messages.length) {
this.webSocketProvider.sendMessage(JSON.stringify(this.messages));
this.messages = [];
}
};
this.flushMessagesInterval = setInterval(flushMessagesToFrontend, 32);
}
if (!this.onMessageReceived) { switch (message.command) {
this.onMessageReceived = this.webSocketProvider.onMessageReceived( case Monitor.Command.SEND_MESSAGE:
(msg: string) => { this.send(message.data);
const message: Monitor.Message = JSON.parse(msg); break;
case Monitor.Command.CHANGE_SETTINGS:
switch (message.command) { const settings: MonitorSettings = JSON.parse(message.data);
case Monitor.Command.SEND_MESSAGE: this.changeSettings(settings);
this.send(message.data); break;
break }
case Monitor.Command.CHANGE_SETTINGS:
const settings: MonitorSettings = JSON.parse(message.data);
this.changeSettings(settings);
break
}
}
)
} }
);
} }
}
/** /**
* Stops the necessary handlers to send and receive messages to * Stops the necessary handlers to send and receive messages to
* and from the frontend and the running monitor * and from the frontend and the running monitor
*/ */
private stopMessagesHandlers(): void { private stopMessagesHandlers(): void {
if (this.flushMessagesInterval) { if (this.flushMessagesInterval) {
clearInterval(this.flushMessagesInterval); clearInterval(this.flushMessagesInterval);
this.flushMessagesInterval = undefined; this.flushMessagesInterval = undefined;
}
if (this.onMessageReceived) {
this.onMessageReceived.dispose();
this.onMessageReceived = undefined;
}
} }
if (this.onMessageReceived) {
this.onMessageReceived.dispose();
this.onMessageReceived = undefined;
}
}
} }
/** /**
@ -368,5 +401,5 @@ export class MonitorService extends CoreClientAware implements Disposable {
* @returns an lines array * @returns an lines array
*/ */
function splitLines(s: string): string[] { function splitLines(s: string): string[] {
return s.split(/(?<=\n)/); return s.split(/(?<=\n)/);
} }