Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessageId should be unassigned in queue #33

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ pids
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
# Coverage directory used by tools like nyc
coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
Expand All @@ -25,3 +26,5 @@ build/Release
# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules

package-lock.json
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# aedes-persistence  [![Build Status](https://travis-ci.org/mcollina/aedes-persistence.svg)](https://travis-ci.org/mcollina/aedes-persistence)
# aedes-persistence
[![Build Status](https://travis-ci.org/mcollina/aedes-persistence.svg?branch=master)](https://travis-ci.org/mcollina/aedes-persistence)
[![Dependencies Status](https://david-dm.org/mcollina/aedes-persistence/status.svg)](https://david-dm.org/mcollina/aedes-persistence)
[![devDependencies Status](https://david-dm.org/mcollina/aedes-persistence/dev-status.svg)](https://david-dm.org/mcollina/aedes-persistence?type=dev)
<br/>
[![Known Vulnerabilities](https://snyk.io/test/github/mcollina/aedes-persistence/badge.svg)](https://snyk.io/test/github/mcollina/aedes-persistence)
[![Coverage Status](https://coveralls.io/repos/mcollina/aedes-persistence/badge.svg?branch=master&service=github)](https://coveralls.io/github/mcollina/aedes-persistence?branch=master)
[![NPM version](https://img.shields.io/npm/v/aedes-persistence.svg?style=flat)](https://www.npmjs.com/package/aedes-persistence)
[![NPM downloads](https://img.shields.io/npm/dm/aedes-persistence.svg?style=flat)](https://www.npmjs.com/package/aedes-persistence)

The spec for an [Aedes](http://npm.im/aedes) persistence, with abstract
tests and a fast in-memory implementation.
Expand Down Expand Up @@ -240,7 +248,7 @@ format:
<a name="getCLientList"></a>
### instance.getClientList(topic)

Returns a stream which has all the clientIds subscribed to the
Returns a stream which has all the clientIds subscribed to the
specified topic

<a name="implement"></a>
Expand Down Expand Up @@ -281,6 +289,10 @@ abs({
})
```

## Collaborators

* [__Gnought__](https://github.com/gnought)

## License

MIT
42 changes: 29 additions & 13 deletions abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -770,16 +770,19 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 1
brokerCounter: 42
}

instance.outgoingEnqueue(sub, packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)

stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')
instance.destroy(t.end.bind(t))
}))
})
Expand Down Expand Up @@ -821,19 +824,26 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 1
brokerCounter: 42
}

instance.outgoingEnqueueCombi(subs, packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)
stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')

var stream2 = instance.outgoingStream(client2)
stream2.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')
instance.destroy(t.end.bind(t))
}))
}))
Expand Down Expand Up @@ -916,16 +926,19 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 1
brokerCounter: 42
}

instance.outgoingEnqueueCombi([sub], packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)

stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')
instance.destroy(t.end.bind(t))
}))
})
Expand Down Expand Up @@ -959,16 +972,19 @@ function abstractPersistence (opts) {
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 4242
brokerCounter: 42
}

instance.outgoingEnqueueCombi([sub], packet, function (err) {
t.error(err)
var stream = instance.outgoingStream(client)

stream.pipe(concat(function (list) {
t.deepEqual(list, [expected], 'must return the packet')
var packet = list[0]
t.ok(Object.prototype.hasOwnProperty.call(packet, 'messageId'))
t.equal(packet.messageId, undefined, 'should have an unassigned messageId in queue')
delete packet.messageId
t.deepEqual(packet, expected, 'must return the packet')

var stream = instance.outgoingStream(client)

Expand Down
23 changes: 19 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
"description": "The spec for an Aedes persistence, with abstract tests and a fast in-memory implementation.",
"main": "persistence.js",
"scripts": {
"test": "standard && tape test.js | faucet"
"lint": "standard --verbose | snazzy",
"test": "tape test.js | faucet",
"coverage": "nyc --reporter=lcov tape test.js",
"license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause'"
},
"pre-commit": "test",
"pre-commit": [
"lint",
"test"
],
"repository": {
"type": "git",
"url": "git+https://github.com/mcollina/aedes-persistence.git"
Expand All @@ -18,6 +24,12 @@
"aedes"
],
"author": "Matteo Collina <hello@matteocollina.com>",
"contributors": [
{
"name": "Gnought",
"url": "https://github.com/gnought"
}
],
"license": "MIT",
"bugs": {
"url": "https://github.com/mcollina/aedes-persistence/issues"
Expand All @@ -26,15 +38,18 @@
"devDependencies": {
"concat-stream": "^2.0.0",
"faucet": "0.0.1",
"license-checker": "^25.0.1",
"mqemitter": "^3.0.0",
"nyc": "^14.1.1",
"pre-commit": "^1.2.2",
"pump": "^3.0.0",
"standard": "^13.0.2",
"snazzy": "^8.0.0",
"standard": "^13.1.0",
"tape": "^4.11.0",
"through2": "^3.0.1"
},
"dependencies": {
"aedes-packet": "^2.0.0",
"aedes-packet": "^2.1.0",
"from2": "^2.3.0",
"qlobber": "^3.1.0"
}
Expand Down
3 changes: 0 additions & 3 deletions persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ function _outgoingEnqueue (sub, packet) {

this._outgoing[id] = queue
var p = new Packet(packet)
if (packet.messageId) {
p.messageId = packet.messageId
}
queue[queue.length] = p
}

Expand Down