1
1
import { Readable , Transform } from 'stream' ;
2
- import { promisify } from 'util' ;
2
+ import { callbackify , promisify } from 'util' ;
3
3
4
4
import { type BSONSerializeOptions , type Document , Long , pluckBSONSerializeOptions } from '../bson' ;
5
5
import {
@@ -361,7 +361,7 @@ export abstract class AbstractCursor<
361
361
return true ;
362
362
}
363
363
364
- const doc = await nextAsync < TSchema > ( this , true ) ;
364
+ const doc = await next < TSchema > ( this , true ) ;
365
365
366
366
if ( doc ) {
367
367
this [ kDocuments ] . unshift ( doc ) ;
@@ -377,7 +377,7 @@ export abstract class AbstractCursor<
377
377
throw new MongoCursorExhaustedError ( ) ;
378
378
}
379
379
380
- return nextAsync ( this , true ) ;
380
+ return next ( this , true ) ;
381
381
}
382
382
383
383
/**
@@ -388,7 +388,7 @@ export abstract class AbstractCursor<
388
388
throw new MongoCursorExhaustedError ( ) ;
389
389
}
390
390
391
- return nextAsync ( this , false ) ;
391
+ return next ( this , false ) ;
392
392
}
393
393
394
394
/**
@@ -690,78 +690,77 @@ function nextDocument<T>(cursor: AbstractCursor<T>): T | null {
690
690
return doc ;
691
691
}
692
692
693
- const nextAsync = promisify (
694
- next as < T > (
695
- cursor : AbstractCursor < T > ,
696
- blocking : boolean ,
697
- callback : ( e : Error , r : T | null ) => void
698
- ) => void
699
- ) ;
700
-
701
693
/**
702
694
* @param cursor - the cursor on which to call `next`
703
695
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
704
696
* is available. Generally, this flag is set to `false` because if the getMore returns no documents,
705
697
* the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and
706
698
* `tryNext`, for example) blocking is necessary because a getMore returning no documents does
707
699
* not indicate the end of the cursor.
708
- * @param callback - callback to return the result to the caller
709
- * @returns
700
+ * @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means
701
+ * the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer.
710
702
*/
711
- export function next < T > (
712
- cursor : AbstractCursor < T > ,
713
- blocking : boolean ,
714
- callback : Callback < T | null >
715
- ) : void {
703
+ async function next < T > ( cursor : AbstractCursor < T > , blocking : boolean ) : Promise < T | null > {
716
704
const cursorId = cursor [ kId ] ;
717
705
if ( cursor . closed ) {
718
- return callback ( undefined , null ) ;
706
+ return null ;
719
707
}
720
708
721
709
if ( cursor [ kDocuments ] . length !== 0 ) {
722
- callback ( undefined , nextDocument < T > ( cursor ) ) ;
723
- return ;
710
+ return nextDocument < T > ( cursor ) ;
724
711
}
725
712
726
713
if ( cursorId == null ) {
727
714
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
728
- cursor [ kInit ] ( err => {
729
- if ( err ) return callback ( err ) ;
730
- return next ( cursor , blocking , callback ) ;
731
- } ) ;
732
-
733
- return ;
715
+ const init = promisify ( cb => cursor [ kInit ] ( cb ) ) ;
716
+ await init ( ) ;
717
+ return next ( cursor , blocking ) ;
734
718
}
735
719
736
720
if ( cursorIsDead ( cursor ) ) {
737
- return cleanupCursor ( cursor , undefined , ( ) => callback ( undefined , null ) ) ;
721
+ try {
722
+ await cleanupCursorAsync ( cursor , undefined ) ;
723
+ // eslint-disable-next-line no-empty
724
+ } catch { }
725
+ return null ;
738
726
}
739
727
740
728
// otherwise need to call getMore
741
729
const batchSize = cursor [ kOptions ] . batchSize || 1000 ;
742
- cursor . _getMore ( batchSize , ( error , response ) => {
743
- if ( response ) {
744
- const cursorId =
745
- typeof response . cursor . id === 'number'
746
- ? Long . fromNumber ( response . cursor . id )
747
- : typeof response . cursor . id === 'bigint'
748
- ? Long . fromBigInt ( response . cursor . id )
749
- : response . cursor . id ;
750
-
751
- cursor [ kDocuments ] . pushMany ( response . cursor . nextBatch ) ;
752
- cursor [ kId ] = cursorId ;
753
- }
754
-
730
+ const getMore = promisify ( ( batchSize : number , cb : Callback < Document | undefined > ) =>
731
+ cursor . _getMore ( batchSize , cb )
732
+ ) ;
733
+
734
+ let response : Document | undefined ;
735
+ try {
736
+ response = await getMore ( batchSize ) ;
737
+ } catch ( error ) {
755
738
if ( error || cursorIsDead ( cursor ) ) {
756
- return cleanupCursor ( cursor , { error } , ( ) => callback ( error , nextDocument < T > ( cursor ) ) ) ;
739
+ try {
740
+ await cleanupCursorAsync ( cursor , { error } ) ;
741
+ // eslint-disable-next-line no-empty
742
+ } catch { }
743
+ throw error ;
757
744
}
745
+ }
758
746
759
- if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
760
- return callback ( undefined , null ) ;
761
- }
747
+ if ( response ) {
748
+ const cursorId =
749
+ typeof response . cursor . id === 'number'
750
+ ? Long . fromNumber ( response . cursor . id )
751
+ : typeof response . cursor . id === 'bigint'
752
+ ? Long . fromBigInt ( response . cursor . id )
753
+ : response . cursor . id ;
754
+
755
+ cursor [ kDocuments ] . pushMany ( response . cursor . nextBatch ) ;
756
+ cursor [ kId ] = cursorId ;
757
+ }
758
+
759
+ if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
760
+ return null ;
761
+ }
762
762
763
- next ( cursor , blocking , callback ) ;
764
- } ) ;
763
+ return next ( cursor , blocking ) ;
765
764
}
766
765
767
766
function cursorIsDead ( cursor : AbstractCursor ) : boolean {
@@ -881,7 +880,7 @@ class ReadableCursorStream extends Readable {
881
880
}
882
881
883
882
private _readNext ( ) {
884
- next ( this . _cursor , true , ( err , result ) => {
883
+ callbackify ( next ) ( this . _cursor , true , ( err , result ) => {
885
884
if ( err ) {
886
885
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
887
886
// desired behavior is that a stream ends cleanly when a user explicitly closes
0 commit comments