From 9f081283b984db11ba2586da8777f8b0574d1af6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Avoustin?= Date: Wed, 4 Jan 2017 14:15:18 +0100 Subject: [PATCH] Redis does not convert empty array into objects when decoding JSON anymore Fixes #96 --- lib/databases/redis.js | 72 ++++++++-------------- lib/databases/redis/commit.lua | 11 ---- lib/event.js | 13 +++- lib/eventStream.js | 2 +- lib/eventstore.js | 7 +-- test/storeTest.js | 109 +++++++++++++++++++++++++-------- 6 files changed, 123 insertions(+), 91 deletions(-) delete mode 100644 lib/databases/redis/commit.lua diff --git a/lib/databases/redis.js b/lib/databases/redis.js index 10230df4..17e7c070 100644 --- a/lib/databases/redis.js +++ b/lib/databases/redis.js @@ -120,44 +120,16 @@ _.extend(Redis.prototype, { self.client.send_anyways = false; } - self.deployLuaScripts(function (error) { - if (error) { - debug(error); - callback(error, self); - } - - self.emit('connect'); - - if (self.options.heartbeat) { - self.startHeartbeat(); - } - - if (calledBack) return; - calledBack = true; - if (callback) callback(null, self); - }); + self.emit('connect'); - }); - }, - - deployLuaScripts: function (callback) { - var self = this; - - fs.readFile(__dirname + '/redis/commit.lua', {encoding: 'utf8'}, function (error, script) { - if (error) { - debug(error); - return callback(error); + if (self.options.heartbeat) { + self.startHeartbeat(); } - self.client.script('load', script, function (error, sha1) { - if (error) { - debug(error); - return callback(error); - } - self.commitScript = sha1; - callback(); - }); - }) + if (calledBack) return; + calledBack = true; + if (callback) callback(null, self); + }); }, stopHeartbeat: function () { @@ -260,22 +232,20 @@ _.extend(Redis.prototype, { return event.commitStamp.getTime() + ':' + event.commitSequence.toString() + ':' + context + ':' + aggregate + ':' + aggregateId + ':' + event.id; } - var multi = events.reduce(function(multi, event) { - var prefix = self.options.prefix + ':' + self.options.eventsCollectionName; - var key = prefix + ':' + eventKey(event); - var revisionKey = prefix + ':' + context + ':' + aggregate + ':' + aggregateId; - - return multi.evalsha(self.commitScript, 3, key, JSON.stringify(event), revisionKey); + var prefix = self.options.prefix + ':' + self.options.eventsCollectionName; + var revisionKey = prefix + ':' + context + ':' + aggregate + ':' + aggregateId + ':revision'; + var multi = events.reduce(function (multi) { + return multi.incr(revisionKey); }, this.client.multi()); - multi.exec(function (error, replies) { + multi.exec(function (error, revisions) { if (error) { debug(error); return callback(error); } - var errors = replies.filter(function (reply) { - return reply instanceof Error + var errors = revisions.filter(function (reply) { + return reply instanceof Error; }); if (errors.length) { @@ -283,13 +253,21 @@ _.extend(Redis.prototype, { return callback(new Error(message + '\n' + errors.join('\n'))); } - var undispKeysEvtMap = events.map(function (event, index) { - event.streamRevision = parseInt(replies[index], 10); // mutates events passed as parameter + var savedKeysAndEvents = events.map(function(event, index) { + var key = prefix + ':' + eventKey(event); + event.streamRevision = parseInt(revisions[index], 10) - 1; + event.applyMappings(); + return [key, JSON.stringify(event)]; + }); + + var undispatchedKeysAndEvents = events.map(function (event) { var key = self.options.prefix + ':undispatched_' + self.options.eventsCollectionName + ':' + eventKey(event); return [key, JSON.stringify(event)]; }); - var args = _.flatten(undispKeysEvtMap).concat(callback); + var args = _.flatten(savedKeysAndEvents) + .concat(_.flatten(undispatchedKeysAndEvents)) + .concat(callback); self.client.mset.apply(self.client, args); }); }, diff --git a/lib/databases/redis/commit.lua b/lib/databases/redis/commit.lua deleted file mode 100644 index 2e5574b0..00000000 --- a/lib/databases/redis/commit.lua +++ /dev/null @@ -1,11 +0,0 @@ -local key = KEYS[1] -local event = cjson.decode(KEYS[2]) -local revisionKey = KEYS[3] .. ':revision' - -local revision = redis.call('GET', revisionKey) -if (not revision) then revision = 0 end -redis.call('INCR', revisionKey) -event['streamRevision'] = revision -redis.call('SET', key, cjson.encode(event)) - -return revision diff --git a/lib/event.js b/lib/event.js index e47d6651..8c854698 100644 --- a/lib/event.js +++ b/lib/event.js @@ -1,6 +1,7 @@ 'use strict'; var debug = require('debug')('eventstore:event'), + dotty = require('dotty'), _ = require('lodash'); /** @@ -9,7 +10,7 @@ var debug = require('debug')('eventstore:event'), * @param {Object} event the event object * @constructor */ -function Event (eventstream, event) { +function Event (eventstream, event, eventMappings) { if (!eventstream) { var errStreamMsg = 'eventstream not injected!'; debug(errStreamMsg); @@ -34,6 +35,8 @@ function Event (eventstream, event) { throw new Error(errAggIdMsg); } + eventMappings = eventMappings || {}; + this.streamId = eventstream.aggregateId; this.aggregateId = eventstream.aggregateId; this.aggregate = eventstream.aggregate; @@ -44,6 +47,14 @@ function Event (eventstream, event) { this.commitStamp = null; this.payload = event || null; + this.applyMappings = function applyMappings() { + _.keys(eventMappings).forEach(function (key) { + if (this[key] !== undefined && this[key] !== null) { + dotty.put(this.payload, eventMappings[key], this[key]); + } + }.bind(this)); + }; + eventstream.uncommittedEvents.push(this); } diff --git a/lib/eventStream.js b/lib/eventStream.js index a0e9f868..a0fd48b5 100644 --- a/lib/eventStream.js +++ b/lib/eventStream.js @@ -90,7 +90,7 @@ EventStream.prototype = { * @param {Object} event */ addEvent: function(event) { - new Event(this, event); + new Event(this, event, this.eventstore.eventMappings); }, /** diff --git a/lib/eventstore.js b/lib/eventstore.js index 48bfb11e..17bc774a 100644 --- a/lib/eventstore.js +++ b/lib/eventstore.js @@ -4,7 +4,6 @@ var debug = require('debug')('eventstore'), util = require('util'), EventEmitter = require('events').EventEmitter, _ = require('lodash'), - dotty = require('dotty'), async = require('async'), tolerate = require('tolerance'), EventDispatcher = require('./eventDispatcher'), @@ -412,11 +411,7 @@ _.extend(Eventstore.prototype, { currentRevision++; event.streamRevision = currentRevision; - _.each(_.keys(self.eventMappings), function (key) { - if (event[key] !== undefined && event[key] !== null) { - dotty.put(event.payload, self.eventMappings[key], event[key]); - } - }); + event.applyMappings(); } self.store.addEvents(uncommittedEvents, function(err) { diff --git a/test/storeTest.js b/test/storeTest.js index 15612e4f..54c31339 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -158,7 +158,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }; store.addEvents([event], function(err) { @@ -181,6 +182,40 @@ types.forEach(function (type) { }); + describe('with an array in the payload', function () { + + it('it should save the event', function(done) { + + var event = { + aggregateId: 'id1', + id: '111', + streamRevision: 0, + commitId: '111', + commitStamp: new Date(), + commitSequence: 0, + payload: { + event:'bla', + array: [] + }, + applyMappings: function () {} + }; + + store.addEvents([event], function(err) { + expect(err).not.to.be.ok(); + + store.getEvents({}, 0, -1, function(err, evts) { + expect(err).not.to.be.ok(); + + expect(evts[0].payload.array).to.be.an('array'); + + done(); + }); + }); + + }); + + }); + describe('with multiple events in the array', function () { it('it should save the event', function(done) { @@ -195,7 +230,8 @@ types.forEach(function (type) { restInCommitStream: 1, payload: { event:'bla' - } + }, + applyMappings: function () {} }; var event2 = { @@ -208,7 +244,8 @@ types.forEach(function (type) { restInCommitStream: 0, payload: { event:'bla2' - } + }, + applyMappings: function () {} }; store.addEvents([event1, event2], function(err) { @@ -752,7 +789,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }; store.addEvents([event], function(err) { @@ -777,7 +815,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'blaffff' - } + }, + applyMappings: function () {} }; store.addEvents([event], function(err) { @@ -814,7 +853,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'blaffff' - } + }, + applyMappings: function () {} }; store.addEvents([event], function(err) { @@ -853,7 +893,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'blaffff' - } + }, + applyMappings: function () {} }; store.addEvents([event], function(err) { @@ -892,7 +933,8 @@ types.forEach(function (type) { commitId: '118', payload: { event:'blaffff' - } + }, + applyMappings: function () {} }; store.addEvents([event], function(err) { @@ -931,7 +973,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }, { aggregateId: 'id', streamRevision: 1, @@ -941,7 +984,8 @@ types.forEach(function (type) { commitSequence: 1, payload: { event:'bla2' - } + }, + applyMappings: function () {} }]; var stream2 = [{ @@ -954,7 +998,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }, { aggregateId: 'idWithAgg', aggregate: 'myAgg', @@ -965,7 +1010,8 @@ types.forEach(function (type) { commitSequence: 1, payload: { event: 'bla2' - } + }, + applyMappings: function () {} }]; var stream3 = [{ @@ -978,7 +1024,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla2' - } + }, + applyMappings: function () {} }]; var stream4 = [{ @@ -991,7 +1038,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }, { aggregateId: 'idWithCont', context: 'myCont', @@ -1002,7 +1050,8 @@ types.forEach(function (type) { commitSequence: 1, payload: { event: 'bla2' - } + }, + applyMappings: function () {} }]; var stream5 = [{ @@ -1015,7 +1064,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla2' - } + }, + applyMappings: function () {} }]; var stream6 = [{ @@ -1029,7 +1079,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }, { aggregateId: 'idWithAggrAndCont', aggregate: 'myAggrrr', @@ -1041,7 +1092,8 @@ types.forEach(function (type) { commitSequence: 1, payload: { event: 'bla2' - } + }, + applyMappings: function () {} }]; var stream7 = [{ @@ -1055,7 +1107,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }, { aggregateId: 'idWithAggrAndCont2', aggregate: 'myAggrrr2', @@ -1067,7 +1120,8 @@ types.forEach(function (type) { commitSequence: 1, payload: { event: 'bla2' - } + }, + applyMappings: function () {} }]; var stream8 = [{ @@ -1081,7 +1135,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }]; var stream9 = [{ @@ -1095,7 +1150,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event: 'bla2' - } + }, + applyMappings: function () {} }]; var stream10 = [{ @@ -1109,7 +1165,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla2' - } + }, + applyMappings: function () {} }]; var allEvents = [].concat(stream1).concat(stream2).concat(stream3) @@ -2312,7 +2369,8 @@ types.forEach(function (type) { commitSequence: 0, payload: { event:'bla' - } + }, + applyMappings: function () {} }, { aggregateId: 'id', streamRevision: 1, @@ -2322,7 +2380,8 @@ types.forEach(function (type) { commitSequence: 1, payload: { event:'bla2' - } + }, + applyMappings: function () {} }]; beforeEach(function (done) {