feat(changeStream): Adding new 4.0 ChangeStream features
Adds new ChangeStream features for 4.0 as per SPEC-1057

- Db.watch() method
- MongoClient.watch() method
- startAtClusterTime option

Fixes NODE-1483
daprahamian committed Jun 13, 2018
1 parent 7018c1e commit 2cb4894
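
A rough sketch of the new surface (hedged: uri is a placeholder, and the new cluster- and database-level helpers require a MongoDB 4.0 server):

    const MongoClient = require('mongodb').MongoClient;

    MongoClient.connect(uri).then(client => {
      // New in this commit: watch a whole cluster or a single database
      const clusterStream = client.watch([{ $match: { operationType: 'insert' } }]);
      const dbStream = client.db('test').watch();

      clusterStream.on('change', change => console.log(change));
    });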
Showing 13 changed files with 1,940 additions and 303 deletions.
398 changes: 213 additions & 185 deletions lib/change_stream.js

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions lib/collection.js
@@ -2453,14 +2453,15 @@ Collection.prototype.aggregate = function(pipeline, options, callback) {
  * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection.
  * @method
  * @since 3.0.0
- * @param {Array} [pipeline=null] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
- * @param {object} [options=null] Optional settings
+ * @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
+ * @param {object} [options] Optional settings
  * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
- * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
+ * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
  * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
- * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
- * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
- * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
+ * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
+ * @param {Timestamp} [options.startAtClusterTime] receive change events that occur after the specified timestamp
  * @param {ClientSession} [options.session] optional session to use for this operation
  * @return {ChangeStream} a ChangeStream instance.
  */
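
A hedged sketch of the new option (the Timestamp value is illustrative; a real starting point would typically come from a prior operation's operationTime or $clusterTime):

    const Timestamp = require('mongodb').Timestamp;

    // Hypothetical starting point: "now" as a BSON Timestamp (seconds in the high bits)
    const startAtClusterTime = new Timestamp(0, Math.floor(Date.now() / 1000));
    const changeStream = collection.watch([], { startAtClusterTime });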
8 changes: 7 additions & 1 deletion lib/cursor.js
@@ -1076,7 +1076,13 @@ Cursor.prototype.close = function(options, callback) {
   };
 
   if (this.s.session) {
-    return this._endSession(() => completeClose());
+    if (typeof callback === 'function') {
+      return this._endSession(() => completeClose());
+    }
+
+    return new this.s.promiseLibrary(resolve => {
+      this._endSession(() => completeClose().then(resolve));
+    });
   }
 
   return completeClose();
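
The net effect is that close() now ends an implicit session before settling in both calling styles (a sketch; cursor is any open cursor with an implicit session):

    // Callback style: the session is ended before the callback fires
    cursor.close(() => console.log('cursor closed'));

    // Promise style: close() now builds the promise itself, so it resolves
    // only after _endSession has completed
    cursor.close().then(() => console.log('cursor closed'));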
30 changes: 30 additions & 0 deletions lib/db.js
@@ -17,6 +17,7 @@ const mergeOptionsAndWriteConcern = require('./utils').mergeOptionsAndWriteConcern;
 const executeOperation = require('./utils').executeOperation;
 const applyWriteConcern = require('./utils').applyWriteConcern;
 const convertReadPreference = require('./utils').convertReadPreference;
+const ChangeStream = require('./change_stream');
 
 // Operations
 const addUser = require('./operations/db_ops').addUser;
@@ -876,6 +877,35 @@ Db.prototype.unref = function() {
   this.s.topology.unref();
 };
 
+/**
+ * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this database. Will ignore all changes to system collections.
+ * @method
+ * @since 3.1.0
+ * @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
+ * @param {object} [options] Optional settings
+ * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
+ * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
+ * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
+ * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
+ * @param {Timestamp} [options.startAtClusterTime] receive change events that occur after the specified timestamp
+ * @param {ClientSession} [options.session] optional session to use for this operation
+ * @return {ChangeStream} a ChangeStream instance.
+ */
+Db.prototype.watch = function(pipeline, options) {
+  pipeline = pipeline || [];
+  options = options || {};
+
+  // Allow optionally not specifying a pipeline
+  if (!Array.isArray(pipeline)) {
+    options = pipeline;
+    pipeline = [];
+  }
+
+  return new ChangeStream(this, pipeline, options);
+};
+
 /**
  * Db close event
  *
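A short usage sketch for the new helper (the database name and $match filter are illustrative):

    const changeStream = client.db('reporting').watch([{ $match: { 'ns.coll': 'orders' } }]);

    changeStream.on('change', change => {
      console.log(change.operationType, change.ns);
    });
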
30 changes: 30 additions & 0 deletions lib/mongo_client.js
@@ -17,6 +17,7 @@ const shallowClone = require('./utils').shallowClone;
 const authenticate = require('./authenticate');
 const ServerSessionPool = require('mongodb-core').Sessions.ServerSessionPool;
 const executeOperation = require('./utils').executeOperation;
+const ChangeStream = require('./change_stream');
 
 const legacyParse = deprecate(
   require('./url_parser'),
@@ -596,6 +597,35 @@ MongoClient.prototype.withSession = function(options, operation) {
     return cleanupHandler(err, null, { throw: false });
   }
 };
+/**
+ * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this cluster. Will ignore all changes to system collections, as well as the local, admin,
+ * and config databases.
+ * @method
+ * @since 3.1.0
+ * @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
+ * @param {object} [options] Optional settings
+ * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
+ * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
+ * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
+ * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {ReadPreference} [options.readPreference] The read preference. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
+ * @param {Timestamp} [options.startAtClusterTime] receive change events that occur after the specified timestamp
+ * @param {ClientSession} [options.session] optional session to use for this operation
+ * @return {ChangeStream} a ChangeStream instance.
+ */
+MongoClient.prototype.watch = function(pipeline, options) {
+  pipeline = pipeline || [];
+  options = options || {};
+
+  // Allow optionally not specifying a pipeline
+  if (!Array.isArray(pipeline)) {
+    options = pipeline;
+    pipeline = [];
+  }
+
+  return new ChangeStream(this, pipeline, options);
+};
 
 var mergeOptions = function(target, source, flatten) {
   for (var name in source) {
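Because the Array.isArray check above makes the pipeline optional, both of these calls are equivalent (a sketch; client is a connected MongoClient):

    // Explicit empty pipeline plus options
    client.watch([], { fullDocument: 'updateLookup' });

    // Pipeline omitted; the object is treated as options
    client.watch({ fullDocument: 'updateLookup' });
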
1 change: 1 addition & 0 deletions package.json
@@ -27,6 +27,7 @@
"eslint-plugin-prettier": "^2.2.0",
"istanbul": "^0.4.5",
"jsdoc": "3.5.5",
"lodash.camelcase": "^4.3.0",
"mongodb-extjson": "^2.1.1",
"mongodb-mock-server": "^1.0.0",
"mongodb-test-runner": "^1.1.18",
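The new lodash.camelcase dev dependency supports the spec runner below, which normalizes the snake_case keys used in the JSON spec files. For example:

    const camelCase = require('lodash.camelcase');

    camelCase('command_started_event'); // => 'commandStartedEvent'
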
258 changes: 258 additions & 0 deletions test/functional/change_stream_spec_tests.js
@@ -0,0 +1,258 @@
'use strict';

const EJSON = require('mongodb-extjson');
const chai = require('chai');
const fs = require('fs');
const camelCase = require('lodash.camelcase');
const MongoClient = require('../../lib/mongo_client');
const setupDatabase = require('./shared').setupDatabase;
const delay = require('./shared').delay;
const expect = chai.expect;

describe('Change Stream Spec', function() {
  const EJSONToJSON = x => JSON.parse(EJSON.stringify(x));

  let globalClient;
  let ctx;
  let events;

  before(function() {
    return setupDatabase(this.configuration).then(() => {
      globalClient = new MongoClient(this.configuration.url());
      return globalClient.connect();
    });
  });

  after(function() {
    const gc = globalClient;
    globalClient = undefined;
    return new Promise(r => gc.close(() => r()));
  });

  fs
    .readdirSync(`${__dirname}/spec/change-stream`)
    .filter(filename => filename.match(/\.json$/))
    .forEach(filename => {
      const specString = fs.readFileSync(`${__dirname}/spec/change-stream/${filename}`, 'utf8');
      const specData = JSON.parse(specString);

      const ALL_DBS = [specData.database_name, specData.database2_name];

      describe(filename, () => {
        beforeEach(function() {
          const gc = globalClient;
          const sDB = specData.database_name;
          const sColl = specData.collection_name;
          return Promise.all(ALL_DBS.map(db => gc.db(db).dropDatabase()))
            .then(() => gc.db(sDB).createCollection(sColl))
            .then(() =>
              new MongoClient(this.configuration.url(), { monitorCommands: true }).connect()
            )
            .then(client => {
              ctx = { gc, client };
              events = [];
              const _events = events;

              ctx.database = ctx.client.db(sDB);
              ctx.collection = ctx.database.collection(sColl);
              ctx.client.on('commandStarted', e => _events.push(e));
            });
        });

        afterEach(function() {
          const client = ctx.client;
          ctx = undefined;
          events = undefined;

          return client && client.close();
        });

        specData.tests.forEach(test => {
          const itFn = test.skip ? it.skip : test.only ? it.only : it;
          const metadata = generateMetadata(test);
          const testFn = generateTestFn(test);

          itFn(test.description, { metadata, test: testFn });
        });
      });
    });

  // Fn Generator methods

  function generateMetadata(test) {
    const mongodb = test.minServerVersion;
    const topology = test.topology;
    const requires = {};
    if (mongodb) {
      requires.mongodb = `>=${mongodb}`;
    }
    if (topology) {
      requires.topology = topology;
    }

    return { requires };
  }

  function generateTestFn(test) {
    const testFnRunOperations = makeTestFnRunOperations(test);
    const testSuccess = makeTestSuccess(test);
    const testFailure = makeTestFailure(test);
    const testAPM = makeTestAPM(test);

    return function testFn() {
      return testFnRunOperations(ctx)
        .then(testSuccess, testFailure)
        .then(() => testAPM(ctx, events));
    };
  }

  function makeTestSuccess(test) {
    const result = test.result;

    return function testSuccess(value) {
      if (result.error) {
        throw new Error(`Expected test to return error ${result.error}`);
      }

      if (result.success) {
        value = EJSONToJSON(value);
        assertEquality(value, result.success);
      }
    };
  }

  function makeTestFailure(test) {
    const result = test.result;

    return function testFailure(err) {
      if (!result.error) {
        throw err;
      }

      assertEquality(err, result.error);
    };
  }

  function makeTestAPM(test) {
    const expectedEvents = test.expectations;

    return function testAPM(ctx, events) {
      expectedEvents
        .map(e => e.command_started_event)
        .map(normalizeAPMEvent)
        .forEach((expected, idx) => {
          if (!events[idx]) {
            throw new Error(
              `Expected there to be an APM event at index ${idx}, but there was none`
            );
          }
          const actual = EJSONToJSON(events[idx]);
          assertEquality(actual, expected);
        });
    };
  }

  function makeTestFnRunOperations(test) {
    const target = test.target;
    const operations = test.operations;
    const success = test.result.success || [];

    return function testFnRunOperations(ctx) {
      const changeStreamPipeline = test.changeStreamPipeline;
      const changeStreamOptions = test.changeStreamOptions;
      ctx.changeStream = ctx[target].watch(changeStreamPipeline, changeStreamOptions);

      const changeStreamPromise = readAndCloseChangeStream(ctx.changeStream, success.length);
      const operationsPromise = runOperations(ctx.gc, operations);

      return Promise.all([changeStreamPromise, operationsPromise]).then(args => args[0]);
    };
  }

  function readAndCloseChangeStream(changeStream, numChanges) {
    const close = makeChangeStreamCloseFn(changeStream);
    let changeStreamPromise = changeStream.next().then(r => [r]);

    for (let i = 1; i < numChanges; i += 1) {
      changeStreamPromise = changeStreamPromise.then(results => {
        return changeStream.next().then(result => {
          results.push(result);
          return results;
        });
      });
    }

    return changeStreamPromise.then(result => close(null, result), err => close(err));
  }

  function runOperations(client, operations) {
    return operations
      .map(op => makeOperation(client, op))
      .reduce((p, op) => p.then(op), delay(200));
  }

  function makeChangeStreamCloseFn(changeStream) {
    return function close(error, value) {
      return new Promise((resolve, reject) => {
        changeStream.close(err => {
          if (error || err) {
            return reject(error || err);
          }
          return resolve(value);
        });
      });
    };
  }

  function normalizeAPMEvent(raw) {
    return Object.keys(raw).reduce((agg, key) => {
      agg[camelCase(key)] = raw[key];
      return agg;
    }, {});
  }

  function makeOperation(client, op) {
    const target = client.db(op.database).collection(op.collection);
    const command = op.name;
    const args = [];
    if (op.arguments && op.arguments.document) {
      args.push(op.arguments.document);
    }
    return () => target[command].apply(target, args);
  }

  function assertEquality(actual, expected) {
    try {
      _assertEquality(actual, expected);
    } catch (e) {
      console.dir(actual, { depth: 999 });
      console.dir(expected, { depth: 999 });
      throw e;
    }
  }

  function _assertEquality(actual, expected) {
    try {
      if (expected === '42' || expected === 42) {
        expect(actual).to.exist;
        return;
      }

      expect(actual).to.be.a(Array.isArray(expected) ? 'array' : typeof expected);

      if (expected == null) {
        expect(actual).to.not.exist;
      } else if (Array.isArray(expected)) {
        expected.forEach((ex, idx) => _assertEquality(actual[idx], ex));
      } else if (typeof expected === 'object') {
        for (let i in expected) {
          _assertEquality(actual[i], expected[i]);
        }
      } else {
        expect(actual).to.equal(expected);
      }
    } catch (e) {
      throw e;
    }
  }
});