Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions #163

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 139 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ function ShareDbMongo(mongo, options) {
}

this._middleware = new MiddlewareHandler();

this._sessions = Object.create(null);
this._transactionOpLinks = Object.create(null);
this._lockedCollections = Object.create(null);
};

ShareDbMongo.prototype = Object.create(DB.prototype);
Expand Down Expand Up @@ -203,12 +207,54 @@ ShareDbMongo.prototype.close = function(callback) {
// **** Commit methods

ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, options, callback) {
options = options || {};
var self = this;
var request = createRequestForMiddleware(options, collectionName, op);
this._writeOp(collectionName, id, op, snapshot, function(err, result) {

var cb = callback;
callback = function(error, succeeded) {
if (error && error.code === ShareDbMongo.sessionNotStartedError().code) {
// Session starting is handled automatically by ShareDB, so if a session hasn't
// started, it actually means that a session was closed early, possibly to avoid
// a transaction deadlock. In this case, swallow the error and tell ShareDB that
// the commit hasn't succeeded, which will trigger a retry (including restarting
// the transaction)
error = null;
succeeded = false;
}
cb(error, succeeded);
};

// MongoDB locks collections when writing transactions. This can lead to deadlock if
// we have two different transactions attempting to update the same collection.
// Here, we manually keep track of the collections involved in transactions, so that
// we can avoid this deadlock.
var isLocked = this._lockedCollections[collectionName] &&
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in share/sharedb#689 (comment) this locking complexity may be removed entirely if we follow MongoDB's own behaviour and define cross-transactional, awaited writes to the same document to be undefined behaviour. Instead, if we just let them run in parallel, MongoDB and ShareDB can do their thing.

this._lockedCollections[collectionName] !== options.transaction;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: if we keep this behaviour (I don't think we should), I think MongoDB locks on a document level, so we should update this key to collectionName + id to avoid unnecessary locking.


if (isLocked) {
// In order to avoid deadlock, let's impose a priority system:
// - commits without transactions take highest priority, since they're most likely to succeed
// - after that, determine priority on alphabetical order of Transaction ID
var hasLockPriority = !options.transaction || options.transactionId < this._lockedCollections[collectionName];
// If we don't have priority, return a failed write to allow a retry, which should hopefully
// allow the other transaction to complete
if (!hasLockPriority) return callback(null, false);

// If we do have lock priority, we should abort the other transaction to unblock our
// current transaction
// TODO: Wait for this?
this.abortTransaction(this._lockedCollections[collectionName], function(error) {
// TODO: Handle errors
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure what to do with this error, and this feels like a bit of a code smell in the architecture of transactions.

});
}

if (options.transaction) this._lockedCollections[collectionName] = options.transaction;

this._writeOp(collectionName, id, op, snapshot, options, function(err, result) {
if (err) return callback(err);
var opId = result.insertedId;
self._writeSnapshot(request, id, snapshot, opId, function(err, succeeded) {
self._writeSnapshot(request, id, snapshot, opId, options, function(err, succeeded) {
if (succeeded) return callback(err, succeeded);
// Cleanup unsuccessful op if snapshot write failed. This is not
// necessary for data correctness, but it gets rid of clutter
Expand All @@ -219,6 +265,72 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio
});
};

ShareDbMongo.prototype.startTransaction = function(transactionId, callback) {
if (typeof transactionId !== 'string') throw new Error('Invalid Transaction ID');

if (this._sessions[transactionId]) {
// TODO: Proper error code
return callback(new Error('Transaction already in progress'));
}

this._transactionOpLinks[transactionId] = [];
var session = this._sessions[transactionId] = this._mongoClient.startSession();
session.startTransaction();

var self = this;
session.once('ended', function() {
self._cleanUpTransaction(transactionId);
});

callback();
};

ShareDbMongo.prototype.restartTransaction = function(transactionId, callback) {
this._cleanUpTransaction(transactionId);
this.startTransaction(transactionId, callback);
};

ShareDbMongo.prototype.commitTransaction = function(transactionId, callback) {
var self = this;
var cb = function(error) {
self._cleanUpTransaction(transactionId);
callback(error);
};

if (!this._sessions[transactionId]) return cb();
this._sessions[transactionId].commitTransaction()
.then(function() {
cb();
}, cb);
};

ShareDbMongo.prototype.abortTransaction = function(transactionId, callback) {
var self = this;
var cb = function(error) {
self._cleanUpTransaction(transactionId);
callback(error);
};

if (!this._sessions[transactionId]) return cb();
this._sessions[transactionId].abortTransaction()
.then(function() {
cb();
}, cb);
};

ShareDbMongo.prototype._cleanUpTransaction = function(transactionId) {
var session = this._sessions[transactionId];
if (session) session.endSession();
delete this._sessions[transactionId];
delete this._transactionOpLinks[transactionId];
// TODO: Improve performance?
for (var collection in this._lockedCollections) {
if (this._lockedCollections[collection] === transactionId) {
delete this._lockedCollections[collection];
}
}
};

function createRequestForMiddleware(options, collectionName, op, fields) {
// Create a new request object which will be passed to helper functions and middleware
var request = {
Expand All @@ -236,18 +348,28 @@ function createRequestForMiddleware(options, collectionName, op, fields) {
return request;
}

ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, callback) {
ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, options, callback) {
if (typeof op.v !== 'number') {
var err = ShareDbMongo.invalidOpVersionError(collectionName, id, op.v);
return callback(err);
}
var self = this;
this.getOpCollection(collectionName, function(err, opCollection) {
if (err) return callback(err);
var doc = shallowClone(op);
doc.d = id;
doc.o = snapshot._opLink;
opCollection.insertOne(doc)
var transactionLinks = self._transactionOpLinks[options.transaction] || [];
doc.o = transactionLinks[op.v - 1] || snapshot._opLink;
var session;
if (options.transaction) {
session = self._sessions[options.transaction];
if (!session) return callback(ShareDbMongo.sessionNotStartedError());
}
opCollection.insertOne(doc, {session: session})
.then(function(result) {
if (options.transaction) {
self._transactionOpLinks[options.transaction][op.v] = result.insertedId;
}
callback(null, result);
}, callback);
});
Expand All @@ -263,17 +385,22 @@ ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) {
});
};

ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, callback) {
ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, options, callback) {
var self = this;
this.getCollection(request.collectionName, function(err, collection) {
if (err) return callback(err);
request.documentToWrite = castToDoc(id, snapshot, opId);
var session;
if (options.transaction) {
session = self._sessions[options.transaction];
if (!session) return callback(ShareDbMongo.sessionNotStartedError());
}
if (request.documentToWrite._v === 1) {
self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) {
if (middlewareErr) {
return callback(middlewareErr);
}
collection.insertOne(request.documentToWrite)
collection.insertOne(request.documentToWrite, {session: session})
.then(
function() {
callback(null, true);
Expand All @@ -294,7 +421,7 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, ca
if (middlewareErr) {
return callback(middlewareErr);
}
collection.replaceOne(request.query, request.documentToWrite)
collection.replaceOne(request.query, request.documentToWrite, {session: session})
.then(function(result) {
var succeeded = !!result.modifiedCount;
callback(null, succeeded);
Expand Down Expand Up @@ -1668,6 +1795,10 @@ ShareDbMongo.parseQueryError = function(err) {
err.code = 5104;
return err;
};
ShareDbMongo.sessionNotStartedError = function() {
// TODO: Proper code
return {code: 5105, message: 'Session not started'};
};

// Middleware

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "index.js",
"dependencies": {
"mongodb": "^3.1.13 || ^4.0.0 || ^5.0.0 || ^6.0.0",
"sharedb": "^1.9.1 || ^2.0.0 || ^3.0.0 || ^4.0.0 || ^5.0.0"
"sharedb": "file:../sharedb"
},
"devDependencies": {
"async": "^3.2.4",
Expand Down
2 changes: 1 addition & 1 deletion test/test_mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ function create(callback) {
});
};

require('sharedb/test/db')({create: create, getQuery: getQuery});
require('sharedb/test/db')({create: create, getQuery: getQuery, transactions: true});

describe('mongo db', function() {
beforeEach(function(done) {
Expand Down
Loading