Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate events in dynamic services. #115

Merged
merged 5 commits into from
Feb 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions lib/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,8 @@ module.exports = {
provider(location, protoService, options || {});
});

// If already _setup, just add this single service.
if (this._setup) {
// If we're using a socket provider, register the service on it.
if (this.addService) {
this.addService(protoService, location);
}
// If we ran setup already, set this service up explicitly
if (this._isSetup) {
protoService.setup(this, location);
}

Expand All @@ -62,14 +58,14 @@ module.exports = {
var args = _.toArray(arguments);
var location = args.shift();
var service = args.pop();
var hasMethod = function() {
return _.some(arguments, function(name) {
var hasMethod = function(methods) {
return _.some(methods, function(name) {
return (service && typeof service[name] === 'function');
});
};

// Check for service (any object with at least one service method)
if(hasMethod('handle', 'set') || !hasMethod.apply(null, this.methods)) {
if(hasMethod(['handle', 'set']) || !hasMethod(this.methods)) {
return this._super.apply(this, arguments);
}

Expand All @@ -89,7 +85,7 @@ module.exports = {
}
}.bind(this));

this._setup = true;
this._isSetup = true;

return this;
},
Expand Down
6 changes: 2 additions & 4 deletions lib/mixins/event.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ var eventMappings = {
*/
var EventMixin = {
setup: function () {
this.applyEvents();
return this._super ? this._super.apply(this, arguments) : this;
},
applyEvents:function(){
var emitter = this._rubberDuck = rubberduck.emitter(this);
var self = this;

Expand Down Expand Up @@ -49,6 +45,8 @@ var EventMixin = {
});
}
});

return this._super ? this._super.apply(this, arguments) : this;
}
};

Expand Down
130 changes: 79 additions & 51 deletions lib/providers/socket/commons.js
Original file line number Diff line number Diff line change
@@ -1,77 +1,105 @@
'use strict';

var _ = require('lodash');
var eventsMixin = require('../../mixins/event').Mixin;

