Skip to content

Commit

Permalink
MessageId should be unassigned in queue
Browse files Browse the repository at this point in the history
outgoingEnqueue and outgoingEnqueueCombi functions should not store messageId. outgoingUpdate function will do. This PR requires moscajs/aedes-packet#5
  • Loading branch information
gnought committed Jul 17, 2019
1 parent d11ed55 commit b26d1d3
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
44 changes: 30 additions & 14 deletions abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var concat = require('concat-stream')
var through = require('through2')
var Packet = require('aedes-packet')
var Packet = require('../aedes-packet')

function abstractPersistence (opts) {
var test = opts.test
Expand Down Expand Up @@ -769,16 +769,19 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 1
brokerCounter: 42
}

instance.outgoingEnqueue(sub, packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)

stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')
instance.destroy(t.end.bind(t))
}))
})
Expand Down Expand Up @@ -820,19 +823,26 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 1
brokerCounter: 42
}

instance.outgoingEnqueueCombi(subs, packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')

var stream2 = instance.outgoingStream(client2)
stream2.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')
instance.destroy(t.end.bind(t))
}))
}))
Expand Down Expand Up @@ -866,16 +876,19 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 1
brokerCounter: 42
}

instance.outgoingEnqueueCombi([sub], packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)

stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')
instance.destroy(t.end.bind(t))
}))
})
Expand Down Expand Up @@ -909,16 +922,19 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 4242
brokerCounter: 42
}

instance.outgoingEnqueueCombi([sub], packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)

stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')

var stream = instance.outgoingStream(client)

Expand Down
13 changes: 6 additions & 7 deletions persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
var from2 = require('from2')
var QlobberSub = require('qlobber/aedes/qlobber-sub')
var QlobberTrue = require('qlobber').QlobberTrue
var Packet = require('aedes-packet')
var Packet = require('../aedes-packet')
var QlobberOpts = {
wildcard_one: '+',
wildcard_some: '#',
Expand Down Expand Up @@ -185,9 +185,6 @@ function _outgoingEnqueue (sub, packet) {

this._outgoing[id] = queue
var p = new Packet(packet)
if (packet.messageId) {
p.messageId = packet.messageId
}
queue[queue.length] = p
}

Expand All @@ -205,9 +202,11 @@ MemoryPersistence.prototype.outgoingUpdate = function (client, packet, cb) {
temp.brokerCounter === packet.brokerCounter) {
temp.messageId = packet.messageId
return cb(null, client, packet)
} else if (packet.cmd !== 'publish' && temp.messageId === packet.messageId) {
// for non-PUBLISH packet only
outgoing[i] = packet
} else if (temp.messageId === packet.messageId) {
if (packet.cmd !== 'publish') {
// for non-PUBLISH packet only
outgoing[i] = packet
}
return cb(null, client, packet)
}
}
Expand Down

0 comments on commit b26d1d3

Please sign in to comment.