diff --git a/lib/cmap/connection_pool.js b/lib/cmap/connection_pool.js index 21e463a37aa..a335c450714 100644 --- a/lib/cmap/connection_pool.js +++ b/lib/cmap/connection_pool.js @@ -218,7 +218,6 @@ class ConnectionPool extends EventEmitter { return; } - // add this request to the wait queue const waitQueueMember = { callback }; const pool = this; @@ -233,11 +232,8 @@ class ConnectionPool extends EventEmitter { }, waitQueueTimeoutMS); } - // place the member at the end of the wait queue this[kWaitQueue].push(waitQueueMember); - - // process the wait queue - processWaitQueue(this); + setImmediate(() => processWaitQueue(this)); } /** @@ -250,10 +246,8 @@ class ConnectionPool extends EventEmitter { const stale = connectionIsStale(this, connection); const willDestroy = !!(poolClosed || stale || connection.closed); - // Properly adjust state of connection if (!willDestroy) { connection.markAvailable(); - this[kConnections].push(connection); } @@ -264,7 +258,7 @@ class ConnectionPool extends EventEmitter { destroyConnection(this, connection, reason); } - processWaitQueue(this); + setImmediate(() => processWaitQueue(this)); } /** @@ -434,7 +428,7 @@ function createConnection(pool, callback) { // otherwise add it to the pool for later acquisition, and try to process the wait queue pool[kConnections].push(connection); - processWaitQueue(pool); + setImmediate(() => processWaitQueue(pool)); }); } @@ -445,7 +439,7 @@ function destroyConnection(pool, connection, reason) { pool[kPermits]++; // destroy the connection - process.nextTick(() => connection.destroy()); + setImmediate(() => connection.destroy()); } function processWaitQueue(pool) { diff --git a/lib/core/cursor.js b/lib/core/cursor.js index f0d651a8d17..8cb3357db49 100644 --- a/lib/core/cursor.js +++ b/lib/core/cursor.js @@ -745,9 +745,10 @@ function nextFunction(self, callback) { if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, callback); + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, callback) + ); } else if ( self.cursorState.cursorIndex === self.cursorState.documents.length && !Long.ZERO.equals(self.cursorState.cursorId) @@ -827,9 +828,12 @@ function nextFunction(self, callback) { } else { if (self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, callback); + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, callback) + ); + + return; } // Increment the current cursor limit @@ -841,11 +845,14 @@ function nextFunction(self, callback) { // Doc overflow if (!doc || doc.$err) { // Ensure we kill the cursor on the server - self.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(self, function() { - handleCallback(callback, new MongoError(doc ? doc.$err : undefined)); - }); + self.kill(() => + // Set cursor in dead and notified state + setCursorDeadAndNotified(self, function() { + handleCallback(callback, new MongoError(doc ? doc.$err : undefined)); + }) + ); + + return; } // Transform the doc with passed in transformation method if provided diff --git a/lib/core/sessions.js b/lib/core/sessions.js index ad7cc7267c6..abc8d945188 100644 --- a/lib/core/sessions.js +++ b/lib/core/sessions.js @@ -13,6 +13,7 @@ const Transaction = require('./transactions').Transaction; const TxnState = require('./transactions').TxnState; const isPromiseLike = require('./utils').isPromiseLike; const ReadPreference = require('./topologies/read_preference'); +const maybePromise = require('../utils').maybePromise; const isTransactionCommand = require('./transactions').isTransactionCommand; const resolveClusterTime = require('./topologies/shared').resolveClusterTime; const isSharded = require('./wireprotocol/shared').isSharded; @@ -125,25 +126,36 @@ class ClientSession extends EventEmitter { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; - if (this.hasEnded) { - if (typeof callback === 'function') callback(null, null); - return; - } + const session = this; + return maybePromise(this, callback, done => { + if (session.hasEnded) { + return done(); + } - if (this.serverSession && this.inTransaction()) { - this.abortTransaction(); // pass in callback? - } + function completeEndSession() { + // release the server session back to the pool + session.sessionPool.release(session.serverSession); + session[kServerSession] = undefined; - // release the server session back to the pool - this.sessionPool.release(this.serverSession); - this[kServerSession] = undefined; + // mark the session as ended, and emit a signal + session.hasEnded = true; + session.emit('ended', session); + + // spec indicates that we should ignore all errors for `endSessions` + done(); + } + + if (session.serverSession && session.inTransaction()) { + session.abortTransaction(err => { + if (err) return done(err); + completeEndSession(); + }); - // mark the session as ended, and emit a signal - this.hasEnded = true; - this.emit('ended', this); + return; + } - // spec indicates that we should ignore all errors for `endSessions` - if (typeof callback === 'function') callback(null, null); + completeEndSession(); + }); } /** @@ -227,16 +239,7 @@ class ClientSession extends EventEmitter { * @return {Promise} A promise is returned if no callback is provided */ commitTransaction(callback) { - if (typeof callback === 'function') { - endTransaction(this, 'commitTransaction', callback); - return; - } - - return new Promise((resolve, reject) => { - endTransaction(this, 'commitTransaction', (err, reply) => - err ? reject(err) : resolve(reply) - ); - }); + return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done)); } /** @@ -246,16 +249,7 @@ class ClientSession extends EventEmitter { * @return {Promise} A promise is returned if no callback is provided */ abortTransaction(callback) { - if (typeof callback === 'function') { - endTransaction(this, 'abortTransaction', callback); - return; - } - - return new Promise((resolve, reject) => { - endTransaction(this, 'abortTransaction', (err, reply) => - err ? reject(err) : resolve(reply) - ); - }); + return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done)); } /** diff --git a/test/functional/spec-runner/index.js b/test/functional/spec-runner/index.js index 8e045bb4fdc..1a8e818ce06 100644 --- a/test/functional/spec-runner/index.js +++ b/test/functional/spec-runner/index.js @@ -347,11 +347,12 @@ function runTestSuiteTest(configuration, spec, context) { throw err; }) .then(() => { - if (session0) session0.endSession(); - if (session1) session1.endSession(); - - return validateExpectations(context.commandEvents, spec, savedSessionData); - }); + const promises = []; + if (session0) promises.push(session0.endSession()); + if (session1) promises.push(session1.endSession()); + return Promise.all(promises); + }) + .then(() => validateExpectations(context.commandEvents, spec, savedSessionData)); }); } diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index 94cb7a3b33b..2a15125ed54 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -145,11 +145,13 @@ describe('Connection Pool', function() { sinon.stub(pool, 'availableConnectionCount').get(() => 0); pool.checkIn(conn); - expect(pool) - .property('waitQueueSize') - .to.equal(0); + setImmediate(() => { + expect(pool) + .property('waitQueueSize') + .to.equal(0); - done(); + done(); + }); }); }); });