Update package index on 3rd party URLs change.

Closes #637
Closes #906

Signed-off-by: Akos Kitta <a.kitta@arduino.cc>
This commit is contained in:
Akos Kitta
2022-06-29 11:14:56 +02:00
committed by Akos Kitta
parent 1073c3fc7d
commit a36524e02a
43 changed files with 1052 additions and 704 deletions

View File

@@ -1,18 +1,18 @@
import { join } from 'path';
import * as grpc from '@grpc/grpc-js';
import {
inject,
injectable,
postConstruct,
} from '@theia/core/shared/inversify';
import { Event, Emitter } from '@theia/core/lib/common/event';
import { GrpcClientProvider } from './grpc-client-provider';
import { Emitter } from '@theia/core/lib/common/event';
import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb';
import { Instance } from './cli-protocol/cc/arduino/cli/commands/v1/common_pb';
import {
CreateRequest,
CreateResponse,
InitRequest,
InitResponse,
UpdateCoreLibrariesIndexResponse,
UpdateIndexRequest,
UpdateIndexResponse,
UpdateLibrariesIndexRequest,
@@ -25,263 +25,363 @@ import {
Status as RpcStatus,
Status,
} from './cli-protocol/google/rpc/status_pb';
import { ConfigServiceImpl } from './config-service-impl';
import { ArduinoDaemonImpl } from './arduino-daemon-impl';
import { DisposableCollection } from '@theia/core/lib/common/disposable';
import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import {
IndexesUpdateProgressHandler,
ExecuteWithProgress,
} from './grpc-progressible';
import type { DefaultCliConfig } from './cli-config';
import { ServiceError } from './service-error';
@injectable()
export class CoreClientProvider extends GrpcClientProvider<CoreClientProvider.Client> {
export class CoreClientProvider {
@inject(ArduinoDaemonImpl)
private readonly daemon: ArduinoDaemonImpl;
@inject(ConfigServiceImpl)
private readonly configService: ConfigServiceImpl;
@inject(NotificationServiceServer)
protected readonly notificationService: NotificationServiceServer;
private readonly notificationService: NotificationServiceServer;
protected readonly onClientReadyEmitter = new Emitter<void>();
private ready = new Deferred<void>();
private pending: Deferred<CoreClientProvider.Client> | undefined;
private _client: CoreClientProvider.Client | undefined;
private readonly toDisposeBeforeCreate = new DisposableCollection();
private readonly toDisposeAfterDidCreate = new DisposableCollection();
private readonly onClientReadyEmitter =
new Emitter<CoreClientProvider.Client>();
private readonly onClientReady = this.onClientReadyEmitter.event;
protected _created = new Deferred<void>();
protected _initialized = new Deferred<void>();
get created(): Promise<void> {
return this._created.promise;
}
get initialized(): Promise<void> {
return this._initialized.promise;
}
get onClientReady(): Event<void> {
return this.onClientReadyEmitter.event;
}
close(client: CoreClientProvider.Client): void {
client.client.close();
this._created.reject();
this._initialized.reject();
this._created = new Deferred<void>();
this._initialized = new Deferred<void>();
}
protected override async reconcileClient(port: string): Promise<void> {
if (port && port === this._port) {
// No need to create a new gRPC client, but we have to update the indexes.
if (this._client && !(this._client instanceof Error)) {
await this.updateIndexes(this._client);
this.onClientReadyEmitter.fire();
@postConstruct()
protected init(): void {
this.daemon.tryGetPort().then((port) => {
if (port) {
this.create(port);
}
});
this.daemon.onDaemonStarted((port) => this.create(port));
this.daemon.onDaemonStopped(() => this.closeClient());
this.configService.onConfigChange(() => this.refreshIndexes());
}
get tryGetClient(): CoreClientProvider.Client | undefined {
return this._client;
}
get client(): Promise<CoreClientProvider.Client> {
const client = this.tryGetClient;
if (client) {
return Promise.resolve(client);
}
if (!this.pending) {
this.pending = new Deferred();
this.toDisposeAfterDidCreate.pushAll([
Disposable.create(() => (this.pending = undefined)),
this.onClientReady((client) => {
this.pending?.resolve(client);
this.toDisposeAfterDidCreate.dispose();
}),
]);
}
return this.pending.promise;
}
/**
* Encapsulates both the gRPC core client creation (`CreateRequest`) and initialization (`InitRequest`).
*/
private async create(port: string): Promise<CoreClientProvider.Client> {
this.closeClient();
const address = this.address(port);
const client = await this.createClient(address);
this.toDisposeBeforeCreate.pushAll([
Disposable.create(() => client.client.close()),
Disposable.create(() => {
this.ready.reject(
new Error(
`Disposed. Creating a new gRPC core client on address ${address}.`
)
);
this.ready = new Deferred();
}),
]);
await this.initInstanceWithFallback(client);
setTimeout(async () => this.refreshIndexes(), 10_000); // Update the indexes asynchronously
return this.useClient(client);
}
/**
* By default, calling this method is equivalent to the `initInstance(Client)` call.
* When the IDE2 starts and one of the followings is missing,
* the IDE2 must run the index update before the core client initialization:
*
* - primary package index (`#directories.data/package_index.json`),
* - library index (`#directories.data/library_index.json`),
* - built-in tools (`builtin:serial-discovery` or `builtin:mdns-discovery`)
*
* This method detects such errors and runs an index update before initializing the client.
* The index update will fail if the 3rd URLs list contains an invalid URL,
* and the IDE2 will be [non-functional](https://github.com/arduino/arduino-ide/issues/1084). Since the CLI [cannot update only the primary package index]((https://github.com/arduino/arduino-cli/issues/1788)), IDE2 does its dirty solution.
*/
private async initInstanceWithFallback(
client: CoreClientProvider.Client
): Promise<void> {
try {
await this.initInstance(client);
} catch (err) {
if (err instanceof IndexUpdateRequiredBeforeInitError) {
console.error(
'The primary packages indexes are missing. Running indexes update before initializing the core gRPC client',
err.message
);
await this.updateIndexes(client); // TODO: this should run without the 3rd party URLs
await this.initInstance(client);
console.info(
`Downloaded the primary packages indexes, and successfully initialized the core gRPC client.`
);
} else {
console.error(
'Error occurred while initializing the core gRPC client provider',
err
);
throw err;
}
} else {
await super.reconcileClient(port);
this.onClientReadyEmitter.fire();
}
}
@postConstruct()
protected override init(): void {
this.daemon.getPort().then(async (port) => {
// First create the client and the instance synchronously
// and notify client is ready.
// TODO: Creation failure should probably be handled here
await this.reconcileClient(port); // create instance
this._created.resolve();
// Normal startup workflow:
// 1. create instance,
// 2. init instance,
// 3. update indexes asynchronously.
// First startup workflow:
// 1. create instance,
// 2. update indexes and wait,
// 3. init instance.
if (this._client && !(this._client instanceof Error)) {
try {
await this.initInstance(this._client); // init instance
this._initialized.resolve();
this.updateIndex(this._client); // Update the indexes asynchronously
} catch (error: unknown) {
console.error(
'Error occurred while initializing the core gRPC client provider',
error
);
if (error instanceof IndexUpdateRequiredBeforeInitError) {
// If it's a first start, IDE2 must run index update before the init request.
await this.updateIndexes(this._client);
await this.initInstance(this._client);
this._initialized.resolve();
} else {
throw error;
}
}
}
});
this.daemon.onDaemonStopped(() => {
if (this._client && !(this._client instanceof Error)) {
this.close(this._client);
}
this._client = undefined;
this._port = undefined;
});
private useClient(
client: CoreClientProvider.Client
): CoreClientProvider.Client {
this._client = client;
this.onClientReadyEmitter.fire(this._client);
return this._client;
}
protected async createClient(
port: string | number
private closeClient(): void {
return this.toDisposeBeforeCreate.dispose();
}
private async createClient(
address: string
): Promise<CoreClientProvider.Client> {
// https://github.com/agreatfool/grpc_tools_node_protoc_ts/blob/master/doc/grpcjs_support.md#usage
const ArduinoCoreServiceClient = grpc.makeClientConstructor(
// @ts-expect-error: ignore
commandsGrpcPb['cc.arduino.cli.commands.v1.ArduinoCoreService'],
'ArduinoCoreServiceService'
// eslint-disable-next-line @typescript-eslint/no-explicit-any
) as any;
const client = new ArduinoCoreServiceClient(
`localhost:${port}`,
address,
grpc.credentials.createInsecure(),
this.channelOptions
) as ArduinoCoreServiceClient;
const createRes = await new Promise<CreateResponse>((resolve, reject) => {
client.create(new CreateRequest(), (err, res: CreateResponse) => {
const instance = await new Promise<Instance>((resolve, reject) => {
client.create(new CreateRequest(), (err, resp) => {
if (err) {
reject(err);
return;
}
resolve(res);
const instance = resp.getInstance();
if (!instance) {
reject(
new Error(
'`CreateResponse` was OK, but the retrieved `instance` was `undefined`.'
)
);
return;
}
resolve(instance);
});
});
const instance = createRes.getInstance();
if (!instance) {
throw new Error(
'Could not retrieve instance from the initialize response.'
);
}
return { instance, client };
}
protected async initInstance({
private async initInstance({
client,
instance,
}: CoreClientProvider.Client): Promise<void> {
const initReq = new InitRequest();
initReq.setInstance(instance);
return new Promise<void>((resolve, reject) => {
const stream = client.init(initReq);
const errors: RpcStatus[] = [];
stream.on('data', (res: InitResponse) => {
const progress = res.getInitProgress();
if (progress) {
const downloadProgress = progress.getDownloadProgress();
if (downloadProgress && downloadProgress.getCompleted()) {
const file = downloadProgress.getFile();
console.log(`Downloaded ${file}`);
client
.init(new InitRequest().setInstance(instance))
.on('data', (resp: InitResponse) => {
// XXX: The CLI never sends `initProgress`, it's always `error` or nothing. Is this a CLI bug?
// According to the gRPC API, the CLI should send either a `TaskProgress` or a `DownloadProgress`, but it does not.
const error = resp.getError();
if (error) {
const { code, message } = Status.toObject(false, error);
console.error(
`Detected an error response during the gRPC core client initialization: code: ${code}, message: ${message}`
);
errors.push(error);
}
const taskProgress = progress.getTaskProgress();
if (taskProgress && taskProgress.getCompleted()) {
const name = taskProgress.getName();
console.log(`Completed ${name}`);
})
.on('error', reject)
.on('end', () => {
const error = this.evaluateErrorStatus(errors);
if (error) {
reject(error);
return;
}
}
const error = res.getError();
if (error) {
const { code, message } = Status.toObject(false, error);
console.error(
`Detected an error response during the gRPC core client initialization: code: ${code}, message: ${message}`
);
errors.push(error);
}
});
stream.on('error', reject);
stream.on('end', () => {
const error = this.evaluateErrorStatus(errors);
if (error) {
reject(error);
return;
}
resolve();
});
resolve();
});
});
}
private evaluateErrorStatus(status: RpcStatus[]): Error | undefined {
const error = isIndexUpdateRequiredBeforeInit(status); // put future error matching here
return error;
const { cliConfiguration } = this.configService;
if (!cliConfiguration) {
// If the CLI config is not available, do not even try to guess what went wrong.
return new Error(`Could not read the CLI configuration file.`);
}
return isIndexUpdateRequiredBeforeInit(status, cliConfiguration); // put future error matching here
}
protected async updateIndexes(
client: CoreClientProvider.Client
): Promise<CoreClientProvider.Client> {
/**
* Updates all indexes and runs an init to [reload the indexes](https://github.com/arduino/arduino-cli/pull/1274#issue-866154638).
*/
private async refreshIndexes(): Promise<void> {
const client = this._client;
if (client) {
const progressHandler = this.createProgressHandler();
try {
await this.updateIndexes(client, progressHandler);
await this.initInstance(client);
// notify clients about the index update only after the client has been "re-initialized" and the new content is available.
progressHandler.reportEnd();
} catch (err) {
console.error('Failed to update indexes', err);
progressHandler.reportError(
ServiceError.is(err) ? err.details : String(err)
);
}
}
}
private async updateIndexes(
client: CoreClientProvider.Client,
progressHandler?: IndexesUpdateProgressHandler
): Promise<void> {
await Promise.all([
retry(() => this.updateIndex(client), 50, 3),
retry(() => this.updateLibraryIndex(client), 50, 3),
this.updateIndex(client, progressHandler),
this.updateLibraryIndex(client, progressHandler),
]);
this.notificationService.notifyIndexUpdated();
return client;
}
protected async updateLibraryIndex({
client,
instance,
}: CoreClientProvider.Client): Promise<void> {
const req = new UpdateLibrariesIndexRequest();
req.setInstance(instance);
const resp = client.updateLibrariesIndex(req);
let file: string | undefined;
resp.on('data', (data: UpdateLibrariesIndexResponse) => {
const progress = data.getDownloadProgress();
if (progress) {
if (!file && progress.getFile()) {
file = `${progress.getFile()}`;
}
if (progress.getCompleted()) {
if (file) {
if (/\s/.test(file)) {
console.log(`${file} completed.`);
} else {
console.log(`Download of '${file}' completed.`);
}
} else {
console.log('The library index has been successfully updated.');
}
file = undefined;
}
}
});
await new Promise<void>((resolve, reject) => {
resp.on('error', (error) => {
reject(error);
});
resp.on('end', resolve);
});
private async updateIndex(
client: CoreClientProvider.Client,
progressHandler?: IndexesUpdateProgressHandler
): Promise<void> {
return this.doUpdateIndex(
() =>
client.client.updateIndex(
new UpdateIndexRequest().setInstance(client.instance)
),
progressHandler,
'platform-index'
);
}
protected async updateIndex({
client,
instance,
}: CoreClientProvider.Client): Promise<void> {
const updateReq = new UpdateIndexRequest();
updateReq.setInstance(instance);
const updateResp = client.updateIndex(updateReq);
let file: string | undefined;
updateResp.on('data', (o: UpdateIndexResponse) => {
const progress = o.getDownloadProgress();
if (progress) {
if (!file && progress.getFile()) {
file = `${progress.getFile()}`;
}
if (progress.getCompleted()) {
if (file) {
if (/\s/.test(file)) {
console.log(`${file} completed.`);
} else {
console.log(`Download of '${file}' completed.`);
}
} else {
console.log('The index has been successfully updated.');
}
file = undefined;
}
}
});
await new Promise<void>((resolve, reject) => {
updateResp.on('error', reject);
updateResp.on('end', resolve);
});
private async updateLibraryIndex(
client: CoreClientProvider.Client,
progressHandler?: IndexesUpdateProgressHandler
): Promise<void> {
return this.doUpdateIndex(
() =>
client.client.updateLibrariesIndex(
new UpdateLibrariesIndexRequest().setInstance(client.instance)
),
progressHandler,
'library-index'
);
}
private async doUpdateIndex<
R extends
| UpdateIndexResponse
| UpdateLibrariesIndexResponse
| UpdateCoreLibrariesIndexResponse // not used by IDE2
>(
responseProvider: () => grpc.ClientReadableStream<R>,
progressHandler?: IndexesUpdateProgressHandler,
task?: string
): Promise<void> {
const progressId = progressHandler?.progressId;
return retry(
() =>
new Promise<void>((resolve, reject) => {
responseProvider()
.on(
'data',
ExecuteWithProgress.createDataCallback({
responseService: {
appendToOutput: ({ chunk: message }) => {
console.log(
`core-client-provider${task ? ` [${task}]` : ''}`,
message
);
progressHandler?.reportProgress(message);
},
},
progressId,
})
)
.on('error', reject)
.on('end', resolve);
}),
50,
3
);
}
private createProgressHandler(): IndexesUpdateProgressHandler {
const additionalUrlsCount =
this.configService.cliConfiguration?.board_manager?.additional_urls
?.length ?? 0;
return new IndexesUpdateProgressHandler(
additionalUrlsCount,
(progressMessage) =>
this.notificationService.notifyIndexUpdateDidProgress(progressMessage),
({ progressId, message }) =>
this.notificationService.notifyIndexUpdateDidFail({
progressId,
message,
}),
(progressId) =>
this.notificationService.notifyIndexWillUpdate(progressId),
(progressId) => this.notificationService.notifyIndexDidUpdate(progressId)
);
}
private address(port: string): string {
return `localhost:${port}`;
}
private get channelOptions(): Record<string, unknown> {
return {
'grpc.max_send_message_length': 512 * 1024 * 1024,
'grpc.max_receive_message_length': 512 * 1024 * 1024,
'grpc.primary_user_agent': `arduino-ide/${this.version}`,
};
}
private _version: string | undefined;
private get version(): string {
if (this._version) {
return this._version;
}
const json = require('../../package.json');
if ('version' in json) {
this._version = json.version;
}
if (!this._version) {
this._version = '0.0.0';
}
return this._version;
}
}
export namespace CoreClientProvider {
@@ -291,22 +391,18 @@ export namespace CoreClientProvider {
}
}
/**
* Sugar for making the gRPC core client available for the concrete service classes.
*/
@injectable()
export abstract class CoreClientAware {
@inject(CoreClientProvider)
protected readonly coreClientProvider: CoreClientProvider;
protected async coreClient(): Promise<CoreClientProvider.Client> {
return await new Promise<CoreClientProvider.Client>(
async (resolve, reject) => {
const client = await this.coreClientProvider.client();
if (client && client instanceof Error) {
reject(client);
} else if (client) {
return resolve(client);
}
}
);
private readonly coreClientProvider: CoreClientProvider;
/**
* Returns with a promise that resolves when the core client is initialized and ready.
*/
protected get coreClient(): Promise<CoreClientProvider.Client> {
return this.coreClientProvider.client;
}
}
@@ -326,13 +422,14 @@ ${causes
}
function isIndexUpdateRequiredBeforeInit(
status: RpcStatus[]
status: RpcStatus[],
cliConfig: DefaultCliConfig
): IndexUpdateRequiredBeforeInitError | undefined {
const causes = status
.filter((s) =>
IndexUpdateRequiredBeforeInit.map((predicate) => predicate(s)).some(
Boolean
)
IndexUpdateRequiredBeforeInit.map((predicate) =>
predicate(s, cliConfig)
).some(Boolean)
)
.map((s) => RpcStatus.toObject(false, s));
return causes.length
@@ -343,9 +440,14 @@ const IndexUpdateRequiredBeforeInit = [
isPackageIndexMissingStatus,
isDiscoveryNotFoundStatus,
];
function isPackageIndexMissingStatus(status: RpcStatus): boolean {
function isPackageIndexMissingStatus(
status: RpcStatus,
{ directories: { data } }: DefaultCliConfig
): boolean {
const predicate = ({ message }: RpcStatus.AsObject) =>
message.includes('loading json index file');
message.includes('loading json index file') &&
(message.includes(join(data, 'package_index.json')) ||
message.includes(join(data, 'library_index.json')));
// https://github.com/arduino/arduino-cli/blob/f0245bc2da6a56fccea7b2c9ea09e85fdcc52cb8/arduino/cores/packagemanager/package_manager.go#L247
return evaluate(status, predicate);
}