Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
fix: make mongos write commands work the same as replset write commands
Browse files Browse the repository at this point in the history
  • Loading branch information
daprahamian authored and mbroadst committed Feb 27, 2019
1 parent 189e428 commit 31b984f
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions lib/topologies/mongos.js
Original file line number Diff line number Diff line change
Expand Up @@ -854,26 +854,29 @@ Mongos.prototype.isDestroyed = function() {
// Operations
//

// Execute write operation
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
function executeWriteOperation(args, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

// TODO: once we drop Node 4, use destructuring either here or in arguments.
const self = args.self;
const op = args.op;
const ns = args.ns;
const ops = args.ops;

// Pick a server
let server = pickProxy(self, options.session);
// No server found error out
if (!server) return callback(new MongoError('no mongos proxy available'));

if (!options.retryWrites || !options.session || !isRetryableWritesSupported(self)) {
// Execute the command
return server[op](ns, ops, options, callback);
}

// increment and assign txnNumber
options.willRetryWrite = true;
options.session.incrementTransactionNumber();
const willRetryWrite =
!args.retrying &&
!!options.retryWrites &&
options.session &&
isRetryableWritesSupported(self) &&
!options.session.inTransaction();

server[op](ns, ops, options, (err, result) => {
const handler = (err, result) => {
if (!err) return callback(null, result);
if (!isRetryableError(err)) {
return callback(err);
Expand All @@ -883,14 +886,27 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) {
server = pickProxy(self, options.session);

// No server found error out with original error
if (!server || !isRetryableWritesSupported(server)) {
if (!server || !willRetryWrite) {
return callback(err);
}

// rerun the operation
server[op](ns, ops, options, callback);
});
};
const newArgs = Object.assign({}, args, { retrying: true });
return executeWriteOperation(newArgs, options, callback);
};

if (callback.operationId) {
handler.operationId = callback.operationId;
}

// increment and assign txnNumber
if (willRetryWrite) {
options.session.incrementTransactionNumber();
options.willRetryWrite = willRetryWrite;
}

// rerun the operation
server[op](ns, ops, options, handler);
}

/**
* Insert one or more documents
Expand Down Expand Up @@ -923,7 +939,7 @@ Mongos.prototype.insert = function(ns, ops, options, callback) {
}

// Execute write operation
executeWriteOperation(this, 'insert', ns, ops, options, callback);
executeWriteOperation({ self: this, op: 'insert', ns, ops }, options, callback);
};

/**
Expand Down Expand Up @@ -957,7 +973,7 @@ Mongos.prototype.update = function(ns, ops, options, callback) {
}

// Execute write operation
executeWriteOperation(this, 'update', ns, ops, options, callback);
executeWriteOperation({ self: this, op: 'update', ns, ops }, options, callback);
};

/**
Expand Down Expand Up @@ -991,7 +1007,7 @@ Mongos.prototype.remove = function(ns, ops, options, callback) {
}

// Execute write operation
executeWriteOperation(this, 'remove', ns, ops, options, callback);
executeWriteOperation({ self: this, op: 'remove', ns, ops }, options, callback);
};

const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];
Expand Down

0 comments on commit 31b984f

Please sign in to comment.