diff --git a/abstract.js b/abstract.js index 9dc28df..07335c9 100644 --- a/abstract.js +++ b/abstract.js @@ -2,7 +2,7 @@ var concat = require('concat-stream') var through = require('through2') -var Packet = require('aedes-packet') +var Packet = require('../aedes-packet') function abstractPersistence (opts) { var test = opts.test @@ -769,8 +769,7 @@ function abstractPersistence (opts) { qos: 1, retain: false, brokerId: instance.broker.id, - brokerCounter: 42, - messageId: 1 + brokerCounter: 42 } instance.outgoingEnqueue(sub, packet, function (err) { @@ -778,7 +777,11 @@ function abstractPersistence (opts) { var stream = instance.outgoingStream(client) stream.pipe(concat(function (list) { - t.deepEqual(list, [expected], 'must return the packet') + var packet = list[0] + t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId')) + t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') + delete packet.messageId + t.deepEqual(packet, expected, 'must return the packet') instance.destroy(t.end.bind(t)) })) }) @@ -820,19 +823,26 @@ function abstractPersistence (opts) { qos: 1, retain: false, brokerId: instance.broker.id, - brokerCounter: 42, - messageId: 1 + brokerCounter: 42 } instance.outgoingEnqueueCombi(subs, packet, function (err) { t.error(err) var stream = instance.outgoingStream(client) stream.pipe(concat(function (list) { - t.deepEqual(list, [expected], 'must return the packet') + var packet = list[0] + t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId')) + t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') + delete packet.messageId + t.deepEqual(packet, expected, 'must return the packet') var stream2 = instance.outgoingStream(client2) stream2.pipe(concat(function (list) { - t.deepEqual(list, [expected], 'must return the packet') + var packet = list[0] + t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId')) + t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') + delete packet.messageId + t.deepEqual(packet, expected, 'must return the packet') instance.destroy(t.end.bind(t)) })) })) @@ -866,8 +876,7 @@ function abstractPersistence (opts) { qos: 1, retain: false, brokerId: instance.broker.id, - brokerCounter: 42, - messageId: 1 + brokerCounter: 42 } instance.outgoingEnqueueCombi([sub], packet, function (err) { @@ -875,7 +884,11 @@ function abstractPersistence (opts) { var stream = instance.outgoingStream(client) stream.pipe(concat(function (list) { - t.deepEqual(list, [expected], 'must return the packet') + var packet = list[0] + t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId')) + t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') + delete packet.messageId + t.deepEqual(packet, expected, 'must return the packet') instance.destroy(t.end.bind(t)) })) }) @@ -909,8 +922,7 @@ function abstractPersistence (opts) { qos: 1, retain: false, brokerId: instance.broker.id, - brokerCounter: 42, - messageId: 4242 + brokerCounter: 42 } instance.outgoingEnqueueCombi([sub], packet, function (err) { @@ -918,7 +930,11 @@ function abstractPersistence (opts) { var stream = instance.outgoingStream(client) stream.pipe(concat(function (list) { - t.deepEqual(list, [expected], 'must return the packet') + var packet = list[0] + t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId')) + t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue') + delete packet.messageId + t.deepEqual(packet, expected, 'must return the packet') var stream = instance.outgoingStream(client) diff --git a/persistence.js b/persistence.js index b3f0004..b3bfc01 100644 --- a/persistence.js +++ b/persistence.js @@ -3,7 +3,7 @@ var from2 = require('from2') var QlobberSub = require('qlobber/aedes/qlobber-sub') var QlobberTrue = require('qlobber').QlobberTrue -var Packet = require('aedes-packet') +var Packet = require('../aedes-packet') var QlobberOpts = { wildcard_one: '+', wildcard_some: '#', @@ -185,9 +185,6 @@ function _outgoingEnqueue (sub, packet) { this._outgoing[id] = queue var p = new Packet(packet) - if (packet.messageId) { - p.messageId = packet.messageId - } queue[queue.length] = p } @@ -205,9 +202,11 @@ MemoryPersistence.prototype.outgoingUpdate = function (client, packet, cb) { temp.brokerCounter === packet.brokerCounter) { temp.messageId = packet.messageId return cb(null, client, packet) - } else if (packet.cmd !== 'publish' && temp.messageId === packet.messageId) { - // for non-PUBLISH packet only - outgoing[i] = packet + } else if (temp.messageId === packet.messageId) { + if (packet.cmd !== 'publish') { + // for non-PUBLISH packet only + outgoing[i] = packet + } return cb(null, client, packet) } }