Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(monitoring): add support for server monitoring to Server
Browse files Browse the repository at this point in the history
This adds the final piece of SDAM into the implementation, allowing
for a monitoring loop that informs our topology state machine.
  • Loading branch information
mbroadst committed Aug 8, 2018
1 parent a252cc7 commit 30a394d
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 5 deletions.
100 changes: 96 additions & 4 deletions lib/sdam/monitoring.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
'use strict';

const ServerDescription = require('./server_description').ServerDescription;
const calculateDurationInMs = require('../utils').calculateDurationInMs;

/**
* Published when server description changes, but does NOT include changes to the RTT.
*
Expand Down Expand Up @@ -88,7 +91,7 @@ class ServerHeartbeatStartedEvent {
/**
* Fired when the server monitor’s ismaster succeeds.
*
* @param {Number} duration The execution time of the event
* @param {Number} duration The execution time of the event in ms
* @param {Object} reply The command reply
* @param {Object} connectionId The connection id for the command
*/
Expand All @@ -101,16 +104,104 @@ class ServerHeartbeatSucceededEvent {
/**
* Fired when the server monitor’s ismaster fails, either with an “ok: 0” or a socket exception.
*
* @param {Number} duration The execution time of the event
* @param {Number} duration The execution time of the event in ms
* @param {MongoError|Object} failure The command failure
* @param {Object} connectionId The connection id for the command
*/
class ServerHearbeatFailedEvent {
class ServerHeartbeatFailedEvent {
constructor(duration, failure, connectionId) {
Object.assign(this, { duration, failure, connectionId });
}
}

/**
* Performs a server check as described by the SDAM spec.
*
* NOTE: This method automatically reschedules itself, so that there is always an active
* monitoring process
*
* @param {Server} server The server to monitor
*/
function monitorServer(server) {
// executes a single check of a server
const checkServer = callback => {
let start = process.hrtime();

// emit a signal indicating we have started the heartbeat
server.emit('serverHeartbeatStarted', new ServerHeartbeatStartedEvent(server.name));

server.command(
'admin.$cmd',
{ ismaster: true },
{
monitoring: true,
socketTimeout: server.s.options.connectionTimeout || 2000
},
function(err, result) {
let duration = calculateDurationInMs(start);

if (err) {
server.emit(
'serverHeartbeatFailed',
new ServerHeartbeatFailedEvent(duration, err, server.name)
);

return callback(err, null);
}

const isMaster = result.result;
server.emit(
'serverHeartbeatSucceded',
new ServerHeartbeatSucceededEvent(duration, isMaster, server.name)
);

return callback(null, isMaster);
}
);
};

const successHandler = isMaster => {
server.s.monitoring = false;

// emit an event indicating that our description has changed
server.emit('descriptionReceived', new ServerDescription(server.description.address, isMaster));

// schedule the next monitoring process
server.s.monitorId = setTimeout(
() => monitorServer(server),
server.s.options.heartbeatFrequencyMS
);
};

// run the actual monitoring loop
server.s.monitoring = true;
checkServer((err, isMaster) => {
if (err) {
// According to the SDAM specification's "Network error during server check" section, if
// an ismaster call fails we reset the server's pool. If a server was once connected,
// change its type to `Unknown` only after retrying once.

// TODO: we need to reset the pool here

return checkServer((err, isMaster) => {
if (err) {
server.s.monitoring = false;

// we revert to an `Unknown` by emitting a default description with no isMaster
server.emit('descriptionReceived', new ServerDescription(server.description.address));

// we do not reschedule monitoring in this case
return;
}

successHandler(isMaster);
});
}

successHandler(isMaster);
});
}

module.exports = {
ServerDescriptionChangedEvent,
ServerOpeningEvent,
Expand All @@ -120,5 +211,6 @@ module.exports = {
TopologyClosedEvent,
ServerHeartbeatStartedEvent,
ServerHeartbeatSucceededEvent,
ServerHearbeatFailedEvent
ServerHeartbeatFailedEvent,
monitorServer
};
15 changes: 14 additions & 1 deletion lib/sdam/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const createClientInfo = require('../topologies/shared').createClientInfo;
const Logger = require('../connection/logger');
const ServerDescription = require('./server_description').ServerDescription;
const ReadPreference = require('../topologies/read_preference');
const monitorServer = require('./monitoring').monitorServer;

/**
*
Expand Down Expand Up @@ -56,7 +57,9 @@ class Server extends EventEmitter {
BSON.Timestamp
]),
// client metadata for the initial handshake
clientInfo: createClientInfo(options)
clientInfo: createClientInfo(options),
// state variable to determine if there is an active server check in progress
monitoring: false
};
}

Expand Down Expand Up @@ -119,6 +122,16 @@ class Server extends EventEmitter {
}
}

/**
* Immediately schedule monitoring of this server. If there already an attempt being made
* this will be a no-op.
*/
monitor() {
if (this.s.monitoring) return;
if (this.s.monitorId) clearTimeout(this.s.monitorId);
monitorServer(this);
}

/**
* Execute a command
*
Expand Down

0 comments on commit 30a394d

Please sign in to comment.