@@ -6,6 +6,7 @@ const MongoError = require('./core').MongoError;
6
6
const Cursor = require ( './cursor' ) ;
7
7
const relayEvents = require ( './core/utils' ) . relayEvents ;
8
8
const maxWireVersion = require ( './core/utils' ) . maxWireVersion ;
9
+ const maybePromise = require ( './utils' ) . maybePromise ;
9
10
const AggregateOperation = require ( './operations/aggregate' ) ;
10
11
11
12
const CHANGE_STREAM_OPTIONS = [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' , 'fullDocument' ] ;
@@ -124,30 +125,28 @@ class ChangeStream extends EventEmitter {
124
125
* @function ChangeStream.prototype.hasNext
125
126
* @param {ChangeStream~resultCallback } [callback] The result callback.
126
127
* @throws {MongoError }
127
- * @return {Promise } returns Promise if no callback passed
128
+ * @returns {Promise|void } returns Promise if no callback passed
128
129
*/
129
130
hasNext ( callback ) {
130
- return this . cursor . hasNext ( callback ) ;
131
+ return maybePromise ( this . parent , callback , cb => this . cursor . hasNext ( cb ) ) ;
131
132
}
132
133
133
134
/**
134
135
* Get the next available document from the Change Stream, returns null if no more documents are available.
135
136
* @function ChangeStream.prototype.next
136
137
* @param {ChangeStream~resultCallback } [callback] The result callback.
137
138
* @throws {MongoError }
138
- * @return {Promise } returns Promise if no callback passed
139
+ * @returns {Promise|void } returns Promise if no callback passed
139
140
*/
140
141
next ( callback ) {
141
- var self = this ;
142
- if ( this . isClosed ( ) ) {
143
- if ( callback ) return callback ( new Error ( 'Change Stream is not open.' ) , null ) ;
144
- return self . promiseLibrary . reject ( new Error ( 'Change Stream is not open.' ) ) ;
145
- }
146
-
147
- return this . cursor
148
- . next ( )
149
- . then ( change => processNewChange ( { changeStream : self , change, callback } ) )
150
- . catch ( error => processNewChange ( { changeStream : self , error, callback } ) ) ;
142
+ return maybePromise ( this . parent , callback , cb => {
143
+ if ( this . isClosed ( ) ) {
144
+ return cb ( new Error ( 'Change Stream is not open.' ) ) ;
145
+ }
146
+ this . cursor . next ( ( error , change ) => {
147
+ processNewChange ( { changeStream : this , error, change, callback : cb } ) ;
148
+ } ) ;
149
+ } ) ;
151
150
}
152
151
153
152
/**
0 commit comments