Skip to content

Commit

Permalink
Redis does not convert empty array into objects when decoding JSON an…
Browse files Browse the repository at this point in the history
…ymore

Fixes thenativeweb#96
  • Loading branch information
rehia committed Jan 4, 2017
1 parent eca3b22 commit 9f08128
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 91 deletions.
72 changes: 25 additions & 47 deletions lib/databases/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,44 +120,16 @@ _.extend(Redis.prototype, {
self.client.send_anyways = false;
}

self.deployLuaScripts(function (error) {
if (error) {
debug(error);
callback(error, self);
}

self.emit('connect');

if (self.options.heartbeat) {
self.startHeartbeat();
}

if (calledBack) return;
calledBack = true;
if (callback) callback(null, self);
});
self.emit('connect');

});
},

deployLuaScripts: function (callback) {
var self = this;

fs.readFile(__dirname + '/redis/commit.lua', {encoding: 'utf8'}, function (error, script) {
if (error) {
debug(error);
return callback(error);
if (self.options.heartbeat) {
self.startHeartbeat();
}

self.client.script('load', script, function (error, sha1) {
if (error) {
debug(error);
return callback(error);
}
self.commitScript = sha1;
callback();
});
})
if (calledBack) return;
calledBack = true;
if (callback) callback(null, self);
});
},

stopHeartbeat: function () {
Expand Down Expand Up @@ -260,36 +232,42 @@ _.extend(Redis.prototype, {
return event.commitStamp.getTime() + ':' + event.commitSequence.toString() + ':' + context + ':' + aggregate + ':' + aggregateId + ':' + event.id;
}

var multi = events.reduce(function(multi, event) {
var prefix = self.options.prefix + ':' + self.options.eventsCollectionName;
var key = prefix + ':' + eventKey(event);
var revisionKey = prefix + ':' + context + ':' + aggregate + ':' + aggregateId;

return multi.evalsha(self.commitScript, 3, key, JSON.stringify(event), revisionKey);
var prefix = self.options.prefix + ':' + self.options.eventsCollectionName;
var revisionKey = prefix + ':' + context + ':' + aggregate + ':' + aggregateId + ':revision';
var multi = events.reduce(function (multi) {
return multi.incr(revisionKey);
}, this.client.multi());

multi.exec(function (error, replies) {
multi.exec(function (error, revisions) {
if (error) {
debug(error);
return callback(error);
}

var errors = replies.filter(function (reply) {
return reply instanceof Error
var errors = revisions.filter(function (reply) {
return reply instanceof Error;
});

if (errors.length) {
var message = 'error while adding events for aggregate ' + aggregate + ' ' + aggregateId;
return callback(new Error(message + '\n' + errors.join('\n')));
}

var undispKeysEvtMap = events.map(function (event, index) {
event.streamRevision = parseInt(replies[index], 10); // mutates events passed as parameter
var savedKeysAndEvents = events.map(function(event, index) {
var key = prefix + ':' + eventKey(event);
event.streamRevision = parseInt(revisions[index], 10) - 1;
event.applyMappings();
return [key, JSON.stringify(event)];
});

var undispatchedKeysAndEvents = events.map(function (event) {
var key = self.options.prefix + ':undispatched_' + self.options.eventsCollectionName + ':' + eventKey(event);
return [key, JSON.stringify(event)];
});

var args = _.flatten(undispKeysEvtMap).concat(callback);
var args = _.flatten(savedKeysAndEvents)
.concat(_.flatten(undispatchedKeysAndEvents))
.concat(callback);
self.client.mset.apply(self.client, args);
});
},
Expand Down
11 changes: 0 additions & 11 deletions lib/databases/redis/commit.lua

This file was deleted.

13 changes: 12 additions & 1 deletion lib/event.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var debug = require('debug')('eventstore:event'),
dotty = require('dotty'),
_ = require('lodash');

/**
Expand All @@ -9,7 +10,7 @@ var debug = require('debug')('eventstore:event'),
* @param {Object} event the event object
* @constructor
*/
function Event (eventstream, event) {
function Event (eventstream, event, eventMappings) {
if (!eventstream) {
var errStreamMsg = 'eventstream not injected!';
debug(errStreamMsg);
Expand All @@ -34,6 +35,8 @@ function Event (eventstream, event) {
throw new Error(errAggIdMsg);
}

eventMappings = eventMappings || {};

this.streamId = eventstream.aggregateId;
this.aggregateId = eventstream.aggregateId;
this.aggregate = eventstream.aggregate;
Expand All @@ -44,6 +47,14 @@ function Event (eventstream, event) {
this.commitStamp = null;
this.payload = event || null;

this.applyMappings = function applyMappings() {
_.keys(eventMappings).forEach(function (key) {
if (this[key] !== undefined && this[key] !== null) {
dotty.put(this.payload, eventMappings[key], this[key]);
}
}.bind(this));
};

eventstream.uncommittedEvents.push(this);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/eventStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ EventStream.prototype = {
* @param {Object} event
*/
addEvent: function(event) {
new Event(this, event);
new Event(this, event, this.eventstore.eventMappings);
},

/**
Expand Down
7 changes: 1 addition & 6 deletions lib/eventstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ var debug = require('debug')('eventstore'),
util = require('util'),
EventEmitter = require('events').EventEmitter,
_ = require('lodash'),
dotty = require('dotty'),
async = require('async'),
tolerate = require('tolerance'),
EventDispatcher = require('./eventDispatcher'),
Expand Down Expand Up @@ -412,11 +411,7 @@ _.extend(Eventstore.prototype, {
currentRevision++;
event.streamRevision = currentRevision;

_.each(_.keys(self.eventMappings), function (key) {
if (event[key] !== undefined && event[key] !== null) {
dotty.put(event.payload, self.eventMappings[key], event[key]);
}
});
event.applyMappings();
}

self.store.addEvents(uncommittedEvents, function(err) {
Expand Down
Loading

0 comments on commit 9f08128

Please sign in to comment.