Skip to content

Commit fc7a6da

Browse files
committed
feat(NODE-5243): add change stream split event
1 parent f9b5677 commit fc7a6da

File tree

3 files changed

+59
-0
lines changed

3 files changed

+59
-0
lines changed

src/change_stream.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ export interface ChangeStreamDocumentKey<TSchema extends Document = Document> {
162162
documentKey: { _id: InferIdType<TSchema>; [shardKey: string]: any };
163163
}
164164

165+
/** @public */
166+
export interface ChangeStreamSplitEvent {
167+
/** Which fragment of the change this is. */
168+
fragment: number;
169+
/** The total number of fragments. */
170+
of: number;
171+
}
172+
165173
/** @public */
166174
export interface ChangeStreamDocumentCommon {
167175
/**
@@ -192,6 +200,13 @@ export interface ChangeStreamDocumentCommon {
192200
* Only present if the operation is part of a multi-document transaction.
193201
*/
194202
lsid?: ServerSessionId;
203+
204+
/**
205+
* When the change stream's backing aggregation pipeline contains the $changeStreamSplitLargeEvent
206+
* stage, events larger than 16MB will be split into multiple events and contain the
207+
* following information about which fragment the current event is.
208+
*/
209+
splitEvent?: ChangeStreamSplitEvent;
195210
}
196211

197212
/** @public */

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ export type {
194194
ChangeStreamReplaceDocument,
195195
ChangeStreamReshardCollectionDocument,
196196
ChangeStreamShardCollectionDocument,
197+
ChangeStreamSplitEvent,
197198
ChangeStreamUpdateDocument,
198199
OperationTime,
199200
ResumeOptions,

test/integration/change-streams/change_streams.prose.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,4 +938,47 @@ describe('Change Stream prose tests', function () {
938938
}
939939
});
940940
});
941+
942+
describe('19. Validate that large ChangeStream events are split when using $changeStreamSplitLargeEvent', function () {
943+
let client;
944+
let db;
945+
let collection;
946+
947+
beforeEach(async function () {
948+
const configuration = this.configuration;
949+
client = configuration.newClient();
950+
db = client.db('test');
951+
// Create a new collection _C_ with changeStreamPreAndPostImages enabled.
952+
await db.createCollection('changeStreamSplitTests', {
953+
changeStreamPreAndPostImages: { enabled: true }
954+
});
955+
collection = db.collection('changeStreamSplitTests');
956+
});
957+
958+
afterEach(async function () {
959+
await collection.drop();
960+
await client.close();
961+
});
962+
963+
it('splits the event into multiple fragments', {
964+
metadata: { requires: { topology: 'replicaset', mongodb: '>=7.0.0' } },
965+
test: async function () {
966+
// Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 }
967+
await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) });
968+
// Create a change stream _S_ by calling watch on _C_ with pipeline
969+
// [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required.
970+
const changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], {
971+
fullDocumentBeforeChange: 'required'
972+
});
973+
// Call updateOne on _C_ with an empty query and an update setting the field to a new
974+
// large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }.
975+
await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } });
976+
// Collect two events from _S_.
977+
const eventOne = await changeStream.next();
978+
const eventTwo = await changeStream.next();
979+
expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 });
980+
expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 });
981+
}
982+
});
983+
});
941984
});

0 commit comments

Comments
 (0)