Fix upload and serial (#661)

* get serial connection status from BE

* handle serial connect in the BE

* allow breakpoints on vscode (windows)

* Timeout on config change to prevent serial busy

* serial-service tests
This commit is contained in:
Francesco Stasi
2021-12-07 17:38:43 +01:00
committed by GitHub
parent 88397931c5
commit 767b09d2f1
19 changed files with 576 additions and 756 deletions

View File

@@ -63,27 +63,61 @@ namespace ErrorWithCode {
@injectable()
export class SerialServiceImpl implements SerialService {
@named(SerialServiceName)
@inject(ILogger)
protected readonly logger: ILogger;
protected theiaFEClient?: SerialServiceClient;
protected serialConfig?: SerialConfig;
@inject(MonitorClientProvider)
protected readonly serialClientProvider: MonitorClientProvider;
@inject(WebSocketService)
protected readonly webSocketService: WebSocketService;
protected client?: SerialServiceClient;
protected serialConnection?: {
duplex: ClientDuplexStream<StreamingOpenRequest, StreamingOpenResponse>;
config: SerialConfig;
};
protected messages: string[] = [];
protected onMessageReceived: Disposable | null;
protected onWSClientsNumberChanged: Disposable | null;
protected flushMessagesInterval: NodeJS.Timeout | null;
uploadInProgress = false;
constructor(
@inject(ILogger)
@named(SerialServiceName)
protected readonly logger: ILogger,
@inject(MonitorClientProvider)
protected readonly serialClientProvider: MonitorClientProvider,
@inject(WebSocketService)
protected readonly webSocketService: WebSocketService
) {}
async isSerialPortOpen(): Promise<boolean> {
return !!this.serialConnection;
}
setClient(client: SerialServiceClient | undefined): void {
this.client = client;
this.theiaFEClient = client;
this.theiaFEClient?.notifyWebSocketChanged(
this.webSocketService.getAddress().port
);
// listen for the number of websocket clients and create or dispose the serial connection
this.onWSClientsNumberChanged =
this.webSocketService.onClientsNumberChanged(async () => {
await this.connectSerialIfRequired();
});
}
public async clientsAttached(): Promise<number> {
return this.webSocketService.getConnectedClientsNumber.bind(
this.webSocketService
)();
}
public async connectSerialIfRequired(): Promise<void> {
if (this.uploadInProgress) return;
const clients = await this.clientsAttached();
clients > 0 ? await this.connect() : await this.disconnect();
}
dispose(): void {
@@ -92,7 +126,13 @@ export class SerialServiceImpl implements SerialService {
this.disconnect();
}
this.logger.info('<<< Disposed serial service.');
this.client = undefined;
this.theiaFEClient = undefined;
}
async setSerialConfig(config: SerialConfig): Promise<void> {
this.serialConfig = config;
await this.disconnect();
await this.connectSerialIfRequired();
}
async updateWsConfigParam(
@@ -105,12 +145,17 @@ export class SerialServiceImpl implements SerialService {
this.webSocketService.sendMessage(JSON.stringify(msg));
}
async connect(config: SerialConfig): Promise<Status> {
private async connect(): Promise<Status> {
if (!this.serialConfig) {
return Status.CONFIG_MISSING;
}
this.logger.info(
`>>> Creating serial connection for ${Board.toString(
config.board
)} on port ${Port.toString(config.port)}...`
this.serialConfig.board
)} on port ${Port.toString(this.serialConfig.port)}...`
);
if (this.serialConnection) {
return Status.ALREADY_CONNECTED;
}
@@ -122,27 +167,29 @@ export class SerialServiceImpl implements SerialService {
return { message: client.message };
}
const duplex = client.streamingOpen();
this.serialConnection = { duplex, config };
this.serialConnection = { duplex, config: this.serialConfig };
const serialConfig = this.serialConfig;
duplex.on(
'error',
((error: Error) => {
const serialError = ErrorWithCode.toSerialError(error, config);
this.disconnect(serialError).then(() => {
if (this.client) {
this.client.notifyError(serialError);
}
if (serialError.code === undefined) {
// Log the original, unexpected error.
this.logger.error(error);
}
});
const serialError = ErrorWithCode.toSerialError(error, serialConfig);
if (serialError.code !== SerialError.ErrorCodes.CLIENT_CANCEL) {
this.disconnect(serialError).then(() => {
if (this.theiaFEClient) {
this.theiaFEClient.notifyError(serialError);
}
});
}
if (serialError.code === undefined) {
// Log the original, unexpected error.
this.logger.error(error);
}
}).bind(this)
);
this.client?.notifyWebSocketChanged(
this.webSocketService.getAddress().port
);
this.updateWsConfigParam({ connected: !!this.serialConnection });
const flushMessagesToFrontend = () => {
if (this.messages.length) {
@@ -162,17 +209,17 @@ export class SerialServiceImpl implements SerialService {
break;
case SerialPlotter.Protocol.Command.PLOTTER_SET_BAUDRATE:
this.client?.notifyBaudRateChanged(
this.theiaFEClient?.notifyBaudRateChanged(
parseInt(message.data, 10) as SerialConfig.BaudRate
);
break;
case SerialPlotter.Protocol.Command.PLOTTER_SET_LINE_ENDING:
this.client?.notifyLineEndingChanged(message.data);
this.theiaFEClient?.notifyLineEndingChanged(message.data);
break;
case SerialPlotter.Protocol.Command.PLOTTER_SET_INTERPOLATE:
this.client?.notifyInterpolateChanged(message.data);
this.theiaFEClient?.notifyInterpolateChanged(message.data);
break;
default:
@@ -185,27 +232,6 @@ export class SerialServiceImpl implements SerialService {
// empty the queue every 32ms (~30fps)
this.flushMessagesInterval = setInterval(flushMessagesToFrontend, 32);
// converts 'ab\nc\nd' => [ab\n,c\n,d]
const stringToArray = (string: string, separator = '\n') => {
const retArray: string[] = [];
let prevChar = separator;
for (let i = 0; i < string.length; i++) {
const currChar = string[i];
if (prevChar === separator) {
retArray.push(currChar);
} else {
const lastWord = retArray[retArray.length - 1];
retArray[retArray.length - 1] = lastWord + currChar;
}
prevChar = currChar;
}
return retArray;
};
duplex.on(
'data',
((resp: StreamingOpenResponse) => {
@@ -219,69 +245,105 @@ export class SerialServiceImpl implements SerialService {
}).bind(this)
);
const { type, port } = config;
const { type, port } = this.serialConfig;
const req = new StreamingOpenRequest();
const monitorConfig = new GrpcMonitorConfig();
monitorConfig.setType(this.mapType(type));
monitorConfig.setTarget(port.address);
if (config.baudRate !== undefined) {
if (this.serialConfig.baudRate !== undefined) {
monitorConfig.setAdditionalConfig(
Struct.fromJavaScript({ BaudRate: config.baudRate })
Struct.fromJavaScript({ BaudRate: this.serialConfig.baudRate })
);
}
req.setConfig(monitorConfig);
return new Promise<Status>((resolve) => {
if (this.serialConnection) {
this.serialConnection.duplex.write(req, () => {
if (!this.serialConnection) {
return await this.disconnect();
}
const writeTimeout = new Promise<Status>((resolve) => {
setTimeout(async () => {
resolve(Status.NOT_CONNECTED);
}, 1000);
});
const writePromise = (serialConnection: any) => {
return new Promise<Status>((resolve) => {
serialConnection.duplex.write(req, () => {
const boardName = this.serialConfig?.board
? Board.toString(this.serialConfig.board, {
useFqbn: false,
})
: 'unknown board';
const portName = this.serialConfig?.port
? Port.toString(this.serialConfig.port)
: 'unknown port';
this.logger.info(
`<<< Serial connection created for ${Board.toString(config.board, {
useFqbn: false,
})} on port ${Port.toString(config.port)}.`
`<<< Serial connection created for ${boardName} on port ${portName}.`
);
resolve(Status.OK);
});
return;
}
this.disconnect().then(() => resolve(Status.NOT_CONNECTED));
});
});
};
const status = await Promise.race([
writeTimeout,
writePromise(this.serialConnection),
]);
if (status === Status.NOT_CONNECTED) {
this.disconnect();
}
return status;
}
async disconnect(reason?: SerialError): Promise<Status> {
try {
if (this.onMessageReceived) {
this.onMessageReceived.dispose();
this.onMessageReceived = null;
}
if (this.flushMessagesInterval) {
clearInterval(this.flushMessagesInterval);
this.flushMessagesInterval = null;
}
public async disconnect(reason?: SerialError): Promise<Status> {
return new Promise<Status>((resolve) => {
try {
if (this.onMessageReceived) {
this.onMessageReceived.dispose();
this.onMessageReceived = null;
}
if (this.flushMessagesInterval) {
clearInterval(this.flushMessagesInterval);
this.flushMessagesInterval = null;
}
if (
!this.serialConnection &&
reason &&
reason.code === SerialError.ErrorCodes.CLIENT_CANCEL
) {
return Status.OK;
if (
!this.serialConnection &&
reason &&
reason.code === SerialError.ErrorCodes.CLIENT_CANCEL
) {
resolve(Status.OK);
return;
}
this.logger.info('>>> Disposing serial connection...');
if (!this.serialConnection) {
this.logger.warn('<<< Not connected. Nothing to dispose.');
resolve(Status.NOT_CONNECTED);
return;
}
const { duplex, config } = this.serialConnection;
this.logger.info(
`<<< Disposed serial connection for ${Board.toString(config.board, {
useFqbn: false,
})} on port ${Port.toString(config.port)}.`
);
duplex.cancel();
} finally {
this.serialConnection = undefined;
this.updateWsConfigParam({ connected: !!this.serialConnection });
this.messages.length = 0;
setTimeout(() => {
resolve(Status.OK);
}, 200);
}
this.logger.info('>>> Disposing serial connection...');
if (!this.serialConnection) {
this.logger.warn('<<< Not connected. Nothing to dispose.');
return Status.NOT_CONNECTED;
}
const { duplex, config } = this.serialConnection;
duplex.cancel();
this.logger.info(
`<<< Disposed serial connection for ${Board.toString(config.board, {
useFqbn: false,
})} on port ${Port.toString(config.port)}.`
);
this.serialConnection = undefined;
return Status.OK;
} finally {
this.messages.length = 0;
}
});
}
async sendMessageToSerial(message: string): Promise<Status> {
@@ -312,3 +374,24 @@ export class SerialServiceImpl implements SerialService {
}
}
}
// converts 'ab\nc\nd' => [ab\n,c\n,d]
function stringToArray(string: string, separator = '\n') {
const retArray: string[] = [];
let prevChar = separator;
for (let i = 0; i < string.length; i++) {
const currChar = string[i];
if (prevChar === separator) {
retArray.push(currChar);
} else {
const lastWord = retArray[retArray.length - 1];
retArray[retArray.length - 1] = lastWord + currChar;
}
prevChar = currChar;
}
return retArray;
}