-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(changeStream): Adding new 4.0 ChangeStream features
Adds new ChangeStream features for 4.0 as per SPEC-1057 - Db.watch() method - MongoClient.watch() method - startAtClusterTime option Fixes NODE-1483
- Loading branch information
1 parent
7018c1e
commit 2cb4894
Showing
13 changed files
with
1,940 additions
and
303 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,258 @@ | ||
'use strict'; | ||
|
||
const EJSON = require('mongodb-extjson'); | ||
const chai = require('chai'); | ||
const fs = require('fs'); | ||
const camelCase = require('lodash.camelcase'); | ||
const MongoClient = require('../../lib/mongo_client'); | ||
const setupDatabase = require('./shared').setupDatabase; | ||
const delay = require('./shared').delay; | ||
const expect = chai.expect; | ||
|
||
describe('Change Stream Spec', function() { | ||
const EJSONToJSON = x => JSON.parse(EJSON.stringify(x)); | ||
|
||
let globalClient; | ||
let ctx; | ||
let events; | ||
|
||
before(function() { | ||
return setupDatabase(this.configuration).then(() => { | ||
globalClient = new MongoClient(this.configuration.url()); | ||
return globalClient.connect(); | ||
}); | ||
}); | ||
|
||
after(function() { | ||
const gc = globalClient; | ||
globalClient = undefined; | ||
return new Promise(r => gc.close(() => r())); | ||
}); | ||
|
||
fs | ||
.readdirSync(`${__dirname}/spec/change-stream`) | ||
.filter(filename => filename.match(/\.json$/)) | ||
.forEach(filename => { | ||
const specString = fs.readFileSync(`${__dirname}/spec/change-stream/${filename}`, 'utf8'); | ||
const specData = JSON.parse(specString); | ||
|
||
const ALL_DBS = [specData.database_name, specData.database2_name]; | ||
|
||
describe(filename, () => { | ||
beforeEach(function() { | ||
const gc = globalClient; | ||
const sDB = specData.database_name; | ||
const sColl = specData.collection_name; | ||
return Promise.all(ALL_DBS.map(db => gc.db(db).dropDatabase())) | ||
.then(() => gc.db(sDB).createCollection(sColl)) | ||
.then(() => | ||
new MongoClient(this.configuration.url(), { monitorCommands: true }).connect() | ||
) | ||
.then(client => { | ||
ctx = { gc, client }; | ||
events = []; | ||
const _events = events; | ||
|
||
ctx.database = ctx.client.db(sDB); | ||
ctx.collection = ctx.database.collection(sColl); | ||
ctx.client.on('commandStarted', e => _events.push(e)); | ||
}); | ||
}); | ||
|
||
afterEach(function() { | ||
const client = ctx.client; | ||
ctx = undefined; | ||
events = undefined; | ||
|
||
return client && client.close(); | ||
}); | ||
|
||
specData.tests.forEach(test => { | ||
const itFn = test.skip ? it.skip : test.only ? it.only : it; | ||
const metadata = generateMetadata(test); | ||
const testFn = generateTestFn(test); | ||
|
||
itFn(test.description, { metadata, test: testFn }); | ||
}); | ||
}); | ||
}); | ||
|
||
// Fn Generator methods | ||
|
||
function generateMetadata(test) { | ||
const mongodb = test.minServerVersion; | ||
const topology = test.topology; | ||
const requires = {}; | ||
if (mongodb) { | ||
requires.mongodb = `>=${mongodb}`; | ||
} | ||
if (topology) { | ||
requires.topology = topology; | ||
} | ||
|
||
return { requires }; | ||
} | ||
|
||
function generateTestFn(test) { | ||
const testFnRunOperations = makeTestFnRunOperations(test); | ||
const testSuccess = makeTestSuccess(test); | ||
const testFailure = makeTestFailure(test); | ||
const testAPM = makeTestAPM(test); | ||
|
||
return function testFn() { | ||
return testFnRunOperations(ctx) | ||
.then(testSuccess, testFailure) | ||
.then(() => testAPM(ctx, events)); | ||
}; | ||
} | ||
|
||
function makeTestSuccess(test) { | ||
const result = test.result; | ||
|
||
return function testSuccess(value) { | ||
if (result.error) { | ||
throw new Error(`Expected test to return error ${result.error}`); | ||
} | ||
|
||
if (result.success) { | ||
value = EJSONToJSON(value); | ||
assertEquality(value, result.success); | ||
} | ||
}; | ||
} | ||
|
||
function makeTestFailure(test) { | ||
const result = test.result; | ||
|
||
return function testFailure(err) { | ||
if (!result.error) { | ||
throw err; | ||
} | ||
|
||
assertEquality(err, result.error); | ||
}; | ||
} | ||
|
||
function makeTestAPM(test) { | ||
const expectedEvents = test.expectations; | ||
|
||
return function testAPM(ctx, events) { | ||
expectedEvents | ||
.map(e => e.command_started_event) | ||
.map(normalizeAPMEvent) | ||
.forEach((expected, idx) => { | ||
if (!events[idx]) { | ||
throw new Error( | ||
`Expected there to be an APM event at index ${idx}, but there was none` | ||
); | ||
} | ||
const actual = EJSONToJSON(events[idx]); | ||
assertEquality(actual, expected); | ||
}); | ||
}; | ||
} | ||
|
||
function makeTestFnRunOperations(test) { | ||
const target = test.target; | ||
const operations = test.operations; | ||
const success = test.result.success || []; | ||
|
||
return function testFnRunOperations(ctx) { | ||
const changeStreamPipeline = test.changeStreamPipeline; | ||
const changeStreamOptions = test.changeStreamOptions; | ||
ctx.changeStream = ctx[target].watch(changeStreamPipeline, changeStreamOptions); | ||
|
||
const changeStreamPromise = readAndCloseChangeStream(ctx.changeStream, success.length); | ||
const operationsPromise = runOperations(ctx.gc, operations); | ||
|
||
return Promise.all([changeStreamPromise, operationsPromise]).then(args => args[0]); | ||
}; | ||
} | ||
|
||
function readAndCloseChangeStream(changeStream, numChanges) { | ||
const close = makeChangeStreamCloseFn(changeStream); | ||
let changeStreamPromise = changeStream.next().then(r => [r]); | ||
|
||
for (let i = 1; i < numChanges; i += 1) { | ||
changeStreamPromise = changeStreamPromise.then(results => { | ||
return changeStream.next().then(result => { | ||
results.push(result); | ||
return results; | ||
}); | ||
}); | ||
} | ||
|
||
return changeStreamPromise.then(result => close(null, result), err => close(err)); | ||
} | ||
|
||
function runOperations(client, operations) { | ||
return operations | ||
.map(op => makeOperation(client, op)) | ||
.reduce((p, op) => p.then(op), delay(200)); | ||
} | ||
|
||
function makeChangeStreamCloseFn(changeStream) { | ||
return function close(error, value) { | ||
return new Promise((resolve, reject) => { | ||
changeStream.close(err => { | ||
if (error || err) { | ||
return reject(error || err); | ||
} | ||
return resolve(value); | ||
}); | ||
}); | ||
}; | ||
} | ||
|
||
function normalizeAPMEvent(raw) { | ||
return Object.keys(raw).reduce((agg, key) => { | ||
agg[camelCase(key)] = raw[key]; | ||
return agg; | ||
}, {}); | ||
} | ||
|
||
function makeOperation(client, op) { | ||
const target = client.db(op.database).collection(op.collection); | ||
const command = op.name; | ||
const args = []; | ||
if (op.arguments && op.arguments.document) { | ||
args.push(op.arguments.document); | ||
} | ||
return () => target[command].apply(target, args); | ||
} | ||
|
||
function assertEquality(actual, expected) { | ||
try { | ||
_assertEquality(actual, expected); | ||
} catch (e) { | ||
console.dir(actual, { depth: 999 }); | ||
console.dir(expected, { depth: 999 }); | ||
throw e; | ||
} | ||
} | ||
|
||
function _assertEquality(actual, expected) { | ||
try { | ||
if (expected === '42' || expected === 42) { | ||
expect(actual).to.exist; | ||
return; | ||
} | ||
|
||
expect(actual).to.be.a(Array.isArray(expected) ? 'array' : typeof expected); | ||
|
||
if (expected == null) { | ||
expect(actual).to.not.exist; | ||
} else if (Array.isArray(expected)) { | ||
expected.forEach((ex, idx) => _assertEquality(actual[idx], ex)); | ||
} else if (typeof expected === 'object') { | ||
for (let i in expected) { | ||
_assertEquality(actual[i], expected[i]); | ||
} | ||
} else { | ||
expect(actual).to.equal(expected); | ||
} | ||
} catch (e) { | ||
throw e; | ||
} | ||
} | ||
}); |
Oops, something went wrong.