@@ -78,6 +78,8 @@ export interface ResumeOptions {
78
78
maxAwaitTimeMS ?: number ;
79
79
collation ?: CollationOptions ;
80
80
readPreference ?: ReadPreference ;
81
+ resumeAfter ?: ResumeToken ;
82
+ startAfter ?: ResumeToken ;
81
83
}
82
84
83
85
/**
@@ -104,7 +106,7 @@ export interface PipeOptions {
104
106
* @public
105
107
*/
106
108
export interface ChangeStreamOptions extends AggregateOptions {
107
- /** Allowed values: ‘ updateLookup’ . When set to ‘ updateLookup’ , the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
109
+ /** Allowed values: ' updateLookup' . When set to ' updateLookup' , the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. */
108
110
fullDocument ?: string ;
109
111
/** The maximum amount of time for the server to wait on new documents to satisfy a change stream query. */
110
112
maxAwaitTimeMS ?: number ;
@@ -456,22 +458,18 @@ export class ChangeStreamCursor<TSchema extends Document = Document> extends Abs
456
458
}
457
459
458
460
get resumeOptions ( ) : ResumeOptions {
459
- const result = { } as ResumeOptions ;
460
- for ( const optionName of CURSOR_OPTIONS ) {
461
- if ( Reflect . has ( this . options , optionName ) ) {
462
- Reflect . set ( result , optionName , Reflect . get ( this . options , optionName ) ) ;
463
- }
464
- }
461
+ const result : ResumeOptions = applyKnownOptions ( this . options , CURSOR_OPTIONS ) ;
465
462
466
463
if ( this . resumeToken || this . startAtOperationTime ) {
467
- [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' ] . forEach ( key =>
468
- Reflect . deleteProperty ( result , key )
469
- ) ;
464
+ for ( const key of [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' ] ) {
465
+ Reflect . deleteProperty ( result , key ) ;
466
+ }
470
467
471
468
if ( this . resumeToken ) {
472
469
const resumeKey =
473
470
this . options . startAfter && ! this . hasReceived ? 'startAfter' : 'resumeAfter' ;
474
- Reflect . set ( result , resumeKey , this . resumeToken ) ;
471
+
472
+ result [ resumeKey ] = this . resumeToken ;
475
473
} else if ( this . startAtOperationTime && maxWireVersion ( this . server ) >= 7 ) {
476
474
result . startAtOperationTime = this . startAtOperationTime ;
477
475
}
@@ -579,48 +577,35 @@ function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
579
577
changeStream [ kMode ] = 'iterator' ;
580
578
}
581
579
582
- function createChangeStreamStageOptions < TSchema > (
583
- changeStream : ChangeStream < TSchema > ,
584
- changeStreamOptions : ChangeStreamOptions
585
- ) {
586
- const changeStreamStageOptions : Document = {
587
- fullDocument : changeStreamOptions . fullDocument
588
- } ;
580
+ function applyKnownOptions ( source : Document , options : ReadonlyArray < string > ) {
581
+ const result : Document = { } ;
589
582
590
- for ( const optionName of CHANGE_STREAM_OPTIONS ) {
591
- if ( changeStreamOptions [ optionName ] ) {
592
- changeStreamStageOptions [ optionName ] = changeStreamOptions [ optionName ] ;
583
+ for ( const option of options ) {
584
+ if ( source [ option ] ) {
585
+ result [ option ] = source [ option ] ;
593
586
}
594
587
}
595
588
596
- if ( changeStream . type === CHANGE_DOMAIN_TYPES . CLUSTER ) {
597
- changeStreamStageOptions . allChangesForCluster = true ;
598
- }
599
-
600
- return changeStreamStageOptions ;
589
+ return result ;
601
590
}
591
+
602
592
/**
603
593
* Create a new change stream cursor based on self's configuration
604
594
* @internal
605
595
*/
606
596
function createChangeStreamCursor < TSchema > (
607
597
changeStream : ChangeStream < TSchema > ,
608
- changeStreamOptions : ChangeStreamOptions
598
+ changeStreamOptions : ChangeStreamOptions | ResumeOptions
609
599
) : ChangeStreamCursor < TSchema > {
610
- const changeStreamStageOptions = createChangeStreamStageOptions (
611
- changeStream ,
612
- changeStreamOptions
613
- ) ;
600
+ const changeStreamStageOptions = applyKnownOptions ( changeStreamOptions , CHANGE_STREAM_OPTIONS ) ;
614
601
const pipeline = [ { $changeStream : changeStreamStageOptions } as Document ] . concat (
615
602
changeStream . pipeline
616
603
) ;
617
604
618
- const cursorOptions : Document = { } ;
619
- for ( const optionName of CURSOR_OPTIONS ) {
620
- if ( changeStreamOptions [ optionName ] ) {
621
- cursorOptions [ optionName ] = changeStreamOptions [ optionName ] ;
622
- }
623
- }
605
+ const cursorOptions : ChangeStreamCursorOptions = applyKnownOptions (
606
+ changeStreamOptions ,
607
+ CURSOR_OPTIONS
608
+ ) ;
624
609
625
610
const changeStreamCursor = new ChangeStreamCursor < TSchema > (
626
611
getTopology ( changeStream . parent ) ,
0 commit comments