Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(cluster-time): track incoming cluster time gossiping
Browse files Browse the repository at this point in the history
This is the first part of the requirement to "gossip the cluster
time" by tracking the clusterTime provided on incoming server
responses.

NODE-1088
  • Loading branch information
mbroadst committed Oct 2, 2017
1 parent d80d956 commit c910706
Show file tree
Hide file tree
Showing 11 changed files with 432 additions and 31 deletions.
19 changes: 11 additions & 8 deletions lib/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,13 @@ var _id = 0;
* @fires Pool#parseError
* @return {Pool} A cursor instance
*/
var Pool = function(options) {
var Pool = function(topology, options) {
// Add event listener
EventEmitter.call(this);

// Store topology for later use
this.topology = topology;

// Add the options
this.options = assign(
{
Expand Down Expand Up @@ -104,9 +108,6 @@ var Pool = function(options) {
options
);

// console.log("=================================== pool options")
// console.dir(this.options)

// Identification information
this.id = _id++;
// Current reconnect retries
Expand Down Expand Up @@ -269,9 +270,6 @@ function reauthenticate(pool, connection, cb) {

function connectionFailureHandler(self, event) {
return function(err) {
// console.log("========== connectionFailureHandler :: " + event)
// console.dir(err)

if (this._connectionFailHandled) return;
this._connectionFailHandled = true;
// Destroy the connection
Expand Down Expand Up @@ -323,7 +321,6 @@ function connectionFailureHandler(self, event) {

function attemptReconnect(self) {
return function() {
// console.log("========================= attemptReconnect")
self.emit('attemptReconnect', self);
if (self.state === DESTROYED || self.state === DESTROYING) return;

Expand Down Expand Up @@ -542,6 +539,12 @@ function messageHandler(self) {
return handleOperationCallback(self, workItem.cb, new MongoError(err));
}

// Look for clusterTime, and update it if necessary
if (message.documents[0] && message.documents[0].hasOwnProperty('$clusterTime')) {
const $clusterTime = message.documents[0].$clusterTime;
self.topology.clusterTime = $clusterTime;
}

// Establish if we have an error
if (
workItem.command &&
Expand Down
14 changes: 11 additions & 3 deletions lib/topologies/mongos.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ var MongoCR = require('../auth/mongocr'),
Plain = require('../auth/plain'),
GSSAPI = require('../auth/gssapi'),
SSPI = require('../auth/sspi'),
ScramSHA1 = require('../auth/scram');
ScramSHA1 = require('../auth/scram'),
resolveClusterTime = require('./shared').resolveClusterTime;

//
// States
Expand Down Expand Up @@ -230,6 +231,9 @@ var Mongos = function(seedlist, options) {
servers: []
};

// Highest clusterTime seen in responses from the current deployment
this.clusterTime = null;

// Add event listener
EventEmitter.call(this);
};
Expand Down Expand Up @@ -283,7 +287,9 @@ Mongos.prototype.connect = function(options) {
var servers = this.s.seedlist.map(function(x) {
return new Server(
assign(
{},
{
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
},
self.s.options,
x,
{
Expand Down Expand Up @@ -606,7 +612,9 @@ function reconnectProxies(self, proxies, callback) {
// Create a new server instance
var server = new Server(
assign(
{},
{
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
},
self.s.options,
{
host: _server.name.split(':')[0],
Expand Down
14 changes: 11 additions & 3 deletions lib/topologies/replset.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ var inherits = require('util').inherits,
clone = require('./shared').clone,
Timeout = require('./shared').Timeout,
Interval = require('./shared').Interval,
createClientInfo = require('./shared').createClientInfo;
createClientInfo = require('./shared').createClientInfo,
resolveClusterTime = require('./shared').resolveClusterTime;

var MongoCR = require('../auth/mongocr'),
X509 = require('../auth/x509'),
Expand Down Expand Up @@ -252,6 +253,9 @@ var ReplSet = function(seedlist, options) {
this.ismaster = null;
// Contains the intervalId
this.intervalIds = [];

// Highest clusterTime seen in responses from the current deployment
this.clusterTime = null;
};

inherits(ReplSet, EventEmitter);
Expand Down Expand Up @@ -377,7 +381,9 @@ function connectNewServers(self, servers, callback) {
// Create a new server instance
var server = new Server(
assign(
{},
{
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
},
self.s.options,
{
host: _server.split(':')[0],
Expand Down Expand Up @@ -956,7 +962,9 @@ ReplSet.prototype.connect = function(options) {
var servers = this.s.seedlist.map(function(x) {
return new Server(
assign(
{},
{
clusterTimeWatcher: clusterTime => resolveClusterTime(self, clusterTime)
},
self.s.options,
x,
{
Expand Down
28 changes: 27 additions & 1 deletion lib/topologies/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ var Server = function(options) {
compression: { compressors: createCompressionInfo(options) }
};

// special case for Mongos and ReplSet deployments
if (options.clusterTimeWatcher) {
this.s.clusterTimeWatcher = options.clusterTimeWatcher;
} else {
// otherwise this is a single deployment and we need to track the clusterTime here
this.s.clusterTime = null;
}

// Curent ismaster
this.ismaster = null;
// Current ping time
Expand Down Expand Up @@ -203,6 +211,24 @@ Object.defineProperty(Server.prototype, 'logicalSessionTimeoutMinutes', {
}
});

// In single server deployments we track the clusterTime directly on the topology, however
// in Mongos and ReplSet deployments we instead need to delegate the clusterTime up to the
// tracking objects so we can ensure we are gossiping the maximum time received from the
// server.
Object.defineProperty(Server.prototype, 'clusterTime', {
enumerable: true,
set: function(clusterTime) {
if (this.s.clusterTimeWatcher) {
this.s.clusterTimeWatcher(clusterTime);
} else {
this.s.clusterTime = clusterTime;
}
},
get: function() {
return this.s.clusterTime || null;
}
});

Server.enableServerAccounting = function() {
serverAccounting = true;
servers = {};
Expand Down Expand Up @@ -503,7 +529,7 @@ Server.prototype.connect = function(options) {
}

// Create a pool
self.s.pool = new Pool(assign(self.s.options, options, { bson: this.s.bson }));
self.s.pool = new Pool(this, assign(self.s.options, options, { bson: this.s.bson }));

// Set up listeners
self.s.pool.on('close', eventHandler(self, 'close'));
Expand Down
17 changes: 17 additions & 0 deletions lib/topologies/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,23 @@ function diff(previous, current) {
return diff;
}

/**
* Shared function to determine clusterTime for a given topology
*
* @param {*} topology
* @param {*} clusterTime
*/
function resolveClusterTime(topology, $clusterTime) {
if (topology.clusterTime == null) {
topology.clusterTime = $clusterTime;
} else {
if ($clusterTime.clusterTime.greaterThan(topology.clusterTime.clusterTime)) {
topology.clusterTime = $clusterTime;
}
}
}

module.exports.resolveClusterTime = resolveClusterTime;
module.exports.inquireServerState = inquireServerState;
module.exports.getTopologyType = getTopologyType;
module.exports.emitServerDescriptionChanged = emitServerDescriptionChanged;
Expand Down
28 changes: 14 additions & 14 deletions test/tests/functional/pool_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describe('Pool tests', function() {
Connection.enableConnectionAccounting();

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
bson: new Bson(),
Expand Down Expand Up @@ -45,7 +45,7 @@ describe('Pool tests', function() {
Connection.enableConnectionAccounting();

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
bson: new Bson()
Expand Down Expand Up @@ -85,7 +85,7 @@ describe('Pool tests', function() {
var index = 0;

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
bson: new Bson()
Expand Down Expand Up @@ -205,7 +205,7 @@ describe('Pool tests', function() {
this.timeout(0);

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
socketTimeout: 3000,
Expand Down Expand Up @@ -242,7 +242,7 @@ describe('Pool tests', function() {
Connection.enableConnectionAccounting();

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
socketTimeout: 3000,
Expand Down Expand Up @@ -300,7 +300,7 @@ describe('Pool tests', function() {
Connection.enableConnectionAccounting();

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
socketTimeout: 3000,
Expand Down Expand Up @@ -382,7 +382,7 @@ describe('Pool tests', function() {
Connection.enableConnectionAccounting();

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
socketTimeout: 3000,
Expand Down Expand Up @@ -459,7 +459,7 @@ describe('Pool tests', function() {
Connection.enableConnectionAccounting();

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
socketTimeout: 1000,
Expand Down Expand Up @@ -526,7 +526,7 @@ describe('Pool tests', function() {
expect(createUserRes).to.exist;
expect(createUserErr).to.be.null;
// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: self.configuration.host,
port: self.configuration.port,
bson: new Bson()
Expand Down Expand Up @@ -606,7 +606,7 @@ describe('Pool tests', function() {
expect(createAdminUserErr).to.be.null;

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: self.configuration.host,
port: self.configuration.port,
bson: new Bson()
Expand Down Expand Up @@ -812,7 +812,7 @@ describe('Pool tests', function() {
expect(createAdminUserErr).to.be.null;

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: self.configuration.host,
port: self.configuration.port,
bson: new Bson()
Expand Down Expand Up @@ -936,7 +936,7 @@ describe('Pool tests', function() {
expect(createAdminUserRes).to.exist;
expect(createAdminUserErr).to.be.null;
// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: self.configuration.host,
port: self.configuration.port,
bson: new Bson()
Expand Down Expand Up @@ -1030,7 +1030,7 @@ describe('Pool tests', function() {
expect(createAdminUserRes).to.exist;

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: self.configuration.host,
port: self.configuration.port,
bson: new Bson()
Expand Down Expand Up @@ -1098,7 +1098,7 @@ describe('Pool tests', function() {
Connection.enableConnectionAccounting();

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: this.configuration.host,
port: this.configuration.port,
bson: new Bson(),
Expand Down
4 changes: 2 additions & 2 deletions test/tests/functional/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function executeCommand(configuration, db, cmd, options, cb) {
var port = options.port || configuration.port;

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: host,
port: port,
bson: new bson()
Expand Down Expand Up @@ -60,7 +60,7 @@ function locateAuthMethod(configuration, cb) {
var cmd = { ismaster: true };

// Attempt to connect
var pool = new Pool({
var pool = new Pool(null, {
host: configuration.host,
port: configuration.port,
bson: new bson()
Expand Down
Loading

0 comments on commit c910706

Please sign in to comment.