Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return on first error to avoid writing invalid bytes #16

Merged
merged 2 commits into from
Sep 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ function testParseGenerateDefaults(name, object, buffer, opts) {
})
}

function testWriteToStreamError(expected, fixture) {
test('writeToStream ' + expected + ' error', function(t) {
t.plan(2)

var stream = WS()

stream.write = () => t.fail('should not have called write')
stream.on('error', () => t.pass('error emitted'))

var result = mqtt.writeToStream(fixture, stream)

t.false(result, 'result should be false')
})
}

testParseGenerate('minimal connect', {
cmd: 'connect'
, retain: false
Expand Down Expand Up @@ -1026,3 +1041,18 @@ test('stops parsing after first error', function(t) {
224, 0, // Header
]))
})

testWriteToStreamError('Invalid protocol id', {
cmd: 'connect',
protocolId: {}
})

testWriteToStreamError('Invalid topic', {
cmd: 'publish',
topic: {}
})

testWriteToStreamError('Invalid message id', {
cmd: 'subscribe',
mid: {}
})
39 changes: 33 additions & 6 deletions writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ function connect(opts, stream) {
if (!protocolId ||
(typeof protocolId !== "string" && !Buffer.isBuffer(protocolId))) {
stream.emit('error', new Error('Invalid protocol id'))
return false
} else {
length += protocolId.length + 2
}

// Must be 3 or 4
if (protocolVersion !== 3 && protocolVersion !== 4) {
stream.emit('error', new Error('Invalid protocol version'))
return false
} else {
length += 1
}
Expand All @@ -87,11 +89,13 @@ function connect(opts, stream) {
} else {

if (protocolVersion < 4) {
stream.emit('error', new Error('clientId must be supplied before 3.1.1'));
stream.emit('error', new Error('clientId must be supplied before 3.1.1'))
return false
}

if (clean == 0) {
stream.emit('error', new Error('clientId must be given if cleanSession set to 0'));
stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))
return false
}
}

Expand All @@ -101,6 +105,7 @@ function connect(opts, stream) {
keepalive > 65535 ||
keepalive % 1 !== 0) {
stream.emit('error', new Error('Invalid keepalive'))
return false
} else {
length += 2
}
Expand All @@ -113,10 +118,12 @@ function connect(opts, stream) {
// It must be an object
if ('object' !== typeof will) {
stream.emit('error', new Error('Invalid will'))
return false
}
// It must have topic typeof string
if (!will.topic || 'string' !== typeof will.topic) {
stream.emit('error', new Error('Invalid will topic'))
return false
} else {
length += Buffer.byteLength(will.topic) + 2
}
Expand All @@ -131,6 +138,7 @@ function connect(opts, stream) {
}
} else {
stream.emit('error', new Error('Invalid will payload'))
return false
}
} else {
length += 2
Expand All @@ -143,6 +151,7 @@ function connect(opts, stream) {
length += Buffer.byteLength(username) + 2
} else {
stream.emit('error', new Error('Invalid username'))
return false
}
}

Expand All @@ -152,6 +161,7 @@ function connect(opts, stream) {
length += byteLength(password) + 2
} else {
stream.emit('error', new Error('Invalid password'))
return false
}
}

Expand Down Expand Up @@ -211,8 +221,10 @@ function connack(opts, stream) {
, rc = opts.returnCode;

// Check return code
if ('number' !== typeof rc)
stream.emit('error', new Error('Invalid return code'));
if ('number' !== typeof rc) {
stream.emit('error', new Error('Invalid return code'))
return false
}

stream.write(protocol.CONNACK_HEADER);
writeLength(stream, 2);
Expand All @@ -237,8 +249,10 @@ function publish(opts, stream) {
length += Buffer.byteLength(topic) + 2;
else if (Buffer.isBuffer(topic))
length += topic.length + 2;
else
else {
stream.emit('error', new Error('Invalid topic'));
return false;
}

// get the payload length
if (!Buffer.isBuffer(payload)) {
Expand All @@ -250,6 +264,7 @@ function publish(opts, stream) {
// Message id must a number if qos > 0
if (qos && 'number' !== typeof id) {
stream.emit('error', new Error('Invalid message id'))
return false
} else if (qos) {
length += 2;
}
Expand Down Expand Up @@ -285,8 +300,10 @@ function confirmation(opts, stream) {
qos = 1

// Check message ID
if ('number' !== typeof id)
if ('number' !== typeof id) {
stream.emit('error', new Error('Invalid message id'));
return false
}

// Header
stream.write(protocol.ACKS[type][qos][dup][0])
Expand All @@ -310,6 +327,7 @@ function subscribe(opts, stream) {
// Check mid
if ('number' !== typeof id) {
stream.emit('error', new Error('Invalid message id'));
return false
} else {
length += 2;
}
Expand All @@ -321,15 +339,18 @@ function subscribe(opts, stream) {

if ('string' !== typeof topic) {
stream.emit('error', new Error('Invalid subscriptions - invalid topic'));
return false
}
if ('number' !== typeof qos) {
stream.emit('error', new Error('Invalid subscriptions - invalid qos'));
return false;
}

length += Buffer.byteLength(topic) + 2 + 1;
}
} else {
stream.emit('error', new Error('Invalid subscriptions'));
return false;
}

// Generate header
Expand Down Expand Up @@ -368,6 +389,7 @@ function suback(opts, stream) {
// Check message id
if ('number' !== typeof id) {
stream.emit('error', new Error('Invalid message id'));
return false;
} else {
length += 2;
}
Expand All @@ -376,11 +398,13 @@ function suback(opts, stream) {
for (var i = 0; i < granted.length; i += 1) {
if ('number' !== typeof granted[i]) {
stream.emit('error', new Error('Invalid qos vector'));
return false;
}
length += 1;
}
} else {
stream.emit('error', new Error('Invalid qos vector'));
return false;
}

// header
Expand All @@ -406,6 +430,7 @@ function unsubscribe(opts, stream) {
// Check message id
if ('number' !== typeof id) {
stream.emit('error', new Error('Invalid message id'));
return false;
} else {
length += 2;
}
Expand All @@ -414,11 +439,13 @@ function unsubscribe(opts, stream) {
for (var i = 0; i < unsubs.length; i += 1) {
if ('string' !== typeof unsubs[i]) {
stream.emit('error', new Error('Invalid unsubscriptions'));
return false;
}
length += Buffer.byteLength(unsubs[i]) + 2;
}
} else {
stream.emit('error', new Error('Invalid unsubscriptions'));
return false;
}

// Header
Expand Down