Skip to content

Commit

Permalink
fix(BulkOp): run unordered bulk ops in serial
Browse files Browse the repository at this point in the history
UnorderedBulkWrites that use either retryableWrites or transactions
and require more than one batch have a chance to fail if the batches
arrive at the server out of order. We will now conform to other
drivers by executing batches serially.

Fixes NODE-1934
  • Loading branch information
daprahamian committed Apr 15, 2019
1 parent f35eecf commit f548bd7
Showing 1 changed file with 24 additions and 49 deletions.
73 changes: 24 additions & 49 deletions lib/bulk/unordered.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,25 @@ class UnorderedBulkOperation extends BulkOperationBase {
options = ret.options;
callback = ret.callback;

return executeOperation(this.s.topology, executeBatches, [this, options, callback]);
return executeOperation(this.s.topology, executeCommands, [this, options, callback]);
}
}

/**
* Execute the command
* Execute next write command in a chain
*
* @param {UnorderedBulkOperation} bulkOperation
* @param {object} batch
* @param {OrderedBulkOperation} bulkOperation
* @param {object} options
* @param {function} callback
*/
function executeBatch(bulkOperation, batch, options, callback) {
function executeCommands(bulkOperation, options, callback) {
if (bulkOperation.s.batches.length === 0) {
return handleCallback(callback, null, new BulkWriteResult(bulkOperation.s.bulkResult));
}

// Ordered execution of the command
const batch = bulkOperation.s.batches.shift();

function resultHandler(err, result) {
// Error is a driver related error not a bulk op error, terminate
if (((err && err.driver) || (err && err.message)) && !(err instanceof MongoWriteConcernError)) {
Expand All @@ -161,54 +167,23 @@ function executeBatch(bulkOperation, batch, options, callback) {
// If we have and error
if (err) err.ok = 0;
if (err instanceof MongoWriteConcernError) {
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, false, err, callback);
return handleMongoWriteConcernError(batch, bulkOperation.s.bulkResult, true, err, callback);
}
handleCallback(
callback,
null,
mergeBatchResults(false, batch, bulkOperation.s.bulkResult, err, result)
);
}

bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
}
// Merge the results together
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
const mergeResult = mergeBatchResults(true, batch, bulkOperation.s.bulkResult, err, result);
if (mergeResult != null) {
return handleCallback(callback, null, writeResult);
}

/**
* Execute all the commands
*
* @param {UnorderedBulkOperation} bulkOperation
* @param {object} options
* @param {function} callback
*/
function executeBatches(bulkOperation, options, callback) {
let numberOfCommandsToExecute = bulkOperation.s.batches.length;
let hasErrored = false;
// Execute over all the batches
for (let i = 0; i < bulkOperation.s.batches.length; i++) {
executeBatch(bulkOperation, bulkOperation.s.batches[i], options, function(err) {
if (hasErrored) {
return;
}

if (err) {
hasErrored = true;
return handleCallback(callback, err);
}
// Count down the number of commands left to execute
numberOfCommandsToExecute = numberOfCommandsToExecute - 1;

// Execute
if (numberOfCommandsToExecute === 0) {
// Driver level error
if (err) return handleCallback(callback, err);

const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult);
if (bulkOperation.handleWriteError(callback, writeResult)) return;

return handleCallback(callback, null, writeResult);
}
});
if (bulkOperation.handleWriteError(callback, writeResult)) return;

// Execute the next command in line
executeCommands(bulkOperation, options, callback);
}

bulkOperation.finalOptionsHandler({ options, batch, resultHandler }, callback);
}

/**
Expand Down

0 comments on commit f548bd7

Please sign in to comment.