Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: nolocal flag support for bridges #737

Merged
merged 14 commits into from
May 12, 2022
1 change: 1 addition & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ function storeRetained (packet, done) {
}

function emitPacket (packet, done) {
if (this.client) packet.clientId = this.client.id
this.broker.mq.emit(packet, done)
}

Expand Down
15 changes: 12 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,18 @@ function Client (broker, conn, req) {
conn.on('end', this.close.bind(this))
this._eos = eos(this.conn, this.close.bind(this))

this.deliver0 = function deliverQoS0 (_packet, cb) {
const getToForwardPacket = (_packet) => {
if (_packet.clientId === that.id && _packet.nl) return
oldrich-s marked this conversation as resolved.
Show resolved Hide resolved

const toForward = dedupe(that, _packet) &&
that.broker.authorizeForward(that, _packet)

return toForward
}

this.deliver0 = function deliverQoS0 (_packet, cb) {
const toForward = getToForwardPacket(_packet)

if (toForward) {
// Give nodejs some time to clear stacks, or we will see
// "Maximum call stack size exceeded" in a very high load
Expand All @@ -114,8 +123,8 @@ function Client (broker, conn, req) {
that.deliver0(_packet, cb)
return
}
const toForward = dedupe(that, _packet) &&
that.broker.authorizeForward(that, _packet)
const toForward = getToForwardPacket(_packet)

if (toForward) {
setImmediate(() => {
const packet = new QoSPacket(toForward, that)
Expand Down
13 changes: 6 additions & 7 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,12 @@ function addSubs (sub, done) {
const nl = this.nl
let func = qos > 0 ? client.deliverQoS : client.deliver0

if (!rap) {
const deliverFunc = func
func = function handlePacketSubscription (_packet, cb) {
_packet = new Packet(_packet, broker)
_packet.retain = false
deliverFunc(_packet, cb)
}
const deliverFunc = func
func = function handlePacketSubscription (_packet, cb) {
_packet = new Packet(_packet, broker)
_packet.nl = nl
if (!rap) _packet.retain = false
deliverFunc(_packet, cb)
robertsLando marked this conversation as resolved.
Show resolved Hide resolved
}

// [MQTT-4.7.2-1]
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@
"websocket-stream": "^5.5.2"
},
"dependencies": {
"aedes-packet": "^2.3.1",
"aedes-persistence": "^9.0.1",
"aedes-packet": "^3.0.0",
"aedes-persistence": "^9.1.1",
"end-of-stream": "^1.4.4",
"fastfall": "^1.5.1",
"fastparallel": "^2.4.1",
Expand Down
38 changes: 28 additions & 10 deletions test/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ test('authentication error when non numeric return code is passed', function (t)
test('authorize publish', function (t) {
t.plan(4)

const s = connect(setup())
const s = connect(setup(), { clientId: 'my-client-xyz' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -445,6 +445,7 @@ test('authorize publish', function (t) {
t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0')
expected.brokerId = s.broker.id
expected.brokerCounter = s.broker.counter
expected.clientId = 'my-client-xyz'
delete expected.length
t.same(packet, expected, 'packet matches')
cb()
Expand All @@ -460,7 +461,7 @@ test('authorize publish', function (t) {
test('authorize waits for authenticate', function (t) {
t.plan(6)

const s = setup()
const s = setup(aedes({ clientId: 'my-client-xyz-2' }))
t.teardown(s.broker.close.bind(s.broker))

s.broker.authenticate = function (client, username, password, cb) {
Expand All @@ -485,7 +486,8 @@ test('authorize waits for authenticate', function (t) {
qos: 0,
retain: false,
length: 12,
dup: false
dup: false,
clientId: 'my-client'
}

s.broker.mq.on('hello', function (packet, cb) {
Expand Down Expand Up @@ -519,12 +521,13 @@ test('authorize publish from configOptions', function (t) {
t.plan(4)

const s = connect(setup(aedes({
clientId: 'my-client-xyz-3',
authorizePublish: function (client, packet, cb) {
t.ok(client, 'client exists')
t.same(packet, expected, 'packet matches')
cb()
}
})))
})), { clientId: 'my-client-xyz-3' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -541,6 +544,7 @@ test('authorize publish from configOptions', function (t) {
t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0')
expected.brokerId = s.broker.id
expected.brokerCounter = s.broker.counter
expected.clientId = 'my-client-xyz-3'
delete expected.length
t.same(packet, expected, 'packet matches')
cb()
Expand Down Expand Up @@ -589,7 +593,7 @@ test('do not authorize publish', function (t) {
test('modify qos out of range in authorize publish ', function (t) {
t.plan(2)

const s = connect(setup())
const s = connect(setup(), { clientId: 'my-client-xyz-4' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -599,7 +603,8 @@ test('modify qos out of range in authorize publish ', function (t) {
qos: 0,
retain: false,
length: 12,
dup: false
dup: false,
clientId: 'my-client-xyz-4'
}

s.broker.authorizePublish = function (client, packet, cb) {
Expand Down Expand Up @@ -805,12 +810,19 @@ test('negate multiple subscriptions', function (t) {
test('negate subscription with correct persistence', function (t) {
t.plan(6)

// rh, rap, nl are undefined because mqtt.parser is set to MQTT 3.1.1 and will thus erase these props from s.inStream.write
const expected = [{
topic: 'hello',
qos: 0
qos: 0,
rh: undefined,
rap: undefined,
nl: undefined
}, {
topic: 'world',
qos: 0
qos: 0,
rh: undefined,
rap: undefined,
nl: undefined
}]

const broker = aedes()
Expand Down Expand Up @@ -839,10 +851,16 @@ test('negate subscription with correct persistence', function (t) {
messageId: 24,
subscriptions: [{
topic: 'hello',
qos: 0
qos: 0,
rh: 0,
rap: true,
nl: false
}, {
topic: 'world',
qos: 0
qos: 0,
rh: 0,
rap: true,
nl: false
}]
})
})
Expand Down
12 changes: 8 additions & 4 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test('test aedes.Server', function (t) {
test('publish QoS 0', function (t) {
t.plan(2)

const s = connect(setup())
const s = connect(setup(), { clientId: 'my-client-xyz-5' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -29,7 +29,8 @@ test('publish QoS 0', function (t) {
payload: Buffer.from('world'),
qos: 0,
retain: false,
dup: false
dup: false,
clientId: 'my-client-xyz-5'
}

s.broker.mq.on('hello', function (packet, cb) {
Expand Down Expand Up @@ -128,7 +129,7 @@ test('publish to $SYS topic throws error', function (t) {
qos: 0,
retain: false
}
const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos }]
const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }]

subscribe(t, s, 'hello', ele.qos, function () {
s.outStream.once('data', function (packet) {
Expand Down Expand Up @@ -188,7 +189,10 @@ test('return write errors to callback', function (t) {
qos: 0,
retain: false
}
const subs = [{ topic: 'hello', qos: ele.qos }, { topic: 'world', qos: ele.qos }]
const subs = [
{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined },
{ topic: 'world', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }
]
const expectedSubs = ele.clean ? null : subs

subscribeMultiple(t, s, subs, [ele.qos, ele.qos], function () {
Expand Down
54 changes: 54 additions & 0 deletions test/bridge.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict'

const { test } = require('tap')
const { setup, connect, subscribe } = require('./helper')

for (const qos of [0, 1]) {
oldrich-s marked this conversation as resolved.
Show resolved Hide resolved
const packet = {
qos,
cmd: 'publish',
topic: 'hello',
payload: 'world'
}

if (qos > 0) packet.messageId = 42

test('normal client sends a publish message and shall receive it back, qos = ' + qos, function (t) {
const s = connect(setup())
t.teardown(s.broker.close.bind(s.broker))

const handle = setTimeout(() => {
t.fail('did not receive packet back')
t.end()
}, 1000)

subscribe(t, s, 'hello', qos, function () {
s.outStream.on('data', (packet) => {
if (packet.cmd == 'publish') {
clearTimeout(handle)
t.end()
}
})

s.inStream.write(packet)
})
})

test('bridge client sends a publish message but shall not receive it back, qos = ' + qos, function (t) {
const s = connect(setup(), { clientId: 'my-client-bridge-1', protocolVersion: 128 + 4 })
oldrich-s marked this conversation as resolved.
Show resolved Hide resolved
t.teardown(s.broker.close.bind(s.broker))

const handle = setTimeout(() => t.end(), 1000)

subscribe(t, s, 'hello', qos, function () {
s.outStream.on('data', function () {
clearTimeout(handle)
t.fail('should not receive packet back')
t.end()
})

s.inStream.write(packet)
})
})
}

12 changes: 7 additions & 5 deletions test/client-pub-sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,10 @@ test('should not receive a message on negated subscription', function (t) {
test('programmatically add custom subscribe', function (t) {
t.plan(6)

const broker = aedes()
const broker = aedes({ clientId: 'my-client-xyz-7' })
t.teardown(broker.close.bind(broker))

const s = connect(setup(broker))
const s = connect(setup(broker), { clientId: 'my-client-xyz-7' })
const expected = {
cmd: 'publish',
topic: 'hello',
Expand All @@ -924,7 +924,8 @@ test('programmatically add custom subscribe', function (t) {
payload: Buffer.from('world'),
qos: 0,
retain: false,
dup: false
dup: false,
clientId: 'my-client-xyz-7'
}
subscribe(t, s, 'hello', 0, function () {
broker.subscribe('hello', deliver, function () {
Expand Down Expand Up @@ -963,9 +964,10 @@ test('custom function in broker.subscribe', function (t) {
qos: 1,
retain: false,
dup: false,
messageId: undefined
messageId: undefined,
clientId: 'my-client-xyz-6'
}
connect(s, {}, function () {
connect(s, { clientId: 'my-client-xyz-6' }, function () {
broker.subscribe('hello', deliver, function () {
t.pass('subscribed')
})
Expand Down
6 changes: 4 additions & 2 deletions test/qos2.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ test('call published method with client with QoS 2', function (t) {
t.teardown(broker.close.bind(broker))

const opts = { clean: cleanSession }
const publisher = connect(setup(broker))
const publisher = connect(setup(broker), { clientId: 'my-client-xyz-8' })
const subscriber = connect(setup(broker), { ...opts, clientId: 'abcde' })
const forwarded = {
cmd: 'publish',
Expand All @@ -248,7 +248,8 @@ test('call published method with client with QoS 2', function (t) {
qos: 2,
retain: false,
dup: false,
messageId: undefined
messageId: undefined,
clientId: 'my-client-xyz-8'
}
const expected = {
cmd: 'publish',
Expand All @@ -262,6 +263,7 @@ test('call published method with client with QoS 2', function (t) {
broker.authorizeForward = function (client, packet) {
forwarded.brokerId = broker.id
forwarded.brokerCounter = broker.counter
delete packet.nl
t.same(packet, forwarded, 'forwarded packet must match')
return packet
}
Expand Down