@@ -577,35 +577,23 @@ function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
577
577
changeStream [ kMode ] = 'iterator' ;
578
578
}
579
579
580
- function applyKnownOptions ( source : Document , options : ReadonlyArray < string > ) {
581
- const result : Document = { } ;
582
-
583
- for ( const option of options ) {
584
- if ( source [ option ] ) {
585
- result [ option ] = source [ option ] ;
586
- }
587
- }
588
-
589
- return result ;
590
- }
591
-
592
580
/**
593
581
* Create a new change stream cursor based on self's configuration
594
582
* @internal
595
583
*/
596
584
function createChangeStreamCursor < TSchema > (
597
585
changeStream : ChangeStream < TSchema > ,
598
- changeStreamOptions : ChangeStreamOptions | ResumeOptions
586
+ options : ChangeStreamOptions | ResumeOptions
599
587
) : ChangeStreamCursor < TSchema > {
600
- const changeStreamStageOptions = applyKnownOptions ( changeStreamOptions , CHANGE_STREAM_OPTIONS ) ;
588
+ const changeStreamStageOptions = applyKnownOptions ( options , CHANGE_STREAM_OPTIONS ) ;
589
+ if ( changeStream . type === CHANGE_DOMAIN_TYPES . CLUSTER ) {
590
+ changeStreamStageOptions . allChangesForCluster = true ;
591
+ }
601
592
const pipeline = [ { $changeStream : changeStreamStageOptions } as Document ] . concat (
602
593
changeStream . pipeline
603
594
) ;
604
595
605
- const cursorOptions : ChangeStreamCursorOptions = applyKnownOptions (
606
- changeStreamOptions ,
607
- CURSOR_OPTIONS
608
- ) ;
596
+ const cursorOptions : ChangeStreamCursorOptions = applyKnownOptions ( options , CURSOR_OPTIONS ) ;
609
597
610
598
const changeStreamCursor = new ChangeStreamCursor < TSchema > (
611
599
getTopology ( changeStream . parent ) ,
@@ -625,6 +613,17 @@ function createChangeStreamCursor<TSchema>(
625
613
return changeStreamCursor ;
626
614
}
627
615
616
+ function applyKnownOptions ( source : Document , options : ReadonlyArray < string > ) {
617
+ const result : Document = { } ;
618
+
619
+ for ( const option of options ) {
620
+ if ( source [ option ] ) {
621
+ result [ option ] = source [ option ] ;
622
+ }
623
+ }
624
+
625
+ return result ;
626
+ }
628
627
interface TopologyWaitOptions {
629
628
start ?: number ;
630
629
timeout ?: number ;
0 commit comments