// The position of the params parameters for a service method so that we can extend them
// default is 1
var paramsPositions = {
exports.paramsPositions = {
find: 0,
update: 2,
patch: 2
};

exports.addService = function addService(service, path){
// Add handlers for the service to connected sockets.
_.each(this.info.connections, function (spark) {
this.setupMethodHandlers(service, path, spark);
}, this);
// The default event dispatcher
exports.defaultDispatcher = function (data, params, callback) {
callback(null, data);
};

// Set up event handlers for a given service using the event dispatching mechanism
exports.setupEventHandlers = function (info, service, path) {
// If the service emits events that we want to listen to (Event mixin)
if (typeof service.on === 'function' && service._serviceEvents) {
var addEvent = function (ev) {
service.on(ev, function (data) {
// Check if there is a method on the service with the same name as the event
var dispatcher = typeof service[ev] === 'function' ?
service[ev] : exports.defaultDispatcher;
var eventName = path + ' ' + ev;

info.clients().forEach(function (socket) {
dispatcher(data, info.params(socket), function (error, dispatchData) {
if (error) {
socket[info.method]('error', error);
} else if (dispatchData) { // Only dispatch if we have data
socket[info.method](eventName, dispatchData);
}
});
});
});
};

// Setup events for the service.
eventsMixin.applyEvents.call(service);
exports.setupEventHandlers.call(this, service, path);
_.each(service._serviceEvents, addEvent);
}
};

// Set up the service method handler for a service and socket.
exports.setupMethodHandler = function setupMethodHandler(emitter, params, service, path, method) {
var name = path + '::' + method;
var position = typeof paramsPositions[method] !== 'undefined' ? paramsPositions[method] : 1;
// Set up all method handlers for a service and socket.
exports.setupMethodHandlers = function (info, socket, service, path) {
this.methods.forEach(function (method) {
if (typeof service[method] !== 'function') {
return;
}

var name = path + '::' + method;
var params = info.params(socket);
var position = typeof exports.paramsPositions[method] !== 'undefined' ?
exports.paramsPositions[method] : 1;

if (typeof service[method] === 'function') {
emitter.on(name, function () {
socket.on(name, function () {
var args = _.toArray(arguments);
// If the service is called with no parameter object
// insert an empty object
if(typeof args[position] === 'function') {
if (typeof args[position] === 'function') {
args.splice(position, 0, {});
}
args[position] = _.extend({ query: args[position] }, params);
args[position] = _.extend({query: args[position]}, params);
service[method].apply(service, args);
});
}
});
};

exports.setupEventHandlers = function setupEventHandlers(service, path){
// If the service emits events that we want to listen to (Event mixin)
if (typeof service.on === 'function' && service._serviceEvents) {
_.each(service._serviceEvents, function (ev) {
exports.setupEventHandler(this.info, service, path, ev);
}, this);
}
// Common setup functionality taking the info object which abstracts websocket access
exports.setup = function (info) {
var app = this;
var setupEventHandlers = exports.setupEventHandlers.bind(this, info);

app._commons = info;

// For a new connection, set up the service method handlers
info.connection().on('connection', function (socket) {
var setupMethodHandlers = exports.setupMethodHandlers.bind(app, info, socket);
// Process all registered services
_.each(app.services, setupMethodHandlers);
});

// Set up events and event dispatching
_.each(app.services, setupEventHandlers);
};

// Set up event handlers for a given service and connected sockets.
// Send it through the service dispatching mechanism (`removed(data, params, callback)`,
// `updated(data, params, callback)` and `created(data, params, callback)`) if it
// exists.
exports.setupEventHandler = function setupEventHandler (info, service, path, ev) {
var defaultDispatcher = function (data, params, callback) {
callback(null, data);
};

service.on(ev, function (data) {
// Check if there is a method on the service with the same name as the event
var dispatcher = typeof service[ev] === 'function' ? service[ev] : defaultDispatcher;
var eventName = path + ' ' + ev;

info.emitters().forEach(function (emitter) {
dispatcher(data, info.params(emitter), function (error, dispatchData) {
if (error) {
emitter[info.method]('error', error);
} else if (dispatchData) {
emitter[info.method](eventName, dispatchData);
}
});
// Socket mixin when a new service is registered
exports.service = function (path, service) {
var protoService = this._super.apply(this, arguments);
var info = this._commons;

// app._socketInfo will only be available once we are set up
if (service && info) {
var setupEventHandlers = exports.setupEventHandlers.bind(this, info);
var setupMethodHandlers = exports.setupMethodHandlers.bind(this, info);

// Set up event handlers for this new service
setupEventHandlers(protoService, path);
// For any existing connection add method handlers
info.clients().forEach(function (socket) {
setupMethodHandlers(socket, path, protoService);
});
});
}

return protoService;
};
40 changes: 11 additions & 29 deletions lib/providers/socket/primus.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';

var _ = require('lodash');
var Proto = require('uberproto');
var Primus = require('primus');
var Emitter = require('primus-emitter');
Expand All @@ -14,54 +13,37 @@ module.exports = function(config, configurer) {

// Monkey patch app.setup(server)
Proto.mixin({
service: commons.service,

setup: function(server) {
var self = this;
var result = this._super.apply(this, arguments);

if (this.disabled('feathers primus')) {
return result;
}

var primus = this.primus = new Primus(server, config);
this.info = {
emitters: function() {

commons.setup.call(this, {
method: 'send',
connection: function() {
return primus;
},
clients: function() {
return primus;
},
params: function(spark) {
return spark.request.feathers;
},
method: 'send',
connections: this.primus.connections
};

primus.use('emitter', Emitter);

// For a new connection, set up the service method handlers
primus.on('connection', function (spark) {
// Process services that were registered at startup.
_.each(self.services, function (service, path) {
self.setupMethodHandlers.call(self, service, path, spark);
});
}
});

// Set up events and event dispatching
_.each(this.services, function (service, path) {
commons.setupEventHandlers.call(this, service, path);
}, this);
primus.use('emitter', Emitter);

if (typeof configurer === 'function') {
configurer.call(this, primus);
}

return result;
},

addService: commons.addService,

setupMethodHandlers: function(service, path, spark){
_.each(this.methods, function (method) {
commons.setupMethodHandler(spark, spark.request.feathers, service, path, method);
}, this);
}
}, app);
};
Expand Down
39 changes: 10 additions & 29 deletions lib/providers/socket/socketio.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';

var _ = require('lodash');
var socketio = require('socket.io');
var Proto = require('uberproto');
var commons = require('./commons');
Expand All @@ -13,53 +12,35 @@ module.exports = function (config) {

// Monkey patch app.setup(server)
Proto.mixin({
service: commons.service,

setup: function (server) {
var self = this;
var result = this._super.apply(this, arguments);

if (this.disabled('feathers socketio')) {
return result;
}

var io = this.io = socketio.listen(server);
// The info object we can pass to commons.setupEventHandler
this.info = {
emitters: function() {

commons.setup.call(this, {
method: 'emit',
connection: function() {
return io.sockets;
},
clients: function() {
return io.sockets.sockets;
},
params: function(socket) {
return socket.feathers;
},
method: 'emit',
connections: this.connections = this.io.sockets.connected
};

// For a new connection, set up the service method handlers
io.sockets.on('connection', function (socket) {
// Process services that were registered at startup.
_.each(self.services, function (service, path) {
self.setupMethodHandlers.call(self, service, path, socket);
});
}
});

// Set up events and event dispatching
_.each(self.services, function (service, path) {
commons.setupEventHandlers.call(this, service, path);
}, this);

if (typeof config === 'function') {
config.call(this, io);
}

return result;
},

addService: commons.addService,

setupMethodHandlers: function(service, path, socket){
_.each(this.methods, function (method) {
commons.setupMethodHandler(socket, socket.feathers || {}, service, path, method);
}, this);
}
}, app);
};
Expand Down
Loading