1
1
import { Readable , Transform } from 'stream' ;
2
- import { callbackify , promisify } from 'util' ;
2
+ import { promisify } from 'util' ;
3
3
4
4
import { type BSONSerializeOptions , type Document , Long , pluckBSONSerializeOptions } from '../bson' ;
5
5
import {
@@ -708,7 +708,10 @@ async function next<T>(
708
708
try {
709
709
return cursor [ kTransform ] ( doc ) ;
710
710
} catch ( error ) {
711
- await cleanupCursorAsync ( cursor , { error, needsToEmitClosed : true } ) ;
711
+ await cleanupCursorAsync ( cursor , { error, needsToEmitClosed : true } ) . catch ( ( ) => {
712
+ // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
713
+ // error instead.
714
+ } ) ;
712
715
throw error ;
713
716
}
714
717
}
@@ -725,7 +728,9 @@ async function next<T>(
725
728
726
729
if ( cursorIsDead ( cursor ) ) {
727
730
// if the cursor is dead, we clean it up
728
- await cleanupCursorAsync ( cursor ) ;
731
+ // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
732
+ // and we should surface the error
733
+ await cleanupCursorAsync ( cursor , { } ) ;
729
734
return null ;
730
735
}
731
736
@@ -740,7 +745,10 @@ async function next<T>(
740
745
response = await getMore ( batchSize ) ;
741
746
} catch ( error ) {
742
747
if ( error ) {
743
- await cleanupCursorAsync ( cursor , { error } ) ;
748
+ await cleanupCursorAsync ( cursor , { error } ) . catch ( ( ) => {
749
+ // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
750
+ // error instead.
751
+ } ) ;
744
752
throw error ;
745
753
}
746
754
}
@@ -762,7 +770,10 @@ async function next<T>(
762
770
// we intentionally clean up the cursor to release its session back into the pool before the cursor
763
771
// is iterated. This prevents a cursor that is exhausted on the server from holding
764
772
// onto a session indefinitely until the AbstractCursor is iterated.
765
- await cleanupCursorAsync ( cursor ) ;
773
+ //
774
+ // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
775
+ // and we should surface the error
776
+ await cleanupCursorAsync ( cursor , { } ) ;
766
777
}
767
778
768
779
if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
@@ -777,20 +788,7 @@ function cursorIsDead(cursor: AbstractCursor): boolean {
777
788
return ! ! cursorId && cursorId . isZero ( ) ;
778
789
}
779
790
780
- const cleanupCursorAsyncInternal = promisify ( cleanupCursor ) ;
781
-
782
- async function cleanupCursorAsync < T > (
783
- cursor : AbstractCursor < T > ,
784
- options : { needsToEmitClosed ?: boolean ; error ?: AnyError } = { }
785
- ) : Promise < void > {
786
- try {
787
- await cleanupCursorAsyncInternal ( cursor , options ) ;
788
- } catch {
789
- // `cleanupCursor` never throws but we can't really test that.
790
- // so this is a hack to ensure that any upstream consumers
791
- // can safely guarantee on this wrapper never throwing.
792
- }
793
- }
791
+ const cleanupCursorAsync = promisify ( cleanupCursor ) ;
794
792
795
793
function cleanupCursor (
796
794
cursor : AbstractCursor ,
@@ -802,6 +800,10 @@ function cleanupCursor(
802
800
const server = cursor [ kServer ] ;
803
801
const session = cursor [ kSession ] ;
804
802
const error = options ?. error ;
803
+
804
+ // Cursors only emit closed events once the client-side cursor has been exhausted fully or there
805
+ // was an error. Notably, when the server returns a cursor id of 0 and a non-empty batch, we
806
+ // cleanup the cursor but don't emit a `close` event.
805
807
const needsToEmitClosed = options ?. needsToEmitClosed ?? cursor [ kDocuments ] . length === 0 ;
806
808
807
809
if ( error ) {
0 commit comments