Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(with-transaction): provide helper for convenient txn api
Browse files Browse the repository at this point in the history
This introduces a helper (`withTransaction`) that provides a
covenient api for automatic retryability around committing
transactions.

NODE-1741
  • Loading branch information
mbroadst committed Feb 4, 2019
1 parent 32a5e74 commit 478d1e7
Showing 1 changed file with 103 additions and 1 deletion.
104 changes: 103 additions & 1 deletion lib/sessions.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,106 @@ class ClientSession extends EventEmitter {
toBSON() {
throw new Error('ClientSession cannot be serialized to BSON.');
}

/**
* A user provided function to be run within a transaction
*
* @callback WithTransactionCallback
* @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
* @returns {Promise} The resulting Promise of operations run within this transaction
*/

/**
* Runs a provided lambda within a transaction, retrying either the commit operation
* or entire transaction as needed (and when the error permits) to better ensure that
* the transaction can complete successfully.
*
* IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
* return a Promise will result in undefined behavior.
*
* @param {WithTransactionCallback} fn
* @param {TransactionOptions} [options] Optional settings for the transaction
*/
withTransaction(fn, options) {
const self = this;
const startTime = Date.now();

function retryCommit() {
return self.commitTransaction().catch(err => {
if (
!isWriteConcernTimeoutError(err) &&
(err instanceof MongoError && err.hasErrorLabel('UnknownTransactionCommitResult')) &&
Date.now() - startTime < 120000
) {
return retryCommit();
}

if (
err instanceof MongoError &&
err.hasErrorLabel('TransientTransactionError') &&
Date.now() - startTime < 120000
) {
return retryTransaction();
}

throw err;
});
}

function retryTransaction() {
self.startTransaction(options);

// TODO: should we support callbacks?
return fn(self)
.then(() => {
if (
[
TxnState.NO_TRANSACTION,
TxnState.TRANSACTION_COMMITTED,
TxnState.TRANSACTION_ABORTED
].indexOf(self.transaction.state) !== -1
) {
// Assume user provided function intentionally ended the transaction
return;
}

return retryCommit();
})
.catch(err => {
function maybeRetryOrThrow(err) {
if (
err instanceof MongoError &&
err.hasErrorLabel('TransientTransactionError') &&
Date.now() - startTime < 120000
) {
return retryTransaction();
}

throw err;
}

if (self.transaction.isActive) {
return self.abortTransaction().then(() => maybeRetryOrThrow(err));
}

return maybeRetryOrThrow(err);
});
}

return retryTransaction();
}
}

function isWriteConcernTimeoutError(err) {
return err.code === 64 && !!(err.errInfo && err.errInfo.wtimeout === true);
}

function isUnknownTransactionCommitResult(err) {
return (
['CannotSatisfyWriteConcern', 'UnknownReplWriteConcern', 'UnsatisfiableWriteConcern'].indexOf(
err.codeName
) === -1
);
}

function endTransaction(session, commandName, callback) {
Expand Down Expand Up @@ -334,7 +434,9 @@ function endTransaction(session, commandName, callback) {
e.errorLabels = [];
}

e.errorLabels.push('UnknownTransactionCommitResult');
if (isUnknownTransactionCommitResult(e)) {
e.errorLabels.push('UnknownTransactionCommitResult');
}
}
} else {
session.transaction.transition(TxnState.TRANSACTION_ABORTED);
Expand Down

0 comments on commit 478d1e7

Please sign in to comment.