Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(read-preference): unify means of read preference resolution #1738

Merged
merged 2 commits into from
Jun 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const ordered = require('./bulk/ordered');
const ChangeStream = require('./change_stream');
const executeOperation = require('./utils').executeOperation;
const applyWriteConcern = require('./utils').applyWriteConcern;
const getReadPreference = require('./utils').getReadPreference;
const resolveReadPreference = require('./utils').resolveReadPreference;

// Operations
const bulkWrite = require('./operations/collection_ops').bulkWrite;
Expand Down Expand Up @@ -340,7 +340,10 @@ Collection.prototype.find = function(query, options, callback) {
newOptions.slaveOk = options.slaveOk != null ? options.slaveOk : this.s.db.slaveOk;

// Add read preference if needed
newOptions = getReadPreference(this, newOptions, this.s.db);
newOptions.readPreference = resolveReadPreference(newOptions, {
db: this.s.db,
collection: this
});

// Set slave ok to true if read preference different from primary
if (
Expand Down Expand Up @@ -1223,7 +1226,7 @@ Collection.prototype.listIndexes = function(options) {
// Clone the options
options = Object.assign({}, options);
// Determine the read preference in the options.
options = getReadPreference(this, options, this.s.db, this);
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });
// Set the CommandCursor constructor
options.cursorFactory = CommandCursor;
// Set the promiseLibrary
Expand Down Expand Up @@ -1738,7 +1741,7 @@ Collection.prototype.aggregate = function(pipeline, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(this, options, this.s.db, this);
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });

// If explain has been specified add it
if (options.explain) {
Expand Down Expand Up @@ -1836,7 +1839,7 @@ Collection.prototype.parallelCollectionScan = function(options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(this, options, this.s.db, this);
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });

// Add a promiseLibrary
options.promiseLibrary = this.s.promiseLibrary;
Expand Down
17 changes: 6 additions & 11 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const Collection = require('./collection');
const mergeOptionsAndWriteConcern = require('./utils').mergeOptionsAndWriteConcern;
const executeOperation = require('./utils').executeOperation;
const applyWriteConcern = require('./utils').applyWriteConcern;
const convertReadPreference = require('./utils').convertReadPreference;
const resolveReadPreference = require('./utils').resolveReadPreference;
const ChangeStream = require('./change_stream');

