Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(server-selection): add basic support for server selection
Browse files Browse the repository at this point in the history
NODE-1259
  • Loading branch information
mbroadst committed May 23, 2018
1 parent cc843cf commit ccc5e1d
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 20 deletions.
8 changes: 6 additions & 2 deletions lib/sdam/server_description.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ class ServerDescription {
* Create a ServerDescription
* @param {String} address The address of the server
* @param {Object} [ismaster] An optional ismaster response for this server
* @param {Object} [options] Optional settings
* @param {Number} [options.roundTripTime] The round trip time to ping this server (in ms)
*/
constructor(address, ismaster) {
constructor(address, ismaster, options) {
options = options || {};

this.address = address;
this.error = null;
this.roundTripTime = null;
this.roundTripTime = options.roundTripTime;
this.lastWriteDate = ismaster && ismaster.lastWrite ? ismaster.lastWrite.lasteWriteDate : null;
this.opTime = ismaster && ismaster.lastWrite ? ismaster.lastWrite.opTime : null;
this.type = parseServerType(ismaster);
Expand Down
168 changes: 168 additions & 0 deletions lib/sdam/server_selectors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
'use strict';
const ServerType = require('./server_description').ServerType;
const TopologyType = require('./topology_description').TopologyType;
const ReadPreference = require('../topologies/read_preference');

function writableServerSelector() {
return function(topologyDescription, servers) {
if (topologyDescription === TopologyType.ReplicaSetNoPrimary) return [];
if (
topologyDescription.type === TopologyType.Sharded ||
topologyDescription.type === TopologyType.Single
) {
return servers;
}

return servers.filter(s => s.isWritable);
};
}

// reducers
function maxStalenessReducer(readPreference, topologyDescription, servers) {
if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {
return servers;
}

if (topologyDescription.type === TopologyType.ReplicaSetWithPrimary) {
const primary = servers.filter(primaryFilter);
return servers.reduce((result, server) => {
const staleness =
server.lastUpdateTime -
server.lastWriteDate -
(primary.lastUpdateTime - primary.lastWriteDate) +
topologyDescription.heartbeatFrequencyMS;

if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
return result;
}, []);
} else if (topologyDescription.type === TopologyType.ReplicaSetNoPrimary) {
const sMax = servers.reduce((max, s) => (s.lastWriteDate > max.lastWriteDate ? s : max));
return servers.reduce((result, server) => {
const staleness =
sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;
if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
return result;
}, []);
}

return servers;
}

function tagSetMatch(tagSet, serverTags) {
const keys = Object.keys(tagSet);
const serverTagKeys = Object.keys(serverTags);
for (let i = 0; i < keys.length; ++i) {
const key = keys[i];
if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) {
return false;
}
}

return true;
}

function tagSetReducer(readPreference, servers) {
if (
readPreference.tags == null ||
(Array.isArray(readPreference.tags) && readPreference.tags.length === 0)
) {
return servers;
}

for (let i = 0; i < readPreference.tags.length; ++i) {
const tagSet = readPreference.tags[i];
const serversMatchingTagset = servers.reduce((matched, server) => {
if (tagSetMatch(tagSet, server.tags)) matched.push(server);
return matched;
}, []);

if (serversMatchingTagset.length) {
return serversMatchingTagset;
}
}

return [];
}

function latencyWindowReducer(readPreference, servers) {
return servers;
}

// filters
function primaryFilter(server) {
return server.type === ServerType.RSPrimary;
}

function secondaryFilter(server) {
return server.type === ServerType.RSSecondary;
}

function nearestFilter(server) {
return server.type === ServerType.RSSecondary || server.type === ServerType.RSPrimary;
}

function readPreferenceServerSelector(readPreference) {
if (!readPreference.isValid()) {
throw new TypeError('Invalid read preference specified');
}

return function(topologyDescription, servers) {
if (
topologyDescription.type === TopologyType.Single ||
topologyDescription.type === TopologyType.Sharded ||
topologyDescription.type === TopologyType.Unknown
) {
return servers;
}

if (readPreference.mode === ReadPreference.PRIMARY) {
return servers.filter(s => s.type === ServerType.RSPrimary);
}

if (readPreference.mode === ReadPreference.SECONDARY) {
return latencyWindowReducer(
readPreference,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
)
);
} else if (readPreference.mode === ReadPreference.NEAREST) {
return latencyWindowReducer(
readPreference,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter))
)
);
} else if (readPreference.mode === ReadPreference.SECONDARY_PREFERRED) {
const result = latencyWindowReducer(
readPreference,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
)
);

return result.length === 0 ? servers.filter(primaryFilter) : result;
} else if (readPreference.mode === ReadPreference.PRIMARY_PREFERRED) {
const result = servers.filter(primaryFilter);
if (result.length) {
return result;
}

return latencyWindowReducer(
readPreference,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
)
);
}
};
}

module.exports = {
writableServerSelector,
readPreferenceServerSelector
};
91 changes: 84 additions & 7 deletions lib/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@ const ServerDescription = require('./server_description').ServerDescription;
const TopologyDescription = require('./topology_description').TopologyDescription;
const TopologyType = require('./topology_description').TopologyType;
const monitoring = require('./monitoring');
const calculateDurationInMs = require('../utils').calculateDurationInMs;
const MongoTimeoutError = require('../error').MongoTimeoutError;
const MongoError = require('../error').MongoError;

// Global state
let globalTopologyCounter = 0;

