From fc7a6daf8c5515944fcaa7b4a99eef83785542df Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 27 Jun 2023 21:14:42 +0200 Subject: [PATCH 1/5] feat(NODE-5243): add change stream split event --- src/change_stream.ts | 15 +++++++ src/index.ts | 1 + .../change_streams.prose.test.ts | 43 +++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/src/change_stream.ts b/src/change_stream.ts index 3148578d12..5041f1ee50 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -162,6 +162,14 @@ export interface ChangeStreamDocumentKey { documentKey: { _id: InferIdType; [shardKey: string]: any }; } +/** @public */ +export interface ChangeStreamSplitEvent { + /** Which fragment of the change this is. */ + fragment: number; + /** The total number of fragments. */ + of: number; +} + /** @public */ export interface ChangeStreamDocumentCommon { /** @@ -192,6 +200,13 @@ export interface ChangeStreamDocumentCommon { * Only present if the operation is part of a multi-document transaction. */ lsid?: ServerSessionId; + + /** + * When the change stream's backing aggregation pipeline contains the $changeStreamSplitLargeEvent + * stage, events larger than 16MB will be split into multiple events and contain the + * following information about which fragment the current event is. + */ + splitEvent?: ChangeStreamSplitEvent; } /** @public */ diff --git a/src/index.ts b/src/index.ts index 881029d524..a35c41565d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -194,6 +194,7 @@ export type { ChangeStreamReplaceDocument, ChangeStreamReshardCollectionDocument, ChangeStreamShardCollectionDocument, + ChangeStreamSplitEvent, ChangeStreamUpdateDocument, OperationTime, ResumeOptions, diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 6b155c05c6..1b74cb7a97 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -938,4 +938,47 @@ describe('Change Stream prose tests', function () { } }); }); + + describe('19. Validate that large ChangeStream events are split when using $changeStreamSplitLargeEvent', function () { + let client; + let db; + let collection; + + beforeEach(async function () { + const configuration = this.configuration; + client = configuration.newClient(); + db = client.db('test'); + // Create a new collection _C_ with changeStreamPreAndPostImages enabled. + await db.createCollection('changeStreamSplitTests', { + changeStreamPreAndPostImages: { enabled: true } + }); + collection = db.collection('changeStreamSplitTests'); + }); + + afterEach(async function () { + await collection.drop(); + await client.close(); + }); + + it('splits the event into multiple fragments', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=7.0.0' } }, + test: async function () { + // Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 } + await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) }); + // Create a change stream _S_ by calling watch on _C_ with pipeline + // [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required. + const changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], { + fullDocumentBeforeChange: 'required' + }); + // Call updateOne on _C_ with an empty query and an update setting the field to a new + // large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }. + await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } }); + // Collect two events from _S_. + const eventOne = await changeStream.next(); + const eventTwo = await changeStream.next(); + expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 }); + expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 }); + } + }); + }); }); From fa0c958d093537a19f7f9b02b636bd40bcedc997 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 27 Jun 2023 21:32:12 +0200 Subject: [PATCH 2/5] test: update comment --- test/integration/change-streams/change_streams.prose.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 1b74cb7a97..38c4f6392e 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -976,6 +976,8 @@ describe('Change Stream prose tests', function () { // Collect two events from _S_. const eventOne = await changeStream.next(); const eventTwo = await changeStream.next(); + // Assert that the events collected have splitEvent fields { "fragment": 1, "of": 2 } + // and { "fragment": 2, "of": 2 }, in that order. expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 }); expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 }); } From 3a0f13036560e3753c9a38455c950571dd344e7b Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 27 Jun 2023 21:34:03 +0200 Subject: [PATCH 3/5] test: close change stream --- test/integration/change-streams/change_streams.prose.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 38c4f6392e..f51ffe5178 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -976,6 +976,7 @@ describe('Change Stream prose tests', function () { // Collect two events from _S_. const eventOne = await changeStream.next(); const eventTwo = await changeStream.next(); + await changeStream.close(); // Assert that the events collected have splitEvent fields { "fragment": 1, "of": 2 } // and { "fragment": 2, "of": 2 }, in that order. expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 }); From 522937a26bc873e3a2cae03b9e1502e4274a340c Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 27 Jun 2023 22:18:32 +0200 Subject: [PATCH 4/5] test: fix prose test hang --- .../change-streams/change_streams.prose.test.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index f51ffe5178..87c0db3f37 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,8 +1,11 @@ import { expect } from 'chai'; +import { once } from 'events'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; +import { promisify } from 'util'; import { + AbstractCursor, type ChangeStream, type CommandFailedEvent, type CommandStartedEvent, @@ -16,6 +19,7 @@ import { Timestamp } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; +import { getSymbolFrom } from '../../tools/utils'; import { setupDatabase } from '../shared'; /** @@ -68,6 +72,14 @@ function triggerResumableError( triggerError(); } +const initIteratorMode = async (cs: ChangeStream) => { + const init = getSymbolFrom(AbstractCursor.prototype, 'kInit'); + const initEvent = once(cs.cursor, 'init'); + await promisify(cs.cursor[init].bind(cs.cursor))(); + await initEvent; + return; +}; + /** Waits for a change stream to start */ function waitForStarted(changeStream, callback) { changeStream.cursor.once('init', () => { @@ -961,7 +973,7 @@ describe('Change Stream prose tests', function () { }); it('splits the event into multiple fragments', { - metadata: { requires: { topology: 'replicaset', mongodb: '>=7.0.0' } }, + metadata: { requires: { topology: '!single', mongodb: '>=7.0.0' } }, test: async function () { // Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 } await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) }); @@ -970,6 +982,7 @@ describe('Change Stream prose tests', function () { const changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], { fullDocumentBeforeChange: 'required' }); + await initIteratorMode(changeStream); // Call updateOne on _C_ with an empty query and an update setting the field to a new // large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }. await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } }); From 995251b492f8295755c41711dc317a798b15de62 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 28 Jun 2023 15:48:54 +0200 Subject: [PATCH 5/5] test: move close to afterEach hook --- test/integration/change-streams/change_streams.prose.test.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 87c0db3f37..776790ae23 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -955,6 +955,7 @@ describe('Change Stream prose tests', function () { let client; let db; let collection; + let changeStream; beforeEach(async function () { const configuration = this.configuration; @@ -968,6 +969,7 @@ describe('Change Stream prose tests', function () { }); afterEach(async function () { + await changeStream.close(); await collection.drop(); await client.close(); }); @@ -979,7 +981,7 @@ describe('Change Stream prose tests', function () { await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) }); // Create a change stream _S_ by calling watch on _C_ with pipeline // [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required. - const changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], { + changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], { fullDocumentBeforeChange: 'required' }); await initIteratorMode(changeStream); @@ -989,7 +991,6 @@ describe('Change Stream prose tests', function () { // Collect two events from _S_. const eventOne = await changeStream.next(); const eventTwo = await changeStream.next(); - await changeStream.close(); // Assert that the events collected have splitEvent fields { "fragment": 1, "of": 2 } // and { "fragment": 2, "of": 2 }, in that order. expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 });