Skip to content

Commit

Permalink
feat(changeStream): allow resuming on getMore errors
Browse files Browse the repository at this point in the history
Fixes NODE-1462
  • Loading branch information
daprahamian authored May 31, 2018
1 parent 13053ce commit 4ba5adc
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 34 deletions.
61 changes: 27 additions & 34 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
'use strict';

var EventEmitter = require('events'),
inherits = require('util').inherits,
MongoNetworkError = require('mongodb-core').MongoNetworkError;
const EventEmitter = require('events');
const inherits = require('util').inherits;
const MongoNetworkError = require('mongodb-core').MongoNetworkError;
const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol;
const GET_MORE_NON_RESUMABLE_CODES = require('./error_codes').GET_MORE_NON_RESUMABLE_CODES;

var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];

Expand Down Expand Up @@ -296,42 +298,33 @@ ChangeStream.prototype.stream = function(options) {
return this.cursor.stream(options);
};

const RESUMABLE_ERROR_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
50, // ExceededTimeLimit
89, // NetworkTimeout
189, // PrimarySteppedDown
216, // ElectionInProgress
234, // RetryChangeStream
9001, // SocketException
10107, // NotMaster
11602, // InterruptedDueToReplStateChange
13435, // NotMasterNoSlaveOk
13436 // NotMasterOrSecondary
]);

// TODO: will be used for check for getMore errors
// const GET_MORE_NON_RESUMABLE_CODES = new Set([
// 136, // CappedPositionLost
// 237, // CursorKilled
// 11601 // Interrupted
// ]);
// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the following error codes
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
// - a server error response with an error message containing the substring "not master" or "node is recovering"
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.

function isGetMoreError(error) {
return !!error[mongoErrorContextSymbol].isGetMore;
}

function isResumableError(error) {
// TODO: Need a way to check if error is
// - from a getMore
// - is not in GET_MORE_NON_RESUMABLE_CODES
if (
if (!isGetMoreError(error)) {
return false;
}

return !!(
error instanceof MongoNetworkError ||
RESUMABLE_ERROR_CODES.has(error.code) ||
!GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
) {
return true;
}

return false;
);
}

// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream.
Expand Down
9 changes: 9 additions & 0 deletions lib/error_codes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

const GET_MORE_NON_RESUMABLE_CODES = new Set([
136, // CappedPositionLost
237, // CursorKilled
11601 // Interrupted
]);

module.exports = { GET_MORE_NON_RESUMABLE_CODES };
212 changes: 212 additions & 0 deletions test/unit/change_stream_resume_tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
'use strict';

const expect = require('chai').expect;
const mock = require('mongodb-mock-server');
const MongoClient = require('../../lib/mongo_client');
const ObjectId = require('../../index').ObjectId;
const Timestamp = require('../../index').Timestamp;
const Long = require('../../index').Long;
const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error_codes').GET_MORE_NON_RESUMABLE_CODES;