// Operations
Expand Down Expand Up @@ -477,11 +477,10 @@ Db.prototype.listCollections = function(filter, options) {
options.promiseLibrary = this.s.promiseLibrary;

// Ensure valid readPreference
if (options.readPreference) {
options.readPreference = convertReadPreference(options.readPreference);
} else {
options.readPreference = this.s.readPreference || ReadPreference.primary;
}
options.readPreference = resolveReadPreference(options, {
db: this,
default: ReadPreference.primary
});

// Cursor options
let cursor = options.batchSize ? { batchSize: options.batchSize } : {};
Expand Down Expand Up @@ -673,11 +672,7 @@ Db.prototype.collections = function(options, callback) {
Db.prototype.executeDbAdminCommand = function(selector, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

// Convert read preference
if (options.readPreference) {
options.readPreference = convertReadPreference(options.readPreference);
}
options.readPreference = resolveReadPreference(options);

return executeOperation(this.s.topology, executeDbAdminCommand, [
this,
Expand Down
14 changes: 7 additions & 7 deletions lib/operations/collection_ops.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const evaluate = require('./db_ops').evaluate;
const executeCommand = require('./db_ops').executeCommand;
const executeDbAdminCommand = require('./db_ops').executeDbAdminCommand;
const formattedOrderClause = require('../utils').formattedOrderClause;
const getReadPreference = require('../utils').getReadPreference;
const resolveReadPreference = require('../utils').resolveReadPreference;
const handleCallback = require('../utils').handleCallback;
const indexInformationDb = require('./db_ops').indexInformation;
const isObject = require('../utils').isObject;
Expand Down Expand Up @@ -195,7 +195,7 @@ function count(coll, query, options, callback) {
if (hint) cmd.hint = hint;

// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Do we have a readConcern specified
decorateWithReadConcern(cmd, coll, options);
Expand Down Expand Up @@ -359,7 +359,7 @@ function distinct(coll, key, query, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Add maxTimeMS if defined
if (typeof maxTimeMS === 'number') cmd.maxTimeMS = maxTimeMS;
Expand Down Expand Up @@ -634,7 +634,7 @@ function geoHaystackSearch(coll, x, y, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Do we have a readConcern specified
decorateWithReadConcern(commandObject, coll, options);
Expand Down Expand Up @@ -694,7 +694,7 @@ function group(coll, keys, condition, initial, reduce, finalize, command, option

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Do we have a readConcern specified
decorateWithReadConcern(selector, coll, options);
Expand Down Expand Up @@ -891,7 +891,7 @@ function mapReduce(coll, map, reduce, options, callback) {
options = Object.assign({}, options);

// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// If we have a read preference and inline is not set as output fail hard
if (
Expand Down Expand Up @@ -1280,7 +1280,7 @@ function stats(coll, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Execute the command
executeCommand(coll.s.db, commandObject, options, callback);
Expand Down
8 changes: 2 additions & 6 deletions lib/operations/db_ops.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const applyWriteConcern = require('../utils').applyWriteConcern;
const Code = require('mongodb-core').BSON.Code;
const convertReadPreference = require('../utils').convertReadPreference;
const resolveReadPreference = require('../utils').resolveReadPreference;
const crypto = require('crypto');
const Db = require('../db');
const debugOptions = require('../utils').debugOptions;
Expand Down Expand Up @@ -480,11 +480,7 @@ function executeCommand(db, command, options, callback) {
}

// Convert the readPreference if its not a write
if (options.readPreference) {
options.readPreference = convertReadPreference(options.readPreference);
} else {
options.readPreference = ReadPreference.primary;
}
options.readPreference = resolveReadPreference(options, { default: ReadPreference.primary });

// Debug information
if (db.s.logger.isDebug())
Expand Down
90 changes: 43 additions & 47 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -496,60 +496,57 @@ function applyWriteConcern(target, sources, options) {
}

/**
* Ensures provided read preference is properly converted into an object
* @param {(ReadPreference|string|object)} readPreference the user provided read preference
* @return {ReadPreference}
* Resolves a read preference based on well-defined inheritance rules. This method will not only
* determine the read preference (if there is one), but will also ensure the returned value is a
* properly constructed instance of `ReadPreference`.
*
* @param {Object} options The options passed into the method, potentially containing a read preference
* @param {Object} sources Sources from which we can inherit a read preference
* @returns {(ReadPreference|null)} The resolved read preference
*/
function convertReadPreference(readPreference) {
if (readPreference) {
if (typeof readPreference === 'string') {
return new ReadPreference(readPreference);
} else if (
readPreference &&
!(readPreference instanceof ReadPreference) &&
typeof readPreference === 'object'
) {
const mode = readPreference.mode || readPreference.preference;
if (mode && typeof mode === 'string') {
return new ReadPreference(mode, readPreference.tags, {
maxStalenessSeconds: readPreference.maxStalenessSeconds
});
}
} else if (!(readPreference instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + readPreference);
}
}
function resolveReadPreference(options, sources) {
options = options || {};
sources = sources || {};

return readPreference;
}
const db = sources.db;
const coll = sources.collection;
const defaultReadPreference = sources.default;

// Figure out the read preference
function getReadPreference(coll, options, db) {
let r = null;
let readPreference;
if (options.readPreference) {
r = options.readPreference;
} else if (coll.s.readPreference) {
r = coll.s.readPreference;
} else if (db.s.readPreference) {
r = db.s.readPreference;
} else {
return options;
}

if (typeof r === 'string') {
options.readPreference = new ReadPreference(r);
} else if (r && !(r instanceof ReadPreference) && typeof r === 'object') {
const mode = r.mode || r.preference;
readPreference = options.readPreference;
} else if (coll && coll.s.readPreference) {
readPreference = coll.s.readPreference;
} else if (db && db.s.readPreference) {
readPreference = db.s.readPreference;
} else if (defaultReadPreference) {
readPreference = defaultReadPreference;
}

// do we even have a read preference?
if (readPreference == null) {
return null;
}

// now attempt to convert the read preference if necessary
if (typeof readPreference === 'string') {
readPreference = new ReadPreference(readPreference);
} else if (
readPreference &&
!(readPreference instanceof ReadPreference) &&
typeof readPreference === 'object'
) {
const mode = readPreference.mode || readPreference.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
readPreference = new ReadPreference(mode, readPreference.tags, {
maxStalenessSeconds: readPreference.maxStalenessSeconds
});
}
} else if (!(r instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + r);
} else if (!(readPreference instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + readPreference);
}

return options;
return readPreference;
}

/**
Expand Down Expand Up @@ -625,9 +622,8 @@ module.exports = {
translateReadPreference,
executeOperation,
applyWriteConcern,
convertReadPreference,
resolveReadPreference,
isPromiseLike,
getReadPreference,
decorateWithCollation,
decorateWithReadConcern
};