From 7b1d9bd33836ab480796328be9979566602d56a2 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Tue, 8 Sep 2020 08:15:07 -0400 Subject: [PATCH 1/4] fix: allow event loop to process during wait queue processing Running `processWaitQueue` on the next tick allows the event loop to process while the connection pool is processing large numbers of wait queue members. NODE-2803 --- lib/cmap/connection_pool.js | 12 +++--------- test/unit/cmap/connection_pool.test.js | 10 ++++++---- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/cmap/connection_pool.js b/lib/cmap/connection_pool.js index 21e463a37aa..e4cfcee5f25 100644 --- a/lib/cmap/connection_pool.js +++ b/lib/cmap/connection_pool.js @@ -218,7 +218,6 @@ class ConnectionPool extends EventEmitter { return; } - // add this request to the wait queue const waitQueueMember = { callback }; const pool = this; @@ -233,11 +232,8 @@ class ConnectionPool extends EventEmitter { }, waitQueueTimeoutMS); } - // place the member at the end of the wait queue this[kWaitQueue].push(waitQueueMember); - - // process the wait queue - processWaitQueue(this); + process.nextTick(() => processWaitQueue(this)); } /** @@ -250,10 +246,8 @@ class ConnectionPool extends EventEmitter { const stale = connectionIsStale(this, connection); const willDestroy = !!(poolClosed || stale || connection.closed); - // Properly adjust state of connection if (!willDestroy) { connection.markAvailable(); - this[kConnections].push(connection); } @@ -264,7 +258,7 @@ class ConnectionPool extends EventEmitter { destroyConnection(this, connection, reason); } - processWaitQueue(this); + process.nextTick(() => processWaitQueue(this)); } /** @@ -434,7 +428,7 @@ function createConnection(pool, callback) { // otherwise add it to the pool for later acquisition, and try to process the wait queue pool[kConnections].push(connection); - processWaitQueue(pool); + process.nextTick(() => processWaitQueue(pool)); }); } diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 94cb7a3b33b..281f8670a5c 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -145,11 +145,13 @@ describe('Connection Pool', function() { sinon.stub(pool, 'availableConnectionCount').get(() => 0); pool.checkIn(conn); - expect(pool) - .property('waitQueueSize') - .to.equal(0); + process.nextTick(() => { + expect(pool) + .property('waitQueueSize') + .to.equal(0); - done(); + done(); + }); }); }); }); From e340b8f9d6584281797f78342d9e73efabf844e3 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 9 Sep 2020 09:02:21 -0400 Subject: [PATCH 2/4] correct timing issues around transactions `endSession` must wait for all the machinery behind the scenes to check out a connection and write a message before considering its job finished. --- lib/core/sessions.js | 64 +++++++++++++--------------- test/functional/spec-runner/index.js | 11 ++--- 2 files changed, 35 insertions(+), 40 deletions(-) diff --git a/lib/core/sessions.js b/lib/core/sessions.js index ad7cc7267c6..abc8d945188 100644 --- a/lib/core/sessions.js +++ b/lib/core/sessions.js @@ -13,6 +13,7 @@ const Transaction = require('./transactions').Transaction; const TxnState = require('./transactions').TxnState; const isPromiseLike = require('./utils').isPromiseLike; const ReadPreference = require('./topologies/read_preference'); +const maybePromise = require('../utils').maybePromise; const isTransactionCommand = require('./transactions').isTransactionCommand; const resolveClusterTime = require('./topologies/shared').resolveClusterTime; const isSharded = require('./wireprotocol/shared').isSharded; @@ -125,25 +126,36 @@ class ClientSession extends EventEmitter { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; - if (this.hasEnded) { - if (typeof callback === 'function') callback(null, null); - return; - } + const session = this; + return maybePromise(this, callback, done => { + if (session.hasEnded) { + return done(); + } - if (this.serverSession && this.inTransaction()) { - this.abortTransaction(); // pass in callback? - } + function completeEndSession() { + // release the server session back to the pool + session.sessionPool.release(session.serverSession); + session[kServerSession] = undefined; - // release the server session back to the pool - this.sessionPool.release(this.serverSession); - this[kServerSession] = undefined; + // mark the session as ended, and emit a signal + session.hasEnded = true; + session.emit('ended', session); + + // spec indicates that we should ignore all errors for `endSessions` + done(); + } + + if (session.serverSession && session.inTransaction()) { + session.abortTransaction(err => { + if (err) return done(err); + completeEndSession(); + }); - // mark the session as ended, and emit a signal - this.hasEnded = true; - this.emit('ended', this); + return; + } - // spec indicates that we should ignore all errors for `endSessions` - if (typeof callback === 'function') callback(null, null); + completeEndSession(); + }); } /** @@ -227,16 +239,7 @@ class ClientSession extends EventEmitter { * @return {Promise} A promise is returned if no callback is provided */ commitTransaction(callback) { - if (typeof callback === 'function') { - endTransaction(this, 'commitTransaction', callback); - return; - } - - return new Promise((resolve, reject) => { - endTransaction(this, 'commitTransaction', (err, reply) => - err ? reject(err) : resolve(reply) - ); - }); + return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done)); } /** @@ -246,16 +249,7 @@ class ClientSession extends EventEmitter { * @return {Promise} A promise is returned if no callback is provided */ abortTransaction(callback) { - if (typeof callback === 'function') { - endTransaction(this, 'abortTransaction', callback); - return; - } - - return new Promise((resolve, reject) => { - endTransaction(this, 'abortTransaction', (err, reply) => - err ? reject(err) : resolve(reply) - ); - }); + return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done)); } /** diff --git a/test/functional/spec-runner/index.js b/test/functional/spec-runner/index.js index 8e045bb4fdc..1a8e818ce06 100644 --- a/test/functional/spec-runner/index.js +++ b/test/functional/spec-runner/index.js @@ -347,11 +347,12 @@ function runTestSuiteTest(configuration, spec, context) { throw err; }) .then(() => { - if (session0) session0.endSession(); - if (session1) session1.endSession(); - - return validateExpectations(context.commandEvents, spec, savedSessionData); - }); + const promises = []; + if (session0) promises.push(session0.endSession()); + if (session1) promises.push(session1.endSession()); + return Promise.all(promises); + }) + .then(() => validateExpectations(context.commandEvents, spec, savedSessionData)); }); } From 6b21b5ae4b09d7c18c25293a3604655a5547ba57 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 9 Sep 2020 11:01:20 -0400 Subject: [PATCH 3/4] `process.nextTick` => `setImmediate` --- lib/cmap/connection_pool.js | 8 ++++---- test/unit/cmap/connection_pool.test.js | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/cmap/connection_pool.js b/lib/cmap/connection_pool.js index e4cfcee5f25..a335c450714 100644 --- a/lib/cmap/connection_pool.js +++ b/lib/cmap/connection_pool.js @@ -233,7 +233,7 @@ class ConnectionPool extends EventEmitter { } this[kWaitQueue].push(waitQueueMember); - process.nextTick(() => processWaitQueue(this)); + setImmediate(() => processWaitQueue(this)); } /** @@ -258,7 +258,7 @@ class ConnectionPool extends EventEmitter { destroyConnection(this, connection, reason); } - process.nextTick(() => processWaitQueue(this)); + setImmediate(() => processWaitQueue(this)); } /** @@ -428,7 +428,7 @@ function createConnection(pool, callback) { // otherwise add it to the pool for later acquisition, and try to process the wait queue pool[kConnections].push(connection); - process.nextTick(() => processWaitQueue(pool)); + setImmediate(() => processWaitQueue(pool)); }); } @@ -439,7 +439,7 @@ function destroyConnection(pool, connection, reason) { pool[kPermits]++; // destroy the connection - process.nextTick(() => connection.destroy()); + setImmediate(() => connection.destroy()); } function processWaitQueue(pool) { diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 281f8670a5c..2a15125ed54 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -145,7 +145,7 @@ describe('Connection Pool', function() { sinon.stub(pool, 'availableConnectionCount').get(() => 0); pool.checkIn(conn); - process.nextTick(() => { + setImmediate(() => { expect(pool) .property('waitQueueSize') .to.equal(0); From 7d7312775a19c6a31ba2f2b84956eeb7e41d7666 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Wed, 9 Sep 2020 16:36:25 -0400 Subject: [PATCH 4/4] ensure we wait for kill before marking cursor notified --- lib/core/cursor.js | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/lib/core/cursor.js b/lib/core/cursor.js index f0d651a8d17..8cb3357db49 100644 --- a/lib/core/cursor.js +++ b/lib/core/cursor.js @@ -745,9 +745,10 @@ function nextFunction(self, callback) { if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, callback); + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, callback) + ); } else if ( self.cursorState.cursorIndex === self.cursorState.documents.length && !Long.ZERO.equals(self.cursorState.cursorId) @@ -827,9 +828,12 @@ function nextFunction(self, callback) { } else { if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, callback); + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, callback) + ); + + return; } // Increment the current cursor limit @@ -841,11 +845,14 @@ function nextFunction(self, callback) { // Doc overflow if (!doc || doc.$err) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, function() { - handleCallback(callback, new MongoError(doc ? doc.$err : undefined)); - }); + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, function() { + handleCallback(callback, new MongoError(doc ? doc.$err : undefined)); + }) + ); + + return; } // Transform the doc with passed in transformation method if provided