diff --git a/test.js b/test.js index 0692553..ae774f4 100644 --- a/test.js +++ b/test.js @@ -99,6 +99,21 @@ function testParseGenerateDefaults(name, object, buffer, opts) { }) } +function testWriteToStreamError(expected, fixture) { + test('writeToStream ' + expected + ' error', function(t) { + t.plan(2) + + var stream = WS() + + stream.write = () => t.fail('should not have called write') + stream.on('error', () => t.pass('error emitted')) + + var result = mqtt.writeToStream(fixture, stream) + + t.false(result, 'result should be false') + }) +} + testParseGenerate('minimal connect', { cmd: 'connect' , retain: false @@ -1026,3 +1041,18 @@ test('stops parsing after first error', function(t) { 224, 0, // Header ])) }) + +testWriteToStreamError('Invalid protocol id', { + cmd: 'connect', + protocolId: {} +}) + +testWriteToStreamError('Invalid topic', { + cmd: 'publish', + topic: {} +}) + +testWriteToStreamError('Invalid message id', { + cmd: 'subscribe', + mid: {} +}) diff --git a/writeToStream.js b/writeToStream.js index ede9db8..b811f09 100644 --- a/writeToStream.js +++ b/writeToStream.js @@ -67,6 +67,7 @@ function connect(opts, stream) { if (!protocolId || (typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) { stream.emit('error', new Error('Invalid protocol id')) + return false } else { length += protocolId.length + 2 } @@ -74,6 +75,7 @@ function connect(opts, stream) { // Must be 3 or 4 if (protocolVersion !== 3 && protocolVersion !== 4) { stream.emit('error', new Error('Invalid protocol version')) + return false } else { length += 1 } @@ -87,11 +89,13 @@ function connect(opts, stream) { } else { if (protocolVersion < 4) { - stream.emit('error', new Error('clientId must be supplied before 3.1.1')); + stream.emit('error', new Error('clientId must be supplied before 3.1.1')) + return false } if (clean == 0) { - stream.emit('error', new Error('clientId must be given if cleanSession set to 0')); + stream.emit('error', new Error('clientId must be given if cleanSession set to 0')) + return false } } @@ -101,6 +105,7 @@ function connect(opts, stream) { keepalive > 65535 || keepalive % 1 !== 0) { stream.emit('error', new Error('Invalid keepalive')) + return false } else { length += 2 } @@ -113,10 +118,12 @@ function connect(opts, stream) { // It must be an object if ('object' !== typeof will) { stream.emit('error', new Error('Invalid will')) + return false } // It must have topic typeof string if (!will.topic || 'string' !== typeof will.topic) { stream.emit('error', new Error('Invalid will topic')) + return false } else { length += Buffer.byteLength(will.topic) + 2 } @@ -131,6 +138,7 @@ function connect(opts, stream) { } } else { stream.emit('error', new Error('Invalid will payload')) + return false } } else { length += 2 @@ -143,6 +151,7 @@ function connect(opts, stream) { length += Buffer.byteLength(username) + 2 } else { stream.emit('error', new Error('Invalid username')) + return false } } @@ -152,6 +161,7 @@ function connect(opts, stream) { length += byteLength(password) + 2 } else { stream.emit('error', new Error('Invalid password')) + return false } } @@ -211,8 +221,10 @@ function connack(opts, stream) { , rc = opts.returnCode; // Check return code - if ('number' !== typeof rc) - stream.emit('error', new Error('Invalid return code')); + if ('number' !== typeof rc) { + stream.emit('error', new Error('Invalid return code')) + return false + } stream.write(protocol.CONNACK_HEADER); writeLength(stream, 2); @@ -237,8 +249,10 @@ function publish(opts, stream) { length += Buffer.byteLength(topic) + 2; else if (Buffer.isBuffer(topic)) length += topic.length + 2; - else + else { stream.emit('error', new Error('Invalid topic')); + return false; + } // get the payload length if (!Buffer.isBuffer(payload)) { @@ -250,6 +264,7 @@ function publish(opts, stream) { // Message id must a number if qos > 0 if (qos && 'number' !== typeof id) { stream.emit('error', new Error('Invalid message id')) + return false } else if (qos) { length += 2; } @@ -285,8 +300,10 @@ function confirmation(opts, stream) { qos = 1 // Check message ID - if ('number' !== typeof id) + if ('number' !== typeof id) { stream.emit('error', new Error('Invalid message id')); + return false + } // Header stream.write(protocol.ACKS[type][qos][dup][0]) @@ -310,6 +327,7 @@ function subscribe(opts, stream) { // Check mid if ('number' !== typeof id) { stream.emit('error', new Error('Invalid message id')); + return false } else { length += 2; } @@ -321,15 +339,18 @@ function subscribe(opts, stream) { if ('string' !== typeof topic) { stream.emit('error', new Error('Invalid subscriptions - invalid topic')); + return false } if ('number' !== typeof qos) { stream.emit('error', new Error('Invalid subscriptions - invalid qos')); + return false; } length += Buffer.byteLength(topic) + 2 + 1; } } else { stream.emit('error', new Error('Invalid subscriptions')); + return false; } // Generate header @@ -368,6 +389,7 @@ function suback(opts, stream) { // Check message id if ('number' !== typeof id) { stream.emit('error', new Error('Invalid message id')); + return false; } else { length += 2; } @@ -376,11 +398,13 @@ function suback(opts, stream) { for (var i = 0; i < granted.length; i += 1) { if ('number' !== typeof granted[i]) { stream.emit('error', new Error('Invalid qos vector')); + return false; } length += 1; } } else { stream.emit('error', new Error('Invalid qos vector')); + return false; } // header @@ -406,6 +430,7 @@ function unsubscribe(opts, stream) { // Check message id if ('number' !== typeof id) { stream.emit('error', new Error('Invalid message id')); + return false; } else { length += 2; } @@ -414,11 +439,13 @@ function unsubscribe(opts, stream) { for (var i = 0; i < unsubs.length; i += 1) { if ('string' !== typeof unsubs[i]) { stream.emit('error', new Error('Invalid unsubscriptions')); + return false; } length += Buffer.byteLength(unsubs[i]) + 2; } } else { stream.emit('error', new Error('Invalid unsubscriptions')); + return false; } // Header