diff --git a/.evergreen/config.in.yml b/.evergreen/config.in.yml index c7c2be41800..e916165e9b2 100644 --- a/.evergreen/config.in.yml +++ b/.evergreen/config.in.yml @@ -200,6 +200,8 @@ functions: type: test params: working_dir: "src" + env: + MONGODB_NEW_CONNECTION: ${MONGODB_NEW_CONNECTION|false} timeout_secs: 300 shell: bash script: | diff --git a/.evergreen/config.yml b/.evergreen/config.yml index b600c90e9f6..c494c76e215 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -170,6 +170,8 @@ functions: type: test params: working_dir: src + env: + MONGODB_NEW_CONNECTION: ${MONGODB_NEW_CONNECTION|false} timeout_secs: 300 shell: bash script: | @@ -4552,6 +4554,42 @@ buildvariants: - test-3.6-server-noauth - test-3.6-replica_set-noauth - test-3.6-sharded_cluster-noauth + - name: rhel8-new-connection-tests + display_name: New Connection Tests + run_on: rhel80-large + expansions: + NODE_LTS_VERSION: 20 + CLIENT_ENCRYPTION: true + MONGODB_NEW_CONNECTION: true + tasks: + - test-latest-server + - test-latest-replica_set + - test-latest-sharded_cluster + - test-rapid-server + - test-rapid-replica_set + - test-rapid-sharded_cluster + - test-7.0-server + - test-7.0-replica_set + - test-7.0-sharded_cluster + - test-6.0-server + - test-6.0-replica_set + - test-6.0-sharded_cluster + - test-5.0-server + - test-5.0-replica_set + - test-5.0-sharded_cluster + - test-4.4-server + - test-4.4-replica_set + - test-4.4-sharded_cluster + - test-4.2-server + - test-4.2-replica_set + - test-4.2-sharded_cluster + - test-4.0-server + - test-4.0-replica_set + - test-4.0-sharded_cluster + - test-3.6-server + - test-3.6-replica_set + - test-3.6-sharded_cluster + - test-latest-server-v1-api - name: rhel8-test-lambda display_name: AWS Lambda handler tests run_on: rhel80-large diff --git a/.evergreen/generate_evergreen_tasks.js b/.evergreen/generate_evergreen_tasks.js index 0db9af1bd12..a1233fe3fa7 100644 --- a/.evergreen/generate_evergreen_tasks.js +++ b/.evergreen/generate_evergreen_tasks.js @@ -728,6 +728,18 @@ BUILD_VARIANTS.push({ tasks: AUTH_DISABLED_TASKS.map(({ name }) => name) }); +BUILD_VARIANTS.push({ + name: 'rhel8-new-connection-tests', + display_name: 'New Connection Tests', + run_on: DEFAULT_OS, + expansions: { + NODE_LTS_VERSION: LATEST_LTS, + CLIENT_ENCRYPTION: true, + MONGODB_NEW_CONNECTION: true + }, + tasks: BASE_TASKS.map(({ name }) => name) +}); + BUILD_VARIANTS.push({ name: 'rhel8-test-lambda', display_name: 'AWS Lambda handler tests', diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 080857cd381..170547a6340 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -63,4 +63,5 @@ export MONGODB_URI=${MONGODB_URI} export LOAD_BALANCER=${LOAD_BALANCER} export TEST_CSFLE=${TEST_CSFLE} export COMPRESSOR=${COMPRESSOR} +export MONGODB_NEW_CONNECTION=${MONGODB_NEW_CONNECTION} npm run "${TEST_NPM_SCRIPT}" diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index d505496519e..6168d38ec04 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,4 +1,3 @@ -import { once } from 'events'; import { on } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; @@ -23,6 +22,7 @@ import { MongoParseError, MongoRuntimeError, MongoServerError, + MongoUnexpectedServerResponseError, MongoWriteConcernError } from '../error'; import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client'; @@ -30,6 +30,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types'; import type { 
ReadPreferenceLike } from '../read_preference'; import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions'; import { + abortable, BufferPool, calculateDurationInMs, type Callback, @@ -778,21 +779,15 @@ export class ModernConnection extends TypedEventEmitter { address: string; socketTimeoutMS: number; monitorCommands: boolean; - /** Indicates that the connection (including underlying TCP socket) has been closed. */ - closed: boolean; lastHelloMS?: number; serverApi?: ServerApi; helloOk?: boolean; - commandAsync: ( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions | undefined - ) => Promise; + commandAsync: ModernConnection['command']; /** @internal */ authContext?: AuthContext; /**@internal */ - [kDelayedTimeoutId]: NodeJS.Timeout | null; + delayedTimeoutId: NodeJS.Timeout | null = null; /** @internal */ [kDescription]: StreamDescription; /** @internal */ @@ -800,11 +795,8 @@ export class ModernConnection extends TypedEventEmitter { /** @internal */ [kLastUseTime]: number; /** @internal */ - [kQueue]: Map; - /** @internal */ - [kMessageStream]: MessageStream; - /** @internal */ socket: Stream; + controller: AbortController; /** @internal */ [kHello]: Document | null; /** @internal */ @@ -830,21 +822,13 @@ export class ModernConnection extends TypedEventEmitter { constructor(stream: Stream, options: ConnectionOptions) { super(); - this.commandAsync = promisify( - ( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions | undefined, - callback: Callback - ) => this.command(ns, cmd, options, callback as any) - ); + this.commandAsync = this.command.bind(this); this.id = options.id; this.address = streamIdentifier(stream, options); this.socketTimeoutMS = options.socketTimeoutMS ?? 0; this.monitorCommands = options.monitorCommands; this.serverApi = options.serverApi; - this.closed = false; this[kHello] = null; this[kClusterTime] = null; @@ -852,27 +836,16 @@ export class ModernConnection extends TypedEventEmitter { this[kGeneration] = options.generation; this[kLastUseTime] = now(); - // setup parser stream and message handling - this[kQueue] = new Map(); - this[kMessageStream] = new MessageStream({ - ...options, - maxBsonMessageSize: this.hello?.maxBsonMessageSize - }); this.socket = stream; + this.controller = new AbortController(); + this.socket.on('error', this.onError.bind(this)); + this.socket.on('close', this.onClose.bind(this)); + this.socket.on('timeout', this.onTimeout.bind(this)); + } - this[kDelayedTimeoutId] = null; - - this[kMessageStream].on('message', message => this.onMessage(message)); - this[kMessageStream].on('error', error => this.onError(error)); - this.socket.on('close', () => this.onClose()); - this.socket.on('timeout', () => this.onTimeout()); - this.socket.on('error', () => { - /* ignore errors, listen to `close` instead */ - }); - - // hook the message stream up to the passed in stream - this.socket.pipe(this[kMessageStream]); - this[kMessageStream].pipe(this.socket); + /** Indicates that the connection (including underlying TCP socket) has been closed. */ + get closed(): boolean { + return this.controller.signal.aborted; } get description(): StreamDescription { @@ -892,15 +865,6 @@ export class ModernConnection extends TypedEventEmitter { this[kHello] = response; } - // Set the whether the message stream is for a monitoring connection. 
- set isMonitoringConnection(value: boolean) { - this[kMessageStream].isMonitoringConnection = value; - } - - get isMonitoringConnection(): boolean { - return this[kMessageStream].isMonitoringConnection; - } - get serviceId(): ObjectId | undefined { return this.hello?.serviceId; } @@ -945,116 +909,26 @@ export class ModernConnection extends TypedEventEmitter { this[kLastUseTime] = now(); } - onError(error: Error) { - this.cleanup(true, error); + onError(error?: Error) { + this.cleanup(error); } onClose() { const message = `connection ${this.id} to ${this.address} closed`; - this.cleanup(true, new MongoNetworkError(message)); + this.cleanup(new MongoNetworkError(message)); } onTimeout() { - this[kDelayedTimeoutId] = setTimeout(() => { + this.delayedTimeoutId = setTimeout(() => { const message = `connection ${this.id} to ${this.address} timed out`; const beforeHandshake = this.hello == null; - this.cleanup(true, new MongoNetworkTimeoutError(message, { beforeHandshake })); + this.cleanup(new MongoNetworkTimeoutError(message, { beforeHandshake })); }, 1).unref(); // No need for this timer to hold the event loop open } - onMessage(message: OpMsgResponse | OpQueryResponse) { - const delayedTimeoutId = this[kDelayedTimeoutId]; - if (delayedTimeoutId != null) { - clearTimeout(delayedTimeoutId); - this[kDelayedTimeoutId] = null; - } - - const socketTimeoutMS = this.socket.timeout ?? 0; - this.socket.setTimeout(0); - - // always emit the message, in case we are streaming - this.emit('message', message); - let operationDescription = this[kQueue].get(message.responseTo); - - if (!operationDescription && this.isMonitoringConnection) { - // This is how we recover when the initial hello's requestId is not - // the responseTo when hello responses have been skipped: - - // First check if the map is of invalid size - if (this[kQueue].size > 1) { - this.cleanup(true, new MongoRuntimeError(INVALID_QUEUE_SIZE)); - } else { - // Get the first orphaned operation description. - const entry = this[kQueue].entries().next(); - if (entry.value != null) { - const [requestId, orphaned]: [number, OperationDescription] = entry.value; - // If the orphaned operation description exists then set it. - operationDescription = orphaned; - // Remove the entry with the bad request id from the queue. - this[kQueue].delete(requestId); - } - } - } - - if (!operationDescription) { - return; - } - - const callback = operationDescription.cb; - - // SERVER-45775: For exhaust responses we should be able to use the same requestId to - // track response, however the server currently synthetically produces remote requests - // making the `responseTo` change on each response - this[kQueue].delete(message.responseTo); - if ('moreToCome' in message && message.moreToCome) { - // If the operation description check above does find an orphaned - // description and sets the operationDescription then this line will put one - // back in the queue with the correct requestId and will resolve not being able - // to find the next one via the responseTo of the next streaming hello. 
- this[kQueue].set(message.requestId, operationDescription); - this.socket.setTimeout(socketTimeoutMS); - } - - try { - // Pass in the entire description because it has BSON parsing options - message.parse(operationDescription); - } catch (err) { - // If this error is generated by our own code, it will already have the correct class applied - // if it is not, then it is coming from a catastrophic data parse failure or the BSON library - // in either case, it should not be wrapped - callback(err); - return; - } - - if (message.documents[0]) { - const document: Document = message.documents[0]; - const session = operationDescription.session; - if (session) { - updateSessionFromResponse(session, document); - } - - if (document.$clusterTime) { - this[kClusterTime] = document.$clusterTime; - this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); - } - - if (document.writeConcernError) { - callback(new MongoWriteConcernError(document.writeConcernError, document), document); - return; - } - - if (document.ok === 0 || document.$err || document.errmsg || document.code) { - callback(new MongoServerError(document)); - return; - } - } - - callback(undefined, message.documents[0]); - } - destroy(options: DestroyOptions, callback?: Callback): void { if (this.closed) { - process.nextTick(() => callback?.()); + if (typeof callback === 'function') process.nextTick(callback); return; } if (typeof callback === 'function') { @@ -1067,7 +941,7 @@ export class ModernConnection extends TypedEventEmitter { this.removeAllListeners(Connection.PINNED); this.removeAllListeners(Connection.UNPINNED); const message = `connection ${this.id} to ${this.address} closed`; - this.cleanup(options.force, new MongoNetworkError(message)); + this.cleanup(new MongoNetworkError(message)); } /** @@ -1078,50 +952,17 @@ export class ModernConnection extends TypedEventEmitter { * * This method does nothing if the connection is already closed. 
*/ - private cleanup(force: boolean, error?: Error): void { + private cleanup(error?: Error): void { if (this.closed) { return; } - this.closed = true; - - const completeCleanup = () => { - for (const op of this[kQueue].values()) { - op.cb(error); - } - - this[kQueue].clear(); - - this.emit(Connection.CLOSE); - }; - - this.socket.removeAllListeners(); - this[kMessageStream].removeAllListeners(); - - this[kMessageStream].destroy(); - - if (force) { - this.socket.destroy(); - completeCleanup(); - return; - } - - if (!this.socket.writableEnded) { - this.socket.end(() => { - this.socket.destroy(); - completeCleanup(); - }); - } else { - completeCleanup(); - } + this.socket.destroy(); + this.controller.abort(error); + this.emit(Connection.CLOSE); } - command( - ns: MongoDBNamespace, - command: Document, - options: CommandOptions | undefined, - callback: Callback - ): void { + private prepareCommand(db: string, command: Document, options: CommandOptions) { let cmd = { ...command }; const readPreference = getReadPreference(options); @@ -1145,12 +986,10 @@ export class ModernConnection extends TypedEventEmitter { clusterTime = session.clusterTime; } - const err = applySession(session, cmd, options); - if (err) { - return callback(err); - } + const sessionError = applySession(session, cmd, options); + if (sessionError) throw sessionError; } else if (session?.explicit) { - return callback(new MongoCompatibilityError('Current topology does not support sessions')); + throw new MongoCompatibilityError('Current topology does not support sessions'); } // if we have a known cluster time, gossip it @@ -1171,26 +1010,151 @@ export class ModernConnection extends TypedEventEmitter { }; } - const commandOptions: Document = Object.assign( - { - numberToSkip: 0, - numberToReturn: -1, - checkKeys: false, - // This value is not overridable - secondaryOk: readPreference.secondaryOk() - }, - options - ); + const commandOptions = { + numberToSkip: 0, + numberToReturn: -1, + checkKeys: false, + // This value is not overridable + secondaryOk: readPreference.secondaryOk(), + ...options, + readPreference // ensure we pass in ReadPreference instance + }; const message = this.supportsOpMsg - ? new OpMsgRequest(ns.db, cmd, commandOptions) - : new OpQueryRequest(ns.db, cmd, commandOptions); + ? new OpMsgRequest(db, cmd, commandOptions) + : new OpQueryRequest(db, cmd, commandOptions); + + return message; + } + + private async sendCommand( + message: WriteProtocolMessageType, + options: CommandOptions + ): Promise { + const { signal } = this.controller; + signal.throwIfAborted(); + + if (typeof options.socketTimeoutMS === 'number') { + this.socket.setTimeout(options.socketTimeoutMS); + } else if (this.socketTimeoutMS !== 0) { + this.socket.setTimeout(this.socketTimeoutMS); + } + + let response; try { - write(this as any as Connection, message, commandOptions, callback); - } catch (err) { - callback(err); + await writeCommand(this, message, { + agreedCompressor: this.description.compressor ?? 
'none', + zlibCompressionLevel: this.description.zlibCompressionLevel, + signal + }); + + if (options.noResponse) return { ok: 1 }; + + signal.throwIfAborted(); + + response = await read(this, { signal }); + } finally { + // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners + if (!signal.aborted) this.controller = new AbortController(); + } + + response.parse(options); + + const [document] = response.documents; + + if (!Buffer.isBuffer(document)) { + const { session } = options; + if (session) { + updateSessionFromResponse(session, document); + } + + if (document.$clusterTime) { + this[kClusterTime] = document.$clusterTime; + this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); + } } + + return document; + } + + async command( + ns: MongoDBNamespace, + command: Document, + options: CommandOptions = {} + ): Promise { + const message = this.prepareCommand(ns.db, command, options); + + let started = 0; + if (this.monitorCommands) { + started = now(); + this.emit( + ModernConnection.COMMAND_STARTED, + new CommandStartedEvent(this as unknown as Connection, message) + ); + } + + let document = null; + try { + document = await this.sendCommand(message, options); + } catch (ioError) { + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_FAILED, + new CommandFailedEvent(this as unknown as Connection, message, ioError, started) + ); + } + throw ioError; + } + + if (document == null) { + const unexpected = new MongoUnexpectedServerResponseError( + 'sendCommand did not throw and did not return a document' + ); + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_FAILED, + new CommandFailedEvent(this as unknown as Connection, message, unexpected, started) + ); + } + throw unexpected; + } + + if (document.writeConcernError) { + const writeConcernError = new MongoWriteConcernError(document.writeConcernError, document); + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_SUCCEEDED, + new CommandSucceededEvent(this as unknown as Connection, message, document, started) + ); + } + throw writeConcernError; + } + + if (document.ok === 0 || document.$err || document.errmsg || document.code) { + const serverError = new MongoServerError(document); + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_FAILED, + new CommandFailedEvent(this as unknown as Connection, message, serverError, started) + ); + } + throw serverError; + } + + if (this.monitorCommands) { + this.emit( + ModernConnection.COMMAND_SUCCEEDED, + new CommandSucceededEvent( + this as unknown as Connection, + message, + options.noResponse ? undefined : document, + started + ) + ); + } + + return document; } } @@ -1208,11 +1172,17 @@ const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; * Note that `for-await` loops call `return` automatically when the loop is exited. */ export async function* readWireProtocolMessages( - connection: ModernConnection + connection: ModernConnection, + { signal }: { signal?: AbortSignal } = {} ): AsyncGenerator { const bufferPool = new BufferPool(); const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? 
kDefaultMaxBsonMessageSize; - for await (const [chunk] of on(connection.socket, 'data')) { + for await (const [chunk] of on(connection.socket, 'data', { signal })) { + if (connection.delayedTimeoutId) { + clearTimeout(connection.delayedTimeoutId); + connection.delayedTimeoutId = null; + } + bufferPool.append(chunk); const sizeOfMessage = bufferPool.getInt32(); @@ -1247,9 +1217,10 @@ export async function* readWireProtocolMessages( export async function writeCommand( connection: ModernConnection, command: WriteProtocolMessageType, - options: Partial> + options: Partial> & { + signal?: AbortSignal; + } ): Promise { - const drained = once(connection.socket, 'drain'); const finalCommand = options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) ? command @@ -1257,9 +1228,12 @@ export async function writeCommand( agreedCompressor: options.agreedCompressor ?? 'none', zlibCompressionLevel: options.zlibCompressionLevel ?? 0 }); + const buffer = Buffer.concat(await finalCommand.toBin()); - connection.socket.push(buffer); - await drained; + + const socketWriteFn = promisify(connection.socket.write.bind(connection.socket)); + + return abortable(socketWriteFn(buffer), options); } /** @@ -1272,9 +1246,10 @@ export async function writeCommand( * Note that `for-await` loops call `return` automatically when the loop is exited. */ export async function* readMany( - connection: ModernConnection + connection: ModernConnection, + options: { signal?: AbortSignal } = {} ): AsyncGenerator { - for await (const message of readWireProtocolMessages(connection)) { + for await (const message of readWireProtocolMessages(connection, options)) { const response = await decompressResponse(message); yield response; @@ -1289,8 +1264,11 @@ export async function* readMany( * * Reads a single wire protocol message out of a connection. */ -export async function read(connection: ModernConnection): Promise { - for await (const value of readMany(connection)) { +export async function read( + connection: ModernConnection, + options: { signal?: AbortSignal } = {} +): Promise { + for await (const value of readMany(connection, options)) { return value; } diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 187d6b09a00..a4f0edfac0a 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -218,8 +218,8 @@ export class ConnectionPool extends TypedEventEmitter { super(); this.options = Object.freeze({ - ...options, connectionType: Connection, + ...options, maxPoolSize: options.maxPoolSize ?? 100, minPoolSize: options.minPoolSize ?? 0, maxConnecting: options.maxConnecting ?? 
2, diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index f4d9404d0cb..1cc7cfad7f1 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -127,24 +127,21 @@ export class Monitor extends TypedEventEmitter { const cancellationToken = this[kCancellationToken]; // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration - const connectOptions = Object.assign( - { - id: '' as const, - generation: server.pool.generation, - connectionType: Connection, - cancellationToken, - hostAddress: server.description.hostAddress - }, - options, + const connectOptions = { + id: '' as const, + generation: server.pool.generation, + cancellationToken, + hostAddress: server.description.hostAddress, + ...options, // force BSON serialization options - { - raw: false, - useBigInt64: false, - promoteLongs: true, - promoteValues: true, - promoteBuffers: true - } - ); + raw: false, + useBigInt64: false, + promoteLongs: true, + promoteValues: true, + promoteBuffers: true, + // TODO(NODE-5741): override monitors to use old connection + connectionType: Connection + }; // ensure no authentication is used for monitoring delete connectOptions.credentials; diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 8e6d2b13d39..04139c991ff 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -367,14 +367,13 @@ export class Server extends TypedEventEmitter { return cb(err); } - conn.command( - ns, - cmd, - finalOptions, - makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { - this.decrementOperationCount(); - cb(error, response); - }) + const handler = makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => { + this.decrementOperationCount(); + cb(error, response); + }); + conn.commandAsync(ns, cmd, finalOptions).then( + r => handler(undefined, r), + e => handler(e) ); }, callback @@ -532,6 +531,10 @@ function makeOperationHandler( return callback(new MongoUnexpectedServerResponseError('Empty response with no error')); } + if (error.name === 'AbortError' && error.cause instanceof MongoError) { + error = error.cause; + } + if (!(error instanceof MongoError)) { // Node.js or some other error we have not special handling for return callback(error); diff --git a/src/utils.ts b/src/utils.ts index 8bf4425c18b..20e2468e6a0 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1299,3 +1299,49 @@ export const COSMOS_DB_MSG = export function isHostMatch(match: RegExp, host?: string): boolean { return host && match.test(host.toLowerCase()) ? true : false; } + +/** + * Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal. + * The given promise is _always_ ordered before the signal's abort promise. + * When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence. + * + * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race + * + * @param promise - A promise to discard if the signal aborts + * @param options - An options object carrying an optional signal + */ +export async function abortable( + promise: Promise, + { signal }: { signal?: AbortSignal } = {} +): Promise { + const { abort, done } = aborted(signal); + try { + return await Promise.race([promise, abort]); + } finally { + done.abort(); + } +} + +/** + * Takes an AbortSignal and creates a promise that will reject when the signal aborts + * If the argument provided is nullish the returned promise will **never** resolve. 
+ * Also returns a done controller - abort the done controller when your task is done to remove the abort listeners + * @param signal - an optional abort signal to link to a promise rejection + */ +function aborted(signal?: AbortSignal): { + abort: Promise; + done: AbortController; +} { + const done = new AbortController(); + if (signal?.aborted) { + return { abort: Promise.reject(signal.reason), done }; + } + const abort = new Promise((_, reject) => + signal?.addEventListener('abort', () => reject(signal.reason), { + once: true, + // @ts-expect-error: @types/node erroneously claim this does not exist + signal: done.signal + }) + ); + return { abort, done }; +} diff --git a/test/integration/auth/auth.prose.test.ts b/test/integration/auth/auth.prose.test.ts index e9d0fc651ff..e1be56b63e8 100644 --- a/test/integration/auth/auth.prose.test.ts +++ b/test/integration/auth/auth.prose.test.ts @@ -1,7 +1,13 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { Connection, LEGACY_HELLO_COMMAND, type MongoClient, ScramSHA256 } from '../../mongodb'; +import { + Connection, + LEGACY_HELLO_COMMAND, + ModernConnection, + type MongoClient, + ScramSHA256 +} from '../../mongodb'; function makeConnectionString(config, username, password) { return `mongodb://${username}:${password}@${config.host}:${config.port}/admin?`; @@ -289,7 +295,9 @@ describe('Authentication Spec Prose Tests', function () { }; client = this.configuration.newClient({}, options); - const commandSpy = sinon.spy(Connection.prototype, 'command'); + const connectionType = + process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection; + const commandSpy = sinon.spy(connectionType.prototype, 'command'); await client.connect(); const calls = commandSpy .getCalls() diff --git a/test/integration/node-specific/bson-options/utf8_validation.test.ts b/test/integration/node-specific/bson-options/utf8_validation.test.ts index 33715394d19..f8220a9fc6c 100644 --- a/test/integration/node-specific/bson-options/utf8_validation.test.ts +++ b/test/integration/node-specific/bson-options/utf8_validation.test.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { Connection } from '../../../mongodb'; +import { OpMsgResponse } from '../../../mongodb'; const EXPECTED_VALIDATION_DISABLED_ARGUMENT = { utf8: false @@ -13,17 +13,17 @@ const EXPECTED_VALIDATION_ENABLED_ARGUMENT = { } }; -describe('class BinMsg', () => { - let onMessageSpy: sinon.SinonSpy; +describe('class OpMsgResponse', () => { + let bsonSpy: sinon.SinonSpy; beforeEach(() => { - onMessageSpy = sinon.spy(Connection.prototype, 'onMessage'); + bsonSpy = sinon.spy(OpMsgResponse.prototype, 'parseBsonSerializationOptions'); }); afterEach(() => { - onMessageSpy?.restore(); + bsonSpy?.restore(); // @ts-expect-error: Allow this to be garbage collected - onMessageSpy = null; + bsonSpy = null; }); let client; @@ -49,9 +49,8 @@ describe('class BinMsg', () => { passOptionTo === 'operation' ? option : {} ); - expect(onMessageSpy).to.have.been.called; - const binMsg = onMessageSpy.lastCall.firstArg; - const result = binMsg.parseBsonSerializationOptions(option); + expect(bsonSpy).to.have.been.called; + const result = bsonSpy.lastCall.returnValue; expect(result).to.deep.equal(EXPECTED_VALIDATION_DISABLED_ARGUMENT); }); } @@ -76,9 +75,8 @@ describe('class BinMsg', () => { passOptionTo === 'operation' ? 
option : {} ); - expect(onMessageSpy).to.have.been.called; - const binMsg = onMessageSpy.lastCall.firstArg; - const result = binMsg.parseBsonSerializationOptions(option); + expect(bsonSpy).to.have.been.called; + const result = bsonSpy.lastCall.returnValue; expect(result).to.deep.equal(EXPECTED_VALIDATION_ENABLED_ARGUMENT); }); } @@ -102,9 +100,8 @@ describe('class BinMsg', () => { passOptionTo === 'operation' ? option : {} ); - expect(onMessageSpy).to.have.been.called; - const binMsg = onMessageSpy.lastCall.firstArg; - const result = binMsg.parseBsonSerializationOptions(option); + expect(bsonSpy).to.have.been.called; + const result = bsonSpy.lastCall.returnValue; expect(result).to.deep.equal(EXPECTED_VALIDATION_ENABLED_ARGUMENT); }); } diff --git a/test/integration/sessions/sessions.prose.test.ts b/test/integration/sessions/sessions.prose.test.ts index 3fc41a5de43..5628d2493cc 100644 --- a/test/integration/sessions/sessions.prose.test.ts +++ b/test/integration/sessions/sessions.prose.test.ts @@ -110,7 +110,7 @@ describe('Sessions Prose Tests', () => { expect(events).to.have.lengthOf(operations.length); // This is a guarantee in node, unless you are performing a transaction (which is not being done in this test) - expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(2); + expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex')))).to.have.lengthOf(1); }); }); diff --git a/test/integration/sessions/sessions.test.ts b/test/integration/sessions/sessions.test.ts index 3bb00e181cc..607ac5b2bfa 100644 --- a/test/integration/sessions/sessions.test.ts +++ b/test/integration/sessions/sessions.test.ts @@ -410,7 +410,7 @@ describe('Sessions Spec', function () { await utilClient?.close(); }); - it('should only use two sessions for many operations when maxPoolSize is 1', async () => { + it('should only use one session for many operations when maxPoolSize is 1', async () => { const documents = Array.from({ length: 50 }).map((_, idx) => ({ _id: idx })); const events: CommandStartedEvent[] = []; @@ -420,7 +420,7 @@ describe('Sessions Spec', function () { expect(allResults).to.have.lengthOf(documents.length); expect(events).to.have.lengthOf(documents.length); - expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(2); + expect(new Set(events.map(ev => ev.command.lsid.id.toString('hex'))).size).to.equal(1); }); }); diff --git a/test/integration/transactions/transactions.test.ts b/test/integration/transactions/transactions.test.ts index c05bead5367..141a7fc003f 100644 --- a/test/integration/transactions/transactions.test.ts +++ b/test/integration/transactions/transactions.test.ts @@ -263,79 +263,64 @@ describe('Transactions', function () { }); describe('TransientTransactionError', function () { + let client: MongoClient; + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + await client.close(); + }); + it('should have a TransientTransactionError label inside of a transaction', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.0.0' } }, - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }); + test: async function () { + const session = client.startSession(); + const db = client.db(); - client.connect(err => { - expect(err).to.not.exist; + await db + .collection('transaction_error_test_2') + .drop() + .catch(() => null); + const coll = await db.createCollection('transaction_error_test_2'); - 
const session = client.startSession(); - const db = client.db(configuration.db); - db.collection('transaction_error_test_2').drop(() => { - db.createCollection('transaction_error_test_2', (err, coll) => { - expect(err).to.not.exist; - - session.startTransaction(); - coll.insertOne({ a: 1 }, { session }, err => { - expect(err).to.not.exist; - expect(session.inTransaction()).to.be.true; - - client.db('admin').command( - { - configureFailPoint: 'failCommand', - mode: { times: 1 }, - data: { failCommands: ['insert'], closeConnection: true } - }, - err => { - expect(err).to.not.exist; - expect(session.inTransaction()).to.be.true; - - coll.insertOne({ b: 2 }, { session }, err => { - expect(err).to.exist.and.to.be.an.instanceof(MongoNetworkError); - if (err instanceof MongoNetworkError) { - expect(err.hasErrorLabel('TransientTransactionError')).to.be.true; - } - - session.abortTransaction(() => session.endSession(() => client.close(done))); - }); - } - ); - }); - }); - }); + session.startTransaction(); + + await coll.insertOne({ a: 1 }, { session }); + + expect(session.inTransaction()).to.be.true; + + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { failCommands: ['insert'], closeConnection: true } }); + + expect(session.inTransaction()).to.be.true; + + const error = await coll.insertOne({ b: 2 }, { session }).catch(error => error); + expect(error).to.be.instanceOf(MongoNetworkError); + expect(error.hasErrorLabel('TransientTransactionError')).to.be.true; + + await session.abortTransaction(); + await session.endSession(); } }); it('should not have a TransientTransactionError label outside of a transaction', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.0.0' } }, - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient({ w: 1 }); + test: async function () { + const db = client.db(); + const coll = db.collection('test'); - client.connect(err => { - expect(err).to.not.exist; - const db = client.db(configuration.db); - const coll = db.collection('transaction_error_test1'); - - client.db('admin').command( - { - configureFailPoint: 'failCommand', - mode: { times: 2 }, - data: { failCommands: ['insert'], closeConnection: true } - }, - err => { - expect(err).to.not.exist; - coll.insertOne({ a: 1 }, err => { - expect(err).to.exist.and.to.be.an.instanceOf(MongoNetworkError); - client.close(done); - }); - } - ); + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: { times: 2 }, // fail 2 times for retry + data: { failCommands: ['insert'], closeConnection: true } }); + + const error = await coll.insertOne({ a: 1 }).catch(error => error); + expect(error).to.be.instanceOf(MongoNetworkError); } }); }); diff --git a/test/tools/runner/config.ts b/test/tools/runner/config.ts index b24a9e77d87..ac3d05605ac 100644 --- a/test/tools/runner/config.ts +++ b/test/tools/runner/config.ts @@ -6,6 +6,7 @@ import * as url from 'url'; import { type AuthMechanism, HostAddress, + ModernConnection, MongoClient, type ServerApi, TopologyType, @@ -241,6 +242,10 @@ export class TestConfiguration { throw new Error(`Cannot use options to specify host/port, must be in ${connectionString}`); } + if (process.env.MONGODB_NEW_CONNECTION === 'true') { + serverOptions.ConnectionType = ModernConnection; + } + return new MongoClient(connectionString, serverOptions); } diff --git a/test/tools/runner/hooks/configuration.js b/test/tools/runner/hooks/configuration.js index 
e9288e1b028..392d066d54b 100644 --- a/test/tools/runner/hooks/configuration.js +++ b/test/tools/runner/hooks/configuration.js @@ -176,7 +176,8 @@ const testConfigBeforeHook = async function () { ocsp: process.env.OCSP_TLS_SHOULD_SUCCEED != null && process.env.CA_FILE != null, socks5: MONGODB_URI.includes('proxyHost='), compressor: process.env.COMPRESSOR, - cryptSharedLibPath: process.env.CRYPT_SHARED_LIB_PATH + cryptSharedLibPath: process.env.CRYPT_SHARED_LIB_PATH, + newConnectionTesting: process.env.MONGODB_NEW_CONNECTION }; console.error(inspect(currentEnv, { colors: true })); diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 43976b53f3c..06037279b74 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -358,7 +358,7 @@ async function executeSDAMTest(testData: SDAMTest) { }; expect( thrownError, - 'expected the error thrown to be one of MongoNetworkError, MongoNetworkTimeoutError or MongoServerError (referred to in the spec as an "Application Error")' + `expected the error thrown to be one of MongoNetworkError, MongoNetworkTimeoutError or MongoServerError (referred to in the spec as an "Application Error") got ${thrownError.name} ${thrownError.stack}` ).to.satisfy(isApplicationError); } } else if (phase.outcome != null && Object.keys(phase).length === 1) { @@ -413,18 +413,19 @@ function withConnectionStubImpl(appError) { generation: typeof appError.generation === 'number' ? appError.generation : connectionPool.generation, - command(ns, cmd, options, callback) { + async command(_ns, _cmd, _options) { if (appError.type === 'network') { - callback(new MongoNetworkError('test generated')); + throw new MongoNetworkError('test generated'); } else if (appError.type === 'timeout') { - callback( - new MongoNetworkTimeoutError('xxx timed out', { - beforeHandshake: appError.when === 'beforeHandshakeCompletes' - }) - ); + throw new MongoNetworkTimeoutError('xxx timed out', { + beforeHandshake: appError.when === 'beforeHandshakeCompletes' + }); } else { - callback(new MongoServerError(appError.response)); + throw new MongoServerError(appError.response); } + }, + async commandAsync(ns, cmd, options) { + return this.command(ns, cmd, options); } }; diff --git a/test/unit/cmap/modern_connection.test.ts b/test/unit/cmap/modern_connection.test.ts deleted file mode 100644 index c4405bcffbf..00000000000 --- a/test/unit/cmap/modern_connection.test.ts +++ /dev/null @@ -1,425 +0,0 @@ -import { expect } from 'chai'; -import * as sinon from 'sinon'; -import { EventEmitter } from 'stream'; -import { setTimeout } from 'timers/promises'; - -// eslint-disable-next-line @typescript-eslint/no-restricted-imports -import * as compression from '../../../src/cmap/wire_protocol/compression'; -import { - decompressResponse, - LEGACY_HELLO_COMMAND, - MongoDecompressionError, - MongoParseError, - OP_COMPRESSED, - OP_MSG, - OpCompressedRequest, - OpMsgRequest, - OpMsgResponse, - type OpQueryResponse, - read, - readMany, - writeCommand -} from '../../mongodb'; - -class MockSocket extends EventEmitter { - buffer: Buffer[] = []; - push(...args: Buffer[]) { - this.buffer.push(...args); - } -} - -class MockModernConnection { - socket = new MockSocket(); -} - -describe('writeCommand', () => { - context('when compression is disabled', () => { - it('pushes an uncompressed command into the socket buffer', async () => { - const command = new 
OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - const connection = new MockModernConnection(); - const prom = writeCommand(connection as any, command, { - agreedCompressor: 'none' - }); - - connection.socket.emit('drain'); - await prom; - - const [buffer] = connection.socket.buffer; - expect(buffer).to.exist; - const opCode = buffer.readInt32LE(12); - - expect(opCode).to.equal(OP_MSG); - }); - }); - - context('when compression is enabled', () => { - context('when the command is compressible', () => { - it('pushes a compressed command into the socket buffer', async () => { - const command = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - const connection = new MockModernConnection(); - const prom = writeCommand(connection as any, command, { - agreedCompressor: 'snappy' - }); - - connection.socket.emit('drain'); - await prom; - - const [buffer] = connection.socket.buffer; - expect(buffer).to.exist; - const opCode = buffer.readInt32LE(12); - - expect(opCode).to.equal(OP_COMPRESSED); - }); - }); - context('when the command is not compressible', () => { - it('pushes an uncompressed command into the socket buffer', async () => { - const command = new OpMsgRequest('db', { [LEGACY_HELLO_COMMAND]: 1 }, { requestId: 1 }); - const connection = new MockModernConnection(); - const prom = writeCommand(connection as any, command, { - agreedCompressor: 'snappy' - }); - - connection.socket.emit('drain'); - await prom; - - const [buffer] = connection.socket.buffer; - expect(buffer).to.exist; - const opCode = buffer.readInt32LE(12); - - expect(opCode).to.equal(OP_MSG); - }); - }); - }); - context('when a `drain` event is not emitted from the underlying socket', () => { - it('never resolves', async () => { - const connection = new MockModernConnection(); - const promise = writeCommand(connection, new OpMsgRequest('db', { ping: 1 }, {}), { - agreedCompressor: 'none' - }); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result).to.equal('timeout'); - }); - }); - - context('when a `drain` event is emitted from the underlying socket', () => { - it('resolves', async () => { - const connection = new MockModernConnection(); - const promise = writeCommand(connection, new OpMsgRequest('db', { ping: 1 }, {}), { - agreedCompressor: 'none' - }); - connection.socket.emit('drain'); - const result = await Promise.race([promise, setTimeout(5000, 'timeout', { ref: false })]); - expect(result).to.be.undefined; - }); - }); -}); - -describe('decompressResponse()', () => { - context('when the message is not compressed', () => { - let message: Buffer; - let response: OpMsgResponse | OpQueryResponse; - let spy; - beforeEach(async () => { - message = Buffer.concat(new OpMsgRequest('db', { find: 1 }, { requestId: 1 }).toBin()); - spy = sinon.spy(compression, 'decompress'); - - response = await decompressResponse(message); - }); - afterEach(() => sinon.restore()); - it('returns a wire protocol message', () => { - expect(response).to.be.instanceOf(OpMsgResponse); - }); - it('does not attempt decompression', () => { - expect(spy).not.to.have.been.called; - }); - }); - - context('when the message is compressed', () => { - let message: Buffer; - let response: OpMsgResponse | OpQueryResponse; - beforeEach(async () => { - const msg = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - message = Buffer.concat( - await new OpCompressedRequest(msg, { - zlibCompressionLevel: 0, - agreedCompressor: 'snappy' - }).toBin() - ); - - response = await decompressResponse(message); 
- }); - - it('returns a wire protocol message', () => { - expect(response).to.be.instanceOf(OpMsgResponse); - }); - it('correctly decompresses the message', () => { - response.parse({}); - expect(response.documents[0]).to.deep.equal({ $db: 'db', find: 1 }); - }); - - context( - 'when the compressed message does not match the compression metadata in the header', - () => { - beforeEach(async () => { - const msg = new OpMsgRequest('db', { find: 1 }, { requestId: 1 }); - message = Buffer.concat( - await new OpCompressedRequest(msg, { - zlibCompressionLevel: 0, - agreedCompressor: 'snappy' - }).toBin() - ); - message.writeInt32LE( - 100, - 16 + 4 // message header size + offset to length - ); // write an invalid message length into the header - }); - it('throws a MongoDecompressionError', async () => { - const error = await decompressResponse(message).catch(e => e); - expect(error).to.be.instanceOf(MongoDecompressionError); - }); - } - ); - }); -}); - -describe('read()', () => { - let connection: MockModernConnection; - let message: Buffer; - - beforeEach(() => { - connection = new MockModernConnection(); - message = Buffer.concat(new OpMsgRequest('db', { ping: 1 }, { requestId: 1 }).toBin()); - }); - it('does not resolve if there are no data events', async () => { - const promise = read(connection); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result).to.equal('timeout'); - }); - - it('does not resolve until there is a complete message', async () => { - const promise = read(connection); - { - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result, 'received data on empty socket').to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(0, 10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect( - result, - 'received data when only part of message was emitted from the socket' - ).to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result, 'expected OpMsgResponse - got timeout instead').to.be.instanceOf( - OpMsgResponse - ); - } - }); - - it('removes all event listeners from the socket after a message is received', async () => { - const promise = read(connection); - - connection.socket.emit('data', message); - await promise; - - expect(connection.socket.listenerCount('data')).to.equal(0); - }); - - it('when `moreToCome` is set in the response, it only returns one message', async () => { - message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }).toBin() - ); - - const promise = read(connection); - - connection.socket.emit('data', message); - await promise; - - expect(connection.socket.listenerCount('data')).to.equal(0); - }); - - context('when reading an invalid message', () => { - context('when the message < 0', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(-1); - const promise = read(connection).catch(e => e); - - connection.socket.emit('data', message); - const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - - context('when the message length > max bson message size', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(1024 * 1024 * 16 * 4 + 1); - const promise = read(connection).catch(e => e); - - connection.socket.emit('data', message); - 
const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - }); - - context('when compression is enabled', () => { - it('returns a decompressed message', async () => { - const message = Buffer.concat( - await new OpCompressedRequest( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }), - { zlibCompressionLevel: 0, agreedCompressor: 'snappy' } - ).toBin() - ); - - const promise = read(connection); - - connection.socket.emit('data', message); - const result = await promise; - - expect(result).to.be.instanceOf(OpMsgResponse); - }); - }); -}); - -describe('readMany()', () => { - let connection: MockModernConnection; - let message: Buffer; - - beforeEach(() => { - connection = new MockModernConnection(); - message = Buffer.concat(new OpMsgRequest('db', { ping: 1 }, { requestId: 1 }).toBin()); - }); - it('does not resolve if there are no data events', async () => { - const generator = readMany(connection); - const result = await Promise.race([ - generator.next(), - setTimeout(1000, 'timeout', { ref: false }) - ]); - expect(result).to.equal('timeout'); - }); - - it('does not resolve until there is a complete message', async () => { - const generator = readMany(connection); - const promise = generator.next(); - { - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result, 'received data on empty socket').to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(0, 10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect( - result, - 'received data when only part of message was emitted from the socket' - ).to.equal('timeout'); - } - - { - connection.socket.emit('data', message.slice(10)); - const result = await Promise.race([promise, setTimeout(1000, 'timeout', { ref: false })]); - expect(result.value, 'expected OpMsgResponse - got timeout instead').to.be.instanceOf( - OpMsgResponse - ); - } - }); - - it('when moreToCome is set, it does not remove `data` listeners after receiving a message', async () => { - const generator = readMany(connection); - const promise = generator.next(); - message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }).toBin() - ); - connection.socket.emit('data', message); - - const { value: response } = await promise; - - expect(response).to.be.instanceOf(OpMsgResponse); - expect(connection.socket.listenerCount('data')).to.equal(1); - }); - - it('returns messages until `moreToCome` is false', async () => { - const generator = readMany(connection); - - for ( - let i = 0, - message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: true }).toBin() - ); - i < 3; - ++i - ) { - const promise = generator.next(); - connection.socket.emit('data', message); - const { value: response } = await promise; - expect(response, `response ${i} was not OpMsgResponse`).to.be.instanceOf(OpMsgResponse); - expect( - connection.socket.listenerCount('data'), - `listener count for ${i} was non-zero` - ).to.equal(1); - } - - const message = Buffer.concat( - new OpMsgRequest('db', { ping: 1 }, { requestId: 1, moreToCome: false }).toBin() - ); - const promise = generator.next(); - connection.socket.emit('data', message); - const { value: response } = await promise; - expect(response, `response was not OpMsgResponse`).to.be.instanceOf(OpMsgResponse); - expect(connection.socket.listenerCount('data')).to.equal(1); - - await generator.next(); - 
expect(connection.socket.listenerCount('data')).to.equal(0); - }); - - context('when reading an invalid message', () => { - context('when the message < 0', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(-1); - const promise = readMany(connection) - .next() - .catch(e => e); - - connection.socket.emit('data', message); - const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - - context('when the message length > max bson message size', () => { - it('throws a mongo parse error', async () => { - message.writeInt32LE(1024 * 1024 * 16 * 4 + 1); - const promise = readMany(connection) - .next() - .catch(e => e); - - connection.socket.emit('data', message); - const error = await promise; - expect(error).to.be.instanceof(MongoParseError); - }); - }); - }); - - context('when compression is enabled', () => { - it('returns a decompressed message', async () => { - const message = Buffer.concat( - await new OpCompressedRequest(new OpMsgRequest('db', { ping: 1 }, { requestId: 1 }), { - zlibCompressionLevel: 0, - agreedCompressor: 'snappy' - }).toBin() - ); - - const generator = readMany(connection); - const promise = generator.next(); - connection.socket.emit('data', message); - const { value: response } = await promise; - - expect(response).to.be.instanceOf(OpMsgResponse); - }); - }); -}); diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 9614cd9c923..980daf498a6 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1,7 +1,9 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; +import { setTimeout } from 'timers'; import { + abortable, BufferPool, ByteUtils, compareObjectId, @@ -20,6 +22,7 @@ import { shuffle, TimeoutController } from '../mongodb'; +import { sleep } from '../tools/utils'; import { createTimerSandbox } from './timer_sandbox'; describe('driver utils', function () { @@ -1165,4 +1168,148 @@ describe('driver utils', function () { }); }); }); + + describe('abortable()', () => { + const goodError = new Error('good error'); + const badError = new Error('unexpected bad error!'); + const expectedValue = "don't panic"; + + context('when not given a signal', () => { + it('returns promise fulfillment if the promise resolves or rejects', async () => { + expect(await abortable(Promise.resolve(expectedValue))).to.equal(expectedValue); + expect(await abortable(Promise.reject(goodError)).catch(e => e)).to.equal(goodError); + }); + + it('pends indefinitely if the promise is never settled', async () => { + const forever = abortable(new Promise(() => null)); + // Assume 100ms is good enough to prove "forever" + expect(await Promise.race([forever, sleep(100).then(() => expectedValue)])).to.equal( + expectedValue + ); + }); + }); + + context('always removes the abort listener it attaches', () => { + let controller; + let removeEventListenerSpy; + let addEventListenerSpy; + + beforeEach(() => { + controller = new AbortController(); + addEventListenerSpy = sinon.spy(controller.signal, 'addEventListener'); + removeEventListenerSpy = sinon.spy(controller.signal, 'removeEventListener'); + }); + + afterEach(() => sinon.restore()); + + const expectListenerCleanup = () => { + expect(addEventListenerSpy).to.have.been.calledOnce; + expect(removeEventListenerSpy).to.have.been.calledOnce; + }; + + it('when promise rejects', async () => { + await abortable(Promise.reject(goodError), { signal: controller.signal }).catch(e => e); + expectListenerCleanup(); + }); + + it('when promise resolves', async () => 
{ + await abortable(Promise.resolve(expectedValue), { signal: controller.signal }); + expectListenerCleanup(); + }); + + it('when signal aborts', async () => { + setTimeout(() => controller.abort(goodError)); + await abortable(new Promise(() => null), { signal: controller.signal }).catch(e => e); + expectListenerCleanup(); + }); + }); + + context('when given already rejected promise with already aborted signal', () => { + it('returns promise rejection', async () => { + const controller = new AbortController(); + const { signal } = controller; + controller.abort(badError); + const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); + }); + }); + + context('when given already resolved promise with already aborted signal', () => { + it('returns promise resolution', async () => { + const controller = new AbortController(); + const { signal } = controller; + controller.abort(badError); + const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e); + expect(result).to.deep.equal(expectedValue); + }); + }); + + context('when given already rejected promise with not yet aborted signal', () => { + it('returns promise rejection', async () => { + const controller = new AbortController(); + const { signal } = controller; + const result = await abortable(Promise.reject(goodError), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); + }); + }); + + context('when given already resolved promise with not yet aborted signal', () => { + it('returns promise resolution', async () => { + const controller = new AbortController(); + const { signal } = controller; + const result = await abortable(Promise.resolve(expectedValue), { signal }).catch(e => e); + expect(result).to.deep.equal(expectedValue); + }); + }); + + context('when given unresolved promise with an already aborted signal', () => { + it('returns signal reason', async () => { + const controller = new AbortController(); + const { signal } = controller; + controller.abort(goodError); + const result = await abortable(new Promise(() => null), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); + }); + }); + + context('when given eventually rejecting promise with not yet aborted signal', () => { + const eventuallyReject = async () => { + await sleep(1); + throw goodError; + }; + + it('returns promise rejection', async () => { + const controller = new AbortController(); + const { signal } = controller; + const result = await abortable(eventuallyReject(), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); + }); + }); + + context('when given eventually resolving promise with not yet aborted signal', () => { + const eventuallyResolve = async () => { + await sleep(1); + return expectedValue; + }; + + it('returns promise resolution', async () => { + const controller = new AbortController(); + const { signal } = controller; + const result = await abortable(eventuallyResolve(), { signal }).catch(e => e); + expect(result).to.deep.equal(expectedValue); + }); + }); + + context('when given unresolved promise with eventually aborted signal', () => { + it('returns signal reason', async () => { + const controller = new AbortController(); + const { signal } = controller; + + setTimeout(() => controller.abort(goodError), 1); + + const result = await abortable(new Promise(() => null), { signal }).catch(e => e); + expect(result).to.deep.equal(goodError); + }); + }); + }); });
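
For illustration, a minimal TypeScript sketch of the cancellation pattern this change introduces, assuming only the abortable() helper exported from src/utils.ts in this patch; the writeWithAbort name and the standalone Socket here are hypothetical, not driver API. ModernConnection follows the same shape: the socket 'error', 'close', and 'timeout' handlers call cleanup(), which aborts this.controller, and every in-flight socket read/write raced through abortable() or an on(socket, 'data', { signal }) iterator then rejects with that error.

import { promisify } from 'util';
import type { Socket } from 'net';

// As exported from src/utils.ts in this patch.
declare function abortable<T>(
  promise: Promise<T>,
  options?: { signal?: AbortSignal }
): Promise<T>;

// Illustrative helper, not driver API: race a socket write against the
// connection's abort signal so that a cleanup()-driven controller.abort(error)
// rejects the in-flight write with the underlying network error.
async function writeWithAbort(
  socket: Socket,
  buffer: Buffer,
  controller: AbortController
): Promise<void> {
  const write = promisify(socket.write.bind(socket));
  // If controller.abort(error) fires first, abortable() rejects with `error`;
  // otherwise it resolves once the socket write callback is invoked.
  await abortable(write(buffer), { signal: controller.signal });
}

Note that sendCommand() above replaces this.controller after each non-aborted command (see TODO(NODE-5770)), so 'abort' listeners do not accumulate across successive commands on the same connection.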