diff --git a/lib/change_stream.js b/lib/change_stream.js index ec9a13d475..025d7e55f7 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -1,77 +1,225 @@ 'use strict'; const EventEmitter = require('events'); -const inherits = require('util').inherits; const isResumableError = require('./error').isResumableError; +const MongoError = require('mongodb-core').MongoError; var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference']; +const CHANGE_DOMAIN_TYPES = { + COLLECTION: Symbol('Collection'), + DATABASE: Symbol('Database'), + CLUSTER: Symbol('Cluster') +}; + /** * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}. * @class ChangeStream * @since 3.0.0 - * @param {(Db|Collection)} changeDomain The collection against which to create the change stream + * @param {(MongoClient|Db|Collection)} changeDomain The domain against which to create the change stream * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents - * @param {object} [options=null] Optional settings + * @param {object} [options] Optional settings * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query - * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document. - * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. - * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. - * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}. + * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document. + * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. + * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. + * @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}. * @fires ChangeStream#close * @fires ChangeStream#change * @fires ChangeStream#end * @fires ChangeStream#error * @return {ChangeStream} a ChangeStream instance. 
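+ * @example
+ * // A usage sketch, not part of this patch; `db` and the 'inventory' collection are hypothetical:
+ * const changeStream = db.collection('inventory').watch(
+ *   [{ $match: { operationType: 'insert' } }],
+ *   { fullDocument: 'updateLookup' }
+ * );
+ * changeStream.on('change', change => console.log(change.fullDocument));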
*/ -var ChangeStream = function(collection, pipeline, options) { - var Collection = require('./collection'); - // Ensure the provided collection is actually a collection - if (!(collection instanceof Collection)) { - throw new Error( - 'collection provided to ChangeStream constructor is not an instance of Collection' - ); - } +class ChangeStream extends EventEmitter { + constructor(changeDomain, pipeline, options) { + super(); + const Collection = require('./collection'); + const Db = require('./db'); + const MongoClient = require('./mongo_client'); + + this.pipeline = pipeline || []; + this.options = options || {}; + this.cursorNamespace = undefined; + this.namespace = {}; + + if (changeDomain instanceof Collection) { + this.type = CHANGE_DOMAIN_TYPES.COLLECTION; + this.serverConfig = changeDomain.s.db.serverConfig; + + this.namespace = { + collection: changeDomain.collectionName, + database: changeDomain.s.db.databaseName + }; + + this.cursorNamespace = `${this.namespace.database}.${this.namespace.collection}`; + } else if (changeDomain instanceof Db) { + this.type = CHANGE_DOMAIN_TYPES.DATABASE; + this.namespace = { collection: '', database: changeDomain.databaseName }; + this.cursorNamespace = this.namespace.database; + this.serverConfig = changeDomain.serverConfig; + } else if (changeDomain instanceof MongoClient) { + this.type = CHANGE_DOMAIN_TYPES.CLUSTER; + this.namespace = { collection: '', database: 'admin' }; + this.cursorNamespace = this.namespace.database; + this.serverConfig = changeDomain.topology; + } else { + throw new TypeError( + 'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient' + ); + } - var self = this; - self.pipeline = pipeline || []; - self.options = options || {}; - self.promiseLibrary = collection.s.promiseLibrary; + this.promiseLibrary = changeDomain.s.promiseLibrary; + if (!this.options.readPreference && changeDomain.s.readPreference) { + this.options.readPreference = changeDomain.s.readPreference; + } - // Extract namespace and serverConfig from the collection - self.namespace = { - collection: collection.collectionName, - database: collection.s.db.databaseName - }; + // We need to get the operationTime as early as possible + const isMaster = this.serverConfig.lastIsMaster(); + if (!isMaster) { + throw new MongoError('ServerConfig does not have an ismaster yet.'); + } + + this.operationTime = isMaster.operationTime; + + // Create contained Change Stream cursor + this.cursor = createChangeStreamCursor(this); - self.serverConfig = collection.s.db.serverConfig; + // Listen for any `change` listeners being added to ChangeStream + this.on('newListener', eventName => { + if (eventName === 'change' && this.cursor && this.cursor.listenerCount('change') === 0) { + this.cursor.on('data', change => processNewChange(this, null, change)); + } + }); - // Determine correct read preference - self.options.readPreference = self.options.readPreference || collection.s.readPreference; + // Listen for all `change` listeners being removed from ChangeStream + this.on('removeListener', eventName => { + if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) { + this.cursor.removeAllListeners('data'); + } + }); + } - // Create contained Change Stream cursor - self.cursor = createChangeStreamCursor(self); + /** + * Check if there is any document still available in the Change Stream + * @function ChangeStream.prototype.hasNext + * @param {ChangeStream~resultCallback} [callback] The result callback. 
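+ * @example
+ * // A usage sketch (callback form); `changeStream` is assumed to be an open ChangeStream:
+ * changeStream.hasNext(function(err, hasNext) {
+ *   if (err) throw err;
+ *   if (hasNext) changeStream.next((nextErr, change) => console.log(nextErr || change));
+ * });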
+ * @throws {MongoError} + * @return {Promise} returns Promise if no callback passed + */ + hasNext(callback) { + return this.cursor.hasNext(callback); + } - // Listen for any `change` listeners being added to ChangeStream - self.on('newListener', function(eventName) { - if (eventName === 'change' && self.cursor && self.cursor.listenerCount('change') === 0) { - self.cursor.on('data', function(change) { - processNewChange(self, null, change); + /** + * Get the next available document from the Change Stream, returns null if no more documents are available. + * @function ChangeStream.prototype.next + * @param {ChangeStream~resultCallback} [callback] The result callback. + * @throws {MongoError} + * @return {Promise} returns Promise if no callback passed + */ + next(callback) { + var self = this; + if (this.isClosed()) { + if (callback) return callback(new Error('Change Stream is not open.'), null); + return self.promiseLibrary.reject(new Error('Change Stream is not open.')); + } + return this.cursor + .next() + .then(function(change) { + return processNewChange(self, null, change, callback); + }) + .catch(function(err) { + return processNewChange(self, err, null, callback); }); + } + + /** + * Is the cursor closed + * @method ChangeStream.prototype.isClosed + * @return {boolean} + */ + isClosed() { + if (this.cursor) { + return this.cursor.isClosed(); } - }); + return true; + } - // Listen for all `change` listeners being removed from ChangeStream - self.on('removeListener', function(eventName) { - if (eventName === 'change' && self.listenerCount('change') === 0 && self.cursor) { - self.cursor.removeAllListeners('data'); + /** + * Close the Change Stream + * @method ChangeStream.prototype.close + * @param {ChangeStream~resultCallback} [callback] The result callback. + * @return {Promise} returns Promise if no callback passed + */ + close(callback) { + if (!this.cursor) { + if (callback) return callback(); + return this.promiseLibrary.resolve(); } - }); -}; -inherits(ChangeStream, EventEmitter); + // Tidy up the existing cursor + var cursor = this.cursor; + delete this.cursor; + return cursor.close(callback); + } + + /** + * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream. + * @method + * @param {Writable} destination The destination for writing data + * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options} + * @return {null} + */ + pipe(destination, options) { + if (!this.pipeDestinations) { + this.pipeDestinations = []; + } + this.pipeDestinations.push(destination); + return this.cursor.pipe(destination, options); + } + + /** + * This method will remove the hooks set up for a previous pipe() call. + * @param {Writable} [destination] The destination for writing data + * @return {null} + */ + unpipe(destination) { + if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) { + this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1); + } + return this.cursor.unpipe(destination); + } + + /** + * Return a modified Readable stream including a possible transform method. + * @method + * @param {object} [options=null] Optional settings. + * @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream. 
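+ * @example
+ * // A sketch only; the serializing transform shown here is hypothetical:
+ * changeStream
+ *   .stream({ transform: change => JSON.stringify(change) + '\n' })
+ *   .pipe(process.stdout);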
+ * @return {Cursor} + */ + stream(options) { + this.streamOptions = options; + return this.cursor.stream(options); + } + + /** + * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer. + * @return {null} + */ + pause() { + return this.cursor.pause(); + } + + /** + * This method will cause the readable stream to resume emitting data events. + * @return {null} + */ + resume() { + return this.cursor.resume(); + } +} // Create a new change stream cursor based on self's configuration var createChangeStreamCursor = function(self) { @@ -79,13 +227,7 @@ var createChangeStreamCursor = function(self) { self.options.resumeAfter = self.resumeToken; } - var changeStreamCursor = buildChangeStreamAggregationCommand( - self.serverConfig, - self.namespace, - self.pipeline, - self.resumeToken, - self.options - ); + var changeStreamCursor = buildChangeStreamAggregationCommand(self); /** * Fired for each new matching change in the specified namespace. Attaching a `change` event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available. @@ -139,20 +281,25 @@ var createChangeStreamCursor = function(self) { return changeStreamCursor; }; -var buildChangeStreamAggregationCommand = function( - serverConfig, - namespace, - pipeline, - resumeToken, - options -) { - var changeStreamStageOptions = {}; - if (options.fullDocument) { - changeStreamStageOptions.fullDocument = options.fullDocument; - } +var buildChangeStreamAggregationCommand = function(self) { + const serverConfig = self.serverConfig; + const namespace = self.namespace; + const pipeline = self.pipeline; + const resumeToken = self.resumeToken; + const options = self.options; + const cursorNamespace = self.cursorNamespace; + + const isMaster = serverConfig.lastIsMaster() || {}; + + var changeStreamStageOptions = { + fullDocument: options.fullDocument || 'default' + }; if (resumeToken || options.resumeAfter) { changeStreamStageOptions.resumeAfter = resumeToken || options.resumeAfter; + } else if (isMaster.maxWireVersion && isMaster.maxWireVersion >= 7) { + const ts = options.startAtOperationTime || self.operationTime; + changeStreamStageOptions.startAtOperationTime = ts; } // Map cursor options @@ -163,12 +310,16 @@ var buildChangeStreamAggregationCommand = function( } }); + if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) { + changeStreamStageOptions.allChangesForCluster = true; + } + var changeStreamPipeline = [{ $changeStream: changeStreamStageOptions }]; changeStreamPipeline = changeStreamPipeline.concat(pipeline); var command = { - aggregate: namespace.collection, + aggregate: self.type === CHANGE_DOMAIN_TYPES.COLLECTION ? namespace.collection : 1, pipeline: changeStreamPipeline, readConcern: { level: 'majority' }, cursor: { @@ -177,130 +328,7 @@ var buildChangeStreamAggregationCommand = function( }; // Create and return the cursor - return serverConfig.cursor( - namespace.database + '.' + namespace.collection, - command, - cursorOptions - ); -}; - -/** - * Check if there is any document still available in the Change Stream - * @function ChangeStream.prototype.hasNext - * @param {ChangeStream~resultCallback} [callback] The result callback. 
- * @throws {MongoError} - * @return {Promise} returns Promise if no callback passed - */ -ChangeStream.prototype.hasNext = function(callback) { - return this.cursor.hasNext(callback); -}; - -/** - * Get the next available document from the Change Stream, returns null if no more documents are available. - * @function ChangeStream.prototype.next - * @param {ChangeStream~resultCallback} [callback] The result callback. - * @throws {MongoError} - * @return {Promise} returns Promise if no callback passed - */ -ChangeStream.prototype.next = function(callback) { - var self = this; - if (this.isClosed()) { - if (callback) return callback(new Error('Change Stream is not open.'), null); - return self.promiseLibrary.reject(new Error('Change Stream is not open.')); - } - return this.cursor - .next() - .then(function(change) { - return processNewChange(self, null, change, callback); - }) - .catch(function(err) { - return processNewChange(self, err, null, callback); - }); -}; - -/** - * Is the cursor closed - * @method ChangeStream.prototype.isClosed - * @return {boolean} - */ -ChangeStream.prototype.isClosed = function() { - if (this.cursor) { - return this.cursor.isClosed(); - } - return true; -}; - -/** - * Close the Change Stream - * @method ChangeStream.prototype.close - * @param {ChangeStream~resultCallback} [callback] The result callback. - * @return {Promise} returns Promise if no callback passed - */ -ChangeStream.prototype.close = function(callback) { - if (!this.cursor) { - if (callback) return callback(); - return this.promiseLibrary.resolve(); - } - - // Tidy up the existing cursor - var cursor = this.cursor; - delete this.cursor; - return cursor.close(callback); -}; - -/** - * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream. - * @method - * @param {Writable} destination The destination for writing data - * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options} - * @return {null} - */ -ChangeStream.prototype.pipe = function(destination, options) { - if (!this.pipeDestinations) { - this.pipeDestinations = []; - } - this.pipeDestinations.push(destination); - return this.cursor.pipe(destination, options); -}; - -/** - * This method will remove the hooks set up for a previous pipe() call. - * @param {Writable} [destination] The destination for writing data - * @return {null} - */ -ChangeStream.prototype.unpipe = function(destination) { - if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) { - this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1); - } - return this.cursor.unpipe(destination); -}; - -/** - * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer. - * @return {null} - */ -ChangeStream.prototype.pause = function() { - return this.cursor.pause(); -}; - -/** - * This method will cause the readable stream to resume emitting data events. - * @return {null} - */ -ChangeStream.prototype.resume = function() { - return this.cursor.resume(); -}; - -/** - * Return a modified Readable stream including a possible transform method. - * @method - * @param {object} [options=null] Optional settings. - * @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream. 
- * @return {Cursor} - */ -ChangeStream.prototype.stream = function(options) { - this.streamOptions = options; - return this.cursor.stream(options); + return serverConfig.cursor(cursorNamespace, command, cursorOptions); }; // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. diff --git a/lib/collection.js b/lib/collection.js index d1b51cf6b9..defe17b1ad 100644 --- a/lib/collection.js +++ b/lib/collection.js @@ -2453,14 +2453,15 @@ Collection.prototype.aggregate = function(pipeline, options, callback) { * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this collection. * @method * @since 3.0.0 - * @param {Array} [pipeline=null] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents. - * @param {object} [options=null] Optional settings + * @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents. + * @param {object} [options] Optional settings * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. - * @param {object} [options.resumeAfter=null] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document. + * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document. * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query - * @param {number} [options.batchSize=null] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. - * @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. - * @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}. + * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. + * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. + * @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}. 
+ * @param {Timestamp} [options.startAtOperationTime] receive change events that occur after the specified timestamp
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @return {ChangeStream} a ChangeStream instance.
 */
diff --git a/lib/cursor.js b/lib/cursor.js
index 9411263ef1..07735e63d5 100644
--- a/lib/cursor.js
+++ b/lib/cursor.js
@@ -1076,7 +1076,13 @@ Cursor.prototype.close = function(options, callback) {
   };
 
   if (this.s.session) {
-    return this._endSession(() => completeClose());
+    if (typeof callback === 'function') {
+      return this._endSession(() => completeClose());
+    }
+
+    return new this.s.promiseLibrary(resolve => {
+      this._endSession(() => completeClose().then(resolve));
+    });
   }
 
   return completeClose();
diff --git a/lib/db.js b/lib/db.js
index f6b7c59afc..aed7ceb25f 100644
--- a/lib/db.js
+++ b/lib/db.js
@@ -17,6 +17,7 @@ const mergeOptionsAndWriteConcern = require('./utils').mergeOptionsAndWriteConce
 const executeOperation = require('./utils').executeOperation;
 const applyWriteConcern = require('./utils').applyWriteConcern;
 const convertReadPreference = require('./utils').convertReadPreference;
+const ChangeStream = require('./change_stream');
 
 // Operations
 const addUser = require('./operations/db_ops').addUser;
@@ -876,6 +877,35 @@ Db.prototype.unref = function() {
   this.s.topology.unref();
 };
 
+/**
+ * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this database. Will ignore all changes to system collections.
+ * @method
+ * @since 3.1.0
+ * @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
+ * @param {object} [options] Optional settings
+ * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
+ * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
+ * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
+ * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {ReadPreference} [options.readPreference] The read preference. Defaults to the read preference of the database. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
+ * @param {Timestamp} [options.startAtOperationTime] receive change events that occur after the specified timestamp
+ * @param {ClientSession} [options.session] optional session to use for this operation
+ * @return {ChangeStream} a ChangeStream instance.
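+ * @example
+ * // A usage sketch; `client` is a connected MongoClient and 'reports' a hypothetical database:
+ * const changeStream = client.db('reports').watch([{ $match: { operationType: 'insert' } }]);
+ * changeStream.on('change', change => console.log(change.ns, change.fullDocument));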
+ */
+Db.prototype.watch = function(pipeline, options) {
+  pipeline = pipeline || [];
+  options = options || {};
+
+  // Allow optionally not specifying a pipeline
+  if (!Array.isArray(pipeline)) {
+    options = pipeline;
+    pipeline = [];
+  }
+
+  return new ChangeStream(this, pipeline, options);
+};
+
 /**
  * Db close event
  *
diff --git a/lib/mongo_client.js b/lib/mongo_client.js
index 6440b0bfdc..cda2b8c7b3 100644
--- a/lib/mongo_client.js
+++ b/lib/mongo_client.js
@@ -17,6 +17,7 @@ const shallowClone = require('./utils').shallowClone;
 const authenticate = require('./authenticate');
 const ServerSessionPool = require('mongodb-core').Sessions.ServerSessionPool;
 const executeOperation = require('./utils').executeOperation;
+const ChangeStream = require('./change_stream');
 
 const legacyParse = deprecate(
   require('./url_parser'),
@@ -596,6 +597,35 @@ MongoClient.prototype.withSession = function(options, operation) {
     return cleanupHandler(err, null, { throw: false });
   }
 };
+/**
+ * Create a new Change Stream, watching for new changes (insertions, updates, replacements, deletions, and invalidations) in this cluster. Will ignore all changes to system collections, as well as the local, admin,
+ * and config databases.
+ * @method
+ * @since 3.1.0
+ * @param {Array} [pipeline] An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
+ * @param {object} [options] Optional settings
+ * @param {string} [options.fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
+ * @param {object} [options.resumeAfter] Specifies the logical starting point for the new change stream. This should be the _id field from a previously returned change stream document.
+ * @param {number} [options.maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query
+ * @param {number} [options.batchSize] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {object} [options.collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @param {ReadPreference} [options.readPreference] The read preference. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
+ * @param {Timestamp} [options.startAtOperationTime] receive change events that occur after the specified timestamp
+ * @param {ClientSession} [options.session] optional session to use for this operation
+ * @return {ChangeStream} a ChangeStream instance.
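+ * @example
+ * // A usage sketch; the $match stage is hypothetical and requires a replica set or sharded cluster:
+ * const changeStream = client.watch([{ $match: { 'ns.db': 'app' } }]);
+ * changeStream.on('change', change => console.log(change.operationType));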
+ */ +MongoClient.prototype.watch = function(pipeline, options) { + pipeline = pipeline || []; + options = options || {}; + + // Allow optionally not specifying a pipeline + if (!Array.isArray(pipeline)) { + options = pipeline; + pipeline = []; + } + + return new ChangeStream(this, pipeline, options); +}; var mergeOptions = function(target, source, flatten) { for (var name in source) { diff --git a/package.json b/package.json index 7d577bab63..3ad2449b35 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "eslint-plugin-prettier": "^2.2.0", "istanbul": "^0.4.5", "jsdoc": "3.5.5", + "lodash.camelcase": "^4.3.0", "mongodb-extjson": "^2.1.1", "mongodb-mock-server": "^1.0.0", "mongodb-test-runner": "^1.1.18", diff --git a/test/functional/change_stream_spec_tests.js b/test/functional/change_stream_spec_tests.js new file mode 100644 index 0000000000..05d0386486 --- /dev/null +++ b/test/functional/change_stream_spec_tests.js @@ -0,0 +1,258 @@ +'use strict'; + +const EJSON = require('mongodb-extjson'); +const chai = require('chai'); +const fs = require('fs'); +const camelCase = require('lodash.camelcase'); +const MongoClient = require('../../lib/mongo_client'); +const setupDatabase = require('./shared').setupDatabase; +const delay = require('./shared').delay; +const expect = chai.expect; + +describe('Change Stream Spec', function() { + const EJSONToJSON = x => JSON.parse(EJSON.stringify(x)); + + let globalClient; + let ctx; + let events; + + before(function() { + return setupDatabase(this.configuration).then(() => { + globalClient = new MongoClient(this.configuration.url()); + return globalClient.connect(); + }); + }); + + after(function() { + const gc = globalClient; + globalClient = undefined; + return new Promise(r => gc.close(() => r())); + }); + + fs + .readdirSync(`${__dirname}/spec/change-stream`) + .filter(filename => filename.match(/\.json$/)) + .forEach(filename => { + const specString = fs.readFileSync(`${__dirname}/spec/change-stream/${filename}`, 'utf8'); + const specData = JSON.parse(specString); + + const ALL_DBS = [specData.database_name, specData.database2_name]; + + describe(filename, () => { + beforeEach(function() { + const gc = globalClient; + const sDB = specData.database_name; + const sColl = specData.collection_name; + return Promise.all(ALL_DBS.map(db => gc.db(db).dropDatabase())) + .then(() => gc.db(sDB).createCollection(sColl)) + .then(() => + new MongoClient(this.configuration.url(), { monitorCommands: true }).connect() + ) + .then(client => { + ctx = { gc, client }; + events = []; + const _events = events; + + ctx.database = ctx.client.db(sDB); + ctx.collection = ctx.database.collection(sColl); + ctx.client.on('commandStarted', e => _events.push(e)); + }); + }); + + afterEach(function() { + const client = ctx.client; + ctx = undefined; + events = undefined; + + return client && client.close(); + }); + + specData.tests.forEach(test => { + const itFn = test.skip ? it.skip : test.only ? 
it.only : it; + const metadata = generateMetadata(test); + const testFn = generateTestFn(test); + + itFn(test.description, { metadata, test: testFn }); + }); + }); + }); + + // Fn Generator methods + + function generateMetadata(test) { + const mongodb = test.minServerVersion; + const topology = test.topology; + const requires = {}; + if (mongodb) { + requires.mongodb = `>=${mongodb}`; + } + if (topology) { + requires.topology = topology; + } + + return { requires }; + } + + function generateTestFn(test) { + const testFnRunOperations = makeTestFnRunOperations(test); + const testSuccess = makeTestSuccess(test); + const testFailure = makeTestFailure(test); + const testAPM = makeTestAPM(test); + + return function testFn() { + return testFnRunOperations(ctx) + .then(testSuccess, testFailure) + .then(() => testAPM(ctx, events)); + }; + } + + function makeTestSuccess(test) { + const result = test.result; + + return function testSuccess(value) { + if (result.error) { + throw new Error(`Expected test to return error ${result.error}`); + } + + if (result.success) { + value = EJSONToJSON(value); + assertEquality(value, result.success); + } + }; + } + + function makeTestFailure(test) { + const result = test.result; + + return function testFailure(err) { + if (!result.error) { + throw err; + } + + assertEquality(err, result.error); + }; + } + + function makeTestAPM(test) { + const expectedEvents = test.expectations; + + return function testAPM(ctx, events) { + expectedEvents + .map(e => e.command_started_event) + .map(normalizeAPMEvent) + .forEach((expected, idx) => { + if (!events[idx]) { + throw new Error( + `Expected there to be an APM event at index ${idx}, but there was none` + ); + } + const actual = EJSONToJSON(events[idx]); + assertEquality(actual, expected); + }); + }; + } + + function makeTestFnRunOperations(test) { + const target = test.target; + const operations = test.operations; + const success = test.result.success || []; + + return function testFnRunOperations(ctx) { + const changeStreamPipeline = test.changeStreamPipeline; + const changeStreamOptions = test.changeStreamOptions; + ctx.changeStream = ctx[target].watch(changeStreamPipeline, changeStreamOptions); + + const changeStreamPromise = readAndCloseChangeStream(ctx.changeStream, success.length); + const operationsPromise = runOperations(ctx.gc, operations); + + return Promise.all([changeStreamPromise, operationsPromise]).then(args => args[0]); + }; + } + + function readAndCloseChangeStream(changeStream, numChanges) { + const close = makeChangeStreamCloseFn(changeStream); + let changeStreamPromise = changeStream.next().then(r => [r]); + + for (let i = 1; i < numChanges; i += 1) { + changeStreamPromise = changeStreamPromise.then(results => { + return changeStream.next().then(result => { + results.push(result); + return results; + }); + }); + } + + return changeStreamPromise.then(result => close(null, result), err => close(err)); + } + + function runOperations(client, operations) { + return operations + .map(op => makeOperation(client, op)) + .reduce((p, op) => p.then(op), delay(200)); + } + + function makeChangeStreamCloseFn(changeStream) { + return function close(error, value) { + return new Promise((resolve, reject) => { + changeStream.close(err => { + if (error || err) { + return reject(error || err); + } + return resolve(value); + }); + }); + }; + } + + function normalizeAPMEvent(raw) { + return Object.keys(raw).reduce((agg, key) => { + agg[camelCase(key)] = raw[key]; + return agg; + }, {}); + } + + function makeOperation(client, 
op) { + const target = client.db(op.database).collection(op.collection); + const command = op.name; + const args = []; + if (op.arguments && op.arguments.document) { + args.push(op.arguments.document); + } + return () => target[command].apply(target, args); + } + + function assertEquality(actual, expected) { + try { + _assertEquality(actual, expected); + } catch (e) { + console.dir(actual, { depth: 999 }); + console.dir(expected, { depth: 999 }); + throw e; + } + } + + function _assertEquality(actual, expected) { + try { + if (expected === '42' || expected === 42) { + expect(actual).to.exist; + return; + } + + expect(actual).to.be.a(Array.isArray(expected) ? 'array' : typeof expected); + + if (expected == null) { + expect(actual).to.not.exist; + } else if (Array.isArray(expected)) { + expected.forEach((ex, idx) => _assertEquality(actual[idx], ex)); + } else if (typeof expected === 'object') { + for (let i in expected) { + _assertEquality(actual[i], expected[i]); + } + } else { + expect(actual).to.equal(expected); + } + } catch (e) { + throw e; + } + } +}); diff --git a/test/functional/change_stream_tests.js b/test/functional/change_stream_tests.js index 23c955cb2a..c7d8f45500 100644 --- a/test/functional/change_stream_tests.js +++ b/test/functional/change_stream_tests.js @@ -6,6 +6,10 @@ var setupDatabase = require('./shared').setupDatabase; var delay = require('./shared').delay; var co = require('co'); var mock = require('mongodb-mock-server'); +const chai = require('chai'); +const expect = chai.expect; + +chai.use(require('chai-subset')); // Define the pipeline processing changes var pipeline = [ @@ -14,17 +18,20 @@ var pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; -describe.skip('Change Streams', function() { +describe('Change Streams', function() { before(function() { - return setupDatabase(this.configuration, [ - 'integration_tests', - 'integration_tests_2', - 'integration_tests5', - 'integration_tests09', - 'integration_tests13', - 'integration_tests14', - 'integration_tests19' - ]); + return setupDatabase(this.configuration, ['integration_tests']); + }); + + beforeEach(function() { + const configuration = this.configuration; + const MongoClient = configuration.require.MongoClient; + const client = new MongoClient(configuration.url()); + + return client.connect().then(() => { + const db = client.db('integration_tests'); + return db.createCollection('test'); + }); }); it('Should create a Change Stream on a collection and emit `change` events', { @@ -41,40 +48,41 @@ describe.skip('Change Streams', function() { var collection = client.db('integration_tests').collection('docsDataEvent'); var changeStream = collection.watch(pipeline); + let count = 0; + // Attach first event listener - changeStream.once('change', function(change) { - assert.equal(change.operationType, 'insert'); - assert.equal(change.fullDocument.d, 4); - assert.equal(change.ns.db, 'integration_tests'); - assert.equal(change.ns.coll, 'docsDataEvent'); - assert.ok(!change.documentKey); - assert.equal( - change.comment, - 'The documentKey field has been projected out of this document.' 
- ); + changeStream.on('change', function(change) { + if (count === 0) { + count += 1; + assert.equal(change.operationType, 'insert'); + assert.equal(change.fullDocument.d, 4); + assert.equal(change.ns.db, 'integration_tests'); + assert.equal(change.ns.coll, 'docsDataEvent'); + assert.ok(!change.documentKey); + assert.equal( + change.comment, + 'The documentKey field has been projected out of this document.' + ); + return; + } - // Attach second event listener - changeStream.once('change', function(change) { - assert.equal(change.operationType, 'update'); - assert.equal(change.updateDescription.updatedFields.d, 6); + assert.equal(change.operationType, 'update'); + assert.equal(change.updateDescription.updatedFields.d, 6); - // Close the change stream - changeStream.close(function(err) { - assert.ifError(err); - done(); - }); - }); + // Close the change stream + changeStream.close(err => client.close(cerr => done(err || cerr))); + }); - // Trigger the second database event - collection.update({ d: 4 }, { $inc: { d: 2 } }, function(err) { + setTimeout(() => { + // Trigger the first database event + collection.insert({ d: 4 }, function(err) { assert.ifError(err); + // Trigger the second database event + collection.update({ d: 4 }, { $inc: { d: 2 } }, function(err) { + assert.ifError(err); + }); }); }); - - // Trigger the first database event - collection.insert({ d: 4 }, function(err) { - assert.ifError(err); - }); }); } }); @@ -122,10 +130,7 @@ describe.skip('Change Streams', function() { assert.ifError(err); assert.equal(change.operationType, 'update'); // Close the change stream - changeStream.close(function(err) { - assert.ifError(err); - done(); - }); + changeStream.close(err => client.close(cerr => done(err || cerr))); }); }); }); @@ -220,6 +225,7 @@ describe.skip('Change Streams', function() { thisChangeStream3.close() ]); }) + .then(() => client.close()) .then(function() { done(); }) @@ -254,7 +260,7 @@ describe.skip('Change Streams', function() { // Check the cursor is closed assert.equal(thisChangeStream.isClosed(), true); assert.ok(!thisChangeStream.cursor); - done(); + client.close(() => done()); }); }); } @@ -283,11 +289,7 @@ describe.skip('Change Streams', function() { assert.ok(err); assert.ok(err.message); // assert.ok(err.message.indexOf('SOME ERROR MESSAGE HERE ONCE SERVER-29137 IS DONE') > -1); - - changeStream.close(function(err) { - assert.ifError(err); - done(); - }); + changeStream.close(err => client.close(cerr => done(err || cerr))); }); }); } @@ -327,10 +329,7 @@ describe.skip('Change Streams', function() { assert.deepEqual(thisChangeStream.resumeToken, change._id); // Close the change stream - thisChangeStream.close(function(err) { - assert.ifError(err); - done(); - }); + thisChangeStream.close(err => client.close(cerr => done(err || cerr))); }); }); }); @@ -369,7 +368,7 @@ describe.skip('Change Streams', function() { assert.deepEqual(thisChangeStream.resumeToken, change._id); // Close the change stream - return thisChangeStream.close(); + return thisChangeStream.close().then(() => client.close()); }); }); } @@ -394,15 +393,17 @@ describe.skip('Change Streams', function() { thisChangeStream.once('change', function(change) { assert.deepEqual(thisChangeStream.resumeToken, change._id); // Close the change stream - thisChangeStream.close().then(function() { - done(); - }); + thisChangeStream.close().then(() => client.close(done)); }); - // Trigger the first database event - theDatabase.collection('cacheResumeTokenListener').insert({ b: 2 }, function(err, result) { - 
assert.ifError(err); - assert.equal(result.insertedCount, 1); + setTimeout(() => { + // Trigger the first database event + theDatabase + .collection('cacheResumeTokenListener') + .insert({ b: 2 }, function(err, result) { + assert.ifError(err); + assert.equal(result.insertedCount, 1); + }); }); }); } @@ -446,13 +447,11 @@ describe.skip('Change Streams', function() { assert.ok(err); assert.equal( err.message, - 'A change stream document has been recieved that lacks a resume token (_id).' + 'A change stream document has been received that lacks a resume token (_id).' ); // Close the change stream - thisChangeStream.close().then(function() { - done(); - }); + thisChangeStream.close().then(() => client.close(done)); }); }); }); @@ -484,12 +483,10 @@ describe.skip('Change Streams', function() { thisChangeStream.on('error', function(err) { assert.equal( err.message, - 'A change stream document has been recieved that lacks a resume token (_id).' + 'A change stream document has been received that lacks a resume token (_id).' ); - thisChangeStream.close(function() { - done(); - }); + thisChangeStream.close(() => client.close(done)); }); // Trigger the first database event @@ -538,7 +535,7 @@ describe.skip('Change Streams', function() { assert.equal(change.operationType, 'invalidate'); // now expect the server to close the stream - changeStream.once('close', done); + changeStream.once('close', () => client.close(done)); }); // Trigger the second database event @@ -598,7 +595,7 @@ describe.skip('Change Streams', function() { changeStream.hasNext(function(err, hasNext) { assert.equal(hasNext, false); assert.equal(changeStream.isClosed(), true); - done(); + client.close(done); }); }); }); @@ -618,7 +615,7 @@ describe.skip('Change Streams', function() { client.connect(function(err, client) { assert.ifError(err); - var database = client.db('integration_tests_2'); + var database = client.db('integration_tests'); var changeStream = database.collection('invalidateCollectionDropPromises').watch(pipeline); // Trigger the first database event @@ -647,7 +644,7 @@ describe.skip('Change Streams', function() { .then(function(hasNext) { assert.equal(hasNext, false); assert.equal(changeStream.isClosed(), true); - done(); + client.close(done); }) .catch(function(err) { assert.ifError(err); @@ -721,7 +718,7 @@ describe.skip('Change Streams', function() { client.connect(function(err, client) { assert.ifError(err); - var database = client.db('integration_tests5'); + var database = client.db('integration_tests'); var collection = database.collection('MongoNetworkErrorTestPromises'); var changeStream = collection.watch(pipeline); @@ -748,7 +745,7 @@ describe.skip('Change Streams', function() { // running = false; primaryServer.destroy(); - mock.cleanup(() => done()); + client.close(() => mock.cleanup(() => done())); }); }) .catch(err => done(err)); @@ -827,7 +824,7 @@ describe.skip('Change Streams', function() { function(err, client) { assert.ifError(err); - var theDatabase = client.db('integration_tests5'); + var theDatabase = client.db('integration_tests'); var theCollection = theDatabase.collection('MongoNetworkErrorTestPromises'); var thisChangeStream = theCollection.watch(pipeline); @@ -846,7 +843,7 @@ describe.skip('Change Streams', function() { assert.ifError(err); thisChangeStream.close(); - mock.cleanup(() => done()); + client.close(() => mock.cleanup(() => done())); }); }); } @@ -948,7 +945,7 @@ describe.skip('Change Streams', function() { validateOptions: true }) .then(client => { - var database = 
client.db('integration_tests5'); + var database = client.db('integration_tests'); var collection = database.collection('MongoNetworkErrorTestPromises'); var changeStream = collection.watch(pipeline); @@ -975,7 +972,9 @@ describe.skip('Change Streams', function() { // Check that only one getMore call was made assert.equal(callsToGetMore, 1); - return Promise.all([changeStream.close(), primaryServer.destroy]); + return Promise.all([changeStream.close(), primaryServer.destroy]).then(() => + client.close() + ); }); }) .catch(err => (finalError = err)) @@ -1070,7 +1069,8 @@ describe.skip('Change Streams', function() { assert.equal(change.operationType, 'insert'); assert.equal(change.fullDocument.a, docs[2].a); return secondChangeStream.close(); - }); + }) + .then(() => client.close()); }); } }); @@ -1085,7 +1085,7 @@ describe.skip('Change Streams', function() { var client = new MongoClient(configuration.url()); return client.connect().then(client => { - var database = client.db('integration_tests09'); + var database = client.db('integration_tests'); var collection = database.collection('fullDocumentLookup'); var changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup' @@ -1125,7 +1125,7 @@ describe.skip('Change Streams', function() { assert.equal(change.fullDocument.f, 128); assert.equal(change.fullDocument.c, 2); - return changeStream.close(); + return changeStream.close().then(() => client.close()); }); }); } @@ -1141,7 +1141,7 @@ describe.skip('Change Streams', function() { var client = new MongoClient(configuration.url()); return client.connect().then(client => { - var database = client.db('integration_tests13'); + var database = client.db('integration_tests'); var collection = database.collection('fullLookupTest'); var changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup' @@ -1194,7 +1194,8 @@ describe.skip('Change Streams', function() { assert.equal(change.lookedUpDocument, null); return changeStream.close(); - }); + }) + .then(() => client.close()); }); } }); @@ -1239,7 +1240,11 @@ describe.skip('Change Streams', function() { assert.deepEqual(changeStream2.cursor.readPreference.preference, ReadPreference.NEAREST); - return Promise.all([changeStream0.close(), changeStream1.close(), changeStream2.close()]); + return Promise.all([ + changeStream0.close(), + changeStream1.close(), + changeStream2.close() + ]).then(() => client.close()); }); } }); @@ -1257,7 +1262,7 @@ describe.skip('Change Streams', function() { client.connect(function(err, client) { assert.ifError(err); - var theDatabase = client.db('integration_tests14'); + var theDatabase = client.db('integration_tests'); var theCollection = theDatabase.collection('pipeTest'); var thisChangeStream = theCollection.watch(pipeline); @@ -1283,16 +1288,13 @@ describe.skip('Change Streams', function() { watcher.close(); - thisChangeStream.close(function(err) { - assert.ifError(err); - done(); - }); + thisChangeStream.close(err => client.close(cErr => done(err || cErr))); }); }); } }); - it('Should resume piping of Change Streams when a resumable error is encountered', { + it.skip('Should resume piping of Change Streams when a resumable error is encountered', { metadata: { requires: { generators: true, @@ -1462,7 +1464,7 @@ describe.skip('Change Streams', function() { var cipher = crypto.createCipher('aes192', 'a password'); var decipher = crypto.createDecipher('aes192', 'a password'); - var theDatabase = client.db('integration_tests19'); + var theDatabase = client.db('integration_tests'); var theCollection = 
theDatabase.collection('multiPipeTest'); var thisChangeStream = theCollection.watch(pipeline); @@ -1490,10 +1492,7 @@ describe.skip('Change Streams', function() { basicStream.emit('close'); - thisChangeStream.close(function(err) { - assert.ifError(err); - done(); - }); + thisChangeStream.close(err => client.close(cErr => done(err || cErr))); }); pipedStream.on('error', function(err) { @@ -1509,28 +1508,178 @@ describe.skip('Change Streams', function() { } }); - it('Should error when attempting to create a Change Stream against a stand-alone server', { - metadata: { requires: { topology: 'single', mongodb: '>=3.5.10' } }, - - // The actual test we wish to run + it('Should resume after a killCursors command is issued for its child cursor', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.5.10' } }, test: function(done) { - var configuration = this.configuration; - var MongoClient = configuration.require.MongoClient; - var client = new MongoClient(configuration.url()); + const configuration = this.configuration; + const MongoClient = configuration.require.MongoClient; + const client = new MongoClient(configuration.url()); - client.connect(function(err, client) { - assert.ifError(err); + const collectionName = 'resumeAfterKillCursor'; - var database = client.db('integration_tests'); - var changeStreamTest = database.collection('standAloneTest').watch(); - changeStreamTest.hasNext(function(err, result) { - assert.equal(null, result); - assert.ok(err); - assert.equal(err.message, 'The $changeStream stage is only supported on replica sets'); + let db; + let coll; + let changeStream; - done(); - }); + function close(e) { + changeStream.close(() => client.close(() => done(e))); + } + + client + .connect() + .then(() => (db = client.db('integration_tests'))) + .then(() => (coll = db.collection(collectionName))) + .then(() => (changeStream = coll.watch())) + .then(() => ({ p: changeStream.next() })) + .then(x => coll.insertOne({ darmok: 'jalad' }).then(() => x.p)) + .then(() => + db.command({ + killCursors: collectionName, + cursors: [changeStream.cursor.cursorState.cursorId] + }) + ) + .then(() => coll.insertOne({ shaka: 'walls fell' })) + .then(() => changeStream.next()) + .then(change => { + expect(change).to.have.property('operationType', 'insert'); + expect(change).to.have.nested.property('fullDocument.shaka', 'walls fell'); + }) + .then(() => close(), e => close(e)); + } + }); + + it('Should include a startAtOperationTime field when resuming if no changes have been received', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.7.3' } }, + test: function(done) { + const configuration = this.configuration; + const MongoClient = configuration.require.MongoClient; + const ObjectId = configuration.require.ObjectId; + const Timestamp = configuration.require.Timestamp; + const Long = configuration.require.Long; + + const OPERATION_TIME = new Timestamp(4, 1501511802); + + const makeIsMaster = server => ({ + ismaster: true, + secondary: false, + me: server.uri(), + primary: server.uri(), + tags: { loc: 'ny' }, + setName: 'rs', + setVersion: 1, + electionId: new ObjectId(0), + maxBsonObjectSize: 16777216, + maxMessageSizeBytes: 48000000, + maxWriteBatchSize: 1000, + localTime: new Date(), + maxWireVersion: 7, + minWireVersion: 0, + ok: 1, + hosts: [server.uri()], + operationTime: OPERATION_TIME, + $clusterTime: { + clusterTime: OPERATION_TIME + } }); + + const AGGREGATE_RESPONSE = { + ok: 1, + cursor: { + firstBatch: [], + id: new Long('9064341847921713401'), + ns: 
'test.test' + } + }; + + const CHANGE_DOC = { + _id: { + ts: OPERATION_TIME, + ns: 'integration_tests.docsDataEvent', + _id: new ObjectId('597f407a8fd4abb616feca93') + }, + operationType: 'insert', + ns: { + db: 'integration_tests', + coll: 'docsDataEvent' + }, + fullDocument: { + _id: new ObjectId('597f407a8fd4abb616feca93'), + a: 1, + counter: 0 + } + }; + + const GET_MORE_RESPONSE = { + ok: 1, + cursor: { + nextBatch: [CHANGE_DOC], + id: new Long('9064341847921713401'), + ns: 'test.test' + }, + cursorId: new Long('9064341847921713401') + }; + + const dbName = 'integration_tests'; + const collectionName = 'resumeWithStartAtOperationTime'; + const connectOptions = { + socketTimeoutMS: 500, + validateOptions: true + }; + + let getMoreCounter = 0; + let aggregateCounter = 0; + let client; + let changeStream; + let server; + + let finish = err => { + finish = () => {}; + Promise.resolve() + .then(() => changeStream && changeStream.close()) + .then(() => client && client.close()) + .then(() => done(err)); + }; + + function primaryServerHandler(request) { + try { + const doc = request.document; + + if (doc.ismaster) { + return request.reply(makeIsMaster(server)); + } else if (doc.aggregate) { + if (aggregateCounter++ > 0) { + expect(doc) + .to.have.nested.property('pipeline[0].$changeStream.startAtOperationTime') + .that.deep.equals(OPERATION_TIME); + expect(doc).to.not.have.nested.property('pipeline[0].$changeStream.resumeAfter'); + } + return request.reply(AGGREGATE_RESPONSE); + } else if (doc.getMore) { + if (getMoreCounter++ === 0) { + return; + } + + request.reply(GET_MORE_RESPONSE); + } else if (doc.endSessions) { + request.reply({ ok: 1 }); + } + } catch (e) { + finish(e); + } + } + + mock + .createServer() + .then(_server => (server = _server)) + .then(() => server.setMessageHandler(primaryServerHandler)) + .then(() => MongoClient.connect(`mongodb://${server.uri()}`, connectOptions)) + .then(_client => (client = _client)) + .then(() => client.db(dbName)) + .then(db => db.collection(collectionName)) + .then(col => col.watch(pipeline)) + .then(_changeStream => (changeStream = _changeStream)) + .then(() => changeStream.next()) + .then(() => finish(), err => finish(err)); } }); }); diff --git a/test/functional/spec/change-stream/README.rst b/test/functional/spec/change-stream/README.rst new file mode 100644 index 0000000000..2f4bacc471 --- /dev/null +++ b/test/functional/spec/change-stream/README.rst @@ -0,0 +1,155 @@ +.. role:: javascript(code) + :language: javascript + +============== +Change Streams +============== + +.. contents:: + +-------- + +Introduction +============ + +The YAML and JSON files in this directory are platform-independent tests that +drivers can use to prove their conformance to the Change Streams Spec. + +Several prose tests, which are not easily expressed in YAML, are also presented +in this file. Those tests will need to be manually implemented by each driver. + +Spec Test Format +================ + +Each YAML file has the following keys: + +- ``database_name``: The default database +- ``collection_name``: The default collection +- ``database2_name``: Another database +- ``collection2_name``: Another collection +- ``tests``: An array of tests that are to be run independently of each other. + Each test will have some of the following fields: + + - ``description``: The name of the test. + - ``minServerVersion``: The minimum server version to run this test against. If not present, assume there is no minimum server version. 
- ``maxServerVersion``: Reserved for later use
+- ``failPoint``: Reserved for later use
+- ``target``: The entity on which to run the change stream. Valid values are:
+
+  - ``collection``: Watch changes on collection ``database_name.collection_name``
+  - ``database``: Watch changes on database ``database_name``
+  - ``client``: Watch changes on the entire cluster
+
+- ``topology``: An array of server topologies against which to run the test.
+  Valid topologies are ``single``, ``replicaset``, and ``sharded``.
+- ``changeStreamPipeline``: An array of additional aggregation pipeline stages to add to the change stream
+- ``changeStreamOptions``: Additional options to add to the changeStream
+- ``operations``: Array of documents, each describing an operation. Each document has the following fields:
+
+  - ``database``: Database against which to run the operation
+  - ``collection``: Collection against which to run the operation
+  - ``name``: Name of the command to run
+  - ``arguments``: Object of arguments for the command (ex: document to insert)
+
+- ``expectations``: Optional list of command-started events in Extended JSON format
+- ``result``: Document with ONE of the following fields:
+
+  - ``error``: Describes an error received during the test
+  - ``success``: An Extended JSON array of documents expected to be received from the changeStream
+
+Spec Test Match Function
+========================
+
+The definition of MATCH or MATCHES in the Spec Test Runner is as follows:
+
+- MATCH takes two values, ``expected`` and ``actual``
+- Notation is "Assert [actual] MATCHES [expected]"
+- Assertion passes if ``expected`` is a subset of ``actual``, with the values ``42`` and ``"42"`` acting as placeholders for "any value"
+
+Pseudocode implementation of ``actual`` MATCHES ``expected``:
+
+::
+
+    If expected is "42" or 42:
+      Assert that actual exists (is not null or undefined)
+    Else:
+      Assert that actual is of the same JSON type as expected
+      If expected is a JSON array:
+        For every idx/value in expected:
+          Assert that actual[idx] MATCHES value
+      Else if expected is a JSON object:
+        For every key/value in expected:
+          Assert that actual[key] MATCHES value
+      Else:
+        Assert that expected equals actual
+
+The expected values for ``result.success`` and ``expectations`` are written in Extended JSON. Drivers may adopt any of the following approaches to comparisons, as long as they are consistent:
+
+- Convert ``actual`` to Extended JSON and compare to ``expected``
+- Convert ``expected`` and ``actual`` to BSON, and compare them
+- Convert ``expected`` and ``actual`` to native equivalents of JSON, and compare them
+
+Spec Test Runner
+================
+
+Before running the tests:
+
+- Create a MongoClient ``globalClient``, and connect to the server
+
+For each YAML file, for each element in ``tests``:
+
+- If ``topology`` does not include the topology of the server instance(s), skip this test.
+- Use ``globalClient`` to:
+
+  - Drop the database ``database_name``
+  - Drop the database ``database2_name``
+  - Create the database ``database_name`` and the collection ``database_name.collection_name``
+  - Create the database ``database2_name`` and the collection ``database2_name.collection2_name``
+
+- Create a new MongoClient ``client``
+- Begin monitoring all APM events for ``client``. (If the driver uses global listeners, filter out all events that do not originate with ``client``). Filter out any "internal" commands (e.g. ``isMaster``)
``isMaster``)
+- Using ``client``, create a changeStream ``changeStream`` against the specified ``target``. Use ``changeStreamPipeline`` and ``changeStreamOptions`` if they are non-empty
+- Using ``globalClient``, run every operation in ``operations`` in serial against the server
+- Wait until either:
+
+  - An error occurs
+  - All operations have been successful AND the changeStream has received as many changes as there are in ``result.success``
+
+- Close ``changeStream``
+- If there was an error:
+
+  - Assert that an error was expected for the test.
+  - Assert that the error MATCHES ``result.error``
+
+- Else:
+
+  - Assert that no error was expected for the test
+  - Assert that the changes received from ``changeStream`` MATCH the results in ``result.success``
+
+- If there are any ``expectations``:
+
+  - For each (``expected``, ``idx``) in ``expectations``
+
+    - Assert that ``actual[idx]`` MATCHES ``expected``
+
+- Close the MongoClient ``client``
+
+After running all tests:
+
+- Close the MongoClient ``globalClient``
+- Drop database ``database_name``
+- Drop database ``database2_name``
+
+
+Prose Tests
+===========
+
+The following tests have not yet been automated, but MUST still be tested:
+
+1. ``ChangeStream`` must continuously track the last seen ``resumeToken``
+2. ``ChangeStream`` will throw an exception if the server response is missing the resume token
+3. ``ChangeStream`` will automatically resume one time on a resumable error (including ``not master``) with the initial pipeline and options, except for the addition/update of a ``resumeToken``.
+4. ``ChangeStream`` will not attempt to resume on a server error
+5. ``ChangeStream`` will perform server selection before attempting to resume, using the initial ``readPreference``
+6. Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed on the driver side.
+7. The ``killCursors`` command sent during the "Resume Process" must not be allowed to throw an exception.
+8. The ``$changeStream`` stage for a ``ChangeStream`` against a server ``>=4.0`` that has not received any results yet MUST include a ``startAtOperationTime`` option when resuming a change stream.
+9. ``ChangeStream`` will resume after a ``killCursors`` command is issued for its child cursor.
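+
+Illustrative Sketches (Non-Normative)
+=====================================
+
+The MATCH function above translates mechanically into most languages. The
+following JavaScript sketch is one possible reading of the pseudocode; it is
+not part of the spec, and the helper name ``jsonType`` is an invention of this
+example.
+
+.. code:: javascript
+
+    const assert = require('assert');
+
+    // Classify a value by its JSON type so expected/actual can be compared.
+    function jsonType(value) {
+      if (Array.isArray(value)) return 'array';
+      if (value === null) return 'null';
+      return typeof value; // 'object', 'string', 'number', 'boolean'
+    }
+
+    // Assert `actual` MATCHES `expected`, per the pseudocode above.
+    function matches(actual, expected) {
+      // 42 and "42" act as placeholders for "any value".
+      if (expected === 42 || expected === '42') {
+        assert.ok(actual !== null && actual !== undefined, 'actual must exist');
+        return;
+      }
+
+      assert.strictEqual(jsonType(actual), jsonType(expected));
+      if (Array.isArray(expected)) {
+        expected.forEach((value, idx) => matches(actual[idx], value));
+      } else if (jsonType(expected) === 'object') {
+        // Subset semantics: only keys present in `expected` are checked.
+        Object.keys(expected).forEach(key => matches(actual[key], expected[key]));
+      } else {
+        assert.strictEqual(actual, expected);
+      }
+    }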
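+
+The per-test runner loop can likewise be sketched. Here ``resolveTarget``,
+``runOperation``, and ``topologyType`` are hypothetical helpers, the
+``monitorCommands`` option is specific to the Node.js driver (other drivers
+will use their own APM hooks), and the ``result.error`` path is omitted.
+
+.. code:: javascript
+
+    async function runSpecTest(uri, test, globalClient) {
+      // Skip tests whose `topology` does not include this deployment's topology.
+      if (!test.topology.includes(topologyType())) return;
+
+      const client = await MongoClient.connect(uri, { monitorCommands: true });
+      const events = [];
+      client.on('commandStarted', evt => {
+        // Filter out "internal" commands such as isMaster.
+        if (evt.commandName !== 'ismaster') events.push(evt);
+      });
+
+      // `target` is the collection, database, or client named by the test.
+      const target = resolveTarget(client, test);
+      const changeStream = target.watch(test.changeStreamPipeline, test.changeStreamOptions);
+
+      // Run every operation serially, through globalClient.
+      for (const op of test.operations) {
+        await runOperation(globalClient, op);
+      }
+
+      // Collect as many changes as result.success expects, then verify.
+      const changes = [];
+      while (changes.length < test.result.success.length) {
+        changes.push(await changeStream.next());
+      }
+      await changeStream.close();
+
+      test.result.success.forEach((expected, idx) => matches(changes[idx], expected));
+      // Events are assumed normalized to the command_started_event shape first.
+      (test.expectations || []).forEach((expected, idx) => matches(events[idx], expected));
+      await client.close();
+    }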
diff --git a/test/functional/spec/change-stream/change-streams-errors.json b/test/functional/spec/change-stream/change-streams-errors.json new file mode 100644 index 0000000000..053ac43e70 --- /dev/null +++ b/test/functional/spec/change-stream/change-streams-errors.json @@ -0,0 +1,78 @@ +{ + "collection_name": "test", + "database_name": "change-stream-tests", + "collection2_name": "test2", + "database2_name": "change-stream-tests-2", + "tests": [ + { + "description": "The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "single" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [], + "expectations": [], + "result": { + "error": { + "code": 40573 + } + } + }, + { + "description": "Change Stream should error when an invalid aggregation stage is passed in", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [ + { + "$unsupported": "foo" + } + ], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + }, + { + "$unsupported": "foo" + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "error": { + "code": 40324 + } + } + } + ] +} diff --git a/test/functional/spec/change-stream/change-streams-errors.yml b/test/functional/spec/change-stream/change-streams-errors.yml new file mode 100644 index 0000000000..1286e86588 --- /dev/null +++ b/test/functional/spec/change-stream/change-streams-errors.yml @@ -0,0 +1,53 @@ +collection_name: &collection_name "test" +database_name: &database_name "change-stream-tests" +collection2_name: &collection2_name "test2" +database2_name: &database2_name "change-stream-tests-2" +tests: + - + description: The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error + minServerVersion: "3.6.0" + target: collection + topology: + - single + changeStreamPipeline: [] + changeStreamOptions: {} + operations: [] + expectations: [] + result: + error: + code: 40573 + - + description: Change Stream should error when an invalid aggregation stage is passed in + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: + - + $unsupported: foo + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + - + $unsupported: foo + command_name: aggregate + database_name: *database_name + result: + error: + code: 40324 \ No newline at end of file diff --git a/test/functional/spec/change-stream/change-streams.json b/test/functional/spec/change-stream/change-streams.json new file mode 100644 index 0000000000..00ba59f438 --- /dev/null +++ b/test/functional/spec/change-stream/change-streams.json @@ -0,0 +1,507 @@ +{ + "collection_name": "test", + "database_name": "change-stream-tests", + "collection2_name": 
"test2", + "database2_name": "change-stream-tests-2", + "tests": [ + { + "description": "$changeStream must be the first stage in a change stream pipeline sent to the server", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [] + } + }, + { + "description": "The server returns change stream responses in the specified server response format", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectations": [], + "result": { + "success": [ + { + "_id": "42", + "documentKey": "42", + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "x": { + "$numberInt": "1" + } + } + } + ] + } + }, + { + "description": "Executing a watch helper on a Collection results in notifications for changes to the specified collection", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test2", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + }, + { + "database": "change-stream-tests-2", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [ + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + }, + { + "description": "Change Stream should allow valid aggregate pipeline stages", + "minServerVersion": "3.6.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [ + { + "$match": { + "fullDocument.z": 3 + } + } + ], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default" + } + }, + { + "$match": { + "fullDocument.z": { + "$numberInt": "3" + } + } + 
} + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [ + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + }, + { + "description": "A fresh ChangeStream against a server >=4.0 will always include startAtOperationTime in the $changeStream stage.", + "minServerVersion": "3.8.0", + "target": "collection", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": "test", + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default", + "startAtOperationTime": { + "$timestamp": { + "i": 42, + "t": 42 + } + } + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [] + } + }, + { + "description": "Executing a watch helper on a Database results in notifications for changes to all collections in the specified database.", + "minServerVersion": "3.8.0", + "target": "database", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test2", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + }, + { + "database": "change-stream-tests-2", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": { + "$numberInt": "1" + }, + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + "fullDocument": "default", + "startAtOperationTime": { + "$timestamp": { + "i": 42, + "t": 42 + } + } + } + } + ] + }, + "command_name": "aggregate", + "database_name": "change-stream-tests" + } + } + ], + "result": { + "success": [ + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test2" + }, + "fullDocument": { + "x": { + "$numberInt": "1" + } + } + }, + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + }, + { + "description": "Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster.", + "minServerVersion": "3.8.0", + "target": "client", + "topology": [ + "replicaset" + ], + "changeStreamPipeline": [], + "changeStreamOptions": {}, + "operations": [ + { + "database": "change-stream-tests", + "collection": "test2", + "name": "insertOne", + "arguments": { + "document": { + "x": 1 + } + } + }, + { + "database": "change-stream-tests-2", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "y": 2 + } + } + }, + { + "database": "change-stream-tests", + "collection": "test", + "name": "insertOne", + "arguments": { + "document": { + "z": 3 + } + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "aggregate": { + "$numberInt": "1" + }, + "cursor": {}, + "pipeline": [ + { + "$changeStream": { + 
"fullDocument": "default", + "allChangesForCluster": true, + "startAtOperationTime": { + "$timestamp": { + "i": 42, + "t": 42 + } + } + } + } + ] + }, + "command_name": "aggregate", + "database_name": "admin" + } + } + ], + "result": { + "success": [ + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test2" + }, + "fullDocument": { + "x": { + "$numberInt": "1" + } + } + }, + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests-2", + "coll": "test" + }, + "fullDocument": { + "y": { + "$numberInt": "2" + } + } + }, + { + "operationType": "insert", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "fullDocument": { + "z": { + "$numberInt": "3" + } + } + } + ] + } + } + ] +} diff --git a/test/functional/spec/change-stream/change-streams.yml b/test/functional/spec/change-stream/change-streams.yml new file mode 100644 index 0000000000..c4dd0674a1 --- /dev/null +++ b/test/functional/spec/change-stream/change-streams.yml @@ -0,0 +1,341 @@ +collection_name: &collection_name "test" +database_name: &database_name "change-stream-tests" +collection2_name: &collection2_name "test2" +database2_name: &database2_name "change-stream-tests-2" +tests: + - + description: "$changeStream must be the first stage in a change stream pipeline sent to the server" + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + x: 1 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + command_name: aggregate + database_name: *database_name + result: + success: [] + - + description: The server returns change stream responses in the specified server response format + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + x: 1 + expectations: [] + result: + success: + - + _id: "42" + documentKey: "42" + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + x: + $numberInt: "1" + - + description: Executing a watch helper on a Collection results in notifications for changes to the specified collection + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection2_name + name: insertOne + arguments: + document: + x: 1 + - + database: *database2_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + command_name: aggregate + database_name: *database_name + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3" + - + description: Change Stream should allow valid aggregate pipeline stages + minServerVersion: "3.6.0" + target: collection + topology: + - replicaset + changeStreamPipeline: + - + $match: + "fullDocument.z": 3 + 
changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + - + $match: + "fullDocument.z": + $numberInt: "3" + command_name: aggregate + database_name: *database_name + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3" + - + description: A fresh ChangeStream against a server >=4.0 will always include startAtOperationTime in the $changeStream stage. + minServerVersion: "3.8.0" + target: collection + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + x: 1 + expectations: + - + command_started_event: + command: + aggregate: *collection_name + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + startAtOperationTime: + $timestamp: + i: 42 + t: 42 + command_name: aggregate + database_name: *database_name + result: + success: [] + - + description: Executing a watch helper on a Database results in notifications for changes to all collections in the specified database. + minServerVersion: "3.8.0" + target: database + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection2_name + name: insertOne + arguments: + document: + x: 1 + - + database: *database2_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: + $numberInt: "1" + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + startAtOperationTime: + $timestamp: + i: 42 + t: 42 + command_name: aggregate + database_name: *database_name + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection2_name + fullDocument: + x: + $numberInt: "1" + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3" + - + description: Executing a watch helper on a MongoClient results in notifications for changes to all collections in all databases in the cluster. 
+ minServerVersion: "3.8.0" + target: client + topology: + - replicaset + changeStreamPipeline: [] + changeStreamOptions: {} + operations: + - + database: *database_name + collection: *collection2_name + name: insertOne + arguments: + document: + x: 1 + - + database: *database2_name + collection: *collection_name + name: insertOne + arguments: + document: + y: 2 + - + database: *database_name + collection: *collection_name + name: insertOne + arguments: + document: + z: 3 + expectations: + - + command_started_event: + command: + aggregate: + $numberInt: "1" + cursor: {} + pipeline: + - + $changeStream: + fullDocument: default + allChangesForCluster: true + startAtOperationTime: + $timestamp: + i: 42 + t: 42 + command_name: aggregate + database_name: admin + result: + success: + - + operationType: insert + ns: + db: *database_name + coll: *collection2_name + fullDocument: + x: + $numberInt: "1" + - + operationType: insert + ns: + db: *database2_name + coll: *collection_name + fullDocument: + y: + $numberInt: "2" + - + operationType: insert + ns: + db: *database_name + coll: *collection_name + fullDocument: + z: + $numberInt: "3"