Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream.emit('error', err) -> stream.destroy(err) #127

Merged
merged 3 commits into from
Feb 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions generate.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class Accumulator extends EventEmitter {

return result
}

destroy (err) {
if (err) this.emit('error', err)
}
}

module.exports = generate
1 change: 0 additions & 1 deletion test.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ function testWriteToStreamError (expected, fixture) {
const result = mqtt.writeToStream(fixture, stream)

t.false(result, 'result should be false')
t.end()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, this test was failing because destroy() emits error asynchronously.

Removing t.end() here is fine because the test will automatically end when two assertions are hit and will fail the test if more than 2 are hit (per the tape documentation)

})
}

Expand Down
88 changes: 44 additions & 44 deletions writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function generate (packet, stream, opts) {
case 'auth':
return auth(packet, stream, opts)
default:
stream.emit('error', new Error('Unknown command'))
stream.destroy(new Error('Unknown command'))
return false
}
}
Expand Down Expand Up @@ -101,13 +101,13 @@ function connect (packet, stream, opts) {
// Must be a string and non-falsy
if (!protocolId ||
(typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
stream.emit('error', new Error('Invalid protocolId'))
stream.destroy(new Error('Invalid protocolId'))
return false
} else length += protocolId.length + 2

// Must be 3 or 4 or 5
if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {
stream.emit('error', new Error('Invalid protocol version'))
stream.destroy(new Error('Invalid protocol version'))
return false
} else length += 1

Expand All @@ -117,11 +117,11 @@ function connect (packet, stream, opts) {
length += Buffer.byteLength(clientId) + 2
} else {
if (protocolVersion < 4) {
stream.emit('error', new Error('clientId must be supplied before 3.1.1'))
stream.destroy(new Error('clientId must be supplied before 3.1.1'))
return false
}
if ((clean * 1) === 0) {
stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))
stream.destroy(new Error('clientId must be given if cleanSession set to 0'))
return false
}
}
Expand All @@ -131,7 +131,7 @@ function connect (packet, stream, opts) {
keepalive < 0 ||
keepalive > 65535 ||
keepalive % 1 !== 0) {
stream.emit('error', new Error('Invalid keepalive'))
stream.destroy(new Error('Invalid keepalive'))
return false
} else length += 2

Expand All @@ -152,12 +152,12 @@ function connect (packet, stream, opts) {
if (will) {
// It must be an object
if (typeof will !== 'object') {
stream.emit('error', new Error('Invalid will'))
stream.destroy(new Error('Invalid will'))
return false
}
// It must have topic typeof string
if (!will.topic || typeof will.topic !== 'string') {
stream.emit('error', new Error('Invalid will topic'))
stream.destroy(new Error('Invalid will topic'))
return false
} else {
length += Buffer.byteLength(will.topic) + 2
Expand All @@ -173,7 +173,7 @@ function connect (packet, stream, opts) {
length += will.payload.length
}
} else {
stream.emit('error', new Error('Invalid will payload'))
stream.destroy(new Error('Invalid will payload'))
return false
}
}
Expand All @@ -193,22 +193,22 @@ function connect (packet, stream, opts) {
providedUsername = true
length += Buffer.byteLength(username) + 2
} else {
stream.emit('error', new Error('Invalid username'))
stream.destroy(new Error('Invalid username'))
return false
}
}

