Skip to content

Commit

Permalink
stream.emit('error', err) -> stream.destroy(err) (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishnureddy17 committed Feb 21, 2022
1 parent 454461a commit 2d6667a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 45 deletions.
4 changes: 4 additions & 0 deletions generate.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class Accumulator extends EventEmitter {

return result
}

destroy (err) {
if (err) this.emit('error', err)
}
}

module.exports = generate
1 change: 0 additions & 1 deletion test.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ function testWriteToStreamError (expected, fixture) {
const result = mqtt.writeToStream(fixture, stream)

t.false(result, 'result should be false')
t.end()
})
}

Expand Down
88 changes: 44 additions & 44 deletions writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function generate (packet, stream, opts) {
case 'auth':
return auth(packet, stream, opts)
default:
stream.emit('error', new Error('Unknown command'))
stream.destroy(new Error('Unknown command'))
return false
}
}
Expand Down Expand Up @@ -101,13 +101,13 @@ function connect (packet, stream, opts) {
// Must be a string and non-falsy
if (!protocolId ||
(typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
stream.emit('error', new Error('Invalid protocolId'))
stream.destroy(new Error('Invalid protocolId'))
return false
} else length += protocolId.length + 2

// Must be 3 or 4 or 5
if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {
stream.emit('error', new Error('Invalid protocol version'))
stream.destroy(new Error('Invalid protocol version'))
return false
} else length += 1

Expand All @@ -117,11 +117,11 @@ function connect (packet, stream, opts) {
length += Buffer.byteLength(clientId) + 2
} else {
if (protocolVersion < 4) {
stream.emit('error', new Error('clientId must be supplied before 3.1.1'))
stream.destroy(new Error('clientId must be supplied before 3.1.1'))
return false
}
if ((clean * 1) === 0) {
stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))
stream.destroy(new Error('clientId must be given if cleanSession set to 0'))
return false
}
}
Expand All @@ -131,7 +131,7 @@ function connect (packet, stream, opts) {
keepalive < 0 ||
keepalive > 65535 ||
keepalive % 1 !== 0) {
stream.emit('error', new Error('Invalid keepalive'))
stream.destroy(new Error('Invalid keepalive'))
return false
} else length += 2

Expand All @@ -152,12 +152,12 @@ function connect (packet, stream, opts) {
if (will) {
// It must be an object
if (typeof will !== 'object') {
stream.emit('error', new Error('Invalid will'))
stream.destroy(new Error('Invalid will'))
return false
}
// It must have topic typeof string
if (!will.topic || typeof will.topic !== 'string') {
stream.emit('error', new Error('Invalid will topic'))
stream.destroy(new Error('Invalid will topic'))
return false
} else {
length += Buffer.byteLength(will.topic) + 2
Expand All @@ -173,7 +173,7 @@ function connect (packet, stream, opts) {
length += will.payload.length
}
} else {
stream.emit('error', new Error('Invalid will payload'))
stream.destroy(new Error('Invalid will payload'))
return false
}
}
Expand All @@ -193,22 +193,22 @@ function connect (packet, stream, opts) {
providedUsername = true
length += Buffer.byteLength(username) + 2
} else {
stream.emit('error', new Error('Invalid username'))
stream.destroy(new Error('Invalid username'))
return false
}
}

