Skip to content

Commit 1e3b4c9

Browse files
authored
refactor(ChangeStream): use maybePromise for close, improve tests
NODE-2598
1 parent 5e7197a commit 1e3b4c9

File tree

3 files changed

+163
-161
lines changed

3 files changed

+163
-161
lines changed

lib/change_stream.js

Lines changed: 37 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class ChangeStream extends EventEmitter {
9494
// Create contained Change Stream cursor
9595
this.cursor = createChangeStreamCursor(this, options);
9696

97+
this.closed = false;
98+
9799
// Listen for any `change` listeners being added to ChangeStream
98100
this.on('newListener', eventName => {
99101
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
@@ -141,7 +143,7 @@ class ChangeStream extends EventEmitter {
141143
next(callback) {
142144
return maybePromise(this.parent, callback, cb => {
143145
if (this.isClosed()) {
144-
return cb(new Error('Change Stream is not open.'));
146+
return cb(new MongoError('ChangeStream is closed'));
145147
}
146148
this.cursor.next((error, change) => {
147149
processNewChange({ changeStream: this, error, change, callback: cb });
@@ -155,10 +157,7 @@ class ChangeStream extends EventEmitter {
155157
* @return {boolean}
156158
*/
157159
isClosed() {
158-
if (this.cursor) {
159-
return this.cursor.isClosed();
160-
}
161-
return true;
160+
return this.closed || (this.cursor && this.cursor.isClosed());
162161
}
163162

164163
/**
@@ -168,31 +167,20 @@ class ChangeStream extends EventEmitter {
168167
* @return {Promise} returns Promise if no callback passed
169168
*/
170169
close(callback) {
171-
if (!this.cursor) {
172-
if (callback) return callback();
173-
return this.promiseLibrary.resolve();
174-
}
170+
return maybePromise(this.parent, callback, cb => {
171+
if (this.closed) return cb();
175172

176-
// Tidy up the existing cursor
177-
const cursor = this.cursor;
173+
// flag the change stream as explicitly closed
174+
this.closed = true;
178175

179-
if (callback) {
180-
return cursor.close(err => {
181-
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
182-
delete this.cursor;
176+
// Tidy up the existing cursor
177+
const cursor = this.cursor;
183178

184-
return callback(err);
185-
});
186-
}
187-
188-
const PromiseCtor = this.promiseLibrary || Promise;
189-
return new PromiseCtor((resolve, reject) => {
190-
cursor.close(err => {
179+
return cursor.close(err => {
191180
['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
192-
delete this.cursor;
181+
this.cursor = undefined;
193182

194-
if (err) return reject(err);
195-
resolve();
183+
return cb(err);
196184
});
197185
});
198186
}
@@ -321,7 +309,7 @@ class ChangeStreamCursor extends Cursor {
321309
_initializeCursor(callback) {
322310
super._initializeCursor((err, result) => {
323311
if (err) {
324-
callback(err, null);
312+
callback(err);
325313
return;
326314
}
327315

@@ -347,7 +335,7 @@ class ChangeStreamCursor extends Cursor {
347335
_getMore(callback) {
348336
super._getMore((err, response) => {
349337
if (err) {
350-
callback(err, null);
338+
callback(err);
351339
return;
352340
}
353341

@@ -460,12 +448,12 @@ function waitForTopologyConnected(topology, options, callback) {
460448
const timeout = options.timeout || SELECTION_TIMEOUT;
461449
const readPreference = options.readPreference;
462450

463-
if (topology.isConnected({ readPreference })) return callback(null, null);
451+
if (topology.isConnected({ readPreference })) return callback();
464452
const hrElapsed = process.hrtime(start);
465453
const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6;
466454
if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));
467455
waitForTopologyConnected(topology, options, callback);
468-
}, 3000); // this is an arbitrary wait time to allow SDAM to transition
456+
}, 500); // this is an arbitrary wait time to allow SDAM to transition
469457
}
470458

471459
// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
@@ -477,17 +465,15 @@ function processNewChange(args) {
477465
const eventEmitter = args.eventEmitter || false;
478466
const cursor = changeStream.cursor;
479467

480-
// If the cursor is null, then it should not process a change.
481-
if (cursor == null) {
468+
// If the cursor is null or the change stream has been closed explictly, do not process a change.
469+
if (cursor == null || changeStream.closed) {
482470
// We do not error in the eventEmitter case.
471+
changeStream.closed = true;
483472
if (eventEmitter) {
484473
return;
485474
}
486-
487-
const error = new MongoError('ChangeStream is closed');
488-
return typeof callback === 'function'
489-
? callback(error, null)
490-
: changeStream.promiseLibrary.reject(error);
475+
callback(new MongoError('ChangeStream is closed'));
476+
return;
491477
}
492478

493479
const topology = changeStream.topology;
@@ -506,46 +492,27 @@ function processNewChange(args) {
506492
// close internal cursor, ignore errors
507493
changeStream.cursor.close();
508494

509-
// attempt recreating the cursor
510-
if (eventEmitter) {
511-
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
512-
if (err) {
495+
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
496+
if (err) {
497+
// if there's an error reconnecting, close the change stream
498+
changeStream.closed = true;
499+
if (eventEmitter) {
513500
changeStream.emit('error', err);
514501
changeStream.emit('close');
515502
return;
516503
}
517-
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
518-
});
504+
return callback(err);
505+
}
519506

520-
return;
521-
}
522-
523-
if (callback) {
524-
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
525-
if (err) return callback(err, null);
526-
527-
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
528-
changeStream.next(callback);
529-
});
530-
531-
return;
532-
}
533-
534-
return new Promise((resolve, reject) => {
535-
waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
536-
if (err) return reject(err);
537-
resolve();
538-
});
539-
})
540-
.then(
541-
() => (changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions))
542-
)
543-
.then(() => changeStream.next());
507+
changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
508+
if (eventEmitter) return;
509+
changeStream.next(callback);
510+
});
511+
return;
544512
}
545513

546514
if (eventEmitter) return changeStream.emit('error', error);
547-
if (typeof callback === 'function') return callback(error, null);
548-
return changeStream.promiseLibrary.reject(error);
515+
return callback(error);
549516
}
550517

551518
changeStream.attemptingResume = false;
@@ -556,8 +523,7 @@ function processNewChange(args) {
556523
);
557524

558525
if (eventEmitter) return changeStream.emit('error', noResumeTokenError);
559-
if (typeof callback === 'function') return callback(noResumeTokenError, null);
560-
return changeStream.promiseLibrary.reject(noResumeTokenError);
526+
return callback(noResumeTokenError);
561527
}
562528

563529
// cache the resume token
@@ -569,8 +535,7 @@ function processNewChange(args) {
569535

570536
// Return the change
571537
if (eventEmitter) return changeStream.emit('change', change);
572-
if (typeof callback === 'function') return callback(error, change);
573-
return changeStream.promiseLibrary.resolve(change);
538+
return callback(error, change);
574539
}
575540

576541
/**

0 commit comments

Comments
 (0)