diff --git a/src/change_stream.ts b/src/change_stream.ts index 3148578d12..5041f1ee50 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -162,6 +162,14 @@ export interface ChangeStreamDocumentKey { documentKey: { _id: InferIdType; [shardKey: string]: any }; } +/** @public */ +export interface ChangeStreamSplitEvent { + /** Which fragment of the change this is. */ + fragment: number; + /** The total number of fragments. */ + of: number; +} + /** @public */ export interface ChangeStreamDocumentCommon { /** @@ -192,6 +200,13 @@ export interface ChangeStreamDocumentCommon { * Only present if the operation is part of a multi-document transaction. */ lsid?: ServerSessionId; + + /** + * When the change stream's backing aggregation pipeline contains the $changeStreamSplitLargeEvent + * stage, events larger than 16MB will be split into multiple events and contain the + * following information about which fragment the current event is. + */ + splitEvent?: ChangeStreamSplitEvent; } /** @public */ diff --git a/src/index.ts b/src/index.ts index 881029d524..a35c41565d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -194,6 +194,7 @@ export type { ChangeStreamReplaceDocument, ChangeStreamReshardCollectionDocument, ChangeStreamShardCollectionDocument, + ChangeStreamSplitEvent, ChangeStreamUpdateDocument, OperationTime, ResumeOptions, diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 6b155c05c6..1b74cb7a97 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -938,4 +938,47 @@ describe('Change Stream prose tests', function () { } }); }); + + describe('19. Validate that large ChangeStream events are split when using $changeStreamSplitLargeEvent', function () { + let client; + let db; + let collection; + + beforeEach(async function () { + const configuration = this.configuration; + client = configuration.newClient(); + db = client.db('test'); + // Create a new collection _C_ with changeStreamPreAndPostImages enabled. + await db.createCollection('changeStreamSplitTests', { + changeStreamPreAndPostImages: { enabled: true } + }); + collection = db.collection('changeStreamSplitTests'); + }); + + afterEach(async function () { + await collection.drop(); + await client.close(); + }); + + it('splits the event into multiple fragments', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=7.0.0' } }, + test: async function () { + // Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 } + await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) }); + // Create a change stream _S_ by calling watch on _C_ with pipeline + // [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required. + const changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], { + fullDocumentBeforeChange: 'required' + }); + // Call updateOne on _C_ with an empty query and an update setting the field to a new + // large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }. + await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } }); + // Collect two events from _S_. + const eventOne = await changeStream.next(); + const eventTwo = await changeStream.next(); + expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 }); + expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 }); + } + }); + }); });