diff --git a/.travis.yml b/.travis.yml
index 363e42b..8f2539a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,14 +1,7 @@
language: node_js
sudo: false
node_js:
- - 5
+ - 6
- 4
- - iojs-v3
- - iojs-v2
- - iojs-v1
- - 0.12
- - 0.10
script:
npm run ci
-before_install:
- - node -v | grep v0.8 && npm install npm@1.4.28 -g || echo "no need to update NPM"
diff --git a/README.md b/README.md
index 7247d93..121813d 100644
--- a/README.md
+++ b/README.md
@@ -10,9 +10,8 @@ Encode and Decode MQTT 3.1.1 packets the node way.
* Contributing
* Licence & copyright
-This library works with node v4.x, v0.12.x, v0.10.x and all iojs releases, but it requires at
-least NPM 1.4. To upgrade NPM on node v0.8, run `npm install
-npm@1.4.28 -g`.
+This library is tested with node v4 and v6. The last version to support
+older versions of node was mqtt-packet@4.1.2.
Install
------------
@@ -122,6 +121,10 @@ will emit:
Parse a given `Buffer` and emits synchronously all the MQTT packets that
are included. Returns the number of bytes left to parse.
+If an error happens, an `error` event will be emitted, but no `packet` events
+will be emitted after that. Calling `parse()` again clears the error and
+previous buffer as if you created a new `Parser`.
+
Packets
-------
diff --git a/parser.js b/parser.js
index f73bcb4..c626b3e 100644
--- a/parser.js
+++ b/parser.js
@@ -10,37 +10,35 @@ function Parser() {
return new Parser()
}
- this._list = bl()
- this._newPacket()
-
this._states = [
'_parseHeader'
, '_parseLength'
, '_parsePayload'
, '_newPacket'
]
- this._stateCounter = 0
+
+ this._resetState()
}
inherits(Parser, EE)
-Parser.prototype._newPacket = function () {
- if (this.packet) {
- this._list.consume(this.packet.length)
- this.emit('packet', this.packet)
- }
-
+Parser.prototype._resetState = function () {
this.packet = new Packet()
-
- return true
+ this.error = null
+ this._list = bl()
+ this._stateCounter = 0
}
Parser.prototype.parse = function (buf) {
+ if (this.error) {
+ this._resetState()
+ }
this._list.append(buf)
while ((this.packet.length != -1 || this._list.length > 0) &&
- this[this._states[this._stateCounter]]()) {
+ this[this._states[this._stateCounter]]() &&
+ !this.error) {
this._stateCounter++
if (this._stateCounter >= this._states.length) {
@@ -140,7 +138,7 @@ Parser.prototype._parsePayload = function () {
// these are empty, nothing to do
break
default:
- this.emit('error', new Error('not supported'))
+ this._emitError(new Error('not supported'))
}
result = true
@@ -162,29 +160,29 @@ Parser.prototype._parseConnect = function () {
// Parse constants id
protocolId = this._parseString()
if (protocolId === null)
- return this.emit('error', new Error('cannot parse protocol id'))
+ return this._emitError(new Error('cannot parse protocol id'))
if (protocolId != 'MQTT' && protocolId != 'MQIsdp') {
- return this.emit('error', new Error('invalid protocol id'))
+ return this._emitError(new Error('invalid protocol id'))
}
packet.protocolId = protocolId
// Parse constants version number
if(this._pos >= this._list.length)
- return this.emit('error', new Error('packet too short'))
+ return this._emitError(new Error('packet too short'))
packet.protocolVersion = this._list.readUInt8(this._pos)
if(packet.protocolVersion != 3 && packet.protocolVersion != 4) {
- return this.emit('error', new Error('invalid protocol version'))
+ return this._emitError(new Error('invalid protocol version'))
}
this._pos++
if(this._pos >= this._list.length)
- return this.emit('error', new Error('packet too short'))
+ return this._emitError(new Error('packet too short'))
// Parse connect flags
flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
@@ -204,25 +202,25 @@ Parser.prototype._parseConnect = function () {
// Parse keepalive
packet.keepalive = this._parseNum()
if(packet.keepalive === -1)
- return this.emit('error', new Error('packet too short'))
+ return this._emitError(new Error('packet too short'))
// Parse client ID
clientId = this._parseString()
if(clientId === null)
- return this.emit('error', new Error('packet too short'))
+ return this._emitError(new Error('packet too short'))
packet.clientId = clientId
if (flags.will) {
// Parse will topic
topic = this._parseString()
if (topic === null)
- return this.emit('error', new Error('cannot parse will topic'))
+ return this._emitError(new Error('cannot parse will topic'))
packet.will.topic = topic
// Parse will payload
payload = this._parseBuffer()
if (payload === null)
- return this.emit('error', new Error('cannot parse will payload'))
+ return this._emitError(new Error('cannot parse will payload'))
packet.will.payload = payload
}
@@ -230,7 +228,7 @@ Parser.prototype._parseConnect = function () {
if (flags.username) {
username = this._parseString()
if(username === null)
- return this.emit('error', new Error('cannot parse username'))
+ return this._emitError(new Error('cannot parse username'))
packet.username = username
}
@@ -238,7 +236,7 @@ Parser.prototype._parseConnect = function () {
if(flags.password) {
password = this._parseBuffer()
if(password === null)
- return this.emit('error', new Error('cannot parse username'))
+ return this._emitError(new Error('cannot parse username'))
packet.password = password
}
@@ -252,7 +250,7 @@ Parser.prototype._parseConnack = function () {
packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK)
packet.returnCode = this._list.readUInt8(this._pos)
if(packet.returnCode === -1)
- return this.emit('error', new Error('cannot parse return code'))
+ return this._emitError(new Error('cannot parse return code'))
}
Parser.prototype._parsePublish = function () {
@@ -260,7 +258,7 @@ Parser.prototype._parsePublish = function () {
packet.topic = this._parseString()
if(packet.topic === null)
- return this.emit('error', new Error('cannot parse topic'))
+ return this._emitError(new Error('cannot parse topic'))
// Parse message ID
if (packet.qos > 0) {
@@ -276,7 +274,7 @@ Parser.prototype._parseSubscribe = function() {
, qos
if (packet.qos != 1) {
- return this.emit('error', new Error('wrong subscribe header'))
+ return this._emitError(new Error('wrong subscribe header'))
}
packet.subscriptions = []
@@ -288,7 +286,7 @@ Parser.prototype._parseSubscribe = function() {
// Parse topic
topic = this._parseString()
if (topic === null)
- return this.emit('error', new Error('Parse error - cannot parse topic'))
+ return this._emitError(new Error('Parse error - cannot parse topic'))
qos = this._list.readUInt8(this._pos++)
@@ -322,7 +320,7 @@ Parser.prototype._parseUnsubscribe = function() {
// Parse topic
topic = this._parseString()
if (topic === null)
- return this.emit('error', new Error('cannot parse topic'))
+ return this._emitError(new Error('cannot parse topic'))
// Push topic to unsubscriptions
packet.unsubscriptions.push(topic);
@@ -331,7 +329,7 @@ Parser.prototype._parseUnsubscribe = function() {
Parser.prototype._parseUnsuback = function() {
if (!this._parseMessageId())
- return this.emit('error', new Error('cannot parse message id'))
+ return this._emitError(new Error('cannot parse message id'))
}
Parser.prototype._parseMessageId = function() {
@@ -340,7 +338,7 @@ Parser.prototype._parseMessageId = function() {
packet.messageId = this._parseNum()
if(packet.messageId === null) {
- this.emit('error', new Error('cannot parse message id'))
+ this._emitError(new Error('cannot parse message id'))
return false
}
@@ -385,4 +383,20 @@ Parser.prototype._parseNum = function() {
return result
}
+Parser.prototype._newPacket = function () {
+ if (this.packet) {
+ this._list.consume(this.packet.length)
+ this.emit('packet', this.packet)
+ }
+
+ this.packet = new Packet()
+
+ return true
+}
+
+Parser.prototype._emitError = function(err) {
+ this.error = err
+ this.emit('error', err)
+}
+
module.exports = Parser
diff --git a/test.js b/test.js
index b1230de..0692553 100644
--- a/test.js
+++ b/test.js
@@ -56,6 +56,10 @@ function testParseError(expected, fixture) {
t.equal(err.message, expected, 'expected error message')
})
+ parser.on('packet', function() {
+ t.fail('parse errors should not be followed by packet events')
+ })
+
parser.parse(fixture)
})
}
@@ -949,3 +953,76 @@ testParseError('cannot parse protocol id', new Buffer([
77, 81, 73, 115, 100, 112,
77, 81, 73, 115, 100, 112
]))
+
+test('stops parsing after first error', function(t) {
+ t.plan(4)
+
+ var parser = mqtt.parser()
+
+ var packetCount = 0
+ var errorCount = 0
+ var expectedPackets = 1
+ var expectedErrors = 1
+
+ parser.on('packet', function(packet) {
+ t.ok(++packetCount <= expectedPackets, 'expected <= ' + expectedPackets + ' packets')
+ })
+
+ parser.on('error', function(err) {
+ t.ok(++errorCount <= expectedErrors, 'expected <= ' + expectedErrors + ' errors')
+ })
+
+ parser.parse(new Buffer([
+ // first, a valid connect packet:
+
+ 16, 12, // Header
+ 0, 4, // Protocol id length
+ 77, 81, 84, 84, // Protocol id
+ 4, // Protocol version
+ 2, // Connect flags
+ 0, 30, // Keepalive
+ 0, 0, //Client id length
+
+ // then an invalid subscribe packet:
+
+ 128, 9, // Header (subscribe, qos=0, length=9)
+ 0, 6, // message id (6)
+ 0, 4, // topic length,
+ 116, 101, 115, 116, // Topic (test)
+ 0, // qos (0)
+
+ // and another invalid subscribe packet:
+
+ 128, 9, // Header (subscribe, qos=0, length=9)
+ 0, 6, // message id (6)
+ 0, 4, // topic length,
+ 116, 101, 115, 116, // Topic (test)
+ 0, // qos (0)
+
+ // finally, a valid disconnect packet:
+
+ 224, 0, // Header
+ ]))
+
+ // calling parse again clears the error and continues parsing
+ packetCount = 0
+ errorCount = 0
+ expectedPackets = 2
+ expectedErrors = 0
+
+ parser.parse(new Buffer([
+ // connect:
+
+ 16, 12, // Header
+ 0, 4, // Protocol id length
+ 77, 81, 84, 84, // Protocol id
+ 4, // Protocol version
+ 2, // Connect flags
+ 0, 30, // Keepalive
+ 0, 0, //Client id length
+
+ // disconnect:
+
+ 224, 0, // Header
+ ]))
+})