@@ -361,7 +361,7 @@ export abstract class AbstractCursor<
361
361
return true ;
362
362
}
363
363
364
- const doc = await next < TSchema > ( this , true ) ;
364
+ const doc = await next < TSchema > ( this , true , false ) ;
365
365
366
366
if ( doc ) {
367
367
this [ kDocuments ] . unshift ( doc ) ;
@@ -680,48 +680,47 @@ export abstract class AbstractCursor<
680
680
}
681
681
}
682
682
683
- function nextDocument < T > ( cursor : AbstractCursor < T > ) : T | null {
684
- const doc = cursor [ kDocuments ] . shift ( ) ;
685
-
686
- if ( doc && cursor [ kTransform ] ) {
687
- return cursor [ kTransform ] ( doc ) as T ;
688
- }
689
-
690
- return doc ;
691
- }
692
-
693
683
/**
694
684
* @param cursor - the cursor on which to call `next`
695
685
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
696
686
* is available. Generally, this flag is set to `false` because if the getMore returns no documents,
697
687
* the cursor has been exhausted. In certain scenarios (ChangeStreams, tailable await cursors and
698
688
* `tryNext`, for example) blocking is necessary because a getMore returning no documents does
699
689
* not indicate the end of the cursor.
690
+ * @param transform - if true, the cursor's transform function is applied to the result document (if the transform exists)
700
691
* @returns the next document in the cursor, or `null`. When `blocking` is `true`, a `null` document means
701
692
* the cursor has been exhausted. Otherwise, it means that there is no document available in the cursor's buffer.
702
693
*/
703
- async function next < T > ( cursor : AbstractCursor < T > , blocking : boolean ) : Promise < T | null > {
694
+ async function next < T > (
695
+ cursor : AbstractCursor < T > ,
696
+ blocking : boolean ,
697
+ transform = true
698
+ ) : Promise < T | null > {
704
699
const cursorId = cursor [ kId ] ;
705
700
if ( cursor . closed ) {
706
701
return null ;
707
702
}
708
703
709
704
if ( cursor [ kDocuments ] . length !== 0 ) {
710
- return nextDocument < T > ( cursor ) ;
705
+ const doc = cursor [ kDocuments ] . shift ( ) ;
706
+
707
+ if ( doc != null && transform && cursor [ kTransform ] ) {
708
+ return cursor [ kTransform ] ( doc ) ;
709
+ }
710
+
711
+ return doc ;
711
712
}
712
713
713
714
if ( cursorId == null ) {
714
715
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
715
716
const init = promisify ( cb => cursor [ kInit ] ( cb ) ) ;
716
717
await init ( ) ;
717
- return next ( cursor , blocking ) ;
718
+ return next ( cursor , blocking , transform ) ;
718
719
}
719
720
720
721
if ( cursorIsDead ( cursor ) ) {
721
- try {
722
- await cleanupCursorAsync ( cursor , undefined ) ;
723
- // eslint-disable-next-line no-empty
724
- } catch { }
722
+ // if the cursor is dead, we clean it up
723
+ await cleanupCursorAsync ( cursor ) ;
725
724
return null ;
726
725
}
727
726
@@ -735,11 +734,8 @@ async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T
735
734
try {
736
735
response = await getMore ( batchSize ) ;
737
736
} catch ( error ) {
738
- if ( error || cursorIsDead ( cursor ) ) {
739
- try {
740
- await cleanupCursorAsync ( cursor , { error } ) ;
741
- // eslint-disable-next-line no-empty
742
- } catch { }
737
+ if ( error ) {
738
+ await cleanupCursorAsync ( cursor , { error } ) ;
743
739
throw error ;
744
740
}
745
741
}
@@ -756,19 +752,40 @@ async function next<T>(cursor: AbstractCursor<T>, blocking: boolean): Promise<T
756
752
cursor [ kId ] = cursorId ;
757
753
}
758
754
755
+ if ( cursorIsDead ( cursor ) ) {
756
+ // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
757
+ // we intentionally clean up the cursor to release its session back into the pool before the cursor
758
+ // is iterated. This prevents a cursor that is exhausted on the server from holding
759
+ // onto a session indefinitely until the AbstractCursor is iterated.
760
+ await cleanupCursorAsync ( cursor ) ;
761
+ }
762
+
759
763
if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
760
764
return null ;
761
765
}
762
766
763
- return next ( cursor , blocking ) ;
767
+ return next ( cursor , blocking , transform ) ;
764
768
}
765
769
766
770
function cursorIsDead ( cursor : AbstractCursor ) : boolean {
767
771
const cursorId = cursor [ kId ] ;
768
772
return ! ! cursorId && cursorId . isZero ( ) ;
769
773
}
770
774
771
- const cleanupCursorAsync = promisify ( cleanupCursor ) ;
775
+ const cleanupCursorAsyncInternal = promisify ( cleanupCursor ) ;
776
+
777
+ async function cleanupCursorAsync < T > (
778
+ cursor : AbstractCursor < T > ,
779
+ options : { needsToEmitClosed ?: boolean ; error ?: AnyError } = { }
780
+ ) : Promise < void > {
781
+ try {
782
+ await cleanupCursorAsyncInternal ( cursor , options ) ;
783
+ } catch {
784
+ // `cleanupCursor` never throws but we can't really test that.
785
+ // so this is a hack to ensure that any upstream consumers
786
+ // can safely guarantee on this wrapper never throwing.
787
+ }
788
+ }
772
789
773
790
function cleanupCursor (
774
791
cursor : AbstractCursor ,
0 commit comments