From 072124f16c3b9a914e7bda1a79bfb1d9de85db03 Mon Sep 17 00:00:00 2001 From: Jason Diamond Date: Sat, 3 Sep 2016 18:19:04 -0700 Subject: [PATCH 1/2] do not emit packet events and stop parsing after a parse error --- README.md | 4 +++ parser.js | 78 ++++++++++++++++++++++++++++++++----------------------- test.js | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 7247d93..a70ac65 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,10 @@ will emit: Parse a given `Buffer` and emits synchronously all the MQTT packets that are included. Returns the number of bytes left to parse. +If an error happens, an `error` event will be emitted, but no `packet` events +will be emitted after that. Calling `parse()` again clears the error and +previous buffer as if you created a new `Parser`. + Packets ------- diff --git a/parser.js b/parser.js index f73bcb4..c626b3e 100644 --- a/parser.js +++ b/parser.js @@ -10,37 +10,35 @@ function Parser() { return new Parser() } - this._list = bl() - this._newPacket() - this._states = [ '_parseHeader' , '_parseLength' , '_parsePayload' , '_newPacket' ] - this._stateCounter = 0 + + this._resetState() } inherits(Parser, EE) -Parser.prototype._newPacket = function () { - if (this.packet) { - this._list.consume(this.packet.length) - this.emit('packet', this.packet) - } - +Parser.prototype._resetState = function () { this.packet = new Packet() - - return true + this.error = null + this._list = bl() + this._stateCounter = 0 } Parser.prototype.parse = function (buf) { + if (this.error) { + this._resetState() + } this._list.append(buf) while ((this.packet.length != -1 || this._list.length > 0) && - this[this._states[this._stateCounter]]()) { + this[this._states[this._stateCounter]]() && + !this.error) { this._stateCounter++ if (this._stateCounter >= this._states.length) { @@ -140,7 +138,7 @@ Parser.prototype._parsePayload = function () { // these are empty, nothing to do break default: - this.emit('error', new Error('not supported')) + this._emitError(new Error('not supported')) } result = true @@ -162,29 +160,29 @@ Parser.prototype._parseConnect = function () { // Parse constants id protocolId = this._parseString() if (protocolId === null) - return this.emit('error', new Error('cannot parse protocol id')) + return this._emitError(new Error('cannot parse protocol id')) if (protocolId != 'MQTT' && protocolId != 'MQIsdp') { - return this.emit('error', new Error('invalid protocol id')) + return this._emitError(new Error('invalid protocol id')) } packet.protocolId = protocolId // Parse constants version number if(this._pos >= this._list.length) - return this.emit('error', new Error('packet too short')) + return this._emitError(new Error('packet too short')) packet.protocolVersion = this._list.readUInt8(this._pos) if(packet.protocolVersion != 3 && packet.protocolVersion != 4) { - return this.emit('error', new Error('invalid protocol version')) + return this._emitError(new Error('invalid protocol version')) } this._pos++ if(this._pos >= this._list.length) - return this.emit('error', new Error('packet too short')) + return this._emitError(new Error('packet too short')) // Parse connect flags flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK) @@ -204,25 +202,25 @@ Parser.prototype._parseConnect = function () { // Parse keepalive packet.keepalive = this._parseNum() if(packet.keepalive === -1) - return this.emit('error', new Error('packet too short')) + return this._emitError(new Error('packet too short')) // Parse client ID clientId = this._parseString() if(clientId === null) - return this.emit('error', new Error('packet too short')) + return this._emitError(new Error('packet too short')) packet.clientId = clientId if (flags.will) { // Parse will topic topic = this._parseString() if (topic === null) - return this.emit('error', new Error('cannot parse will topic')) + return this._emitError(new Error('cannot parse will topic')) packet.will.topic = topic // Parse will payload payload = this._parseBuffer() if (payload === null) - return this.emit('error', new Error('cannot parse will payload')) + return this._emitError(new Error('cannot parse will payload')) packet.will.payload = payload } @@ -230,7 +228,7 @@ Parser.prototype._parseConnect = function () { if (flags.username) { username = this._parseString() if(username === null) - return this.emit('error', new Error('cannot parse username')) + return this._emitError(new Error('cannot parse username')) packet.username = username } @@ -238,7 +236,7 @@ Parser.prototype._parseConnect = function () { if(flags.password) { password = this._parseBuffer() if(password === null) - return this.emit('error', new Error('cannot parse username')) + return this._emitError(new Error('cannot parse username')) packet.password = password } @@ -252,7 +250,7 @@ Parser.prototype._parseConnack = function () { packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK) packet.returnCode = this._list.readUInt8(this._pos) if(packet.returnCode === -1) - return this.emit('error', new Error('cannot parse return code')) + return this._emitError(new Error('cannot parse return code')) } Parser.prototype._parsePublish = function () { @@ -260,7 +258,7 @@ Parser.prototype._parsePublish = function () { packet.topic = this._parseString() if(packet.topic === null) - return this.emit('error', new Error('cannot parse topic')) + return this._emitError(new Error('cannot parse topic')) // Parse message ID if (packet.qos > 0) { @@ -276,7 +274,7 @@ Parser.prototype._parseSubscribe = function() { , qos if (packet.qos != 1) { - return this.emit('error', new Error('wrong subscribe header')) + return this._emitError(new Error('wrong subscribe header')) } packet.subscriptions = [] @@ -288,7 +286,7 @@ Parser.prototype._parseSubscribe = function() { // Parse topic topic = this._parseString() if (topic === null) - return this.emit('error', new Error('Parse error - cannot parse topic')) + return this._emitError(new Error('Parse error - cannot parse topic')) qos = this._list.readUInt8(this._pos++) @@ -322,7 +320,7 @@ Parser.prototype._parseUnsubscribe = function() { // Parse topic topic = this._parseString() if (topic === null) - return this.emit('error', new Error('cannot parse topic')) + return this._emitError(new Error('cannot parse topic')) // Push topic to unsubscriptions packet.unsubscriptions.push(topic); @@ -331,7 +329,7 @@ Parser.prototype._parseUnsubscribe = function() { Parser.prototype._parseUnsuback = function() { if (!this._parseMessageId()) - return this.emit('error', new Error('cannot parse message id')) + return this._emitError(new Error('cannot parse message id')) } Parser.prototype._parseMessageId = function() { @@ -340,7 +338,7 @@ Parser.prototype._parseMessageId = function() { packet.messageId = this._parseNum() if(packet.messageId === null) { - this.emit('error', new Error('cannot parse message id')) + this._emitError(new Error('cannot parse message id')) return false } @@ -385,4 +383,20 @@ Parser.prototype._parseNum = function() { return result } +Parser.prototype._newPacket = function () { + if (this.packet) { + this._list.consume(this.packet.length) + this.emit('packet', this.packet) + } + + this.packet = new Packet() + + return true +} + +Parser.prototype._emitError = function(err) { + this.error = err + this.emit('error', err) +} + module.exports = Parser diff --git a/test.js b/test.js index b1230de..0692553 100644 --- a/test.js +++ b/test.js @@ -56,6 +56,10 @@ function testParseError(expected, fixture) { t.equal(err.message, expected, 'expected error message') }) + parser.on('packet', function() { + t.fail('parse errors should not be followed by packet events') + }) + parser.parse(fixture) }) } @@ -949,3 +953,76 @@ testParseError('cannot parse protocol id', new Buffer([ 77, 81, 73, 115, 100, 112, 77, 81, 73, 115, 100, 112 ])) + +test('stops parsing after first error', function(t) { + t.plan(4) + + var parser = mqtt.parser() + + var packetCount = 0 + var errorCount = 0 + var expectedPackets = 1 + var expectedErrors = 1 + + parser.on('packet', function(packet) { + t.ok(++packetCount <= expectedPackets, 'expected <= ' + expectedPackets + ' packets') + }) + + parser.on('error', function(err) { + t.ok(++errorCount <= expectedErrors, 'expected <= ' + expectedErrors + ' errors') + }) + + parser.parse(new Buffer([ + // first, a valid connect packet: + + 16, 12, // Header + 0, 4, // Protocol id length + 77, 81, 84, 84, // Protocol id + 4, // Protocol version + 2, // Connect flags + 0, 30, // Keepalive + 0, 0, //Client id length + + // then an invalid subscribe packet: + + 128, 9, // Header (subscribe, qos=0, length=9) + 0, 6, // message id (6) + 0, 4, // topic length, + 116, 101, 115, 116, // Topic (test) + 0, // qos (0) + + // and another invalid subscribe packet: + + 128, 9, // Header (subscribe, qos=0, length=9) + 0, 6, // message id (6) + 0, 4, // topic length, + 116, 101, 115, 116, // Topic (test) + 0, // qos (0) + + // finally, a valid disconnect packet: + + 224, 0, // Header + ])) + + // calling parse again clears the error and continues parsing + packetCount = 0 + errorCount = 0 + expectedPackets = 2 + expectedErrors = 0 + + parser.parse(new Buffer([ + // connect: + + 16, 12, // Header + 0, 4, // Protocol id length + 77, 81, 84, 84, // Protocol id + 4, // Protocol version + 2, // Connect flags + 0, 30, // Keepalive + 0, 0, //Client id length + + // disconnect: + + 224, 0, // Header + ])) +}) From c6b8f930c3b06ea3bdf5cc98c774faa8a85d12b2 Mon Sep 17 00:00:00 2001 From: Jason Diamond Date: Mon, 5 Sep 2016 13:39:40 -0700 Subject: [PATCH 2/2] update supported node versions --- .travis.yml | 9 +-------- README.md | 5 ++--- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/.travis.yml b/.travis.yml index 363e42b..8f2539a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,7 @@ language: node_js sudo: false node_js: - - 5 + - 6 - 4 - - iojs-v3 - - iojs-v2 - - iojs-v1 - - 0.12 - - 0.10 script: npm run ci -before_install: - - node -v | grep v0.8 && npm install npm@1.4.28 -g || echo "no need to update NPM" diff --git a/README.md b/README.md index a70ac65..121813d 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,8 @@ Encode and Decode MQTT 3.1.1 packets the node way. * Contributing * Licence & copyright -This library works with node v4.x, v0.12.x, v0.10.x and all iojs releases, but it requires at -least NPM 1.4. To upgrade NPM on node v0.8, run `npm install -npm@1.4.28 -g`. +This library is tested with node v4 and v6. The last version to support +older versions of node was mqtt-packet@4.1.2. Install ------------