Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(retryable-writes): initial support on replicasets
Browse files Browse the repository at this point in the history
NODE-1105
  • Loading branch information
mbroadst committed Dec 1, 2017
1 parent cceca99 commit 73ac688
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 13 deletions.
44 changes: 33 additions & 11 deletions lib/topologies/replset.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ var inherits = require('util').inherits,
retrieveBSON = require('../connection/utils').retrieveBSON,
Logger = require('../connection/logger'),
MongoError = require('../error').MongoError,
errors = require('../error'),
Server = require('./server'),
ReplSetState = require('./replset_state'),
clone = require('./shared').clone,
Timeout = require('./shared').Timeout,
Interval = require('./shared').Interval,
createClientInfo = require('./shared').createClientInfo,
SessionMixins = require('./shared').SessionMixins;
SessionMixins = require('./shared').SessionMixins,
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported,
txnNumber = require('./shared').txnNumber;

var MongoCR = require('../auth/mongocr'),
X509 = require('../auth/x509'),
Expand Down Expand Up @@ -1154,20 +1157,39 @@ ReplSet.prototype.getServers = function() {
//
// Execute write operation
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
if (typeof options === 'function') {
(callback = options), (options = {}), (options = options || {});
}

// Ensure we have no options
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

// No server returned we had an error
if (self.s.replicaSetState.primary == null) {
return callback(new MongoError('no primary server found'));
if (!options.retryWrites || !options.session || !isRetryableWritesSupported(self)) {
// No server returned we had an error
if (self.s.replicaSetState.primary == null) {
return callback(new MongoError('no primary server found'));
}

// Execute the command
return self.s.replicaSetState.primary[op](ns, ops, options, callback);
}

// Execute the command
self.s.replicaSetState.primary[op](ns, ops, options, callback);
// increment and assign txnNumber
options.txnNumber = txnNumber(options.session);

self.s.replicaSetState.primary[op](ns, ops, options, (err, result) => {
if (!err) return callback(null, result);
if (err instanceof errors.MongoNetworkError) {
return callback(err);
}

// check again, this might have changed in the interim
if (self.s.replicaSetState.primary == null) {
return callback(new MongoError('no primary server found'));
}

// increment and assign txnNumber
options.txnNumber = txnNumber(options.session);

// rerun the operation
self.s.replicaSetState.primary[op](ns, ops, options, callback);
});
};

/**
Expand Down
39 changes: 37 additions & 2 deletions lib/topologies/shared.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict';

var os = require('os'),
const os = require('os'),
f = require('util').format,
ReadPreference = require('./read_preference');
ReadPreference = require('./read_preference'),
retrieveBSON = require('../connection/utils').retrieveBSON;

const BSON = retrieveBSON();

/**
* Emit event if it exists
Expand Down Expand Up @@ -402,6 +405,36 @@ const SessionMixins = {
}
};

const RETRYABLE_WIRE_VERSION = 6;

/**
* Determines whether the provided topology supports retryable writes
*
* @param {Mongos|Replset} topology
*/
const isRetryableWritesSupported = function(topology) {
const maxWireVersion = topology.lastIsMaster().maxWireVersion;
if (maxWireVersion < RETRYABLE_WIRE_VERSION) {
return false;
}

if (!topology.logicalSessionTimeoutMinutes) {
return false;
}

return true;
};

/**
* Increment the transaction number on the ServerSession contained by the provided ClientSession
*
* @param {ClientSession} session
*/
const txnNumber = function(session) {
session.serverSession.txnNumber++;
return BSON.Long.fromNumber(session.serverSession.txnNumber);
};

module.exports.SessionMixins = SessionMixins;
module.exports.resolveClusterTime = resolveClusterTime;
module.exports.inquireServerState = inquireServerState;
Expand All @@ -415,3 +448,5 @@ module.exports.clone = clone;
module.exports.diff = diff;
module.exports.Interval = Interval;
module.exports.Timeout = Timeout;
module.exports.isRetryableWritesSupported = isRetryableWritesSupported;
module.exports.txnNumber = txnNumber;
59 changes: 59 additions & 0 deletions test/tests/unit/replset/retryable_writes_tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
'use strict';
var expect = require('chai').expect,
ReplSet = require('../../../../lib/topologies/replset'),
mock = require('../../../mock'),
ReplSetFixture = require('../common').ReplSetFixture,
ClientSession = require('../../../../lib/sessions').ClientSession,
ServerSessionPool = require('../../../../lib/sessions').ServerSessionPool;

const test = new ReplSetFixture();
describe('Sessions (ReplSet)', function() {
afterEach(() => mock.cleanup());
beforeEach(() => test.setup({ ismaster: mock.DEFAULT_ISMASTER_36 }));

it('should add `txnNumber` to write commands where `retryWrites` is true', {
metadata: { requires: { topology: ['single'] } },
test: function(done) {
var replset = new ReplSet(
[test.primaryServer.address(), test.firstSecondaryServer.address()],
{
setName: 'rs',
connectionTimeout: 3000,
socketTimeout: 0,
haInterval: 100,
size: 1
}
);

const sessionPool = new ServerSessionPool(replset);
const session = new ClientSession(replset, sessionPool);

let command = null;
test.primaryServer.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(test.primaryStates[0]);
} else if (doc.insert) {
command = doc;
request.reply({ ok: 1 });
}
});

replset.on('all', () => {
replset.insert('test.test', [{ a: 1 }], { retryWrites: true, session: session }, function(
err
) {
expect(err).to.not.exist;
expect(command).to.have.property('txnNumber');
expect(command.txnNumber).to.eql(1);

replset.destroy();
done();
});
});

replset.on('error', done);
replset.connect();
}
});
});

0 comments on commit 73ac688

Please sign in to comment.