From 96d14f62018d3ab11de17d3c0f5516904a977e6b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 13 Dec 2023 14:36:49 -0500 Subject: [PATCH 01/15] chore(NODE-5771): benchmark new connection --- modern.cjs | 42 ++++++++++++++++++++++++++++++++++++++++++ old.cjs | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 modern.cjs create mode 100644 old.cjs diff --git a/modern.cjs b/modern.cjs new file mode 100644 index 00000000000..2f6c9b84ac0 --- /dev/null +++ b/modern.cjs @@ -0,0 +1,42 @@ +const totalStart = performance.now(); +/* eslint-disable no-console */ +/* eslint-disable @typescript-eslint/no-var-requires */ +const process = require('node:process'); +const { MongoClient } = require('./lib/index.js'); +const { ModernConnection } = require('./lib/cmap/connection.js'); + +const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); + +const client = new MongoClient(process.env.MONGODB_URI, { connectionType: ModernConnection }); + +async function main() { + console.log('modern connection'); + + const db = client.db('test'); + let collection = db.collection('test'); + await collection.drop().catch(() => null); + collection = await db.createCollection('test'); + await collection.insertOne(tweet); + + const total = 10_000; + + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + + const start = performance.now() - totalStart; + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + const end = performance.now() - totalStart; + + console.log( + `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` + ); + console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); + console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); + + await client.close(); +} + +main().catch(console.error); diff --git a/old.cjs b/old.cjs new file mode 100644 index 00000000000..3dcc7914e69 --- /dev/null +++ b/old.cjs @@ -0,0 +1,42 @@ +const totalStart = performance.now(); +/* eslint-disable no-console */ +/* eslint-disable @typescript-eslint/no-var-requires */ +const process = require('node:process'); +const { MongoClient } = require('./lib/index.js'); +const { Connection } = require('./lib/cmap/connection.js'); + +const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); + +const client = new MongoClient(process.env.MONGODB_URI, { connectionType: Connection }); + +async function main() { + console.log('old connection'); + + const db = client.db('test'); + let collection = db.collection('test'); + await collection.drop().catch(() => null); + collection = await db.createCollection('test'); + await collection.insertOne(tweet); + + const total = 10_000; + + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + + const start = performance.now() - totalStart; + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + const end = performance.now() - totalStart; + + console.log( + `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` + ); + console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); + console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); + + await client.close(); +} + +main().catch(console.error); From 7b60df81dc242652c64a8abfb3520e69094f0a8f Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 11:34:43 -0500 Subject: [PATCH 02/15] chore: clean up test files --- modern.cjs | 42 ------------------------------------------ old.cjs | 42 ------------------------------------------ 2 files changed, 84 deletions(-) delete mode 100644 modern.cjs delete mode 100644 old.cjs diff --git a/modern.cjs b/modern.cjs deleted file mode 100644 index 2f6c9b84ac0..00000000000 --- a/modern.cjs +++ /dev/null @@ -1,42 +0,0 @@ -const totalStart = performance.now(); -/* eslint-disable no-console */ -/* eslint-disable @typescript-eslint/no-var-requires */ -const process = require('node:process'); -const { MongoClient } = require('./lib/index.js'); -const { ModernConnection } = require('./lib/cmap/connection.js'); - -const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); - -const client = new MongoClient(process.env.MONGODB_URI, { connectionType: ModernConnection }); - -async function main() { - console.log('modern connection'); - - const db = client.db('test'); - let collection = db.collection('test'); - await collection.drop().catch(() => null); - collection = await db.createCollection('test'); - await collection.insertOne(tweet); - - const total = 10_000; - - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - - const start = performance.now() - totalStart; - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - const end = performance.now() - totalStart; - - console.log( - `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` - ); - console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); - console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); - - await client.close(); -} - -main().catch(console.error); diff --git a/old.cjs b/old.cjs deleted file mode 100644 index 3dcc7914e69..00000000000 --- a/old.cjs +++ /dev/null @@ -1,42 +0,0 @@ -const totalStart = performance.now(); -/* eslint-disable no-console */ -/* eslint-disable @typescript-eslint/no-var-requires */ -const process = require('node:process'); -const { MongoClient } = require('./lib/index.js'); -const { Connection } = require('./lib/cmap/connection.js'); - -const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); - -const client = new MongoClient(process.env.MONGODB_URI, { connectionType: Connection }); - -async function main() { - console.log('old connection'); - - const db = client.db('test'); - let collection = db.collection('test'); - await collection.drop().catch(() => null); - collection = await db.createCollection('test'); - await collection.insertOne(tweet); - - const total = 10_000; - - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - - const start = performance.now() - totalStart; - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - const end = performance.now() - totalStart; - - console.log( - `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` - ); - console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); - console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); - - await client.close(); -} - -main().catch(console.error); From fca03255bccd9812e6cf5d1b0a057eda17f60b36 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 13:50:23 -0500 Subject: [PATCH 03/15] chore(NODE-5743): remove old connection class --- .evergreen/config.in.yml | 2 - .evergreen/config.yml | 3 - .evergreen/generate_evergreen_tasks.js | 1 - .evergreen/run-tests.sh | 1 - src/cmap/connection.ts | 724 ++---------------- src/cmap/message_stream.ts | 220 ------ src/cmap/wire_protocol/compression.ts | 3 +- src/index.ts | 7 - src/sdam/monitor.ts | 4 - test/integration/auth/auth.prose.test.ts | 12 +- .../mongodb-handshake.test.ts | 36 +- .../node-specific/mongo_client.test.ts | 10 +- test/mongodb.ts | 1 - test/tools/runner/config.ts | 4 - test/tools/runner/hooks/configuration.js | 3 +- 15 files changed, 98 insertions(+), 933 deletions(-) delete mode 100644 src/cmap/message_stream.ts diff --git a/.evergreen/config.in.yml b/.evergreen/config.in.yml index e6615d8bfcd..6ac4ff68f5d 100644 --- a/.evergreen/config.in.yml +++ b/.evergreen/config.in.yml @@ -200,8 +200,6 @@ functions: type: test params: working_dir: "src" - env: - MONGODB_NEW_CONNECTION: ${MONGODB_NEW_CONNECTION|false} timeout_secs: 300 shell: bash script: | diff --git a/.evergreen/config.yml b/.evergreen/config.yml index e811caa5a16..5d746ed7641 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -170,8 +170,6 @@ functions: type: test params: working_dir: src - env: - MONGODB_NEW_CONNECTION: ${MONGODB_NEW_CONNECTION|false} timeout_secs: 300 shell: bash script: | @@ -4432,7 +4430,6 @@ buildvariants: expansions: NODE_LTS_VERSION: 20 CLIENT_ENCRYPTION: true - MONGODB_NEW_CONNECTION: true tasks: - test-latest-server - test-latest-replica_set diff --git a/.evergreen/generate_evergreen_tasks.js b/.evergreen/generate_evergreen_tasks.js index c9b9f4b8037..36609de7fcf 100644 --- a/.evergreen/generate_evergreen_tasks.js +++ b/.evergreen/generate_evergreen_tasks.js @@ -745,7 +745,6 @@ BUILD_VARIANTS.push({ expansions: { NODE_LTS_VERSION: LATEST_LTS, CLIENT_ENCRYPTION: true, - MONGODB_NEW_CONNECTION: true }, tasks: BASE_TASKS.map(({ name }) => name) }); diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 170547a6340..080857cd381 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -63,5 +63,4 @@ export MONGODB_URI=${MONGODB_URI} export LOAD_BALANCER=${LOAD_BALANCER} export TEST_CSFLE=${TEST_CSFLE} export COMPRESSOR=${COMPRESSOR} -export MONGODB_NEW_CONNECTION=${MONGODB_NEW_CONNECTION} npm run "${TEST_NPM_SCRIPT}" diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 510e4e82093..fe2482b1e59 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,5 +1,5 @@ import { type Readable, Transform, type TransformCallback } from 'stream'; -import { clearTimeout, setTimeout } from 'timers'; +import { setTimeout } from 'timers'; import { promisify } from 'util'; import type { BSONSerializeOptions, Document, ObjectId } from '../bson'; @@ -20,7 +20,6 @@ import { MongoNetworkError, MongoNetworkTimeoutError, MongoParseError, - MongoRuntimeError, MongoServerError, MongoUnexpectedServerResponseError, MongoWriteConcernError @@ -58,18 +57,11 @@ import { } from './commands'; import type { Stream } from './connect'; import type { ClientMetadata } from './handshake/client_metadata'; -import { MessageStream, type OperationDescription } from './message_stream'; import { StreamDescription, type StreamDescriptionOptions } from './stream_description'; -import { decompressResponse } from './wire_protocol/compression'; +import { type CompressorName, decompressResponse } from './wire_protocol/compression'; import { onData } from './wire_protocol/on_data'; import { getReadPreference, isSharded } from './wire_protocol/shared'; -/** @internal */ -const kStream = Symbol('stream'); -/** @internal */ -const kQueue = Symbol('queue'); -/** @internal */ -const kMessageStream = Symbol('messageStream'); /** @internal */ const kGeneration = Symbol('generation'); /** @internal */ @@ -82,10 +74,6 @@ const kDescription = Symbol('description'); const kHello = Symbol('hello'); /** @internal */ const kAutoEncrypter = Symbol('autoEncrypter'); -/** @internal */ -const kDelayedTimeoutId = Symbol('delayedTimeoutId'); - -const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description'; /** @internal */ export interface CommandOptions extends BSONSerializeOptions { @@ -132,7 +120,7 @@ export interface ConnectionOptions serverApi?: ServerApi; monitorCommands: boolean; /** @internal */ - connectionType?: typeof Connection; + connectionType?: any; credentials?: MongoCredentials; connectTimeoutMS?: number; tls: boolean; @@ -160,492 +148,6 @@ export type ConnectionEvents = { unpinned(pinType: string): void; }; -/** @internal */ -export class Connection extends TypedEventEmitter { - id: number | ''; - address: string; - socketTimeoutMS: number; - monitorCommands: boolean; - /** Indicates that the connection (including underlying TCP socket) has been closed. */ - closed: boolean; - lastHelloMS?: number; - serverApi?: ServerApi; - helloOk?: boolean; - /** @internal */ - authContext?: AuthContext; - - /**@internal */ - [kDelayedTimeoutId]: NodeJS.Timeout | null; - /** @internal */ - [kDescription]: StreamDescription; - /** @internal */ - [kGeneration]: number; - /** @internal */ - [kLastUseTime]: number; - /** @internal */ - [kQueue]: Map; - /** @internal */ - [kMessageStream]: MessageStream; - /** @internal */ - [kStream]: Stream; - /** @internal */ - [kHello]: Document | null; - /** @internal */ - [kClusterTime]: Document | null; - - /** @event */ - static readonly COMMAND_STARTED = COMMAND_STARTED; - /** @event */ - static readonly COMMAND_SUCCEEDED = COMMAND_SUCCEEDED; - /** @event */ - static readonly COMMAND_FAILED = COMMAND_FAILED; - /** @event */ - static readonly CLUSTER_TIME_RECEIVED = CLUSTER_TIME_RECEIVED; - /** @event */ - static readonly CLOSE = CLOSE; - /** @event */ - static readonly MESSAGE = MESSAGE; - /** @event */ - static readonly PINNED = PINNED; - /** @event */ - static readonly UNPINNED = UNPINNED; - - constructor(stream: Stream, options: ConnectionOptions) { - super(); - - this.id = options.id; - this.address = streamIdentifier(stream, options); - this.socketTimeoutMS = options.socketTimeoutMS ?? 0; - this.monitorCommands = options.monitorCommands; - this.serverApi = options.serverApi; - this.closed = false; - this[kHello] = null; - this[kClusterTime] = null; - - this[kDescription] = new StreamDescription(this.address, options); - this[kGeneration] = options.generation; - this[kLastUseTime] = now(); - - // setup parser stream and message handling - this[kQueue] = new Map(); - this[kMessageStream] = new MessageStream({ - ...options, - maxBsonMessageSize: this.hello?.maxBsonMessageSize - }); - this[kStream] = stream; - - this[kDelayedTimeoutId] = null; - - this[kMessageStream].on('message', message => this.onMessage(message)); - this[kMessageStream].on('error', error => this.onError(error)); - this[kStream].on('close', () => this.onClose()); - this[kStream].on('timeout', () => this.onTimeout()); - this[kStream].on('error', () => { - /* ignore errors, listen to `close` instead */ - }); - - // hook the message stream up to the passed in stream - this[kStream].pipe(this[kMessageStream]); - this[kMessageStream].pipe(this[kStream]); - } - - // This whole class is temporary, - // Need to move this to be defined on the prototype for spying. - async commandAsync(ns: MongoDBNamespace, cmd: Document, opt?: CommandOptions) { - return promisify(this.command.bind(this))(ns, cmd, opt); - } - - get description(): StreamDescription { - return this[kDescription]; - } - - get hello(): Document | null { - return this[kHello]; - } - - // the `connect` method stores the result of the handshake hello on the connection - set hello(response: Document | null) { - this[kDescription].receiveResponse(response); - this[kDescription] = Object.freeze(this[kDescription]); - - // TODO: remove this, and only use the `StreamDescription` in the future - this[kHello] = response; - } - - // Set the whether the message stream is for a monitoring connection. - set isMonitoringConnection(value: boolean) { - this[kMessageStream].isMonitoringConnection = value; - } - - get isMonitoringConnection(): boolean { - return this[kMessageStream].isMonitoringConnection; - } - - get serviceId(): ObjectId | undefined { - return this.hello?.serviceId; - } - - get loadBalanced(): boolean { - return this.description.loadBalanced; - } - - get generation(): number { - return this[kGeneration] || 0; - } - - set generation(generation: number) { - this[kGeneration] = generation; - } - - get idleTime(): number { - return calculateDurationInMs(this[kLastUseTime]); - } - - get clusterTime(): Document | null { - return this[kClusterTime]; - } - - get stream(): Stream { - return this[kStream]; - } - - markAvailable(): void { - this[kLastUseTime] = now(); - } - - onError(error: Error) { - this.cleanup(true, error); - } - - onClose() { - const message = `connection ${this.id} to ${this.address} closed`; - this.cleanup(true, new MongoNetworkError(message)); - } - - onTimeout() { - this[kDelayedTimeoutId] = setTimeout(() => { - const message = `connection ${this.id} to ${this.address} timed out`; - const beforeHandshake = this.hello == null; - this.cleanup(true, new MongoNetworkTimeoutError(message, { beforeHandshake })); - }, 1).unref(); // No need for this timer to hold the event loop open - } - - onMessage(message: OpMsgResponse | OpQueryResponse) { - const delayedTimeoutId = this[kDelayedTimeoutId]; - if (delayedTimeoutId != null) { - clearTimeout(delayedTimeoutId); - this[kDelayedTimeoutId] = null; - } - - const socketTimeoutMS = this[kStream].timeout ?? 0; - this[kStream].setTimeout(0); - - // always emit the message, in case we are streaming - this.emit('message', message); - let operationDescription = this[kQueue].get(message.responseTo); - - if (!operationDescription && this.isMonitoringConnection) { - // This is how we recover when the initial hello's requestId is not - // the responseTo when hello responses have been skipped: - - // First check if the map is of invalid size - if (this[kQueue].size > 1) { - this.cleanup(true, new MongoRuntimeError(INVALID_QUEUE_SIZE)); - } else { - // Get the first orphaned operation description. - const entry = this[kQueue].entries().next(); - if (entry.value != null) { - const [requestId, orphaned]: [number, OperationDescription] = entry.value; - // If the orphaned operation description exists then set it. - operationDescription = orphaned; - // Remove the entry with the bad request id from the queue. - this[kQueue].delete(requestId); - } - } - } - - if (!operationDescription) { - return; - } - - const callback = operationDescription.cb; - - // SERVER-45775: For exhaust responses we should be able to use the same requestId to - // track response, however the server currently synthetically produces remote requests - // making the `responseTo` change on each response - this[kQueue].delete(message.responseTo); - if ('moreToCome' in message && message.moreToCome) { - // If the operation description check above does find an orphaned - // description and sets the operationDescription then this line will put one - // back in the queue with the correct requestId and will resolve not being able - // to find the next one via the responseTo of the next streaming hello. - this[kQueue].set(message.requestId, operationDescription); - this[kStream].setTimeout(socketTimeoutMS); - } - - try { - // Pass in the entire description because it has BSON parsing options - message.parse(operationDescription); - } catch (err) { - // If this error is generated by our own code, it will already have the correct class applied - // if it is not, then it is coming from a catastrophic data parse failure or the BSON library - // in either case, it should not be wrapped - callback(err); - return; - } - - if (message.documents[0]) { - const document: Document = message.documents[0]; - const session = operationDescription.session; - if (session) { - updateSessionFromResponse(session, document); - } - - if (document.$clusterTime) { - this[kClusterTime] = document.$clusterTime; - this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); - } - - if (document.writeConcernError) { - callback(new MongoWriteConcernError(document.writeConcernError, document), document); - return; - } - - if (document.ok === 0 || document.$err || document.errmsg || document.code) { - callback(new MongoServerError(document)); - return; - } - } - - callback(undefined, message.documents[0]); - } - - destroy(options: DestroyOptions, callback?: Callback): void { - if (this.closed) { - process.nextTick(() => callback?.()); - return; - } - if (typeof callback === 'function') { - this.once('close', () => process.nextTick(() => callback())); - } - - // load balanced mode requires that these listeners remain on the connection - // after cleanup on timeouts, errors or close so we remove them before calling - // cleanup. - this.removeAllListeners(Connection.PINNED); - this.removeAllListeners(Connection.UNPINNED); - const message = `connection ${this.id} to ${this.address} closed`; - this.cleanup(options.force, new MongoNetworkError(message)); - } - - /** - * A method that cleans up the connection. When `force` is true, this method - * forcibly destroys the socket. - * - * If an error is provided, any in-flight operations will be closed with the error. - * - * This method does nothing if the connection is already closed. - */ - private cleanup(force: boolean, error?: Error): void { - if (this.closed) { - return; - } - - this.closed = true; - - const completeCleanup = () => { - for (const op of this[kQueue].values()) { - op.cb(error); - } - - this[kQueue].clear(); - - this.emit(Connection.CLOSE); - }; - - this[kStream].removeAllListeners(); - this[kMessageStream].removeAllListeners(); - - this[kMessageStream].destroy(); - - if (force) { - this[kStream].destroy(); - completeCleanup(); - return; - } - - if (!this[kStream].writableEnded) { - this[kStream].end(() => { - this[kStream].destroy(); - completeCleanup(); - }); - } else { - completeCleanup(); - } - } - - command( - ns: MongoDBNamespace, - command: Document, - options: CommandOptions | undefined, - callback: Callback - ): void { - let cmd = { ...command }; - - const readPreference = getReadPreference(options); - const shouldUseOpMsg = supportsOpMsg(this); - const session = options?.session; - - let clusterTime = this.clusterTime; - - if (this.serverApi) { - const { version, strict, deprecationErrors } = this.serverApi; - cmd.apiVersion = version; - if (strict != null) cmd.apiStrict = strict; - if (deprecationErrors != null) cmd.apiDeprecationErrors = deprecationErrors; - } - - if (hasSessionSupport(this) && session) { - if ( - session.clusterTime && - clusterTime && - session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime) - ) { - clusterTime = session.clusterTime; - } - - const err = applySession(session, cmd, options); - if (err) { - return callback(err); - } - } else if (session?.explicit) { - return callback(new MongoCompatibilityError('Current topology does not support sessions')); - } - - // if we have a known cluster time, gossip it - if (clusterTime) { - cmd.$clusterTime = clusterTime; - } - - if (isSharded(this) && !shouldUseOpMsg && readPreference && readPreference.mode !== 'primary') { - cmd = { - $query: cmd, - $readPreference: readPreference.toJSON() - }; - } - - const commandOptions: Document = Object.assign( - { - numberToSkip: 0, - numberToReturn: -1, - checkKeys: false, - // This value is not overridable - secondaryOk: readPreference.secondaryOk() - }, - options - ); - - const message = shouldUseOpMsg - ? new OpMsgRequest(ns.db, cmd, commandOptions) - : new OpQueryRequest(ns.db, cmd, commandOptions); - - try { - write(this, message, commandOptions, callback); - } catch (err) { - callback(err); - } - } - - exhaustCommand( - ns: MongoDBNamespace, - command: Document, - options: CommandOptions | undefined, - replyListener: Callback - ) { - return this.command(ns, command, options, replyListener); - } -} - -/** @internal */ -export class CryptoConnection extends Connection { - /** @internal */ - [kAutoEncrypter]?: AutoEncrypter; - - constructor(stream: Stream, options: ConnectionOptions) { - super(stream, options); - this[kAutoEncrypter] = options.autoEncrypter; - } - - /** @internal @override */ - override command( - ns: MongoDBNamespace, - cmd: Document, - options: CommandOptions, - callback: Callback - ): void { - const autoEncrypter = this[kAutoEncrypter]; - if (!autoEncrypter) { - return callback(new MongoMissingDependencyError('No AutoEncrypter available for encryption')); - } - - const serverWireVersion = maxWireVersion(this); - if (serverWireVersion === 0) { - // This means the initial handshake hasn't happened yet - return super.command(ns, cmd, options, callback); - } - - if (serverWireVersion < 8) { - callback( - new MongoCompatibilityError('Auto-encryption requires a minimum MongoDB version of 4.2') - ); - return; - } - - // Save sort or indexKeys based on the command being run - // the encrypt API serializes our JS objects to BSON to pass to the native code layer - // and then deserializes the encrypted result, the protocol level components - // of the command (ex. sort) are then converted to JS objects potentially losing - // import key order information. These fields are never encrypted so we can save the values - // from before the encryption and replace them after encryption has been performed - const sort: Map | null = cmd.find || cmd.findAndModify ? cmd.sort : null; - const indexKeys: Map[] | null = cmd.createIndexes - ? cmd.indexes.map((index: { key: Map }) => index.key) - : null; - - autoEncrypter.encrypt(ns.toString(), cmd, options).then( - encrypted => { - // Replace the saved values - if (sort != null && (cmd.find || cmd.findAndModify)) { - encrypted.sort = sort; - } - if (indexKeys != null && cmd.createIndexes) { - for (const [offset, index] of indexKeys.entries()) { - // @ts-expect-error `encrypted` is a generic "command", but we've narrowed for only `createIndexes` commands here - encrypted.indexes[offset].key = index; - } - } - - super.command(ns, encrypted, options, (err, response) => { - if (err || response == null) { - callback(err, response); - return; - } - - autoEncrypter.decrypt(response, options).then( - res => callback(undefined, res), - err => callback(err) - ); - }); - }, - err => { - if (err) { - callback(err, null); - } - } - ); - } -} - /** @internal */ export function hasSessionSupport(conn: Connection): boolean { const description = conn.description; @@ -676,128 +178,8 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string { return uuidV4().toString('hex'); } -function write( - conn: Connection, - command: WriteProtocolMessageType, - options: CommandOptions, - callback: Callback -) { - options = options ?? {}; - const operationDescription: OperationDescription = { - requestId: command.requestId, - cb: callback, - session: options.session, - noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false, - documentsReturnedIn: options.documentsReturnedIn, - - // for BSON parsing - useBigInt64: typeof options.useBigInt64 === 'boolean' ? options.useBigInt64 : false, - promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, - promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, - promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, - bsonRegExp: typeof options.bsonRegExp === 'boolean' ? options.bsonRegExp : false, - enableUtf8Validation: - typeof options.enableUtf8Validation === 'boolean' ? options.enableUtf8Validation : true, - raw: typeof options.raw === 'boolean' ? options.raw : false, - started: 0 - }; - - if (conn[kDescription] && conn[kDescription].compressor) { - operationDescription.agreedCompressor = conn[kDescription].compressor; - - if (conn[kDescription].zlibCompressionLevel) { - operationDescription.zlibCompressionLevel = conn[kDescription].zlibCompressionLevel; - } - } - - if (typeof options.socketTimeoutMS === 'number') { - conn[kStream].setTimeout(options.socketTimeoutMS); - } else if (conn.socketTimeoutMS !== 0) { - conn[kStream].setTimeout(conn.socketTimeoutMS); - } - - // if command monitoring is enabled we need to modify the callback here - if (conn.monitorCommands) { - conn.emit( - Connection.COMMAND_STARTED, - new CommandStartedEvent(conn, command, conn[kDescription].serverConnectionId) - ); - - operationDescription.started = now(); - operationDescription.cb = (err, reply) => { - // Command monitoring spec states that if ok is 1, then we must always emit - // a command succeeded event, even if there's an error. Write concern errors - // will have an ok: 1 in their reply. - if (err && reply?.ok !== 1) { - conn.emit( - Connection.COMMAND_FAILED, - new CommandFailedEvent( - conn, - command, - err, - operationDescription.started, - conn[kDescription].serverConnectionId - ) - ); - } else { - if (reply && (reply.ok === 0 || reply.$err)) { - conn.emit( - Connection.COMMAND_FAILED, - new CommandFailedEvent( - conn, - command, - reply, - operationDescription.started, - conn[kDescription].serverConnectionId - ) - ); - } else { - conn.emit( - Connection.COMMAND_SUCCEEDED, - new CommandSucceededEvent( - conn, - command, - reply, - operationDescription.started, - conn[kDescription].serverConnectionId - ) - ); - } - } - - if (typeof callback === 'function') { - // Since we're passing through the reply with the write concern error now, we - // need it not to be provided to the original callback in this case so - // retryability does not get tricked into thinking the command actually - // succeeded. - callback(err, err instanceof MongoWriteConcernError ? undefined : reply); - } - }; - } - - if (!operationDescription.noResponse) { - conn[kQueue].set(operationDescription.requestId, operationDescription); - } - - try { - conn[kMessageStream].writeCommand(command, operationDescription); - } catch (e) { - if (!operationDescription.noResponse) { - conn[kQueue].delete(operationDescription.requestId); - operationDescription.cb(e); - return; - } - } - - if (operationDescription.noResponse) { - operationDescription.cb(); - } -} - -/** in-progress connection layer */ - /** @internal */ -export class ModernConnection extends TypedEventEmitter { +export class Connection extends TypedEventEmitter { id: number | ''; address: string; socketTimeoutMS: number; @@ -1033,7 +415,6 @@ export class ModernConnection extends TypedEventEmitter { } if ( - // @ts-expect-error ModernConnections cannot be passed as connections isSharded(this) && !this.supportsOpMsg && readPreference && @@ -1122,14 +503,7 @@ export class ModernConnection extends TypedEventEmitter { let started = 0; if (this.monitorCommands) { started = now(); - this.emit( - ModernConnection.COMMAND_STARTED, - new CommandStartedEvent( - this as unknown as Connection, - message, - this[kDescription].serverConnectionId - ) - ); + this.emit(Connection.COMMAND_STARTED, new CommandStartedEvent(this, message)); } let document; @@ -1149,9 +523,9 @@ export class ModernConnection extends TypedEventEmitter { if (this.monitorCommands) { this.emit( - ModernConnection.COMMAND_SUCCEEDED, + Connection.COMMAND_SUCCEEDED, new CommandSucceededEvent( - this as unknown as Connection, + this, message, options.noResponse ? undefined : document, started, @@ -1167,9 +541,9 @@ export class ModernConnection extends TypedEventEmitter { if (this.monitorCommands) { error.name === 'MongoWriteConcernError' ? this.emit( - ModernConnection.COMMAND_SUCCEEDED, + Connection.COMMAND_SUCCEEDED, new CommandSucceededEvent( - this as unknown as Connection, + this, message, options.noResponse ? undefined : document, started, @@ -1177,14 +551,8 @@ export class ModernConnection extends TypedEventEmitter { ) ) : this.emit( - ModernConnection.COMMAND_FAILED, - new CommandFailedEvent( - this as unknown as Connection, - message, - error, - started, - this[kDescription].serverConnectionId - ) + Connection.COMMAND_FAILED, + new CommandFailedEvent(this, message, error, started) ); } throw error; @@ -1228,7 +596,7 @@ export class ModernConnection extends TypedEventEmitter { */ async writeCommand( command: WriteProtocolMessageType, - options: Partial> + options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number } ): Promise { const finalCommand = options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) @@ -1267,9 +635,9 @@ export class ModernConnection extends TypedEventEmitter { /** @internal */ export class SizedMessageTransform extends Transform { bufferPool: BufferPool; - connection: ModernConnection; + connection: Connection; - constructor({ connection }: { connection: ModernConnection }) { + constructor({ connection }: { connection: Connection }) { super({ objectMode: false }); this.bufferPool = new BufferPool(); this.connection = connection; @@ -1299,3 +667,67 @@ export class SizedMessageTransform extends Transform { return callback(null, message); } } + +/** @internal */ +export class CryptoConnection extends Connection { + /** @internal */ + [kAutoEncrypter]?: AutoEncrypter; + + constructor(stream: Stream, options: ConnectionOptions) { + super(stream, options); + this[kAutoEncrypter] = options.autoEncrypter; + } + + /** @internal @override */ + override async command( + ns: MongoDBNamespace, + cmd: Document, + options: CommandOptions + ): Promise { + const autoEncrypter = this[kAutoEncrypter]; + if (!autoEncrypter) { + throw new MongoMissingDependencyError('No AutoEncrypter available for encryption'); + } + + const serverWireVersion = maxWireVersion(this); + if (serverWireVersion === 0) { + // This means the initial handshake hasn't happened yet + return super.command(ns, cmd, options); + } + + if (serverWireVersion < 8) { + throw new MongoCompatibilityError( + 'Auto-encryption requires a minimum MongoDB version of 4.2' + ); + } + + // Save sort or indexKeys based on the command being run + // the encrypt API serializes our JS objects to BSON to pass to the native code layer + // and then deserializes the encrypted result, the protocol level components + // of the command (ex. sort) are then converted to JS objects potentially losing + // import key order information. These fields are never encrypted so we can save the values + // from before the encryption and replace them after encryption has been performed + const sort: Map | null = cmd.find || cmd.findAndModify ? cmd.sort : null; + const indexKeys: Map[] | null = cmd.createIndexes + ? cmd.indexes.map((index: { key: Map }) => index.key) + : null; + + const encrypted = await autoEncrypter.encrypt(ns.toString(), cmd, options); + + // Replace the saved values + if (sort != null && (cmd.find || cmd.findAndModify)) { + encrypted.sort = sort; + } + + if (indexKeys != null && cmd.createIndexes) { + for (const [offset, index] of indexKeys.entries()) { + // @ts-expect-error `encrypted` is a generic "command", but we've narrowed for only `createIndexes` commands here + encrypted.indexes[offset].key = index; + } + } + + const response = await super.command(ns, encrypted, options); + + return autoEncrypter.decrypt(response, options); + } +} diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts deleted file mode 100644 index 42d16ae26e5..00000000000 --- a/src/cmap/message_stream.ts +++ /dev/null @@ -1,220 +0,0 @@ -import { Duplex, type DuplexOptions } from 'stream'; - -import type { BSONSerializeOptions, Document } from '../bson'; -import { MongoDecompressionError, MongoParseError } from '../error'; -import type { ClientSession } from '../sessions'; -import { BufferPool, type Callback } from '../utils'; -import { - type MessageHeader, - OpCompressedRequest, - OpMsgResponse, - OpQueryResponse, - type WriteProtocolMessageType -} from './commands'; -import { compress, Compressor, type CompressorName, decompress } from './wire_protocol/compression'; -import { OP_COMPRESSED, OP_MSG } from './wire_protocol/constants'; - -const MESSAGE_HEADER_SIZE = 16; -const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID - -const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; -/** @internal */ -const kBuffer = Symbol('buffer'); - -/** @internal */ -export interface MessageStreamOptions extends DuplexOptions { - maxBsonMessageSize?: number; -} - -/** @internal */ -export interface OperationDescription extends BSONSerializeOptions { - started: number; - cb: Callback; - documentsReturnedIn?: string; - noResponse: boolean; - raw: boolean; - requestId: number; - session?: ClientSession; - agreedCompressor?: CompressorName; - zlibCompressionLevel?: number; - $clusterTime?: Document; -} - -/** - * A duplex stream that is capable of reading and writing raw wire protocol messages, with - * support for optional compression - * @internal - */ -export class MessageStream extends Duplex { - /** @internal */ - maxBsonMessageSize: number; - /** @internal */ - [kBuffer]: BufferPool; - /** @internal */ - isMonitoringConnection = false; - - constructor(options: MessageStreamOptions = {}) { - super(options); - this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize; - this[kBuffer] = new BufferPool(); - } - - get buffer(): BufferPool { - return this[kBuffer]; - } - - override _write(chunk: Buffer, _: unknown, callback: Callback): void { - this[kBuffer].append(chunk); - processIncomingData(this, callback); - } - - override _read(/* size */): void { - // NOTE: This implementation is empty because we explicitly push data to be read - // when `writeMessage` is called. - return; - } - - writeCommand( - command: WriteProtocolMessageType, - operationDescription: OperationDescription - ): void { - const agreedCompressor = operationDescription.agreedCompressor ?? 'none'; - if (agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)) { - const data = command.toBin(); - this.push(Array.isArray(data) ? Buffer.concat(data) : data); - return; - } - // otherwise, compress the message - const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin()); - const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); - - // Extract information needed for OP_COMPRESSED from the uncompressed message - const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); - - const options = { - agreedCompressor, - zlibCompressionLevel: operationDescription.zlibCompressionLevel ?? 0 - }; - // Compress the message body - compress(options, messageToBeCompressed).then( - compressedMessage => { - // Create the msgHeader of OP_COMPRESSED - const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); - msgHeader.writeInt32LE( - MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, - 0 - ); // messageLength - msgHeader.writeInt32LE(command.requestId, 4); // requestID - msgHeader.writeInt32LE(0, 8); // responseTo (zero) - msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode - - // Create the compression details of OP_COMPRESSED - const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); - compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode - compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader - compressionDetails.writeUInt8(Compressor[agreedCompressor], 8); // compressorID - this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); - }, - error => { - operationDescription.cb(error); - } - ); - } -} - -function processIncomingData(stream: MessageStream, callback: Callback): void { - const buffer = stream[kBuffer]; - const sizeOfMessage = buffer.getInt32(); - - if (sizeOfMessage == null) { - return callback(); - } - - if (sizeOfMessage < 0) { - return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); - } - - if (sizeOfMessage > stream.maxBsonMessageSize) { - return callback( - new MongoParseError( - `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}` - ) - ); - } - - if (sizeOfMessage > buffer.length) { - return callback(); - } - - const message = buffer.read(sizeOfMessage); - const messageHeader: MessageHeader = { - length: message.readInt32LE(0), - requestId: message.readInt32LE(4), - responseTo: message.readInt32LE(8), - opCode: message.readInt32LE(12) - }; - - const monitorHasAnotherHello = () => { - if (stream.isMonitoringConnection) { - // Can we read the next message size? - const sizeOfMessage = buffer.getInt32(); - if (sizeOfMessage != null && sizeOfMessage <= buffer.length) { - return true; - } - } - return false; - }; - - let ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse; - if (messageHeader.opCode !== OP_COMPRESSED) { - const messageBody = message.subarray(MESSAGE_HEADER_SIZE); - - // If we are a monitoring connection message stream and - // there is more in the buffer that can be read, skip processing since we - // want the last hello command response that is in the buffer. - if (monitorHasAnotherHello()) { - return processIncomingData(stream, callback); - } - - stream.emit('message', new ResponseType(message, messageHeader, messageBody)); - - if (buffer.length >= 4) { - return processIncomingData(stream, callback); - } - return callback(); - } - - messageHeader.fromCompressed = true; - messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE); - messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4); - const compressorID = message[MESSAGE_HEADER_SIZE + 8]; - const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9); - - // recalculate based on wrapped opcode - ResponseType = messageHeader.opCode === OP_MSG ? OpMsgResponse : OpQueryResponse; - decompress(compressorID, compressedBuffer).then( - messageBody => { - if (messageBody.length !== messageHeader.length) { - return callback( - new MongoDecompressionError('Message body and message header must be the same length') - ); - } - - // If we are a monitoring connection message stream and - // there is more in the buffer that can be read, skip processing since we - // want the last hello command response that is in the buffer. - if (monitorHasAnotherHello()) { - return processIncomingData(stream, callback); - } - stream.emit('message', new ResponseType(message, messageHeader, messageBody)); - - if (buffer.length >= 4) { - return processIncomingData(stream, callback); - } - return callback(); - }, - error => { - return callback(error); - } - ); -} diff --git a/src/cmap/wire_protocol/compression.ts b/src/cmap/wire_protocol/compression.ts index 74cca5da5fb..6a7d5a6bf4a 100644 --- a/src/cmap/wire_protocol/compression.ts +++ b/src/cmap/wire_protocol/compression.ts @@ -11,7 +11,6 @@ import { OpQueryResponse, type WriteProtocolMessageType } from '../commands'; -import { type OperationDescription } from '../message_stream'; import { OP_COMPRESSED, OP_MSG } from './constants'; /** @public */ @@ -144,7 +143,7 @@ const MESSAGE_HEADER_SIZE = 16; */ export async function compressCommand( command: WriteProtocolMessageType, - description: OperationDescription + description: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number } ): Promise { const finalCommand = description.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) diff --git a/src/index.ts b/src/index.ts index 5ef232cf687..82d702b5c44 100644 --- a/src/index.ts +++ b/src/index.ts @@ -264,11 +264,9 @@ export type { export type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS, Stream } from './cmap/connect'; export type { CommandOptions, - Connection, ConnectionEvents, ConnectionOptions, DestroyOptions, - ModernConnection, ProxyOptions } from './cmap/connection'; export type { @@ -281,11 +279,6 @@ export type { WithConnectionCallback } from './cmap/connection_pool'; export type { ClientMetadata, ClientMetadataOptions } from './cmap/handshake/client_metadata'; -export type { - MessageStream, - MessageStreamOptions, - OperationDescription -} from './cmap/message_stream'; export type { ConnectionPoolMetrics } from './cmap/metrics'; export type { StreamDescription, StreamDescriptionOptions } from './cmap/stream_description'; export type { CompressorName } from './cmap/wire_protocol/compression'; diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 2645650c8e1..cdf91694f25 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -362,10 +362,6 @@ function checkServer(monitor: Monitor, callback: Callback) { } if (conn) { - // Tell the connection that we are using the streaming protocol so that the - // connection's message stream will only read the last hello on the buffer. - conn.isMonitoringConnection = true; - if (isInCloseState(monitor)) { conn.destroy({ force: true }); return; diff --git a/test/integration/auth/auth.prose.test.ts b/test/integration/auth/auth.prose.test.ts index e1be56b63e8..e9d0fc651ff 100644 --- a/test/integration/auth/auth.prose.test.ts +++ b/test/integration/auth/auth.prose.test.ts @@ -1,13 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; -import { - Connection, - LEGACY_HELLO_COMMAND, - ModernConnection, - type MongoClient, - ScramSHA256 -} from '../../mongodb'; +import { Connection, LEGACY_HELLO_COMMAND, type MongoClient, ScramSHA256 } from '../../mongodb'; function makeConnectionString(config, username, password) { return `mongodb://${username}:${password}@${config.host}:${config.port}/admin?`; @@ -295,9 +289,7 @@ describe('Authentication Spec Prose Tests', function () { }; client = this.configuration.newClient({}, options); - const connectionType = - process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection; - const commandSpy = sinon.spy(connectionType.prototype, 'command'); + const commandSpy = sinon.spy(Connection.prototype, 'command'); await client.connect(); const calls = commandSpy .getCalls() diff --git a/test/integration/mongodb-handshake/mongodb-handshake.test.ts b/test/integration/mongodb-handshake/mongodb-handshake.test.ts index a1400a9818b..b43026da671 100644 --- a/test/integration/mongodb-handshake/mongodb-handshake.test.ts +++ b/test/integration/mongodb-handshake/mongodb-handshake.test.ts @@ -5,7 +5,6 @@ import * as sinon from 'sinon'; import { Connection, LEGACY_HELLO_COMMAND, - ModernConnection, MongoServerError, MongoServerSelectionError, OpMsgRequest, @@ -20,24 +19,19 @@ describe('MongoDB Handshake', () => { context('when hello is too large', () => { before(() => { - const connectionType = - process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection; - - sinon - .stub(connectionType.prototype, 'commandAsync') - .callsFake(async function (ns, cmd, options) { - // @ts-expect-error: sinon will place wrappedMethod there - const commandAsync = connectionType.prototype.commandAsync.wrappedMethod.bind(this); - - if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) { - return commandAsync( - ns, - { ...cmd, client: { driver: { name: 'a'.repeat(1000) } } }, - options - ); - } - return commandAsync(ns, cmd, options); - }); + sinon.stub(Connection.prototype, 'commandAsync').callsFake(async function (ns, cmd, options) { + // @ts-expect-error: sinon will place wrappedMethod there + const commandAsync = connectionType.prototype.commandAsync.wrappedMethod.bind(this); + + if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) { + return commandAsync( + ns, + { ...cmd, client: { driver: { name: 'a'.repeat(1000) } } }, + options + ); + } + return commandAsync(ns, cmd, options); + }); }); after(() => sinon.restore()); @@ -58,9 +52,7 @@ describe('MongoDB Handshake', () => { let spy: Sinon.SinonSpy; before(() => { - const connectionType = - process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection; - spy = sinon.spy(connectionType.prototype, 'commandAsync'); + spy = sinon.spy(Connection.prototype, 'commandAsync'); }); after(() => sinon.restore()); diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index daaf48130d4..0b902507ebc 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -9,7 +9,6 @@ import { Connection, Db, getTopology, - ModernConnection, MongoClient, MongoNotConnectedError, MongoServerSelectionError, @@ -347,10 +346,8 @@ describe('class MongoClient', function () { connectTimeoutMS: 0, heartbeatFrequencyMS: 500 }); - const connectionType = - process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection; - const spy = sinon.spy(connectionType.prototype, 'commandAsync'); + const spy = sinon.spy(Connection.prototype, 'commandAsync'); await client.connect(); @@ -367,10 +364,7 @@ describe('class MongoClient', function () { heartbeatFrequencyMS: 500 }); - const connectionType = - process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection; - - const spy = sinon.spy(connectionType.prototype, 'commandAsync'); + const spy = sinon.spy(Connection.prototype, 'commandAsync'); await client.connect(); diff --git a/test/mongodb.ts b/test/mongodb.ts index a5303838488..18986610e56 100644 --- a/test/mongodb.ts +++ b/test/mongodb.ts @@ -125,7 +125,6 @@ export * from '../src/cmap/connection_pool'; export * from '../src/cmap/connection_pool_events'; export * from '../src/cmap/errors'; export * from '../src/cmap/handshake/client_metadata'; -export * from '../src/cmap/message_stream'; export * from '../src/cmap/metrics'; export * from '../src/cmap/stream_description'; export * from '../src/cmap/wire_protocol/compression'; diff --git a/test/tools/runner/config.ts b/test/tools/runner/config.ts index ac3d05605ac..7f60a627e2e 100644 --- a/test/tools/runner/config.ts +++ b/test/tools/runner/config.ts @@ -242,10 +242,6 @@ export class TestConfiguration { throw new Error(`Cannot use options to specify host/port, must be in ${connectionString}`); } - if (process.env.MONGODB_NEW_CONNECTION === 'true') { - serverOptions.ConnectionType = ModernConnection; - } - return new MongoClient(connectionString, serverOptions); } diff --git a/test/tools/runner/hooks/configuration.js b/test/tools/runner/hooks/configuration.js index 886501d0d1c..9d4798faa1a 100644 --- a/test/tools/runner/hooks/configuration.js +++ b/test/tools/runner/hooks/configuration.js @@ -177,8 +177,7 @@ const testConfigBeforeHook = async function () { ocsp: process.env.OCSP_TLS_SHOULD_SUCCEED != null && process.env.CA_FILE != null, socks5: MONGODB_URI.includes('proxyHost='), compressor: process.env.COMPRESSOR, - cryptSharedLibPath: process.env.CRYPT_SHARED_LIB_PATH, - newConnectionTesting: process.env.MONGODB_NEW_CONNECTION + cryptSharedLibPath: process.env.CRYPT_SHARED_LIB_PATH }; console.error(inspect(currentEnv, { colors: true })); From 9e7425731387a3eb5e3ef06dd3997463aeebb8a5 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 13:56:19 -0500 Subject: [PATCH 04/15] refactor: legacy tests --- .../connection.test.ts | 49 +++++++++---------- .../mongodb-handshake.test.ts | 2 +- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index d340cad8a3b..c5ca5509018 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -1,3 +1,5 @@ +import { promisify } from 'node:util'; + import { expect } from 'chai'; import { @@ -32,54 +34,49 @@ describe('Connection', function () { describe('Connection - functional/cmap', function () { it('should execute a command against a server', { metadata: { requires: { apiVersion: false, topology: '!load-balanced' } }, - test: function (done) { + test: async function () { const connectOptions: Partial = { connectionType: Connection, ...this.configuration.options, metadata: makeClientMetadata({ driverInfo: {} }) }; - connect(connectOptions as any as ConnectionOptions, (err, conn) => { - expect(err).to.not.exist; - this.defer(_done => conn.destroy(_done)); - - conn.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, (err, hello) => { - expect(err).to.not.exist; - expect(hello).to.exist; - expect(hello.ok).to.equal(1); - done(); - }); - }); + let conn; + try { + conn = await promisify(connect)(connectOptions as any as ConnectionOptions); + const hello = await conn?.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }); + expect(hello).to.have.property('ok', 1); + } finally { + conn?.destroy(); + } } }); it('should emit command monitoring events', { metadata: { requires: { apiVersion: false, topology: '!load-balanced' } }, - test: function (done) { + test: async function () { const connectOptions: Partial = { connectionType: Connection, - monitorCommands: true, ...this.configuration.options, + monitorCommands: true, metadata: makeClientMetadata({ driverInfo: {} }) }; - connect(connectOptions as any as ConnectionOptions, (err, conn) => { - expect(err).to.not.exist; - this.defer(_done => conn.destroy(_done)); + let conn; + try { + conn = await promisify(connect)(connectOptions as any as ConnectionOptions); - const events = []; + const events: any[] = []; conn.on('commandStarted', event => events.push(event)); conn.on('commandSucceeded', event => events.push(event)); conn.on('commandFailed', event => events.push(event)); - conn.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, (err, hello) => { - expect(err).to.not.exist; - expect(hello).to.exist; - expect(hello.ok).to.equal(1); - expect(events).to.have.length(2); - done(); - }); - }); + const hello = await conn?.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }); + expect(hello).to.have.property('ok', 1); + expect(events).to.have.lengthOf(2); + } finally { + conn?.destroy(); + } } }); diff --git a/test/integration/mongodb-handshake/mongodb-handshake.test.ts b/test/integration/mongodb-handshake/mongodb-handshake.test.ts index b43026da671..d90c9c81dda 100644 --- a/test/integration/mongodb-handshake/mongodb-handshake.test.ts +++ b/test/integration/mongodb-handshake/mongodb-handshake.test.ts @@ -21,7 +21,7 @@ describe('MongoDB Handshake', () => { before(() => { sinon.stub(Connection.prototype, 'commandAsync').callsFake(async function (ns, cmd, options) { // @ts-expect-error: sinon will place wrappedMethod there - const commandAsync = connectionType.prototype.commandAsync.wrappedMethod.bind(this); + const commandAsync = Connection.prototype.commandAsync.wrappedMethod.bind(this); if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) { return commandAsync( From 1c1fe748fdd4cd7c85688804f6b62b6525294943 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 14:00:59 -0500 Subject: [PATCH 05/15] refactor: remove commandAsync --- src/cmap/auth/gssapi.ts | 2 +- src/cmap/auth/mongocr.ts | 8 ++------ src/cmap/auth/mongodb_aws.ts | 4 ++-- src/cmap/auth/mongodb_oidc/callback_workflow.ts | 4 ++-- src/cmap/auth/mongodb_oidc/service_workflow.ts | 2 +- src/cmap/auth/plain.ts | 2 +- src/cmap/auth/scram.ts | 6 +++--- src/cmap/auth/x509.ts | 6 +----- src/cmap/connect.ts | 2 +- src/cmap/connection.ts | 4 ---- src/sdam/monitor.ts | 4 ++-- src/sdam/server.ts | 2 +- .../rtt_pinger.test.ts | 2 +- .../mongodb-handshake/mongodb-handshake.test.ts | 14 +++++--------- .../integration/node-specific/mongo_client.test.ts | 4 ++-- .../server_discovery_and_monitoring.spec.test.ts | 2 +- test/unit/cmap/connection.test.ts | 4 ++-- 17 files changed, 28 insertions(+), 44 deletions(-) diff --git a/src/cmap/auth/gssapi.ts b/src/cmap/auth/gssapi.ts index 3f8135a4e4f..d1473c6cbe3 100644 --- a/src/cmap/auth/gssapi.ts +++ b/src/cmap/auth/gssapi.ts @@ -30,7 +30,7 @@ async function externalCommand( connection: Connection, command: ReturnType | ReturnType ): Promise<{ payload: string; conversationId: any }> { - return connection.commandAsync(ns('$external.$cmd'), command, undefined) as Promise<{ + return connection.command(ns('$external.$cmd'), command, undefined) as Promise<{ payload: string; conversationId: any; }>; diff --git a/src/cmap/auth/mongocr.ts b/src/cmap/auth/mongocr.ts index f886a7bd26c..acaab721e8d 100644 --- a/src/cmap/auth/mongocr.ts +++ b/src/cmap/auth/mongocr.ts @@ -13,11 +13,7 @@ export class MongoCR extends AuthProvider { const { username, password, source } = credentials; - const { nonce } = await connection.commandAsync( - ns(`${source}.$cmd`), - { getnonce: 1 }, - undefined - ); + const { nonce } = await connection.command(ns(`${source}.$cmd`), { getnonce: 1 }, undefined); const hashPassword = crypto .createHash('md5') @@ -37,6 +33,6 @@ export class MongoCR extends AuthProvider { key }; - await connection.commandAsync(ns(`${source}.$cmd`), authenticateCommand, undefined); + await connection.command(ns(`${source}.$cmd`), authenticateCommand, undefined); } } diff --git a/src/cmap/auth/mongodb_aws.ts b/src/cmap/auth/mongodb_aws.ts index cfaf8e6f9b3..e2b4604084d 100644 --- a/src/cmap/auth/mongodb_aws.ts +++ b/src/cmap/auth/mongodb_aws.ts @@ -109,7 +109,7 @@ export class MongoDBAWS extends AuthProvider { payload: BSON.serialize({ r: nonce, p: ASCII_N }, bsonOptions) }; - const saslStartResponse = await connection.commandAsync(ns(`${db}.$cmd`), saslStart, undefined); + const saslStartResponse = await connection.command(ns(`${db}.$cmd`), saslStart, undefined); const serverResponse = BSON.deserialize(saslStartResponse.payload.buffer, bsonOptions) as { s: Binary; @@ -169,7 +169,7 @@ export class MongoDBAWS extends AuthProvider { payload: BSON.serialize(payload, bsonOptions) }; - await connection.commandAsync(ns(`${db}.$cmd`), saslContinue, undefined); + await connection.command(ns(`${db}.$cmd`), saslContinue, undefined); } } diff --git a/src/cmap/auth/mongodb_oidc/callback_workflow.ts b/src/cmap/auth/mongodb_oidc/callback_workflow.ts index c220ae5b70c..9822fd1e505 100644 --- a/src/cmap/auth/mongodb_oidc/callback_workflow.ts +++ b/src/cmap/auth/mongodb_oidc/callback_workflow.ts @@ -163,7 +163,7 @@ export class CallbackWorkflow implements Workflow { if (!reauthenticating && response?.speculativeAuthenticate) { result = response.speculativeAuthenticate; } else { - result = await connection.commandAsync( + result = await connection.command( ns(credentials.source), startCommandDocument(credentials), undefined @@ -181,7 +181,7 @@ export class CallbackWorkflow implements Workflow { tokenResult: IdPServerResponse, conversationId?: number ): Promise { - const result = await connection.commandAsync( + const result = await connection.command( ns(credentials.source), finishCommandDocument(tokenResult.accessToken, conversationId), undefined diff --git a/src/cmap/auth/mongodb_oidc/service_workflow.ts b/src/cmap/auth/mongodb_oidc/service_workflow.ts index fb01e2c24ce..afea78fad53 100644 --- a/src/cmap/auth/mongodb_oidc/service_workflow.ts +++ b/src/cmap/auth/mongodb_oidc/service_workflow.ts @@ -18,7 +18,7 @@ export abstract class ServiceWorkflow implements Workflow { async execute(connection: Connection, credentials: MongoCredentials): Promise { const token = await this.getToken(credentials); const command = commandDocument(token); - return connection.commandAsync(ns(credentials.source), command, undefined); + return connection.command(ns(credentials.source), command, undefined); } /** diff --git a/src/cmap/auth/plain.ts b/src/cmap/auth/plain.ts index d568c939e9c..f5a43863113 100644 --- a/src/cmap/auth/plain.ts +++ b/src/cmap/auth/plain.ts @@ -20,6 +20,6 @@ export class Plain extends AuthProvider { autoAuthorize: 1 }; - await connection.commandAsync(ns('$external.$cmd'), command, undefined); + await connection.command(ns('$external.$cmd'), command, undefined); } } diff --git a/src/cmap/auth/scram.ts b/src/cmap/auth/scram.ts index a2eededa82f..53bbf531ebc 100644 --- a/src/cmap/auth/scram.ts +++ b/src/cmap/auth/scram.ts @@ -112,7 +112,7 @@ async function executeScram(cryptoMethod: CryptoMethod, authContext: AuthContext const db = credentials.source; const saslStartCmd = makeFirstMessage(cryptoMethod, credentials, nonce); - const response = await connection.commandAsync(ns(`${db}.$cmd`), saslStartCmd, undefined); + const response = await connection.command(ns(`${db}.$cmd`), saslStartCmd, undefined); await continueScramConversation(cryptoMethod, response, authContext); } @@ -186,7 +186,7 @@ async function continueScramConversation( payload: new Binary(Buffer.from(clientFinal)) }; - const r = await connection.commandAsync(ns(`${db}.$cmd`), saslContinueCmd, undefined); + const r = await connection.command(ns(`${db}.$cmd`), saslContinueCmd, undefined); const parsedResponse = parsePayload(r.payload); if (!compareDigest(Buffer.from(parsedResponse.v, 'base64'), serverSignature)) { @@ -204,7 +204,7 @@ async function continueScramConversation( payload: Buffer.alloc(0) }; - await connection.commandAsync(ns(`${db}.$cmd`), retrySaslContinueCmd, undefined); + await connection.command(ns(`${db}.$cmd`), retrySaslContinueCmd, undefined); } function parsePayload(payload: Binary) { diff --git a/src/cmap/auth/x509.ts b/src/cmap/auth/x509.ts index 793f28796bd..ec4c0007900 100644 --- a/src/cmap/auth/x509.ts +++ b/src/cmap/auth/x509.ts @@ -29,11 +29,7 @@ export class X509 extends AuthProvider { return; } - await connection.commandAsync( - ns('$external.$cmd'), - x509AuthenticateCommand(credentials), - undefined - ); + await connection.command(ns('$external.$cmd'), x509AuthenticateCommand(credentials), undefined); } } diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index d3839a5a03a..b22eb34059c 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -131,7 +131,7 @@ async function performInitialHandshake( } const start = new Date().getTime(); - const response = await conn.commandAsync(ns('admin.$cmd'), handshakeDoc, handshakeOptions); + const response = await conn.command(ns('admin.$cmd'), handshakeDoc, handshakeOptions); if (!('isWritablePrimary' in response)) { // Provide hello-style response document. diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index fe2482b1e59..f4be22df206 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -256,10 +256,6 @@ export class Connection extends TypedEventEmitter { }; } - async commandAsync(...args: Parameters) { - return this.command(...args); - } - /** Indicates that the connection (including underlying TCP socket) has been closed. */ get closed(): boolean { return this.controller.signal.aborted; diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index cdf91694f25..b3a10a74fbe 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -345,7 +345,7 @@ function checkServer(monitor: Monitor, callback: Callback) { awaited = false; connection - .commandAsync(ns('admin.$cmd'), cmd, options) + .command(ns('admin.$cmd'), cmd, options) .then(onHeartbeatSucceeded, onHeartbeatFailed); return; @@ -513,7 +513,7 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) { const commandName = connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND; - connection.commandAsync(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then( + connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined).then( () => measureAndReschedule(), () => { rttPinger.connection?.destroy({ force: true }); diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 04139c991ff..1eefbb01df4 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -371,7 +371,7 @@ export class Server extends TypedEventEmitter { this.decrementOperationCount(); cb(error, response); }); - conn.commandAsync(ns, cmd, finalOptions).then( + conn.command(ns, cmd, finalOptions).then( r => handler(undefined, r), e => handler(e) ); diff --git a/test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts b/test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts index 1a68848ea31..b171485a17d 100644 --- a/test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts +++ b/test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts @@ -164,7 +164,7 @@ describe('class RTTPinger', () => { const rttPingers = await getRTTPingers(client); for (const rtt of rttPingers) { - sinon.stub(rtt.connection, 'commandAsync').rejects(new Error('any error')); + sinon.stub(rtt.connection, 'command').rejects(new Error('any error')); } const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy')); diff --git a/test/integration/mongodb-handshake/mongodb-handshake.test.ts b/test/integration/mongodb-handshake/mongodb-handshake.test.ts index d90c9c81dda..be958498ad7 100644 --- a/test/integration/mongodb-handshake/mongodb-handshake.test.ts +++ b/test/integration/mongodb-handshake/mongodb-handshake.test.ts @@ -19,18 +19,14 @@ describe('MongoDB Handshake', () => { context('when hello is too large', () => { before(() => { - sinon.stub(Connection.prototype, 'commandAsync').callsFake(async function (ns, cmd, options) { + sinon.stub(Connection.prototype, 'command').callsFake(async function (ns, cmd, options) { // @ts-expect-error: sinon will place wrappedMethod there - const commandAsync = Connection.prototype.commandAsync.wrappedMethod.bind(this); + const command = Connection.prototype.command.wrappedMethod.bind(this); if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) { - return commandAsync( - ns, - { ...cmd, client: { driver: { name: 'a'.repeat(1000) } } }, - options - ); + return command(ns, { ...cmd, client: { driver: { name: 'a'.repeat(1000) } } }, options); } - return commandAsync(ns, cmd, options); + return command(ns, cmd, options); }); }); @@ -52,7 +48,7 @@ describe('MongoDB Handshake', () => { let spy: Sinon.SinonSpy; before(() => { - spy = sinon.spy(Connection.prototype, 'commandAsync'); + spy = sinon.spy(Connection.prototype, 'command'); }); after(() => sinon.restore()); diff --git a/test/integration/node-specific/mongo_client.test.ts b/test/integration/node-specific/mongo_client.test.ts index 0b902507ebc..c640535cebd 100644 --- a/test/integration/node-specific/mongo_client.test.ts +++ b/test/integration/node-specific/mongo_client.test.ts @@ -347,7 +347,7 @@ describe('class MongoClient', function () { heartbeatFrequencyMS: 500 }); - const spy = sinon.spy(Connection.prototype, 'commandAsync'); + const spy = sinon.spy(Connection.prototype, 'command'); await client.connect(); @@ -364,7 +364,7 @@ describe('class MongoClient', function () { heartbeatFrequencyMS: 500 }); - const spy = sinon.spy(Connection.prototype, 'commandAsync'); + const spy = sinon.spy(Connection.prototype, 'command'); await client.connect(); diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 06037279b74..6c5e2a34c9e 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -424,7 +424,7 @@ function withConnectionStubImpl(appError) { throw new MongoServerError(appError.response); } }, - async commandAsync(ns, cmd, options) { + async command(ns, cmd, options) { return this.command(ns, cmd, options); } }; diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index d3c33193d61..93d6bef065a 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -187,7 +187,7 @@ describe('new Connection()', function () { }); }); - it('calls the command function through commandAsync', async function () { + it('calls the command function through command', async function () { server.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -205,7 +205,7 @@ describe('new Connection()', function () { const connection: Connection = await connectAsync(options); const commandSpy = sinon.spy(connection, 'command'); - await connection.commandAsync(ns('dummy'), { ping: 1 }, {}); + await connection.command(ns('dummy'), { ping: 1 }, {}); expect(commandSpy).to.have.been.calledOnce; }); From 603e0cfd3bdf05729109a3cf413c71b69a5fbc75 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 14:22:08 -0500 Subject: [PATCH 06/15] chore: symbol properties and access modifiers --- src/cmap/connection.ts | 153 +++++++++++---------------------- src/cmap/stream_description.ts | 3 + 2 files changed, 52 insertions(+), 104 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index f4be22df206..c38c017d686 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,5 +1,5 @@ import { type Readable, Transform, type TransformCallback } from 'stream'; -import { setTimeout } from 'timers'; +import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; import type { BSONSerializeOptions, Document, ObjectId } from '../bson'; @@ -62,19 +62,6 @@ import { type CompressorName, decompressResponse } from './wire_protocol/compres import { onData } from './wire_protocol/on_data'; import { getReadPreference, isSharded } from './wire_protocol/shared'; -/** @internal */ -const kGeneration = Symbol('generation'); -/** @internal */ -const kLastUseTime = Symbol('lastUseTime'); -/** @internal */ -const kClusterTime = Symbol('clusterTime'); -/** @internal */ -const kDescription = Symbol('description'); -/** @internal */ -const kHello = Symbol('hello'); -/** @internal */ -const kAutoEncrypter = Symbol('autoEncrypter'); - /** @internal */ export interface CommandOptions extends BSONSerializeOptions { secondaryOk?: boolean; @@ -154,15 +141,6 @@ export function hasSessionSupport(conn: Connection): boolean { return description.logicalSessionTimeoutMinutes != null; } -function supportsOpMsg(conn: Connection) { - const description = conn.description; - if (description == null) { - return false; - } - - return maxWireVersion(conn) >= 6 && !description.__nodejs_mock_server__; -} - function streamIdentifier(stream: Stream, options: ConnectionOptions): string { if (options.proxyHost) { // If proxy options are specified, the properties of `stream` itself @@ -180,33 +158,24 @@ function streamIdentifier(stream: Stream, options: ConnectionOptions): string { /** @internal */ export class Connection extends TypedEventEmitter { - id: number | ''; - address: string; - socketTimeoutMS: number; - monitorCommands: boolean; - lastHelloMS?: number; - serverApi?: ServerApi; - helloOk?: boolean; - /** @internal */ - authContext?: AuthContext; - - delayedTimeoutId: NodeJS.Timeout | null = null; - /** @internal */ - [kDescription]: StreamDescription; - /** @internal */ - [kGeneration]: number; - /** @internal */ - [kLastUseTime]: number; - + public id: number | ''; + public address: string; + public lastHelloMS?: number; + public serverApi?: ServerApi; + public helloOk?: boolean; + public authContext?: AuthContext; + public delayedTimeoutId: NodeJS.Timeout | null = null; + public generation: number; + public readonly description: Readonly; + + private lastUseTime: number; + private socketTimeoutMS: number; + private monitorCommands: boolean; private socket: Stream; private controller: AbortController; private messageStream: Readable; private socketWrite: (buffer: Uint8Array) => Promise; - - /** @internal */ - [kHello]: Document | null; - /** @internal */ - [kClusterTime]: Document | null; + private clusterTime: Document | null = null; /** @event */ static readonly COMMAND_STARTED = COMMAND_STARTED; @@ -233,12 +202,10 @@ export class Connection extends TypedEventEmitter { this.socketTimeoutMS = options.socketTimeoutMS ?? 0; this.monitorCommands = options.monitorCommands; this.serverApi = options.serverApi; - this[kHello] = null; - this[kClusterTime] = null; - this[kDescription] = new StreamDescription(this.address, options); - this[kGeneration] = options.generation; - this[kLastUseTime] = now(); + this.description = new StreamDescription(this.address, options); + this.generation = options.generation; + this.lastUseTime = now(); this.socket = stream; this.controller = new AbortController(); @@ -257,81 +224,54 @@ export class Connection extends TypedEventEmitter { } /** Indicates that the connection (including underlying TCP socket) has been closed. */ - get closed(): boolean { + public get closed(): boolean { return this.controller.signal.aborted; } - get description(): StreamDescription { - return this[kDescription]; - } - - get hello(): Document | null { - return this[kHello]; - } - // the `connect` method stores the result of the handshake hello on the connection - set hello(response: Document | null) { - this[kDescription].receiveResponse(response); - this[kDescription] = Object.freeze(this[kDescription]); - - // TODO: remove this, and only use the `StreamDescription` in the future - this[kHello] = response; + public set hello(response: Document | null) { + this.description.receiveResponse(response); + Object.freeze(this.description); } - get serviceId(): ObjectId | undefined { + public get serviceId(): ObjectId | undefined { return this.hello?.serviceId; } - get loadBalanced(): boolean { + public get loadBalanced(): boolean { return this.description.loadBalanced; } - get generation(): number { - return this[kGeneration] || 0; - } - - set generation(generation: number) { - this[kGeneration] = generation; - } - - get idleTime(): number { - return calculateDurationInMs(this[kLastUseTime]); + public get idleTime(): number { + return calculateDurationInMs(this.lastUseTime); } - get clusterTime(): Document | null { - return this[kClusterTime]; - } - - get stream(): Stream { - return this.socket; - } - - get hasSessionSupport(): boolean { + private get hasSessionSupport(): boolean { return this.description.logicalSessionTimeoutMinutes != null; } - get supportsOpMsg(): boolean { + private get supportsOpMsg(): boolean { return ( this.description != null && - maxWireVersion(this as any as Connection) >= 6 && + maxWireVersion(this) >= 6 && !this.description.__nodejs_mock_server__ ); } - markAvailable(): void { - this[kLastUseTime] = now(); + public markAvailable(): void { + this.lastUseTime = now(); } - onError(error?: Error) { + public onError(error?: Error) { this.cleanup(error); } - onClose() { + private onClose() { const message = `connection ${this.id} to ${this.address} closed`; this.cleanup(new MongoNetworkError(message)); } - onTimeout() { + private onTimeout() { this.delayedTimeoutId = setTimeout(() => { const message = `connection ${this.id} to ${this.address} timed out`; const beforeHandshake = this.hello == null; @@ -339,7 +279,7 @@ export class Connection extends TypedEventEmitter { }, 1).unref(); // No need for this timer to hold the event loop open } - destroy(options: DestroyOptions, callback?: Callback): void { + public destroy(options: DestroyOptions, callback?: Callback): void { if (this.closed) { if (typeof callback === 'function') process.nextTick(callback); return; @@ -474,7 +414,7 @@ export class Connection extends TypedEventEmitter { } if (document.$clusterTime) { - this[kClusterTime] = document.$clusterTime; + this.clusterTime = document.$clusterTime; this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime); } } @@ -493,7 +433,11 @@ export class Connection extends TypedEventEmitter { } } - async *sendCommand(ns: MongoDBNamespace, command: Document, options: CommandOptions = {}) { + private async *sendCommand( + ns: MongoDBNamespace, + command: Document, + options: CommandOptions = {} + ) { const message = this.prepareCommand(ns.db, command, options); let started = 0; @@ -555,7 +499,7 @@ export class Connection extends TypedEventEmitter { } } - async command( + public async command( ns: MongoDBNamespace, command: Document, options: CommandOptions = {} @@ -567,7 +511,7 @@ export class Connection extends TypedEventEmitter { throw new MongoUnexpectedServerResponseError('Unable to get response from server'); } - exhaustCommand( + public exhaustCommand( ns: MongoDBNamespace, command: Document, options: CommandOptions, @@ -590,7 +534,7 @@ export class Connection extends TypedEventEmitter { * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired). */ - async writeCommand( + private async writeCommand( command: WriteProtocolMessageType, options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number } ): Promise { @@ -616,7 +560,7 @@ export class Connection extends TypedEventEmitter { * * Note that `for-await` loops call `return` automatically when the loop is exited. */ - async *readMany(): AsyncGenerator { + private async *readMany(): AsyncGenerator { for await (const message of onData(this.messageStream, { signal: this.controller.signal })) { const response = await decompressResponse(message); yield response; @@ -638,6 +582,7 @@ export class SizedMessageTransform extends Transform { this.bufferPool = new BufferPool(); this.connection = connection; } + override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void { if (this.connection.delayedTimeoutId != null) { clearTimeout(this.connection.delayedTimeoutId); @@ -667,11 +612,11 @@ export class SizedMessageTransform extends Transform { /** @internal */ export class CryptoConnection extends Connection { /** @internal */ - [kAutoEncrypter]?: AutoEncrypter; + autoEncrypter?: AutoEncrypter; constructor(stream: Stream, options: ConnectionOptions) { super(stream, options); - this[kAutoEncrypter] = options.autoEncrypter; + this.autoEncrypter = options.autoEncrypter; } /** @internal @override */ @@ -680,7 +625,7 @@ export class CryptoConnection extends Connection { cmd: Document, options: CommandOptions ): Promise { - const autoEncrypter = this[kAutoEncrypter]; + const { autoEncrypter } = this; if (!autoEncrypter) { throw new MongoMissingDependencyError('No AutoEncrypter available for encryption'); } diff --git a/src/cmap/stream_description.ts b/src/cmap/stream_description.ts index acf80e2dcdc..85decc2a091 100644 --- a/src/cmap/stream_description.ts +++ b/src/cmap/stream_description.ts @@ -38,6 +38,8 @@ export class StreamDescription { zlibCompressionLevel?: number; serverConnectionId: bigint | null; + public hello: Document | null = null; + constructor(address: string, options?: StreamDescriptionOptions) { this.address = address; this.type = ServerType.Unknown; @@ -59,6 +61,7 @@ export class StreamDescription { if (response == null) { return; } + this.hello = response; this.type = parseServerType(response); if ('connectionId' in response) { this.serverConnectionId = this.parseServerConnectionID(response.connectionId); From 5965524581762e529ec1e94263f98da3d7749ad6 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 14:40:23 -0500 Subject: [PATCH 07/15] fix: hello getter --- src/cmap/auth/mongo_credentials.ts | 4 ++-- src/cmap/connection.ts | 4 ++++ src/cmap/connection_pool.ts | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/cmap/auth/mongo_credentials.ts b/src/cmap/auth/mongo_credentials.ts index 150a0841680..c086afb4e7e 100644 --- a/src/cmap/auth/mongo_credentials.ts +++ b/src/cmap/auth/mongo_credentials.ts @@ -12,7 +12,7 @@ import type { OIDCRefreshFunction, OIDCRequestFunction } from './mongodb_oidc'; import { AUTH_MECHS_AUTH_SRC_EXTERNAL, AuthMechanism } from './providers'; // https://github.com/mongodb/specifications/blob/master/source/auth/auth.rst -function getDefaultAuthMechanism(hello?: Document): AuthMechanism { +function getDefaultAuthMechanism(hello: Document | null): AuthMechanism { if (hello) { // If hello contains saslSupportedMechs, use scram-sha-256 // if it is available, else scram-sha-1 @@ -151,7 +151,7 @@ export class MongoCredentials { * * @param hello - A hello response from the server */ - resolveAuthMechanism(hello?: Document): MongoCredentials { + resolveAuthMechanism(hello: Document | null): MongoCredentials { // If the mechanism is not "default", then it does not need to be resolved if (this.mechanism.match(/DEFAULT/i)) { return new MongoCredentials({ diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index c38c017d686..ea4edba196a 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -228,6 +228,10 @@ export class Connection extends TypedEventEmitter { return this.controller.signal.aborted; } + public get hello() { + return this.description.hello; + } + // the `connect` method stores the result of the handshake hello on the connection public set hello(response: Document | null) { this.description.receiveResponse(response); diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index a4f0edfac0a..0df93f688af 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -621,7 +621,7 @@ export class ConnectionPool extends TypedEventEmitter { ) ); } - const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello || undefined); + const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello); const provider = AUTH_PROVIDERS.get(resolvedCredentials.mechanism); if (!provider) { return callback( From 6d5efc63b485f86afe66da8e75a235ee1f966664 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 14:43:49 -0500 Subject: [PATCH 08/15] chore: remove message event --- src/cmap/connection.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index ea4edba196a..664d6747341 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -10,7 +10,6 @@ import { COMMAND_FAILED, COMMAND_STARTED, COMMAND_SUCCEEDED, - MESSAGE, PINNED, UNPINNED } from '../constants'; @@ -130,7 +129,6 @@ export type ConnectionEvents = { commandFailed(event: CommandFailedEvent): void; clusterTimeReceived(clusterTime: Document): void; close(): void; - message(message: any): void; pinned(pinType: string): void; unpinned(pinType: string): void; }; @@ -188,8 +186,6 @@ export class Connection extends TypedEventEmitter { /** @event */ static readonly CLOSE = CLOSE; /** @event */ - static readonly MESSAGE = MESSAGE; - /** @event */ static readonly PINNED = PINNED; /** @event */ static readonly UNPINNED = UNPINNED; From 9870037ec896fda9e3337a662eb1283c0f18922f Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 14:46:55 -0500 Subject: [PATCH 09/15] ci: remove new connection variant --- .evergreen/config.yml | 35 -------------------------- .evergreen/generate_evergreen_tasks.js | 10 -------- 2 files changed, 45 deletions(-) diff --git a/.evergreen/config.yml b/.evergreen/config.yml index 5d746ed7641..c1056a6f0b4 100644 --- a/.evergreen/config.yml +++ b/.evergreen/config.yml @@ -4424,41 +4424,6 @@ buildvariants: - test-3.6-server-noauth - test-3.6-replica_set-noauth - test-3.6-sharded_cluster-noauth - - name: rhel8-new-connection-tests - display_name: New Connection Tests - run_on: rhel80-large - expansions: - NODE_LTS_VERSION: 20 - CLIENT_ENCRYPTION: true - tasks: - - test-latest-server - - test-latest-replica_set - - test-latest-sharded_cluster - - test-rapid-server - - test-rapid-replica_set - - test-rapid-sharded_cluster - - test-7.0-server - - test-7.0-replica_set - - test-7.0-sharded_cluster - - test-6.0-server - - test-6.0-replica_set - - test-6.0-sharded_cluster - - test-5.0-server - - test-5.0-replica_set - - test-5.0-sharded_cluster - - test-4.4-server - - test-4.4-replica_set - - test-4.4-sharded_cluster - - test-4.2-server - - test-4.2-replica_set - - test-4.2-sharded_cluster - - test-4.0-server - - test-4.0-replica_set - - test-4.0-sharded_cluster - - test-3.6-server - - test-3.6-replica_set - - test-3.6-sharded_cluster - - test-latest-server-v1-api - name: rhel8-test-lambda display_name: AWS Lambda handler tests run_on: rhel80-large diff --git a/.evergreen/generate_evergreen_tasks.js b/.evergreen/generate_evergreen_tasks.js index 36609de7fcf..89d2eb7c889 100644 --- a/.evergreen/generate_evergreen_tasks.js +++ b/.evergreen/generate_evergreen_tasks.js @@ -738,16 +738,6 @@ BUILD_VARIANTS.push({ tasks: AUTH_DISABLED_TASKS.map(({ name }) => name) }); -BUILD_VARIANTS.push({ - name: 'rhel8-new-connection-tests', - display_name: 'New Connection Tests', - run_on: DEFAULT_OS, - expansions: { - NODE_LTS_VERSION: LATEST_LTS, - CLIENT_ENCRYPTION: true, - }, - tasks: BASE_TASKS.map(({ name }) => name) -}); BUILD_VARIANTS.push({ name: 'rhel8-test-lambda', From 8c5d2beca236abd4599c96c45f95c0f89a75b20b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 14:48:10 -0500 Subject: [PATCH 10/15] chore: remove last modern connection references --- test/benchmarks/driverBench/common.js | 7 +------ test/benchmarks/driverBench/index.js | 1 - test/tools/runner/config.ts | 1 - 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/test/benchmarks/driverBench/common.js b/test/benchmarks/driverBench/common.js index b41ab406343..bb5b48babfd 100644 --- a/test/benchmarks/driverBench/common.js +++ b/test/benchmarks/driverBench/common.js @@ -6,9 +6,6 @@ const { Readable } = require('stream'); const { pipeline } = require('stream/promises'); const { MongoClient } = require('../../..'); const { GridFSBucket } = require('../../..'); -// eslint-disable-next-line no-restricted-modules -const { ModernConnection } = require('../../../lib/cmap/connection'); - // eslint-disable-next-line no-restricted-modules const { MONGODB_ERROR_CODES } = require('../../../lib/error'); @@ -27,9 +24,7 @@ function loadSpecString(filePath) { } function makeClient() { - this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017', { - connectionType: ModernConnection - }); + this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017'); } function connectClient() { diff --git a/test/benchmarks/driverBench/index.js b/test/benchmarks/driverBench/index.js index fd8f79ffd50..11d24cabf27 100644 --- a/test/benchmarks/driverBench/index.js +++ b/test/benchmarks/driverBench/index.js @@ -18,7 +18,6 @@ const platform = { name: hw[0].model, cores: hw.length, ram: `${ram}GB` }; const systemInfo = () => [ - `ModernConnection`, `\n- cpu: ${platform.name}`, `- cores: ${platform.cores}`, `- arch: ${os.arch()}`, diff --git a/test/tools/runner/config.ts b/test/tools/runner/config.ts index 7f60a627e2e..b24a9e77d87 100644 --- a/test/tools/runner/config.ts +++ b/test/tools/runner/config.ts @@ -6,7 +6,6 @@ import * as url from 'url'; import { type AuthMechanism, HostAddress, - ModernConnection, MongoClient, type ServerApi, TopologyType, From c542c9593d854c352bb11757781cc58ecfd960b9 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 19 Dec 2023 14:40:17 -0500 Subject: [PATCH 11/15] chore: fix serverConnectionId --- src/cmap/connection.ts | 43 ++++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 664d6747341..904709d7604 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -443,7 +443,10 @@ export class Connection extends TypedEventEmitter { let started = 0; if (this.monitorCommands) { started = now(); - this.emit(Connection.COMMAND_STARTED, new CommandStartedEvent(this, message)); + this.emit( + Connection.COMMAND_STARTED, + new CommandStartedEvent(this, message, this.description.serverConnectionId) + ); } let document; @@ -469,7 +472,7 @@ export class Connection extends TypedEventEmitter { message, options.noResponse ? undefined : document, started, - this[kDescription].serverConnectionId + this.description.serverConnectionId ) ); } @@ -479,21 +482,29 @@ export class Connection extends TypedEventEmitter { } } catch (error) { if (this.monitorCommands) { - error.name === 'MongoWriteConcernError' - ? this.emit( - Connection.COMMAND_SUCCEEDED, - new CommandSucceededEvent( - this, - message, - options.noResponse ? undefined : document, - started, - this[kDescription].serverConnectionId - ) + if (error.name === 'MongoWriteConcernError') { + this.emit( + Connection.COMMAND_SUCCEEDED, + new CommandSucceededEvent( + this, + message, + options.noResponse ? undefined : document, + started, + this.description.serverConnectionId ) - : this.emit( - Connection.COMMAND_FAILED, - new CommandFailedEvent(this, message, error, started) - ); + ); + } else { + this.emit( + Connection.COMMAND_FAILED, + new CommandFailedEvent( + this, + message, + error, + started, + this.description.serverConnectionId + ) + ); + } } throw error; } From 8c19b28c211229d4a625de2f842afc500ae67228 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 19 Dec 2023 14:58:03 -0500 Subject: [PATCH 12/15] chore: fix lint and remove exhaust test --- src/index.ts | 1 + .../connection.test.ts | 62 +------------------ 2 files changed, 2 insertions(+), 61 deletions(-) diff --git a/src/index.ts b/src/index.ts index 82d702b5c44..a0f93947432 100644 --- a/src/index.ts +++ b/src/index.ts @@ -264,6 +264,7 @@ export type { export type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS, Stream } from './cmap/connect'; export type { CommandOptions, + Connection, ConnectionEvents, ConnectionOptions, DestroyOptions, diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index c5ca5509018..3c46cfe38e1 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -31,7 +31,7 @@ describe('Connection', function () { return setupDatabase(this.configuration); }); - describe('Connection - functional/cmap', function () { + describe('Connection.command', function () { it('should execute a command against a server', { metadata: { requires: { apiVersion: false, topology: '!load-balanced' } }, test: async function () { @@ -79,66 +79,6 @@ describe('Connection', function () { } } }); - - it('should support calling back multiple times on exhaust commands', { - metadata: { - requires: { apiVersion: false, mongodb: '>=4.2.0', topology: ['single'] } - }, - test: function (done) { - const namespace = ns(`${this.configuration.db}.$cmd`); - const connectOptions: Partial = { - connectionType: Connection, - ...this.configuration.options, - metadata: makeClientMetadata({ driverInfo: {} }) - }; - - connect(connectOptions as any as ConnectionOptions, (err, conn) => { - expect(err).to.not.exist; - this.defer(_done => conn.destroy(_done)); - - const documents = Array.from(Array(10000), (_, idx) => ({ - test: Math.floor(Math.random() * idx) - })); - - conn.command(namespace, { drop: 'test' }, undefined, () => { - conn.command(namespace, { insert: 'test', documents }, undefined, (err, res) => { - expect(err).to.not.exist; - expect(res).nested.property('n').to.equal(documents.length); - - let totalDocumentsRead = 0; - conn.command( - namespace, - { find: 'test', batchSize: 100 }, - undefined, - (err, result) => { - expect(err).to.not.exist; - expect(result).nested.property('cursor').to.exist; - const cursor = result.cursor; - totalDocumentsRead += cursor.firstBatch.length; - - conn.command( - namespace, - { getMore: cursor.id, collection: 'test', batchSize: 100 }, - { exhaustAllowed: true }, - (err, result) => { - expect(err).to.not.exist; - expect(result).nested.property('cursor').to.exist; - const cursor = result.cursor; - totalDocumentsRead += cursor.nextBatch.length; - - if (cursor.id === 0 || cursor.id.isZero()) { - expect(totalDocumentsRead).to.equal(documents.length); - done(); - } - } - ); - } - ); - }); - }); - }); - } - }); }); describe('Connection - functional', function () { From 11bf89e99e3e468368abd51add7fcf79e05fb161 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 19 Dec 2023 16:01:37 -0500 Subject: [PATCH 13/15] fix: dupe command function --- .../unit/assorted/server_discovery_and_monitoring.spec.test.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts index 6c5e2a34c9e..0d627d65cfa 100644 --- a/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts +++ b/test/unit/assorted/server_discovery_and_monitoring.spec.test.ts @@ -423,9 +423,6 @@ function withConnectionStubImpl(appError) { } else { throw new MongoServerError(appError.response); } - }, - async command(ns, cmd, options) { - return this.command(ns, cmd, options); } }; From 11259dd3187fd1ab62fafa3a9f4d6fcfe2608c96 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 20 Dec 2023 11:24:20 -0500 Subject: [PATCH 14/15] test: fixes --- test/unit/cmap/message_stream.test.ts | 155 -------------------------- test/unit/sdam/monitor.test.ts | 2 +- 2 files changed, 1 insertion(+), 156 deletions(-) delete mode 100644 test/unit/cmap/message_stream.test.ts diff --git a/test/unit/cmap/message_stream.test.ts b/test/unit/cmap/message_stream.test.ts deleted file mode 100644 index 3887d1a4f1e..00000000000 --- a/test/unit/cmap/message_stream.test.ts +++ /dev/null @@ -1,155 +0,0 @@ -import { expect } from 'chai'; -import { on, once } from 'events'; -import { Readable, Writable } from 'stream'; - -import { LEGACY_HELLO_COMMAND, MessageStream, OpMsgRequest } from '../../mongodb'; -import { bufferToStream, generateOpMsgBuffer } from '../../tools/utils'; - -describe('MessageStream', function () { - context('when the stream is for a monitoring connection', function () { - const response = { isWritablePrimary: true }; - const lastResponse = { ok: 1 }; - let firstHello; - let secondHello; - let thirdHello; - let partial; - - beforeEach(function () { - firstHello = generateOpMsgBuffer(response); - secondHello = generateOpMsgBuffer(response); - thirdHello = generateOpMsgBuffer(lastResponse); - partial = Buffer.alloc(5); - partial.writeInt32LE(100, 0); - }); - - it('only reads the last message in the buffer', async function () { - const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello])); - const messageStream = new MessageStream(); - messageStream.isMonitoringConnection = true; - - inputStream.pipe(messageStream); - const messages = await once(messageStream, 'message'); - const msg = messages[0]; - msg.parse(); - expect(msg).to.have.property('documents').that.deep.equals([lastResponse]); - // Make sure there is nothing left in the buffer. - expect(messageStream.buffer.length).to.equal(0); - }); - - it('does not read partial messages', async function () { - const inputStream = bufferToStream( - Buffer.concat([firstHello, secondHello, thirdHello, partial]) - ); - const messageStream = new MessageStream(); - messageStream.isMonitoringConnection = true; - - inputStream.pipe(messageStream); - const messages = await once(messageStream, 'message'); - const msg = messages[0]; - msg.parse(); - expect(msg).to.have.property('documents').that.deep.equals([lastResponse]); - // Make sure the buffer wasn't read to the end. - expect(messageStream.buffer.length).to.equal(5); - }); - }); - - context('when the stream is not for a monitoring connection', function () { - context('when the messages are valid', function () { - const response = { isWritablePrimary: true }; - let firstHello; - let secondHello; - let thirdHello; - let messageCount = 0; - - beforeEach(function () { - firstHello = generateOpMsgBuffer(response); - secondHello = generateOpMsgBuffer(response); - thirdHello = generateOpMsgBuffer(response); - }); - - it('reads all messages in the buffer', async function () { - const inputStream = bufferToStream(Buffer.concat([firstHello, secondHello, thirdHello])); - const messageStream = new MessageStream(); - - inputStream.pipe(messageStream); - for await (const messages of on(messageStream, 'message')) { - messageCount++; - const msg = messages[0]; - msg.parse(); - expect(msg).to.have.property('documents').that.deep.equals([response]); - // Test will not complete until 3 messages processed. - if (messageCount === 3) { - return; - } - } - }); - }); - - context('when the messages are invalid', function () { - context('when the message size is negative', function () { - it('emits an error', async function () { - const inputStream = bufferToStream(Buffer.from('ffffffff', 'hex')); - const messageStream = new MessageStream(); - - inputStream.pipe(messageStream); - const errors = await once(messageStream, 'error'); - const err = errors[0]; - expect(err).to.have.property('message').that.equals('Invalid message size: -1'); - }); - }); - - context('when the message size exceeds the bson maximum', function () { - it('emits an error', async function () { - const inputStream = bufferToStream(Buffer.from('01000004', 'hex')); - const messageStream = new MessageStream(); - - inputStream.pipe(messageStream); - const errors = await once(messageStream, 'error'); - const err = errors[0]; - expect(err) - .to.have.property('message') - .that.equals('Invalid message size: 67108865, max allowed: 67108864'); - }); - }); - }); - }); - - context('when writing to the message stream', function () { - it('pushes the message', function (done) { - const readableStream = new Readable({ - read() { - // ignore - } - }); - const writeableStream = new Writable({ - write: (chunk, _, callback) => { - readableStream.push(chunk); - callback(); - } - }); - - readableStream.on('data', data => { - expect(data.toString('hex')).to.eql( - '370000000300000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' - ); - - done(); - }); - - const messageStream = new MessageStream(); - messageStream.pipe(writeableStream); - - const command = new OpMsgRequest('admin', { [LEGACY_HELLO_COMMAND]: 1 }, { requestId: 3 }); - messageStream.writeCommand(command, { - started: 0, - command: true, - noResponse: false, - raw: false, - requestId: command.requestId, - cb: err => { - done(err); - } - }); - }); - }); -}); diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index a652601274c..caf2cff17c0 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -160,7 +160,7 @@ describe('monitoring', function () { monitor.on('serverHeartbeatFailed', () => done(new Error('unexpected heartbeat failure'))); monitor.on('serverHeartbeatSucceeded', () => { - expect(monitor.connection.isMonitoringConnection).to.be.true; + expect(monitor.connection).to.have.property('id', ''); done(); }); monitor.connect(); From b37706b0d5ea19b60bff03c92a49c6bec7286bbe Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 20 Dec 2023 13:35:23 -0500 Subject: [PATCH 15/15] fix: unit tests --- test/unit/cmap/connect.test.ts | 3 +- test/unit/cmap/connection.test.ts | 957 +------------------------ test/unit/cmap/connection_pool.test.js | 33 +- 3 files changed, 44 insertions(+), 949 deletions(-) diff --git a/test/unit/cmap/connect.test.ts b/test/unit/cmap/connect.test.ts index 1638456078a..39ae57d0deb 100644 --- a/test/unit/cmap/connect.test.ts +++ b/test/unit/cmap/connect.test.ts @@ -145,7 +145,8 @@ describe('Connect Tests', function () { }); it('creates a connection with an infinite timeout', async () => { - expect(connection.stream).to.have.property('timeout', 0); + // @ts-expect-error: accessing private property + expect(connection.socket).to.have.property('timeout', 0); }); it('connection instance has property socketTimeoutMS equal to the value passed in the connectOptions', async () => { diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 93d6bef065a..04194d47b36 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -1,30 +1,18 @@ import { expect } from 'chai'; -import { EventEmitter, once } from 'events'; -import { Socket } from 'net'; import * as sinon from 'sinon'; -import { Readable } from 'stream'; -import { setTimeout } from 'timers'; import { promisify } from 'util'; import { - type ClientMetadata, - connect, + connect as driverConnectCb, Connection, - hasSessionSupport, - type HostAddress, isHello, - type MessageHeader, - type MessageStream, - MongoNetworkError, MongoNetworkTimeoutError, - MongoRuntimeError, - ns, - type OperationDescription, - OpMsgResponse + ns } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; -import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils'; -import { createTimerSandbox } from '../timer_sandbox'; +import { getSymbolFrom } from '../../tools/utils'; + +const connect = promisify(driverConnectCb); const connectionOptionsDefaults = { id: 0, @@ -35,77 +23,19 @@ const connectionOptionsDefaults = { loadBalanced: false }; -/** - * The absolute minimum socket API needed by these tests - * - * The driver has a greater API requirement for sockets detailed in: NODE-4785 - */ -class FakeSocket extends EventEmitter { - destroyed = false; - writableEnded: boolean; - timeout = 0; - address() { - // is never called - } - pipe() { - // does not need to do anything - } - destroy() { - // is called, has no side effects - this.writableEnded = true; - this.destroyed = true; - } - end(cb) { - this.writableEnded = true; - // nextTick to simulate I/O delay - if (typeof cb === 'function') { - process.nextTick(cb); - } - } - get remoteAddress() { - return 'iLoveJavaScript'; - } - get remotePort() { - return 123; - } - setTimeout(timeout) { - this.timeout = timeout; - } -} - -class InputStream extends Readable { - writableEnded: boolean; - timeout = 0; - - constructor(options?) { - super(options); - } - - end(cb) { - this.writableEnded = true; - if (typeof cb === 'function') { - process.nextTick(cb); - } - } - - setTimeout(timeout) { - this.timeout = timeout; - } -} - describe('new Connection()', function () { let server; after(() => mock.cleanup()); before(() => mock.createServer().then(s => (server = s))); - it('supports fire-and-forget messages', function (done) { + it('supports fire-and-forget messages', async function () { server.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { request.reply(mock.HELLO); } - // blackhole all other requests + // black hole all other requests }); const options = { @@ -114,27 +44,20 @@ describe('new Connection()', function () { hostAddress: server.hostAddress() }; - connect(options, (err, conn) => { - expect(err).to.not.exist; - expect(conn).to.exist; - - conn.command(ns('$admin.cmd'), { ping: 1 }, { noResponse: true }, (err, result) => { - expect(err).to.not.exist; - expect(result).to.not.exist; - - done(); - }); - }); + const conn = await connect(options); + const readSpy = sinon.spy(conn, 'readMany'); + await conn.command(ns('$admin.cmd'), { ping: 1 }, { noResponse: true }); + expect(readSpy).to.not.have.been.called; }); - it('destroys streams which time out', function (done) { + it('destroys streams which time out', async function () { server.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { request.reply(mock.HELLO); } - // blackhole all other requests + // black hole all other requests }); const options = { @@ -143,22 +66,15 @@ describe('new Connection()', function () { hostAddress: server.hostAddress() }; - connect(options, (err, conn) => { - expect(err).to.not.exist; - expect(conn).to.exist; - - conn.command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }, (err, result) => { - expect(err).to.be.instanceOf(MongoNetworkTimeoutError); - expect(result).to.not.exist; - - expect(conn).property('stream').property('destroyed', true); - - done(); - }); - }); + const conn = await connect(options); + const error = await conn + .command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }) + .catch(error => error); + expect(error).to.be.instanceOf(MongoNetworkTimeoutError); + expect(conn).property('socket').property('destroyed', true); }); - it('throws a network error with kBeforeHandshake set to false on timeout after handshake', function (done) { + it('throws a network error with kBeforeHandshake set to false on timeout after handshake', async function () { server.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -172,19 +88,15 @@ describe('new Connection()', function () { ...connectionOptionsDefaults }; - connect(options, (err, conn) => { - expect(err).to.be.a('undefined'); - expect(conn).to.be.instanceOf(Connection); - expect(conn).to.have.property('hello').that.is.a('object'); + const conn = await connect(options); - conn.command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }, err => { - const beforeHandshakeSymbol = getSymbolFrom(err, 'beforeHandshake', false); - expect(beforeHandshakeSymbol).to.be.a('symbol'); - expect(err).to.have.property(beforeHandshakeSymbol, false); + const error = await conn + .command(ns('$admin.cmd'), { ping: 1 }, { socketTimeoutMS: 50 }) + .catch(error => error); - done(); - }); - }); + const beforeHandshakeSymbol = getSymbolFrom(error, 'beforeHandshake', false); + expect(beforeHandshakeSymbol).to.be.a('symbol'); + expect(error).to.have.property(beforeHandshakeSymbol, false); }); it('calls the command function through command', async function () { @@ -201,15 +113,14 @@ describe('new Connection()', function () { hostAddress: server.hostAddress() }; - const connectAsync = promisify(connect); - const connection: Connection = await connectAsync(options); + const connection = await connect(options); const commandSpy = sinon.spy(connection, 'command'); await connection.command(ns('dummy'), { ping: 1 }, {}); expect(commandSpy).to.have.been.calledOnce; }); - it('throws a network error with kBeforeHandshake set to true on timeout before handshake', function (done) { + it('throws a network error with kBeforeHandshake set to true on timeout before handshake', async function () { server.setMessageHandler(() => { // respond to no requests to trigger timeout event }); @@ -220,812 +131,10 @@ describe('new Connection()', function () { socketTimeoutMS: 50 }; - connect(options, (err, conn) => { - expect(conn).to.be.a('undefined'); - - const beforeHandshakeSymbol = getSymbolFrom(err, 'beforeHandshake'); - expect(err).to.have.property(beforeHandshakeSymbol, true); - - done(); - }); - }); - - describe('#onMessage', function () { - context('when the connection is a monitoring connection', function () { - let queue: Map; - let driverSocket: FakeSocket; - let connection: Connection; - - beforeEach(function () { - driverSocket = sinon.spy(new FakeSocket()); - }); - - context('when multiple hellos exist on the stream', function () { - let callbackSpy; - const inputStream = new InputStream(); - const document = { ok: 1 }; - const last = { isWritablePrimary: true }; - - beforeEach(function () { - callbackSpy = sinon.spy(); - const firstHello = generateOpMsgBuffer(document); - const secondHello = generateOpMsgBuffer(document); - const thirdHello = generateOpMsgBuffer(last); - const buffer = Buffer.concat([firstHello, secondHello, thirdHello]); - - connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults)); - connection.isMonitoringConnection = true; - const queueSymbol = getSymbolFrom(connection, 'queue'); - queue = connection[queueSymbol]; - - // Create the operation description. - const operationDescription: OperationDescription = { - requestId: 1, - cb: callbackSpy - }; - - // Stick an operation description in the queue. - queue.set(1, operationDescription); - - // Push the buffer of 3 hellos to the input stream - inputStream.push(buffer); - inputStream.push(null); - }); - - it('calls the callback with the last hello document', async function () { - const messages = await once(connection, 'message'); - expect(messages[0].responseTo).to.equal(0); - expect(callbackSpy).to.be.calledOnceWith(undefined, last); - }); - }); - - context('when requestId/responseTo do not match', function () { - let callbackSpy; - const document = { ok: 1 }; - - beforeEach(function () { - callbackSpy = sinon.spy(); - - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - connection.isMonitoringConnection = true; - const queueSymbol = getSymbolFrom(connection, 'queue'); - queue = connection[queueSymbol]; - - // Create the operation description. - const operationDescription: OperationDescription = { - requestId: 1, - cb: callbackSpy - }; - - // Stick an operation description in the queue. - queue.set(1, operationDescription); - // Emit a message that won't match the existing operation description. - const msg = generateOpMsgBuffer(document); - const msgHeader: MessageHeader = { - length: msg.readInt32LE(0), - requestId: 1, - responseTo: 0, // This will not match. - opCode: msg.readInt32LE(12) - }; - const msgBody = msg.subarray(16); - - const message = new OpMsgResponse(msg, msgHeader, msgBody); - connection.onMessage(message); - }); - - it('calls the operation description callback with the document', function () { - expect(callbackSpy).to.be.calledOnceWith(undefined, document); - }); - }); - - context('when requestId/reponseTo match', function () { - let callbackSpy; - const document = { ok: 1 }; - - beforeEach(function () { - callbackSpy = sinon.spy(); - - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - connection.isMonitoringConnection = true; - const queueSymbol = getSymbolFrom(connection, 'queue'); - queue = connection[queueSymbol]; - - // Create the operation description. - const operationDescription: OperationDescription = { - requestId: 1, - cb: callbackSpy - }; - - // Stick an operation description in the queue. - queue.set(1, operationDescription); - // Emit a message that matches the existing operation description. - const msg = generateOpMsgBuffer(document); - const msgHeader: MessageHeader = { - length: msg.readInt32LE(0), - requestId: 2, - responseTo: 1, - opCode: msg.readInt32LE(12) - }; - const msgBody = msg.subarray(16); - - const message = new OpMsgResponse(msg, msgHeader, msgBody); - connection.onMessage(message); - }); - - it('calls the operation description callback with the document', function () { - expect(callbackSpy).to.be.calledOnceWith(undefined, document); - }); - }); - - context('when no operation description is in the queue', function () { - const document = { ok: 1 }; - - beforeEach(function () { - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - connection.isMonitoringConnection = true; - const queueSymbol = getSymbolFrom(connection, 'queue'); - queue = connection[queueSymbol]; - }); - - it('does not error', function () { - const msg = generateOpMsgBuffer(document); - const msgHeader: MessageHeader = { - length: msg.readInt32LE(0), - requestId: 2, - responseTo: 1, - opCode: msg.readInt32LE(12) - }; - const msgBody = msg.subarray(16); - - const message = new OpMsgResponse(msg, msgHeader, msgBody); - expect(() => { - connection.onMessage(message); - }).to.not.throw(); - }); - }); - - context('when more than one operation description is in the queue', function () { - let spyOne; - let spyTwo; - const document = { ok: 1 }; - - beforeEach(function () { - spyOne = sinon.spy(); - spyTwo = sinon.spy(); - - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - connection.isMonitoringConnection = true; - const queueSymbol = getSymbolFrom(connection, 'queue'); - queue = connection[queueSymbol]; - - // Create the operation descriptions. - const descriptionOne: OperationDescription = { - requestId: 1, - cb: spyOne - }; - const descriptionTwo: OperationDescription = { - requestId: 2, - cb: spyTwo - }; - - // Stick an operation description in the queue. - queue.set(2, descriptionOne); - queue.set(3, descriptionTwo); - // Emit a message that matches the existing operation description. - const msg = generateOpMsgBuffer(document); - const msgHeader: MessageHeader = { - length: msg.readInt32LE(0), - requestId: 2, - responseTo: 1, - opCode: msg.readInt32LE(12) - }; - const msgBody = msg.subarray(16); - - const message = new OpMsgResponse(msg, msgHeader, msgBody); - connection.onMessage(message); - }); - - it('calls all operation description callbacks with an error', function () { - expect(spyOne).to.be.calledOnce; - expect(spyTwo).to.be.calledOnce; - const errorOne = spyOne.firstCall.args[0]; - const errorTwo = spyTwo.firstCall.args[0]; - expect(errorOne).to.be.instanceof(MongoRuntimeError); - expect(errorTwo).to.be.instanceof(MongoRuntimeError); - }); - }); - }); - - context('when sending commands on a connection', () => { - const CONNECT_DEFAULTS = { - id: 1, - tls: false, - generation: 1, - monitorCommands: false, - metadata: {} as ClientMetadata, - loadBalanced: false - }; - let server; - let connectOptions; - let connection: Connection; - let streamSetTimeoutSpy; - - beforeEach(async () => { - server = await mock.createServer(); - server.setMessageHandler(request => { - if (isHello(request.document)) { - request.reply(mock.HELLO); - } - }); - connectOptions = { - ...CONNECT_DEFAULTS, - hostAddress: server.hostAddress() as HostAddress, - socketTimeoutMS: 15000 - }; - - connection = await promisify(callback => - //@ts-expect-error: Callbacks do not have mutual exclusion for error/result existence - connect(connectOptions, callback) - )(); - - streamSetTimeoutSpy = sinon.spy(connection.stream, 'setTimeout'); - }); - - afterEach(async () => { - connection.destroy({ force: true }); - sinon.restore(); - await mock.cleanup(); - }); - - it('sets timeout specified on class before writing to the socket', async () => { - await promisify(callback => - connection.command(ns('admin.$cmd'), { hello: 1 }, {}, callback) - )(); - expect(streamSetTimeoutSpy).to.have.been.calledWith(15000); - }); - - it('sets timeout specified on options before writing to the socket', async () => { - await promisify(callback => - connection.command(ns('admin.$cmd'), { hello: 1 }, { socketTimeoutMS: 2000 }, callback) - )(); - expect(streamSetTimeoutSpy).to.have.been.calledWith(2000); - }); - - it('clears timeout after getting a message if moreToCome=false', async () => { - connection.stream.setTimeout(1); - const msg = generateOpMsgBuffer({ hello: 1 }); - const msgHeader = { - length: msg.readInt32LE(0), - requestId: 1, - responseTo: 0, - opCode: msg.readInt32LE(12) - }; - const msgBody = msg.subarray(16); - msgBody.writeInt32LE(0, 0); // OPTS_MORE_TO_COME - connection.onMessage(new OpMsgResponse(msg, msgHeader, msgBody)); - // timeout is still reset - expect(connection.stream).to.have.property('timeout', 0); - }); - - it('does not clear timeout after getting a message if moreToCome=true', async () => { - connection.stream.setTimeout(1); - const msg = generateOpMsgBuffer({ hello: 1 }); - const msgHeader = { - length: msg.readInt32LE(0), - requestId: 1, - responseTo: 0, - opCode: msg.readInt32LE(12) - }; - const msgBody = msg.subarray(16); - msgBody.writeInt32LE(2, 0); // OPTS_MORE_TO_COME - connection[getSymbolFrom(connection, 'queue')].set(0, { cb: () => null }); - connection.onMessage(new OpMsgResponse(msg, msgHeader, msgBody)); - // timeout is still set - expect(connection.stream).to.have.property('timeout', 1); - }); - }); - }); - - describe('when the socket times out', () => { - let connection: sinon.SinonSpiedInstance; - let clock: sinon.SinonFakeTimers; - let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: FakeSocket; - let messageStream: MessageStream; - let callbackSpy; - let closeCount = 0; - let kDelayedTimeoutId; - let NodeJSTimeoutClass: any; - - beforeEach(() => { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers({ - toFake: ['nextTick', 'setTimeout', 'clearTimeout'] - }); - driverSocket = new FakeSocket(); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = new Connection(driverSocket, connectionOptionsDefaults); - const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - messageStream = connection[messageStreamSymbol]; - - callbackSpy = sinon.spy(); - // Create the operation description. - const operationDescription: OperationDescription = { - requestId: 1, - cb: callbackSpy - }; - - connection.on('close', () => { - closeCount++; - }); - connection.once(Connection.PINNED, () => { - /* no-op */ - }); - connection.once(Connection.UNPINNED, () => { - /* no-op */ - }); - - // Stick an operation description in the queue. - const queueSymbol = getSymbolFrom(connection, 'queue'); - connection[queueSymbol].set(1, operationDescription); - - kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId'); - NodeJSTimeoutClass = setTimeout(() => null, 1).constructor; - }); - - afterEach(() => { - closeCount = 0; - sinon.restore(); - timerSandbox.restore(); - clock.restore(); - }); - - context('delayed timeout for lambda behavior', () => { - let cleanupSpy; - let timeoutSpy; - beforeEach(() => { - cleanupSpy = sinon.spy(connection, 'cleanup'); - timeoutSpy = sinon.spy(connection, 'onTimeout'); - }); - - it('delays timeout errors by one tick', async () => { - expect(connection).to.have.property(kDelayedTimeoutId, null); - - driverSocket.emit('timeout'); - expect(cleanupSpy).to.not.have.been.called; - expect(connection) - .to.have.property(kDelayedTimeoutId) - .that.is.instanceOf(NodeJSTimeoutClass); - expect(connection).to.have.property('closed', false); - - clock.tick(1); - - expect(cleanupSpy).to.have.been.calledOnce; - expect(connection).to.have.property('closed', true); - }); - - it('clears timeout errors if more data is available', () => { - expect(connection).to.have.property(kDelayedTimeoutId, null); - - driverSocket.emit('timeout'); - expect(timeoutSpy).to.have.been.calledOnce; - expect(cleanupSpy).not.to.have.been.called; - expect(connection) - .to.have.property(kDelayedTimeoutId) - .that.is.instanceOf(NodeJSTimeoutClass); - - // emit a message before the clock ticks even once - // onMessage ignores unknown 'responseTo' value - messageStream.emit('message', { responseTo: null }); - - // New message before clock ticks 1 will clear the timeout - expect(connection).to.have.property(kDelayedTimeoutId, null); - - // ticking the clock should do nothing, there is no timeout anymore - clock.tick(1); - - expect(cleanupSpy).not.to.have.been.called; - expect(connection).to.have.property('closed', false); - expect(connection).to.have.property(kDelayedTimeoutId, null); - }); - }); - - context('when the timeout expires and no more data has come in', () => { - beforeEach(() => { - driverSocket.emit('timeout'); - clock.tick(1); - }); - - it('destroys the MessageStream', () => { - expect(messageStream.destroyed).to.be.true; - }); - - it('ends the socket', () => { - expect(driverSocket.writableEnded).to.be.true; - }); - - it('destroys the socket after ending it', () => { - expect(driverSocket.destroyed).to.be.true; - }); - - it('passes the error along to any callbacks in the operation description queue (asynchronously)', () => { - expect(callbackSpy).to.have.been.calledOnce; - const error = callbackSpy.firstCall.args[0]; - expect(error).to.be.instanceof(MongoNetworkTimeoutError); - }); - - it('emits a Connection.CLOSE event (asynchronously)', () => { - expect(closeCount).to.equal(1); - }); - - it('does NOT remove all Connection.PINNED listeners', () => { - expect(connection.listenerCount(Connection.PINNED)).to.equal(1); - }); - - it('does NOT remove all Connection.UNPINNED listeners', () => { - expect(connection.listenerCount(Connection.UNPINNED)).to.equal(1); - }); + const error = await connect(options).catch(error => error); - it('removes all listeners on the MessageStream', () => { - expect(messageStream.eventNames()).to.have.lengthOf(0); - }); - - it('removes all listeners on the socket', () => { - expect(driverSocket.eventNames()).to.have.lengthOf(0); - }); - }); - }); - - describe('when the MessageStream errors', () => { - let connection: sinon.SinonSpiedInstance; - let clock: sinon.SinonFakeTimers; - let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: FakeSocket; - let messageStream: MessageStream; - let callbackSpy; - const error = new Error('something went wrong'); - let closeCount = 0; - - beforeEach(() => { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers({ - toFake: ['nextTick'] - }); - driverSocket = new FakeSocket(); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = new Connection(driverSocket, connectionOptionsDefaults); - const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - messageStream = connection[messageStreamSymbol]; - - callbackSpy = sinon.spy(); - // Create the operation description. - const operationDescription: OperationDescription = { - requestId: 1, - cb: callbackSpy - }; - - connection.on('close', () => { - closeCount++; - }); - - connection.once(Connection.PINNED, () => { - /* no-op */ - }); - connection.once(Connection.UNPINNED, () => { - /* no-op */ - }); - - // Stick an operation description in the queue. - const queueSymbol = getSymbolFrom(connection, 'queue'); - connection[queueSymbol].set(1, operationDescription); - - messageStream.emit('error', error); - }); - - afterEach(() => { - closeCount = 0; - sinon.restore(); - timerSandbox.restore(); - clock.restore(); - }); - - it('destroys the MessageStream synchronously', () => { - expect(messageStream.destroyed).to.be.true; - }); - - it('ends the socket', () => { - expect(driverSocket.writableEnded).to.be.true; - }); - - it('destroys the socket after ending it (synchronously)', () => { - expect(driverSocket.destroyed).to.be.true; - }); - - it('passes the error along to any callbacks in the operation description queue (synchronously)', () => { - expect(callbackSpy).to.have.been.calledOnceWithExactly(error); - }); - - it('emits a Connection.CLOSE event (synchronously)', () => { - expect(closeCount).to.equal(1); - }); - - it('does NOT remove all Connection.PINNED listeners', () => { - expect(connection.listenerCount(Connection.PINNED)).to.equal(1); - }); - - it('does NOT remove all Connection.UNPINNED listeners', () => { - expect(connection.listenerCount(Connection.UNPINNED)).to.equal(1); - }); - - it('removes all listeners on the MessageStream', () => { - expect(messageStream.eventNames()).to.have.lengthOf(0); - }); - - it('removes all listeners on the socket', () => { - expect(driverSocket.eventNames()).to.have.lengthOf(0); - }); - }); - - describe('when the underlying socket closes', () => { - let connection: sinon.SinonSpiedInstance; - let clock: sinon.SinonFakeTimers; - let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: FakeSocket; - let messageStream: MessageStream; - let callbackSpy; - let closeCount = 0; - - beforeEach(() => { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers({ - toFake: ['nextTick'] - }); - driverSocket = new FakeSocket(); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = new Connection(driverSocket, connectionOptionsDefaults); - const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - messageStream = connection[messageStreamSymbol]; - - callbackSpy = sinon.spy(); - // Create the operation description. - const operationDescription: OperationDescription = { - requestId: 1, - cb: callbackSpy - }; - - connection.on('close', () => { - closeCount++; - }); - - connection.once(Connection.PINNED, () => { - /* no-op */ - }); - connection.once(Connection.UNPINNED, () => { - /* no-op */ - }); - - // Stick an operation description in the queue. - const queueSymbol = getSymbolFrom(connection, 'queue'); - connection[queueSymbol].set(1, operationDescription); - - driverSocket.emit('close'); - }); - - afterEach(() => { - closeCount = 0; - sinon.restore(); - timerSandbox.restore(); - clock.restore(); - }); - - it('destroys the MessageStream synchronously', () => { - expect(messageStream.destroyed).to.be.true; - }); - - it('ends the socket', () => { - expect(driverSocket.writableEnded).to.be.true; - }); - - it('destroys the socket after ending it (synchronously)', () => { - expect(driverSocket.destroyed).to.be.true; - }); - - it('calls any callbacks in the queue with a MongoNetworkError (synchronously)', () => { - expect(callbackSpy).to.have.been.calledOnce; - const error = callbackSpy.firstCall.args[0]; - expect(error).to.be.instanceof(MongoNetworkError); - }); - - it('emits a Connection.CLOSE event (synchronously)', () => { - expect(closeCount).to.equal(1); - }); - - it('does NOT remove all Connection.PINNED listeners', () => { - expect(connection.listenerCount(Connection.PINNED)).to.equal(1); - }); - - it('does NOT remove all Connection.UNPINNED listeners', () => { - expect(connection.listenerCount(Connection.UNPINNED)).to.equal(1); - }); - - it('removes all listeners on the MessageStream', () => { - expect(messageStream.eventNames()).to.have.lengthOf(0); - }); - - it('removes all listeners on the socket', () => { - expect(driverSocket.eventNames()).to.have.lengthOf(0); - }); - }); - - describe('.hasSessionSupport', function () { - let connection; - const stream = new Socket(); - - context('when logicalSessionTimeoutMinutes is present', function () { - beforeEach(function () { - const options = { - ...connectionOptionsDefaults, - hostAddress: server.hostAddress(), - logicalSessionTimeoutMinutes: 5 - }; - connection = new Connection(stream, options); - }); - - it('returns true', function () { - expect(hasSessionSupport(connection)).to.be.true; - }); - }); - - context('when logicalSessionTimeoutMinutes is not present', function () { - beforeEach(function () { - const options = { - ...connectionOptionsDefaults, - hostAddress: server.hostAddress() - }; - connection = new Connection(stream, options); - }); - - it('returns false', function () { - expect(hasSessionSupport(connection)).to.be.false; - }); - }); - }); - - describe('destroy()', () => { - let connection: sinon.SinonSpiedInstance; - let clock: sinon.SinonFakeTimers; - let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: sinon.SinonSpiedInstance; - let messageStream: MessageStream; - beforeEach(() => { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers({ - toFake: ['nextTick'] - }); - - driverSocket = sinon.spy(new FakeSocket()); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - messageStream = sinon.spy(connection[messageStreamSymbol]); - }); - - afterEach(() => { - timerSandbox.restore(); - clock.restore(); - }); - - it('removes all Connection.PINNED listeners', () => { - connection.once(Connection.PINNED, () => { - /* no-op */ - }); - connection.destroy({ force: true }); - expect(connection.listenerCount(Connection.PINNED)).to.equal(0); - }); - - it('removes all Connection.UNPINNED listeners', () => { - connection.once(Connection.UNPINNED, () => { - /* no-op */ - }); - connection.destroy({ force: true }); - expect(connection.listenerCount(Connection.UNPINNED)).to.equal(0); - }); - - context('when a callback is provided', () => { - context('when the connection was already destroyed', () => { - let callbackSpy; - beforeEach(() => { - connection.destroy({ force: true }); - connection.cleanup.resetHistory(); - callbackSpy = sinon.spy(); - }); - it('does not attempt to cleanup the socket again', () => { - connection.destroy({ force: true }, callbackSpy); - expect(connection.cleanup).not.to.have.been.called; - }); - - it('calls the callback (asynchronously)', () => { - connection.destroy({ force: true }, callbackSpy); - expect(callbackSpy).not.to.have.been.called; - clock.runAll(); - expect(callbackSpy).to.have.been.called; - }); - }); - - context('when the connection was not destroyed', () => { - let callbackSpy; - beforeEach(() => { - callbackSpy = sinon.spy(); - }); - it('cleans up the connection', () => { - connection.destroy({ force: true }, callbackSpy); - expect(connection.cleanup).to.have.been.called; - }); - - it('calls the callback (asynchronously)', () => { - connection.destroy({ force: true }, callbackSpy); - expect(callbackSpy).not.to.have.been.called; - clock.runAll(); - expect(callbackSpy).to.have.been.called; - }); - }); - }); - - context('when options.force == true', function () { - it('destroys the tcp socket (synchronously)', () => { - expect(driverSocket.destroy).not.to.have.been.called; - connection.destroy({ force: true }); - expect(driverSocket.destroy).to.have.been.calledOnce; - }); - - it('does not call stream.end', () => { - connection.destroy({ force: true }); - expect(driverSocket.end).to.not.have.been.called; - }); - - it('destroys the messageStream (synchronously)', () => { - connection.destroy({ force: true }); - expect(messageStream.destroy).to.have.been.calledOnce; - }); - - it('when destroy({ force: true }) is called multiple times, it calls stream.destroy exactly once', () => { - connection.destroy({ force: true }); - connection.destroy({ force: true }); - connection.destroy({ force: true }); - expect(driverSocket.destroy).to.have.been.calledOnce; - }); - }); - - context('when options.force == false', function () { - it('destroys the tcp socket (asynchronously)', () => { - connection.destroy({ force: false }); - expect(driverSocket.destroy).not.to.have.been.called; - clock.tick(1); - expect(driverSocket.destroy).to.have.been.called; - }); - - it('ends the tcp socket (synchronously)', () => { - connection.destroy({ force: false }); - expect(driverSocket.end).to.have.been.calledOnce; - }); - - it('destroys the messageStream (synchronously)', () => { - connection.destroy({ force: false }); - expect(messageStream.destroy).to.have.been.calledOnce; - }); - - it('calls stream.end exactly once when destroy is called multiple times', () => { - connection.destroy({ force: false }); - connection.destroy({ force: false }); - connection.destroy({ force: false }); - connection.destroy({ force: false }); - clock.tick(1); - expect(driverSocket.end).to.have.been.calledOnce; - }); - }); + const beforeHandshakeSymbol = getSymbolFrom(error, 'beforeHandshake', false); + expect(beforeHandshakeSymbol).to.be.a('symbol'); + expect(error).to.have.property(beforeHandshakeSymbol, true); }); }); diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index ecfb8fb7692..9acf50e50af 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -6,6 +6,7 @@ const mock = require('../../tools/mongodb-mock/index'); const sinon = require('sinon'); const { expect } = require('chai'); const { setImmediate } = require('timers'); +const { promisify } = require('util'); const { ns, isHello } = require('../../mongodb'); const { LEGACY_HELLO_COMMAND } = require('../../mongodb'); const { createTimerSandbox } = require('../timer_sandbox'); @@ -32,7 +33,7 @@ describe('Connection Pool', function () { }) ); - it('should destroy connections which have been closed', function (done) { + it('should destroy connections which have been closed', async function () { mockMongod.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -52,31 +53,15 @@ describe('Connection Pool', function () { const events = []; pool.on('connectionClosed', event => events.push(event)); - pool.checkOut((err, conn) => { - expect(err).to.not.exist; - - conn.command(ns('admin.$cmd'), { ping: 1 }, undefined, (err, result) => { - expect(err).to.exist; - expect(result).to.not.exist; - - pool.checkIn(conn); + const conn = await promisify(pool.checkOut.bind(pool))(); + const error = await conn.command(ns('admin.$cmd'), { ping: 1 }, {}).catch(error => error); - expect(events).to.have.length(1); - const closeEvent = events[0]; - expect(closeEvent).have.property('reason').equal('error'); - }); - }); + expect(error).to.be.instanceOf(Error); + pool.checkIn(conn); - pool.withConnection( - undefined, - (err, conn, cb) => { - expect(err).to.not.exist; - cb(); - }, - () => { - pool.close(done); - } - ); + expect(events).to.have.length(1); + const closeEvent = events[0]; + expect(closeEvent).have.property('reason').equal('error'); }); it('should propagate socket timeouts to connections', function (done) {