// Password
if (password != null) {
if (!providedUsername) {
stream.emit('error', new Error('Username is required to use password'))
stream.destroy(new Error('Username is required to use password'))
return false
}

if (isStringOrBuffer(password)) {
length += byteLength(password) + 2
} else {
stream.emit('error', new Error('Invalid password'))
stream.destroy(new Error('Invalid password'))
return false
}
}
Expand Down Expand Up @@ -290,7 +290,7 @@ function connack (packet, stream, opts) {

// Check return code
if (typeof rc !== 'number') {
stream.emit('error', new Error('Invalid return code'))
stream.destroy(new Error('Invalid return code'))
return false
}
// mqtt5 properties
Expand Down Expand Up @@ -330,7 +330,7 @@ function publish (packet, stream, opts) {
if (typeof topic === 'string') length += Buffer.byteLength(topic) + 2
else if (Buffer.isBuffer(topic)) length += topic.length + 2
else {
stream.emit('error', new Error('Invalid topic'))
stream.destroy(new Error('Invalid topic'))
return false
}

Expand All @@ -340,7 +340,7 @@ function publish (packet, stream, opts) {

// Message ID must a number if qos > 0
if (qos && typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else if (qos) length += 2

Expand Down Expand Up @@ -391,7 +391,7 @@ function confirmation (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
}

Expand Down Expand Up @@ -440,7 +440,7 @@ function subscribe (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else length += 2

Expand All @@ -459,36 +459,36 @@ function subscribe (packet, stream, opts) {
const iqos = subs[i].qos

if (typeof itopic !== 'string') {
stream.emit('error', new Error('Invalid subscriptions - invalid topic'))
stream.destroy(new Error('Invalid subscriptions - invalid topic'))
return false
}
if (typeof iqos !== 'number') {
stream.emit('error', new Error('Invalid subscriptions - invalid qos'))
stream.destroy(new Error('Invalid subscriptions - invalid qos'))
return false
}

if (version === 5) {
const nl = subs[i].nl || false
if (typeof nl !== 'boolean') {
stream.emit('error', new Error('Invalid subscriptions - invalid No Local'))
stream.destroy(new Error('Invalid subscriptions - invalid No Local'))
return false
}
const rap = subs[i].rap || false
if (typeof rap !== 'boolean') {
stream.emit('error', new Error('Invalid subscriptions - invalid Retain as Published'))
stream.destroy(new Error('Invalid subscriptions - invalid Retain as Published'))
return false
}
const rh = subs[i].rh || 0
if (typeof rh !== 'number' || rh > 2) {
stream.emit('error', new Error('Invalid subscriptions - invalid Retain Handling'))
stream.destroy(new Error('Invalid subscriptions - invalid Retain Handling'))
return false
}
}

length += Buffer.byteLength(itopic) + 2 + 1
}
} else {
stream.emit('error', new Error('Invalid subscriptions'))
stream.destroy(new Error('Invalid subscriptions'))
return false
}

Expand Down Expand Up @@ -545,21 +545,21 @@ function suback (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else length += 2

// Check granted qos vector
if (typeof granted === 'object' && granted.length) {
for (let i = 0; i < granted.length; i += 1) {
if (typeof granted[i] !== 'number') {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}
length += 1
}
} else {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}

Expand Down Expand Up @@ -600,7 +600,7 @@ function unsubscribe (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
} else {
length += 2
Expand All @@ -609,13 +609,13 @@ function unsubscribe (packet, stream, opts) {
if (typeof unsubs === 'object' && unsubs.length) {
for (let i = 0; i < unsubs.length; i += 1) {
if (typeof unsubs[i] !== 'string') {
stream.emit('error', new Error('Invalid unsubscriptions'))
stream.destroy(new Error('Invalid unsubscriptions'))
return false
}
length += Buffer.byteLength(unsubs[i]) + 2
}
} else {
stream.emit('error', new Error('Invalid unsubscriptions'))
stream.destroy(new Error('Invalid unsubscriptions'))
return false
}
// properies mqtt 5
Expand Down Expand Up @@ -663,7 +663,7 @@ function unsuback (packet, stream, opts) {

// Check message ID
if (typeof id !== 'number') {
stream.emit('error', new Error('Invalid messageId'))
stream.destroy(new Error('Invalid messageId'))
return false
}

Expand All @@ -672,13 +672,13 @@ function unsuback (packet, stream, opts) {
if (typeof granted === 'object' && granted.length) {
for (let i = 0; i < granted.length; i += 1) {
if (typeof granted[i] !== 'number') {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}
length += 1
}
} else {
stream.emit('error', new Error('Invalid qos vector'))
stream.destroy(new Error('Invalid qos vector'))
return false
}
}
Expand Down Expand Up @@ -757,7 +757,7 @@ function auth (packet, stream, opts) {
const properties = settings.properties
let length = version === 5 ? 1 : 0

if (version !== 5) stream.emit('error', new Error('Invalid mqtt version for auth packet'))
if (version !== 5) stream.destroy(new Error('Invalid mqtt version for auth packet'))

// properies mqtt 5
const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
Expand Down Expand Up @@ -794,7 +794,7 @@ function auth (packet, stream, opts) {
const varByteIntCache = {}
function writeVarByteInt (stream, num) {
if (num > protocol.VARBYTEINT_MAX) {
stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))
stream.destroy(new Error(`Invalid variable byte integer: ${num}`))
return false
}

Expand Down Expand Up @@ -901,39 +901,39 @@ function getProperties (stream, properties) {
switch (type) {
case 'byte': {
if (typeof value !== 'boolean') {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 1
break
}
case 'int8': {
if (typeof value !== 'number' || value < 0 || value > 0xff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 1
break
}
case 'binary': {
if (value && value === null) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + Buffer.byteLength(value) + 2
break
}
case 'int16': {
if (typeof value !== 'number' || value < 0 || value > 0xffff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 2
break
}
case 'int32': {
if (typeof value !== 'number' || value < 0 || value > 0xffffffff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 4
Expand All @@ -942,23 +942,23 @@ function getProperties (stream, properties) {
case 'var': {
// var byte integer is max 24 bits packed in 32 bits
if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + Buffer.byteLength(genBufVariableByteInt(value))
break
}
case 'string': {
if (typeof value !== 'string') {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += 1 + 2 + Buffer.byteLength(value.toString())
break
}
case 'pair': {
if (typeof value !== 'object') {
stream.emit('error', new Error(`Invalid ${name}: ${value}`))
stream.destroy(new Error(`Invalid ${name}: ${value}`))
return false
}
length += Object.getOwnPropertyNames(value).reduce((result, name) => {
Expand All @@ -976,7 +976,7 @@ function getProperties (stream, properties) {
break
}
default: {
stream.emit('error', new Error(`Invalid property ${name}: ${value}`))
stream.destroy(new Error(`Invalid property ${name}: ${value}`))
return false
}
}
Expand Down Expand Up @@ -1085,7 +1085,7 @@ function writeProperty (stream, propName, value) {
break
}
default: {
stream.emit('error', new Error(`Invalid property ${propName} value: ${value}`))
stream.destroy(new Error(`Invalid property ${propName} value: ${value}`))
return false
}
}
Expand Down