From 9f3cc95ee5a0181d637d0b406533e518e00f5df0 Mon Sep 17 00:00:00 2001 From: Jason Diamond Date: Fri, 9 Sep 2016 00:07:48 -0700 Subject: [PATCH 1/2] return on first error to avoid writing invalid bytes --- test.js | 28 +++++++++++++++++++++++++++ writeToStream.js | 50 ++++++++++++++++++++++++------------------------ 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/test.js b/test.js index 0692553..1c56a7f 100644 --- a/test.js +++ b/test.js @@ -99,6 +99,19 @@ function testParseGenerateDefaults(name, object, buffer, opts) { }) } +function testWriteToStreamError(expected, fixture) { + test('writeToStream ' + expected + ' error', function(t) { + t.plan(1) + + var stream = WS() + + stream.write = () => t.fail('should not have called write') + stream.on('error', () => t.pass('error emitted')) + + mqtt.writeToStream(fixture, stream) + }) +} + testParseGenerate('minimal connect', { cmd: 'connect' , retain: false @@ -1026,3 +1039,18 @@ test('stops parsing after first error', function(t) { 224, 0, // Header ])) }) + +testWriteToStreamError('Invalid protocol id', { + cmd: 'connect', + protocolId: {} +}) + +testWriteToStreamError('Invalid topic', { + cmd: 'publish', + topic: {} +}) + +testWriteToStreamError('Invalid message id', { + cmd: 'subscribe', + mid: {} +}) diff --git a/writeToStream.js b/writeToStream.js index ede9db8..032608d 100644 --- a/writeToStream.js +++ b/writeToStream.js @@ -37,7 +37,7 @@ function generate(packet, stream) { case 'disconnect': return emptyPacket(packet, stream); default: - stream.emit('error', new Error('unknown command')); + return stream.emit('error', new Error('unknown command')); return false; } } @@ -66,14 +66,14 @@ function connect(opts, stream) { // Must be a string and non-falsy if (!protocolId || (typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) { - stream.emit('error', new Error('Invalid protocol id')) + return stream.emit('error', new Error('Invalid protocol id')) } else { length += protocolId.length + 2 } // Must be 3 or 4 if (protocolVersion !== 3 && protocolVersion !== 4) { - stream.emit('error', new Error('Invalid protocol version')) + return stream.emit('error', new Error('Invalid protocol version')) } else { length += 1 } @@ -87,11 +87,11 @@ function connect(opts, stream) { } else { if (protocolVersion < 4) { - stream.emit('error', new Error('clientId must be supplied before 3.1.1')); + return stream.emit('error', new Error('clientId must be supplied before 3.1.1')); } if (clean == 0) { - stream.emit('error', new Error('clientId must be given if cleanSession set to 0')); + return stream.emit('error', new Error('clientId must be given if cleanSession set to 0')); } } @@ -100,7 +100,7 @@ function connect(opts, stream) { keepalive < 0 || keepalive > 65535 || keepalive % 1 !== 0) { - stream.emit('error', new Error('Invalid keepalive')) + return stream.emit('error', new Error('Invalid keepalive')) } else { length += 2 } @@ -112,11 +112,11 @@ function connect(opts, stream) { if (will) { // It must be an object if ('object' !== typeof will) { - stream.emit('error', new Error('Invalid will')) + return stream.emit('error', new Error('Invalid will')) } // It must have topic typeof string if (!will.topic || 'string' !== typeof will.topic) { - stream.emit('error', new Error('Invalid will topic')) + return stream.emit('error', new Error('Invalid will topic')) } else { length += Buffer.byteLength(will.topic) + 2 } @@ -130,7 +130,7 @@ function connect(opts, stream) { length += will.payload.length + 2 } } else { - stream.emit('error', new Error('Invalid will payload')) + return stream.emit('error', new Error('Invalid will payload')) } } else { length += 2 @@ -142,7 +142,7 @@ function connect(opts, stream) { if (username.length) { length += Buffer.byteLength(username) + 2 } else { - stream.emit('error', new Error('Invalid username')) + return stream.emit('error', new Error('Invalid username')) } } @@ -151,7 +151,7 @@ function connect(opts, stream) { if (password.length) { length += byteLength(password) + 2 } else { - stream.emit('error', new Error('Invalid password')) + return stream.emit('error', new Error('Invalid password')) } } @@ -212,7 +212,7 @@ function connack(opts, stream) { // Check return code if ('number' !== typeof rc) - stream.emit('error', new Error('Invalid return code')); + return stream.emit('error', new Error('Invalid return code')); stream.write(protocol.CONNACK_HEADER); writeLength(stream, 2); @@ -238,7 +238,7 @@ function publish(opts, stream) { else if (Buffer.isBuffer(topic)) length += topic.length + 2; else - stream.emit('error', new Error('Invalid topic')); + return stream.emit('error', new Error('Invalid topic')); // get the payload length if (!Buffer.isBuffer(payload)) { @@ -249,7 +249,7 @@ function publish(opts, stream) { // Message id must a number if qos > 0 if (qos && 'number' !== typeof id) { - stream.emit('error', new Error('Invalid message id')) + return stream.emit('error', new Error('Invalid message id')) } else if (qos) { length += 2; } @@ -286,7 +286,7 @@ function confirmation(opts, stream) { // Check message ID if ('number' !== typeof id) - stream.emit('error', new Error('Invalid message id')); + return stream.emit('error', new Error('Invalid message id')); // Header stream.write(protocol.ACKS[type][qos][dup][0]) @@ -309,7 +309,7 @@ function subscribe(opts, stream) { // Check mid if ('number' !== typeof id) { - stream.emit('error', new Error('Invalid message id')); + return stream.emit('error', new Error('Invalid message id')); } else { length += 2; } @@ -320,16 +320,16 @@ function subscribe(opts, stream) { , qos = subs[i].qos; if ('string' !== typeof topic) { - stream.emit('error', new Error('Invalid subscriptions - invalid topic')); + return stream.emit('error', new Error('Invalid subscriptions - invalid topic')); } if ('number' !== typeof qos) { - stream.emit('error', new Error('Invalid subscriptions - invalid qos')); + return stream.emit('error', new Error('Invalid subscriptions - invalid qos')); } length += Buffer.byteLength(topic) + 2 + 1; } } else { - stream.emit('error', new Error('Invalid subscriptions')); + return stream.emit('error', new Error('Invalid subscriptions')); } // Generate header @@ -367,7 +367,7 @@ function suback(opts, stream) { // Check message id if ('number' !== typeof id) { - stream.emit('error', new Error('Invalid message id')); + return stream.emit('error', new Error('Invalid message id')); } else { length += 2; } @@ -375,12 +375,12 @@ function suback(opts, stream) { if ('object' === typeof granted && granted.length) { for (var i = 0; i < granted.length; i += 1) { if ('number' !== typeof granted[i]) { - stream.emit('error', new Error('Invalid qos vector')); + return stream.emit('error', new Error('Invalid qos vector')); } length += 1; } } else { - stream.emit('error', new Error('Invalid qos vector')); + return stream.emit('error', new Error('Invalid qos vector')); } // header @@ -405,7 +405,7 @@ function unsubscribe(opts, stream) { // Check message id if ('number' !== typeof id) { - stream.emit('error', new Error('Invalid message id')); + return stream.emit('error', new Error('Invalid message id')); } else { length += 2; } @@ -413,12 +413,12 @@ function unsubscribe(opts, stream) { if ('object' === typeof unsubs && unsubs.length) { for (var i = 0; i < unsubs.length; i += 1) { if ('string' !== typeof unsubs[i]) { - stream.emit('error', new Error('Invalid unsubscriptions')); + return stream.emit('error', new Error('Invalid unsubscriptions')); } length += Buffer.byteLength(unsubs[i]) + 2; } } else { - stream.emit('error', new Error('Invalid unsubscriptions')); + return stream.emit('error', new Error('Invalid unsubscriptions')); } // Header From d9a3c1094b122a12522028f766aeeebc858c588e Mon Sep 17 00:00:00 2001 From: Jason Diamond Date: Mon, 12 Sep 2016 19:36:23 -0700 Subject: [PATCH 2/2] return false from writeToStream on error --- test.js | 6 ++-- writeToStream.js | 83 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 59 insertions(+), 30 deletions(-) diff --git a/test.js b/test.js index 1c56a7f..ae774f4 100644 --- a/test.js +++ b/test.js @@ -101,14 +101,16 @@ function testParseGenerateDefaults(name, object, buffer, opts) { function testWriteToStreamError(expected, fixture) { test('writeToStream ' + expected + ' error', function(t) { - t.plan(1) + t.plan(2) var stream = WS() stream.write = () => t.fail('should not have called write') stream.on('error', () => t.pass('error emitted')) - mqtt.writeToStream(fixture, stream) + var result = mqtt.writeToStream(fixture, stream) + + t.false(result, 'result should be false') }) } diff --git a/writeToStream.js b/writeToStream.js index 032608d..b811f09 100644 --- a/writeToStream.js +++ b/writeToStream.js @@ -37,7 +37,7 @@ function generate(packet, stream) { case 'disconnect': return emptyPacket(packet, stream); default: - return stream.emit('error', new Error('unknown command')); + stream.emit('error', new Error('unknown command')); return false; } } @@ -66,14 +66,16 @@ function connect(opts, stream) { // Must be a string and non-falsy if (!protocolId || (typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) { - return stream.emit('error', new Error('Invalid protocol id')) + stream.emit('error', new Error('Invalid protocol id')) + return false } else { length += protocolId.length + 2 } // Must be 3 or 4 if (protocolVersion !== 3 && protocolVersion !== 4) { - return stream.emit('error', new Error('Invalid protocol version')) + stream.emit('error', new Error('Invalid protocol version')) + return false } else { length += 1 } @@ -87,11 +89,13 @@ function connect(opts, stream) { } else { if (protocolVersion < 4) { - return stream.emit('error', new Error('clientId must be supplied before 3.1.1')); + stream.emit('error', new Error('clientId must be supplied before 3.1.1')) + return false } if (clean == 0) { - return stream.emit('error', new Error('clientId must be given if cleanSession set to 0')); + stream.emit('error', new Error('clientId must be given if cleanSession set to 0')) + return false } } @@ -100,7 +104,8 @@ function connect(opts, stream) { keepalive < 0 || keepalive > 65535 || keepalive % 1 !== 0) { - return stream.emit('error', new Error('Invalid keepalive')) + stream.emit('error', new Error('Invalid keepalive')) + return false } else { length += 2 } @@ -112,11 +117,13 @@ function connect(opts, stream) { if (will) { // It must be an object if ('object' !== typeof will) { - return stream.emit('error', new Error('Invalid will')) + stream.emit('error', new Error('Invalid will')) + return false } // It must have topic typeof string if (!will.topic || 'string' !== typeof will.topic) { - return stream.emit('error', new Error('Invalid will topic')) + stream.emit('error', new Error('Invalid will topic')) + return false } else { length += Buffer.byteLength(will.topic) + 2 } @@ -130,7 +137,8 @@ function connect(opts, stream) { length += will.payload.length + 2 } } else { - return stream.emit('error', new Error('Invalid will payload')) + stream.emit('error', new Error('Invalid will payload')) + return false } } else { length += 2 @@ -142,7 +150,8 @@ function connect(opts, stream) { if (username.length) { length += Buffer.byteLength(username) + 2 } else { - return stream.emit('error', new Error('Invalid username')) + stream.emit('error', new Error('Invalid username')) + return false } } @@ -151,7 +160,8 @@ function connect(opts, stream) { if (password.length) { length += byteLength(password) + 2 } else { - return stream.emit('error', new Error('Invalid password')) + stream.emit('error', new Error('Invalid password')) + return false } } @@ -211,8 +221,10 @@ function connack(opts, stream) { , rc = opts.returnCode; // Check return code - if ('number' !== typeof rc) - return stream.emit('error', new Error('Invalid return code')); + if ('number' !== typeof rc) { + stream.emit('error', new Error('Invalid return code')) + return false + } stream.write(protocol.CONNACK_HEADER); writeLength(stream, 2); @@ -237,8 +249,10 @@ function publish(opts, stream) { length += Buffer.byteLength(topic) + 2; else if (Buffer.isBuffer(topic)) length += topic.length + 2; - else - return stream.emit('error', new Error('Invalid topic')); + else { + stream.emit('error', new Error('Invalid topic')); + return false; + } // get the payload length if (!Buffer.isBuffer(payload)) { @@ -249,7 +263,8 @@ function publish(opts, stream) { // Message id must a number if qos > 0 if (qos && 'number' !== typeof id) { - return stream.emit('error', new Error('Invalid message id')) + stream.emit('error', new Error('Invalid message id')) + return false } else if (qos) { length += 2; } @@ -285,8 +300,10 @@ function confirmation(opts, stream) { qos = 1 // Check message ID - if ('number' !== typeof id) - return stream.emit('error', new Error('Invalid message id')); + if ('number' !== typeof id) { + stream.emit('error', new Error('Invalid message id')); + return false + } // Header stream.write(protocol.ACKS[type][qos][dup][0]) @@ -309,7 +326,8 @@ function subscribe(opts, stream) { // Check mid if ('number' !== typeof id) { - return stream.emit('error', new Error('Invalid message id')); + stream.emit('error', new Error('Invalid message id')); + return false } else { length += 2; } @@ -320,16 +338,19 @@ function subscribe(opts, stream) { , qos = subs[i].qos; if ('string' !== typeof topic) { - return stream.emit('error', new Error('Invalid subscriptions - invalid topic')); + stream.emit('error', new Error('Invalid subscriptions - invalid topic')); + return false } if ('number' !== typeof qos) { - return stream.emit('error', new Error('Invalid subscriptions - invalid qos')); + stream.emit('error', new Error('Invalid subscriptions - invalid qos')); + return false; } length += Buffer.byteLength(topic) + 2 + 1; } } else { - return stream.emit('error', new Error('Invalid subscriptions')); + stream.emit('error', new Error('Invalid subscriptions')); + return false; } // Generate header @@ -367,7 +388,8 @@ function suback(opts, stream) { // Check message id if ('number' !== typeof id) { - return stream.emit('error', new Error('Invalid message id')); + stream.emit('error', new Error('Invalid message id')); + return false; } else { length += 2; } @@ -375,12 +397,14 @@ function suback(opts, stream) { if ('object' === typeof granted && granted.length) { for (var i = 0; i < granted.length; i += 1) { if ('number' !== typeof granted[i]) { - return stream.emit('error', new Error('Invalid qos vector')); + stream.emit('error', new Error('Invalid qos vector')); + return false; } length += 1; } } else { - return stream.emit('error', new Error('Invalid qos vector')); + stream.emit('error', new Error('Invalid qos vector')); + return false; } // header @@ -405,7 +429,8 @@ function unsubscribe(opts, stream) { // Check message id if ('number' !== typeof id) { - return stream.emit('error', new Error('Invalid message id')); + stream.emit('error', new Error('Invalid message id')); + return false; } else { length += 2; } @@ -413,12 +438,14 @@ function unsubscribe(opts, stream) { if ('object' === typeof unsubs && unsubs.length) { for (var i = 0; i < unsubs.length; i += 1) { if ('string' !== typeof unsubs[i]) { - return stream.emit('error', new Error('Invalid unsubscriptions')); + stream.emit('error', new Error('Invalid unsubscriptions')); + return false; } length += Buffer.byteLength(unsubs[i]) + 2; } } else { - return stream.emit('error', new Error('Invalid unsubscriptions')); + stream.emit('error', new Error('Invalid unsubscriptions')); + return false; } // Header