Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not emit packet events and stop parsing after a parse error #15

Merged
merged 2 commits into from
Sep 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
language: node_js
sudo: false
node_js:
- 5
- 6
- 4
- iojs-v3
- iojs-v2
- iojs-v1
- 0.12
- 0.10
script:
npm run ci
before_install:
- node -v | grep v0.8 && npm install npm@1.4.28 -g || echo "no need to update NPM"
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ Encode and Decode MQTT 3.1.1 packets the node way.
* <a href="#contributing">Contributing</a>
* <a href="#license">Licence &amp; copyright</a>

This library works with node v4.x, v0.12.x, v0.10.x and all iojs releases, but it requires at
least NPM 1.4. To upgrade NPM on node v0.8, run `npm install
npm@1.4.28 -g`.
This library is tested with node v4 and v6. The last version to support
older versions of node was mqtt-packet@4.1.2.

Install
------------
Expand Down Expand Up @@ -122,6 +121,10 @@ will emit:
Parse a given `Buffer` and emits synchronously all the MQTT packets that
are included. Returns the number of bytes left to parse.

If an error happens, an `error` event will be emitted, but no `packet` events
will be emitted after that. Calling `parse()` again clears the error and
previous buffer as if you created a new `Parser`.

Packets
-------

Expand Down
78 changes: 46 additions & 32 deletions parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,35 @@ function Parser() {
return new Parser()
}

this._list = bl()
this._newPacket()

this._states = [
'_parseHeader'
, '_parseLength'
, '_parsePayload'
, '_newPacket'
]
this._stateCounter = 0

this._resetState()
}

inherits(Parser, EE)

Parser.prototype._newPacket = function () {
if (this.packet) {
this._list.consume(this.packet.length)
this.emit('packet', this.packet)
}

Parser.prototype._resetState = function () {
this.packet = new Packet()

return true
this.error = null
this._list = bl()
this._stateCounter = 0
}

Parser.prototype.parse = function (buf) {
if (this.error) {
this._resetState()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should return early in the if, otherwise it keep resetting itself: if we want to parse two buffer, and the first errors, the second should be ignored. Or maybe errored, what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could go either way. _resetState() resets the error as well as the buffer that had the error in it so it shouldn't keep resetting.

If we don't reset the state, calling parse() again with a new buffer is guaranteed to keep emitting the same error over and over since it appends the new buffer after the old.

So we could just tell the user to throw that instance of Parser away, or reset the internal state like I'm doing here. If we do choose not to reset for them, should we at least throw an exception to make it obvious the current instance is broken?


this._list.append(buf)

while ((this.packet.length != -1 || this._list.length > 0) &&
this[this._states[this._stateCounter]]()) {
this[this._states[this._stateCounter]]() &&
!this.error) {
this._stateCounter++

if (this._stateCounter >= this._states.length) {
Expand Down Expand Up @@ -140,7 +138,7 @@ Parser.prototype._parsePayload = function () {
// these are empty, nothing to do
break
default:
this.emit('error', new Error('not supported'))
this._emitError(new Error('not supported'))
}

result = true
Expand All @@ -162,29 +160,29 @@ Parser.prototype._parseConnect = function () {
// Parse constants id
protocolId = this._parseString()
if (protocolId === null)
return this.emit('error', new Error('cannot parse protocol id'))
return this._emitError(new Error('cannot parse protocol id'))

if (protocolId != 'MQTT' && protocolId != 'MQIsdp') {

return this.emit('error', new Error('invalid protocol id'))
return this._emitError(new Error('invalid protocol id'))
}

packet.protocolId = protocolId

// Parse constants version number
if(this._pos >= this._list.length)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))

packet.protocolVersion = this._list.readUInt8(this._pos)

if(packet.protocolVersion != 3 && packet.protocolVersion != 4) {

return this.emit('error', new Error('invalid protocol version'))
return this._emitError(new Error('invalid protocol version'))
}

this._pos++
if(this._pos >= this._list.length)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))

// Parse connect flags
flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
Expand All @@ -204,41 +202,41 @@ Parser.prototype._parseConnect = function () {
// Parse keepalive
packet.keepalive = this._parseNum()
if(packet.keepalive === -1)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))

// Parse client ID
clientId = this._parseString()
if(clientId === null)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))
packet.clientId = clientId

if (flags.will) {
// Parse will topic
topic = this._parseString()
if (topic === null)
return this.emit('error', new Error('cannot parse will topic'))
return this._emitError(new Error('cannot parse will topic'))
packet.will.topic = topic

// Parse will payload
payload = this._parseBuffer()
if (payload === null)
return this.emit('error', new Error('cannot parse will payload'))
return this._emitError(new Error('cannot parse will payload'))
packet.will.payload = payload
}

// Parse username
if (flags.username) {
username = this._parseString()
if(username === null)
return this.emit('error', new Error('cannot parse username'))
return this._emitError(new Error('cannot parse username'))
packet.username = username
}