// Constants
const DEFAULT_LOCAL_THRESHOLD_MS = 15;
const DEFAULT_HEARTBEAT_FREQUENCY = 10000;
const DEFAULT_SERVER_SELECTION_TIMEOUT = 30000;

/**
* A container of server instances representing a connection to a MongoDB topology.
*
Expand All @@ -27,11 +35,22 @@ class Topology extends EventEmitter {
*
* @param {Array|String} seedlist a string list, or array of Server instances to connect to
* @param {Object} [options] Optional settings
* @param {Number} [options.localThresholdMS=15] The size of the latency window for selecting among multiple suitable servers
* @param {Number} [options.serverSelectionTimeoutMS=30000] How long to block for server selection before throwing an error
* @param {Number} [options.heartbeatFrequencyMS=10000] The frequency with which topology updates are scheduled
*/
constructor(seedlist, options) {
super();
seedlist = seedlist || [];
options = options || {};
options = Object.assign(
{},
{
localThresholdMS: DEFAULT_LOCAL_THRESHOLD_MS,
serverSelectionTimeoutMS: DEFAULT_SERVER_SELECTION_TIMEOUT,
heartbeatFrequencyMS: DEFAULT_HEARTBEAT_FREQUENCY
},
options
);

const topologyType =
seedlist.length === 1 && !options.replicaset
Expand Down Expand Up @@ -62,7 +81,11 @@ class Topology extends EventEmitter {
null,
null,
options
)
),
serverSelectionTimeoutMS:
options.serverSelectionTimeoutMS || DEFAULT_SERVER_SELECTION_TIMEOUT,
heartbeatFrequencyMS: options.heartbeatFrequencyMS || DEFAULT_HEARTBEAT_FREQUENCY,
ServerClass: options.ServerClass || null /* eventually our Server class, but null for now */
};
}

Expand Down Expand Up @@ -111,17 +134,33 @@ class Topology extends EventEmitter {
/**
* Selects a server according to the selection predicate provided
*
* @param {function} [predicate] An optional predicate to select servers by, defaults to a random selection within a latency window
* @param {function} [selector] An optional selector to select servers by, defaults to a random selection within a latency window
* @return {Server} An instance of a `Server` meeting the criteria of the predicate provided
*/
selectServer(/* predicate */) {
return;
selectServer(selector, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = Object.assign(
{},
{ serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS },
options
);

selectServers(
this,
selector,
options.serverSelectionTimeoutMS,
process.hrtime(),
(err, servers) => {
if (err) return callback(err, null);
callback(null, randomSelection(servers));
}
);
}

/**
* Update the topology with a ServerDescription
* Update the internal TopologyDescription with a ServerDescription
*
* @param {object} serverDescription the server to update
* @param {object} serverDescription The server to update in the internal list of server descriptions
*/
update(serverDescription) {
// these will be used for monitoring events later
Expand Down Expand Up @@ -153,6 +192,44 @@ class Topology extends EventEmitter {
}
}

function randomSelection(array) {
return array[Math.floor(Math.random() * array.length)];
}

class FakeServer {
constructor(description) {
this.description = description;
}
}

/**
*
* @param {*} topology
* @param {*} selector
* @param {*} options
* @param {*} callback
*/
function selectServers(topology, selector, timeout, start, callback) {
if (!topology.description.compatible) {
return callback(new MongoError(topology.description.compatibilityError));
}

const serverDescriptions = Array.from(topology.description.servers.values());
let descriptions = selector(topology.description, serverDescriptions);
if (descriptions.length) {
// TODO: obviously return the actual server in the future
const servers = descriptions.map(d => new FakeServer(d));
return callback(null, servers);
}

const duration = calculateDurationInMs(process.hrtime(start));
if (duration > timeout) {
return callback(new MongoTimeoutError(`Server selection timed out after ${timeout} ms`));
}

// TODO: loop this, add monitoring
}

/**
* A server opening SDAM monitoring event
*
Expand Down
18 changes: 7 additions & 11 deletions lib/sdam/topology_description.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,12 @@ class TopologyDescription {
* @param {number} maxSetVersion
* @param {ObjectId} maxElectionId
*/
constructor(
topologyType,
serverDescriptions,
setName,
maxSetVersion,
maxElectionId
/*, options */
) {
constructor(topologyType, serverDescriptions, setName, maxSetVersion, maxElectionId, options) {
options = options || {};

// TODO: consider assigning all these values to a temporary value `s` which
// we use `Object.freeze` on, ensuring the internal state of this type
// is immutable.

this.type = topologyType || TopologyType.Unknown;
this.setName = setName || null;
this.maxSetVersion = maxSetVersion || null;
Expand All @@ -50,6 +44,8 @@ class TopologyDescription {
this.compatible = true;
this.compatibilityError = null;
this.logicalSessionTimeoutMinutes = null;
this.heartbeatFrequencyMS = options.heartbeatFrequencyMS || 0;
this.options = options;

// determine server compatibility
for (const serverDescription of this.servers.values()) {
Expand Down Expand Up @@ -112,7 +108,7 @@ class TopologyDescription {
setName,
maxSetVersion,
maxElectionId,
{}
this.options
);
}

Expand Down Expand Up @@ -192,7 +188,7 @@ class TopologyDescription {
setName,
maxSetVersion,
maxElectionId,
{}
this.options
);
}

Expand Down

0 comments on commit ccc5e1d

Please sign in to comment.