From 190822aadd73a688be9882cc428a0a655782481c Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Tue, 26 Apr 2022 11:21:39 +0200 Subject: [PATCH 01/12] Add nolocal handling --- lib/client.js | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/client.js b/lib/client.js index 2a6221dd..798337da 100644 --- a/lib/client.js +++ b/lib/client.js @@ -92,7 +92,10 @@ function Client (broker, conn, req) { this.deliver0 = function deliverQoS0 (_packet, cb) { const toForward = dedupe(that, _packet) && that.broker.authorizeForward(that, _packet) - if (toForward) { + + const isNL = _packet.clientId === that.id && _packet.nl + + if (toForward && !isNL) { // Give nodejs some time to clear stacks, or we will see // "Maximum call stack size exceeded" in a very high load setImmediate(() => { @@ -116,7 +119,10 @@ function Client (broker, conn, req) { } const toForward = dedupe(that, _packet) && that.broker.authorizeForward(that, _packet) - if (toForward) { + + const isNL = _packet.clientId === that.id && _packet.nl + + if (toForward && !isNL) { setImmediate(() => { const packet = new QoSPacket(toForward, that) // Downgrading to client subscription qos if needed From 45d5303692adbb3ca21ffb9a459e6f96359be788 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Tue, 26 Apr 2022 11:23:06 +0200 Subject: [PATCH 02/12] Add clientId --- aedes.js | 1 + 1 file changed, 1 insertion(+) diff --git a/aedes.js b/aedes.js index 75d5914d..e7de15fa 100644 --- a/aedes.js +++ b/aedes.js @@ -170,6 +170,7 @@ function storeRetained (packet, done) { } function emitPacket (packet, done) { + if (this.client) packet.clientId = this.client.id this.broker.mq.emit(packet, done) } From afffe9624b275588a7ff72db5df313411b947790 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Tue, 26 Apr 2022 11:24:27 +0200 Subject: [PATCH 03/12] Add nl to packet --- lib/handlers/subscribe.js | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index e1e25d67..f93fb9d4 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -142,13 +142,12 @@ function addSubs (sub, done) { const nl = this.nl let func = qos > 0 ? client.deliverQoS : client.deliver0 - if (!rap) { - const deliverFunc = func - func = function handlePacketSubscription (_packet, cb) { - _packet = new Packet(_packet, broker) - _packet.retain = false - deliverFunc(_packet, cb) - } + const deliverFunc = func + func = function handlePacketSubscription (_packet, cb) { + _packet = new Packet(_packet, broker) + _packet.nl = nl + if (!rap) _packet.retain = false + deliverFunc(_packet, cb) } // [MQTT-4.7.2-1] From e677f7e077470b7666144e221b620cab99485152 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Tue, 26 Apr 2022 13:20:21 +0200 Subject: [PATCH 04/12] remove trailing spaces --- lib/client.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/client.js b/lib/client.js index 798337da..9e3e23f8 100644 --- a/lib/client.js +++ b/lib/client.js @@ -92,9 +92,9 @@ function Client (broker, conn, req) { this.deliver0 = function deliverQoS0 (_packet, cb) { const toForward = dedupe(that, _packet) && that.broker.authorizeForward(that, _packet) - + const isNL = _packet.clientId === that.id && _packet.nl - + if (toForward && !isNL) { // Give nodejs some time to clear stacks, or we will see // "Maximum call stack size exceeded" in a very high load @@ -119,9 +119,9 @@ function Client (broker, conn, req) { } const toForward = dedupe(that, _packet) && that.broker.authorizeForward(that, _packet) - + const isNL = _packet.clientId === that.id && _packet.nl - + if (toForward && !isNL) { setImmediate(() => { const packet = new QoSPacket(toForward, that) From 95c4689fd14191348d28029d6563f6f3e2b93db1 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Wed, 27 Apr 2022 10:02:57 +0000 Subject: [PATCH 05/12] Make failing tests work --- test/auth.js | 4 ++++ test/basic.js | 1 + test/client-pub-sub.js | 2 ++ test/qos2.js | 2 ++ 4 files changed, 9 insertions(+) diff --git a/test/auth.js b/test/auth.js index 2c57a73a..4c538e01 100644 --- a/test/auth.js +++ b/test/auth.js @@ -446,6 +446,7 @@ test('authorize publish', function (t) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter delete expected.length + delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) @@ -493,6 +494,7 @@ test('authorize waits for authenticate', function (t) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter delete expected.length + delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) @@ -542,6 +544,7 @@ test('authorize publish from configOptions', function (t) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter delete expected.length + delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) @@ -618,6 +621,7 @@ test('modify qos out of range in authorize publish ', function (t) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter delete expected.length + delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) diff --git a/test/basic.js b/test/basic.js index 81130a3e..625c0d39 100644 --- a/test/basic.js +++ b/test/basic.js @@ -35,6 +35,7 @@ test('publish QoS 0', function (t) { s.broker.mq.on('hello', function (packet, cb) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter + delete packet.clientId t.equal(packet.messageId, undefined, 'MUST not contain a packet identifier in QoS 0') t.same(packet, expected, 'packet matches') cb() diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index 127ccd2b..7e701fb3 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -944,6 +944,7 @@ test('programmatically add custom subscribe', function (t) { function deliver (packet, cb) { deliverP.brokerId = s.broker.id deliverP.brokerCounter = s.broker.counter + delete packet.clientId t.same(packet, deliverP, 'packet matches') cb() } @@ -986,6 +987,7 @@ test('custom function in broker.subscribe', function (t) { function deliver (packet, cb) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter + delete packet.clientId t.same(packet, expected, 'packet matches') cb() } diff --git a/test/qos2.js b/test/qos2.js index e87ac093..e809c6a9 100644 --- a/test/qos2.js +++ b/test/qos2.js @@ -262,6 +262,8 @@ test('call published method with client with QoS 2', function (t) { broker.authorizeForward = function (client, packet) { forwarded.brokerId = broker.id forwarded.brokerCounter = broker.counter + delete packet.clientId + delete packet.nl t.same(packet, forwarded, 'forwarded packet must match') return packet } From abc9cfde2cd9c4634b2ccda086794c2fb709c3e4 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Wed, 27 Apr 2022 10:17:37 +0000 Subject: [PATCH 06/12] Extract common functionality to a function --- lib/client.js | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/lib/client.js b/lib/client.js index 9e3e23f8..a66b2d6b 100644 --- a/lib/client.js +++ b/lib/client.js @@ -89,13 +89,19 @@ function Client (broker, conn, req) { conn.on('end', this.close.bind(this)) this._eos = eos(this.conn, this.close.bind(this)) - this.deliver0 = function deliverQoS0 (_packet, cb) { + const getToForwardPacket = (_packet) => { + if (_packet.clientId === that.id && _packet.nl) return + const toForward = dedupe(that, _packet) && that.broker.authorizeForward(that, _packet) - const isNL = _packet.clientId === that.id && _packet.nl + return toForward + } + + this.deliver0 = function deliverQoS0 (_packet, cb) { + const toForward = getToForwardPacket(_packet) - if (toForward && !isNL) { + if (toForward) { // Give nodejs some time to clear stacks, or we will see // "Maximum call stack size exceeded" in a very high load setImmediate(() => { @@ -117,12 +123,9 @@ function Client (broker, conn, req) { that.deliver0(_packet, cb) return } - const toForward = dedupe(that, _packet) && - that.broker.authorizeForward(that, _packet) - - const isNL = _packet.clientId === that.id && _packet.nl + const toForward = getToForwardPacket(_packet) - if (toForward && !isNL) { + if (toForward) { setImmediate(() => { const packet = new QoSPacket(toForward, that) // Downgrading to client subscription qos if needed From 113fb5e7bd6e699057203fd30bffad56b0289904 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Thu, 12 May 2022 08:18:06 +0000 Subject: [PATCH 07/12] update deps + unit tests validate clientId in packets --- package.json | 4 ++-- test/auth.js | 42 ++++++++++++++++++++++++++++-------------- test/basic.js | 13 ++++++++----- test/client-pub-sub.js | 14 +++++++------- test/qos2.js | 6 +++--- 5 files changed, 48 insertions(+), 31 deletions(-) diff --git a/package.json b/package.json index 19ebfdb2..612f7ebd 100644 --- a/package.json +++ b/package.json @@ -118,8 +118,8 @@ "websocket-stream": "^5.5.2" }, "dependencies": { - "aedes-packet": "^2.3.1", - "aedes-persistence": "^9.0.1", + "aedes-packet": "^3.0.0", + "aedes-persistence": "^9.1.1", "end-of-stream": "^1.4.4", "fastfall": "^1.5.1", "fastparallel": "^2.4.1", diff --git a/test/auth.js b/test/auth.js index 4c538e01..248657c5 100644 --- a/test/auth.js +++ b/test/auth.js @@ -422,7 +422,7 @@ test('authentication error when non numeric return code is passed', function (t) test('authorize publish', function (t) { t.plan(4) - const s = connect(setup()) + const s = connect(setup(), { clientId: 'my-client-xyz' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -445,8 +445,8 @@ test('authorize publish', function (t) { t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0') expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter + expected.clientId = 'my-client-xyz' delete expected.length - delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) @@ -461,7 +461,7 @@ test('authorize publish', function (t) { test('authorize waits for authenticate', function (t) { t.plan(6) - const s = setup() + const s = setup(aedes({ clientId: 'my-client-xyz-2' })) t.teardown(s.broker.close.bind(s.broker)) s.broker.authenticate = function (client, username, password, cb) { @@ -486,7 +486,8 @@ test('authorize waits for authenticate', function (t) { qos: 0, retain: false, length: 12, - dup: false + dup: false, + clientId: 'my-client' } s.broker.mq.on('hello', function (packet, cb) { @@ -494,7 +495,6 @@ test('authorize waits for authenticate', function (t) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter delete expected.length - delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) @@ -521,12 +521,13 @@ test('authorize publish from configOptions', function (t) { t.plan(4) const s = connect(setup(aedes({ + clientId: 'my-client-xyz-3', authorizePublish: function (client, packet, cb) { t.ok(client, 'client exists') t.same(packet, expected, 'packet matches') cb() } - }))) + })), { clientId: 'my-client-xyz-3' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -543,8 +544,8 @@ test('authorize publish from configOptions', function (t) { t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0') expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter + expected.clientId = 'my-client-xyz-3' delete expected.length - delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) @@ -592,7 +593,7 @@ test('do not authorize publish', function (t) { test('modify qos out of range in authorize publish ', function (t) { t.plan(2) - const s = connect(setup()) + const s = connect(setup(), { clientId: 'my-client-xyz-4' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -602,7 +603,8 @@ test('modify qos out of range in authorize publish ', function (t) { qos: 0, retain: false, length: 12, - dup: false + dup: false, + clientId: 'my-client-xyz-4' } s.broker.authorizePublish = function (client, packet, cb) { @@ -621,7 +623,6 @@ test('modify qos out of range in authorize publish ', function (t) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter delete expected.length - delete packet.clientId t.same(packet, expected, 'packet matches') cb() }) @@ -809,12 +810,19 @@ test('negate multiple subscriptions', function (t) { test('negate subscription with correct persistence', function (t) { t.plan(6) + // rh, rap, nl are undefined because mqtt.parser is set to MQTT 3.1.1 and will thus erase these props from s.inStream.write const expected = [{ topic: 'hello', - qos: 0 + qos: 0, + rh: undefined, + rap: undefined, + nl: undefined }, { topic: 'world', - qos: 0 + qos: 0, + rh: undefined, + rap: undefined, + nl: undefined }] const broker = aedes() @@ -843,10 +851,16 @@ test('negate subscription with correct persistence', function (t) { messageId: 24, subscriptions: [{ topic: 'hello', - qos: 0 + qos: 0, + rh: 0, + rap: true, + nl: false }, { topic: 'world', - qos: 0 + qos: 0, + rh: 0, + rap: true, + nl: false }] }) }) diff --git a/test/basic.js b/test/basic.js index 625c0d39..1638847d 100644 --- a/test/basic.js +++ b/test/basic.js @@ -20,7 +20,7 @@ test('test aedes.Server', function (t) { test('publish QoS 0', function (t) { t.plan(2) - const s = connect(setup()) + const s = connect(setup(), { clientId: 'my-client-xyz-5' }) t.teardown(s.broker.close.bind(s.broker)) const expected = { @@ -29,13 +29,13 @@ test('publish QoS 0', function (t) { payload: Buffer.from('world'), qos: 0, retain: false, - dup: false + dup: false, + clientId: 'my-client-xyz-5' } s.broker.mq.on('hello', function (packet, cb) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter - delete packet.clientId t.equal(packet.messageId, undefined, 'MUST not contain a packet identifier in QoS 0') t.same(packet, expected, 'packet matches') cb() @@ -129,7 +129,7 @@ test('publish to $SYS topic throws error', function (t) { qos: 0, retain: false } - const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos }] + const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }] subscribe(t, s, 'hello', ele.qos, function () { s.outStream.once('data', function (packet) { @@ -189,7 +189,10 @@ test('return write errors to callback', function (t) { qos: 0, retain: false } - const subs = [{ topic: 'hello', qos: ele.qos }, { topic: 'world', qos: ele.qos }] + const subs = [ + { topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }, + { topic: 'world', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined } + ] const expectedSubs = ele.clean ? null : subs subscribeMultiple(t, s, subs, [ele.qos, ele.qos], function () { diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index 7e701fb3..9f829325 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -905,10 +905,10 @@ test('should not receive a message on negated subscription', function (t) { test('programmatically add custom subscribe', function (t) { t.plan(6) - const broker = aedes() + const broker = aedes({ clientId: 'my-client-xyz-7' }) t.teardown(broker.close.bind(broker)) - const s = connect(setup(broker)) + const s = connect(setup(broker), { clientId: 'my-client-xyz-7' }) const expected = { cmd: 'publish', topic: 'hello', @@ -924,7 +924,8 @@ test('programmatically add custom subscribe', function (t) { payload: Buffer.from('world'), qos: 0, retain: false, - dup: false + dup: false, + clientId: 'my-client-xyz-7' } subscribe(t, s, 'hello', 0, function () { broker.subscribe('hello', deliver, function () { @@ -944,7 +945,6 @@ test('programmatically add custom subscribe', function (t) { function deliver (packet, cb) { deliverP.brokerId = s.broker.id deliverP.brokerCounter = s.broker.counter - delete packet.clientId t.same(packet, deliverP, 'packet matches') cb() } @@ -964,9 +964,10 @@ test('custom function in broker.subscribe', function (t) { qos: 1, retain: false, dup: false, - messageId: undefined + messageId: undefined, + clientId: 'my-client-xyz-6' } - connect(s, {}, function () { + connect(s, { clientId: 'my-client-xyz-6' }, function () { broker.subscribe('hello', deliver, function () { t.pass('subscribed') }) @@ -987,7 +988,6 @@ test('custom function in broker.subscribe', function (t) { function deliver (packet, cb) { expected.brokerId = s.broker.id expected.brokerCounter = s.broker.counter - delete packet.clientId t.same(packet, expected, 'packet matches') cb() } diff --git a/test/qos2.js b/test/qos2.js index e809c6a9..00b028ae 100644 --- a/test/qos2.js +++ b/test/qos2.js @@ -239,7 +239,7 @@ test('call published method with client with QoS 2', function (t) { t.teardown(broker.close.bind(broker)) const opts = { clean: cleanSession } - const publisher = connect(setup(broker)) + const publisher = connect(setup(broker), { clientId: 'my-client-xyz-8' }) const subscriber = connect(setup(broker), { ...opts, clientId: 'abcde' }) const forwarded = { cmd: 'publish', @@ -248,7 +248,8 @@ test('call published method with client with QoS 2', function (t) { qos: 2, retain: false, dup: false, - messageId: undefined + messageId: undefined, + clientId: 'my-client-xyz-8' } const expected = { cmd: 'publish', @@ -262,7 +263,6 @@ test('call published method with client with QoS 2', function (t) { broker.authorizeForward = function (client, packet) { forwarded.brokerId = broker.id forwarded.brokerCounter = broker.counter - delete packet.clientId delete packet.nl t.same(packet, forwarded, 'forwarded packet must match') return packet From 19bdd93a54bc87093f09d881120fddebca09f866 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Thu, 12 May 2022 08:59:32 +0000 Subject: [PATCH 08/12] add bridge unit tests --- test/bridge.js | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 test/bridge.js diff --git a/test/bridge.js b/test/bridge.js new file mode 100644 index 00000000..834c8431 --- /dev/null +++ b/test/bridge.js @@ -0,0 +1,48 @@ +'use strict' + +const { test } = require('tap') +const { setup, connect, subscribe } = require('./helper') + +test('normal client sends a publish message and shall receive it back', function (t) { + const s = connect(setup()) + t.teardown(s.broker.close.bind(s.broker)) + + const handle = setTimeout(() => { + t.fail('did not receive packet back') + t.end() + }, 1000) + + subscribe(t, s, 'hello', 0, function () { + s.outStream.on('data', () => { + clearTimeout(handle) + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) + }) +}) + +test('bridge client sends a publish message but shall not receive it back', function (t) { + const s = connect(setup(), { clientId: 'my-client-bridge-1', protocolVersion: 128 + 4 }) + t.teardown(s.broker.close.bind(s.broker)) + + const handle = setTimeout(() => t.end(), 1000) + + subscribe(t, s, 'hello', 0, function () { + s.outStream.on('data', function () { + clearTimeout(handle) + t.fail('should not receive packet back') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) + }) +}) From 86baaf9f8e0513f7ed53d23ef604e5f9e9786430 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Thu, 12 May 2022 09:25:21 +0000 Subject: [PATCH 09/12] add qos to bridge unit test --- test/bridge.js | 74 +++++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/test/bridge.js b/test/bridge.js index 834c8431..1f46fc5f 100644 --- a/test/bridge.js +++ b/test/bridge.js @@ -3,46 +3,52 @@ const { test } = require('tap') const { setup, connect, subscribe } = require('./helper') -test('normal client sends a publish message and shall receive it back', function (t) { - const s = connect(setup()) - t.teardown(s.broker.close.bind(s.broker)) - - const handle = setTimeout(() => { - t.fail('did not receive packet back') - t.end() - }, 1000) - - subscribe(t, s, 'hello', 0, function () { - s.outStream.on('data', () => { - clearTimeout(handle) +for (const qos of [0, 1]) { + const packet = { + qos, + cmd: 'publish', + topic: 'hello', + payload: 'world' + } + + if (qos > 0) packet.messageId = 42 + + test('normal client sends a publish message and shall receive it back, qos = ' + qos, function (t) { + const s = connect(setup()) + t.teardown(s.broker.close.bind(s.broker)) + + const handle = setTimeout(() => { + t.fail('did not receive packet back') t.end() - }) - - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + }, 1000) + + subscribe(t, s, 'hello', qos, function () { + s.outStream.on('data', (packet) => { + if (packet.cmd == 'publish') { + clearTimeout(handle) + t.end() + } + }) + + s.inStream.write(packet) }) }) -}) -test('bridge client sends a publish message but shall not receive it back', function (t) { - const s = connect(setup(), { clientId: 'my-client-bridge-1', protocolVersion: 128 + 4 }) - t.teardown(s.broker.close.bind(s.broker)) + test('bridge client sends a publish message but shall not receive it back, qos = ' + qos, function (t) { + const s = connect(setup(), { clientId: 'my-client-bridge-1', protocolVersion: 128 + 4 }) + t.teardown(s.broker.close.bind(s.broker)) - const handle = setTimeout(() => t.end(), 1000) + const handle = setTimeout(() => t.end(), 1000) - subscribe(t, s, 'hello', 0, function () { - s.outStream.on('data', function () { - clearTimeout(handle) - t.fail('should not receive packet back') - t.end() - }) + subscribe(t, s, 'hello', qos, function () { + s.outStream.on('data', function () { + clearTimeout(handle) + t.fail('should not receive packet back') + t.end() + }) - s.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.inStream.write(packet) }) }) -}) +} + From 82a634c1b07bfaa677d1cc5ef2cbe51078cd2757 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Thu, 12 May 2022 12:20:24 +0200 Subject: [PATCH 10/12] Add reference to MQTTv5 Co-authored-by: Daniel Lando --- lib/client.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/client.js b/lib/client.js index a66b2d6b..414d8e5f 100644 --- a/lib/client.js +++ b/lib/client.js @@ -90,6 +90,8 @@ function Client (broker, conn, req) { this._eos = eos(this.conn, this.close.bind(this)) const getToForwardPacket = (_packet) => { + // Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169 + // prevent to forward messages sent by the same client when no-local flag is set if (_packet.clientId === that.id && _packet.nl) return const toForward = dedupe(that, _packet) && From e0ca896ecd4037d8867f2a5ffa5341a5254c30a7 Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Thu, 12 May 2022 12:21:10 +0200 Subject: [PATCH 11/12] Add reference to bridge mode enabled Co-authored-by: Daniel Lando --- test/bridge.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/bridge.js b/test/bridge.js index 1f46fc5f..97801ffd 100644 --- a/test/bridge.js +++ b/test/bridge.js @@ -35,6 +35,8 @@ for (const qos of [0, 1]) { }) test('bridge client sends a publish message but shall not receive it back, qos = ' + qos, function (t) { + // protocolVersion 128 + 4 means mqtt 3.1.1 with bridgeMode enabled + // https://github.com/mqttjs/mqtt-packet/blob/7f7c2ed8bcb4b2c582851d120a94e0b4a731f661/parser.js#L171 const s = connect(setup(), { clientId: 'my-client-bridge-1', protocolVersion: 128 + 4 }) t.teardown(s.broker.close.bind(s.broker)) From ae8dabf8f123864b211226647cb0fb4b774062ab Mon Sep 17 00:00:00 2001 From: Oldrich Svec Date: Thu, 12 May 2022 10:26:06 +0000 Subject: [PATCH 12/12] Add qos 2 to bridge unit test --- test/bridge.js | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/test/bridge.js b/test/bridge.js index 1f46fc5f..ccb7af44 100644 --- a/test/bridge.js +++ b/test/bridge.js @@ -3,7 +3,7 @@ const { test } = require('tap') const { setup, connect, subscribe } = require('./helper') -for (const qos of [0, 1]) { +for (const qos of [0, 1, 2]) { const packet = { qos, cmd: 'publish', @@ -12,24 +12,26 @@ for (const qos of [0, 1]) { } if (qos > 0) packet.messageId = 42 - + test('normal client sends a publish message and shall receive it back, qos = ' + qos, function (t) { const s = connect(setup()) t.teardown(s.broker.close.bind(s.broker)) - + const handle = setTimeout(() => { t.fail('did not receive packet back') t.end() }, 1000) - + subscribe(t, s, 'hello', qos, function () { s.outStream.on('data', (packet) => { - if (packet.cmd == 'publish') { + if (packet.cmd === 'publish') { clearTimeout(handle) t.end() + } else if (packet.cmd === 'pubrec') { + s.inStream.write({ cmd: 'pubrel', messageId: 42 }) } }) - + s.inStream.write(packet) }) }) @@ -51,4 +53,3 @@ for (const qos of [0, 1]) { }) }) } -