// Parse password
if(flags.password) {
password = this._parseBuffer()
if(password === null)
return this.emit('error', new Error('cannot parse username'))
return this._emitError(new Error('cannot parse username'))
packet.password = password
}

Expand All @@ -252,15 +250,15 @@ Parser.prototype._parseConnack = function () {
packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK)
packet.returnCode = this._list.readUInt8(this._pos)
if(packet.returnCode === -1)
return this.emit('error', new Error('cannot parse return code'))
return this._emitError(new Error('cannot parse return code'))
}

Parser.prototype._parsePublish = function () {
var packet = this.packet
packet.topic = this._parseString()

if(packet.topic === null)
return this.emit('error', new Error('cannot parse topic'))
return this._emitError(new Error('cannot parse topic'))

// Parse message ID
if (packet.qos > 0) {
Expand All @@ -276,7 +274,7 @@ Parser.prototype._parseSubscribe = function() {
, qos

if (packet.qos != 1) {
return this.emit('error', new Error('wrong subscribe header'))
return this._emitError(new Error('wrong subscribe header'))
}

packet.subscriptions = []
Expand All @@ -288,7 +286,7 @@ Parser.prototype._parseSubscribe = function() {
// Parse topic
topic = this._parseString()
if (topic === null)
return this.emit('error', new Error('Parse error - cannot parse topic'))
return this._emitError(new Error('Parse error - cannot parse topic'))

qos = this._list.readUInt8(this._pos++)

Expand Down Expand Up @@ -322,7 +320,7 @@ Parser.prototype._parseUnsubscribe = function() {
// Parse topic
topic = this._parseString()
if (topic === null)
return this.emit('error', new Error('cannot parse topic'))
return this._emitError(new Error('cannot parse topic'))

// Push topic to unsubscriptions
packet.unsubscriptions.push(topic);
Expand All @@ -331,7 +329,7 @@ Parser.prototype._parseUnsubscribe = function() {

Parser.prototype._parseUnsuback = function() {
if (!this._parseMessageId())
return this.emit('error', new Error('cannot parse message id'))
return this._emitError(new Error('cannot parse message id'))
}

Parser.prototype._parseMessageId = function() {
Expand All @@ -340,7 +338,7 @@ Parser.prototype._parseMessageId = function() {
packet.messageId = this._parseNum()

if(packet.messageId === null) {
this.emit('error', new Error('cannot parse message id'))
this._emitError(new Error('cannot parse message id'))
return false
}

Expand Down Expand Up @@ -385,4 +383,20 @@ Parser.prototype._parseNum = function() {
return result
}

Parser.prototype._newPacket = function () {
if (this.packet) {
this._list.consume(this.packet.length)
this.emit('packet', this.packet)
}

this.packet = new Packet()

return true
}

Parser.prototype._emitError = function(err) {
this.error = err
this.emit('error', err)
}

module.exports = Parser
77 changes: 77 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ function testParseError(expected, fixture) {
t.equal(err.message, expected, 'expected error message')
})

parser.on('packet', function() {
t.fail('parse errors should not be followed by packet events')
})

parser.parse(fixture)
})
}
Expand Down Expand Up @@ -949,3 +953,76 @@ testParseError('cannot parse protocol id', new Buffer([
77, 81, 73, 115, 100, 112,
77, 81, 73, 115, 100, 112
]))

test('stops parsing after first error', function(t) {
t.plan(4)

var parser = mqtt.parser()

var packetCount = 0
var errorCount = 0
var expectedPackets = 1
var expectedErrors = 1

parser.on('packet', function(packet) {
t.ok(++packetCount <= expectedPackets, 'expected <= ' + expectedPackets + ' packets')
})

parser.on('error', function(err) {
t.ok(++errorCount <= expectedErrors, 'expected <= ' + expectedErrors + ' errors')
})

parser.parse(new Buffer([
// first, a valid connect packet:

16, 12, // Header
0, 4, // Protocol id length
77, 81, 84, 84, // Protocol id
4, // Protocol version
2, // Connect flags
0, 30, // Keepalive
0, 0, //Client id length

// then an invalid subscribe packet:

128, 9, // Header (subscribe, qos=0, length=9)
0, 6, // message id (6)
0, 4, // topic length,
116, 101, 115, 116, // Topic (test)
0, // qos (0)

// and another invalid subscribe packet:

128, 9, // Header (subscribe, qos=0, length=9)
0, 6, // message id (6)
0, 4, // topic length,
116, 101, 115, 116, // Topic (test)
0, // qos (0)

// finally, a valid disconnect packet:

224, 0, // Header
]))

// calling parse again clears the error and continues parsing
packetCount = 0
errorCount = 0
expectedPackets = 2
expectedErrors = 0

parser.parse(new Buffer([
// connect:

16, 12, // Header
0, 4, // Protocol id length
77, 81, 84, 84, // Protocol id
4, // Protocol version
2, // Connect flags
0, 30, // Keepalive
0, 0, //Client id length

// disconnect:

224, 0, // Header
]))
})