From 4ba5adc25582877e8a32307906e0a4d648c3e81e Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Thu, 31 May 2018 14:04:38 -0400 Subject: [PATCH] feat(changeStream): allow resuming on getMore errors Fixes NODE-1462 --- lib/change_stream.js | 61 +++---- lib/error_codes.js | 9 + test/unit/change_stream_resume_tests.js | 212 ++++++++++++++++++++++++ 3 files changed, 248 insertions(+), 34 deletions(-) create mode 100644 lib/error_codes.js create mode 100644 test/unit/change_stream_resume_tests.js diff --git a/lib/change_stream.js b/lib/change_stream.js index d6fd9169b9..e28bde286d 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -1,8 +1,10 @@ 'use strict'; -var EventEmitter = require('events'), - inherits = require('util').inherits, - MongoNetworkError = require('mongodb-core').MongoNetworkError; +const EventEmitter = require('events'); +const inherits = require('util').inherits; +const MongoNetworkError = require('mongodb-core').MongoNetworkError; +const mongoErrorContextSymbol = require('mongodb-core').mongoErrorContextSymbol; +const GET_MORE_NON_RESUMABLE_CODES = require('./error_codes').GET_MORE_NON_RESUMABLE_CODES; var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference']; @@ -296,42 +298,33 @@ ChangeStream.prototype.stream = function(options) { return this.cursor.stream(options); }; -const RESUMABLE_ERROR_CODES = new Set([ - 6, // HostUnreachable - 7, // HostNotFound - 50, // ExceededTimeLimit - 89, // NetworkTimeout - 189, // PrimarySteppedDown - 216, // ElectionInProgress - 234, // RetryChangeStream - 9001, // SocketException - 10107, // NotMaster - 11602, // InterruptedDueToReplStateChange - 13435, // NotMasterNoSlaveOk - 13436 // NotMasterOrSecondary -]); - -// TODO: will be used for check for getMore errors -// const GET_MORE_NON_RESUMABLE_CODES = new Set([ -// 136, // CappedPositionLost -// 237, // CursorKilled -// 11601 // Interrupted -// ]); +// From spec@https://github.com/mongodb/specifications/blob/35e466ddf25059cb30e4113de71cdebd3754657f/source/change-streams.rst#resumable-error: +// +// An error is considered resumable if it meets any of the following criteria: +// - any error encountered which is not a server error (e.g. a timeout error or network error) +// - any server error response from a getMore command excluding those containing the following error codes +// - Interrupted: 11601 +// - CappedPositionLost: 136 +// - CursorKilled: 237 +// - a server error response with an error message containing the substring "not master" or "node is recovering" +// +// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors. + +function isGetMoreError(error) { + return !!error[mongoErrorContextSymbol].isGetMore; +} function isResumableError(error) { - // TODO: Need a way to check if error is - // - from a getMore - // - is not in GET_MORE_NON_RESUMABLE_CODES - if ( + if (!isGetMoreError(error)) { + return false; + } + + return !!( error instanceof MongoNetworkError || - RESUMABLE_ERROR_CODES.has(error.code) || + !GET_MORE_NON_RESUMABLE_CODES.has(error.code) || error.message.match(/not master/) || error.message.match(/node is recovering/) - ) { - return true; - } - - return false; + ); } // Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. diff --git a/lib/error_codes.js b/lib/error_codes.js new file mode 100644 index 0000000000..2b1d1ffa95 --- /dev/null +++ b/lib/error_codes.js @@ -0,0 +1,9 @@ +'use strict'; + +const GET_MORE_NON_RESUMABLE_CODES = new Set([ + 136, // CappedPositionLost + 237, // CursorKilled + 11601 // Interrupted +]); + +module.exports = { GET_MORE_NON_RESUMABLE_CODES }; diff --git a/test/unit/change_stream_resume_tests.js b/test/unit/change_stream_resume_tests.js new file mode 100644 index 0000000000..599b0def7c --- /dev/null +++ b/test/unit/change_stream_resume_tests.js @@ -0,0 +1,212 @@ +'use strict'; + +const expect = require('chai').expect; +const mock = require('mongodb-mock-server'); +const MongoClient = require('../../lib/mongo_client'); +const ObjectId = require('../../index').ObjectId; +const Timestamp = require('../../index').Timestamp; +const Long = require('../../index').Long; +const GET_MORE_NON_RESUMABLE_CODES = require('../../lib/error_codes').GET_MORE_NON_RESUMABLE_CODES; + +describe('Change Stream Resume Tests', function() { + const test = {}; + const DEFAULT_IS_MASTER = Object.assign({}, mock.DEFAULT_ISMASTER, { + setName: 'rs', + setVersion: 1, + maxWireVersion: 7, + secondary: false + }); + + const AGGREGATE_RESPONSE = { + ok: 1, + cursor: { + firstBatch: [], + id: new Long('9064341847921713401'), + ns: 'test.test' + }, + operationTime: new Timestamp(1527200325, 1), + $clusterTime: { + clusterTime: new Timestamp(1527200325, 1), + signature: { + keyId: new Long(0) + } + } + }; + + const CHANGE_DOC = { + _id: { + ts: new Timestamp(4, 1501511802), + ns: 'integration_tests.docsDataEvent', + _id: new ObjectId('597f407a8fd4abb616feca93') + }, + operationType: 'insert', + ns: { + db: 'integration_tests', + coll: 'docsDataEvent' + }, + fullDocument: { + _id: new ObjectId('597f407a8fd4abb616feca93'), + a: 1, + counter: 0 + } + }; + + const GET_MORE_RESPONSE = { + ok: 1, + cursor: { + nextBatch: [CHANGE_DOC], + id: new Long('9064341847921713401'), + ns: 'test.test' + }, + operationTime: new Timestamp(1527200325, 1), + $clusterTime: { + clusterTime: new Timestamp(1527200325, 1), + signature: { + keyId: new Long(0) + } + } + }; + + function makeIsMaster(server) { + const uri = server.uri(); + + return Object.assign({}, DEFAULT_IS_MASTER, { + hosts: [uri], + me: uri, + primary: uri + }); + } + + function makeServerHandler(config) { + let firstGetMore = true; + let firstAggregate = true; + return request => { + const doc = request.document; + + if (doc.ismaster) { + return request.reply(makeIsMaster(test.server)); + } + if (doc.endSessions) { + return request.reply({ ok: 1 }); + } + if (doc.aggregate) { + if (firstAggregate) { + firstAggregate = false; + return config.firstAggregate(request); + } + return config.secondAggregate(request); + } + if (doc.getMore) { + if (firstGetMore) { + firstGetMore = false; + return config.firstGetMore(request); + } + return config.secondGetMore(request); + } + }; + } + + const RESUMABLE_ERROR_CODES = [1, 40, 20000]; + + const configs = RESUMABLE_ERROR_CODES.map(code => ({ + description: `should resume on error code ${code}`, + passing: true, + firstAggregate: req => req.reply(AGGREGATE_RESPONSE), + secondAggregate: req => req.reply(AGGREGATE_RESPONSE), + firstGetMore: req => req.reply({ ok: 0, errmsg: 'firstGetMoreError', code }), + secondGetMore: req => req.reply(GET_MORE_RESPONSE) + })) + .concat([ + { + description: `should resume on a network error`, + passing: true, + firstAggregate: req => req.reply(AGGREGATE_RESPONSE), + secondAggregate: req => req.reply(AGGREGATE_RESPONSE), + firstGetMore: () => {}, // Simulates a timeout + secondGetMore: req => req.reply(GET_MORE_RESPONSE) + }, + { + description: `should resume on an error that says "not master"`, + passing: true, + firstAggregate: req => req.reply(AGGREGATE_RESPONSE), + secondAggregate: req => req.reply(AGGREGATE_RESPONSE), + firstGetMore: req => req.reply({ ok: 0, errmsg: 'not master' }), + secondGetMore: req => req.reply(GET_MORE_RESPONSE) + }, + { + description: `should resume on an error that says "node is recovering"`, + passing: true, + firstAggregate: req => req.reply(AGGREGATE_RESPONSE), + secondAggregate: req => req.reply(AGGREGATE_RESPONSE), + firstGetMore: req => req.reply({ ok: 0, errmsg: 'node is recovering' }), + secondGetMore: req => req.reply(GET_MORE_RESPONSE) + } + ]) + .concat( + Array.from(GET_MORE_NON_RESUMABLE_CODES).map(code => ({ + description: `should not resume on error code ${code}`, + passing: false, + errmsg: 'firstGetMoreError', + firstAggregate: req => req.reply(AGGREGATE_RESPONSE), + secondAggregate: req => + req.reply({ ok: 0, errmsg: 'We should not have a second aggregate' }), + firstGetMore: req => req.reply({ ok: 0, errmsg: 'firstGetMoreError', code }), + secondGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a second getMore' }) + })) + ) + .concat( + RESUMABLE_ERROR_CODES.map(code => ({ + description: `should not resume on aggregate, even for valid code ${code}`, + passing: false, + errmsg: 'fail aggregate', + firstAggregate: req => req.reply({ ok: 0, errmsg: 'fail aggregate', code }), + secondAggregate: req => + req.reply({ ok: 0, errmsg: 'We should not have a second aggregate' }), + firstGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a first getMore' }), + secondGetMore: req => req.reply({ ok: 0, errmsg: 'We should not have a second getMore' }) + })) + ); + + let client; + let changeStream; + + beforeEach(() => { + return mock.createServer().then(server => { + test.server = server; + }); + }); + afterEach(done => changeStream.close(() => client.close(() => mock.cleanup(done)))); + + configs.forEach(config => { + it(config.description, { + metadata: { requires: { mongodb: '>=3.6.0' } }, + test: function() { + test.server.setMessageHandler(makeServerHandler(config)); + client = new MongoClient(`mongodb://${test.server.uri()}`, { socketTimeoutMS: 300 }); + return client + .connect() + .then(client => client.db('test')) + .then(db => db.collection('test')) + .then(collection => collection.watch()) + .then(_changeStream => (changeStream = _changeStream)) + .then(() => changeStream.next()) + .then( + change => { + if (!config.passing) { + throw new Error('Expected test to not pass'); + } + + expect(change).to.deep.equal(CHANGE_DOC); + }, + err => { + if (config.passing) { + throw err; + } + + expect(err).to.have.property('errmsg', config.errmsg); + } + ); + } + }); + }); +});