describe('Change Stream Resume Tests', function() {
const test = {};
const DEFAULT_IS_MASTER = Object.assign({}, mock.DEFAULT_ISMASTER, {
setName: 'rs',
setVersion: 1,
maxWireVersion: 7,
secondary: false
});

const AGGREGATE_RESPONSE = {
ok: 1,
cursor: {
firstBatch: [],
id: new Long('9064341847921713401'),
ns: 'test.test'
},
operationTime: new Timestamp(1527200325, 1),
$clusterTime: {
clusterTime: new Timestamp(1527200325, 1),
signature: {
keyId: new Long(0)
}
}
};

const CHANGE_DOC = {
_id: {
ts: new Timestamp(4, 1501511802),
ns: 'integration_tests.docsDataEvent',
_id: new ObjectId('597f407a8fd4abb616feca93')
},
operationType: 'insert',
ns: {
db: 'integration_tests',
coll: 'docsDataEvent'
},
fullDocument: {
_id: new ObjectId('597f407a8fd4abb616feca93'),
a: 1,
counter: 0
}
};

const GET_MORE_RESPONSE = {
ok: 1,
cursor: {
nextBatch: [CHANGE_DOC],
id: new Long('9064341847921713401'),
ns: 'test.test'
},
operationTime: new Timestamp(1527200325, 1),
$clusterTime: {
clusterTime: new Timestamp(1527200325, 1),
signature: {
keyId: new Long(0)
}
}
};

function makeIsMaster(server) {
const uri = server.uri();

return Object.assign({}, DEFAULT_IS_MASTER, {
hosts: [uri],
me: uri,
primary: uri
});
}

function makeServerHandler(config) {
let firstGetMore = true;
let firstAggregate = true;
return request => {
const doc = request.document;

if (doc.ismaster) {
return request.reply(makeIsMaster(test.server));
}
if (doc.endSessions) {
return request.reply({ ok: 1 });
}
if (doc.aggregate) {
if (firstAggregate) {
firstAggregate = false;
return config.firstAggregate(request);
}
return config.secondAggregate(request);
}
if (doc.getMore) {
if (firstGetMore) {
firstGetMore = false;
return config.firstGetMore(request);
}
return config.secondGetMore(request);
}
};
}

const RESUMABLE_ERROR_CODES = [1, 40, 20000];

const configs = RESUMABLE_ERROR_CODES.map(code => ({
description: `should resume on error code ${code}`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
firstGetMore: req => req.reply({ ok: 0, errmsg: 'firstGetMoreError', code }),
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
}))
.concat([
{
description: `should resume on a network error`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
firstGetMore: () => {}, // Simulates a timeout
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
},
{
description: `should resume on an error that says "not master"`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
firstGetMore: req => req.reply({ ok: 0, errmsg: 'not master' }),
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
},
{
description: `should resume on an error that says "node is recovering"`,
passing: true,
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req => req.reply(AGGREGATE_RESPONSE),
firstGetMore: req => req.reply({ ok: 0, errmsg: 'node is recovering' }),
secondGetMore: req => req.reply(GET_MORE_RESPONSE)
}
])
.concat(
Array.from(GET_MORE_NON_RESUMABLE_CODES).map(code => ({
description: `should not resume on error code ${code}`,
passing: false,
errmsg: 'firstGetMoreError',
firstAggregate: req => req.reply(AGGREGATE_RESPONSE),
secondAggregate: req =>
req.reply({ ok: 0, errmsg: 'We should not have a second aggregate' }),
firstGetMore: req => req.reply({ ok: 0, errmsg: 'firstGetMoreError', code }),
secondGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a second getMore' })
}))
)
.concat(
RESUMABLE_ERROR_CODES.map(code => ({
description: `should not resume on aggregate, even for valid code ${code}`,
passing: false,
errmsg: 'fail aggregate',
firstAggregate: req => req.reply({ ok: 0, errmsg: 'fail aggregate', code }),
secondAggregate: req =>
req.reply({ ok: 0, errmsg: 'We should not have a second aggregate' }),
firstGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a first getMore' }),
secondGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a second getMore' })
}))
);

let client;
let changeStream;

beforeEach(() => {
return mock.createServer().then(server => {
test.server = server;
});
});
afterEach(done => changeStream.close(() => client.close(() => mock.cleanup(done))));

configs.forEach(config => {
it(config.description, {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: function() {
test.server.setMessageHandler(makeServerHandler(config));
client = new MongoClient(`mongodb://${test.server.uri()}`, { socketTimeoutMS: 300 });
return client
.connect()
.then(client => client.db('test'))
.then(db => db.collection('test'))
.then(collection => collection.watch())
.then(_changeStream => (changeStream = _changeStream))
.then(() => changeStream.next())
.then(
change => {
if (!config.passing) {
throw new Error('Expected test to not pass');
}

expect(change).to.deep.equal(CHANGE_DOC);
},
err => {
if (config.passing) {
throw err;
}

expect(err).to.have.property('errmsg', config.errmsg);
}
);
}
});
});
});

0 comments on commit 4ba5adc

Please sign in to comment.