diff --git a/lib/cursor.js b/lib/cursor.js index 3b7a97a18..21eedd446 100644 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -606,7 +606,16 @@ function initializeCursor(cursor, callback) { } } - return cursor.topology.selectServer(cursor.options, (err, server) => { + // Very explicitly choose what is passed to selectServer + const serverSelectOptions = {}; + if (cursor.cursorState.session) { + serverSelectOptions.session = cursor.cursorState.session; + } + if (cursor.options.readPreference) { + serverSelectOptions.readPreference = cursor.options.readPreference; + } + + return cursor.topology.selectServer(serverSelectOptions, (err, server) => { if (err) { const disconnectHandler = cursor.disconnectHandler; if (disconnectHandler != null) { diff --git a/lib/topologies/mongos.js b/lib/topologies/mongos.js index 3c1a5b979..c050ac2d2 100644 --- a/lib/topologies/mongos.js +++ b/lib/topologies/mongos.js @@ -464,7 +464,14 @@ function connectProxies(self, servers) { } } -function pickProxy(self) { +function pickProxy(self, session) { + // TODO: Destructure :) + const transaction = session && session.transaction; + + if (transaction && transaction.server) { + return transaction.server; + } + // Get the currently connected Proxies var connectedProxies = self.connectedProxies.slice(0); @@ -488,15 +495,22 @@ function pickProxy(self) { } }); + let proxy; + // We have no connectedProxies pick first of the connected ones if (connectedProxies.length === 0) { - return self.connectedProxies[0]; + proxy = self.connectedProxies[0]; + } else { + // Get proxy + proxy = connectedProxies[self.index % connectedProxies.length]; + // Update the index + self.index = (self.index + 1) % connectedProxies.length; + } + + if (transaction) { + transaction.pinServer(proxy); } - // Get proxy - var proxy = connectedProxies[self.index % connectedProxies.length]; - // Update the index - self.index = (self.index + 1) % connectedProxies.length; // Return the proxy return proxy; } @@ -846,7 +860,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) { options = options || {}; // Pick a server - let server = pickProxy(self); + let server = pickProxy(self, options.session); // No server found error out if (!server) return callback(new MongoError('no mongos proxy available')); @@ -866,7 +880,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) { } // Pick another server - server = pickProxy(self); + server = pickProxy(self, options.session); // No server found error out with original error if (!server || !isRetryableWritesSupported(server)) { @@ -1007,7 +1021,7 @@ Mongos.prototype.command = function(ns, cmd, options, callback) { var self = this; // Pick a proxy - var server = pickProxy(self); + var server = pickProxy(self, options.session); // Topology is not connected, save the call in the provided store to be // Executed at some point when the handler deems it's reconnected @@ -1087,7 +1101,8 @@ Mongos.prototype.cursor = function(ns, cmd, options) { * * @method * @param {function} selector Unused - * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {ReadPreference} [options.readPreference] Unused + * @param {ClientSession} [options.session] Specify a session if it is being used * @param {function} callback */ Mongos.prototype.selectServer = function(selector, options, callback) { @@ -1097,7 +1112,7 @@ Mongos.prototype.selectServer = function(selector, options, callback) { (callback = options), (options = selector), (selector = undefined); options = options || {}; - const server = pickProxy(this); + const server = pickProxy(this, options.session); if (this.s.debug) this.emit('pickedServer', null, server); callback(null, server); }; diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index fbac822f2..42ed55588 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -1074,6 +1074,7 @@ ReplSet.prototype.isDestroyed = function() { * @method * @param {function} selector Unused * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it + * @param {ClientSession} [options.session] Unused * @param {function} callback */ ReplSet.prototype.selectServer = function(selector, options, callback) { diff --git a/lib/topologies/server.js b/lib/topologies/server.js index 88ef0fc44..92a5509f9 100644 --- a/lib/topologies/server.js +++ b/lib/topologies/server.js @@ -778,6 +778,10 @@ Server.prototype.connections = function() { /** * Selects a server + * @method + * @param {function} selector Unused + * @param {ReadPreference} [options.readPreference] Unused + * @param {ClientSession} [options.session] Unused * @return {Server} */ Server.prototype.selectServer = function(selector, options, callback) {