Skip to content

Commit 1c84f82

Browse files
authored
feat(NODE-5243): add change stream split event (#3745)
1 parent f9b5677 commit 1c84f82

File tree

3 files changed

+76
-0
lines changed

3 files changed

+76
-0
lines changed

src/change_stream.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ export interface ChangeStreamDocumentKey<TSchema extends Document = Document> {
162162
documentKey: { _id: InferIdType<TSchema>; [shardKey: string]: any };
163163
}
164164

165+
/** @public */
166+
export interface ChangeStreamSplitEvent {
167+
/** Which fragment of the change this is. */
168+
fragment: number;
169+
/** The total number of fragments. */
170+
of: number;
171+
}
172+
165173
/** @public */
166174
export interface ChangeStreamDocumentCommon {
167175
/**
@@ -192,6 +200,13 @@ export interface ChangeStreamDocumentCommon {
192200
* Only present if the operation is part of a multi-document transaction.
193201
*/
194202
lsid?: ServerSessionId;
203+
204+
/**
205+
* When the change stream's backing aggregation pipeline contains the $changeStreamSplitLargeEvent
206+
* stage, events larger than 16MB will be split into multiple events and contain the
207+
* following information about which fragment the current event is.
208+
*/
209+
splitEvent?: ChangeStreamSplitEvent;
195210
}
196211

197212
/** @public */

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ export type {
194194
ChangeStreamReplaceDocument,
195195
ChangeStreamReshardCollectionDocument,
196196
ChangeStreamShardCollectionDocument,
197+
ChangeStreamSplitEvent,
197198
ChangeStreamUpdateDocument,
198199
OperationTime,
199200
ResumeOptions,

test/integration/change-streams/change_streams.prose.test.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import { expect } from 'chai';
2+
import { once } from 'events';
23
import * as sinon from 'sinon';
34
import { setTimeout } from 'timers';
5+
import { promisify } from 'util';
46

57
import {
8+
AbstractCursor,
69
type ChangeStream,
710
type CommandFailedEvent,
811
type CommandStartedEvent,
@@ -16,6 +19,7 @@ import {
1619
Timestamp
1720
} from '../../mongodb';
1821
import * as mock from '../../tools/mongodb-mock/index';
22+
import { getSymbolFrom } from '../../tools/utils';
1923
import { setupDatabase } from '../shared';
2024

2125
/**
@@ -68,6 +72,14 @@ function triggerResumableError(
6872
triggerError();
6973
}
7074

75+
const initIteratorMode = async (cs: ChangeStream) => {
76+
const init = getSymbolFrom(AbstractCursor.prototype, 'kInit');
77+
const initEvent = once(cs.cursor, 'init');
78+
await promisify(cs.cursor[init].bind(cs.cursor))();
79+
await initEvent;
80+
return;
81+
};
82+
7183
/** Waits for a change stream to start */
7284
function waitForStarted(changeStream, callback) {
7385
changeStream.cursor.once('init', () => {
@@ -938,4 +950,52 @@ describe('Change Stream prose tests', function () {
938950
}
939951
});
940952
});
953+
954+
describe('19. Validate that large ChangeStream events are split when using $changeStreamSplitLargeEvent', function () {
955+
let client;
956+
let db;
957+
let collection;
958+
let changeStream;
959+
960+
beforeEach(async function () {
961+
const configuration = this.configuration;
962+
client = configuration.newClient();
963+
db = client.db('test');
964+
// Create a new collection _C_ with changeStreamPreAndPostImages enabled.
965+
await db.createCollection('changeStreamSplitTests', {
966+
changeStreamPreAndPostImages: { enabled: true }
967+
});
968+
collection = db.collection('changeStreamSplitTests');
969+
});
970+
971+
afterEach(async function () {
972+
await changeStream.close();
973+
await collection.drop();
974+
await client.close();
975+
});
976+
977+
it('splits the event into multiple fragments', {
978+
metadata: { requires: { topology: '!single', mongodb: '>=7.0.0' } },
979+
test: async function () {
980+
// Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 }
981+
await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) });
982+
// Create a change stream _S_ by calling watch on _C_ with pipeline
983+
// [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required.
984+
changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], {
985+
fullDocumentBeforeChange: 'required'
986+
});
987+
await initIteratorMode(changeStream);
988+
// Call updateOne on _C_ with an empty query and an update setting the field to a new
989+
// large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }.
990+
await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } });
991+
// Collect two events from _S_.
992+
const eventOne = await changeStream.next();
993+
const eventTwo = await changeStream.next();
994+
// Assert that the events collected have splitEvent fields { "fragment": 1, "of": 2 }
995+
// and { "fragment": 2, "of": 2 }, in that order.
996+
expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 });
997+
expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 });
998+
}
999+
});
1000+
});
9411001
});

0 commit comments

Comments
 (0)