Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-5243): add change stream split event #3745

Merged
merged 5 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ export interface ChangeStreamDocumentKey<TSchema extends Document = Document> {
documentKey: { _id: InferIdType<TSchema>; [shardKey: string]: any };
}

/** @public */
export interface ChangeStreamSplitEvent {
/** Which fragment of the change this is. */
fragment: number;
/** The total number of fragments. */
of: number;
}

/** @public */
export interface ChangeStreamDocumentCommon {
/**
Expand Down Expand Up @@ -192,6 +200,13 @@ export interface ChangeStreamDocumentCommon {
* Only present if the operation is part of a multi-document transaction.
*/
lsid?: ServerSessionId;

/**
* When the change stream's backing aggregation pipeline contains the $changeStreamSplitLargeEvent
* stage, events larger than 16MB will be split into multiple events and contain the
* following information about which fragment the current event is.
*/
splitEvent?: ChangeStreamSplitEvent;
}

/** @public */
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ export type {
ChangeStreamReplaceDocument,
ChangeStreamReshardCollectionDocument,
ChangeStreamShardCollectionDocument,
ChangeStreamSplitEvent,
ChangeStreamUpdateDocument,
OperationTime,
ResumeOptions,
Expand Down
60 changes: 60 additions & 0 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { expect } from 'chai';
import { once } from 'events';
import * as sinon from 'sinon';
import { setTimeout } from 'timers';
import { promisify } from 'util';

import {
AbstractCursor,
type ChangeStream,
type CommandFailedEvent,
type CommandStartedEvent,
Expand All @@ -16,6 +19,7 @@ import {
Timestamp
} from '../../mongodb';
import * as mock from '../../tools/mongodb-mock/index';
import { getSymbolFrom } from '../../tools/utils';
import { setupDatabase } from '../shared';

/**
Expand Down Expand Up @@ -68,6 +72,14 @@ function triggerResumableError(
triggerError();
}

const initIteratorMode = async (cs: ChangeStream) => {
const init = getSymbolFrom(AbstractCursor.prototype, 'kInit');
const initEvent = once(cs.cursor, 'init');
await promisify(cs.cursor[init].bind(cs.cursor))();
await initEvent;
return;
};

/** Waits for a change stream to start */
function waitForStarted(changeStream, callback) {
changeStream.cursor.once('init', () => {
Expand Down Expand Up @@ -938,4 +950,52 @@ describe('Change Stream prose tests', function () {
}
});
});

describe('19. Validate that large ChangeStream events are split when using $changeStreamSplitLargeEvent', function () {
let client;
let db;
let collection;
let changeStream;

beforeEach(async function () {
const configuration = this.configuration;
client = configuration.newClient();
db = client.db('test');
// Create a new collection _C_ with changeStreamPreAndPostImages enabled.
await db.createCollection('changeStreamSplitTests', {
changeStreamPreAndPostImages: { enabled: true }
});
collection = db.collection('changeStreamSplitTests');
});

afterEach(async function () {
await changeStream.close();
await collection.drop();
await client.close();
});

it('splits the event into multiple fragments', {
metadata: { requires: { topology: '!single', mongodb: '>=7.0.0' } },
test: async function () {
// Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 }
await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) });
// Create a change stream _S_ by calling watch on _C_ with pipeline
// [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required.
changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], {
fullDocumentBeforeChange: 'required'
});
await initIteratorMode(changeStream);
// Call updateOne on _C_ with an empty query and an update setting the field to a new
// large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }.
await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } });
// Collect two events from _S_.
const eventOne = await changeStream.next();
const eventTwo = await changeStream.next();
// Assert that the events collected have splitEvent fields { "fragment": 1, "of": 2 }
// and { "fragment": 2, "of": 2 }, in that order.
expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 });
expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 });
}
});
});
});