From 7abdeeec12944d47a594bff8fab01e1d803d69f9 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 10:50:22 -0400 Subject: [PATCH 01/19] add tests for failure case --- test/functional/change_stream.test.js | 105 ++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 7 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 020de3d6c44..af91a91898a 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -18,17 +18,19 @@ chai.use(require('chai-subset')); * Triggers a fake resumable error on a change stream * * @param {ChangeStream} changeStream - * @param {function} onCursorClosed callback when cursor closed due this error + * @param {Function} onCursorClosed callback when cursor closed due this error */ function triggerResumableError(changeStream, onCursorClosed) { - const closeCursor = changeStream.cursor.close; + const closeCursor = changeStream.cursor.close.bind(changeStream.cursor); changeStream.cursor.close = callback => { - onCursorClosed(); - changeStream.cursor.close = closeCursor; - changeStream.cursor.close(callback); + closeCursor(err => { + callback && callback(err); + onCursorClosed(); + }); }; const fakeResumableError = new MongoNetworkError('fake error'); - changeStream.cursor.emit('error', fakeResumableError); + // delay error slightly to better simulate real conditions + setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), 250); } /** @@ -88,7 +90,8 @@ function tryNext(changeStream, callback) { * empty batch into a returned array of events. * * @param {ChangeStream} changeStream - * @param {function} callback + * @param {Function|Array} bag + * @param {Function} [callback] */ function exhaust(changeStream, bag, callback) { if (typeof bag === 'function') { @@ -2850,3 +2853,91 @@ describe('Change Streams', function() { }); }); }); + +describe('Change Stream Resume Error Tests', function() { + function withChangeStream(callback) { + return function(done) { + const configuration = this.configuration; + const client = configuration.newClient(); + client.connect(err => { + expect(err).to.not.exist; + const db = client.db('test'); + db.createCollection('changeStreamResumeErrorTest', (err, collection) => { + expect(err).to.not.exist; + const changeStream = collection.watch(); + callback(collection, changeStream, () => + changeStream.close(() => collection.drop(() => client.close(done))) + ); + }); + }); + }; + } + it('(events) should continue iterating after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: withChangeStream((collection, changeStream, done) => { + const docs = []; + changeStream.on('change', change => { + expect(change).to.exist; + docs.push(change); + if (docs.length === 2) { + expect(docs[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + expect(docs[1]).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); + } + }); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }); + }); + }); + }) + }); + + it('(callback) hasNext and next should work after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: withChangeStream((collection, changeStream, done) => { + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, () => { + changeStream.hasNext((err1, hasNext) => { + expect(err1).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); + }); + }); + collection.insertOne({ b: 24 }); + }); + }); + }); + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + }); + }); + }) + }); +}); From fdf6271fd20f6ae2d2075ec0e4d082502e0a26eb Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 11:43:38 -0400 Subject: [PATCH 02/19] use getCursor to safely get cursor across resume attempt --- lib/change_stream.js | 82 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 635d009ec2b..ac5882a4ef4 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -1,5 +1,6 @@ 'use strict'; +const Denque = require('denque'); const EventEmitter = require('events'); const isResumableError = require('./error').isResumableError; const MongoError = require('./core').MongoError; @@ -9,6 +10,8 @@ const maxWireVersion = require('./core/utils').maxWireVersion; const maybePromise = require('./utils').maybePromise; const AggregateOperation = require('./operations/aggregate'); +const kResumeQueue = Symbol('resumeQueue'); + const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( CHANGE_STREAM_OPTIONS @@ -91,6 +94,8 @@ class ChangeStream extends EventEmitter { this.options.readPreference = parent.s.readPreference; } + this[kResumeQueue] = new Denque(); + // Create contained Change Stream cursor this.cursor = createChangeStreamCursor(this, options); @@ -130,7 +135,12 @@ class ChangeStream extends EventEmitter { * @returns {Promise|void} returns Promise if no callback passed */ hasNext(callback) { - return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb)); + return maybePromise(this.parent, callback, cb => { + getCursor(this, (err, cursor) => { + if (err) return cb(err); + cursor.hasNext(cb); + }); + }); } /** @@ -142,11 +152,11 @@ class ChangeStream extends EventEmitter { */ next(callback) { return maybePromise(this.parent, callback, cb => { - if (this.isClosed()) { - return cb(new MongoError('ChangeStream is closed')); - } - this.cursor.next((error, change) => { - processNewChange({ changeStream: this, error, change, callback: cb }); + getCursor(this, (err, cursor) => { + if (err) return cb(err); + cursor.next((error, change) => + processNewChange({ changeStream: this, error, change, callback: cb }) + ); }); }); } @@ -481,16 +491,15 @@ function processNewChange(args) { const wireVersion = maxWireVersion(cursor.server); if (error) { - if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) { - changeStream.attemptingResume = true; + if (isResumableError(error, wireVersion)) { + const cursor = changeStream.cursor; + changeStream.cursor = null; // stop listening to all events from old cursor - ['data', 'close', 'end', 'error'].forEach(event => - changeStream.cursor.removeAllListeners(event) - ); + ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); // close internal cursor, ignore errors - changeStream.cursor.close(); + cursor.close(); waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { if (err) { @@ -501,12 +510,10 @@ function processNewChange(args) { changeStream.emit('close'); return; } - return callback(err); + return processResumeQueue(changeStream, err); } - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - if (eventEmitter) return; - changeStream.next(callback); + processResumeQueue(changeStream); }); return; } @@ -515,8 +522,6 @@ function processNewChange(args) { return callback(error); } - changeStream.attemptingResume = false; - if (change && !change._id) { const noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' @@ -538,6 +543,47 @@ function processNewChange(args) { return callback(error, change); } +/** + * Safely provides a cursor across resume attempts + * + * @param {ChangeStream} changeStream the parent ChangeStream + * @param {function} callback gets the cursor or error + * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor + */ +function getCursor(changeStream, callback) { + if (changeStream.isClosed()) { + callback(new MongoError('ChangeStream is closed.')); + return; + } + + // if a cursor exists and it is open, return it + if (changeStream.cursor) { + callback(undefined, changeStream.cursor); + return; + } + + // no cursor, queue callback until topology reconnects + changeStream[kResumeQueue].push(callback); +} + +/** + * Drain the resume queue when a new has become available + * + * @param {ChangeStream} changeStream the parent ChangeStream + * @param {?ChangeStreamCursor} changeStream.cursor the new cursor + * @param {?Error} err error getting a new cursor + */ +function processResumeQueue(changeStream, err) { + while (changeStream[kResumeQueue].length) { + if (changeStream.isClosed()) { + request(new MongoError('Change Stream is not open.')); + return; + } + const request = changeStream[kResumeQueue].pop(); + request(err, changeStream.cursor); + } +} + /** * The callback format for results * @callback ChangeStream~resultCallback From dd22829e386839aa95044daa2fb9ccd85ed2ce2c Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 30 Apr 2020 16:39:07 -0400 Subject: [PATCH 03/19] test: skip change stream tests running on mock single topology --- test/functional/change_stream.test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index af91a91898a..f8999dcc14a 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -798,7 +798,7 @@ describe('Change Streams', function() { } }); - it('Should return MongoNetworkError after first retry attempt fails using promises', { + it.skip('Should return MongoNetworkError after first retry attempt fails using promises', { metadata: { requires: { generators: true, @@ -896,7 +896,7 @@ describe('Change Streams', function() { } }); - it('Should return MongoNetworkError after first retry attempt fails using callbacks', { + it.skip('Should return MongoNetworkError after first retry attempt fails using callbacks', { metadata: { requires: { generators: true, @@ -991,7 +991,7 @@ describe('Change Streams', function() { } }); - it('Should resume Change Stream when a resumable error is encountered', { + it.skip('Should resume Change Stream when a resumable error is encountered', { metadata: { requires: { generators: true, From 5c004a45d2b3c8c6c7c9c75c79692b8490cdef19 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 12:33:42 -0400 Subject: [PATCH 04/19] fix skipped tests --- lib/change_stream.js | 1 + test/functional/change_stream.test.js | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index ac5882a4ef4..1185dd59152 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -514,6 +514,7 @@ function processNewChange(args) { } changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); processResumeQueue(changeStream); + if (!eventEmitter) changeStream.cursor.next(callback); }); return; } diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index f8999dcc14a..af91a91898a 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -798,7 +798,7 @@ describe('Change Streams', function() { } }); - it.skip('Should return MongoNetworkError after first retry attempt fails using promises', { + it('Should return MongoNetworkError after first retry attempt fails using promises', { metadata: { requires: { generators: true, @@ -896,7 +896,7 @@ describe('Change Streams', function() { } }); - it.skip('Should return MongoNetworkError after first retry attempt fails using callbacks', { + it('Should return MongoNetworkError after first retry attempt fails using callbacks', { metadata: { requires: { generators: true, @@ -991,7 +991,7 @@ describe('Change Streams', function() { } }); - it.skip('Should resume Change Stream when a resumable error is encountered', { + it('Should resume Change Stream when a resumable error is encountered', { metadata: { requires: { generators: true, From 3d1dd4287be2b95ea4f3ebff5a7378a21b2a4da7 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 12:55:14 -0400 Subject: [PATCH 05/19] add optional delay to triggerResumableError --- test/functional/change_stream.test.js | 56 ++++++++++++++++----------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index af91a91898a..67cc9cd2eb3 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -19,8 +19,9 @@ chai.use(require('chai-subset')); * * @param {ChangeStream} changeStream * @param {Function} onCursorClosed callback when cursor closed due this error + * @param {number} [delay] optional delay before triggering error */ -function triggerResumableError(changeStream, onCursorClosed) { +function triggerResumableError(changeStream, onCursorClosed, delay) { const closeCursor = changeStream.cursor.close.bind(changeStream.cursor); changeStream.cursor.close = callback => { closeCursor(err => { @@ -30,7 +31,10 @@ function triggerResumableError(changeStream, onCursorClosed) { }; const fakeResumableError = new MongoNetworkError('fake error'); // delay error slightly to better simulate real conditions - setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), 250); + if (delay) { + return setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), delay); + } + changeStream.cursor.emit('error', fakeResumableError); } /** @@ -2800,12 +2804,16 @@ describe('Change Streams', function() { }); waitForStarted(changeStream, () => { - triggerResumableError(changeStream, () => { - events.push('error'); - coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - }); - }); + triggerResumableError( + changeStream, + () => { + events.push('error'); + coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { + expect(err).to.not.exist; + }); + }, + 250 + ); }); } }); @@ -2828,7 +2836,7 @@ describe('Change Streams', function() { case 2: // only events after this point are relevant to this test events = []; - triggerResumableError(changeStream, () => events.push('error')); + triggerResumableError(changeStream, () => events.push('error'), 0); break; case 3: expect(events) @@ -2910,21 +2918,25 @@ describe('Change Stream Resume Error Tests', function() { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - changeStream.hasNext((err1, hasNext) => { - expect(err1).to.not.exist; - expect(hasNext).to.be.true; - changeStream.next((err, change) => { - expect(err).to.not.exist; - expect(change).to.containSubset({ - operationType: 'insert', - fullDocument: { b: 24 } + triggerResumableError( + changeStream, + () => { + changeStream.hasNext((err1, hasNext) => { + expect(err1).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); }); - done(); }); - }); - collection.insertOne({ b: 24 }); - }); + collection.insertOne({ b: 24 }); + }, + 250 + ); }); }); changeStream.hasNext((err, hasNext) => { From 81f01204b7427478db5b642cfcb422e32c71ce53 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 14:00:31 -0400 Subject: [PATCH 06/19] fix --- lib/change_stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 1185dd59152..4f03b185eb3 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -514,7 +514,7 @@ function processNewChange(args) { } changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); processResumeQueue(changeStream); - if (!eventEmitter) changeStream.cursor.next(callback); + if (!eventEmitter) changeStream.next(callback); }); return; } From abb58e6464dd776947ef11b53d35212e07a02714 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 16:46:00 -0400 Subject: [PATCH 07/19] refactor: extract processError from processNewChange --- lib/change_stream.js | 131 +++++++++++++++----------- test/functional/change_stream.test.js | 33 +++---- 2 files changed, 92 insertions(+), 72 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 4f03b185eb3..600b6763fc0 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -104,9 +104,7 @@ class ChangeStream extends EventEmitter { // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { - this.cursor.on('data', change => - processNewChange({ changeStream: this, change, eventEmitter: true }) - ); + this.cursor.on('data', change => processNewChange(this, change)); } }); @@ -154,9 +152,12 @@ class ChangeStream extends EventEmitter { return maybePromise(this.parent, callback, cb => { getCursor(this, (err, cursor) => { if (err) return cb(err); - cursor.next((error, change) => - processNewChange({ changeStream: this, error, change, callback: cb }) - ); + cursor.next((error, change) => { + if (error) { + return processError(this, error, cb); + } + processNewChange(this, change, cb); + }); }); }); } @@ -183,6 +184,8 @@ class ChangeStream extends EventEmitter { // flag the change stream as explicitly closed this.closed = true; + if (!this.cursor) return cb(); + // Tidy up the existing cursor const cursor = this.cursor; @@ -393,7 +396,7 @@ function createChangeStreamCursor(self, options) { */ if (self.listenerCount('change') > 0) { changeStreamCursor.on('data', function(change) { - processNewChange({ changeStream: self, change, eventEmitter: true }); + processNewChange(self, change); }); } @@ -425,7 +428,7 @@ function createChangeStreamCursor(self, options) { * @type {Error} */ changeStreamCursor.on('error', function(error) { - processNewChange({ changeStream: self, error, eventEmitter: true }); + processError(self, error); }); if (self.pipeDestinations) { @@ -467,12 +470,8 @@ function waitForTopologyConnected(topology, options, callback) { } // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. -function processNewChange(args) { - const changeStream = args.changeStream; - const error = args.error; - const change = args.change; - const callback = args.callback; - const eventEmitter = args.eventEmitter || false; +function processNewChange(changeStream, change, callback) { + const eventEmitter = typeof callback !== 'function'; const cursor = changeStream.cursor; // If the cursor is null or the change stream has been closed explictly, do not process a change. @@ -486,43 +485,6 @@ function processNewChange(args) { return; } - const topology = changeStream.topology; - const options = changeStream.cursor.options; - const wireVersion = maxWireVersion(cursor.server); - - if (error) { - if (isResumableError(error, wireVersion)) { - const cursor = changeStream.cursor; - changeStream.cursor = null; - - // stop listening to all events from old cursor - ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - - // close internal cursor, ignore errors - cursor.close(); - - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) { - // if there's an error reconnecting, close the change stream - changeStream.closed = true; - if (eventEmitter) { - changeStream.emit('error', err); - changeStream.emit('close'); - return; - } - return processResumeQueue(changeStream, err); - } - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - processResumeQueue(changeStream); - if (!eventEmitter) changeStream.next(callback); - }); - return; - } - - if (eventEmitter) return changeStream.emit('error', error); - return callback(error); - } - if (change && !change._id) { const noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' @@ -541,7 +503,68 @@ function processNewChange(args) { // Return the change if (eventEmitter) return changeStream.emit('change', change); - return callback(error, change); + return callback(undefined, change); +} + +function processError(changeStream, error, callback) { + const eventEmitter = typeof callback !== 'function'; + const topology = changeStream.topology; + const cursor = changeStream.cursor; + + // If the cursor is null or the change stream has been closed explictly, do not process a change. + if (cursor == null || changeStream.closed) { + // We do not error in the eventEmitter case. + changeStream.closed = true; + if (eventEmitter) { + return; + } + callback(new MongoError('ChangeStream is closed')); + return; + } + + function handleError(err) { + // if there's an error reconnecting, close the change stream + if (eventEmitter) { + changeStream.emit('error', err); + changeStream.emit('close'); + // return; + } + processResumeQueue(changeStream, err); + changeStream.closed = true; + } + + if (isResumableError(error, maxWireVersion(cursor.server))) { + changeStream.cursor = undefined; + + // stop listening to all events from old cursor + ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); + + // close internal cursor, ignore errors + cursor.close(); + + if (!eventEmitter) changeStream[kResumeQueue].push(() => changeStream.next(callback)); + + waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => { + if (err) { + return handleError(err); + } + const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); + if (eventEmitter) { + changeStream.cursor = newCursor; + processResumeQueue(changeStream); + return; + } + newCursor.hasNext(err => { + if (err) return handleError(err); + changeStream.cursor = newCursor; + processResumeQueue(changeStream); + }); + }); + return; + } + + if (eventEmitter) return changeStream.emit('error', error); + return callback(error); } /** @@ -576,11 +599,11 @@ function getCursor(changeStream, callback) { */ function processResumeQueue(changeStream, err) { while (changeStream[kResumeQueue].length) { - if (changeStream.isClosed()) { + const request = changeStream[kResumeQueue].pop(); + if (changeStream.isClosed() && !err) { request(new MongoError('Change Stream is not open.')); return; } - const request = changeStream[kResumeQueue].pop(); request(err, changeStream.cursor); } } diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 67cc9cd2eb3..931643c0b34 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -22,19 +22,23 @@ chai.use(require('chai-subset')); * @param {number} [delay] optional delay before triggering error */ function triggerResumableError(changeStream, onCursorClosed, delay) { + let cbCalled = false; const closeCursor = changeStream.cursor.close.bind(changeStream.cursor); changeStream.cursor.close = callback => { closeCursor(err => { callback && callback(err); + if (cbCalled) return; + cbCalled = true; onCursorClosed(); }); }; const fakeResumableError = new MongoNetworkError('fake error'); // delay error slightly to better simulate real conditions if (delay) { - return setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), delay); + setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), delay); + } else { + changeStream.cursor.emit('error', fakeResumableError); } - changeStream.cursor.emit('error', fakeResumableError); } /** @@ -2707,9 +2711,8 @@ describe('Change Streams', function() { let startAfter; function recordEvent(events, e) { - if (e.commandName === 'aggregate') { - events.push({ $changeStream: e.command.pipeline[0].$changeStream }); - } + if (e.commandName !== 'aggregate') return; + events.push({ $changeStream: e.command.pipeline[0].$changeStream }); } beforeEach(function(done) { @@ -2783,7 +2786,7 @@ describe('Change Streams', function() { // - MUST include a startAfter option // - MUST NOT include a resumeAfter option // when resuming a change stream. - it('$changeStream that has not received results must include startAfter and not resumeAfter', { + it('$changeStream w/o results must include startAfter and not resumeAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { const events = []; @@ -2804,16 +2807,10 @@ describe('Change Streams', function() { }); waitForStarted(changeStream, () => { - triggerResumableError( - changeStream, - () => { - events.push('error'); - coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - }); - }, - 250 - ); + triggerResumableError(changeStream, () => events.push('error')); + coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { + expect(err).to.not.exist; + }); }); } }); @@ -2823,7 +2820,7 @@ describe('Change Streams', function() { // - MUST include a resumeAfter option // - MUST NOT include a startAfter option // when resuming a change stream. - it('$changeStream that has received results must include resumeAfter and not startAfter', { + it('$changeStream w/ results must include resumeAfter and not startAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { let events = []; @@ -2836,7 +2833,7 @@ describe('Change Streams', function() { case 2: // only events after this point are relevant to this test events = []; - triggerResumableError(changeStream, () => events.push('error'), 0); + triggerResumableError(changeStream, () => events.push('error')); break; case 3: expect(events) From 507ba22745745c2fbdcc152ca800a4a394fc0dcb Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 17:41:21 -0400 Subject: [PATCH 08/19] use unique database name to avoid mid-test drop --- test/functional/change_stream.test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 931643c0b34..8e8969d4929 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2866,12 +2866,12 @@ describe('Change Stream Resume Error Tests', function() { const client = configuration.newClient(); client.connect(err => { expect(err).to.not.exist; - const db = client.db('test'); + const db = client.db(`changeStreamResumeErrorDb${process.pid}`); db.createCollection('changeStreamResumeErrorTest', (err, collection) => { expect(err).to.not.exist; const changeStream = collection.watch(); callback(collection, changeStream, () => - changeStream.close(() => collection.drop(() => client.close(done))) + changeStream.close(() => db.dropDatabase(() => client.close(done))) ); }); }); From ae6d1e1a2d55a1020c8b6765a60c79ebca89f25c Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 17:59:26 -0400 Subject: [PATCH 09/19] better fix --- test/functional/change_stream.test.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 8e8969d4929..8dfa3030fe8 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2860,18 +2860,18 @@ describe('Change Streams', function() { }); describe('Change Stream Resume Error Tests', function() { - function withChangeStream(callback) { + function withChangeStream(testName, callback) { return function(done) { const configuration = this.configuration; const client = configuration.newClient(); client.connect(err => { expect(err).to.not.exist; - const db = client.db(`changeStreamResumeErrorDb${process.pid}`); - db.createCollection('changeStreamResumeErrorTest', (err, collection) => { + const db = client.db('changeStreamResumErrorTest'); + db.createCollection(testName, (err, collection) => { expect(err).to.not.exist; const changeStream = collection.watch(); callback(collection, changeStream, () => - changeStream.close(() => db.dropDatabase(() => client.close(done))) + changeStream.close(() => collection.drop(() => client.close(done))) ); }); }); @@ -2879,7 +2879,7 @@ describe('Change Stream Resume Error Tests', function() { } it('(events) should continue iterating after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withChangeStream((collection, changeStream, done) => { + test: withChangeStream('resumeErrorEvents', (collection, changeStream, done) => { const docs = []; changeStream.on('change', change => { expect(change).to.exist; @@ -2911,7 +2911,7 @@ describe('Change Stream Resume Error Tests', function() { it('(callback) hasNext and next should work after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withChangeStream((collection, changeStream, done) => { + test: withChangeStream('resumeErrorIterator', (collection, changeStream, done) => { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; From 5a560b8ceb5c1c21a04a00184ebe7c0b6b100385 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 19:25:06 -0400 Subject: [PATCH 10/19] add delay to resumable error on events test --- test/functional/change_stream.test.js | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 8dfa3030fe8..4882fd46c17 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -2899,11 +2899,15 @@ describe('Change Stream Resume Error Tests', function() { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; - triggerResumableError(changeStream, () => { - collection.insertOne({ b: 24 }, err => { - expect(err).to.not.exist; - }); - }); + triggerResumableError( + changeStream, + () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }, + 1000 + ); }); }); }) From ea223b1a17dae6b4fc80b40bf0e00b8f3f0608b1 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 8 May 2020 23:53:09 -0400 Subject: [PATCH 11/19] cleanup --- lib/change_stream.js | 63 ++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 38 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 600b6763fc0..1aa510c47c2 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -471,26 +471,23 @@ function waitForTopologyConnected(topology, options, callback) { // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. function processNewChange(changeStream, change, callback) { - const eventEmitter = typeof callback !== 'function'; const cursor = changeStream.cursor; - // If the cursor is null or the change stream has been closed explictly, do not process a change. - if (cursor == null || changeStream.closed) { - // We do not error in the eventEmitter case. - changeStream.closed = true; - if (eventEmitter) { - return; - } - callback(new MongoError('ChangeStream is closed')); + if (changeStream.closed) { + if (callback) callback(new MongoError('ChangeStream is closed')); return; } + if (cursor == null) { + throw new Error('cursor should never be null here'); + } + if (change && !change._id) { const noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' ); - if (eventEmitter) return changeStream.emit('error', noResumeTokenError); + if (!callback) return changeStream.emit('error', noResumeTokenError); return callback(noResumeTokenError); } @@ -502,38 +499,35 @@ function processNewChange(changeStream, change, callback) { changeStream.options.startAtOperationTime = undefined; // Return the change - if (eventEmitter) return changeStream.emit('change', change); + if (!callback) return changeStream.emit('change', change); return callback(undefined, change); } function processError(changeStream, error, callback) { - const eventEmitter = typeof callback !== 'function'; const topology = changeStream.topology; const cursor = changeStream.cursor; - // If the cursor is null or the change stream has been closed explictly, do not process a change. - if (cursor == null || changeStream.closed) { - // We do not error in the eventEmitter case. - changeStream.closed = true; - if (eventEmitter) { - return; - } - callback(new MongoError('ChangeStream is closed')); + // If the change stream has been closed explictly, do not process error. + if (changeStream.closed) { + if (callback) callback(new MongoError('ChangeStream is closed')); return; } - function handleError(err) { - // if there's an error reconnecting, close the change stream - if (eventEmitter) { + function unresumableError(err) { + if (!callback) { changeStream.emit('error', err); changeStream.emit('close'); - // return; } processResumeQueue(changeStream, err); changeStream.closed = true; } - if (isResumableError(error, maxWireVersion(cursor.server))) { + function resumeWithCursor(newCursor) { + changeStream.cursor = newCursor; + processResumeQueue(changeStream); + } + + if (cursor && isResumableError(error, maxWireVersion(cursor.server))) { changeStream.cursor = undefined; // stop listening to all events from old cursor @@ -542,28 +536,21 @@ function processError(changeStream, error, callback) { // close internal cursor, ignore errors cursor.close(); - if (!eventEmitter) changeStream[kResumeQueue].push(() => changeStream.next(callback)); + if (callback) changeStream[kResumeQueue].push(() => changeStream.next(callback)); waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => { - if (err) { - return handleError(err); - } + if (err) return unresumableError(err); const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - if (eventEmitter) { - changeStream.cursor = newCursor; - processResumeQueue(changeStream); - return; - } + if (!callback) return resumeWithCursor(newCursor); newCursor.hasNext(err => { - if (err) return handleError(err); - changeStream.cursor = newCursor; - processResumeQueue(changeStream); + if (err) return unresumableError(err); + resumeWithCursor(newCursor); }); }); return; } - if (eventEmitter) return changeStream.emit('error', error); + if (!callback) return changeStream.emit('error', error); return callback(error); } From 66006a93ba63777c30dc12342dfa775e0ab8e8b5 Mon Sep 17 00:00:00 2001 From: emadum Date: Sat, 9 May 2020 00:21:58 -0400 Subject: [PATCH 12/19] add comments --- lib/change_stream.js | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 1aa510c47c2..9a6a501c21e 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -469,7 +469,6 @@ function waitForTopologyConnected(topology, options, callback) { }, 500); // this is an arbitrary wait time to allow SDAM to transition } -// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. function processNewChange(changeStream, change, callback) { const cursor = changeStream.cursor; @@ -478,10 +477,6 @@ function processNewChange(changeStream, change, callback) { return; } - if (cursor == null) { - throw new Error('cursor should never be null here'); - } - if (change && !change._id) { const noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' @@ -513,6 +508,13 @@ function processError(changeStream, error, callback) { return; } + // if the resume succeeds, continue with the new cursor + function resumeWithCursor(newCursor) { + changeStream.cursor = newCursor; + processResumeQueue(changeStream); + } + + // otherwise, raise an error and close the change stream function unresumableError(err) { if (!callback) { changeStream.emit('error', err); @@ -522,11 +524,6 @@ function processError(changeStream, error, callback) { changeStream.closed = true; } - function resumeWithCursor(newCursor) { - changeStream.cursor = newCursor; - processResumeQueue(changeStream); - } - if (cursor && isResumableError(error, maxWireVersion(cursor.server))) { changeStream.cursor = undefined; @@ -536,13 +533,23 @@ function processError(changeStream, error, callback) { // close internal cursor, ignore errors cursor.close(); + // the presence of a callback here indicates the error happened during a .next + // so it should be retried on resume if (callback) changeStream[kResumeQueue].push(() => changeStream.next(callback)); waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => { + // if the topology can't reconnect, close the stream if (err) return unresumableError(err); + + // create a new cursor, preserving the old cursor's options const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); + + // attempt to continue in emitter mode if (!callback) return resumeWithCursor(newCursor); + + // attempt to continue in iterator mode newCursor.hasNext(err => { + // if there's an error immediately after resuming, close the stream if (err) return unresumableError(err); resumeWithCursor(newCursor); }); From c8cabf34d68f16e2a97b04b13aa3ca841b20899f Mon Sep 17 00:00:00 2001 From: emadum Date: Tue, 12 May 2020 10:33:01 -0400 Subject: [PATCH 13/19] feedback from review --- lib/change_stream.js | 27 ++++---- test/functional/change_stream.test.js | 93 +++++++++++++-------------- 2 files changed, 58 insertions(+), 62 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 9a6a501c21e..ad25865d4e3 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -135,7 +135,7 @@ class ChangeStream extends EventEmitter { hasNext(callback) { return maybePromise(this.parent, callback, cb => { getCursor(this, (err, cursor) => { - if (err) return cb(err); + if (err) return cb(err); // failed to resume, raise an error cursor.hasNext(cb); }); }); @@ -151,10 +151,12 @@ class ChangeStream extends EventEmitter { next(callback) { return maybePromise(this.parent, callback, cb => { getCursor(this, (err, cursor) => { - if (err) return cb(err); + if (err) return cb(err); // failed to resume, raise an error cursor.next((error, change) => { if (error) { - return processError(this, error, cb); + this[kResumeQueue].push(() => this.next(cb)); + processError(this, error, cb); + return; } processNewChange(this, change, cb); }); @@ -163,12 +165,13 @@ class ChangeStream extends EventEmitter { } /** - * Is the cursor closed + * Is the change stream closed * @method ChangeStream.prototype.isClosed + * @param {boolean} [checkCursor=true] also check if the underlying cursor is closed * @return {boolean} */ - isClosed() { - return this.closed || (this.cursor && this.cursor.isClosed()); + isClosed(checkCursor) { + return !!(this.closed || (checkCursor && this.cursor && this.cursor.isClosed())); } /** @@ -472,7 +475,7 @@ function waitForTopologyConnected(topology, options, callback) { function processNewChange(changeStream, change, callback) { const cursor = changeStream.cursor; - if (changeStream.closed) { + if (changeStream.isClosed()) { if (callback) callback(new MongoError('ChangeStream is closed')); return; } @@ -503,7 +506,7 @@ function processError(changeStream, error, callback) { const cursor = changeStream.cursor; // If the change stream has been closed explictly, do not process error. - if (changeStream.closed) { + if (changeStream.isClosed()) { if (callback) callback(new MongoError('ChangeStream is closed')); return; } @@ -533,10 +536,6 @@ function processError(changeStream, error, callback) { // close internal cursor, ignore errors cursor.close(); - // the presence of a callback here indicates the error happened during a .next - // so it should be retried on resume - if (callback) changeStream[kResumeQueue].push(() => changeStream.next(callback)); - waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => { // if the topology can't reconnect, close the stream if (err) return unresumableError(err); @@ -569,7 +568,7 @@ function processError(changeStream, error, callback) { * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor */ function getCursor(changeStream, callback) { - if (changeStream.isClosed()) { + if (changeStream.isClosed(true)) { callback(new MongoError('ChangeStream is closed.')); return; } @@ -594,7 +593,7 @@ function getCursor(changeStream, callback) { function processResumeQueue(changeStream, err) { while (changeStream[kResumeQueue].length) { const request = changeStream[kResumeQueue].pop(); - if (changeStream.isClosed() && !err) { + if (changeStream.isClosed(true) && !err) { request(new MongoError('Change Stream is not open.')); return; } diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 4882fd46c17..d735d612d34 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -18,27 +18,32 @@ chai.use(require('chai-subset')); * Triggers a fake resumable error on a change stream * * @param {ChangeStream} changeStream - * @param {Function} onCursorClosed callback when cursor closed due this error * @param {number} [delay] optional delay before triggering error + * @param {Function} onClose callback when cursor closed due this error */ -function triggerResumableError(changeStream, onCursorClosed, delay) { - let cbCalled = false; - const closeCursor = changeStream.cursor.close.bind(changeStream.cursor); - changeStream.cursor.close = callback => { - closeCursor(err => { - callback && callback(err); - if (cbCalled) return; - cbCalled = true; - onCursorClosed(); - }); - }; - const fakeResumableError = new MongoNetworkError('fake error'); - // delay error slightly to better simulate real conditions - if (delay) { - setTimeout(() => changeStream.cursor.emit('error', fakeResumableError), delay); - } else { - changeStream.cursor.emit('error', fakeResumableError); +function triggerResumableError(changeStream, delay, onClose) { + if (arguments.length === 2) { + onClose = delay; + delay = undefined; + } + + const stub = sinon.stub(changeStream.cursor, 'close'); + stub.callsFake(function() { + stub.wrappedMethod.call(this); + stub.restore(); + onClose(); + }); + + function triggerError() { + changeStream.cursor.emit('error', new MongoNetworkError('fake error')); } + + if (delay != null) { + setTimeout(triggerError, delay); + return; + } + + triggerError(); } /** @@ -728,7 +733,7 @@ describe('Change Streams', function() { changeStream.hasNext(function(err, hasNext) { expect(err).to.not.exist; assert.equal(hasNext, false); - assert.equal(changeStream.isClosed(), true); + assert.equal(changeStream.isClosed(true), true); client.close(done); }); } @@ -796,7 +801,7 @@ describe('Change Streams', function() { .then(() => changeStream.hasNext()) .then(function(hasNext) { assert.equal(hasNext, false); - assert.equal(changeStream.isClosed(), true); + assert.equal(changeStream.isClosed(true), true); client.close(done); }) .catch(function(err) { @@ -2786,7 +2791,7 @@ describe('Change Streams', function() { // - MUST include a startAfter option // - MUST NOT include a resumeAfter option // when resuming a change stream. - it('$changeStream w/o results must include startAfter and not resumeAfter', { + it('$changeStream without results must include startAfter and not resumeAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { const events = []; @@ -2820,7 +2825,7 @@ describe('Change Streams', function() { // - MUST include a resumeAfter option // - MUST NOT include a startAfter option // when resuming a change stream. - it('$changeStream w/ results must include resumeAfter and not startAfter', { + it('$changeStream with results must include resumeAfter and not startAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { let events = []; @@ -2899,15 +2904,11 @@ describe('Change Stream Resume Error Tests', function() { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; - triggerResumableError( - changeStream, - () => { - collection.insertOne({ b: 24 }, err => { - expect(err).to.not.exist; - }); - }, - 1000 - ); + triggerResumableError(changeStream, 1000, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }); }); }); }) @@ -2919,25 +2920,21 @@ describe('Change Stream Resume Error Tests', function() { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; - triggerResumableError( - changeStream, - () => { - changeStream.hasNext((err1, hasNext) => { - expect(err1).to.not.exist; - expect(hasNext).to.be.true; - changeStream.next((err, change) => { - expect(err).to.not.exist; - expect(change).to.containSubset({ - operationType: 'insert', - fullDocument: { b: 24 } - }); - done(); + triggerResumableError(changeStream, 250, () => { + changeStream.hasNext((err1, hasNext) => { + expect(err1).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } }); + done(); }); - collection.insertOne({ b: 24 }); - }, - 250 - ); + }); + collection.insertOne({ b: 24 }); + }); }); }); changeStream.hasNext((err, hasNext) => { From 07fe6fb749f7bebbc0d9919a4823fd618cfd0555 Mon Sep 17 00:00:00 2001 From: emadum Date: Wed, 13 May 2020 11:21:22 -0400 Subject: [PATCH 14/19] update jsdoc --- lib/change_stream.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index ad25865d4e3..72b7b02ce8b 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -587,8 +587,8 @@ function getCursor(changeStream, callback) { * Drain the resume queue when a new has become available * * @param {ChangeStream} changeStream the parent ChangeStream - * @param {?ChangeStreamCursor} changeStream.cursor the new cursor - * @param {?Error} err error getting a new cursor + * @param {ChangeStreamCursor?} changeStream.cursor the new cursor + * @param {Error} [err] error getting a new cursor */ function processResumeQueue(changeStream, err) { while (changeStream[kResumeQueue].length) { From 07e2302b3d356dc0e67688fa3fdeb11cfa46ba43 Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 14 May 2020 16:12:12 -0400 Subject: [PATCH 15/19] revert change to isClosed --- lib/change_stream.js | 12 ++++++------ test/functional/change_stream.test.js | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 72b7b02ce8b..900da1edad1 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -170,8 +170,8 @@ class ChangeStream extends EventEmitter { * @param {boolean} [checkCursor=true] also check if the underlying cursor is closed * @return {boolean} */ - isClosed(checkCursor) { - return !!(this.closed || (checkCursor && this.cursor && this.cursor.isClosed())); + isClosed() { + return this.closed || (this.cursor && this.cursor.isClosed()); } /** @@ -475,7 +475,7 @@ function waitForTopologyConnected(topology, options, callback) { function processNewChange(changeStream, change, callback) { const cursor = changeStream.cursor; - if (changeStream.isClosed()) { + if (changeStream.closed) { if (callback) callback(new MongoError('ChangeStream is closed')); return; } @@ -506,7 +506,7 @@ function processError(changeStream, error, callback) { const cursor = changeStream.cursor; // If the change stream has been closed explictly, do not process error. - if (changeStream.isClosed()) { + if (changeStream.closed) { if (callback) callback(new MongoError('ChangeStream is closed')); return; } @@ -568,7 +568,7 @@ function processError(changeStream, error, callback) { * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor */ function getCursor(changeStream, callback) { - if (changeStream.isClosed(true)) { + if (changeStream.isClosed()) { callback(new MongoError('ChangeStream is closed.')); return; } @@ -593,7 +593,7 @@ function getCursor(changeStream, callback) { function processResumeQueue(changeStream, err) { while (changeStream[kResumeQueue].length) { const request = changeStream[kResumeQueue].pop(); - if (changeStream.isClosed(true) && !err) { + if (changeStream.isClosed() && !err) { request(new MongoError('Change Stream is not open.')); return; } diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 0d1c65c811e..080c1a61618 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -733,7 +733,7 @@ describe('Change Streams', function() { changeStream.hasNext(function(err, hasNext) { expect(err).to.not.exist; assert.equal(hasNext, false); - assert.equal(changeStream.isClosed(true), true); + assert.equal(changeStream.isClosed(), true); client.close(done); }); } @@ -801,7 +801,7 @@ describe('Change Streams', function() { .then(() => changeStream.hasNext()) .then(function(hasNext) { assert.equal(hasNext, false); - assert.equal(changeStream.isClosed(true), true); + assert.equal(changeStream.isClosed(), true); client.close(done); }) .catch(function(err) { From e493558af6f62e6222cee247d7e29d0a6b98b257 Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 14 May 2020 17:42:37 -0400 Subject: [PATCH 16/19] refactor: extract withCursor helper --- test/functional/change_stream.test.js | 34 +++++++++++++-------------- test/functional/shared.js | 15 ++++++++++++ 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 080c1a61618..f6d16976d05 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -5,6 +5,7 @@ const MongoError = require('../../lib/core').MongoError; var MongoNetworkError = require('../../lib/core').MongoNetworkError; var setupDatabase = require('./shared').setupDatabase; var withClient = require('./shared').withClient; +var withCursor = require('./shared').withCursor; var delay = require('./shared').delay; var co = require('co'); var mock = require('mongodb-mock-server'); @@ -2859,24 +2860,21 @@ describe('Change Streams', function() { }); describe('Change Stream Resume Error Tests', function() { - function withChangeStream(testName, callback) { - return function(done) { - const configuration = this.configuration; - const client = configuration.newClient(); - client.connect(err => { - expect(err).to.not.exist; - const db = client.db('changeStreamResumErrorTest'); - db.createCollection(testName, (err, collection) => { - expect(err).to.not.exist; - const changeStream = collection.watch(); - callback(collection, changeStream, () => - changeStream.close(() => collection.drop(() => client.close(done))) - ); - }); - }); - }; + function withChangeStream(collectionName, callback) { + return withCursor( + // get change stream cursor + (client, cursorCb) => { + const db = client.db('changeStreamResumeErrorTest'); + db.createCollection(collectionName, (err, collection) => { + if (err) return cursorCb(err); + cursorCb(null, collection.watch()); + }); + }, + // perform test on change stream cursor + (cursor, done) => callback(cursor.parent, cursor, done) + ); } - it('(events) should continue iterating after a resumable error', { + it('should continue emitting change events after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: withChangeStream('resumeErrorEvents', (collection, changeStream, done) => { const docs = []; @@ -2908,7 +2906,7 @@ describe('Change Stream Resume Error Tests', function() { }) }); - it('(callback) hasNext and next should work after a resumable error', { + it('should continue iterating changes after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: withChangeStream('resumeErrorIterator', (collection, changeStream, done) => { waitForStarted(changeStream, () => { diff --git a/test/functional/shared.js b/test/functional/shared.js index f2c34c58c89..22ba6c98a4f 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -189,6 +189,20 @@ function withMonitoredClient(commands, options, callback) { }; } +/** + * Safely perform a test with an arbitrary cursor. + * + * @param {function} getCursor given a client, provide cursor for test + * @param {function} callback the test function + */ +function withCursor(getCursor, callback) { + return withClient((client, done) => { + getCursor(client, (cursorErr, cursor) => { + callback(cursor, () => cursor.close(closeErr => done(cursorErr || closeErr))); + }); + }); +} + /** * A class for listening on specific events * @@ -265,5 +279,6 @@ module.exports = { setupDatabase, withClient, withMonitoredClient, + withCursor, EventCollector }; From a77ee8b72874871beff33091ce885217f9acfb36 Mon Sep 17 00:00:00 2001 From: emadum Date: Thu, 14 May 2020 18:01:02 -0400 Subject: [PATCH 17/19] fix withCursor error handling --- test/functional/shared.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/functional/shared.js b/test/functional/shared.js index 22ba6c98a4f..5839b95697d 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -197,8 +197,9 @@ function withMonitoredClient(commands, options, callback) { */ function withCursor(getCursor, callback) { return withClient((client, done) => { - getCursor(client, (cursorErr, cursor) => { - callback(cursor, () => cursor.close(closeErr => done(cursorErr || closeErr))); + getCursor(client, (err, cursor) => { + if (err) return done(err); + callback(cursor, () => cursor.close(done)); }); }); } From 0f856d86f7e2e9c1aa3a6e0b2a4703acd378c3c2 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 15 May 2020 16:26:04 -0400 Subject: [PATCH 18/19] refactor withCursor and withChangeStream --- test/functional/change_stream.test.js | 64 +++++++++++++-------------- test/functional/shared.js | 24 ++++++---- 2 files changed, 47 insertions(+), 41 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index f6d16976d05..82744e3f068 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -15,6 +15,31 @@ const sinon = require('sinon'); chai.use(require('chai-subset')); +function withChangeStream(dbName, collectionName, callback) { + if (arguments.length === 1) { + callback = dbName; + dbName = undefined; + } else if (arguments.length === 2) { + callback = collectionName; + collectionName = dbName; + dbName = undefined; + } + dbName = dbName || 'changestream_integration_test'; + collectionName = collectionName || 'test'; + + return withClient((client, done) => { + const db = client.db(); + db.createCollection(collectionName, { w: 'majority' }, (err, collection) => { + if (err) return done(err); + withCursor( + collection.watch(), + (cursor, done) => callback(collection, cursor, done), + err => collection.drop(dropErr => done(err || dropErr)) + ); + }); + }); +} + /** * Triggers a fake resumable error on a change stream * @@ -2649,32 +2674,21 @@ describe('Change Streams', function() { }); describe('tryNext', function() { - function withTemporaryCollectionOnDb(database, testFn) { - return withClient((client, done) => { - const db = client.db(database); - db.createCollection('test', { w: 'majority' }, (err, collection) => { - if (err) return done(err); - testFn(collection, () => db.dropDatabase(done)); - }); - }); - } + it('should return null on single iteration of empty cursor', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withTemporaryCollectionOnDb('testTryNext', (collection, done) => { - const changeStream = collection.watch(); + test: withChangeStream((collection, changeStream, done) => { tryNext(changeStream, (err, doc) => { expect(err).to.not.exist; expect(doc).to.not.exist; - - changeStream.close(done); + done(); }); }) }); it('should iterate a change stream until first empty batch', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withTemporaryCollectionOnDb('testTryNext', (collection, done) => { - const changeStream = collection.watch(); + test: withChangeStream((collection, changeStream, done) => { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; @@ -2697,7 +2711,7 @@ describe('Change Streams', function() { expect(err).to.not.exist; expect(doc).to.not.exist; - changeStream.close(done); + done(); }); }); }); @@ -2860,23 +2874,9 @@ describe('Change Streams', function() { }); describe('Change Stream Resume Error Tests', function() { - function withChangeStream(collectionName, callback) { - return withCursor( - // get change stream cursor - (client, cursorCb) => { - const db = client.db('changeStreamResumeErrorTest'); - db.createCollection(collectionName, (err, collection) => { - if (err) return cursorCb(err); - cursorCb(null, collection.watch()); - }); - }, - // perform test on change stream cursor - (cursor, done) => callback(cursor.parent, cursor, done) - ); - } it('should continue emitting change events after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withChangeStream('resumeErrorEvents', (collection, changeStream, done) => { + test: withChangeStream((collection, changeStream, done) => { const docs = []; changeStream.on('change', change => { expect(change).to.exist; @@ -2908,7 +2908,7 @@ describe('Change Stream Resume Error Tests', function() { it('should continue iterating changes after a resumable error', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withChangeStream('resumeErrorIterator', (collection, changeStream, done) => { + test: withChangeStream((collection, changeStream, done) => { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; diff --git a/test/functional/shared.js b/test/functional/shared.js index 5839b95697d..37a2927b8d1 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -192,16 +192,22 @@ function withMonitoredClient(commands, options, callback) { /** * Safely perform a test with an arbitrary cursor. * - * @param {function} getCursor given a client, provide cursor for test - * @param {function} callback the test function + * @param {function} cursor any cursor that needs to be closed + * @param {(cursor: Object, done: Function) => void} body test body + * @param {function} done called after cleanup */ -function withCursor(getCursor, callback) { - return withClient((client, done) => { - getCursor(client, (err, cursor) => { - if (err) return done(err); - callback(cursor, () => cursor.close(done)); - }); - }); +function withCursor(cursor, body, done) { + let clean = false; + function cleanup(testErr) { + if (clean) return; + clean = true; + return cursor.close(closeErr => done(testErr || closeErr)); + } + try { + body(cursor, cleanup); + } catch (err) { + cleanup(err); + } } /** From e549d7b17059b6ad8d3941659923046b0efd3e31 Mon Sep 17 00:00:00 2001 From: emadum Date: Fri, 15 May 2020 16:40:13 -0400 Subject: [PATCH 19/19] fix lint error, cleanup --- test/functional/change_stream.test.js | 3 +-- test/functional/shared.js | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index 82744e3f068..8c34ce99735 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -28,7 +28,7 @@ function withChangeStream(dbName, collectionName, callback) { collectionName = collectionName || 'test'; return withClient((client, done) => { - const db = client.db(); + const db = client.db(dbName); db.createCollection(collectionName, { w: 'majority' }, (err, collection) => { if (err) return done(err); withCursor( @@ -2674,7 +2674,6 @@ describe('Change Streams', function() { }); describe('tryNext', function() { - it('should return null on single iteration of empty cursor', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, test: withChangeStream((collection, changeStream, done) => { diff --git a/test/functional/shared.js b/test/functional/shared.js index 37a2927b8d1..b372b7275de 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -192,9 +192,9 @@ function withMonitoredClient(commands, options, callback) { /** * Safely perform a test with an arbitrary cursor. * - * @param {function} cursor any cursor that needs to be closed + * @param {Function} cursor any cursor that needs to be closed * @param {(cursor: Object, done: Function) => void} body test body - * @param {function} done called after cleanup + * @param {Function} done called after cleanup */ function withCursor(cursor, body, done) { let clean = false;