From c986bf0241ee1d3c1e3424c2928db84464a042a9 Mon Sep 17 00:00:00 2001 From: Akos Kitta Date: Mon, 11 Jul 2022 11:42:46 +0200 Subject: [PATCH 1/5] Restart discovery after re-initializing client. Otherwise, board discovery stops working after indexes update. Signed-off-by: Akos Kitta --- arduino-ide-extension/src/node/board-discovery.ts | 7 ++++++- .../src/node/core-client-provider.ts | 13 ++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/arduino-ide-extension/src/node/board-discovery.ts b/arduino-ide-extension/src/node/board-discovery.ts index f2d2dcb41..9a83df31d 100644 --- a/arduino-ide-extension/src/node/board-discovery.ts +++ b/arduino-ide-extension/src/node/board-discovery.ts @@ -56,6 +56,11 @@ export class BoardDiscovery extends CoreClientAware { @postConstruct() protected async init(): Promise { this.coreClient.then((client) => this.startBoardListWatch(client)); + this.onClientDidRefresh((client) => + this.stopBoardListWatch(client).then(() => + this.startBoardListWatch(client) + ) + ); } stopBoardListWatch(coreClient: CoreClientProvider.Client): Promise { @@ -79,7 +84,7 @@ export class BoardDiscovery extends CoreClientAware { startBoardListWatch(coreClient: CoreClientProvider.Client): void { if (this.watching) { // We want to avoid starting the board list watch process multiple - // times to meet unforseen consequences + // times to meet unforeseen consequences return; } this.watching = true; diff --git a/arduino-ide-extension/src/node/core-client-provider.ts b/arduino-ide-extension/src/node/core-client-provider.ts index 3258ef829..449f28ab5 100644 --- a/arduino-ide-extension/src/node/core-client-provider.ts +++ b/arduino-ide-extension/src/node/core-client-provider.ts @@ -5,7 +5,7 @@ import { injectable, postConstruct, } from '@theia/core/shared/inversify'; -import { Emitter } from '@theia/core/lib/common/event'; +import { Emitter, Event } from '@theia/core/lib/common/event'; import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb'; import { Instance } from './cli-protocol/cc/arduino/cli/commands/v1/common_pb'; import { @@ -53,6 +53,8 @@ export class CoreClientProvider { private readonly onClientReadyEmitter = new Emitter(); private readonly onClientReady = this.onClientReadyEmitter.event; + private readonly onClientDidRefreshEmitter = + new Emitter(); @postConstruct() protected init(): void { @@ -88,6 +90,10 @@ export class CoreClientProvider { return this.pending.promise; } + get onClientDidRefresh(): Event { + return this.onClientDidRefreshEmitter.event; + } + /** * Encapsulates both the gRPC core client creation (`CreateRequest`) and initialization (`InitRequest`). */ @@ -253,6 +259,7 @@ export class CoreClientProvider { await this.initInstance(client); // notify clients about the index update only after the client has been "re-initialized" and the new content is available. progressHandler.reportEnd(); + this.onClientDidRefreshEmitter.fire(client); } catch (err) { console.error('Failed to update indexes', err); progressHandler.reportError( @@ -404,6 +411,10 @@ export abstract class CoreClientAware { protected get coreClient(): Promise { return this.coreClientProvider.client; } + + protected get onClientDidRefresh(): Event { + return this.coreClientProvider.onClientDidRefresh; + } } class IndexUpdateRequiredBeforeInitError extends Error { From b94f827f3dd38ca6d44f3bc10653e059f334ac54 Mon Sep 17 00:00:00 2001 From: Akos Kitta Date: Tue, 12 Jul 2022 15:59:03 +0200 Subject: [PATCH 2/5] another way to cancel the discovery. Signed-off-by: Akos Kitta --- .../src/node/arduino-ide-backend-module.ts | 5 +- .../src/node/board-discovery.ts | 230 +++++++++++++----- .../src/node/boards-service-impl.ts | 8 +- .../src/node/library-service-impl.ts | 12 +- .../src/node/service-error.ts | 3 + 5 files changed, 191 insertions(+), 67 deletions(-) diff --git a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts index 1cf4f39af..169a728a9 100644 --- a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts +++ b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts @@ -203,6 +203,7 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => { // Shared port/board discovery for the server bind(BoardDiscovery).toSelf().inSingletonScope(); + bind(BackendApplicationContribution).toService(BoardDiscovery); // Core service -> `verify` and `upload`. Singleton per BE, each FE connection gets its proxy. bind(ConnectionContainerModule).toConstantValue( @@ -350,10 +351,10 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => { bind(ILogger) .toDynamicValue((ctx) => { const parentLogger = ctx.container.get(ILogger); - return parentLogger.child('discovery'); + return parentLogger.child('discovery-log'); // TODO: revert }) .inSingletonScope() - .whenTargetNamed('discovery'); + .whenTargetNamed('discovery-log'); // TODO: revert // Logger for the CLI config service. From the CLI config (FS path aware), we make a URI-aware app config. bind(ILogger) diff --git a/arduino-ide-extension/src/node/board-discovery.ts b/arduino-ide-extension/src/node/board-discovery.ts index 9a83df31d..f3a2eac47 100644 --- a/arduino-ide-extension/src/node/board-discovery.ts +++ b/arduino-ide-extension/src/node/board-discovery.ts @@ -1,8 +1,8 @@ -import { injectable, inject, postConstruct, named } from '@theia/core/shared/inversify'; +import { injectable, inject, named } from '@theia/core/shared/inversify'; import { ClientDuplexStream } from '@grpc/grpc-js'; import { ILogger } from '@theia/core/lib/common/logger'; import { deepClone } from '@theia/core/lib/common/objects'; -import { CoreClientAware, CoreClientProvider } from './core-client-provider'; +import { CoreClientAware } from './core-client-provider'; import { BoardListWatchRequest, BoardListWatchResponse, @@ -14,6 +14,19 @@ import { AvailablePorts, AttachedBoardsChangeEvent, } from '../common/protocol'; +import { Emitter } from '@theia/core/lib/common/event'; +import { DisposableCollection } from '@theia/core/lib/common/disposable'; +import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol'; +import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb'; +import { v4 } from 'uuid'; +import { ServiceError } from './service-error'; +import { BackendApplicationContribution } from '@theia/core/lib/node'; + +type Duplex = ClientDuplexStream; +interface StreamWrapper extends Disposable { + readonly stream: Duplex; + readonly uuid: string; // For logging only +} /** * Singleton service for tracking the available ports and board and broadcasting the @@ -21,24 +34,27 @@ import { * Unlike other services, this is not connection scoped. */ @injectable() -export class BoardDiscovery extends CoreClientAware { +export class BoardDiscovery + extends CoreClientAware + implements BackendApplicationContribution +{ @inject(ILogger) - @named('discovery') - protected discoveryLogger: ILogger; + @named('discovery-log') + private readonly logger: ILogger; @inject(NotificationServiceServer) - protected readonly notificationService: NotificationServiceServer; + private readonly notificationService: NotificationServiceServer; // Used to know if the board watch process is already running to avoid // starting it multiple times private watching: boolean; - - protected boardWatchDuplex: - | ClientDuplexStream - | undefined; + private wrapper: StreamWrapper | undefined; + private readonly onStreamDidEndEmitter = new Emitter(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization. + private readonly onStreamDidCancelEmitter = new Emitter(); // when the watcher is canceled by the IDE2 + private readonly toDisposeOnStopWatch = new DisposableCollection(); /** - * Keys are the `address` of the ports. \ + * Keys are the `address` of the ports. * The `protocol` is ignored because the board detach event does not carry the protocol information, * just the address. * ```json @@ -48,62 +64,153 @@ export class BoardDiscovery extends CoreClientAware { * } * ``` */ - protected _state: AvailablePorts = {}; + private _state: AvailablePorts = {}; get state(): AvailablePorts { return this._state; } - @postConstruct() - protected async init(): Promise { - this.coreClient.then((client) => this.startBoardListWatch(client)); - this.onClientDidRefresh((client) => - this.stopBoardListWatch(client).then(() => - this.startBoardListWatch(client) - ) - ); + onStart(): void { + this.start(); + this.onClientDidRefresh(() => this.start()); } - stopBoardListWatch(coreClient: CoreClientProvider.Client): Promise { - return new Promise((resolve, reject) => { - if (!this.boardWatchDuplex) { - return resolve(); - } + onStop(): void { + this.stop(); + } - const { instance } = coreClient; - const req = new BoardListWatchRequest(); - req.setInstance(instance); - try { - this.boardWatchDuplex.write(req.setInterrupt(true), resolve); - } catch (e) { - this.discoveryLogger.error(e); - resolve(); + stop(): Promise { + this.logger.info('>>> Stopping boards watcher...'); + return new Promise((resolve, reject) => { + const timeout = this.timeout(BoardDiscovery.StopWatchTimeout, reject); + const toDispose = new DisposableCollection(); + toDispose.pushAll([ + timeout, + this.onStreamDidEndEmitter.event(() => { + this.logger.info( + `<<< Received the end event from the stream. Boards watcher has been successfully stopped.` + ); + this.watching = false; + toDispose.dispose(); + resolve(); + }), + this.onStreamDidCancelEmitter.event(() => { + this.logger.info( + `<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.` + ); + this.watching = false; + toDispose.dispose(); + resolve(); + }), + ]); + this.logger.info('Canceling boards watcher...'); + this.toDisposeOnStopWatch.dispose(); + }); + } + + private timeout( + after: number, + onTimeout: (error: Error) => void + ): Disposable { + const timer = setTimeout( + () => onTimeout(new Error(`Timed out after ${after} ms.`)), + after + ); + return Disposable.create(() => clearTimeout(timer)); + } + + private async write( + req: BoardListWatchRequest, + duplex: Duplex + ): Promise { + return new Promise((resolve, reject) => { + this.logger.info(`>>> Writing ${this.toJson(req)} to the stream...`); + if ( + !duplex.write(req, (err: Error | undefined) => { + if (err) { + this.logger.error( + `<<< Error ocurred while writing to the stream.`, + err + ); + reject(err); + return; + } + }) + ) { + duplex.once('drain', () => { + this.logger.info( + `<<< Board list watch request has been successfully written to the stream after the handling backpressure.` + ); + resolve(); + }); + } else { + process.nextTick(() => { + this.logger.info( + `<<< Board list watch request has been successfully written to the stream.` + ); + resolve(); + }); } }); } - startBoardListWatch(coreClient: CoreClientProvider.Client): void { + private async createWrapper( + client: ArduinoCoreServiceClient + ): Promise { + if (this.wrapper) { + throw new Error(`Duplex was already set.`); + } + const stream = client + .boardListWatch() + .on('end', () => this.onStreamDidEndEmitter.fire()) + .on('error', (error) => { + if (ServiceError.isCancel(error)) { + this.onStreamDidCancelEmitter.fire(); + } else { + this.logger.error( + 'Unexpected error occurred during the boards discovery.', + error + ); + // TODO: terminate? restart? reject? + } + }); + const wrapper = { + stream, + uuid: v4(), + dispose: () => { + // Cancelling the stream will kill the discovery `builtin:mdns-discovery process`. + // The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI. + stream.cancel(); + this.wrapper = undefined; + }, + }; + this.toDisposeOnStopWatch.pushAll([wrapper]); + return wrapper; + } + + private toJson(arg: BoardListWatchRequest | BoardListWatchResponse): string { + let object: Record | undefined = undefined; + if (arg instanceof BoardListWatchRequest) { + object = BoardListWatchRequest.toObject(false, arg); + } else if (arg instanceof BoardListWatchResponse) { + object = BoardListWatchResponse.toObject(false, arg); + } else { + throw new Error(`Unhandled object type: ${arg}`); + } + return JSON.stringify(object); + } + + async start(): Promise { if (this.watching) { // We want to avoid starting the board list watch process multiple // times to meet unforeseen consequences return; } - this.watching = true; - const { client, instance } = coreClient; - const req = new BoardListWatchRequest(); - req.setInstance(instance); - this.boardWatchDuplex = client.boardListWatch(); - this.boardWatchDuplex.on('end', () => { - this.watching = false; - console.info('board watch ended'); - }); - this.boardWatchDuplex.on('close', () => { - this.watching = false; - console.info('board watch ended'); - }); - this.boardWatchDuplex.on('data', (resp: BoardListWatchResponse) => { + const { client, instance } = await this.coreClient; + const wrapper = await this.createWrapper(client); + wrapper.stream.on('data', async (resp: BoardListWatchResponse) => { + this.logger.info('onData', this.toJson(resp)); if (resp.getEventType() === 'quit') { - this.watching = false; - console.info('board watch ended'); + await this.stop(); return; } @@ -135,7 +242,9 @@ export class BoardDiscovery extends CoreClientAware { // protocols. const portID = `${address}|${protocol}`; const label = (detectedPort as any).getPort().getLabel(); - const protocolLabel = (detectedPort as any).getPort().getProtocolLabel(); + const protocolLabel = (detectedPort as any) + .getPort() + .getProtocolLabel(); const port = { id: portID, address, @@ -155,8 +264,10 @@ export class BoardDiscovery extends CoreClientAware { if (eventType === 'add') { if (newState[portID]) { const [, knownBoards] = newState[portID]; - console.warn( - `Port '${Port.toString(port)}' was already available. Known boards before override: ${JSON.stringify( + this.logger.warn( + `Port '${Port.toString( + port + )}' was already available. Known boards before override: ${JSON.stringify( knownBoards )}` ); @@ -164,7 +275,9 @@ export class BoardDiscovery extends CoreClientAware { newState[portID] = [port, boards]; } else if (eventType === 'remove') { if (!newState[portID]) { - console.warn(`Port '${Port.toString(port)}' was not available. Skipping`); + this.logger.warn( + `Port '${Port.toString(port)}' was not available. Skipping` + ); return; } delete newState[portID]; @@ -189,7 +302,11 @@ export class BoardDiscovery extends CoreClientAware { this.notificationService.notifyAttachedBoardsDidChange(event); } }); - this.boardWatchDuplex.write(req); + await this.write( + new BoardListWatchRequest().setInstance(instance), + wrapper.stream + ); + this.watching = true; } getAttachedBoards(state: AvailablePorts = this.state): Board[] { @@ -210,3 +327,6 @@ export class BoardDiscovery extends CoreClientAware { return availablePorts; } } +export namespace BoardDiscovery { + export const StopWatchTimeout = 10_000; +} diff --git a/arduino-ide-extension/src/node/boards-service-impl.ts b/arduino-ide-extension/src/node/boards-service-impl.ts index 98cdcb7fb..ae88b4f3b 100644 --- a/arduino-ide-extension/src/node/boards-service-impl.ts +++ b/arduino-ide-extension/src/node/boards-service-impl.ts @@ -414,7 +414,7 @@ export class BoardsServiceImpl console.info('>>> Starting boards package installation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.platformInstall(req); resp.on( @@ -426,7 +426,7 @@ export class BoardsServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', (error) => { @@ -465,7 +465,7 @@ export class BoardsServiceImpl console.info('>>> Starting boards package uninstallation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.platformUninstall(req); resp.on( @@ -477,7 +477,7 @@ export class BoardsServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); diff --git a/arduino-ide-extension/src/node/library-service-impl.ts b/arduino-ide-extension/src/node/library-service-impl.ts index 42ad456c4..e0d3be567 100644 --- a/arduino-ide-extension/src/node/library-service-impl.ts +++ b/arduino-ide-extension/src/node/library-service-impl.ts @@ -269,7 +269,7 @@ export class LibraryServiceImpl console.info('>>> Starting library package installation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.libraryInstall(req); resp.on( @@ -281,7 +281,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', (error) => { @@ -323,7 +323,7 @@ export class LibraryServiceImpl } // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.zipLibraryInstall(req); resp.on( @@ -335,7 +335,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); @@ -358,7 +358,7 @@ export class LibraryServiceImpl console.info('>>> Starting library package uninstallation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.libraryUninstall(req); resp.on( @@ -370,7 +370,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); diff --git a/arduino-ide-extension/src/node/service-error.ts b/arduino-ide-extension/src/node/service-error.ts index 3abbbc0b0..a56cf13ea 100644 --- a/arduino-ide-extension/src/node/service-error.ts +++ b/arduino-ide-extension/src/node/service-error.ts @@ -2,6 +2,9 @@ import { Metadata, StatusObject } from '@grpc/grpc-js'; export type ServiceError = StatusObject & Error; export namespace ServiceError { + export function isCancel(arg: unknown): arg is ServiceError & { code: 1 } { + return is(arg) && arg.code === 1; // https://grpc.github.io/grpc/core/md_doc_statuscodes.html + } export function is(arg: unknown): arg is ServiceError { return arg instanceof Error && isStatusObjet(arg); } From 4b63d685879c106ed8e317fb91259559e1569a15 Mon Sep 17 00:00:00 2001 From: Akos Kitta Date: Wed, 13 Jul 2022 09:39:44 +0200 Subject: [PATCH 3/5] removed unused logger Signed-off-by: Akos Kitta --- arduino-ide-extension/src/node/boards-service-impl.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/arduino-ide-extension/src/node/boards-service-impl.ts b/arduino-ide-extension/src/node/boards-service-impl.ts index ae88b4f3b..3b084877a 100644 --- a/arduino-ide-extension/src/node/boards-service-impl.ts +++ b/arduino-ide-extension/src/node/boards-service-impl.ts @@ -1,4 +1,4 @@ -import { injectable, inject, named } from '@theia/core/shared/inversify'; +import { injectable, inject } from '@theia/core/shared/inversify'; import { ILogger } from '@theia/core/lib/common/logger'; import { notEmpty } from '@theia/core/lib/common/objects'; import { @@ -50,10 +50,6 @@ export class BoardsServiceImpl @inject(ILogger) protected logger: ILogger; - @inject(ILogger) - @named('discovery') - protected discoveryLogger: ILogger; - @inject(ResponseService) protected readonly responseService: ResponseService; From b50bf427ca4fb2c8f6a42b5d2f0338d1b8f6800e Mon Sep 17 00:00:00 2001 From: Akos Kitta Date: Wed, 13 Jul 2022 10:29:18 +0200 Subject: [PATCH 4/5] rename Signed-off-by: Akos Kitta --- .../src/node/board-discovery.ts | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/arduino-ide-extension/src/node/board-discovery.ts b/arduino-ide-extension/src/node/board-discovery.ts index f3a2eac47..17e2c449b 100644 --- a/arduino-ide-extension/src/node/board-discovery.ts +++ b/arduino-ide-extension/src/node/board-discovery.ts @@ -81,7 +81,10 @@ export class BoardDiscovery stop(): Promise { this.logger.info('>>> Stopping boards watcher...'); return new Promise((resolve, reject) => { - const timeout = this.timeout(BoardDiscovery.StopWatchTimeout, reject); + const timeout = this.createTimeout( + BoardDiscovery.StopWatchTimeout, + reject + ); const toDispose = new DisposableCollection(); toDispose.pushAll([ timeout, @@ -107,7 +110,7 @@ export class BoardDiscovery }); } - private timeout( + private createTimeout( after: number, onTimeout: (error: Error) => void ): Disposable { @@ -118,37 +121,22 @@ export class BoardDiscovery return Disposable.create(() => clearTimeout(timer)); } - private async write( + private async requestStartWatch( req: BoardListWatchRequest, duplex: Duplex ): Promise { return new Promise((resolve, reject) => { - this.logger.info(`>>> Writing ${this.toJson(req)} to the stream...`); if ( !duplex.write(req, (err: Error | undefined) => { if (err) { - this.logger.error( - `<<< Error ocurred while writing to the stream.`, - err - ); reject(err); return; } }) ) { - duplex.once('drain', () => { - this.logger.info( - `<<< Board list watch request has been successfully written to the stream after the handling backpressure.` - ); - resolve(); - }); + duplex.once('drain', resolve); } else { - process.nextTick(() => { - this.logger.info( - `<<< Board list watch request has been successfully written to the stream.` - ); - resolve(); - }); + process.nextTick(resolve); } }); } @@ -302,7 +290,7 @@ export class BoardDiscovery this.notificationService.notifyAttachedBoardsDidChange(event); } }); - await this.write( + await this.requestStartWatch( new BoardListWatchRequest().setInstance(instance), wrapper.stream ); From e9cd290ec7367896d2146ff8ccc3f1017fd805b3 Mon Sep 17 00:00:00 2001 From: Akos Kitta Date: Wed, 13 Jul 2022 14:29:23 +0200 Subject: [PATCH 5/5] fixup. Signed-off-by: Akos Kitta --- .../src/node/board-discovery.ts | 124 +++++++++++------- .../src/node/boards-service-impl.ts | 2 +- 2 files changed, 80 insertions(+), 46 deletions(-) diff --git a/arduino-ide-extension/src/node/board-discovery.ts b/arduino-ide-extension/src/node/board-discovery.ts index 17e2c449b..448d48e42 100644 --- a/arduino-ide-extension/src/node/board-discovery.ts +++ b/arduino-ide-extension/src/node/board-discovery.ts @@ -14,13 +14,14 @@ import { AvailablePorts, AttachedBoardsChangeEvent, } from '../common/protocol'; -import { Emitter } from '@theia/core/lib/common/event'; +import { Emitter, Event } from '@theia/core/lib/common/event'; import { DisposableCollection } from '@theia/core/lib/common/disposable'; import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol'; import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb'; import { v4 } from 'uuid'; import { ServiceError } from './service-error'; import { BackendApplicationContribution } from '@theia/core/lib/node'; +import { Deferred } from '@theia/core/lib/common/promise-util'; type Duplex = ClientDuplexStream; interface StreamWrapper extends Disposable { @@ -30,7 +31,8 @@ interface StreamWrapper extends Disposable { /** * Singleton service for tracking the available ports and board and broadcasting the - * changes to all connected frontend instances. \ + * changes to all connected frontend instances. + * * Unlike other services, this is not connection scoped. */ @injectable() @@ -45,9 +47,8 @@ export class BoardDiscovery @inject(NotificationServiceServer) private readonly notificationService: NotificationServiceServer; - // Used to know if the board watch process is already running to avoid - // starting it multiple times - private watching: boolean; + private watching: Deferred | undefined; + private stopping: Deferred | undefined; private wrapper: StreamWrapper | undefined; private readonly onStreamDidEndEmitter = new Emitter(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization. private readonly onStreamDidCancelEmitter = new Emitter(); // when the watcher is canceled by the IDE2 @@ -55,6 +56,7 @@ export class BoardDiscovery /** * Keys are the `address` of the ports. + * * The `protocol` is ignored because the board detach event does not carry the protocol information, * just the address. * ```json @@ -64,46 +66,57 @@ export class BoardDiscovery * } * ``` */ - private _state: AvailablePorts = {}; - get state(): AvailablePorts { - return this._state; + private _availablePorts: AvailablePorts = {}; + get availablePorts(): AvailablePorts { + return this._availablePorts; } onStart(): void { this.start(); - this.onClientDidRefresh(() => this.start()); + this.onClientDidRefresh(() => this.restart()); + } + + private async restart(): Promise { + this.logger.info('restarting before stop'); + await this.stop(); + this.logger.info('restarting after stop'); + return this.start(); } onStop(): void { this.stop(); } - stop(): Promise { + async stop(restart = false): Promise { + this.logger.info('stop'); + if (this.stopping) { + this.logger.info('stop already stopping'); + return this.stopping.promise; + } + if (!this.watching) { + return; + } + this.stopping = new Deferred(); this.logger.info('>>> Stopping boards watcher...'); return new Promise((resolve, reject) => { - const timeout = this.createTimeout( - BoardDiscovery.StopWatchTimeout, - reject - ); + const timeout = this.createTimeout(10_000, reject); const toDispose = new DisposableCollection(); - toDispose.pushAll([ - timeout, - this.onStreamDidEndEmitter.event(() => { - this.logger.info( - `<<< Received the end event from the stream. Boards watcher has been successfully stopped.` - ); - this.watching = false; + const waitForEvent = (event: Event) => + event(() => { + this.logger.info('stop received event: either end or cancel'); toDispose.dispose(); + this.stopping?.resolve(); + this.stopping = undefined; + this.logger.info('stop stopped'); resolve(); - }), - this.onStreamDidCancelEmitter.event(() => { - this.logger.info( - `<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.` - ); - this.watching = false; - toDispose.dispose(); - resolve(); - }), + if (restart) { + this.start(); + } + }); + toDispose.pushAll([ + timeout, + waitForEvent(this.onStreamDidEndEmitter.event), + waitForEvent(this.onStreamDidCancelEmitter.event), ]); this.logger.info('Canceling boards watcher...'); this.toDisposeOnStopWatch.dispose(); @@ -149,9 +162,14 @@ export class BoardDiscovery } const stream = client .boardListWatch() - .on('end', () => this.onStreamDidEndEmitter.fire()) + .on('end', () => { + this.logger.info('received end'); + this.onStreamDidEndEmitter.fire(); + }) .on('error', (error) => { + this.logger.info('error received'); if (ServiceError.isCancel(error)) { + this.logger.info('cancel error received!'); this.onStreamDidCancelEmitter.fire(); } else { this.logger.error( @@ -165,13 +183,21 @@ export class BoardDiscovery stream, uuid: v4(), dispose: () => { + this.logger.info('disposing requesting cancel'); // Cancelling the stream will kill the discovery `builtin:mdns-discovery process`. // The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI. stream.cancel(); + this.logger.info('disposing canceled'); this.wrapper = undefined; }, }; - this.toDisposeOnStopWatch.pushAll([wrapper]); + this.toDisposeOnStopWatch.pushAll([ + wrapper, + Disposable.create(() => { + this.watching?.reject(new Error(`Stopping watcher.`)); + this.watching = undefined; + }), + ]); return wrapper; } @@ -188,17 +214,25 @@ export class BoardDiscovery } async start(): Promise { + this.logger.info('start'); + if (this.stopping) { + this.logger.info('start is stopping wait'); + await this.stopping.promise; + this.logger.info('start stopped'); + } if (this.watching) { - // We want to avoid starting the board list watch process multiple - // times to meet unforeseen consequences - return; + this.logger.info('start already watching'); + return this.watching.promise; } + this.watching = new Deferred(); + this.logger.info('start new deferred'); const { client, instance } = await this.coreClient; const wrapper = await this.createWrapper(client); wrapper.stream.on('data', async (resp: BoardListWatchResponse) => { this.logger.info('onData', this.toJson(resp)); if (resp.getEventType() === 'quit') { - await this.stop(); + this.logger.info('quit received'); + this.stop(); return; } @@ -217,8 +251,8 @@ export class BoardDiscovery throw new Error(`Unexpected event type: '${resp.getEventType()}'`); } - const oldState = deepClone(this._state); - const newState = deepClone(this._state); + const oldState = deepClone(this._availablePorts); + const newState = deepClone(this._availablePorts); const address = (detectedPort as any).getPort().getAddress(); const protocol = (detectedPort as any).getPort().getProtocol(); @@ -286,18 +320,21 @@ export class BoardDiscovery }, }; - this._state = newState; + this._availablePorts = newState; this.notificationService.notifyAttachedBoardsDidChange(event); } }); + this.logger.info('start request start watch'); await this.requestStartWatch( new BoardListWatchRequest().setInstance(instance), wrapper.stream ); - this.watching = true; + this.logger.info('start requested start watch'); + this.watching.resolve(); + this.logger.info('start resolved watching'); } - getAttachedBoards(state: AvailablePorts = this.state): Board[] { + getAttachedBoards(state: AvailablePorts = this.availablePorts): Board[] { const attachedBoards: Board[] = []; for (const portID of Object.keys(state)) { const [, boards] = state[portID]; @@ -306,7 +343,7 @@ export class BoardDiscovery return attachedBoards; } - getAvailablePorts(state: AvailablePorts = this.state): Port[] { + getAvailablePorts(state: AvailablePorts = this.availablePorts): Port[] { const availablePorts: Port[] = []; for (const portID of Object.keys(state)) { const [port] = state[portID]; @@ -315,6 +352,3 @@ export class BoardDiscovery return availablePorts; } } -export namespace BoardDiscovery { - export const StopWatchTimeout = 10_000; -} diff --git a/arduino-ide-extension/src/node/boards-service-impl.ts b/arduino-ide-extension/src/node/boards-service-impl.ts index 3b084877a..5f2e1e64e 100644 --- a/arduino-ide-extension/src/node/boards-service-impl.ts +++ b/arduino-ide-extension/src/node/boards-service-impl.ts @@ -60,7 +60,7 @@ export class BoardsServiceImpl protected readonly boardDiscovery: BoardDiscovery; async getState(): Promise { - return this.boardDiscovery.state; + return this.boardDiscovery.availablePorts; } async getAttachedBoards(): Promise {