From d1fdea997ee1d87a6306453f4058ead8e96c6004 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Mon, 20 Jan 2025 11:47:15 +0000 Subject: [PATCH 1/4] mostly working --- index.js | 54 +++++++++++++++++++++++++++++++++++++++------- package.json | 2 +- test/test_mongo.js | 2 +- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/index.js b/index.js index 5a331d3..18e2907 100644 --- a/index.js +++ b/index.js @@ -74,6 +74,9 @@ function ShareDbMongo(mongo, options) { } this._middleware = new MiddlewareHandler(); + + this._sessions = Object.create(null); + this._transactionOpLinks = Object.create(null); }; ShareDbMongo.prototype = Object.create(DB.prototype); @@ -203,12 +206,14 @@ ShareDbMongo.prototype.close = function(callback) { // **** Commit methods ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, options, callback) { + options = options || {}; var self = this; var request = createRequestForMiddleware(options, collectionName, op); - this._writeOp(collectionName, id, op, snapshot, function(err, result) { + // TODO: Wrap callback to abort transaction on error and tidy up session in .finally() + this._writeOp(collectionName, id, op, snapshot, options, function(err, result) { if (err) return callback(err); var opId = result.insertedId; - self._writeSnapshot(request, id, snapshot, opId, function(err, succeeded) { + self._writeSnapshot(request, id, snapshot, opId, options, function(err, succeeded) { if (succeeded) return callback(err, succeeded); // Cleanup unsuccessful op if snapshot write failed. This is not // necessary for data correctness, but it gets rid of clutter @@ -219,6 +224,32 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio }); }; +ShareDbMongo.prototype.commitTransaction = function(transactionId, callback) { + if (!this._sessions[transactionId]) return callback(); + this._sessions[transactionId].commitTransaction() + .then(function() { + callback(); + }, callback); +}; + +ShareDbMongo.prototype.abortTransaction = function(transactionId, callback) { + if (!this._sessions[transactionId]) return callback(); + this._sessions[transactionId].abortTransaction() + .then(function() { + callback(); + }, callback); +}; + +ShareDbMongo.prototype._ensureTransactionStarted = function(transactionId) { + if (typeof transactionId !== 'string') return null; + if (!this._sessions[transactionId]) { + this._sessions[transactionId] = this._mongoClient.startSession(); + this._sessions[transactionId].startTransaction(); + this._transactionOpLinks[transactionId] = []; + } + return this._sessions[transactionId]; +}; + function createRequestForMiddleware(options, collectionName, op, fields) { // Create a new request object which will be passed to helper functions and middleware var request = { @@ -236,18 +267,24 @@ function createRequestForMiddleware(options, collectionName, op, fields) { return request; } -ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, callback) { +ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, options, callback) { if (typeof op.v !== 'number') { var err = ShareDbMongo.invalidOpVersionError(collectionName, id, op.v); return callback(err); } + var self = this; this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); var doc = shallowClone(op); doc.d = id; - doc.o = snapshot._opLink; - opCollection.insertOne(doc) + var transactionLinks = self._transactionOpLinks[options.transaction] || []; + doc.o = transactionLinks[op.v - 1] || snapshot._opLink; + var session = self._ensureTransactionStarted(options.transaction); + opCollection.insertOne(doc, {session: session}) .then(function(result) { + if (options.transaction) { + self._transactionOpLinks[options.transaction][op.v] = result.insertedId; + } callback(null, result); }, callback); }); @@ -263,17 +300,18 @@ ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { }); }; -ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, callback) { +ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, options, callback) { var self = this; this.getCollection(request.collectionName, function(err, collection) { if (err) return callback(err); request.documentToWrite = castToDoc(id, snapshot, opId); + var session = self._ensureTransactionStarted(options.transaction); if (request.documentToWrite._v === 1) { self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { if (middlewareErr) { return callback(middlewareErr); } - collection.insertOne(request.documentToWrite) + collection.insertOne(request.documentToWrite, {session: session}) .then( function() { callback(null, true); @@ -294,7 +332,7 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, ca if (middlewareErr) { return callback(middlewareErr); } - collection.replaceOne(request.query, request.documentToWrite) + collection.replaceOne(request.query, request.documentToWrite, {session: session}) .then(function(result) { var succeeded = !!result.modifiedCount; callback(null, succeeded); diff --git a/package.json b/package.json index 6332045..32f91a5 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "main": "index.js", "dependencies": { "mongodb": "^3.1.13 || ^4.0.0 || ^5.0.0 || ^6.0.0", - "sharedb": "^1.9.1 || ^2.0.0 || ^3.0.0 || ^4.0.0 || ^5.0.0" + "sharedb": "file:../sharedb" }, "devDependencies": { "async": "^3.2.4", diff --git a/test/test_mongo.js b/test/test_mongo.js index 36ba42c..521e5ec 100644 --- a/test/test_mongo.js +++ b/test/test_mongo.js @@ -18,7 +18,7 @@ function create(callback) { }); }; -require('sharedb/test/db')({create: create, getQuery: getQuery}); +require('sharedb/test/db')({create: create, getQuery: getQuery, transactions: true}); describe('mongo db', function() { beforeEach(function(done) { From 62915a5114bdfb17931a72d69ad4dff4507a51f6 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Mon, 20 Jan 2025 15:49:49 +0000 Subject: [PATCH 2/4] promisify --- index.js | 177 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 108 insertions(+), 69 deletions(-) diff --git a/index.js b/index.js index 18e2907..c0bb386 100644 --- a/index.js +++ b/index.js @@ -75,6 +75,7 @@ function ShareDbMongo(mongo, options) { this._middleware = new MiddlewareHandler(); + this._transactionConnections = Object.create(null); this._sessions = Object.create(null); this._transactionOpLinks = Object.create(null); }; @@ -83,12 +84,12 @@ ShareDbMongo.prototype = Object.create(DB.prototype); ShareDbMongo.prototype.projectsSnapshots = true; -ShareDbMongo.prototype.getCollection = function(collectionName, callback) { +ShareDbMongo.prototype.getCollection = function(collectionName, options, callback) { // Check the collection name var err = this.validateCollectionName(collectionName); if (err) return callback(err); // Gotcha: calls back sync if connected or async if not - this.getDbs(function(err, mongo) { + this.getDbs(options, function(err, mongo) { if (err) return callback(err); var collection = mongo.collection(collectionName); return callback(null, collection); @@ -100,7 +101,7 @@ ShareDbMongo.prototype._getCollectionPoll = function(collectionName, callback) { var err = this.validateCollectionName(collectionName); if (err) return callback(err); // Gotcha: calls back sync if connected or async if not - this.getDbs(function(err, mongo, mongoPoll) { + this.getDbs({}, function(err, mongo, mongoPoll) { if (err) return callback(err); var collection = (mongoPoll || mongo).collection(collectionName); return callback(null, collection); @@ -118,7 +119,15 @@ ShareDbMongo.prototype.getCollectionPoll = function(collectionName, callback) { this._getCollectionPoll(collectionName, callback); }; -ShareDbMongo.prototype.getDbs = function(callback) { +ShareDbMongo.prototype.getDbs = function(options, callback) { + if (typeof options === 'function') { + callback = options; + options = null; + } + options = options || {}; + + if (options.transaction) return this._getTransactionDbs(options.transaction, callback); + if (this.closed) { var err = ShareDbMongo.alreadyClosedError(); return callback(err); @@ -129,6 +138,23 @@ ShareDbMongo.prototype.getDbs = function(callback) { }, callback); }; +ShareDbMongo.prototype._getTransactionDbs = function(transactionId, callback) { + return this._getTransactionClient(transactionId) + .then(function(client) { + callback(null, client.db()); + }, callback); +}; + +ShareDbMongo.prototype._getTransactionClient = function(transactionId) { + if (!this._transactionConnections[transactionId]) { + // TODO: this bypasses the creation function, which probably isn't desirable + var client = new mongodb.MongoClient(this._mongoClient.s.url); + this._transactionConnections[transactionId] = client.connect(); + } + + return this._transactionConnections[transactionId]; +}; + ShareDbMongo.prototype._connect = function(mongo, options) { // Create the mongo connection client connections if needed // @@ -187,7 +213,7 @@ ShareDbMongo.prototype.close = function(callback) { }; } var self = this; - this.getDbs(function(err) { + this.getDbs({}, function(err) { // Ignore "already closed" if (err && err.code === 5101) return callback(); if (err) return callback(err); @@ -206,14 +232,18 @@ ShareDbMongo.prototype.close = function(callback) { // **** Commit methods ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, options, callback) { + console.log('commit op', op, snapshot); options = options || {}; var self = this; var request = createRequestForMiddleware(options, collectionName, op); // TODO: Wrap callback to abort transaction on error and tidy up session in .finally() this._writeOp(collectionName, id, op, snapshot, options, function(err, result) { + console.log('op written', err, op); if (err) return callback(err); var opId = result.insertedId; + console.log('write snapshot...', snapshot); self._writeSnapshot(request, id, snapshot, opId, options, function(err, succeeded) { + console.log('snapshot written', err, succeeded); if (succeeded) return callback(err, succeeded); // Cleanup unsuccessful op if snapshot write failed. This is not // necessary for data correctness, but it gets rid of clutter @@ -226,27 +256,30 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio ShareDbMongo.prototype.commitTransaction = function(transactionId, callback) { if (!this._sessions[transactionId]) return callback(); - this._sessions[transactionId].commitTransaction() - .then(function() { - callback(); - }, callback); + this._sessions[transactionId].then(function(session) { + return session.commitTransaction(); + }, callback); }; ShareDbMongo.prototype.abortTransaction = function(transactionId, callback) { if (!this._sessions[transactionId]) return callback(); - this._sessions[transactionId].abortTransaction() - .then(function() { - callback(); - }, callback); + this._sessions[transactionId].then(function(session) { + return session.abortTransaction(); + }, callback); }; ShareDbMongo.prototype._ensureTransactionStarted = function(transactionId) { - if (typeof transactionId !== 'string') return null; + if (typeof transactionId !== 'string') return Promise.resolve(); + if (!this._sessions[transactionId]) { - this._sessions[transactionId] = this._mongoClient.startSession(); - this._sessions[transactionId].startTransaction(); this._transactionOpLinks[transactionId] = []; + this._sessions[transactionId] = this._getTransactionClient(transactionId).then(function(client) { + var session = client.startSession(); + session.startTransaction(); + return session; + }); } + return this._sessions[transactionId]; }; @@ -273,25 +306,27 @@ ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, opt return callback(err); } var self = this; - this.getOpCollection(collectionName, function(err, opCollection) { + this.getOpCollection(collectionName, options, function(err, opCollection) { if (err) return callback(err); var doc = shallowClone(op); doc.d = id; var transactionLinks = self._transactionOpLinks[options.transaction] || []; doc.o = transactionLinks[op.v - 1] || snapshot._opLink; - var session = self._ensureTransactionStarted(options.transaction); - opCollection.insertOne(doc, {session: session}) - .then(function(result) { - if (options.transaction) { - self._transactionOpLinks[options.transaction][op.v] = result.insertedId; - } - callback(null, result); - }, callback); + // TODO: Rewrite as async/await (supported since Node.js v7.6.0) + self._ensureTransactionStarted(options.transaction).then(function(session) { + opCollection.insertOne(doc, {session: session}) + .then(function(result) { + if (options.transaction) { + self._transactionOpLinks[options.transaction][op.v] = result.insertedId; + } + callback(null, result); + }, callback); + }, callback); }); }; ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { - this.getOpCollection(collectionName, function(err, opCollection) { + this.getOpCollection(collectionName, {}, function(err, opCollection) { if (err) return callback(err); opCollection.deleteOne({_id: opId}) .then(function(result) { @@ -302,43 +337,47 @@ ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, options, callback) { var self = this; - this.getCollection(request.collectionName, function(err, collection) { + this.getCollection(request.collectionName, options, function(err, collection) { if (err) return callback(err); request.documentToWrite = castToDoc(id, snapshot, opId); - var session = self._ensureTransactionStarted(options.transaction); - if (request.documentToWrite._v === 1) { - self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { - if (middlewareErr) { - return callback(middlewareErr); - } - collection.insertOne(request.documentToWrite, {session: session}) - .then( - function() { - callback(null, true); - }, - function(err) { + self._ensureTransactionStarted(options.transaction).then(function(session) { + console.log('session?', !!session); + if (request.documentToWrite._v === 1) { + self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { + if (middlewareErr) { + return callback(middlewareErr); + } + collection.insertOne(request.documentToWrite, {session: session}) + .then( + function() { + callback(null, true); + }, + function(err) { // Return non-success instead of duplicate key error, since this is // expected to occur during simultaneous creates on the same id - if (err.code === 11000 && /\b_id_\b/.test(err.message)) { - return callback(null, false); + if (err.code === 11000 && /\b_id_\b/.test(err.message)) { + return callback(null, false); + } + return callback(err); } - return callback(err); - } - ); - }); - } else { - request.query = {_id: id, _v: request.documentToWrite._v - 1}; - self._middleware.trigger(MiddlewareHandler.Actions.beforeOverwrite, request, function(middlewareErr) { - if (middlewareErr) { - return callback(middlewareErr); - } - collection.replaceOne(request.query, request.documentToWrite, {session: session}) - .then(function(result) { - var succeeded = !!result.modifiedCount; - callback(null, succeeded); - }, callback); - }); - } + ); + }); + } else { + request.query = {_id: id, _v: request.documentToWrite._v - 1}; + console.log('query', request.query); + console.log('to write', request.documentToWrite); + self._middleware.trigger(MiddlewareHandler.Actions.beforeOverwrite, request, function(middlewareErr) { + if (middlewareErr) { + return callback(middlewareErr); + } + collection.replaceOne(request.query, request.documentToWrite, {session: session}) + .then(function(result) { + var succeeded = !!result.modifiedCount; + callback(null, succeeded); + }, callback); + }); + } + }, callback); }); }; @@ -347,7 +386,7 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, op ShareDbMongo.prototype.getSnapshot = function(collectionName, id, fields, options, callback) { var self = this; - this.getCollection(collectionName, function(err, collection) { + this.getCollection(collectionName, options, function(err, collection) { if (err) return callback(err); var query = {_id: id}; var projection = getProjection(fields, options); @@ -367,7 +406,7 @@ ShareDbMongo.prototype.getSnapshot = function(collectionName, id, fields, option ShareDbMongo.prototype.getSnapshotBulk = function(collectionName, ids, fields, options, callback) { var self = this; - this.getCollection(collectionName, function(err, collection) { + this.getCollection(collectionName, options, function(err, collection) { if (err) return callback(err); var query = {_id: {$in: ids}}; var projection = getProjection(fields, options); @@ -414,9 +453,9 @@ ShareDbMongo.prototype.validateCollectionName = function(collectionName) { }; // Get and return the op collection from mongo, ensuring it has the op index. -ShareDbMongo.prototype.getOpCollection = function(collectionName, callback) { +ShareDbMongo.prototype.getOpCollection = function(collectionName, options, callback) { var self = this; - this.getDbs(function(err, mongo) { + this.getDbs(options, function(err, mongo) { if (err) return callback(err); var name = self.getOplogCollectionName(collectionName); var collection = mongo.collection(name); @@ -548,7 +587,7 @@ ShareDbMongo.prototype.getOpsBulk = function(collectionName, fromMap, toMap, opt ShareDbMongo.prototype.getCommittedOpVersion = function(collectionName, id, snapshot, op, options, callback) { var self = this; - this.getOpCollection(collectionName, function(err, opCollection) { + this.getOpCollection(collectionName, {}, function(err, opCollection) { if (err) return callback(err); var query = { src: op.src, @@ -685,7 +724,7 @@ function getOpsQuery(id, from, to) { } ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options, callback) { - this.getOpCollection(collectionName, function(err, opCollection) { + this.getOpCollection(collectionName, options, function(err, opCollection) { if (err) return callback(err); var query = getOpsQuery(id, from, to); // Exclude the `d` field, which is only for use internal to livedb-mongo. @@ -701,7 +740,7 @@ ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options, }; ShareDbMongo.prototype._getOpsBulk = function(collectionName, conditions, options, callback) { - this.getOpCollection(collectionName, function(err, opCollection) { + this.getOpCollection(collectionName, options, function(err, opCollection) { if (err) return callback(err); var query = {$or: conditions}; // Exclude the `m` field, which can be used to store metadata on ops for @@ -745,7 +784,7 @@ ShareDbMongo.prototype._getOpLink = function(collectionName, id, to, options, ca if (!this.getOpsWithoutStrictLinking) return this._getSnapshotOpLink(collectionName, id, options, callback); var db = this; - this.getOpCollection(collectionName, function(error, collection) { + this.getOpCollection(collectionName, options, function(error, collection) { if (error) return callback(error); // If to is null, we want the most recent version, so just return the @@ -816,7 +855,7 @@ function closeCursor(cursor, callback, error, returnValue) { ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, options, callback) { var self = this; - this.getCollection(collectionName, function(err, collection) { + this.getCollection(collectionName, options, function(err, collection) { if (err) return callback(err); var query = {_id: id}; var projection = {_id: 0, _o: 1, _v: 1}; @@ -835,7 +874,7 @@ ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, options ShareDbMongo.prototype._getSnapshotOpLinkBulk = function(collectionName, ids, options, callback) { var self = this; - this.getCollection(collectionName, function(err, collection) { + this.getCollection(collectionName, options, function(err, collection) { if (err) return callback(err); var query = {_id: {$in: ids}}; var projection = {_o: 1, _v: 1}; @@ -917,7 +956,7 @@ ShareDbMongo.prototype._query = function(collection, inputQuery, projection, cal ShareDbMongo.prototype.query = function(collectionName, inputQuery, fields, options, callback) { var self = this; - this.getCollection(collectionName, function(err, collection) { + this.getCollection(collectionName, options, function(err, collection) { if (err) return callback(err); var projection = getProjection(fields, options); self._query(collection, inputQuery, projection, function(err, results, extra) { From ca8d879b387355bfa114b740db14dafb3bbe9a73 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Mon, 20 Jan 2025 15:49:54 +0000 Subject: [PATCH 3/4] Revert "promisify" This reverts commit 62915a5114bdfb17931a72d69ad4dff4507a51f6. --- index.js | 177 ++++++++++++++++++++++--------------------------------- 1 file changed, 69 insertions(+), 108 deletions(-) diff --git a/index.js b/index.js index c0bb386..18e2907 100644 --- a/index.js +++ b/index.js @@ -75,7 +75,6 @@ function ShareDbMongo(mongo, options) { this._middleware = new MiddlewareHandler(); - this._transactionConnections = Object.create(null); this._sessions = Object.create(null); this._transactionOpLinks = Object.create(null); }; @@ -84,12 +83,12 @@ ShareDbMongo.prototype = Object.create(DB.prototype); ShareDbMongo.prototype.projectsSnapshots = true; -ShareDbMongo.prototype.getCollection = function(collectionName, options, callback) { +ShareDbMongo.prototype.getCollection = function(collectionName, callback) { // Check the collection name var err = this.validateCollectionName(collectionName); if (err) return callback(err); // Gotcha: calls back sync if connected or async if not - this.getDbs(options, function(err, mongo) { + this.getDbs(function(err, mongo) { if (err) return callback(err); var collection = mongo.collection(collectionName); return callback(null, collection); @@ -101,7 +100,7 @@ ShareDbMongo.prototype._getCollectionPoll = function(collectionName, callback) { var err = this.validateCollectionName(collectionName); if (err) return callback(err); // Gotcha: calls back sync if connected or async if not - this.getDbs({}, function(err, mongo, mongoPoll) { + this.getDbs(function(err, mongo, mongoPoll) { if (err) return callback(err); var collection = (mongoPoll || mongo).collection(collectionName); return callback(null, collection); @@ -119,15 +118,7 @@ ShareDbMongo.prototype.getCollectionPoll = function(collectionName, callback) { this._getCollectionPoll(collectionName, callback); }; -ShareDbMongo.prototype.getDbs = function(options, callback) { - if (typeof options === 'function') { - callback = options; - options = null; - } - options = options || {}; - - if (options.transaction) return this._getTransactionDbs(options.transaction, callback); - +ShareDbMongo.prototype.getDbs = function(callback) { if (this.closed) { var err = ShareDbMongo.alreadyClosedError(); return callback(err); @@ -138,23 +129,6 @@ ShareDbMongo.prototype.getDbs = function(options, callback) { }, callback); }; -ShareDbMongo.prototype._getTransactionDbs = function(transactionId, callback) { - return this._getTransactionClient(transactionId) - .then(function(client) { - callback(null, client.db()); - }, callback); -}; - -ShareDbMongo.prototype._getTransactionClient = function(transactionId) { - if (!this._transactionConnections[transactionId]) { - // TODO: this bypasses the creation function, which probably isn't desirable - var client = new mongodb.MongoClient(this._mongoClient.s.url); - this._transactionConnections[transactionId] = client.connect(); - } - - return this._transactionConnections[transactionId]; -}; - ShareDbMongo.prototype._connect = function(mongo, options) { // Create the mongo connection client connections if needed // @@ -213,7 +187,7 @@ ShareDbMongo.prototype.close = function(callback) { }; } var self = this; - this.getDbs({}, function(err) { + this.getDbs(function(err) { // Ignore "already closed" if (err && err.code === 5101) return callback(); if (err) return callback(err); @@ -232,18 +206,14 @@ ShareDbMongo.prototype.close = function(callback) { // **** Commit methods ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, options, callback) { - console.log('commit op', op, snapshot); options = options || {}; var self = this; var request = createRequestForMiddleware(options, collectionName, op); // TODO: Wrap callback to abort transaction on error and tidy up session in .finally() this._writeOp(collectionName, id, op, snapshot, options, function(err, result) { - console.log('op written', err, op); if (err) return callback(err); var opId = result.insertedId; - console.log('write snapshot...', snapshot); self._writeSnapshot(request, id, snapshot, opId, options, function(err, succeeded) { - console.log('snapshot written', err, succeeded); if (succeeded) return callback(err, succeeded); // Cleanup unsuccessful op if snapshot write failed. This is not // necessary for data correctness, but it gets rid of clutter @@ -256,30 +226,27 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio ShareDbMongo.prototype.commitTransaction = function(transactionId, callback) { if (!this._sessions[transactionId]) return callback(); - this._sessions[transactionId].then(function(session) { - return session.commitTransaction(); - }, callback); + this._sessions[transactionId].commitTransaction() + .then(function() { + callback(); + }, callback); }; ShareDbMongo.prototype.abortTransaction = function(transactionId, callback) { if (!this._sessions[transactionId]) return callback(); - this._sessions[transactionId].then(function(session) { - return session.abortTransaction(); - }, callback); + this._sessions[transactionId].abortTransaction() + .then(function() { + callback(); + }, callback); }; ShareDbMongo.prototype._ensureTransactionStarted = function(transactionId) { - if (typeof transactionId !== 'string') return Promise.resolve(); - + if (typeof transactionId !== 'string') return null; if (!this._sessions[transactionId]) { + this._sessions[transactionId] = this._mongoClient.startSession(); + this._sessions[transactionId].startTransaction(); this._transactionOpLinks[transactionId] = []; - this._sessions[transactionId] = this._getTransactionClient(transactionId).then(function(client) { - var session = client.startSession(); - session.startTransaction(); - return session; - }); } - return this._sessions[transactionId]; }; @@ -306,27 +273,25 @@ ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, opt return callback(err); } var self = this; - this.getOpCollection(collectionName, options, function(err, opCollection) { + this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); var doc = shallowClone(op); doc.d = id; var transactionLinks = self._transactionOpLinks[options.transaction] || []; doc.o = transactionLinks[op.v - 1] || snapshot._opLink; - // TODO: Rewrite as async/await (supported since Node.js v7.6.0) - self._ensureTransactionStarted(options.transaction).then(function(session) { - opCollection.insertOne(doc, {session: session}) - .then(function(result) { - if (options.transaction) { - self._transactionOpLinks[options.transaction][op.v] = result.insertedId; - } - callback(null, result); - }, callback); - }, callback); + var session = self._ensureTransactionStarted(options.transaction); + opCollection.insertOne(doc, {session: session}) + .then(function(result) { + if (options.transaction) { + self._transactionOpLinks[options.transaction][op.v] = result.insertedId; + } + callback(null, result); + }, callback); }); }; ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { - this.getOpCollection(collectionName, {}, function(err, opCollection) { + this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); opCollection.deleteOne({_id: opId}) .then(function(result) { @@ -337,47 +302,43 @@ ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) { ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, options, callback) { var self = this; - this.getCollection(request.collectionName, options, function(err, collection) { + this.getCollection(request.collectionName, function(err, collection) { if (err) return callback(err); request.documentToWrite = castToDoc(id, snapshot, opId); - self._ensureTransactionStarted(options.transaction).then(function(session) { - console.log('session?', !!session); - if (request.documentToWrite._v === 1) { - self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { - if (middlewareErr) { - return callback(middlewareErr); - } - collection.insertOne(request.documentToWrite, {session: session}) - .then( - function() { - callback(null, true); - }, - function(err) { + var session = self._ensureTransactionStarted(options.transaction); + if (request.documentToWrite._v === 1) { + self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { + if (middlewareErr) { + return callback(middlewareErr); + } + collection.insertOne(request.documentToWrite, {session: session}) + .then( + function() { + callback(null, true); + }, + function(err) { // Return non-success instead of duplicate key error, since this is // expected to occur during simultaneous creates on the same id - if (err.code === 11000 && /\b_id_\b/.test(err.message)) { - return callback(null, false); - } - return callback(err); + if (err.code === 11000 && /\b_id_\b/.test(err.message)) { + return callback(null, false); } - ); - }); - } else { - request.query = {_id: id, _v: request.documentToWrite._v - 1}; - console.log('query', request.query); - console.log('to write', request.documentToWrite); - self._middleware.trigger(MiddlewareHandler.Actions.beforeOverwrite, request, function(middlewareErr) { - if (middlewareErr) { - return callback(middlewareErr); - } - collection.replaceOne(request.query, request.documentToWrite, {session: session}) - .then(function(result) { - var succeeded = !!result.modifiedCount; - callback(null, succeeded); - }, callback); - }); - } - }, callback); + return callback(err); + } + ); + }); + } else { + request.query = {_id: id, _v: request.documentToWrite._v - 1}; + self._middleware.trigger(MiddlewareHandler.Actions.beforeOverwrite, request, function(middlewareErr) { + if (middlewareErr) { + return callback(middlewareErr); + } + collection.replaceOne(request.query, request.documentToWrite, {session: session}) + .then(function(result) { + var succeeded = !!result.modifiedCount; + callback(null, succeeded); + }, callback); + }); + } }); }; @@ -386,7 +347,7 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, op ShareDbMongo.prototype.getSnapshot = function(collectionName, id, fields, options, callback) { var self = this; - this.getCollection(collectionName, options, function(err, collection) { + this.getCollection(collectionName, function(err, collection) { if (err) return callback(err); var query = {_id: id}; var projection = getProjection(fields, options); @@ -406,7 +367,7 @@ ShareDbMongo.prototype.getSnapshot = function(collectionName, id, fields, option ShareDbMongo.prototype.getSnapshotBulk = function(collectionName, ids, fields, options, callback) { var self = this; - this.getCollection(collectionName, options, function(err, collection) { + this.getCollection(collectionName, function(err, collection) { if (err) return callback(err); var query = {_id: {$in: ids}}; var projection = getProjection(fields, options); @@ -453,9 +414,9 @@ ShareDbMongo.prototype.validateCollectionName = function(collectionName) { }; // Get and return the op collection from mongo, ensuring it has the op index. -ShareDbMongo.prototype.getOpCollection = function(collectionName, options, callback) { +ShareDbMongo.prototype.getOpCollection = function(collectionName, callback) { var self = this; - this.getDbs(options, function(err, mongo) { + this.getDbs(function(err, mongo) { if (err) return callback(err); var name = self.getOplogCollectionName(collectionName); var collection = mongo.collection(name); @@ -587,7 +548,7 @@ ShareDbMongo.prototype.getOpsBulk = function(collectionName, fromMap, toMap, opt ShareDbMongo.prototype.getCommittedOpVersion = function(collectionName, id, snapshot, op, options, callback) { var self = this; - this.getOpCollection(collectionName, {}, function(err, opCollection) { + this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); var query = { src: op.src, @@ -724,7 +685,7 @@ function getOpsQuery(id, from, to) { } ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options, callback) { - this.getOpCollection(collectionName, options, function(err, opCollection) { + this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); var query = getOpsQuery(id, from, to); // Exclude the `d` field, which is only for use internal to livedb-mongo. @@ -740,7 +701,7 @@ ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options, }; ShareDbMongo.prototype._getOpsBulk = function(collectionName, conditions, options, callback) { - this.getOpCollection(collectionName, options, function(err, opCollection) { + this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); var query = {$or: conditions}; // Exclude the `m` field, which can be used to store metadata on ops for @@ -784,7 +745,7 @@ ShareDbMongo.prototype._getOpLink = function(collectionName, id, to, options, ca if (!this.getOpsWithoutStrictLinking) return this._getSnapshotOpLink(collectionName, id, options, callback); var db = this; - this.getOpCollection(collectionName, options, function(error, collection) { + this.getOpCollection(collectionName, function(error, collection) { if (error) return callback(error); // If to is null, we want the most recent version, so just return the @@ -855,7 +816,7 @@ function closeCursor(cursor, callback, error, returnValue) { ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, options, callback) { var self = this; - this.getCollection(collectionName, options, function(err, collection) { + this.getCollection(collectionName, function(err, collection) { if (err) return callback(err); var query = {_id: id}; var projection = {_id: 0, _o: 1, _v: 1}; @@ -874,7 +835,7 @@ ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, options ShareDbMongo.prototype._getSnapshotOpLinkBulk = function(collectionName, ids, options, callback) { var self = this; - this.getCollection(collectionName, options, function(err, collection) { + this.getCollection(collectionName, function(err, collection) { if (err) return callback(err); var query = {_id: {$in: ids}}; var projection = {_o: 1, _v: 1}; @@ -956,7 +917,7 @@ ShareDbMongo.prototype._query = function(collection, inputQuery, projection, cal ShareDbMongo.prototype.query = function(collectionName, inputQuery, fields, options, callback) { var self = this; - this.getCollection(collectionName, options, function(err, collection) { + this.getCollection(collectionName, function(err, collection) { if (err) return callback(err); var projection = getProjection(fields, options); self._query(collection, inputQuery, projection, function(err, results, extra) { From b4b950894fe8a6057ed1e0f7c61ca7adc8185df8 Mon Sep 17 00:00:00 2001 From: Alec Gibson <12036746+alecgibson@users.noreply.github.com> Date: Mon, 20 Jan 2025 16:58:41 +0000 Subject: [PATCH 4/4] handle deadlock --- index.js | 125 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 109 insertions(+), 16 deletions(-) diff --git a/index.js b/index.js index 18e2907..e8ddf56 100644 --- a/index.js +++ b/index.js @@ -77,6 +77,7 @@ function ShareDbMongo(mongo, options) { this._sessions = Object.create(null); this._transactionOpLinks = Object.create(null); + this._lockedCollections = Object.create(null); }; ShareDbMongo.prototype = Object.create(DB.prototype); @@ -209,7 +210,47 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio options = options || {}; var self = this; var request = createRequestForMiddleware(options, collectionName, op); - // TODO: Wrap callback to abort transaction on error and tidy up session in .finally() + + var cb = callback; + callback = function(error, succeeded) { + if (error && error.code === ShareDbMongo.sessionNotStartedError().code) { + // Session starting is handled automatically by ShareDB, so if a session hasn't + // started, it actually means that a session was closed early, possibly to avoid + // a transaction deadlock. In this case, swallow the error and tell ShareDB that + // the commit hasn't succeeded, which will trigger a retry (including restarting + // the transaction) + error = null; + succeeded = false; + } + cb(error, succeeded); + }; + + // MongoDB locks collections when writing transactions. This can lead to deadlock if + // we have two different transactions attempting to update the same collection. + // Here, we manually keep track of the collections involved in transactions, so that + // we can avoid this deadlock. + var isLocked = this._lockedCollections[collectionName] && + this._lockedCollections[collectionName] !== options.transaction; + + if (isLocked) { + // In order to avoid deadlock, let's impose a priority system: + // - commits without transactions take highest priority, since they're most likely to succeed + // - after that, determine priority on alphabetical order of Transaction ID + var hasLockPriority = !options.transaction || options.transactionId < this._lockedCollections[collectionName]; + // If we don't have priority, return a failed write to allow a retry, which should hopefully + // allow the other transaction to complete + if (!hasLockPriority) return callback(null, false); + + // If we do have lock priority, we should abort the other transaction to unblock our + // current transaction + // TODO: Wait for this? + this.abortTransaction(this._lockedCollections[collectionName], function(error) { + // TODO: Handle errors + }); + } + + if (options.transaction) this._lockedCollections[collectionName] = options.transaction; + this._writeOp(collectionName, id, op, snapshot, options, function(err, result) { if (err) return callback(err); var opId = result.insertedId; @@ -224,30 +265,70 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio }); }; +ShareDbMongo.prototype.startTransaction = function(transactionId, callback) { + if (typeof transactionId !== 'string') throw new Error('Invalid Transaction ID'); + + if (this._sessions[transactionId]) { + // TODO: Proper error code + return callback(new Error('Transaction already in progress')); + } + + this._transactionOpLinks[transactionId] = []; + var session = this._sessions[transactionId] = this._mongoClient.startSession(); + session.startTransaction(); + + var self = this; + session.once('ended', function() { + self._cleanUpTransaction(transactionId); + }); + + callback(); +}; + +ShareDbMongo.prototype.restartTransaction = function(transactionId, callback) { + this._cleanUpTransaction(transactionId); + this.startTransaction(transactionId, callback); +}; + ShareDbMongo.prototype.commitTransaction = function(transactionId, callback) { - if (!this._sessions[transactionId]) return callback(); + var self = this; + var cb = function(error) { + self._cleanUpTransaction(transactionId); + callback(error); + }; + + if (!this._sessions[transactionId]) return cb(); this._sessions[transactionId].commitTransaction() .then(function() { - callback(); - }, callback); + cb(); + }, cb); }; ShareDbMongo.prototype.abortTransaction = function(transactionId, callback) { - if (!this._sessions[transactionId]) return callback(); + var self = this; + var cb = function(error) { + self._cleanUpTransaction(transactionId); + callback(error); + }; + + if (!this._sessions[transactionId]) return cb(); this._sessions[transactionId].abortTransaction() .then(function() { - callback(); - }, callback); + cb(); + }, cb); }; -ShareDbMongo.prototype._ensureTransactionStarted = function(transactionId) { - if (typeof transactionId !== 'string') return null; - if (!this._sessions[transactionId]) { - this._sessions[transactionId] = this._mongoClient.startSession(); - this._sessions[transactionId].startTransaction(); - this._transactionOpLinks[transactionId] = []; +ShareDbMongo.prototype._cleanUpTransaction = function(transactionId) { + var session = this._sessions[transactionId]; + if (session) session.endSession(); + delete this._sessions[transactionId]; + delete this._transactionOpLinks[transactionId]; + // TODO: Improve performance? + for (var collection in this._lockedCollections) { + if (this._lockedCollections[collection] === transactionId) { + delete this._lockedCollections[collection]; + } } - return this._sessions[transactionId]; }; function createRequestForMiddleware(options, collectionName, op, fields) { @@ -279,7 +360,11 @@ ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, opt doc.d = id; var transactionLinks = self._transactionOpLinks[options.transaction] || []; doc.o = transactionLinks[op.v - 1] || snapshot._opLink; - var session = self._ensureTransactionStarted(options.transaction); + var session; + if (options.transaction) { + session = self._sessions[options.transaction]; + if (!session) return callback(ShareDbMongo.sessionNotStartedError()); + } opCollection.insertOne(doc, {session: session}) .then(function(result) { if (options.transaction) { @@ -305,7 +390,11 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, op this.getCollection(request.collectionName, function(err, collection) { if (err) return callback(err); request.documentToWrite = castToDoc(id, snapshot, opId); - var session = self._ensureTransactionStarted(options.transaction); + var session; + if (options.transaction) { + session = self._sessions[options.transaction]; + if (!session) return callback(ShareDbMongo.sessionNotStartedError()); + } if (request.documentToWrite._v === 1) { self._middleware.trigger(MiddlewareHandler.Actions.beforeCreate, request, function(middlewareErr) { if (middlewareErr) { @@ -1706,6 +1795,10 @@ ShareDbMongo.parseQueryError = function(err) { err.code = 5104; return err; }; +ShareDbMongo.sessionNotStartedError = function() { + // TODO: Proper code + return {code: 5105, message: 'Session not started'}; +}; // Middleware