Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(retryableWrites): adding more support for retries
Browse files Browse the repository at this point in the history
- Adds support for retryable commands that route through the topology `.commands` function instead of a designated function like `.insert`. This adds coverage for `findAndModify` commands.

- Adds support for retrying on more types of errors, and retrying on writeConcern errors

Fixes NODE-1456
  • Loading branch information
daprahamian authored May 31, 2018
1 parent d71b21c commit d4c1597
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 47 deletions.
22 changes: 13 additions & 9 deletions lib/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,19 @@ function messageHandler(self) {
}

// Establish if we have an error
if (
workItem.command &&
message.documents[0] &&
(message.documents[0].ok === 0 ||
message.documents[0]['$err'] ||
message.documents[0]['errmsg'] ||
message.documents[0]['code'])
) {
return handleOperationCallback(self, workItem.cb, new MongoError(message.documents[0]));
if (workItem.command && message.documents[0]) {
const responseDoc = message.documents[0];
if (responseDoc.ok === 0 || responseDoc.$err || responseDoc.errmsg || responseDoc.code) {
return handleOperationCallback(self, workItem.cb, new MongoError(responseDoc));
}

if (responseDoc.writeConcernError) {
return handleOperationCallback(
self,
workItem.cb,
new MongoError(responseDoc.writeConcernError)
);
}
}

// Add the connection details
Expand Down
32 changes: 31 additions & 1 deletion lib/error.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,40 @@ const MongoTimeoutError = function(message) {
};
util.inherits(MongoTimeoutError, MongoError);

// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms
const RETRYABLE_ERROR_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
64, // WriteConcernFailed
89, // NetworkTimeout
91, // ShutdownInProgress
189, // PrimarySteppedDown
9001, // SocketException
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
10107, // NotMaster
13435, // NotMasterNoSlaveOk
13436 // NotMasterOrSecondary
]);

function isRetryableError(error) {
if (
RETRYABLE_ERROR_CODES.has(error.code) ||
error instanceof MongoNetworkError ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
) {
return true;
}

return false;
}

module.exports = {
MongoError,
MongoNetworkError,
MongoParseError,
MongoTimeoutError,
mongoErrorContextSymbol
mongoErrorContextSymbol,
isRetryableError
};
31 changes: 1 addition & 30 deletions lib/sessions.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const BSON = retrieveBSON();
const Binary = BSON.Binary;
const uuidV4 = require('./utils').uuidV4;
const MongoError = require('./error').MongoError;
const MongoNetworkError = require('./error').MongoNetworkError;
const isRetryableError = require('././error').isRetryableError;

function assertAlive(session, callback) {
if (session.serverSession == null) {
Expand Down Expand Up @@ -219,35 +219,6 @@ class ClientSession extends EventEmitter {
}
}

// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms
const RETRYABLE_ERROR_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
64, // WriteConcernFailed
89, // NetworkTimeout
91, // ShutdownInProgress
189, // PrimarySteppedDown
9001, // SocketException
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
10107, // NotMaster
13435, // NotMasterNoSlaveOk
13436 // NotMasterOrSecondary
]);

function isRetryableError(error) {
if (
RETRYABLE_ERROR_CODES.has(error.code) ||
error instanceof MongoNetworkError ||
error.message.match(/not master/) ||
error.message.match(/node is recovering/)
) {
return true;
}

return false;
}

function resetTransactionState(clientSession) {
clientSession.transactionOptions = null;
}
Expand Down
40 changes: 37 additions & 3 deletions lib/topologies/mongos.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const BasicCursor = require('../cursor');
const Logger = require('../connection/logger');
const retrieveBSON = require('../connection/utils').retrieveBSON;
const MongoError = require('../error').MongoError;
const errors = require('../error');
const Server = require('./server');
const clone = require('./shared').clone;
const diff = require('./shared').diff;
Expand All @@ -16,6 +15,7 @@ const createClientInfo = require('./shared').createClientInfo;
const SessionMixins = require('./shared').SessionMixins;
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
const relayEvents = require('./shared').relayEvents;
const isRetryableError = require('../error').isRetryableError;
const BSON = retrieveBSON();

