-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transactions #163
base: master
Are you sure you want to change the base?
Transactions #163
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,6 +74,10 @@ function ShareDbMongo(mongo, options) { | |
} | ||
|
||
this._middleware = new MiddlewareHandler(); | ||
|
||
this._sessions = Object.create(null); | ||
this._transactionOpLinks = Object.create(null); | ||
this._lockedCollections = Object.create(null); | ||
}; | ||
|
||
ShareDbMongo.prototype = Object.create(DB.prototype); | ||
|
@@ -203,12 +207,54 @@ ShareDbMongo.prototype.close = function(callback) { | |
// **** Commit methods | ||
|
||
ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, options, callback) { | ||
options = options || {}; | ||
var self = this; | ||
var request = createRequestForMiddleware(options, collectionName, op); | ||
this._writeOp(collectionName, id, op, snapshot, function(err, result) { | ||
|
||
var cb = callback; | ||
callback = function(error, succeeded) { | ||
if (error && error.code === ShareDbMongo.sessionNotStartedError().code) { | ||
// Session starting is handled automatically by ShareDB, so if a session hasn't | ||
// started, it actually means that a session was closed early, possibly to avoid | ||
// a transaction deadlock. In this case, swallow the error and tell ShareDB that | ||
// the commit hasn't succeeded, which will trigger a retry (including restarting | ||
// the transaction) | ||
error = null; | ||
succeeded = false; | ||
} | ||
cb(error, succeeded); | ||
}; | ||
|
||
// MongoDB locks collections when writing transactions. This can lead to deadlock if | ||
// we have two different transactions attempting to update the same collection. | ||
// Here, we manually keep track of the collections involved in transactions, so that | ||
// we can avoid this deadlock. | ||
var isLocked = this._lockedCollections[collectionName] && | ||
this._lockedCollections[collectionName] !== options.transaction; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NB: if we keep this behaviour (I don't think we should), I think MongoDB locks on a document level, so we should update this key to |
||
|
||
if (isLocked) { | ||
// In order to avoid deadlock, let's impose a priority system: | ||
// - commits without transactions take highest priority, since they're most likely to succeed | ||
// - after that, determine priority on alphabetical order of Transaction ID | ||
var hasLockPriority = !options.transaction || options.transactionId < this._lockedCollections[collectionName]; | ||
// If we don't have priority, return a failed write to allow a retry, which should hopefully | ||
// allow the other transaction to complete | ||
if (!hasLockPriority) return callback(null, false); | ||
|
||
// If we do have lock priority, we should abort the other transaction to unblock our | ||
// current transaction | ||
// TODO: Wait for this? | ||
this.abortTransaction(this._lockedCollections[collectionName], function(error) { | ||
// TODO: Handle errors | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not really sure what to do with this error, and this feels like a bit of a code smell in the architecture of transactions. |
||
}); | ||
} | ||
|
||
if (options.transaction) this._lockedCollections[collectionName] = options.transaction; | ||
|
||
this._writeOp(collectionName, id, op, snapshot, options, function(err, result) { | ||
if (err) return callback(err); | ||
var opId = result.insertedId; | ||
self._writeSnapshot(request, id, snapshot, opId, function(err, succeeded) { | ||
self._writeSnapshot(request, id, snapshot, opId, options, function(err, succeeded) { | ||
if (succeeded) return callback(err, succeeded); | ||
// Cleanup unsuccessful op if snapshot write failed. This is not | ||
// necessary for data correctness, but it gets rid of clutter | ||
|
@@ -219,6 +265,72 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio | |
}); | ||
}; | ||
|
||
ShareDbMongo.prototype.startTransaction = function(transactionId, callback) { | ||
if (typeof transactionId !== 'string') throw new Error('Invalid Transaction ID'); | ||
|
||
if (this._sessions[transactionId]) { | ||
// TODO: Proper error code | ||
return callback(new Error('Transaction already in progress')); | ||
} | ||
|
||
this._transactionOpLinks[transactionId] = []; | ||
var session = this._sessions[transactionId] = this._mongoClient.startSession(); | ||
session.startTransaction(); | ||
|
||
var self = this; | ||
session.once('ended', function() { | ||
self._cleanUpTransaction(transactionId); | ||
}); | ||
|
||
callback(); | ||
}; | ||
|
||
ShareDbMongo.prototype.restartTransaction = function(transactionId, callback) { | ||
this._cleanUpTransaction(transactionId); | ||
this.startTransaction(transactionId, callback); | ||
}; | ||
|
||
ShareDbMongo.prototype.commitTransaction = function(transactionId, callback) { | ||
var self = this; | ||
var cb = function(error) { | ||
self._cleanUpTransaction(transactionId); | ||
callback(error); | ||
}; | ||
|
||
if (!this._sessions[transactionId]) return cb(); | ||
this._sessions[transactionId].commitTransaction() | ||
.then(function() { | ||
cb(); | ||
}, cb); | ||
}; | ||
|
||
ShareDbMongo.prototype.abortTransaction = function(transactionId, callback) { | ||
var self = this; | ||
var cb = function(error) { | ||
self._cleanUpTransaction(transactionId); | ||
callback(error); | ||
}; | ||
|
||
if (!this._sessions[transactionId]) return cb(); | ||
this._sessions[transactionId].abortTransaction() | ||
.then(function() { | ||
cb(); | ||
}, cb); | ||
}; | ||
|
||
ShareDbMongo.prototype._cleanUpTransaction = function(transactionId) { | ||
var session = this._sessions[transactionId]; | ||
if (session) session.endSession(); | ||
delete this._sessions[transactionId]; | ||
delete this._transactionOpLinks[transactionId]; | ||
// TODO: Improve performance? | ||
for (var collection in this._lockedCollections) { | ||
if (this._lockedCollections[collection] === transactionId) { | ||
delete this._lockedCollections[collection]; | ||
} | ||
} | ||
}; | ||
|
||
function createRequestForMiddleware(options, collectionName, op, fields) { | ||
// Create a new request object which will be passed to helper functions and middleware | ||
var request = { | ||
|
@@ -236,18 +348,28 @@ function createRequestForMiddleware(options, collectionName, op, fields) { | |
return request; | ||
} | ||
|
||
ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, callback) { | ||
ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, options, callback) { | ||
if (typeof op.v !== 'number') { | ||
var err = ShareDbMongo.invalidOpVersionError(collectionName, id, op.v); | ||
return callback(err); | ||
} | ||
var self = this; | ||
this.getOpCollection(collectionName, function(err, opCollection) { | ||
if (err) return callback(err); | ||
var doc = shallowClone(op); | ||
doc.d = id; | ||
doc.o = snapshot._opLink; | ||
opCollection.insertOne(doc) | ||
var transactionLinks = self._transactionOpLinks[options.transaction] || []; | ||
doc.o = transactionLinks[op.v - 1] || snapshot._opLink; | ||
var session; | ||
if (options.transaction) { | ||
session = self._sessions[options.transaction]; | ||
if (!session) return callback(ShareDbMongo.sessionNotStartedError()); | ||
} | ||
opCollection.insertOne(doc, {session: session}) | ||
.then(function(result) { | ||
if (options.transaction) { | ||
self._transactionOpLinks[options.transaction][op.v] = result.insertedId; | ||
} | ||
callback(null, result); | ||
}, callback); | ||
}); | ||
|
@@ -263,17 +385,22 @@ ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { | |
}); | ||
}; | ||
|
||
ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, callback) { | ||
ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, options, callback) { | ||
var self = this; | ||
this.getCollection(request.collectionName, function(err, collection) { | ||
if (err) return callback(err); | ||
request.documentToWrite = castToDoc(id, snapshot, opId); | ||
var session; | ||
if (options.transaction) { | ||
session = self._sessions[options.transaction]; | ||
if (!session) return callback(ShareDbMongo.sessionNotStartedError()); | ||
} | ||
if (request.documentToWrite._v === 1) { | ||
self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { | ||
if (middlewareErr) { | ||
return callback(middlewareErr); | ||
} | ||
collection.insertOne(request.documentToWrite) | ||
collection.insertOne(request.documentToWrite, {session: session}) | ||
.then( | ||
function() { | ||
callback(null, true); | ||
|
@@ -294,7 +421,7 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, ca | |
if (middlewareErr) { | ||
return callback(middlewareErr); | ||
} | ||
collection.replaceOne(request.query, request.documentToWrite) | ||
collection.replaceOne(request.query, request.documentToWrite, {session: session}) | ||
.then(function(result) { | ||
var succeeded = !!result.modifiedCount; | ||
callback(null, succeeded); | ||
|
@@ -1668,6 +1795,10 @@ ShareDbMongo.parseQueryError = function(err) { | |
err.code = 5104; | ||
return err; | ||
}; | ||
ShareDbMongo.sessionNotStartedError = function() { | ||
// TODO: Proper code | ||
return {code: 5105, message: 'Session not started'}; | ||
}; | ||
|
||
// Middleware | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned in share/sharedb#689 (comment) this locking complexity may be removed entirely if we follow MongoDB's own behaviour and define cross-transactional, awaited writes to the same document to be undefined behaviour. Instead, if we just let them run in parallel, MongoDB and ShareDB can do their thing.