From d9c5c16c733a40f647a52209d87e1403e3f77e19 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Tue, 24 Apr 2018 12:32:17 -0400 Subject: [PATCH] feat(max-staleness): properly support a max staleness reducer This adds a `maxStalenessReducer` to our server selectors, and adds it in all the right places in the `readPreferenceServerSelector`. --- lib/sdam/server_selectors.js | 64 ++++++++++++++++++++++++++++-------- lib/sdam/topology.js | 14 +++++--- package.json | 1 + 3 files changed, 60 insertions(+), 19 deletions(-) diff --git a/lib/sdam/server_selectors.js b/lib/sdam/server_selectors.js index 53d02abec..f16e0fcbd 100644 --- a/lib/sdam/server_selectors.js +++ b/lib/sdam/server_selectors.js @@ -2,6 +2,11 @@ const ServerType = require('./server_description').ServerType; const TopologyType = require('./topology_description').TopologyType; const ReadPreference = require('../topologies/read_preference'); +const MongoError = require('../error').MongoError; + +// max staleness constants +const IDLE_WRITE_PERIOD = 10000; +const SMALLEST_MAX_STALENESS_SECONDS = 90; function writableServerSelector() { return function(topologyDescription, servers) { @@ -23,23 +28,39 @@ function maxStalenessReducer(readPreference, topologyDescription, servers) { return servers; } + const maxStaleness = readPreference.maxStalenessSeconds; + const maxStalenessVariance = + (topologyDescription.heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000; + if (maxStaleness < maxStalenessVariance) { + throw MongoError(`maxStalenessSeconds must be at least ${maxStalenessVariance} seconds`); + } + + if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) { + throw new MongoError( + `maxStalenessSeconds must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds` + ); + } + if (topologyDescription.type === TopologyType.ReplicaSetWithPrimary) { - const primary = servers.filter(primaryFilter); + const primary = servers.filter(primaryFilter)[0]; return servers.reduce((result, server) => { - const staleness = + const stalenessMS = server.lastUpdateTime - server.lastWriteDate - (primary.lastUpdateTime - primary.lastWriteDate) + topologyDescription.heartbeatFrequencyMS; + const staleness = stalenessMS / 1000; if (staleness <= readPreference.maxStalenessSeconds) result.push(server); return result; }, []); } else if (topologyDescription.type === TopologyType.ReplicaSetNoPrimary) { const sMax = servers.reduce((max, s) => (s.lastWriteDate > max.lastWriteDate ? s : max)); return servers.reduce((result, server) => { - const staleness = + const stalenessMS = sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS; + + const staleness = stalenessMS / 1000; if (staleness <= readPreference.maxStalenessSeconds) result.push(server); return result; }, []); @@ -111,18 +132,33 @@ function nearestFilter(server) { return server.type === ServerType.RSSecondary || server.type === ServerType.RSPrimary; } +function knownFilter(server) { + return server.type !== ServerType.Unknown; +} + function readPreferenceServerSelector(readPreference) { if (!readPreference.isValid()) { throw new TypeError('Invalid read preference specified'); } return function(topologyDescription, servers) { + const commonWireVersion = topologyDescription.commonWireVersion; + if ( + commonWireVersion && + (readPreference.minWireVersion && readPreference.minWireVersion > commonWireVersion) + ) { + throw new MongoError( + `Minimum wire version '${ + readPreference.minWireVersion + }' required, but found '${commonWireVersion}'` + ); + } + if ( topologyDescription.type === TopologyType.Single || - topologyDescription.type === TopologyType.Sharded || - topologyDescription.type === TopologyType.Unknown + topologyDescription.type === TopologyType.Sharded ) { - return latencyWindowReducer(topologyDescription, servers); + return latencyWindowReducer(topologyDescription, servers.filter(knownFilter)); } if (readPreference.mode === ReadPreference.PRIMARY) { @@ -134,25 +170,25 @@ function readPreferenceServerSelector(readPreference) { topologyDescription, tagSetReducer( readPreference, - maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter)) + maxStalenessReducer(readPreference, topologyDescription, servers) ) - ); + ).filter(secondaryFilter); } else if (readPreference.mode === ReadPreference.NEAREST) { return latencyWindowReducer( topologyDescription, tagSetReducer( readPreference, - maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter)) + maxStalenessReducer(readPreference, topologyDescription, servers) ) - ); + ).filter(nearestFilter); } else if (readPreference.mode === ReadPreference.SECONDARY_PREFERRED) { const result = latencyWindowReducer( topologyDescription, tagSetReducer( readPreference, - maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter)) + maxStalenessReducer(readPreference, topologyDescription, servers) ) - ); + ).filter(secondaryFilter); return result.length === 0 ? servers.filter(primaryFilter) : result; } else if (readPreference.mode === ReadPreference.PRIMARY_PREFERRED) { @@ -165,9 +201,9 @@ function readPreferenceServerSelector(readPreference) { topologyDescription, tagSetReducer( readPreference, - maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter)) + maxStalenessReducer(readPreference, topologyDescription, servers) ) - ); + ).filter(secondaryFilter); } }; } diff --git a/lib/sdam/topology.js b/lib/sdam/topology.js index 8caf4dee6..d77b674f5 100644 --- a/lib/sdam/topology.js +++ b/lib/sdam/topology.js @@ -6,7 +6,6 @@ const TopologyType = require('./topology_description').TopologyType; const monitoring = require('./monitoring'); const calculateDurationInMs = require('../utils').calculateDurationInMs; const MongoTimeoutError = require('../error').MongoTimeoutError; -const MongoError = require('../error').MongoError; // Global state let globalTopologyCounter = 0; @@ -209,12 +208,17 @@ class FakeServer { * @param {*} callback */ function selectServers(topology, selector, timeout, start, callback) { - if (!topology.description.compatible) { - return callback(new MongoError(topology.description.compatibilityError)); + const serverDescriptions = Array.from(topology.description.servers.values()); + let descriptions; + + try { + descriptions = selector + ? selector(topology.description, serverDescriptions) + : serverDescriptions; + } catch (e) { + return callback(e, null); } - const serverDescriptions = Array.from(topology.description.servers.values()); - let descriptions = selector(topology.description, serverDescriptions); if (descriptions.length) { // TODO: obviously return the actual server in the future const servers = descriptions.map(d => new FakeServer(d)); diff --git a/package.json b/package.json index 1b2ffb7a4..d09619612 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "eslint": "^4.6.1", "eslint-plugin-prettier": "^2.2.0", "jsdoc": "3.5.4", + "mongodb-extjson": "^2.1.2", "mongodb-mock-server": "^1.0.0", "mongodb-test-runner": "^1.1.18", "prettier": "^1.6.1",