/**
Expand Down Expand Up @@ -900,7 +900,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) {

server[op](ns, ops, options, (err, result) => {
if (!err) return callback(null, result);
if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) {
if (!isRetryableError(err)) {
return callback(err);
}

Expand Down Expand Up @@ -1019,6 +1019,12 @@ Mongos.prototype.remove = function(ns, ops, options, callback) {
executeWriteOperation(this, 'remove', ns, ops, options, callback);
};

const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];

function isWriteCommand(command) {
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]);
}

/**
* Execute a command
* @method
Expand Down Expand Up @@ -1057,8 +1063,36 @@ Mongos.prototype.command = function(ns, cmd, options, callback) {
var clonedOptions = cloneOptions(options);
clonedOptions.topology = self;

const willRetryWrite =
!options.retrying &&
options.retryWrites &&
options.session &&
isRetryableWritesSupported(self) &&
!options.session.inTransaction() &&
isWriteCommand(cmd);

const cb = (err, result) => {
if (!err) return callback(null, result);
if (!isRetryableError(err)) {
return callback(err);
}

if (willRetryWrite) {
const newOptions = Object.assign({}, clonedOptions, { retrying: true });
return this.command(ns, cmd, newOptions, callback);
}

return callback(err);
};

// increment and assign txnNumber
if (willRetryWrite) {
options.session.incrementTransactionNumber();
options.willRetryWrite = willRetryWrite;
}

// Execute the command
server.command(ns, cmd, clonedOptions, callback);
server.command(ns, cmd, clonedOptions, cb);
};

/**
Expand Down
45 changes: 42 additions & 3 deletions lib/topologies/replset.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const BasicCursor = require('../cursor');
const retrieveBSON = require('../connection/utils').retrieveBSON;
const Logger = require('../connection/logger');
const MongoError = require('../error').MongoError;
const errors = require('../error');
const Server = require('./server');
const ReplSetState = require('./replset_state');
const clone = require('./shared').clone;
Expand All @@ -18,6 +17,7 @@ const createClientInfo = require('./shared').createClientInfo;
const SessionMixins = require('./shared').SessionMixins;
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
const relayEvents = require('./shared').relayEvents;
const isRetryableError = require('../error').isRetryableError;

const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders;

Expand Down Expand Up @@ -1206,7 +1206,7 @@ function executeWriteOperation(args, options, callback) {

const handler = (err, result) => {
if (!err) return callback(null, result);
if (!(err instanceof errors.MongoNetworkError) && !err.message.match(/not master/)) {
if (!isRetryableError(err)) {
return callback(err);
}

Expand Down Expand Up @@ -1298,6 +1298,12 @@ ReplSet.prototype.remove = function(ns, ops, options, callback) {
executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback);
};

const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];

function isWriteCommand(command) {
return RETRYABLE_WRITE_OPERATIONS.some(op => command[op]);
}

/**
* Execute a command
* @method
Expand Down Expand Up @@ -1370,8 +1376,41 @@ ReplSet.prototype.command = function(ns, cmd, options, callback) {
);
}

const willRetryWrite =
!options.retrying &&
options.retryWrites &&
options.session &&
isRetryableWritesSupported(self) &&
!options.session.inTransaction() &&
isWriteCommand(cmd);

const cb = (err, result) => {
if (!err) return callback(null, result);
if (!isRetryableError(err)) {
return callback(err);
}

if (willRetryWrite) {
const newOptions = Object.assign({}, options, { retrying: true });
return this.command(ns, cmd, newOptions, callback);
}

// Per SDAM, remove primary from replicaset
if (this.s.replicaSetState.primary) {
this.s.replicaSetState.remove(this.s.replicaSetState.primary, { force: true });
}

return callback(err);
};

// increment and assign txnNumber
if (willRetryWrite) {
options.session.incrementTransactionNumber();
options.willRetryWrite = willRetryWrite;
}

// Execute the command
server.command(ns, cmd, options, callback);
server.command(ns, cmd, options, cb);
};

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/wireprotocol/3_2_support.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ WireProtocol.prototype.command = function(bson, ns, cmd, cursorState, topology,
}

// optionally decorate query with transaction data
decorateWithTransactionsData(query.query, options.session);
decorateWithTransactionsData(query.query, options.session, options.willRetryWrite);

// We need to increment the statement id if we're in a transaction
if (options.session && options.session.inTransaction()) {
Expand Down

0 comments on commit d4c1597

Please sign in to comment.