Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(txns): add initial transaction interface for sessions
Browse files Browse the repository at this point in the history
NODE-1374
  • Loading branch information
mbroadst committed Apr 17, 2018
1 parent b472d45 commit ed76be0
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 60 deletions.
144 changes: 136 additions & 8 deletions lib/sessions.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
'use strict';

const retrieveBSON = require('./connection/utils').retrieveBSON,
EventEmitter = require('events'),
BSON = retrieveBSON(),
Binary = BSON.Binary,
uuidV4 = require('./utils').uuidV4;
const retrieveBSON = require('./connection/utils').retrieveBSON;
const EventEmitter = require('events');
const BSON = retrieveBSON();
const Binary = BSON.Binary;
const uuidV4 = require('./utils').uuidV4;
const MongoError = require('./error').MongoError;

/**
*
*/
function assertAlive(session, callback) {
if (session.serverSession == null) {
const error = new MongoError('Cannot use a session that has ended');
if (typeof callback === 'function') {
return callback(error, null);
}

throw error;
}
};

/** A class representing a client session on the server */
class ClientSession extends EventEmitter {

/**
* Create a client session.
* WARNING: not meant to be instantiated directly
*
* @param {Topology} topology The current client's topology
* @param {ServerSessionPool} sessionPool The server session pool
* @param {Object} [options] Optional settings
* @param {Boolean} [options.causalConsistency] Whether causal consistency should be enabled on this session
* @param {Boolean} [options.autoStartTransaction=false] When enabled this session automatically starts a transaction with the provided defaultTransactionOptions.
* @param {Object} [options.defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
*/
constructor(topology, sessionPool, options) {
super();

Expand Down Expand Up @@ -42,10 +64,20 @@ class ClientSession extends EventEmitter {

this.explicit = !!options.explicit;
this.owner = options.owner;
this.transactionOptions = null;
this.defaultTransactionOptions = options.defaultTransactionOptions || {};

if (options.autoStartTransaction) {
this.startTransaction();
}
}

/**
* Ends this session on the server
*
* @param {Object} [options] Optional settings
* @param {Boolean} [options.skipCommand] Skip sending the actual endSessions command to the server
* @param {Function} [callback] Optional callback for completion of this operation
*/
endSession(options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
Expand All @@ -56,6 +88,10 @@ class ClientSession extends EventEmitter {
return;
}

if (this.serverSession && this.inTransaction()) {
this.abortTransaction(); // pass in callback?
}

if (!options.skipCommand) {
// send the `endSessions` command
this.topology.endSessions(this.id);
Expand Down Expand Up @@ -98,6 +134,98 @@ class ClientSession extends EventEmitter {

return this.id.id.buffer.equals(session.id.id.buffer);
}

/**
* @returns whether this session is current in a transaction or not
*/
inTransaction() {
return this.transactionOptions != null;
}

/**
* Starts a new transaction with the given options.
*
* @param {Object} options Optional settings
* @param {ReadConcern} [options.readConcern] The readConcern to use for this transaction
* @param {WriteConcern} [options.writeConcern] The writeConcern to use for this transaction
*/
startTransaction(options) {
assertAlive(this);
if (this.inTransaction()) {
throw new MongoError('Transaction already started');
}

// increment txnNumber and reset stmtId to zero.
this.serverSession.txnNumber += 1;
this.serverSession.stmtId = 0;

// set transaction options, we will use this to determine if we are in a transaction
this.transactionOptions = options || this.defaultTransactionOptions;
}

/**
* Commits the currently active transaction in this session.
*
* @param {Function} [callback] optional callback for completion of this operation
* @return {Promise} A promise is returned if no callback is provided
*/
commitTransaction(callback) {
if (typeof callback === 'function') {
endTransaction(this, 'commitTransaction', callback);
return;
}

return new Promise((resolve, reject) => {
endTransaction(this, 'commitTransaction', (err, reply) => err ? reject(err) : resolve(reply));
});
}

/**
* Aborts the currently active transaction in this session.
*
* @param {Function} [callback] optional callback for completion of this operation
* @return {Promise} A promise is returned if no callback is provided
*/
abortTransaction(callback) {
if (typeof callback === 'function') {
endTransaction(this, 'abortTransaction', callback);
return;
}

return new Promise((resolve, reject) => {
endTransaction(this, 'abortTransaction', (err, reply) => err ? reject(err) : resolve(reply));
});

}
}

function endTransaction(clientSession, commandName, callback) {
assertAlive(clientSession, callback);

if (!clientSession.inTransaction()) {
callback(new MongoError('No transaction started'));
return;
}

if (clientSession.serverSession.stmtId === 0) {
// The server transaction was never started.
callback(null, null);
return;
}

// send the command
clientSession.topology.command('admin.$cmd', { [commandName]: 1 }, {
writeConcern: clientSession.transactionOptions.writeConcern
}, (err, reply) => {
// reset internal transaction state
if (clientSession.options.autoStartTransaction) {
clientSession.startTransaction();
} else {
clientSession.transactionOptions = null;
}

callback(err, reply);
});
}

Object.defineProperty(ClientSession.prototype, 'id', {
Expand Down
37 changes: 18 additions & 19 deletions lib/topologies/mongos.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
'use strict';

const inherits = require('util').inherits,
f = require('util').format,
EventEmitter = require('events').EventEmitter,
BasicCursor = require('../cursor'),
Logger = require('../connection/logger'),
retrieveBSON = require('../connection/utils').retrieveBSON,
MongoError = require('../error').MongoError,
errors = require('../error'),
Server = require('./server'),
clone = require('./shared').clone,
diff = require('./shared').diff,
cloneOptions = require('./shared').cloneOptions,
createClientInfo = require('./shared').createClientInfo,
SessionMixins = require('./shared').SessionMixins,
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported,
getNextTransactionNumber = require('./shared').getNextTransactionNumber,
relayEvents = require('./shared').relayEvents;

const inherits = require('util').inherits;
const f = require('util').format;
const EventEmitter = require('events').EventEmitter;
const BasicCursor = require('../cursor');
const Logger = require('../connection/logger');
const retrieveBSON = require('../connection/utils').retrieveBSON;
const MongoError = require('../error').MongoError;
const errors = require('../error');
const Server = require('./server');
const clone = require('./shared').clone;
const diff = require('./shared').diff;
const cloneOptions = require('./shared').cloneOptions;
const createClientInfo = require('./shared').createClientInfo;
const SessionMixins = require('./shared').SessionMixins;
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
const incrementTransactionNumber = require('./shared').incrementTransactionNumber;
const relayEvents = require('./shared').relayEvents;
const BSON = retrieveBSON();

/**
Expand Down Expand Up @@ -909,7 +908,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) {
}

// increment and assign txnNumber
options.txnNumber = getNextTransactionNumber(options.session);
incrementTransactionNumber(options.session);

server[op](ns, ops, options, (err, result) => {
if (!err) return callback(null, result);
Expand Down
40 changes: 20 additions & 20 deletions lib/topologies/replset.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
'use strict';

var inherits = require('util').inherits,
f = require('util').format,
EventEmitter = require('events').EventEmitter,
ReadPreference = require('./read_preference'),
BasicCursor = require('../cursor'),
retrieveBSON = require('../connection/utils').retrieveBSON,
Logger = require('../connection/logger'),
MongoError = require('../error').MongoError,
errors = require('../error'),
Server = require('./server'),
ReplSetState = require('./replset_state'),
clone = require('./shared').clone,
Timeout = require('./shared').Timeout,
Interval = require('./shared').Interval,
createClientInfo = require('./shared').createClientInfo,
SessionMixins = require('./shared').SessionMixins,
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported,
getNextTransactionNumber = require('./shared').getNextTransactionNumber,
relayEvents = require('./shared').relayEvents;
const inherits = require('util').inherits;
const f = require('util').format;
const EventEmitter = require('events').EventEmitter;
const ReadPreference = require('./read_preference');
const BasicCursor = require('../cursor');
const retrieveBSON = require('../connection/utils').retrieveBSON;
const Logger = require('../connection/logger');
const MongoError = require('../error').MongoError;
const errors = require('../error');
const Server = require('./server');
const ReplSetState = require('./replset_state');
const clone = require('./shared').clone;
const Timeout = require('./shared').Timeout;
const Interval = require('./shared').Interval;
const createClientInfo = require('./shared').createClientInfo;
const SessionMixins = require('./shared').SessionMixins;
const isRetryableWritesSupported = require('./shared').isRetryableWritesSupported;
const incrementTransactionNumber = require('./shared').incrementTransactionNumber;
const relayEvents = require('./shared').relayEvents;

var MongoCR = require('../auth/mongocr'),
X509 = require('../auth/x509'),
Expand Down Expand Up @@ -1230,7 +1230,7 @@ function executeWriteOperation(args, options, callback) {

// increment and assign txnNumber
if (willRetryWrite) {
options.txnNumber = getNextTransactionNumber(options.session);
incrementTransactionNumber(options.session);
}

return self.s.replicaSetState.primary[op](ns, ops, options, handler);
Expand Down
5 changes: 2 additions & 3 deletions lib/topologies/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,8 @@ const isRetryableWritesSupported = function(topology) {
*
* @param {ClientSession} session
*/
const getNextTransactionNumber = function(session) {
const incrementTransactionNumber = function(session) {
session.serverSession.txnNumber++;
return BSON.Long.fromNumber(session.serverSession.txnNumber);
};

/**
Expand All @@ -454,5 +453,5 @@ module.exports.diff = diff;
module.exports.Interval = Interval;
module.exports.Timeout = Timeout;
module.exports.isRetryableWritesSupported = isRetryableWritesSupported;
module.exports.getNextTransactionNumber = getNextTransactionNumber;
module.exports.incrementTransactionNumber = incrementTransactionNumber;
module.exports.relayEvents = relayEvents;
32 changes: 22 additions & 10 deletions lib/wireprotocol/3_2_support.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
'use strict';

var Query = require('../connection/commands').Query,
retrieveBSON = require('../connection/utils').retrieveBSON,
f = require('util').format,
MongoError = require('../error').MongoError,
MongoNetworkError = require('../error').MongoNetworkError,
getReadPreference = require('./shared').getReadPreference;
const Query = require('../connection/commands').Query;
const retrieveBSON = require('../connection/utils').retrieveBSON;
const f = require('util').format;
const MongoError = require('../error').MongoError;
const MongoNetworkError = require('../error').MongoNetworkError;
const getReadPreference = require('./shared').getReadPreference;

var BSON = retrieveBSON(),
Long = BSON.Long;
const BSON = retrieveBSON();
const Long = BSON.Long;

var WireProtocol = function(legacyWireProtocol) {
this.legacyWireProtocol = legacyWireProtocol;
Expand Down Expand Up @@ -57,8 +57,20 @@ var executeWrite = function(pool, bson, type, opsField, ns, ops, options, callba
}

// optionally add a `txnNumber` if retryable writes are being attempted
if (typeof options.txnNumber !== 'undefined') {
writeCommand.txnNumber = options.txnNumber;
if (options.session && options.session.serverSession) {
const serverSession = options.session.serverSession;
if (serverSession.txnNumber) {
writeCommand.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
}

if (typeof serverSession.stmtId !== 'undefined') {
writeCommand.stmtId = serverSession.stmtId;

if (serverSession.stmtId === 0) {
writeCommand.startTransaction = true;
writeCommand.autocommit = false;
}
}
}

// Options object
Expand Down

0 comments on commit ed76be0

Please sign in to comment.