From 6d62616da486b4da859b6c3956052ba22ec00ad4 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 14:32:40 -0500 Subject: [PATCH 01/44] refactor(NODE-5741): use async io helpers From 0994beb59e775bfb5f9c9f469309b5e557400b5c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 14:33:22 -0500 Subject: [PATCH 02/44] refactor: move command preparing to helper --- src/cmap/connection.ts | 51 ++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index d505496519e..dc026ad23ad 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1116,12 +1116,7 @@ export class ModernConnection extends TypedEventEmitter { } } - command( - ns: MongoDBNamespace, - command: Document, - options: CommandOptions | undefined, - callback: Callback - ): void { + private prepareCommand(db: string, command: Document, options: CommandOptions) { let cmd = { ...command }; const readPreference = getReadPreference(options); @@ -1145,12 +1140,10 @@ export class ModernConnection extends TypedEventEmitter { clusterTime = session.clusterTime; } - const err = applySession(session, cmd, options); - if (err) { - return callback(err); - } + const sessionError = applySession(session, cmd, options); + if (sessionError) throw sessionError; } else if (session?.explicit) { - return callback(new MongoCompatibilityError('Current topology does not support sessions')); + throw new MongoCompatibilityError('Current topology does not support sessions'); } // if we have a known cluster time, gossip it @@ -1171,23 +1164,33 @@ export class ModernConnection extends TypedEventEmitter { }; } - const commandOptions: Document = Object.assign( - { - numberToSkip: 0, - numberToReturn: -1, - checkKeys: false, - // This value is not overridable - secondaryOk: readPreference.secondaryOk() - }, - options - ); + const commandOptions = { + numberToSkip: 0, + numberToReturn: -1, + checkKeys: false, + // This value is not overridable + secondaryOk: readPreference.secondaryOk(), + ...options, + readPreference // ensure we pass in ReadPreference instance + }; const message = this.supportsOpMsg - ? new OpMsgRequest(ns.db, cmd, commandOptions) - : new OpQueryRequest(ns.db, cmd, commandOptions); + ? new OpMsgRequest(db, cmd, commandOptions) + : new OpQueryRequest(db, cmd, commandOptions); + + return message; + } + + command( + ns: MongoDBNamespace, + command: Document, + options: CommandOptions = {}, + callback: Callback + ): void { + const message = this.prepareCommand(ns.db, command, options); try { - write(this as any as Connection, message, commandOptions, callback); + write(this as any as Connection, message, options, callback); } catch (err) { callback(err); } From 8f3d6df54cb2fe4ba17c35cad97331dd2c874b7f Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 14:37:35 -0500 Subject: [PATCH 03/44] refactor: make command async --- src/cmap/connection.ts | 51 +++++++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index dc026ad23ad..e7bfe1d6134 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1181,19 +1181,54 @@ export class ModernConnection extends TypedEventEmitter { return message; } - command( + async command( ns: MongoDBNamespace, command: Document, - options: CommandOptions = {}, - callback: Callback - ): void { + options: CommandOptions = {} + ): Promise { const message = this.prepareCommand(ns.db, command, options); - try { - write(this as any as Connection, message, options, callback); - } catch (err) { - callback(err); + let started = 0; + if (this.monitorCommands) { + started = now(); + this.emit( + ModernConnection.COMMAND_STARTED, + new CommandStartedEvent(this as unknown as Connection, message) + ); + } + + const document = await this.sendCommand(message, options); + + if (document.writeConcernError) { + const writeConcernError = new MongoWriteConcernError(document.writeConcernError, document); + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_SUCCEEDED, + new CommandSucceededEvent(this as unknown as Connection, message, document, started) + ); + } + throw writeConcernError; + } + + if (document.ok === 0 || document.$err || document.errmsg || document.code) { + const serverError = new MongoServerError(document); + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_FAILED, + new CommandFailedEvent(this as unknown as Connection, message, serverError, started) + ); + } + throw serverError; } + + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_SUCCEEDED, + new CommandSucceededEvent(this as unknown as Connection, message, document, started) + ); + } + + return document; } } From 6b97abcd0179916d8fd046ea06027a75bb6781a2 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 14:40:50 -0500 Subject: [PATCH 04/44] refactor: add sendCommand helper for send and recv --- src/cmap/connection.ts | 45 ++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index e7bfe1d6134..45f5af2d27c 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -783,11 +783,7 @@ export class ModernConnection extends TypedEventEmitter { lastHelloMS?: number; serverApi?: ServerApi; helloOk?: boolean; - commandAsync: ( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions | undefined - ) => Promise; + commandAsync: ModernConnection['command']; /** @internal */ authContext?: AuthContext; @@ -830,14 +826,7 @@ export class ModernConnection extends TypedEventEmitter { constructor(stream: Stream, options: ConnectionOptions) { super(); - this.commandAsync = promisify( - ( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions | undefined, - callback: Callback - ) => this.command(ns, cmd, options, callback as any) - ); + this.commandAsync = this.command.bind(this); this.id = options.id; this.address = streamIdentifier(stream, options); @@ -1181,6 +1170,36 @@ export class ModernConnection extends TypedEventEmitter { return message; } + private async sendCommand( + message: WriteProtocolMessageType, + options: CommandOptions + ): Promise { + await writeCommand(this, message, { + agreedCompressor: this.description.compressor ?? 'none', + zlibCompressionLevel: this.description.zlibCompressionLevel + }); + + const response = await read(this); + + response.parse(options); + + const [document] = response.documents; + + if (!Buffer.isBuffer(document)) { + const { session } = options; + if (session) { + updateSessionFromResponse(session, document); + } + + if (document.$clusterTime) { + this[kClusterTime] = document.$clusterTime; + this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); + } + } + + return document; + } + async command( ns: MongoDBNamespace, command: Document, From 114ec148416806e4075249ae2fb7ebda487bb132 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 14:41:34 -0500 Subject: [PATCH 05/44] refactor: remove onMessage --- src/cmap/connection.ts | 90 ------------------------------------------ 1 file changed, 90 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 45f5af2d27c..20a8511e197 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -951,96 +951,6 @@ export class ModernConnection extends TypedEventEmitter { }, 1).unref(); // No need for this timer to hold the event loop open } - onMessage(message: OpMsgResponse | OpQueryResponse) { - const delayedTimeoutId = this[kDelayedTimeoutId]; - if (delayedTimeoutId != null) { - clearTimeout(delayedTimeoutId); - this[kDelayedTimeoutId] = null; - } - - const socketTimeoutMS = this.socket.timeout ?? 0; - this.socket.setTimeout(0); - - // always emit the message, in case we are streaming - this.emit('message', message); - let operationDescription = this[kQueue].get(message.responseTo); - - if (!operationDescription && this.isMonitoringConnection) { - // This is how we recover when the initial hello's requestId is not - // the responseTo when hello responses have been skipped: - - // First check if the map is of invalid size - if (this[kQueue].size > 1) { - this.cleanup(true, new MongoRuntimeError(INVALID_QUEUE_SIZE)); - } else { - // Get the first orphaned operation description. - const entry = this[kQueue].entries().next(); - if (entry.value != null) { - const [requestId, orphaned]: [number, OperationDescription] = entry.value; - // If the orphaned operation description exists then set it. - operationDescription = orphaned; - // Remove the entry with the bad request id from the queue. - this[kQueue].delete(requestId); - } - } - } - - if (!operationDescription) { - return; - } - - const callback = operationDescription.cb; - - // SERVER-45775: For exhaust responses we should be able to use the same requestId to - // track response, however the server currently synthetically produces remote requests - // making the `responseTo` change on each response - this[kQueue].delete(message.responseTo); - if ('moreToCome' in message && message.moreToCome) { - // If the operation description check above does find an orphaned - // description and sets the operationDescription then this line will put one - // back in the queue with the correct requestId and will resolve not being able - // to find the next one via the responseTo of the next streaming hello. - this[kQueue].set(message.requestId, operationDescription); - this.socket.setTimeout(socketTimeoutMS); - } - - try { - // Pass in the entire description because it has BSON parsing options - message.parse(operationDescription); - } catch (err) { - // If this error is generated by our own code, it will already have the correct class applied - // if it is not, then it is coming from a catastrophic data parse failure or the BSON library - // in either case, it should not be wrapped - callback(err); - return; - } - - if (message.documents[0]) { - const document: Document = message.documents[0]; - const session = operationDescription.session; - if (session) { - updateSessionFromResponse(session, document); - } - - if (document.$clusterTime) { - this[kClusterTime] = document.$clusterTime; - this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); - } - - if (document.writeConcernError) { - callback(new MongoWriteConcernError(document.writeConcernError, document), document); - return; - } - - if (document.ok === 0 || document.$err || document.errmsg || document.code) { - callback(new MongoServerError(document)); - return; - } - } - - callback(undefined, message.documents[0]); - } - destroy(options: DestroyOptions, callback?: Callback): void { if (this.closed) { process.nextTick(() => callback?.()); From d08f98591598908805c585abf28f97a193ab3666 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 14:48:26 -0500 Subject: [PATCH 06/44] refactor: remove message stream --- src/cmap/connection.ts | 43 +++--------------------------------------- 1 file changed, 3 insertions(+), 40 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 20a8511e197..e92af1bb7c3 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -796,10 +796,6 @@ export class ModernConnection extends TypedEventEmitter { /** @internal */ [kLastUseTime]: number; /** @internal */ - [kQueue]: Map; - /** @internal */ - [kMessageStream]: MessageStream; - /** @internal */ socket: Stream; /** @internal */ [kHello]: Document | null; @@ -841,27 +837,12 @@ export class ModernConnection extends TypedEventEmitter { this[kGeneration] = options.generation; this[kLastUseTime] = now(); - // setup parser stream and message handling - this[kQueue] = new Map(); - this[kMessageStream] = new MessageStream({ - ...options, - maxBsonMessageSize: this.hello?.maxBsonMessageSize - }); this.socket = stream; this[kDelayedTimeoutId] = null; - this[kMessageStream].on('message', message => this.onMessage(message)); - this[kMessageStream].on('error', error => this.onError(error)); - this.socket.on('close', () => this.onClose()); - this.socket.on('timeout', () => this.onTimeout()); - this.socket.on('error', () => { - /* ignore errors, listen to `close` instead */ - }); - - // hook the message stream up to the passed in stream - this.socket.pipe(this[kMessageStream]); - this[kMessageStream].pipe(this.socket); + this.socket.once('timeout', this.onTimeout.bind(this)); + this.socket.once('close', this.onTimeout.bind(this)); } get description(): StreamDescription { @@ -881,15 +862,6 @@ export class ModernConnection extends TypedEventEmitter { this[kHello] = response; } - // Set the whether the message stream is for a monitoring connection. - set isMonitoringConnection(value: boolean) { - this[kMessageStream].isMonitoringConnection = value; - } - - get isMonitoringConnection(): boolean { - return this[kMessageStream].isMonitoringConnection; - } - get serviceId(): ObjectId | undefined { return this.hello?.serviceId; } @@ -977,7 +949,7 @@ export class ModernConnection extends TypedEventEmitter { * * This method does nothing if the connection is already closed. */ - private cleanup(force: boolean, error?: Error): void { + private cleanup(force: boolean, _error?: Error): void { if (this.closed) { return; } @@ -985,19 +957,10 @@ export class ModernConnection extends TypedEventEmitter { this.closed = true; const completeCleanup = () => { - for (const op of this[kQueue].values()) { - op.cb(error); - } - - this[kQueue].clear(); - this.emit(Connection.CLOSE); }; this.socket.removeAllListeners(); - this[kMessageStream].removeAllListeners(); - - this[kMessageStream].destroy(); if (force) { this.socket.destroy(); From 4d9c5f614999052747c74fe80299a70b8e9067bb Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 15:01:02 -0500 Subject: [PATCH 07/44] refactor: update cleanup method --- src/cmap/connection.ts | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index e92af1bb7c3..797dce33b3a 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -925,7 +925,7 @@ export class ModernConnection extends TypedEventEmitter { destroy(options: DestroyOptions, callback?: Callback): void { if (this.closed) { - process.nextTick(() => callback?.()); + if (typeof callback === 'function') process.nextTick(callback); return; } if (typeof callback === 'function') { @@ -949,33 +949,14 @@ export class ModernConnection extends TypedEventEmitter { * * This method does nothing if the connection is already closed. */ - private cleanup(force: boolean, _error?: Error): void { + private cleanup(force: boolean, error?: Error): void { if (this.closed) { return; } this.closed = true; - - const completeCleanup = () => { - this.emit(Connection.CLOSE); - }; - - this.socket.removeAllListeners(); - - if (force) { - this.socket.destroy(); - completeCleanup(); - return; - } - - if (!this.socket.writableEnded) { - this.socket.end(() => { - this.socket.destroy(); - completeCleanup(); - }); - } else { - completeCleanup(); - } + this.socket.destroy(error); + this.emit(Connection.CLOSE); } private prepareCommand(db: string, command: Document, options: CommandOptions) { From 9414f0a2e44931a953884ef4bccab6d27a970b4c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 15:04:58 -0500 Subject: [PATCH 08/44] refactor: remove force flag and support noResponse --- src/cmap/connection.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 797dce33b3a..d258994ee61 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -907,19 +907,19 @@ export class ModernConnection extends TypedEventEmitter { } onError(error: Error) { - this.cleanup(true, error); + this.cleanup(error); } onClose() { const message = `connection ${this.id} to ${this.address} closed`; - this.cleanup(true, new MongoNetworkError(message)); + this.cleanup(new MongoNetworkError(message)); } onTimeout() { this[kDelayedTimeoutId] = setTimeout(() => { const message = `connection ${this.id} to ${this.address} timed out`; const beforeHandshake = this.hello == null; - this.cleanup(true, new MongoNetworkTimeoutError(message, { beforeHandshake })); + this.cleanup(new MongoNetworkTimeoutError(message, { beforeHandshake })); }, 1).unref(); // No need for this timer to hold the event loop open } @@ -938,7 +938,7 @@ export class ModernConnection extends TypedEventEmitter { this.removeAllListeners(Connection.PINNED); this.removeAllListeners(Connection.UNPINNED); const message = `connection ${this.id} to ${this.address} closed`; - this.cleanup(options.force, new MongoNetworkError(message)); + this.cleanup(new MongoNetworkError(message)); } /** @@ -949,7 +949,7 @@ export class ModernConnection extends TypedEventEmitter { * * This method does nothing if the connection is already closed. */ - private cleanup(force: boolean, error?: Error): void { + private cleanup(error?: Error): void { if (this.closed) { return; } @@ -1033,6 +1033,8 @@ export class ModernConnection extends TypedEventEmitter { zlibCompressionLevel: this.description.zlibCompressionLevel }); + if (options.noResponse) return { ok: 1 }; + const response = await read(this); response.parse(options); From 308075256bafcfbf924f210ea9599ddcb3a2c8bf Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 16 Nov 2023 15:06:49 -0500 Subject: [PATCH 09/44] refactor: remove drain event listener --- src/cmap/connection.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index d258994ee61..2752eb5bf83 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1162,7 +1162,6 @@ export async function writeCommand( command: WriteProtocolMessageType, options: Partial> ): Promise { - const drained = once(connection.socket, 'drain'); const finalCommand = options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) ? command @@ -1171,8 +1170,7 @@ export async function writeCommand( zlibCompressionLevel: options.zlibCompressionLevel ?? 0 }); const buffer = Buffer.concat(await finalCommand.toBin()); - connection.socket.push(buffer); - await drained; + await promisify(connection.socket.write.bind(connection.socket))(buffer); } /** From aa9a6550329569b89174fbee07dd39af1ea9cf6b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 14:46:57 -0500 Subject: [PATCH 10/44] refactor: connect error handling to signal --- src/cmap/connection.ts | 73 +++++++++++++++++++++++++++++------------- src/sdam/monitor.ts | 31 ++++++++---------- src/sdam/server.ts | 19 ++++++----- src/utils.ts | 5 +++ 4 files changed, 80 insertions(+), 48 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 2752eb5bf83..cdf3b9f3be3 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,4 +1,3 @@ -import { once } from 'events'; import { on } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; @@ -35,6 +34,7 @@ import { type Callback, HostAddress, maxWireVersion, + mayAbort, type MongoDBNamespace, now, uuidV4 @@ -797,6 +797,7 @@ export class ModernConnection extends TypedEventEmitter { [kLastUseTime]: number; /** @internal */ socket: Stream; + controller: AbortController; /** @internal */ [kHello]: Document | null; /** @internal */ @@ -838,11 +839,9 @@ export class ModernConnection extends TypedEventEmitter { this[kLastUseTime] = now(); this.socket = stream; + this.controller = new AbortController(); this[kDelayedTimeoutId] = null; - - this.socket.once('timeout', this.onTimeout.bind(this)); - this.socket.once('close', this.onTimeout.bind(this)); } get description(): StreamDescription { @@ -906,10 +905,6 @@ export class ModernConnection extends TypedEventEmitter { this[kLastUseTime] = now(); } - onError(error: Error) { - this.cleanup(error); - } - onClose() { const message = `connection ${this.id} to ${this.address} closed`; this.cleanup(new MongoNetworkError(message)); @@ -955,7 +950,8 @@ export class ModernConnection extends TypedEventEmitter { } this.closed = true; - this.socket.destroy(error); + this.socket.destroy(); + this.controller.abort(error); this.emit(Connection.CLOSE); } @@ -1028,14 +1024,28 @@ export class ModernConnection extends TypedEventEmitter { message: WriteProtocolMessageType, options: CommandOptions ): Promise { - await writeCommand(this, message, { - agreedCompressor: this.description.compressor ?? 'none', - zlibCompressionLevel: this.description.zlibCompressionLevel - }); + const { signal } = this.controller; - if (options.noResponse) return { ok: 1 }; + if (typeof options.socketTimeoutMS === 'number') { + this.socket.setTimeout(options.socketTimeoutMS); + } else if (this.socketTimeoutMS !== 0) { + this.socket.setTimeout(this.socketTimeoutMS); + } + + let response; + try { + await writeCommand(this, message, { + agreedCompressor: this.description.compressor ?? 'none', + zlibCompressionLevel: this.description.zlibCompressionLevel, + signal + }); - const response = await read(this); + if (options.noResponse) return { ok: 1 }; + + response = await read(this, { signal }); + } finally { + this.controller = new AbortController(); + } response.parse(options); @@ -1061,6 +1071,12 @@ export class ModernConnection extends TypedEventEmitter { command: Document, options: CommandOptions = {} ): Promise { + // Temporary to make sure no callback usage: + // eslint-disable-next-line prefer-rest-params + if (typeof arguments[arguments.length - 1] === 'function') { + throw new Error('no callbacks allowed!!'); + } + const message = this.prepareCommand(ns.db, command, options); let started = 0; @@ -1121,11 +1137,12 @@ const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; * Note that `for-await` loops call `return` automatically when the loop is exited. */ export async function* readWireProtocolMessages( - connection: ModernConnection + connection: ModernConnection, + { signal }: { signal?: AbortSignal } = {} ): AsyncGenerator { const bufferPool = new BufferPool(); const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize; - for await (const [chunk] of on(connection.socket, 'data')) { + for await (const [chunk] of on(connection.socket, 'data', { signal })) { bufferPool.append(chunk); const sizeOfMessage = bufferPool.getInt32(); @@ -1160,7 +1177,9 @@ export async function* readWireProtocolMessages( export async function writeCommand( connection: ModernConnection, command: WriteProtocolMessageType, - options: Partial> + options: Partial> & { + signal?: AbortSignal; + } ): Promise { const finalCommand = options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) @@ -1170,7 +1189,11 @@ export async function writeCommand( zlibCompressionLevel: options.zlibCompressionLevel ?? 0 }); const buffer = Buffer.concat(await finalCommand.toBin()); - await promisify(connection.socket.write.bind(connection.socket))(buffer); + options.signal?.throwIfAborted(); + await Promise.race([ + promisify(connection.socket.write.bind(connection.socket))(buffer), + mayAbort(options.signal) + ]); } /** @@ -1183,9 +1206,10 @@ export async function writeCommand( * Note that `for-await` loops call `return` automatically when the loop is exited. */ export async function* readMany( - connection: ModernConnection + connection: ModernConnection, + options: { signal?: AbortSignal } = {} ): AsyncGenerator { - for await (const message of readWireProtocolMessages(connection)) { + for await (const message of readWireProtocolMessages(connection, options)) { const response = await decompressResponse(message); yield response; @@ -1200,8 +1224,11 @@ export async function* readMany( * * Reads a single wire protocol message out of a connection. */ -export async function read(connection: ModernConnection): Promise { - for await (const value of readMany(connection)) { +export async function read( + connection: ModernConnection, + options: { signal?: AbortSignal } = {} +): Promise { + for await (const value of readMany(connection, options)) { return value; } diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index f4d9404d0cb..1cc7cfad7f1 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -127,24 +127,21 @@ export class Monitor extends TypedEventEmitter { const cancellationToken = this[kCancellationToken]; // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration - const connectOptions = Object.assign( - { - id: '' as const, - generation: server.pool.generation, - connectionType: Connection, - cancellationToken, - hostAddress: server.description.hostAddress - }, - options, + const connectOptions = { + id: '' as const, + generation: server.pool.generation, + cancellationToken, + hostAddress: server.description.hostAddress, + ...options, // force BSON serialization options - { - raw: false, - useBigInt64: false, - promoteLongs: true, - promoteValues: true, - promoteBuffers: true - } - ); + raw: false, + useBigInt64: false, + promoteLongs: true, + promoteValues: true, + promoteBuffers: true, + // TODO(NODE-5741): override monitors to use old connection + connectionType: Connection + }; // ensure no authentication is used for monitoring delete connectOptions.credentials; diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 8e6d2b13d39..04139c991ff 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -367,14 +367,13 @@ export class Server extends TypedEventEmitter { return cb(err); } - conn.command( - ns, - cmd, - finalOptions, - makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { - this.decrementOperationCount(); - cb(error, response); - }) + const handler = makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { + this.decrementOperationCount(); + cb(error, response); + }); + conn.commandAsync(ns, cmd, finalOptions).then( + r => handler(undefined, r), + e => handler(e) ); }, callback @@ -532,6 +531,10 @@ function makeOperationHandler( return callback(new MongoUnexpectedServerResponseError('Empty response with no error')); } + if (error.name === 'AbortError' && error.cause instanceof MongoError) { + error = error.cause; + } + if (!(error instanceof MongoError)) { // Node.js or some other error we have not special handling for return callback(error); diff --git a/src/utils.ts b/src/utils.ts index 8bf4425c18b..4a217a91a62 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1299,3 +1299,8 @@ export const COSMOS_DB_MSG = export function isHostMatch(match: RegExp, host?: string): boolean { return host && match.test(host.toLowerCase()) ? true : false; } + +export async function mayAbort(signal?: AbortSignal) { + if (signal == null) return new Promise(() => null); // never ending story + return new Promise((_, reject) => signal.addEventListener('abort', reject)); +} From b816ce1b1f70d0b3b11cbaa926cf54ebebb9bb21 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 15:29:29 -0500 Subject: [PATCH 11/44] test: add new variants enabling ModernConnection --- .evergreen/config.yml | 35 ++++++++++++++++++++++++++ .evergreen/generate_evergreen_tasks.js | 11 ++++++++ .evergreen/run-tests.sh | 1 + src/cmap/connection_pool.ts | 2 +- test/tools/runner/config.ts | 5 ++++ 5 files changed, 53 insertions(+), 1 deletion(-) diff --git a/.evergreen/config.yml b/.evergreen/config.yml index 2addb34e4bb..d08614663ad 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -4558,6 +4558,41 @@ buildvariants: - test-3.6-server-noauth - test-3.6-replica_set-noauth - test-3.6-sharded_cluster-noauth + - name: rhel8-new-connection-tests + display_name: New Connection Tests + run_on: rhel80-large + expansions: + CLIENT_ENCRYPTION: true + MONGODB_NEW_CONNECTION: true + tasks: + - test-latest-server + - test-latest-replica_set + - test-latest-sharded_cluster + - test-rapid-server + - test-rapid-replica_set + - test-rapid-sharded_cluster + - test-7.0-server + - test-7.0-replica_set + - test-7.0-sharded_cluster + - test-6.0-server + - test-6.0-replica_set + - test-6.0-sharded_cluster + - test-5.0-server + - test-5.0-replica_set + - test-5.0-sharded_cluster + - test-4.4-server + - test-4.4-replica_set + - test-4.4-sharded_cluster + - test-4.2-server + - test-4.2-replica_set + - test-4.2-sharded_cluster + - test-4.0-server + - test-4.0-replica_set + - test-4.0-sharded_cluster + - test-3.6-server + - test-3.6-replica_set + - test-3.6-sharded_cluster + - test-latest-server-v1-api - name: rhel8-test-lambda display_name: AWS Lambda handler tests run_on: rhel80-large diff --git a/.evergreen/generate_evergreen_tasks.js b/.evergreen/generate_evergreen_tasks.js index 3c89e085c0c..9ce5f5c0491 100644 --- a/.evergreen/generate_evergreen_tasks.js +++ b/.evergreen/generate_evergreen_tasks.js @@ -725,6 +725,17 @@ BUILD_VARIANTS.push({ tasks: AUTH_DISABLED_TASKS.map(({ name }) => name) }); +BUILD_VARIANTS.push({ + name: 'rhel8-new-connection-tests', + display_name: 'New Connection Tests', + run_on: DEFAULT_OS, + expansions: { + CLIENT_ENCRYPTION: true, + MONGODB_NEW_CONNECTION: true + }, + tasks: BASE_TASKS.map(({ name }) => name) +}); + BUILD_VARIANTS.push({ name: 'rhel8-test-lambda', display_name: 'AWS Lambda handler tests', diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 080857cd381..170547a6340 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -63,4 +63,5 @@ export MONGODB_URI=${MONGODB_URI} export LOAD_BALANCER=${LOAD_BALANCER} export TEST_CSFLE=${TEST_CSFLE} export COMPRESSOR=${COMPRESSOR} +export MONGODB_NEW_CONNECTION=${MONGODB_NEW_CONNECTION} npm run "${TEST_NPM_SCRIPT}" diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 187d6b09a00..a4f0edfac0a 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -218,8 +218,8 @@ export class ConnectionPool extends TypedEventEmitter { super(); this.options = Object.freeze({ - ...options, connectionType: Connection, + ...options, maxPoolSize: options.maxPoolSize ?? 100, minPoolSize: options.minPoolSize ?? 0, maxConnecting: options.maxConnecting ?? 2, diff --git a/test/tools/runner/config.ts b/test/tools/runner/config.ts index b24a9e77d87..ac3d05605ac 100644 --- a/test/tools/runner/config.ts +++ b/test/tools/runner/config.ts @@ -6,6 +6,7 @@ import * as url from 'url'; import { type AuthMechanism, HostAddress, + ModernConnection, MongoClient, type ServerApi, TopologyType, @@ -241,6 +242,10 @@ export class TestConfiguration { throw new Error(`Cannot use options to specify host/port, must be in ${connectionString}`); } + if (process.env.MONGODB_NEW_CONNECTION === 'true') { + serverOptions.ConnectionType = ModernConnection; + } + return new MongoClient(connectionString, serverOptions); } From 8724375fa9c1368485fdfe9adf8fb2aa4f4acb43 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 17:05:23 -0500 Subject: [PATCH 12/44] refactor: re-introduce listeners for signal aborting with error --- src/cmap/connection.ts | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index cdf3b9f3be3..e78fe7c552e 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -778,8 +778,6 @@ export class ModernConnection extends TypedEventEmitter { address: string; socketTimeoutMS: number; monitorCommands: boolean; - /** Indicates that the connection (including underlying TCP socket) has been closed. */ - closed: boolean; lastHelloMS?: number; serverApi?: ServerApi; helloOk?: boolean; @@ -830,7 +828,6 @@ export class ModernConnection extends TypedEventEmitter { this.socketTimeoutMS = options.socketTimeoutMS ?? 0; this.monitorCommands = options.monitorCommands; this.serverApi = options.serverApi; - this.closed = false; this[kHello] = null; this[kClusterTime] = null; @@ -840,10 +837,18 @@ export class ModernConnection extends TypedEventEmitter { this.socket = stream; this.controller = new AbortController(); + this.socket.on('error', this.onError.bind(this)); + this.socket.on('close', this.onClose.bind(this)); + this.socket.on('timeout', this.onTimeout.bind(this)); this[kDelayedTimeoutId] = null; } + /** Indicates that the connection (including underlying TCP socket) has been closed. */ + get closed(): boolean { + return this.controller.signal.aborted; + } + get description(): StreamDescription { return this[kDescription]; } @@ -905,6 +910,10 @@ export class ModernConnection extends TypedEventEmitter { this[kLastUseTime] = now(); } + onError(error?: Error) { + this.cleanup(error); + } + onClose() { const message = `connection ${this.id} to ${this.address} closed`; this.cleanup(new MongoNetworkError(message)); @@ -949,7 +958,6 @@ export class ModernConnection extends TypedEventEmitter { return; } - this.closed = true; this.socket.destroy(); this.controller.abort(error); this.emit(Connection.CLOSE); @@ -1115,7 +1123,12 @@ export class ModernConnection extends TypedEventEmitter { if (this.monitorCommands) { this.emit( ModernConnection.COMMAND_SUCCEEDED, - new CommandSucceededEvent(this as unknown as Connection, message, document, started) + new CommandSucceededEvent( + this as unknown as Connection, + message, + options.noResponse ? undefined : document, + started + ) ); } From afd396f423ee7a222991d7892679784e4f63af2f Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 17:05:46 -0500 Subject: [PATCH 13/44] test: set env variable for new connection tests --- .evergreen/config.in.yml | 1 + .evergreen/config.yml | 1 + test/tools/runner/hooks/configuration.js | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.evergreen/config.in.yml b/.evergreen/config.in.yml index c7c2be41800..15f8ab4b73f 100644 --- a/.evergreen/config.in.yml +++ b/.evergreen/config.in.yml @@ -200,6 +200,7 @@ functions: type: test params: working_dir: "src" + add_expansions_to_env: true timeout_secs: 300 shell: bash script: | diff --git a/.evergreen/config.yml b/.evergreen/config.yml index d08614663ad..ccffbb9846c 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -170,6 +170,7 @@ functions: type: test params: working_dir: src + add_expansions_to_env: true timeout_secs: 300 shell: bash script: | diff --git a/test/tools/runner/hooks/configuration.js b/test/tools/runner/hooks/configuration.js index e9288e1b028..392d066d54b 100644 --- a/test/tools/runner/hooks/configuration.js +++ b/test/tools/runner/hooks/configuration.js @@ -176,7 +176,8 @@ const testConfigBeforeHook = async function () { ocsp: process.env.OCSP_TLS_SHOULD_SUCCEED != null && process.env.CA_FILE != null, socks5: MONGODB_URI.includes('proxyHost='), compressor: process.env.COMPRESSOR, - cryptSharedLibPath: process.env.CRYPT_SHARED_LIB_PATH + cryptSharedLibPath: process.env.CRYPT_SHARED_LIB_PATH, + newConnectionTesting: process.env.MONGODB_NEW_CONNECTION }; console.error(inspect(currentEnv, { colors: true })); From 88da43da11e5d674387303ceb69577c0a366d336 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 17:06:13 -0500 Subject: [PATCH 14/44] test: fix utf8 validation option tests --- src/cmap/commands.ts | 5 +++- .../bson-options/utf8_validation.test.ts | 27 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index ee1e7b6a7f0..2b2f7539fba 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -705,7 +705,10 @@ export class OpMsgResponse { this.parsed = true; } - parseBsonSerializationOptions({ enableUtf8Validation }: BSONSerializeOptions): { + parseBsonSerializationOptions( + this: void, + { enableUtf8Validation }: BSONSerializeOptions + ): { utf8: { writeErrors: false } | false; } { if (enableUtf8Validation === false) { diff --git a/test/integration/node-specific/bson-options/utf8_validation.test.ts b/test/integration/node-specific/bson-options/utf8_validation.test.ts index 33715394d19..f8220a9fc6c 100644 --- a/test/integration/node-specific/bson-options/utf8_validation.test.ts +++ b/test/integration/node-specific/bson-options/utf8_validation.test.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { Connection } from '../../../mongodb'; +import { OpMsgResponse } from '../../../mongodb'; const EXPECTED_VALIDATION_DISABLED_ARGUMENT = { utf8: false @@ -13,17 +13,17 @@ const EXPECTED_VALIDATION_ENABLED_ARGUMENT = { } }; -describe('class BinMsg', () => { - let onMessageSpy: sinon.SinonSpy; +describe('class OpMsgResponse', () => { + let bsonSpy: sinon.SinonSpy; beforeEach(() => { - onMessageSpy = sinon.spy(Connection.prototype, 'onMessage'); + bsonSpy = sinon.spy(OpMsgResponse.prototype, 'parseBsonSerializationOptions'); }); afterEach(() => { - onMessageSpy?.restore(); + bsonSpy?.restore(); // @ts-expect-error: Allow this to be garbage collected - onMessageSpy = null; + bsonSpy = null; }); let client; @@ -49,9 +49,8 @@ describe('class BinMsg', () => { passOptionTo === 'operation' ? option : {} ); - expect(onMessageSpy).to.have.been.called; - const binMsg = onMessageSpy.lastCall.firstArg; - const result = binMsg.parseBsonSerializationOptions(option); + expect(bsonSpy).to.have.been.called; + const result = bsonSpy.lastCall.returnValue; expect(result).to.deep.equal(EXPECTED_VALIDATION_DISABLED_ARGUMENT); }); } @@ -76,9 +75,8 @@ describe('class BinMsg', () => { passOptionTo === 'operation' ? option : {} ); - expect(onMessageSpy).to.have.been.called; - const binMsg = onMessageSpy.lastCall.firstArg; - const result = binMsg.parseBsonSerializationOptions(option); + expect(bsonSpy).to.have.been.called; + const result = bsonSpy.lastCall.returnValue; expect(result).to.deep.equal(EXPECTED_VALIDATION_ENABLED_ARGUMENT); }); } @@ -102,9 +100,8 @@ describe('class BinMsg', () => { passOptionTo === 'operation' ? option : {} ); - expect(onMessageSpy).to.have.been.called; - const binMsg = onMessageSpy.lastCall.firstArg; - const result = binMsg.parseBsonSerializationOptions(option); + expect(bsonSpy).to.have.been.called; + const result = bsonSpy.lastCall.returnValue; expect(result).to.deep.equal(EXPECTED_VALIDATION_ENABLED_ARGUMENT); }); } From 23b9f7fb47b3bc9a525254540bef1aeb0c2da4bf Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 17:06:29 -0500 Subject: [PATCH 15/44] test: session usage back down to 1! woo! --- test/integration/sessions/sessions.prose.test.ts | 2 +- test/integration/sessions/sessions.test.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/sessions/sessions.prose.test.ts b/test/integration/sessions/sessions.prose.test.ts index 3fc41a5de43..5628d2493cc 100644 --- a/test/integration/sessions/sessions.prose.test.ts +++ b/test/integration/sessions/sessions.prose.test.ts @@ -110,7 +110,7 @@ describe('Sessions Prose Tests', () => { expect(events).to.have.lengthOf(operations.length); // This is a guarantee in node, unless you are performing a transaction (which is not being done in this test) - expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(2); + expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(1); }); }); diff --git a/test/integration/sessions/sessions.test.ts b/test/integration/sessions/sessions.test.ts index 3bb00e181cc..1647beb9497 100644 --- a/test/integration/sessions/sessions.test.ts +++ b/test/integration/sessions/sessions.test.ts @@ -410,7 +410,7 @@ describe('Sessions Spec', function () { await utilClient?.close(); }); - it('should only use two sessions for many operations when maxPoolSize is 1', async () => { + it('should only use one sessions for many operations when maxPoolSize is 1', async () => { const documents = Array.from({ length: 50 }).map((_, idx) => ({ _id: idx })); const events: CommandStartedEvent[] = []; @@ -420,7 +420,7 @@ describe('Sessions Spec', function () { expect(allResults).to.have.lengthOf(documents.length); expect(events).to.have.lengthOf(documents.length); - expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(2); + expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1); }); }); From 97b8ff30302ffd3afad6cc5a46d874b2181d7f6e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 17:22:43 -0500 Subject: [PATCH 16/44] test: fix spec auth test to spy on correct connection class --- test/integration/auth/auth.prose.test.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/test/integration/auth/auth.prose.test.ts b/test/integration/auth/auth.prose.test.ts index e9d0fc651ff..e1be56b63e8 100644 --- a/test/integration/auth/auth.prose.test.ts +++ b/test/integration/auth/auth.prose.test.ts @@ -1,7 +1,13 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { Connection, LEGACY_HELLO_COMMAND, type MongoClient, ScramSHA256 } from '../../mongodb'; +import { + Connection, + LEGACY_HELLO_COMMAND, + ModernConnection, + type MongoClient, + ScramSHA256 +} from '../../mongodb'; function makeConnectionString(config, username, password) { return `mongodb://${username}:${password}@${config.host}:${config.port}/admin?`; @@ -289,7 +295,9 @@ describe('Authentication Spec Prose Tests', function () { }; client = this.configuration.newClient({}, options); - const commandSpy = sinon.spy(Connection.prototype, 'command'); + const connectionType = + process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection; + const commandSpy = sinon.spy(connectionType.prototype, 'command'); await client.connect(); const calls = commandSpy .getCalls() From 3a8275f744857efe35090441563e6ef6b3e07546 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 17:27:08 -0500 Subject: [PATCH 17/44] test: latest lts avoiding npm install issue --- .evergreen/config.yml | 1 + .evergreen/generate_evergreen_tasks.js | 1 + 2 files changed, 2 insertions(+) diff --git a/.evergreen/config.yml b/.evergreen/config.yml index ccffbb9846c..d32fa2e78af 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -4563,6 +4563,7 @@ buildvariants: display_name: New Connection Tests run_on: rhel80-large expansions: + NODE_LTS_VERSION: 20 CLIENT_ENCRYPTION: true MONGODB_NEW_CONNECTION: true tasks: diff --git a/.evergreen/generate_evergreen_tasks.js b/.evergreen/generate_evergreen_tasks.js index 9ce5f5c0491..29044312cbf 100644 --- a/.evergreen/generate_evergreen_tasks.js +++ b/.evergreen/generate_evergreen_tasks.js @@ -730,6 +730,7 @@ BUILD_VARIANTS.push({ display_name: 'New Connection Tests', run_on: DEFAULT_OS, expansions: { + NODE_LTS_VERSION: LATEST_LTS, CLIENT_ENCRYPTION: true, MONGODB_NEW_CONNECTION: true }, From 771d9130255956084c99d76ac35ec583c32ae880 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 17 Nov 2023 19:45:19 -0500 Subject: [PATCH 18/44] test: fix unit tests, skip i/o helpers temporarily --- ...rver_discovery_and_monitoring.spec.test.ts | 19 ++++++++++--------- test/unit/cmap/modern_connection.test.ts | 8 +++++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 43976b53f3c..8d84d1cffe6 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -358,7 +358,7 @@ async function executeSDAMTest(testData: SDAMTest) { }; expect( thrownError, - 'expected the error thrown to be one of MongoNetworkError, MongoNetworkTimeoutError or MongoServerError (referred to in the spec as an "Application Error")' + `expected the error thrown to be one of MongoNetworkError, MongoNetworkTimeoutError or MongoServerError (referred to in the spec as an "Application Error") got ${thrownError.name} ${thrownError.stack}` ).to.satisfy(isApplicationError); } } else if (phase.outcome != null && Object.keys(phase).length === 1) { @@ -413,18 +413,19 @@ function withConnectionStubImpl(appError) { generation: typeof appError.generation === 'number' ? appError.generation : connectionPool.generation, - command(ns, cmd, options, callback) { + async command(ns, cmd, options) { if (appError.type === 'network') { - callback(new MongoNetworkError('test generated')); + throw new MongoNetworkError('test generated'); } else if (appError.type === 'timeout') { - callback( - new MongoNetworkTimeoutError('xxx timed out', { - beforeHandshake: appError.when === 'beforeHandshakeCompletes' - }) - ); + throw new MongoNetworkTimeoutError('xxx timed out', { + beforeHandshake: appError.when === 'beforeHandshakeCompletes' + }); } else { - callback(new MongoServerError(appError.response)); + throw new MongoServerError(appError.response); } + }, + async commandAsync(ns, cmd, options) { + return this.command(ns, cmd, options); } }; diff --git a/test/unit/cmap/modern_connection.test.ts b/test/unit/cmap/modern_connection.test.ts index c4405bcffbf..17f34e92fb4 100644 --- a/test/unit/cmap/modern_connection.test.ts +++ b/test/unit/cmap/modern_connection.test.ts @@ -23,8 +23,9 @@ import { class MockSocket extends EventEmitter { buffer: Buffer[] = []; - push(...args: Buffer[]) { - this.buffer.push(...args); + write(b: Buffer, cb: (e?: Error) => void) { + this.buffer.push(b); + queueMicrotask(cb); } } @@ -32,7 +33,7 @@ class MockModernConnection { socket = new MockSocket(); } -describe('writeCommand', () => { +describe.skip('writeCommand', () => { context('when compression is disabled', () => { it('pushes an uncompressed command into the socket buffer', async () => { const command = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); @@ -104,6 +105,7 @@ describe('writeCommand', () => { context('when a `drain` event is emitted from the underlying socket', () => { it('resolves', async () => { const connection = new MockModernConnection(); + connection.socket.write = () => null; const promise = writeCommand(connection, new OpMsgRequest('db', { ping: 1 }, {}), { agreedCompressor: 'none' }); From cbaa60d0de05bb72da62ab3b8e911e539facdfad Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 20 Nov 2023 11:37:59 -0500 Subject: [PATCH 19/44] test: fix lint, unit stub of command, delayedTimeout --- src/cmap/connection.ts | 17 +++++++++++------ ...server_discovery_and_monitoring.spec.test.ts | 2 +- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index e78fe7c552e..1d5683b7be3 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -786,7 +786,7 @@ export class ModernConnection extends TypedEventEmitter { authContext?: AuthContext; /**@internal */ - [kDelayedTimeoutId]: NodeJS.Timeout | null; + delayedTimeoutId: NodeJS.Timeout | null = null; /** @internal */ [kDescription]: StreamDescription; /** @internal */ @@ -840,13 +840,11 @@ export class ModernConnection extends TypedEventEmitter { this.socket.on('error', this.onError.bind(this)); this.socket.on('close', this.onClose.bind(this)); this.socket.on('timeout', this.onTimeout.bind(this)); - - this[kDelayedTimeoutId] = null; } /** Indicates that the connection (including underlying TCP socket) has been closed. */ get closed(): boolean { - return this.controller.signal.aborted; + return this.controller.signal.aborted || this.socket.destroyed; } get description(): StreamDescription { @@ -920,7 +918,7 @@ export class ModernConnection extends TypedEventEmitter { } onTimeout() { - this[kDelayedTimeoutId] = setTimeout(() => { + this.delayedTimeoutId = setTimeout(() => { const message = `connection ${this.id} to ${this.address} timed out`; const beforeHandshake = this.hello == null; this.cleanup(new MongoNetworkTimeoutError(message, { beforeHandshake })); @@ -1034,6 +1032,8 @@ export class ModernConnection extends TypedEventEmitter { ): Promise { const { signal } = this.controller; + signal.throwIfAborted(); + if (typeof options.socketTimeoutMS === 'number') { this.socket.setTimeout(options.socketTimeoutMS); } else if (this.socketTimeoutMS !== 0) { @@ -1052,7 +1052,7 @@ export class ModernConnection extends TypedEventEmitter { response = await read(this, { signal }); } finally { - this.controller = new AbortController(); + if (!signal.aborted) this.controller = new AbortController(); } response.parse(options); @@ -1156,6 +1156,11 @@ export async function* readWireProtocolMessages( const bufferPool = new BufferPool(); const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize; for await (const [chunk] of on(connection.socket, 'data', { signal })) { + if (connection.delayedTimeoutId) { + clearTimeout(connection.delayedTimeoutId); + connection.delayedTimeoutId = null; + } + bufferPool.append(chunk); const sizeOfMessage = bufferPool.getInt32(); diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 8d84d1cffe6..06037279b74 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -413,7 +413,7 @@ function withConnectionStubImpl(appError) { generation: typeof appError.generation === 'number' ? appError.generation : connectionPool.generation, - async command(ns, cmd, options) { + async command(_ns, _cmd, _options) { if (appError.type === 'network') { throw new MongoNetworkError('test generated'); } else if (appError.type === 'timeout') { From c179bdd4cd07b81ef2fe1fcc7532f4743acb1c77 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 20 Nov 2023 11:53:04 -0500 Subject: [PATCH 20/44] chore: unset FLE variable on windows --- .evergreen/run-tests.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 170547a6340..c0ff5280166 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -55,6 +55,11 @@ fi npm install @mongodb-js/zstd npm install snappy +if [[ "$OS" = "Windows_NT" ]]; then + # TODO: Temp fix for add_expansions_to_env usage that breaks Windows + unset CSFLE_KMS_PROVIDERS +fi + export AUTH=$AUTH export SINGLE_MONGOS_LB_URI=${SINGLE_MONGOS_LB_URI} export MULTI_MONGOS_LB_URI=${MULTI_MONGOS_LB_URI} From e2338d7d7e5114d3e7fe72a4fb6cd90c9e93bad5 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 20 Nov 2023 15:55:14 -0500 Subject: [PATCH 21/44] refactor: fix closed condition --- .../transactions/transactions.test.ts | 70 +++++++++---------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/test/integration/transactions/transactions.test.ts b/test/integration/transactions/transactions.test.ts index c05bead5367..92ffceff090 100644 --- a/test/integration/transactions/transactions.test.ts +++ b/test/integration/transactions/transactions.test.ts @@ -263,50 +263,44 @@ describe('Transactions', function () { }); describe('TransientTransactionError', function () { + let client: MongoClient; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + await client.close(); + }); + it('should have a TransientTransactionError label inside of a transaction', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.0.0' } }, - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }); + test: async function () { + const session = client.startSession(); + const db = client.db(); - client.connect(err => { - expect(err).to.not.exist; + await db.collection('transaction_error_test_2').drop(); + const coll = await db.createCollection('transaction_error_test_2'); - const session = client.startSession(); - const db = client.db(configuration.db); - db.collection('transaction_error_test_2').drop(() => { - db.createCollection('transaction_error_test_2', (err, coll) => { - expect(err).to.not.exist; + session.startTransaction(); - session.startTransaction(); - coll.insertOne({ a: 1 }, { session }, err => { - expect(err).to.not.exist; - expect(session.inTransaction()).to.be.true; - - client.db('admin').command( - { - configureFailPoint: 'failCommand', - mode: { times: 1 }, - data: { failCommands: ['insert'], closeConnection: true } - }, - err => { - expect(err).to.not.exist; - expect(session.inTransaction()).to.be.true; - - coll.insertOne({ b: 2 }, { session }, err => { - expect(err).to.exist.and.to.be.an.instanceof(MongoNetworkError); - if (err instanceof MongoNetworkError) { - expect(err.hasErrorLabel('TransientTransactionError')).to.be.true; - } - - session.abortTransaction(() => session.endSession(() => client.close(done))); - }); - } - ); - }); - }); - }); + await coll.insertOne({ a: 1 }, { session }); + + expect(session.inTransaction()).to.be.true; + + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { failCommands: ['insert'], closeConnection: true } }); + + expect(session.inTransaction()).to.be.true; + + const error = await coll.insertOne({ b: 2 }, { session }).catch(e => e); + expect(error).to.exist.and.to.be.an.instanceof(MongoNetworkError); + expect(error.hasErrorLabel('TransientTransactionError')).to.be.true; + + await session.abortTransaction(); + await session.endSession(); } }); From 3d2ca60b3d699b01f524a7525f07576b450a0a44 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 20 Nov 2023 15:55:39 -0500 Subject: [PATCH 22/44] refactor: fix closed condition --- src/cmap/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 1d5683b7be3..4b009f6ebd5 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -844,7 +844,7 @@ export class ModernConnection extends TypedEventEmitter { /** Indicates that the connection (including underlying TCP socket) has been closed. */ get closed(): boolean { - return this.controller.signal.aborted || this.socket.destroyed; + return this.controller.signal.aborted; } get description(): StreamDescription { From 5b3d350148bae3c084368ae2230cd4a9dff14509 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 20 Nov 2023 16:12:04 -0500 Subject: [PATCH 23/44] test: catch ns not found --- .../transactions/transactions.test.ts | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/test/integration/transactions/transactions.test.ts b/test/integration/transactions/transactions.test.ts index 92ffceff090..141a7fc003f 100644 --- a/test/integration/transactions/transactions.test.ts +++ b/test/integration/transactions/transactions.test.ts @@ -278,7 +278,10 @@ describe('Transactions', function () { const session = client.startSession(); const db = client.db(); - await db.collection('transaction_error_test_2').drop(); + await db + .collection('transaction_error_test_2') + .drop() + .catch(() => null); const coll = await db.createCollection('transaction_error_test_2'); session.startTransaction(); @@ -295,8 +298,8 @@ describe('Transactions', function () { expect(session.inTransaction()).to.be.true; - const error = await coll.insertOne({ b: 2 }, { session }).catch(e => e); - expect(error).to.exist.and.to.be.an.instanceof(MongoNetworkError); + const error = await coll.insertOne({ b: 2 }, { session }).catch(error => error); + expect(error).to.be.instanceOf(MongoNetworkError); expect(error.hasErrorLabel('TransientTransactionError')).to.be.true; await session.abortTransaction(); @@ -306,30 +309,18 @@ describe('Transactions', function () { it('should not have a TransientTransactionError label outside of a transaction', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.0.0' } }, - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }); + test: async function () { + const db = client.db(); + const coll = db.collection('test'); - client.connect(err => { - expect(err).to.not.exist; - const db = client.db(configuration.db); - const coll = db.collection('transaction_error_test1'); - - client.db('admin').command( - { - configureFailPoint: 'failCommand', - mode: { times: 2 }, - data: { failCommands: ['insert'], closeConnection: true } - }, - err => { - expect(err).to.not.exist; - coll.insertOne({ a: 1 }, err => { - expect(err).to.exist.and.to.be.an.instanceOf(MongoNetworkError); - client.close(done); - }); - } - ); + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // fail 2 times for retry + data: { failCommands: ['insert'], closeConnection: true } }); + + const error = await coll.insertOne({ a: 1 }).catch(error => error); + expect(error).to.be.instanceOf(MongoNetworkError); } }); }); From 8480f07412e4678441dffba73fce9f585b767d1e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 21 Nov 2023 09:56:22 -0500 Subject: [PATCH 24/44] revert: parseBsonSerializationOptions this type --- src/cmap/commands.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index 2b2f7539fba..ee1e7b6a7f0 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -705,10 +705,7 @@ export class OpMsgResponse { this.parsed = true; } - parseBsonSerializationOptions( - this: void, - { enableUtf8Validation }: BSONSerializeOptions - ): { + parseBsonSerializationOptions({ enableUtf8Validation }: BSONSerializeOptions): { utf8: { writeErrors: false } | false; } { if (enableUtf8Validation === false) { From 82393213345f36979387198595f9a6eca6a82a87 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 21 Nov 2023 10:27:20 -0500 Subject: [PATCH 25/44] refactor: improve aborted and add tests --- src/cmap/connection.ts | 4 ++-- src/utils.ts | 13 ++++++++++--- test/unit/utils.test.ts | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 4b009f6ebd5..2741ddbd648 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -34,7 +34,7 @@ import { type Callback, HostAddress, maxWireVersion, - mayAbort, + aborted, type MongoDBNamespace, now, uuidV4 @@ -1210,7 +1210,7 @@ export async function writeCommand( options.signal?.throwIfAborted(); await Promise.race([ promisify(connection.socket.write.bind(connection.socket))(buffer), - mayAbort(options.signal) + aborted(options.signal) ]); } diff --git a/src/utils.ts b/src/utils.ts index 4a217a91a62..64372f57625 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1300,7 +1300,14 @@ export function isHostMatch(match: RegExp, host?: string): boolean { return host && match.test(host.toLowerCase()) ? true : false; } -export async function mayAbort(signal?: AbortSignal) { - if (signal == null) return new Promise(() => null); // never ending story - return new Promise((_, reject) => signal.addEventListener('abort', reject)); +/** + * Takes an AbortSignal and creates a promise that will reject when the signal aborts + * If the argument provided is nullish the returned promise will **never** resolve. + * @param signal - an optional abort signal to link to a promise rejection + */ +export async function aborted(signal?: AbortSignal): Promise { + signal?.throwIfAborted(); + return new Promise((_, reject) => + signal?.addEventListener('abort', () => reject(signal.reason), { once: true }) + ); } diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 9614cd9c923..f083fcfeec9 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1,7 +1,9 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; +import { setTimeout } from 'timers'; import { + aborted, BufferPool, ByteUtils, compareObjectId, @@ -15,6 +17,7 @@ import { maybeCallback, MongoDBCollectionNamespace, MongoDBNamespace, + MongoError, MongoRuntimeError, ObjectId, shuffle, @@ -1165,4 +1168,40 @@ describe('driver utils', function () { }); }); }); + + describe('aborted()', () => { + context('when given an aborted signal', () => { + it('rejects the returned promise', async () => { + const controller = new AbortController(); + controller.abort(new MongoError('my error')); + const error = await aborted(controller.signal).catch(error => error); + expect(error).to.be.instanceOf(MongoError); + expect(error.message).to.equal('my error'); + }); + }); + + context('when given a signal that aborts after some time', () => { + it('rejects the returned promise after the timeout', async () => { + const controller = new AbortController(); + setTimeout(() => controller.abort(new MongoError('my error')), 10); + const start = +new Date(); + const error = await aborted(controller.signal).catch(error => error); + const end = +new Date(); + + expect(error).to.be.instanceOf(MongoError); + expect(error.message).to.equal('my error'); + expect(end - start).to.be.within(8, 13); + }); + }); + + context('when given a nullish value', () => { + it('returns a promise that pends forever', async () => { + const error = await Promise.race([ + aborted().catch(error => error), + aborted(AbortSignal.timeout(100)) + ]).catch(error => error); + expect(error).to.be.instanceOf(DOMException); + }); + }); + }); }); From 0805ef87eff04331bcef896f7aebc2c6bf60aa4c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 21 Nov 2023 10:58:09 -0500 Subject: [PATCH 26/44] fix: lint and node 16 unit --- src/cmap/connection.ts | 2 +- test/unit/utils.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 2741ddbd648..32e6a2ebb72 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -29,12 +29,12 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { ReadPreferenceLike } from '../read_preference'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; import { + aborted, BufferPool, calculateDurationInMs, type Callback, HostAddress, maxWireVersion, - aborted, type MongoDBNamespace, now, uuidV4 diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index f083fcfeec9..61ae72a0e47 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1200,7 +1200,7 @@ describe('driver utils', function () { aborted().catch(error => error), aborted(AbortSignal.timeout(100)) ]).catch(error => error); - expect(error).to.be.instanceOf(DOMException); + expect(error.name).to.equal('TimeoutError'); }); }); }); From 0778c8689a7ec46710ffe9a692e898b69cca8dc6 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 22 Nov 2023 12:29:36 -0500 Subject: [PATCH 27/44] refactor: remove throwIfAborted --- src/cmap/connection.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 32e6a2ebb72..64f718c109c 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1207,7 +1207,6 @@ export async function writeCommand( zlibCompressionLevel: options.zlibCompressionLevel ?? 0 }); const buffer = Buffer.concat(await finalCommand.toBin()); - options.signal?.throwIfAborted(); await Promise.race([ promisify(connection.socket.write.bind(connection.socket))(buffer), aborted(options.signal) From a8fea73471dcaf7315d96f1f4b9a951033a5bd4a Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 22 Nov 2023 12:40:54 -0500 Subject: [PATCH 28/44] docs: add comment about abort listener --- src/cmap/connection.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 64f718c109c..d9564ec2327 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1052,6 +1052,7 @@ export class ModernConnection extends TypedEventEmitter { response = await read(this, { signal }); } finally { + // Replace controller to avoid boundless 'abort' listeners if (!signal.aborted) this.controller = new AbortController(); } From 3fe7dfdfa38788f31771bd0fae70a481fcf7a8ca Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 22 Nov 2023 12:48:18 -0500 Subject: [PATCH 29/44] fix: command monitoring for io failures --- src/cmap/connection.ts | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index d9564ec2327..fe50115bec8 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -22,6 +22,7 @@ import { MongoParseError, MongoRuntimeError, MongoServerError, + MongoUnexpectedServerResponseError, MongoWriteConcernError } from '../error'; import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client'; @@ -1097,7 +1098,29 @@ export class ModernConnection extends TypedEventEmitter { ); } - const document = await this.sendCommand(message, options); + let document = null; + try { + document = await this.sendCommand(message, options); + } catch (ioError) { + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_FAILED, + new CommandFailedEvent(this as unknown as Connection, message, ioError, started) + ); + } + throw ioError; + } + + if (document == null) { + const unexpected = new MongoUnexpectedServerResponseError( + 'sendCommand did not throw and did not return a document' + ); + this.emit( + ModernConnection.COMMAND_FAILED, + new CommandFailedEvent(this as unknown as Connection, message, unexpected, started) + ); + throw unexpected; + } if (document.writeConcernError) { const writeConcernError = new MongoWriteConcernError(document.writeConcernError, document); From 303e2e9b475fc0ccfd84ba022d8d2becf456db58 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 22 Nov 2023 12:49:02 -0500 Subject: [PATCH 30/44] fix: command monitoring if stmt --- src/cmap/connection.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index fe50115bec8..0dcd827937a 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1115,10 +1115,12 @@ export class ModernConnection extends TypedEventEmitter { const unexpected = new MongoUnexpectedServerResponseError( 'sendCommand did not throw and did not return a document' ); - this.emit( - ModernConnection.COMMAND_FAILED, - new CommandFailedEvent(this as unknown as Connection, message, unexpected, started) - ); + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_FAILED, + new CommandFailedEvent(this as unknown as Connection, message, unexpected, started) + ); + } throw unexpected; } From 8f5be31f3a09e47c78322f0002a275caea700ffd Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 28 Nov 2023 13:17:51 -0500 Subject: [PATCH 31/44] test: remove ModernConnection unit tests --- test/unit/cmap/modern_connection.test.ts | 427 ----------------------- 1 file changed, 427 deletions(-) delete mode 100644 test/unit/cmap/modern_connection.test.ts diff --git a/test/unit/cmap/modern_connection.test.ts b/test/unit/cmap/modern_connection.test.ts deleted file mode 100644 index 17f34e92fb4..00000000000 --- a/test/unit/cmap/modern_connection.test.ts +++ /dev/null @@ -1,427 +0,0 @@ -import { expect } from 'chai'; -import * as sinon from 'sinon'; -import { EventEmitter } from 'stream'; -import { setTimeout } from 'timers/promises'; - -// eslint-disable-next-line @typescript-eslint/no-restricted-imports -import * as compression from '../../../src/cmap/wire_protocol/compression'; -import { - decompressResponse, - LEGACY_HELLO_COMMAND, - MongoDecompressionError, - MongoParseError, - OP_COMPRESSED, - OP_MSG, - OpCompressedRequest, - OpMsgRequest, - OpMsgResponse, - type OpQueryResponse, - read, - readMany, - writeCommand -} from '../../mongodb'; - -class MockSocket extends EventEmitter { - buffer: Buffer[] = []; - write(b: Buffer, cb: (e?: Error) => void) { - this.buffer.push(b); - queueMicrotask(cb); - } -} - -class MockModernConnection { - socket = new MockSocket(); -} - -describe.skip('writeCommand', () => { - context('when compression is disabled', () => { - it('pushes an uncompressed command into the socket buffer', async () => { - const command = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - const connection = new MockModernConnection(); - const prom = writeCommand(connection as any, command, { - agreedCompressor: 'none' - }); - - connection.socket.emit('drain'); - await prom; - - const [buffer] = connection.socket.buffer; - expect(buffer).to.exist; - const opCode = buffer.readInt32LE(12); - - expect(opCode).to.equal(OP_MSG); - }); - }); - - context('when compression is enabled', () => { - context('when the command is compressible', () => { - it('pushes a compressed command into the socket buffer', async () => { - const command = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - const connection = new MockModernConnection(); - const prom = writeCommand(connection as any, command, { - agreedCompressor: 'snappy' - }); - - connection.socket.emit('drain'); - await prom; - - const [buffer] = connection.socket.buffer; - expect(buffer).to.exist; - const opCode = buffer.readInt32LE(12); - - expect(opCode).to.equal(OP_COMPRESSED); - }); - }); - context('when the command is not compressible', () => { - it('pushes an uncompressed command into the socket buffer', async () => { - const command = new OpMsgRequest('db', { [LEGACY_HELLO_COMMAND]: 1 }, { requestId: 1 }); - const connection = new MockModernConnection(); - const prom = writeCommand(connection as any, command, { - agreedCompressor: 'snappy' - }); - - connection.socket.emit('drain'); - await prom; - - const [buffer] = connection.socket.buffer; - expect(buffer).to.exist; - const opCode = buffer.readInt32LE(12); - - expect(opCode).to.equal(OP_MSG); - }); - }); - }); - context('when a `drain` event is not emitted from the underlying socket', () => { - it('never resolves', async () => { - const connection = new MockModernConnection(); - const promise = writeCommand(connection, new OpMsgRequest('db', { ping: 1 }, {}), { - agreedCompressor: 'none' - }); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result).to.equal('timeout'); - }); - }); - - context('when a `drain` event is emitted from the underlying socket', () => { - it('resolves', async () => { - const connection = new MockModernConnection(); - connection.socket.write = () => null; - const promise = writeCommand(connection, new OpMsgRequest('db', { ping: 1 }, {}), { - agreedCompressor: 'none' - }); - connection.socket.emit('drain'); - const result = await Promise.race([promise, setTimeout(5000, 'timeout', { ref: false })]); - expect(result).to.be.undefined; - }); - }); -}); - -describe('decompressResponse()', () => { - context('when the message is not compressed', () => { - let message: Buffer; - let response: OpMsgResponse | OpQueryResponse; - let spy; - beforeEach(async () => { - message = Buffer.concat(new OpMsgRequest('db', { find: 1 }, { requestId: 1 }).toBin()); - spy = sinon.spy(compression, 'decompress'); - - response = await decompressResponse(message); - }); - afterEach(() => sinon.restore()); - it('returns a wire protocol message', () => { - expect(response).to.be.instanceOf(OpMsgResponse); - }); - it('does not attempt decompression', () => { - expect(spy).not.to.have.been.called; - }); - }); - - context('when the message is compressed', () => { - let message: Buffer; - let response: OpMsgResponse | OpQueryResponse; - beforeEach(async () => { - const msg = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - message = Buffer.concat( - await new OpCompressedRequest(msg, { - zlibCompressionLevel: 0, - agreedCompressor: 'snappy' - }).toBin() - ); - - response = await decompressResponse(message); - }); - - it('returns a wire protocol message', () => { - expect(response).to.be.instanceOf(OpMsgResponse); - }); - it('correctly decompresses the message', () => { - response.parse({}); - expect(response.documents[0]).to.deep.equal({ $db: 'db', find: 1 }); - }); - - context( - 'when the compressed message does not match the compression metadata in the header', - () => { - beforeEach(async () => { - const msg = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - message = Buffer.concat( - await new OpCompressedRequest(msg, { - zlibCompressionLevel: 0, - agreedCompressor: 'snappy' - }).toBin() - ); - message.writeInt32LE( - 100, - 16 + 4 // message header size + offset to length - ); // write an invalid message length into the header - }); - it('throws a MongoDecompressionError', async () => { - const error = await decompressResponse(message).catch(e => e); - expect(error).to.be.instanceOf(MongoDecompressionError); - }); - } - ); - }); -}); - -describe('read()', () => { - let connection: MockModernConnection; - let message: Buffer; - - beforeEach(() => { - connection = new MockModernConnection(); - message = Buffer.concat(new OpMsgRequest('db', { ping: 1 }, { requestId: 1 }).toBin()); - }); - it('does not resolve if there are no data events', async () => { - const promise = read(connection); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result).to.equal('timeout'); - }); - - it('does not resolve until there is a complete message', async () => { - const promise = read(connection); - { - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result, 'received data on empty socket').to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(0, 10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect( - result, - 'received data when only part of message was emitted from the socket' - ).to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result, 'expected OpMsgResponse - got timeout instead').to.be.instanceOf( - OpMsgResponse - ); - } - }); - - it('removes all event listeners from the socket after a message is received', async () => { - const promise = read(connection); - - connection.socket.emit('data', message); - await promise; - - expect(connection.socket.listenerCount('data')).to.equal(0); - }); - - it('when `moreToCome` is set in the response, it only returns one message', async () => { - message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }).toBin() - ); - - const promise = read(connection); - - connection.socket.emit('data', message); - await promise; - - expect(connection.socket.listenerCount('data')).to.equal(0); - }); - - context('when reading an invalid message', () => { - context('when the message < 0', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(-1); - const promise = read(connection).catch(e => e); - - connection.socket.emit('data', message); - const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - - context('when the message length > max bson message size', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(1024 * 1024 * 16 * 4 + 1); - const promise = read(connection).catch(e => e); - - connection.socket.emit('data', message); - const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - }); - - context('when compression is enabled', () => { - it('returns a decompressed message', async () => { - const message = Buffer.concat( - await new OpCompressedRequest( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }), - { zlibCompressionLevel: 0, agreedCompressor: 'snappy' } - ).toBin() - ); - - const promise = read(connection); - - connection.socket.emit('data', message); - const result = await promise; - - expect(result).to.be.instanceOf(OpMsgResponse); - }); - }); -}); - -describe('readMany()', () => { - let connection: MockModernConnection; - let message: Buffer; - - beforeEach(() => { - connection = new MockModernConnection(); - message = Buffer.concat(new OpMsgRequest('db', { ping: 1 }, { requestId: 1 }).toBin()); - }); - it('does not resolve if there are no data events', async () => { - const generator = readMany(connection); - const result = await Promise.race([ - generator.next(), - setTimeout(1000, 'timeout', { ref: false }) - ]); - expect(result).to.equal('timeout'); - }); - - it('does not resolve until there is a complete message', async () => { - const generator = readMany(connection); - const promise = generator.next(); - { - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result, 'received data on empty socket').to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(0, 10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect( - result, - 'received data when only part of message was emitted from the socket' - ).to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result.value, 'expected OpMsgResponse - got timeout instead').to.be.instanceOf( - OpMsgResponse - ); - } - }); - - it('when moreToCome is set, it does not remove `data` listeners after receiving a message', async () => { - const generator = readMany(connection); - const promise = generator.next(); - message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }).toBin() - ); - connection.socket.emit('data', message); - - const { value: response } = await promise; - - expect(response).to.be.instanceOf(OpMsgResponse); - expect(connection.socket.listenerCount('data')).to.equal(1); - }); - - it('returns messages until `moreToCome` is false', async () => { - const generator = readMany(connection); - - for ( - let i = 0, - message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }).toBin() - ); - i < 3; - ++i - ) { - const promise = generator.next(); - connection.socket.emit('data', message); - const { value: response } = await promise; - expect(response, `response ${i} was not OpMsgResponse`).to.be.instanceOf(OpMsgResponse); - expect( - connection.socket.listenerCount('data'), - `listener count for ${i} was non-zero` - ).to.equal(1); - } - - const message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: false }).toBin() - ); - const promise = generator.next(); - connection.socket.emit('data', message); - const { value: response } = await promise; - expect(response, `response was not OpMsgResponse`).to.be.instanceOf(OpMsgResponse); - expect(connection.socket.listenerCount('data')).to.equal(1); - - await generator.next(); - expect(connection.socket.listenerCount('data')).to.equal(0); - }); - - context('when reading an invalid message', () => { - context('when the message < 0', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(-1); - const promise = readMany(connection) - .next() - .catch(e => e); - - connection.socket.emit('data', message); - const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - - context('when the message length > max bson message size', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(1024 * 1024 * 16 * 4 + 1); - const promise = readMany(connection) - .next() - .catch(e => e); - - connection.socket.emit('data', message); - const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - }); - - context('when compression is enabled', () => { - it('returns a decompressed message', async () => { - const message = Buffer.concat( - await new OpCompressedRequest(new OpMsgRequest('db', { ping: 1 }, { requestId: 1 }), { - zlibCompressionLevel: 0, - agreedCompressor: 'snappy' - }).toBin() - ); - - const generator = readMany(connection); - const promise = generator.next(); - connection.socket.emit('data', message); - const { value: response } = await promise; - - expect(response).to.be.instanceOf(OpMsgResponse); - }); - }); -}); From 150d9d657861d6596222a1cad15afd24a68d5e35 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 28 Nov 2023 18:18:34 -0500 Subject: [PATCH 32/44] refactor: add abortable helper --- src/cmap/connection.ts | 19 +++++------ src/utils.ts | 27 ++++++++++++++-- test/unit/utils.test.ts | 72 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 101 insertions(+), 17 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 0dcd827937a..09a9d28c784 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -30,7 +30,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { ReadPreferenceLike } from '../read_preference'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; import { - aborted, + abortable, BufferPool, calculateDurationInMs, type Callback, @@ -1049,6 +1049,8 @@ export class ModernConnection extends TypedEventEmitter { signal }); + signal.throwIfAborted(); + if (options.noResponse) return { ok: 1 }; response = await read(this, { signal }); @@ -1081,12 +1083,6 @@ export class ModernConnection extends TypedEventEmitter { command: Document, options: CommandOptions = {} ): Promise { - // Temporary to make sure no callback usage: - // eslint-disable-next-line prefer-rest-params - if (typeof arguments[arguments.length - 1] === 'function') { - throw new Error('no callbacks allowed!!'); - } - const message = this.prepareCommand(ns.db, command, options); let started = 0; @@ -1232,11 +1228,12 @@ export async function writeCommand( agreedCompressor: options.agreedCompressor ?? 'none', zlibCompressionLevel: options.zlibCompressionLevel ?? 0 }); + const buffer = Buffer.concat(await finalCommand.toBin()); - await Promise.race([ - promisify(connection.socket.write.bind(connection.socket))(buffer), - aborted(options.signal) - ]); + + const socketWriteFn = promisify(connection.socket.write.bind(connection.socket)); + + return abortable(socketWriteFn(buffer), options); } /** diff --git a/src/utils.ts b/src/utils.ts index 64372f57625..5871695c098 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1300,14 +1300,35 @@ export function isHostMatch(match: RegExp, host?: string): boolean { return host && match.test(host.toLowerCase()) ? true : false; } +export async function abortable( + promise: Promise, + { signal }: { signal?: AbortSignal } +): Promise { + const { abort, done } = aborted(signal); + try { + return await Promise.race([promise, abort]); + } finally { + done.abort(); + } +} + /** * Takes an AbortSignal and creates a promise that will reject when the signal aborts * If the argument provided is nullish the returned promise will **never** resolve. * @param signal - an optional abort signal to link to a promise rejection */ -export async function aborted(signal?: AbortSignal): Promise { +export function aborted(signal?: AbortSignal): { + abort: Promise; + done: AbortController; +} { + const done = new AbortController(); signal?.throwIfAborted(); - return new Promise((_, reject) => - signal?.addEventListener('abort', () => reject(signal.reason), { once: true }) + const abort = new Promise((_, reject) => + signal?.addEventListener('abort', () => reject(signal.reason), { + once: true, + // @ts-expect-error: @types/node erroneously claim this does not exist + signal: done.signal + }) ); + return { abort, done }; } diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 61ae72a0e47..c30efc56a74 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -3,6 +3,7 @@ import * as sinon from 'sinon'; import { setTimeout } from 'timers'; import { + abortable, aborted, BufferPool, ByteUtils, @@ -1174,7 +1175,7 @@ describe('driver utils', function () { it('rejects the returned promise', async () => { const controller = new AbortController(); controller.abort(new MongoError('my error')); - const error = await aborted(controller.signal).catch(error => error); + const error = await aborted(controller.signal).abort.catch(error => error); expect(error).to.be.instanceOf(MongoError); expect(error.message).to.equal('my error'); }); @@ -1185,7 +1186,7 @@ describe('driver utils', function () { const controller = new AbortController(); setTimeout(() => controller.abort(new MongoError('my error')), 10); const start = +new Date(); - const error = await aborted(controller.signal).catch(error => error); + const error = await aborted(controller.signal).abort.catch(error => error); const end = +new Date(); expect(error).to.be.instanceOf(MongoError); @@ -1197,11 +1198,76 @@ describe('driver utils', function () { context('when given a nullish value', () => { it('returns a promise that pends forever', async () => { const error = await Promise.race([ - aborted().catch(error => error), + aborted().abort.catch(error => error), aborted(AbortSignal.timeout(100)) ]).catch(error => error); expect(error.name).to.equal('TimeoutError'); }); }); }); + + describe('abortable()', () => { + it("rejects with the signal's reason", async () => { + const controller = new AbortController(); + controller.abort(new Error('my error')); + const error = await abortable(Promise.reject(new Error('Not expected')), { + signal: controller.signal + }).catch(error => error); + expect(error.message).to.equal('my error'); + }); + + context('when given a promise that rejects', () => { + it("rejects with the given promise's rejection value", async () => { + const controller = new AbortController(); + const error = await abortable(Promise.reject(new Error('my error')), { + signal: controller.signal + }).catch(error => error); + expect(error.message).to.equal('my error'); + }); + + it('cleans up abort listener', async () => { + const controller = new AbortController(); + const addEventListener = sinon.spy(controller.signal, 'addEventListener'); + const removeEventListener = sinon.spy(controller.signal, 'removeEventListener'); + await abortable(Promise.reject(), { signal: controller.signal }).catch(() => null); + expect(removeEventListener).to.have.been.called; + expect(removeEventListener).to.have.callCount(addEventListener.callCount); + }); + }); + + context('when given a promise that resolves', () => { + it("resolves with the given promise's resolution value", async () => { + const controller = new AbortController(); + const result = await abortable(Promise.resolve({ ok: 1 }), { + signal: controller.signal + }); + expect(result).to.have.property('ok', 1); + }); + + it('cleans up abort listener', async () => { + const controller = new AbortController(); + const addEventListener = sinon.spy(controller.signal, 'addEventListener'); + const removeEventListener = sinon.spy(controller.signal, 'removeEventListener'); + await abortable(Promise.resolve(), { signal: controller.signal }); + expect(removeEventListener).to.have.been.called; + expect(removeEventListener).to.have.callCount(addEventListener.callCount); + }); + }); + + context('when given a promise that pends forever', () => { + it('cleans up abort listener when given signal is aborted', async () => { + const controller = new AbortController(); + const addEventListener = sinon.spy(controller.signal, 'addEventListener'); + const removeEventListener = sinon.spy(controller.signal, 'removeEventListener'); + setTimeout(() => controller.abort(new MongoError('my error')), 10); + const error = await abortable(new Promise(() => null), { + signal: controller.signal + }).catch(error => error); + + expect(error?.message).to.equal('my error'); + expect(removeEventListener).to.have.been.called; + expect(removeEventListener).to.have.callCount(addEventListener.callCount); + }); + }); + }); }); From 04c261f9fdb77c884adf6b1fcc4aef94faa3386e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 29 Nov 2023 11:15:12 -0500 Subject: [PATCH 33/44] chore: add todo --- src/cmap/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 09a9d28c784..4d3f8003a01 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1055,7 +1055,7 @@ export class ModernConnection extends TypedEventEmitter { response = await read(this, { signal }); } finally { - // Replace controller to avoid boundless 'abort' listeners + // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners if (!signal.aborted) this.controller = new AbortController(); } From 62852727cf5c8bfd0083f78392c9a0cd41c58baf Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 29 Nov 2023 11:48:05 -0500 Subject: [PATCH 34/44] test: fix unit tests --- src/utils.ts | 4 +++- test/unit/utils.test.ts | 31 +++++++++++++++++++++++-------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 5871695c098..3b013d7797c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1322,7 +1322,9 @@ export function aborted(signal?: AbortSignal): { done: AbortController; } { const done = new AbortController(); - signal?.throwIfAborted(); + if (signal?.aborted) { + return { abort: Promise.reject(signal.reason), done }; + } const abort = new Promise((_, reject) => signal?.addEventListener('abort', () => reject(signal.reason), { once: true, diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index c30efc56a74..8e2bf6094a2 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -24,6 +24,7 @@ import { shuffle, TimeoutController } from '../mongodb'; +import { sleep } from '../tools/utils'; import { createTimerSandbox } from './timer_sandbox'; describe('driver utils', function () { @@ -1199,7 +1200,7 @@ describe('driver utils', function () { it('returns a promise that pends forever', async () => { const error = await Promise.race([ aborted().abort.catch(error => error), - aborted(AbortSignal.timeout(100)) + aborted(AbortSignal.timeout(100)).abort ]).catch(error => error); expect(error.name).to.equal('TimeoutError'); }); @@ -1207,13 +1208,27 @@ describe('driver utils', function () { }); describe('abortable()', () => { - it("rejects with the signal's reason", async () => { - const controller = new AbortController(); - controller.abort(new Error('my error')); - const error = await abortable(Promise.reject(new Error('Not expected')), { - signal: controller.signal - }).catch(error => error); - expect(error.message).to.equal('my error'); + context('when given a promise that is already rejected', () => { + it("rejects with the promises's reason", async () => { + const controller = new AbortController(); + controller.abort(new Error('Not expected')); + const error = await abortable(Promise.reject(new Error('my error')), { + signal: controller.signal + }).catch(error => error); + expect(error.message).to.equal('my error'); + }); + }); + + context('when given a signal that is already aborted', () => { + it("rejects with the signal's reason", async () => { + const controller = new AbortController(); + controller.abort(new Error('my error')); + const error = await abortable( + sleep(1).then(() => Promise.reject(new Error('Not expected'))), + { signal: controller.signal } + ).catch(error => error); + expect(error.message).to.equal('my error'); + }); }); context('when given a promise that rejects', () => { From effa15521e691ce9691fe6065731a7b70faf5fa2 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 29 Nov 2023 11:55:02 -0500 Subject: [PATCH 35/44] docs: add util docs --- src/utils.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index 3b013d7797c..91621b26ccd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1300,9 +1300,17 @@ export function isHostMatch(match: RegExp, host?: string): boolean { return host && match.test(host.toLowerCase()) ? true : false; } +/** + * Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal. + * If the signal aborts the returned promise will reject, otherwise the given promises resolved value will be returned. + * - The given promises rejection will proceed the aborted signal rejection. + * If given an already rejected promise and an aborted signal, the promise returned will reject with the given promises rejection value + * @param promise - A promise to discard if the signal aborts + * @param options - An options object carrying an optional signal + */ export async function abortable( promise: Promise, - { signal }: { signal?: AbortSignal } + { signal }: { signal?: AbortSignal } = {} ): Promise { const { abort, done } = aborted(signal); try { @@ -1315,6 +1323,7 @@ export async function abortable( /** * Takes an AbortSignal and creates a promise that will reject when the signal aborts * If the argument provided is nullish the returned promise will **never** resolve. + * Also returns a done controller - abort the done controller when your task is done to remove the abort listeners * @param signal - an optional abort signal to link to a promise rejection */ export function aborted(signal?: AbortSignal): { From 9347327a98f1fcd75a094cc7af799fac11507628 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 10:44:31 -0500 Subject: [PATCH 36/44] daria: fixes Co-authored-by: Daria Pardue --- src/utils.ts | 4 ++-- test/unit/utils.test.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 91621b26ccd..5e2d709db91 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1302,9 +1302,9 @@ export function isHostMatch(match: RegExp, host?: string): boolean { /** * Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal. - * If the signal aborts the returned promise will reject, otherwise the given promises resolved value will be returned. + * If the signal aborts, the returned promise will reject; otherwise the given promise's resolved value will be returned. * - The given promises rejection will proceed the aborted signal rejection. - * If given an already rejected promise and an aborted signal, the promise returned will reject with the given promises rejection value + * If given an already rejected promise and an aborted signal, the promise returned will reject with the given promise's rejection value * @param promise - A promise to discard if the signal aborts * @param options - An options object carrying an optional signal */ diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 8e2bf6094a2..85b3c416cb8 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1209,7 +1209,7 @@ describe('driver utils', function () { describe('abortable()', () => { context('when given a promise that is already rejected', () => { - it("rejects with the promises's reason", async () => { + it("rejects with the promise's reason", async () => { const controller = new AbortController(); controller.abort(new Error('Not expected')); const error = await abortable(Promise.reject(new Error('my error')), { From f7da8108ec8d17ac159e2348c0da90b151890eb5 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 11:02:59 -0500 Subject: [PATCH 37/44] daria: depluralize Co-authored-by: Daria Pardue --- test/integration/sessions/sessions.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/sessions/sessions.test.ts b/test/integration/sessions/sessions.test.ts index 1647beb9497..607ac5b2bfa 100644 --- a/test/integration/sessions/sessions.test.ts +++ b/test/integration/sessions/sessions.test.ts @@ -410,7 +410,7 @@ describe('Sessions Spec', function () { await utilClient?.close(); }); - it('should only use one sessions for many operations when maxPoolSize is 1', async () => { + it('should only use one session for many operations when maxPoolSize is 1', async () => { const documents = Array.from({ length: 50 }).map((_, idx) => ({ _id: idx })); const events: CommandStartedEvent[] = []; From 94762305ff7a126d4da22f2ac1bfa9c1c721b2ab Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 11:05:36 -0500 Subject: [PATCH 38/44] ci: only specify needed var --- .evergreen/config.in.yml | 3 ++- .evergreen/config.yml | 3 ++- .evergreen/run-tests.sh | 5 ----- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/.evergreen/config.in.yml b/.evergreen/config.in.yml index 15f8ab4b73f..e916165e9b2 100644 --- a/.evergreen/config.in.yml +++ b/.evergreen/config.in.yml @@ -200,7 +200,8 @@ functions: type: test params: working_dir: "src" - add_expansions_to_env: true + env: + MONGODB_NEW_CONNECTION: ${MONGODB_NEW_CONNECTION|false} timeout_secs: 300 shell: bash script: | diff --git a/.evergreen/config.yml b/.evergreen/config.yml index d32fa2e78af..16656047b8c 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -170,7 +170,8 @@ functions: type: test params: working_dir: src - add_expansions_to_env: true + env: + MONGODB_NEW_CONNECTION: ${MONGODB_NEW_CONNECTION|false} timeout_secs: 300 shell: bash script: | diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index c0ff5280166..170547a6340 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -55,11 +55,6 @@ fi npm install @mongodb-js/zstd npm install snappy -if [[ "$OS" = "Windows_NT" ]]; then - # TODO: Temp fix for add_expansions_to_env usage that breaks Windows - unset CSFLE_KMS_PROVIDERS -fi - export AUTH=$AUTH export SINGLE_MONGOS_LB_URI=${SINGLE_MONGOS_LB_URI} export MULTI_MONGOS_LB_URI=${MULTI_MONGOS_LB_URI} From 9939dfe5ebc0bd0bcd6de85190fc6bc20d9fa74e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 11:24:46 -0500 Subject: [PATCH 39/44] docs: improved for abortable --- src/utils.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index 5e2d709db91..e9e0b2d36fe 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1302,9 +1302,11 @@ export function isHostMatch(match: RegExp, host?: string): boolean { /** * Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal. - * If the signal aborts, the returned promise will reject; otherwise the given promise's resolved value will be returned. - * - The given promises rejection will proceed the aborted signal rejection. - * If given an already rejected promise and an aborted signal, the promise returned will reject with the given promise's rejection value + * The given promise is _always_ ordered before the signal's abort promise. + * When given an already rejected promise and an already aborted signal, the promises rejection takes precedence. + * + * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race + * * @param promise - A promise to discard if the signal aborts * @param options - An options object carrying an optional signal */ From 6315e0e59a4748e27561b5bfd2b912a3ed8c1009 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 11:42:59 -0500 Subject: [PATCH 40/44] test: complete coverage of abortable helper --- test/unit/utils.test.ts | 115 +++++++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 50 deletions(-) diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 85b3c416cb8..305f488b146 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1208,80 +1208,95 @@ describe('driver utils', function () { }); describe('abortable()', () => { - context('when given a promise that is already rejected', () => { - it("rejects with the promise's reason", async () => { + const goodError = new Error('good error'); + const badError = new Error('unexpected bad error!'); + const expectedValue = "don't panic"; + + context('when given already rejected promise with already aborted signal', () => { + it('returns promise rejection', async () => { const controller = new AbortController(); - controller.abort(new Error('Not expected')); - const error = await abortable(Promise.reject(new Error('my error')), { - signal: controller.signal - }).catch(error => error); - expect(error.message).to.equal('my error'); + const { signal } = controller; + controller.abort(badError); + const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); }); }); - context('when given a signal that is already aborted', () => { - it("rejects with the signal's reason", async () => { + context('when given already resolved promise with already aborted signal', () => { + it('returns promise resolution', async () => { const controller = new AbortController(); - controller.abort(new Error('my error')); - const error = await abortable( - sleep(1).then(() => Promise.reject(new Error('Not expected'))), - { signal: controller.signal } - ).catch(error => error); - expect(error.message).to.equal('my error'); + const { signal } = controller; + controller.abort(badError); + const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e); + expect(result).to.deep.equal(expectedValue); }); }); - context('when given a promise that rejects', () => { - it("rejects with the given promise's rejection value", async () => { + context('when given already rejected promise with not yet aborted signal', () => { + it('returns promise rejection', async () => { const controller = new AbortController(); - const error = await abortable(Promise.reject(new Error('my error')), { - signal: controller.signal - }).catch(error => error); - expect(error.message).to.equal('my error'); + const { signal } = controller; + const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); }); + }); - it('cleans up abort listener', async () => { + context('when given already resolved promise with not yet aborted signal', () => { + it('returns promise resolution', async () => { const controller = new AbortController(); - const addEventListener = sinon.spy(controller.signal, 'addEventListener'); - const removeEventListener = sinon.spy(controller.signal, 'removeEventListener'); - await abortable(Promise.reject(), { signal: controller.signal }).catch(() => null); - expect(removeEventListener).to.have.been.called; - expect(removeEventListener).to.have.callCount(addEventListener.callCount); + const { signal } = controller; + const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e); + expect(result).to.deep.equal(expectedValue); }); }); - context('when given a promise that resolves', () => { - it("resolves with the given promise's resolution value", async () => { + context('when given unresolved promise with an already aborted signal', () => { + it('returns signal reason', async () => { const controller = new AbortController(); - const result = await abortable(Promise.resolve({ ok: 1 }), { - signal: controller.signal - }); - expect(result).to.have.property('ok', 1); + const { signal } = controller; + controller.abort(goodError); + const result = await abortable(new Promise(() => null), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); }); + }); + + context('when given eventually rejecting promise with not yet aborted signal', () => { + const eventuallyReject = async () => { + await sleep(1); + throw goodError; + }; - it('cleans up abort listener', async () => { + it('returns promise rejection', async () => { const controller = new AbortController(); - const addEventListener = sinon.spy(controller.signal, 'addEventListener'); - const removeEventListener = sinon.spy(controller.signal, 'removeEventListener'); - await abortable(Promise.resolve(), { signal: controller.signal }); - expect(removeEventListener).to.have.been.called; - expect(removeEventListener).to.have.callCount(addEventListener.callCount); + const { signal } = controller; + const result = await abortable(eventuallyReject(), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); }); }); - context('when given a promise that pends forever', () => { - it('cleans up abort listener when given signal is aborted', async () => { + context('when given eventually resolving promise with not yet aborted signal', () => { + const eventuallyResolve = async () => { + await sleep(1); + return expectedValue; + }; + + it('returns promise resolution', async () => { const controller = new AbortController(); - const addEventListener = sinon.spy(controller.signal, 'addEventListener'); - const removeEventListener = sinon.spy(controller.signal, 'removeEventListener'); - setTimeout(() => controller.abort(new MongoError('my error')), 10); - const error = await abortable(new Promise(() => null), { - signal: controller.signal - }).catch(error => error); + const { signal } = controller; + const result = await abortable(eventuallyResolve(), { signal }).catch(e => e); + expect(result).to.deep.equal(expectedValue); + }); + }); + + context('when given unresolved promise with eventually aborted signal', () => { + it('returns signal reason', async () => { + const controller = new AbortController(); + const { signal } = controller; + + setTimeout(() => controller.abort(goodError), 1); - expect(error?.message).to.equal('my error'); - expect(removeEventListener).to.have.been.called; - expect(removeEventListener).to.have.callCount(addEventListener.callCount); + const result = await abortable(new Promise(() => null), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); }); }); }); From ccf3d2d896c8ae9fc1c7635319fbb4f7cc7a62f7 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 15:33:26 -0500 Subject: [PATCH 41/44] daria: fix Co-authored-by: Daria Pardue --- src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils.ts b/src/utils.ts index e9e0b2d36fe..924df71bea0 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1303,7 +1303,7 @@ export function isHostMatch(match: RegExp, host?: string): boolean { /** * Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal. * The given promise is _always_ ordered before the signal's abort promise. - * When given an already rejected promise and an already aborted signal, the promises rejection takes precedence. + * When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence. * * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race * From d49ef29a4e79b1799c5b516deae99c2787ac5787 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 15:55:59 -0500 Subject: [PATCH 42/44] chore: test and noResponse --- src/cmap/connection.ts | 4 +-- src/utils.ts | 2 +- test/unit/utils.test.ts | 78 ++++++++++++++++++++++++----------------- 3 files changed, 49 insertions(+), 35 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 4d3f8003a01..6168d38ec04 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1049,10 +1049,10 @@ export class ModernConnection extends TypedEventEmitter { signal }); - signal.throwIfAborted(); - if (options.noResponse) return { ok: 1 }; + signal.throwIfAborted(); + response = await read(this, { signal }); } finally { // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners diff --git a/src/utils.ts b/src/utils.ts index 924df71bea0..20e2468e6a0 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1328,7 +1328,7 @@ export async function abortable( * Also returns a done controller - abort the done controller when your task is done to remove the abort listeners * @param signal - an optional abort signal to link to a promise rejection */ -export function aborted(signal?: AbortSignal): { +function aborted(signal?: AbortSignal): { abort: Promise; done: AbortController; } { diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 305f488b146..ba71828e8ad 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1171,46 +1171,60 @@ describe('driver utils', function () { }); }); - describe('aborted()', () => { - context('when given an aborted signal', () => { - it('rejects the returned promise', async () => { - const controller = new AbortController(); - controller.abort(new MongoError('my error')); - const error = await aborted(controller.signal).abort.catch(error => error); - expect(error).to.be.instanceOf(MongoError); - expect(error.message).to.equal('my error'); + describe('abortable()', () => { + const goodError = new Error('good error'); + const badError = new Error('unexpected bad error!'); + const expectedValue = "don't panic"; + + context('when not given a signal', () => { + it('returns promise fulfillment', async () => { + expect(await abortable(Promise.resolve(expectedValue))).to.equal(expectedValue); + expect(await abortable(Promise.reject(goodError)).catch(e => e)).to.equal(goodError); + }); + + it('pends indefinitely', async () => { + const forever = abortable(new Promise(() => null)); + // Assume 100ms is good enough to prove "forever" + expect(await Promise.race([forever, sleep(100).then(() => expectedValue)])).to.equal( + expectedValue + ); }); }); - context('when given a signal that aborts after some time', () => { - it('rejects the returned promise after the timeout', async () => { - const controller = new AbortController(); - setTimeout(() => controller.abort(new MongoError('my error')), 10); - const start = +new Date(); - const error = await aborted(controller.signal).abort.catch(error => error); - const end = +new Date(); + context('always removes abort listener it attaches', () => { + let controller; + let removeEventListenerSpy; + let addEventListenerSpy; - expect(error).to.be.instanceOf(MongoError); - expect(error.message).to.equal('my error'); - expect(end - start).to.be.within(8, 13); + beforeEach(() => { + controller = new AbortController(); + addEventListenerSpy = sinon.spy(controller.signal, 'addEventListener'); + removeEventListenerSpy = sinon.spy(controller.signal, 'removeEventListener'); }); - }); - context('when given a nullish value', () => { - it('returns a promise that pends forever', async () => { - const error = await Promise.race([ - aborted().abort.catch(error => error), - aborted(AbortSignal.timeout(100)).abort - ]).catch(error => error); - expect(error.name).to.equal('TimeoutError'); + afterEach(() => sinon.restore()); + + const expectListenerCleanup = () => { + expect(addEventListenerSpy).to.have.been.calledOnce; + expect(removeEventListenerSpy).to.have.been.calledOnce; + }; + + it('when promise rejects', async () => { + await abortable(Promise.reject(goodError), { signal: controller.signal }).catch(e => e); + expectListenerCleanup(); }); - }); - }); - describe('abortable()', () => { - const goodError = new Error('good error'); - const badError = new Error('unexpected bad error!'); - const expectedValue = "don't panic"; + it('when promise resolves', async () => { + await abortable(Promise.resolve(expectedValue), { signal: controller.signal }); + expectListenerCleanup(); + }); + + it('when signal aborts', async () => { + setTimeout(() => controller.abort(goodError)); + await abortable(new Promise(() => null), { signal: controller.signal }).catch(e => e); + expectListenerCleanup(); + }); + }); context('when given already rejected promise with already aborted signal', () => { it('returns promise rejection', async () => { From 5602453adb7e2173e0720845e8ab3ee06ab394c6 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 16:09:24 -0500 Subject: [PATCH 43/44] lint: imports --- test/unit/utils.test.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index ba71828e8ad..1559f1d4ff0 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -4,7 +4,6 @@ import { setTimeout } from 'timers'; import { abortable, - aborted, BufferPool, ByteUtils, compareObjectId, @@ -18,7 +17,6 @@ import { maybeCallback, MongoDBCollectionNamespace, MongoDBNamespace, - MongoError, MongoRuntimeError, ObjectId, shuffle, From 293a46aca38be04fd0168fba2b503c2fecd9fce3 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 1 Dec 2023 16:30:25 -0500 Subject: [PATCH 44/44] daria: test title improvements Co-authored-by: Daria Pardue --- test/unit/utils.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 1559f1d4ff0..980daf498a6 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1175,12 +1175,12 @@ describe('driver utils', function () { const expectedValue = "don't panic"; context('when not given a signal', () => { - it('returns promise fulfillment', async () => { + it('returns promise fulfillment if the promise resolves or rejects', async () => { expect(await abortable(Promise.resolve(expectedValue))).to.equal(expectedValue); expect(await abortable(Promise.reject(goodError)).catch(e => e)).to.equal(goodError); }); - it('pends indefinitely', async () => { + it('pends indefinitely if the promise is never settled', async () => { const forever = abortable(new Promise(() => null)); // Assume 100ms is good enough to prove "forever" expect(await Promise.race([forever, sleep(100).then(() => expectedValue)])).to.equal( @@ -1189,7 +1189,7 @@ describe('driver utils', function () { }); }); - context('always removes abort listener it attaches', () => { + context('always removes the abort listener it attaches', () => { let controller; let removeEventListenerSpy; let addEventListenerSpy;