Skip to content

Commit

Permalink
Merge pull request #598 from strongloop/feature/add-connector-hooks
Browse files Browse the repository at this point in the history
Mix in observer apis to the connector
  • Loading branch information
raymondfeng committed May 21, 2015
2 parents 5af6bf5 + 5062238 commit 12dea6e
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 24 deletions.
3 changes: 3 additions & 0 deletions lib/datasource.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
var ModelBuilder = require('./model-builder.js').ModelBuilder;
var ModelDefinition = require('./model-definition.js');
var RelationDefinition = require('./relation-definition.js');
var OberserverMixin = require('./observer');
var jutil = require('./jutil');
var utils = require('./utils');
var ModelBaseClass = require('./model.js');
Expand Down Expand Up @@ -182,6 +183,8 @@ DataSource.prototype._setupConnector = function () {
log(q || query, t1);
};
};
// Configure the connector instance to mix in observer functions
jutil.mixin(this.connector, OberserverMixin);
}
};

Expand Down
52 changes: 50 additions & 2 deletions lib/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,59 @@ ObserverMixin.notifyObserversOf = function(operation, context, callback) {
);
});
return callback.promise;
}
};

ObserverMixin._notifyBaseObservers = function(operation, context, callback) {
if (this.base && this.base.notifyObserversOf)
this.base.notifyObserversOf(operation, context, callback);
else
callback();
}
};

/**
* Run the given function with before/after observers. It's done in three serial
* steps asynchronously:
*
* - Notify the registered observers under 'before ' + operation
* - Execute the function
* - Notify the registered observers under 'after ' + operation
*
* If an error happens, it fails fast and calls the callback with err.
*
* @param {String} operation The operation name
* @param {Context} context The context object
* @param {Function} fn The task to be invoked as fn(done) or fn(context, done)
* @param {Function} callback The callback function
* @returns {*}
*/
ObserverMixin.notifyObserversAround = function(operation, context, fn, callback) {
var self = this;
return self.notifyObserversOf('before ' + operation, context,
function(err, context) {
if (err) return callback(err, context);

function cbForWork(err) {
if (err) return callback(err, context);
var returnedArgs = [].slice.call(arguments, 1);
context.results = returnedArgs;
self.notifyObserversOf('after ' + operation, context,
function(err, context) {
if (err) return callback(err, context);
var results = returnedArgs;
if (context) {
results = context.results;
}
var args = [err].concat(results);
callback.apply(null, args);
});
}

if (fn.length === 1) {
// fn(done)
fn(cbForWork);
} else {
// fn(context, done)
fn(context, cbForWork);
}
});
};
44 changes: 22 additions & 22 deletions lib/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,18 @@ if (Transaction) {
transaction: self,
operation: 'commit'
};
self.notifyObserversOf('before commit', context, function(err) {
if (err) return cb(err);
self.connector.commit(self.connection, function(err) {
if (err) return cb(err);
self.notifyObserversOf('after commit', context, function(err) {
// Deference the connection to mark the transaction is not active
// The connection should have been released back the pool
self.connection = null;
cb(err);
});
});

function work(done) {
self.connector.commit(self.connection, done);
}

self.notifyObserversAround('commit', context, work, function(err) {
// Deference the connection to mark the transaction is not active
// The connection should have been released back the pool
self.connection = null;
cb(err);
});

return cb.promise;
};

Expand All @@ -152,18 +152,18 @@ if (Transaction) {
transaction: self,
operation: 'rollback'
};
self.notifyObserversOf('before rollback', context, function(err) {
if (err) return cb(err);
self.connector.rollback(self.connection, function(err) {
if (err) return cb(err);
self.notifyObserversOf('after rollback', context, function(err) {
// Deference the connection to mark the transaction is not active
// The connection should have been released back the pool
self.connection = null;
cb(err);
});
});

function work(done) {
self.connector.rollback(self.connection, done);
}

self.notifyObserversAround('rollback', context, work, function(err) {
// Deference the connection to mark the transaction is not active
// The connection should have been released back the pool
self.connection = null;
cb(err);
});

return cb.promise;
};

Expand Down
64 changes: 64 additions & 0 deletions test/async-observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,70 @@ describe('async observer', function() {
});
});

describe('notifyObserversAround', function() {
var notifications;
beforeEach(function() {
notifications = [];
TestModel.observe('before execute',
pushAndNext(notifications, 'before execute'));
TestModel.observe('after execute',
pushAndNext(notifications, 'after execute'));
});

it('should notify before/after observers', function(done) {
var context = {};

function work(done) {
process.nextTick(function() {
done(null, 1);
});
}

TestModel.notifyObserversAround('execute', context, work,
function(err, result) {
notifications.should.eql(['before execute', 'after execute']);
result.should.eql(1);
done();
});
});

it('should allow work with context', function(done) {
var context = {};

function work(context, done) {
process.nextTick(function() {
done(null, 1);
});
}

TestModel.notifyObserversAround('execute', context, work,
function(err, result) {
notifications.should.eql(['before execute', 'after execute']);
result.should.eql(1);
done();
});
});

it('should notify before/after observers with multiple results',
function(done) {
var context = {};

function work(done) {
process.nextTick(function() {
done(null, 1, 2);
});
}

TestModel.notifyObserversAround('execute', context, work,
function(err, r1, r2) {
r1.should.eql(1);
r2.should.eql(2);
notifications.should.eql(['before execute', 'after execute']);
done();
});
});
});

it('resolves promises returned by observers', function(done) {
TestModel.observe('event', function(ctx) {
return Promise.resolve('value-to-ignore');
Expand Down
44 changes: 44 additions & 0 deletions test/memory.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -584,4 +584,48 @@ describe('Memory connector with options', function() {

});

describe('Memory connector with observers', function() {
var ds = new DataSource({
connector: 'memory'
});

it('should have observer mixed into the connector', function() {
ds.connector.observe.should.be.a.function;
ds.connector.notifyObserversOf.should.be.a.function;
});

it('should notify observers', function(done) {
var events = [];
ds.connector.execute = function(command, params, options, cb) {
var self = this;
var context = {command: command, params: params, options: options};
self.notifyObserversOf('before execute', context, function(err) {
process.nextTick(function() {
if (err) return cb(err);
events.push('execute');
self.notifyObserversOf('after execute', context, function(err) {
cb(err);
});
});
});
};

ds.connector.observe('before execute', function(context, next) {
events.push('before execute');
next();
});

ds.connector.observe('after execute', function(context, next) {
events.push('after execute');
next();
});

ds.connector.execute('test', [1, 2], {x: 2}, function(err) {
if (err) return done(err);
events.should.eql(['before execute', 'execute', 'after execute']);
done();
});
});
});


0 comments on commit 12dea6e

Please sign in to comment.