// Password
if (password != null) {
if (!providedUsername) {
stream.emit('error', new Error('Username is required to use password'))
stream.destroy(new Error('Username is required to use password'))
return false
}

if (isStringOrBuffer(password)) {
length += byteLength(password) + 2
} else {
stream.emit('error', new Error('Invalid password'))
stream.destroy(new Error('Invalid password'))
return false
}
}
Expand Down Expand Up @@ -290,7 +290,7 @@ function connack (packet, stream, opts) {

// Check return code
if (typeof rc !== 'number') {
stream.emit('error', new Error('Invalid return code'))
stream.destroy(new Error('Invalid return code'))
return false
}
// mqtt5 properties
Expand Down Expand Up @@ -330,7 +330,7 @@ function publish (packet, stream, opts) {
if (typeof topic === 'string') length += Buffer.byteLength(topic) + 2
else if (Buffer.isBuffer(topic)) length += topic.length + 2
else {
stream.emit('error', new Error('Invalid topic'))
stream.destroy(new Error('Invalid topic'))
return false
}

Expand All @@ -340,7 +340,7 @@ function publish (packet, stream, opts) {

// Message ID must a number if qos > 0
if (qos && typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else if (qos) length += 2

Expand Down Expand Up @@ -391,7 +391,7 @@ function confirmation (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
}

Expand Down Expand Up @@ -440,7 +440,7 @@ function subscribe (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else length += 2

Expand All @@ -459,36 +459,36 @@ function subscribe (packet, stream, opts) {
const iqos = subs[i].qos

if (typeof itopic !== 'string') {
stream.emit('error', new Error('Invalid subscriptions - invalid topic'))
stream.destroy(new Error('Invalid subscriptions - invalid topic'))
return false
}
if (typeof iqos !== 'number') {
stream.emit('error', new Error('Invalid subscriptions - invalid qos'))
stream.destroy(new Error('Invalid subscriptions - invalid qos'))
return false
}

if (version === 5) {
const nl = subs[i].nl || false
if (typeof nl !== 'boolean') {
stream.emit('error', new Error('Invalid subscriptions - invalid No Local'))
stream.destroy(new Error('Invalid subscriptions - invalid No Local'))
return false
}
const rap = subs[i].rap || false
if (typeof rap !== 'boolean') {
stream.emit('error', new Error('Invalid subscriptions - invalid Retain as Published'))
stream.destroy(new Error('Invalid subscriptions - invalid Retain as Published'))
return false
}
const rh = subs[i].rh || 0
if (typeof rh !== 'number' || rh > 2) {
stream.emit('error', new Error('Invalid subscriptions - invalid Retain Handling'))
stream.destroy(new Error('Invalid subscriptions - invalid Retain Handling'))
return false
}
}

length += Buffer.byteLength(itopic) + 2 + 1
}
} else {
stream.emit('error', new Error('Invalid subscriptions'))
stream.destroy(new Error('Invalid subscriptions'))
return false
}

Expand Down Expand Up @@ -545,21 +545,21 @@ function suback (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else length += 2

// Check granted qos vector
if (typeof granted === 'object' && granted.length) {
for (let i = 0; i < granted.length; i += 1) {
if (typeof granted[i] !== 'number') {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}
length += 1
}
} else {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}

Expand Down Expand Up @@ -600,7 +600,7 @@ function unsubscribe (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else {
length += 2
Expand All @@ -609,13 +609,13 @@ function unsubscribe (packet, stream, opts) {
if (typeof unsubs === 'object' && unsubs.length) {
for (let i = 0; i < unsubs.length; i += 1) {
if (typeof unsubs[i] !== 'string') {
stream.emit('error', new Error('Invalid unsubscriptions'))
stream.destroy(new Error('Invalid unsubscriptions'))
return false
}
length += Buffer.byteLength(unsubs[i]) + 2
}
} else {
stream.emit('error', new Error('Invalid unsubscriptions'))
stream.destroy(new Error('Invalid unsubscriptions'))
return false
}
// properies mqtt 5
Expand Down Expand Up @@ -663,7 +663,7 @@ function unsuback (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
}

Expand All @@ -672,13 +672,13 @@ function unsuback (packet, stream, opts) {
if (typeof granted === 'object' && granted.length) {
for (let i = 0; i < granted.length; i += 1) {
if (typeof granted[i] !== 'number') {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}
length += 1
}
} else {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}
}
Expand Down Expand Up @@ -757,7 +757,7 @@ function auth (packet, stream, opts) {
const properties = settings.properties
let length = version === 5 ? 1 : 0

if (version !== 5) stream.emit('error', new Error('Invalid mqtt version for auth packet'))
if (version !== 5) stream.destroy(new Error('Invalid mqtt version for auth packet'))

// properies mqtt 5
const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
Expand Down Expand Up @@ -794,7 +794,7 @@ function auth (packet, stream, opts) {
const varByteIntCache = {}
function writeVarByteInt (stream, num) {
if (num > protocol.VARBYTEINT_MAX) {
stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))
stream.destroy(new Error(`Invalid variable byte integer: ${num}`))
return false
}

Expand Down Expand Up @@ -901,39 +901,39 @@ function getProperties (stream, properties) {
switch (type) {
case 'byte': {
if (typeof value !== 'boolean') {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 1
break
}
case 'int8': {
if (typeof value !== 'number' || value < 0 || value > 0xff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 1
break
}
case 'binary': {
if (value && value === null) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + Buffer.byteLength(value) + 2
break
}
case 'int16': {
if (typeof value !== 'number' || value < 0 || value > 0xffff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 2
break
}
case 'int32': {
if (typeof value !== 'number' || value < 0 || value > 0xffffffff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 4
Expand All @@ -942,23 +942,23 @@ function getProperties (stream, properties) {
case 'var': {
// var byte integer is max 24 bits packed in 32 bits
if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + Buffer.byteLength(genBufVariableByteInt(value))
break
}
case 'string': {
if (typeof value !== 'string') {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 2 + Buffer.byteLength(value.toString())
break
}
case 'pair': {
if (typeof value !== 'object') {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += Object.getOwnPropertyNames(value).reduce((result, name) => {
Expand All @@ -976,7 +976,7 @@ function getProperties (stream, properties) {
break
}
default: {
stream.emit('error', new Error(`Invalid property ${name}: ${value}`))
stream.destroy(new Error(`Invalid property ${name}: ${value}`))
return false
}
}
Expand Down Expand Up @@ -1085,7 +1085,7 @@ function writeProperty (stream, propName, value) {
break
}
default: {
stream.emit('error', new Error(`Invalid property ${propName} value: ${value}`))
stream.destroy(new Error(`Invalid property ${propName} value: ${value}`))
return false
}
}
Expand Down

0 comments on commit 2d6667a

Please sign in to comment.