diff --git a/lib/connection/pool.js b/lib/connection/pool.js
index 7a09a83ed..7c106fc9b 100644
--- a/lib/connection/pool.js
+++ b/lib/connection/pool.js
@@ -65,9 +65,13 @@ var _id = 0;
  * @fires Pool#parseError
  * @return {Pool} A cursor instance
  */
-var Pool = function(options) {
+var Pool = function(topology, options) {
   // Add event listener
   EventEmitter.call(this);
+
+  // Store topology for later use
+  this.topology = topology;
+
   // Add the options
   this.options = assign(
     {
@@ -104,9 +108,6 @@ var Pool = function(options) {
     options
   );
 
-  // console.log("=================================== pool options")
-  // console.dir(this.options)
-
   // Identification information
   this.id = _id++;
   // Current reconnect retries
@@ -269,9 +270,6 @@ function reauthenticate(pool, connection, cb) {
 
 function connectionFailureHandler(self, event) {
   return function(err) {
-    // console.log("========== connectionFailureHandler :: " + event)
-    // console.dir(err)
-
     if (this._connectionFailHandled) return;
     this._connectionFailHandled = true;
     // Destroy the connection
@@ -323,7 +321,6 @@ function connectionFailureHandler(self, event) {
 
 function attemptReconnect(self) {
   return function() {
-    // console.log("========================= attemptReconnect")
     self.emit('attemptReconnect', self);
     if (self.state === DESTROYED || self.state === DESTROYING) return;
 
@@ -542,6 +539,12 @@ function messageHandler(self) {
       return handleOperationCallback(self, workItem.cb, new MongoError(err));
     }
 
+    // Look for clusterTime, and update it if necessary
+    if (message.documents[0] && message.documents[0].hasOwnProperty('$clusterTime')) {
+      const $clusterTime = message.documents[0].$clusterTime;
+      self.topology.clusterTime = $clusterTime;
+    }
+
     // Establish if we have an error
     if (
       workItem.command &&
diff --git a/lib/topologies/mongos.js b/lib/topologies/mongos.js
index c76fd0033..f4cbbba02 100644
--- a/lib/topologies/mongos.js
+++ b/lib/topologies/mongos.js
@@ -40,7 +40,8 @@ var MongoCR = require('../auth/mongocr'),
   Plain = require('../auth/plain'),
   GSSAPI = require('../auth/gssapi'),
   SSPI = require('../auth/sspi'),
-  ScramSHA1 = require('../auth/scram');
+  ScramSHA1 = require('../auth/scram'),
+  resolveClusterTime = require('./shared').resolveClusterTime;
 
 //
 // States
@@ -230,6 +231,9 @@ var Mongos = function(seedlist, options) {
     servers: []
   };
 
+  // Highest clusterTime seen in responses from the current deployment
+  this.clusterTime = null;
+
   // Add event listener
   EventEmitter.call(this);
 };
@@ -283,7 +287,9 @@ Mongos.prototype.connect = function(options) {
   var servers = this.s.seedlist.map(function(x) {
     return new Server(
       assign(
-        {},
+        {
+          clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
+        },
         self.s.options,
         x,
         {
@@ -606,7 +612,9 @@ function reconnectProxies(self, proxies, callback) {
     // Create a new server instance
     var server = new Server(
       assign(
-        {},
+        {
+          clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
+        },
         self.s.options,
         {
           host: _server.name.split(':')[0],
diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js
index 29088382e..587212495 100644
--- a/lib/topologies/replset.js
+++ b/lib/topologies/replset.js
@@ -14,7 +14,8 @@ var inherits = require('util').inherits,
   clone = require('./shared').clone,
   Timeout = require('./shared').Timeout,
   Interval = require('./shared').Interval,
-  createClientInfo = require('./shared').createClientInfo;
+  createClientInfo = require('./shared').createClientInfo,
+  resolveClusterTime = require('./shared').resolveClusterTime;
 
 var MongoCR = require('../auth/mongocr'),
   X509 = require('../auth/x509'),
@@ -252,6 +253,9 @@ var ReplSet = function(seedlist, options) {
   this.ismaster = null;
   // Contains the intervalId
   this.intervalIds = [];
+
+  // Highest clusterTime seen in responses from the current deployment
+  this.clusterTime = null;
 };
 
 inherits(ReplSet, EventEmitter);
@@ -377,7 +381,9 @@ function connectNewServers(self, servers, callback) {
     // Create a new server instance
     var server = new Server(
       assign(
-        {},
+        {
+          clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
+        },
         self.s.options,
         {
           host: _server.split(':')[0],
@@ -956,7 +962,9 @@ ReplSet.prototype.connect = function(options) {
   var servers = this.s.seedlist.map(function(x) {
     return new Server(
       assign(
-        {},
+        {
+          clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
+        },
         self.s.options,
         x,
         {
diff --git a/lib/topologies/server.js b/lib/topologies/server.js
index 0ea40fe83..119744fa3 100644
--- a/lib/topologies/server.js
+++ b/lib/topologies/server.js
@@ -154,6 +154,14 @@ var Server = function(options) {
     compression: { compressors: createCompressionInfo(options) }
   };
 
+  // special case for Mongos and ReplSet deployments
+  if (options.clusterTimeWatcher) {
+    this.s.clusterTimeWatcher = options.clusterTimeWatcher;
+  } else {
+    // otherwise this is a single deployment and we need to track the clusterTime here
+    this.s.clusterTime = null;
+  }
+
   // Curent ismaster
   this.ismaster = null;
   // Current ping time
@@ -203,6 +211,24 @@ Object.defineProperty(Server.prototype, 'logicalSessionTimeoutMinutes', {
   }
 });
 
+// In single server deployments we track the clusterTime directly on the topology, however
+// in Mongos and ReplSet deployments we instead need to delegate the clusterTime up to the
+// tracking objects so we can ensure we are gossiping the maximum time received from the
+// server.
+Object.defineProperty(Server.prototype, 'clusterTime', {
+  enumerable: true,
+  set: function(clusterTime) {
+    if (this.s.clusterTimeWatcher) {
+      this.s.clusterTimeWatcher(clusterTime);
+    } else {
+      this.s.clusterTime = clusterTime;
+    }
+  },
+  get: function() {
+    return this.s.clusterTime || null;
+  }
+});
+
 Server.enableServerAccounting = function() {
   serverAccounting = true;
   servers = {};
@@ -503,7 +529,7 @@ Server.prototype.connect = function(options) {
   }
 
   // Create a pool
-  self.s.pool = new Pool(assign(self.s.options, options, { bson: this.s.bson }));
+  self.s.pool = new Pool(this, assign(self.s.options, options, { bson: this.s.bson }));
 
   // Set up listeners
   self.s.pool.on('close', eventHandler(self, 'close'));
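Note, not part of the patch: the clusterTime property above either stores the value locally or hands it to the clusterTimeWatcher supplied by the owning ReplSet/Mongos. A minimal sketch of that delegation, using a placeholder value instead of a real server response:

'use strict';
var Server = require('./lib/topologies/server');

// hypothetical watcher; in ReplSet/Mongos this is resolveClusterTime bound to the topology
var seen = [];
var server = new Server({
  host: 'localhost',
  port: 27017,
  clusterTimeWatcher: function(clusterTime) {
    seen.push(clusterTime);
  }
});

// the setter delegates to the watcher instead of storing the value on the server itself
server.clusterTime = { clusterTime: 'placeholder' };
console.log(seen.length); // 1
console.log(server.clusterTime); // null, nothing was tracked locally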
diff --git a/lib/topologies/shared.js b/lib/topologies/shared.js
index 70624b120..0a7de71cf 100644
--- a/lib/topologies/shared.js
+++ b/lib/topologies/shared.js
@@ -354,6 +354,23 @@ function diff(previous, current) {
   return diff;
 }
 
+/**
+ * Shared function to determine clusterTime for a given topology
+ *
+ * @param {*} topology
+ * @param {*} clusterTime
+ */
+function resolveClusterTime(topology, $clusterTime) {
+  if (topology.clusterTime == null) {
+    topology.clusterTime = $clusterTime;
+  } else {
+    if ($clusterTime.clusterTime.greaterThan(topology.clusterTime.clusterTime)) {
+      topology.clusterTime = $clusterTime;
+    }
+  }
+}
+
+module.exports.resolveClusterTime = resolveClusterTime;
 module.exports.inquireServerState = inquireServerState;
 module.exports.getTopologyType = getTopologyType;
 module.exports.emitServerDescriptionChanged = emitServerDescriptionChanged;
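For reference, a rough sketch of how the new resolveClusterTime helper is expected to behave, outside of this patch: the first $clusterTime seen is stored as-is, and later values only replace it when their Timestamp is strictly greater. Timestamp comes from the bson package, matching the genClusterTime helper added to the unit tests below; the signature field is omitted here because the helper never inspects it.

'use strict';
var Timestamp = require('bson').Timestamp,
  resolveClusterTime = require('./lib/topologies/shared').resolveClusterTime;

// anything with a writable `clusterTime` property can stand in for the topology
var topology = { clusterTime: null };

// $clusterTime documents shaped like server replies (signature omitted)
var earlier = { clusterTime: new Timestamp(0, 100) };
var later = { clusterTime: new Timestamp(0, 200) };

resolveClusterTime(topology, earlier); // nothing stored yet, so the value is kept
resolveClusterTime(topology, later); // greater Timestamp, replaces the stored value
resolveClusterTime(topology, earlier); // stale value, ignored

console.log(topology.clusterTime === later); // true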
diff --git a/test/tests/functional/pool_tests.js b/test/tests/functional/pool_tests.js
index 60b38c022..015f7f36e 100644
--- a/test/tests/functional/pool_tests.js
+++ b/test/tests/functional/pool_tests.js
@@ -17,7 +17,7 @@ describe('Pool tests', function() {
     Connection.enableConnectionAccounting();
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       bson: new Bson(),
@@ -45,7 +45,7 @@ describe('Pool tests', function() {
     Connection.enableConnectionAccounting();
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       bson: new Bson()
@@ -85,7 +85,7 @@ describe('Pool tests', function() {
     var index = 0;
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       bson: new Bson()
@@ -205,7 +205,7 @@ describe('Pool tests', function() {
     this.timeout(0);
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       socketTimeout: 3000,
@@ -242,7 +242,7 @@ describe('Pool tests', function() {
     Connection.enableConnectionAccounting();
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       socketTimeout: 3000,
@@ -300,7 +300,7 @@ describe('Pool tests', function() {
     Connection.enableConnectionAccounting();
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       socketTimeout: 3000,
@@ -382,7 +382,7 @@ describe('Pool tests', function() {
     Connection.enableConnectionAccounting();
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       socketTimeout: 3000,
@@ -459,7 +459,7 @@ describe('Pool tests', function() {
     Connection.enableConnectionAccounting();
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       socketTimeout: 1000,
@@ -526,7 +526,7 @@ describe('Pool tests', function() {
       expect(createUserRes).to.exist;
       expect(createUserErr).to.be.null;
       // Attempt to connect
-      var pool = new Pool({
+      var pool = new Pool(null, {
        host: self.configuration.host,
        port: self.configuration.port,
        bson: new Bson()
@@ -606,7 +606,7 @@ describe('Pool tests', function() {
       expect(createAdminUserErr).to.be.null;
 
       // Attempt to connect
-      var pool = new Pool({
+      var pool = new Pool(null, {
        host: self.configuration.host,
        port: self.configuration.port,
        bson: new Bson()
@@ -812,7 +812,7 @@ describe('Pool tests', function() {
       expect(createAdminUserErr).to.be.null;
 
       // Attempt to connect
-      var pool = new Pool({
+      var pool = new Pool(null, {
        host: self.configuration.host,
        port: self.configuration.port,
        bson: new Bson()
@@ -936,7 +936,7 @@ describe('Pool tests', function() {
       expect(createAdminUserRes).to.exist;
       expect(createAdminUserErr).to.be.null;
       // Attempt to connect
-      var pool = new Pool({
+      var pool = new Pool(null, {
        host: self.configuration.host,
        port: self.configuration.port,
        bson: new Bson()
@@ -1030,7 +1030,7 @@ describe('Pool tests', function() {
       expect(createAdminUserRes).to.exist;
 
       // Attempt to connect
-      var pool = new Pool({
+      var pool = new Pool(null, {
        host: self.configuration.host,
        port: self.configuration.port,
        bson: new Bson()
@@ -1098,7 +1098,7 @@ describe('Pool tests', function() {
     Connection.enableConnectionAccounting();
 
     // Attempt to connect
-    var pool = new Pool({
+    var pool = new Pool(null, {
       host: this.configuration.host,
       port: this.configuration.port,
       bson: new Bson(),
diff --git a/test/tests/functional/shared.js b/test/tests/functional/shared.js
index 9b289e6fa..b95b0501e 100644
--- a/test/tests/functional/shared.js
+++ b/test/tests/functional/shared.js
@@ -17,7 +17,7 @@ function executeCommand(configuration, db, cmd, options, cb) {
   var port = options.port || configuration.port;
 
   // Attempt to connect
-  var pool = new Pool({
+  var pool = new Pool(null, {
     host: host,
     port: port,
     bson: new bson()
@@ -60,7 +60,7 @@ function locateAuthMethod(configuration, cb) {
   var cmd = { ismaster: true };
 
   // Attempt to connect
-  var pool = new Pool({
+  var pool = new Pool(null, {
     host: configuration.host,
     port: configuration.port,
     bson: new bson()
diff --git a/test/tests/unit/common.js b/test/tests/unit/common.js
new file mode 100644
index 000000000..c31155532
--- /dev/null
+++ b/test/tests/unit/common.js
@@ -0,0 +1,107 @@
+'use strict';
+
+var assign = require('../../../lib/utils').assign,
+  mock = require('../../mock'),
+  ObjectId = require('bson').ObjectId,
+  Timestamp = require('bson').Timestamp,
+  Binary = require('bson').Binary;
+
+class ReplSetFixture {
+  constructor() {
+    this.electionIds = [new ObjectId(), new ObjectId()];
+  }
+
+  setup() {
+    return Promise.all([
+      mock.createServer(),
+      mock.createServer(),
+      mock.createServer()
+    ]).then(servers => {
+      this.servers = servers;
+      this.primaryServer = servers[0];
+      this.firstSecondaryServer = servers[1];
+      this.arbiterServer = servers[2];
+
+      this.defaultFields = assign({}, mock.DEFAULT_ISMASTER, {
+        setName: 'rs',
+        setVersion: 1,
+        electionId: this.electionIds[0],
+        hosts: this.servers.map(server => server.uri()),
+        arbiters: [this.arbiterServer.uri()]
+      });
+
+      this.defineReplSetStates();
+      this.configureMessageHandlers();
+    });
+  }
+
+  defineReplSetStates() {
+    this.primaryStates = [
+      assign({}, this.defaultFields, {
+        ismaster: true,
+        secondary: false,
+        me: this.primaryServer.uri(),
+        primary: this.primaryServer.uri(),
+        tags: { loc: 'ny' }
+      })
+    ];
+
+    this.firstSecondaryStates = [
+      assign({}, this.defaultFields, {
+        ismaster: false,
+        secondary: true,
+        me: this.firstSecondaryServer.uri(),
+        primary: this.primaryServer.uri(),
+        tags: { loc: 'sf' }
+      })
+    ];
+
+    this.arbiterStates = [
+      assign({}, this.defaultFields, {
+        ismaster: false,
+        secondary: false,
+        arbiterOnly: true,
+        me: this.arbiterServer.uri(),
+        primary: this.primaryServer.uri()
+      })
+    ];
+  }
+
+  configureMessageHandlers() {
+    this.primaryServer.setMessageHandler(request => {
+      var doc = request.document;
+      if (doc.ismaster) {
+        request.reply(this.primaryStates[0]);
+      }
+    });
+
+    this.firstSecondaryServer.setMessageHandler(request => {
+      var doc = request.document;
+      if (doc.ismaster) {
+        request.reply(this.firstSecondaryStates[0]);
+      }
+    });
+
+    this.arbiterServer.setMessageHandler(request => {
+      var doc = request.document;
+      if (doc.ismaster) {
+        request.reply(this.arbiterStates[0]);
+      }
+    });
+  }
+}
+
+function genClusterTime(time) {
+  return {
+    clusterTime: new Timestamp(time),
+    signature: {
+      hash: new Binary(new Buffer('testing')),
+      keyId: 42
+    }
+  };
+}
+
+module.exports = {
+  ReplSetFixture: ReplSetFixture,
+  genClusterTime: genClusterTime
+};
diff --git a/test/tests/unit/mongos/sessions_tests.js b/test/tests/unit/mongos/sessions_tests.js
new file mode 100644
index 000000000..3779380d5
--- /dev/null
+++ b/test/tests/unit/mongos/sessions_tests.js
@@ -0,0 +1,95 @@
+'use strict';
+var Mongos = require('../../../../lib/topologies/mongos'),
+  expect = require('chai').expect,
+  assign = require('../../../../lib/utils').assign,
+  mock = require('../../../mock'),
+  genClusterTime = require('../common').genClusterTime;
+
+const test = {};
+describe('Sessions (Mongos)', function() {
+  describe('$clusterTime', function() {
+    afterEach(() => mock.cleanup());
+    beforeEach(() => {
+      return mock.createServer().then(mockServer => {
+        test.server = mockServer;
+      });
+    });
+
+    it('should recognize and set `clusterTime` on the topology', {
+      metadata: { requires: { topology: 'single' } },
+      test: function(done) {
+        const clusterTime = genClusterTime(Date.now());
+        test.server.setMessageHandler(request => {
+          request.reply(
+            assign({}, mock.DEFAULT_ISMASTER, {
+              msg: 'isdbgrid',
+              $clusterTime: clusterTime
+            })
+          );
+        });
+
+        const mongos = new Mongos([{ host: test.server.host, port: test.server.port }], {
+          connectionTimeout: 30000,
+          socketTimeout: 30000,
+          haInterval: 500,
+          size: 1
+        });
+
+        mongos.on('error', done);
+        mongos.once('connect', () => {
+          expect(mongos.clusterTime).to.eql(clusterTime);
+          mongos.destroy();
+          done();
+        });
+
+        mongos.connect();
+      }
+    });
+
+    it('should track the highest `$clusterTime` seen', {
+      metadata: { requires: { topology: 'single' } },
+      test: function(done) {
+        const clusterTime = genClusterTime(Date.now()),
+          futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000);
+
+        test.server.setMessageHandler(request => {
+          const doc = request.document;
+          if (doc.ismaster) {
+            request.reply(
+              assign({}, mock.DEFAULT_ISMASTER, {
+                msg: 'isdbgrid',
+                $clusterTime: clusterTime
+              })
+            );
+          } else if (doc.insert) {
+            request.reply({
+              ok: 1,
+              n: [],
+              lastOp: new Date(),
+              $clusterTime: futureClusterTime
+            });
+          }
+        });
+
+        const mongos = new Mongos([{ host: test.server.host, port: test.server.port }]);
+        mongos.on('error', done);
+        mongos.once('connect', () => {
+          expect(mongos.clusterTime).to.exist;
+          expect(mongos.clusterTime).to.eql(clusterTime);
+
+          mongos.insert('test.test', [{ created: new Date() }], function(err) {
+            expect(err).to.not.exist;
+            expect(mongos.clusterTime).to.exist;
+            expect(mongos.clusterTime).to.not.eql(clusterTime);
+            expect(mongos.clusterTime).to.eql(futureClusterTime);
+
+            mongos.destroy();
+            done();
+          });
+        });
+
+        mongos.connect();
+      }
+    });
+  });
+});
diff --git a/test/tests/unit/replset/sessions_tests.js b/test/tests/unit/replset/sessions_tests.js
new file mode 100644
index 000000000..c4f14dc5a
--- /dev/null
+++ b/test/tests/unit/replset/sessions_tests.js
@@ -0,0 +1,50 @@
+'use strict';
+var expect = require('chai').expect,
+  ReplSet = require('../../../../lib/topologies/replset'),
+  mock = require('../../../mock'),
+  genClusterTime = require('../common').genClusterTime,
+  ReplSetFixture = require('../common').ReplSetFixture;
+
+const test = new ReplSetFixture();
+describe('Sessions (ReplSet)', function() {
+  describe('$clusterTime', function() {
+    afterEach(() => mock.cleanup());
+    beforeEach(() => test.setup());
+
+    it('should track the highest `clusterTime` seen in a replica set', {
+      metadata: { requires: { topology: 'single' } },
+      test: function(done) {
+        const clusterTime = genClusterTime(Date.now()),
+          futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000);
+
+        test.primaryStates[0].$clusterTime = clusterTime;
+        test.firstSecondaryStates[0].$clusterTime = futureClusterTime;
+        test.arbiterStates[0].$clusterTime = futureClusterTime;
+
+        var replset = new ReplSet(
+          [test.primaryServer.address(), test.firstSecondaryServer.address()],
+          {
+            setName: 'rs',
+            connectionTimeout: 3000,
+            socketTimeout: 0,
+            haInterval: 100,
+            size: 1
+          }
+        );
+
+        let serverCount = 0;
+        replset.on('joined', () => {
+          serverCount++;
+          if (serverCount === 3) {
+            expect(replset.clusterTime).to.eql(futureClusterTime);
+            replset.destroy();
+            done();
+          }
+        });
+
+        replset.on('error', done);
+        replset.connect();
+      }
+    });
+  });
+});
diff --git a/test/tests/unit/single/sessions_tests.js b/test/tests/unit/single/sessions_tests.js
new file mode 100644
index 000000000..7a3b84080
--- /dev/null
+++ b/test/tests/unit/single/sessions_tests.js
@@ -0,0 +1,87 @@
+'use strict';
+var Server = require('../../../../lib/topologies/server'),
+  expect = require('chai').expect,
+  assign = require('../../../../lib/utils').assign,
+  mock = require('../../../mock'),
+  genClusterTime = require('../common').genClusterTime;
+
+const test = {};
+describe('Sessions (Single)', function() {
+  describe('$clusterTime', function() {
+    afterEach(() => mock.cleanup());
+    beforeEach(() => {
+      return mock.createServer(37019, 'localhost').then(mockServer => {
+        test.server = mockServer;
+      });
+    });
+
+    it('should recognize and set `clusterTime` on the topology', {
+      metadata: { requires: { topology: 'single' } },
+      test: function(done) {
+        const clusterTime = genClusterTime(Date.now());
+        test.server.setMessageHandler(request => {
+          request.reply(
+            assign({}, mock.DEFAULT_ISMASTER, {
+              $clusterTime: clusterTime
+            })
+          );
+        });
+
+        const client = new Server({ host: test.server.host, port: test.server.port });
+        client.on('error', done);
+        client.once('connect', () => {
+          expect(client.clusterTime).to.eql(clusterTime);
+          client.destroy();
+          done();
+        });
+
+        client.connect();
+      }
+    });
+
+    it('should track the highest `$clusterTime` seen', {
+      metadata: { requires: { topology: 'single' } },
+      test: function(done) {
+        const clusterTime = genClusterTime(Date.now()),
+          futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000);
+
+        test.server.setMessageHandler(request => {
+          const doc = request.document;
+          if (doc.ismaster) {
+            request.reply(
+              assign({}, mock.DEFAULT_ISMASTER, {
+                $clusterTime: clusterTime
+              })
+            );
+          } else if (doc.insert) {
+            request.reply({
+              ok: 1,
+              n: [],
+              lastOp: new Date(),
+              $clusterTime: futureClusterTime
+            });
+          }
+        });
+
+        const client = new Server({ host: test.server.host, port: test.server.port });
+        client.on('error', done);
+        client.once('connect', () => {
+          expect(client.clusterTime).to.exist;
+          expect(client.clusterTime).to.eql(clusterTime);
+
+          client.insert('test.test', [{ created: new Date() }], function(err) {
+            expect(err).to.not.exist;
+            expect(client.clusterTime).to.exist;
+            expect(client.clusterTime).to.not.eql(clusterTime);
+            expect(client.clusterTime).to.eql(futureClusterTime);
+
+            client.destroy();
+            done();
+          });
+        });
+
+        client.connect();
+      }
+    });
+  });
+});
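Since Pool now receives the owning topology as its first argument and copies any $clusterTime found in a reply onto topology.clusterTime, a bare Pool consumer can capture the value with a minimal stand-in object instead of passing null, as the updated functional tests do. A rough sketch under two assumptions: the server is 3.6+ so replies actually carry $clusterTime, and the Query/write pattern below mirrors the functional-test helpers rather than a documented public API.

'use strict';
var Pool = require('./lib/connection/pool'),
  Query = require('./lib/connection/commands').Query,
  BSON = require('bson');

// minimal stand-in topology: the pool only ever assigns to its `clusterTime` property
var topology = { clusterTime: null };
var bson = new BSON();

var pool = new Pool(topology, { host: 'localhost', port: 27017, bson: bson });

pool.on('connect', function() {
  var query = new Query(bson, 'admin.$cmd', { ismaster: true }, { numberToSkip: 0, numberToReturn: 1 });
  pool.write(query, { command: true }, function(err) {
    // by the time this callback runs the message handler has already copied the
    // reply's $clusterTime (if present) onto topology.clusterTime
    console.log(err, topology.clusterTime);
    pool.destroy();
  });
});

pool.connect();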