Skip to content

Commit

Permalink
fix: multiple concurrent attempts to process the queue may fail
Browse files Browse the repository at this point in the history
In certain failover events, multiple entities may signal that the
server selection queue should be processed. This is problematic
because the queue is processed serially per invocation, even though
the queue may grow during processing. Likely a more robust solution
is required in the future, but for now we can prevent issues by
ensuring the loop breaks if no more wait queue members are present.

NODE-2472
  • Loading branch information
mbroadst committed Mar 11, 2020
1 parent cde11ec commit f69f51c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 5 deletions.
6 changes: 4 additions & 2 deletions lib/core/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ class Topology extends EventEmitter {
let readPreference;
if (selector instanceof ReadPreference) {
readPreference = selector;
} else if (typeof selector === 'string') {
readPreference = new ReadPreference(selector);
} else {
translateReadPreference(options);
readPreference = options.readPreference || ReadPreference.primary;
Expand Down Expand Up @@ -997,10 +999,9 @@ function processWaitQueue(topology) {
return;
}

const isSharded = topology.description.type === TopologyType.Sharded;
const serverDescriptions = Array.from(topology.description.servers.values());
const membersToProcess = topology[kWaitQueue].length;
for (let i = 0; i < membersToProcess; ++i) {
for (let i = 0; i < membersToProcess && topology[kWaitQueue].length; ++i) {
const waitQueueMember = topology[kWaitQueue].shift();
if (waitQueueMember[kCancelled]) {
continue;
Expand All @@ -1026,6 +1027,7 @@ function processWaitQueue(topology) {
const selectedServerDescription = randomSelection(selectedDescriptions);
const selectedServer = topology.s.servers.get(selectedServerDescription.address);
const transaction = waitQueueMember.transaction;
const isSharded = topology.description.type === TopologyType.Sharded;
if (isSharded && transaction && transaction.isActive) {
transaction.pinServer(selectedServer);
}
Expand Down
4 changes: 1 addition & 3 deletions test/unit/cmap/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ const expect = require('chai').expect;
describe('Connection', function() {
let server;
after(() => mock.cleanup());
before(() =>
mock.createServer().then(s => (server = s))
);
before(() => mock.createServer().then(s => (server = s)));

it('should support fire-and-forget messages', function(done) {
server.setMessageHandler(request => {
Expand Down
65 changes: 65 additions & 0 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,71 @@ const expect = require('chai').expect;
const sinon = require('sinon');

describe('Topology (unit)', function() {
it('should successfully process multiple queue processing requests', function(done) {
const singleNodeIsMaster = Object.assign({}, mock.DEFAULT_ISMASTER, {
maxWireVersion: 9,
ismaster: true,
secondary: false,
setName: 'rs',
me: 'a:27017',
hosts: ['a:27017'],
logicalSessionTimeoutMinutes: 10
});

const topology = new Topology('a:27017', { replicaSet: 'rs' });
this.sinon.stub(Server.prototype, 'connect').callsFake(function() {
this.s.state = 'connected';
this.emit('connect');
setTimeout(
() =>
this.emit('descriptionReceived', new ServerDescription(this.name, singleNodeIsMaster)),
100
);
});

function simulatedRetryableReadOperation(topology, callback) {
topology.selectServer('primary', err => {
expect(err).to.not.exist;

topology.selectServer('primary', err => {
expect(err).to.not.exist;

callback();
});
});
}

topology.connect(err => {
expect(err).to.not.exist;
this.defer(() => topology.close());

let selected = 0;
const completionHandler = err => {
expect(err).to.not.exist;

selected++;
if (selected === 3) done();
};

// explicitly prevent server selection by reverting to `Unknown`
const server = topology.s.servers.get('a:27017');
server.emit('descriptionReceived', new ServerDescription(server.name, null));
process.nextTick(() => {
simulatedRetryableReadOperation(topology, completionHandler);
simulatedRetryableReadOperation(topology, completionHandler);

setTimeout(() => {
server.emit(
'descriptionReceived',
new ServerDescription(server.name, singleNodeIsMaster)
);

simulatedRetryableReadOperation(topology, completionHandler);
}, 250);
});
});
});

describe('shouldCheckForSessionSupport', function() {
beforeEach(function() {
this.sinon = sinon.sandbox.create();
Expand Down

0 comments on commit f69f51c

Please sign in to comment.