Skip to content

fix(ChangeStream): should resume from errors when iterating #2360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 134 additions & 71 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const Denque = require('denque');
const EventEmitter = require('events');
const isResumableError = require('./error').isResumableError;
const MongoError = require('./core').MongoError;
Expand All @@ -9,6 +10,8 @@ const maxWireVersion = require('./core/utils').maxWireVersion;
const maybePromise = require('./utils').maybePromise;
const AggregateOperation = require('./operations/aggregate');

const kResumeQueue = Symbol('resumeQueue');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
CHANGE_STREAM_OPTIONS
Expand Down Expand Up @@ -91,6 +94,8 @@ class ChangeStream extends EventEmitter {
this.options.readPreference = parent.s.readPreference;
}

this[kResumeQueue] = new Denque();

// Create contained Change Stream cursor
this.cursor = createChangeStreamCursor(this, options);

Expand All @@ -99,9 +104,7 @@ class ChangeStream extends EventEmitter {
// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
this.cursor.on('data', change =>
processNewChange({ changeStream: this, change, eventEmitter: true })
);
this.cursor.on('data', change => processNewChange(this, change));
}
});

Expand Down Expand Up @@ -130,7 +133,12 @@ class ChangeStream extends EventEmitter {
* @returns {Promise|void} returns Promise if no callback passed
*/
hasNext(callback) {
return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb));
return maybePromise(this.parent, callback, cb => {
getCursor(this, (err, cursor) => {
if (err) return cb(err); // failed to resume, raise an error
cursor.hasNext(cb);
});
});
}

/**
Expand All @@ -142,18 +150,24 @@ class ChangeStream extends EventEmitter {
*/
next(callback) {
return maybePromise(this.parent, callback, cb => {
if (this.isClosed()) {
return cb(new MongoError('ChangeStream is closed'));
}
this.cursor.next((error, change) => {
processNewChange({ changeStream: this, error, change, callback: cb });
getCursor(this, (err, cursor) => {
if (err) return cb(err); // failed to resume, raise an error
cursor.next((error, change) => {
if (error) {
this[kResumeQueue].push(() => this.next(cb));
processError(this, error, cb);
return;
}
processNewChange(this, change, cb);
});
});
});
}

/**
* Is the cursor closed
* Is the change stream closed
* @method ChangeStream.prototype.isClosed
* @param {boolean} [checkCursor=true] also check if the underlying cursor is closed
* @return {boolean}
*/
isClosed() {
Expand All @@ -173,6 +187,8 @@ class ChangeStream extends EventEmitter {
// flag the change stream as explicitly closed
this.closed = true;

if (!this.cursor) return cb();

// Tidy up the existing cursor
const cursor = this.cursor;

Expand Down Expand Up @@ -383,7 +399,7 @@ function createChangeStreamCursor(self, options) {
*/
if (self.listenerCount('change') > 0) {
changeStreamCursor.on('data', function(change) {
processNewChange({ changeStream: self, change, eventEmitter: true });
processNewChange(self, change);
});
}

Expand Down Expand Up @@ -415,7 +431,7 @@ function createChangeStreamCursor(self, options) {
* @type {Error}
*/
changeStreamCursor.on('error', function(error) {
processNewChange({ changeStream: self, error, eventEmitter: true });
processError(self, error);
});

if (self.pipeDestinations) {
Expand Down Expand Up @@ -456,73 +472,20 @@ function waitForTopologyConnected(topology, options, callback) {
}, 500); // this is an arbitrary wait time to allow SDAM to transition
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
function processNewChange(args) {
const changeStream = args.changeStream;
const error = args.error;
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;
function processNewChange(changeStream, change, callback) {
const cursor = changeStream.cursor;

// If the cursor is null or the change stream has been closed explictly, do not process a change.
if (cursor == null || changeStream.closed) {
// We do not error in the eventEmitter case.
changeStream.closed = true;
if (eventEmitter) {
return;
}
callback(new MongoError('ChangeStream is closed'));
if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
return;
}

const topology = changeStream.topology;
const options = changeStream.cursor.options;
const wireVersion = maxWireVersion(cursor.server);

if (error) {
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;

// stop listening to all events from old cursor
['data', 'close', 'end', 'error'].forEach(event =>
changeStream.cursor.removeAllListeners(event)
);

// close internal cursor, ignore errors
changeStream.cursor.close();

waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
if (err) {
// if there's an error reconnecting, close the change stream
changeStream.closed = true;
if (eventEmitter) {
changeStream.emit('error', err);
changeStream.emit('close');
return;
}
return callback(err);
}

changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
if (eventEmitter) return;
changeStream.next(callback);
});
return;
}

if (eventEmitter) return changeStream.emit('error', error);
return callback(error);
}

changeStream.attemptingResume = false;

if (change && !change._id) {
const noResumeTokenError = new Error(
'A change stream document has been received that lacks a resume token (_id).'
);

if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
if (!callback) return changeStream.emit('error', noResumeTokenError);
return callback(noResumeTokenError);
}

Expand All @@ -534,8 +497,108 @@ function processNewChange(args) {
changeStream.options.startAtOperationTime = undefined;

// Return the change
if (eventEmitter) return changeStream.emit('change', change);
return callback(error, change);
if (!callback) return changeStream.emit('change', change);
return callback(undefined, change);
}

function processError(changeStream, error, callback) {
const topology = changeStream.topology;
const cursor = changeStream.cursor;

// If the change stream has been closed explictly, do not process error.
if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
return;
}

// if the resume succeeds, continue with the new cursor
function resumeWithCursor(newCursor) {
changeStream.cursor = newCursor;
processResumeQueue(changeStream);
}

// otherwise, raise an error and close the change stream
function unresumableError(err) {
if (!callback) {
changeStream.emit('error', err);
changeStream.emit('close');
}
processResumeQueue(changeStream, err);
changeStream.closed = true;
}

if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
changeStream.cursor = undefined;

// stop listening to all events from old cursor
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));

// close internal cursor, ignore errors
cursor.close();

waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => {
// if the topology can't reconnect, close the stream
if (err) return unresumableError(err);

// create a new cursor, preserving the old cursor's options
const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);

// attempt to continue in emitter mode
if (!callback) return resumeWithCursor(newCursor);

// attempt to continue in iterator mode
newCursor.hasNext(err => {
// if there's an error immediately after resuming, close the stream
if (err) return unresumableError(err);
resumeWithCursor(newCursor);
});
});
return;
}

if (!callback) return changeStream.emit('error', error);
return callback(error);
}

/**
* Safely provides a cursor across resume attempts
*
* @param {ChangeStream} changeStream the parent ChangeStream
* @param {function} callback gets the cursor or error
* @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor
*/
function getCursor(changeStream, callback) {
if (changeStream.isClosed()) {
callback(new MongoError('ChangeStream is closed.'));
return;
}

// if a cursor exists and it is open, return it
if (changeStream.cursor) {
callback(undefined, changeStream.cursor);
return;
}

// no cursor, queue callback until topology reconnects
changeStream[kResumeQueue].push(callback);
}

/**
* Drain the resume queue when a new has become available
*
* @param {ChangeStream} changeStream the parent ChangeStream
* @param {ChangeStreamCursor?} changeStream.cursor the new cursor
* @param {Error} [err] error getting a new cursor
*/
function processResumeQueue(changeStream, err) {
while (changeStream[kResumeQueue].length) {
const request = changeStream[kResumeQueue].pop();
if (changeStream.isClosed() && !err) {
request(new MongoError('Change Stream is not open.'));
return;
}
request(err, changeStream.cursor);
}
}

/**
Expand Down
Loading