diff --git a/aedes.d.ts b/aedes.d.ts index 2e71b751..8f56b25a 100644 --- a/aedes.d.ts +++ b/aedes.d.ts @@ -87,12 +87,12 @@ declare namespace aedes { ): void subscribe ( topic: string, - deliverfunc: (packet: ISubscribePacket, callback: () => void) => void, + deliverfunc: (packet: IPublishPacket, callback: () => void) => void, callback: () => void ): void unsubscribe ( topic: string, - deliverfunc: (packet: IUnsubscribePacket, callback: () => void) => void, + deliverfunc: (packet: IPublishPacket, callback: () => void) => void, callback: () => void ): void close (callback?: () => void): void diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index cbad57bb..9e1d7255 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -1,7 +1,7 @@ 'use strict' const { test } = require('tap') -const { setup, connect } = require('./helper') +const { setup, connect, subscribe, noError } = require('./helper') const aedes = require('../') test('publish direct to a single client QoS 0', function (t) { @@ -604,3 +604,123 @@ test('should not receive a message on negated subscription', function (t) { t.fail('Packet should not be received') }) }) + +test('programmatically add custom subscribe', function (t) { + t.plan(6) + + const broker = aedes() + t.tearDown(broker.close.bind(broker)) + + const s = connect(setup(broker)) + const expected = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false, + length: 12, + dup: false + } + var deliverP = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + subscribe(t, s, 'hello', 0, function () { + broker.subscribe('hello', deliver, function () { + t.pass('subscribed') + }) + s.outStream.on('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + }) + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0, + messageId: 42 + }) + }) + function deliver (packet, cb) { + deliverP.brokerId = s.broker.id + deliverP.brokerCounter = s.broker.counter + t.deepEqual(packet, deliverP, 'packet matches') + cb() + } +}) + +test('custom function in broker.subscribe', function (t) { + t.plan(4) + + const broker = aedes() + t.tearDown(broker.close.bind(broker)) + + const s = setup(broker) + var expected = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 1, + retain: false, + messageId: undefined + } + connect(s, {}, function () { + broker.subscribe('hello', deliver, function () { + t.pass('subscribed') + }) + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 1, + messageId: 42 + }) + }) + broker.on('publish', function (packet, client) { + if (client) { + t.equal(packet.topic, 'hello') + t.equal(packet.messageId, 42) + } + }) + function deliver (packet, cb) { + expected.brokerId = s.broker.id + expected.brokerCounter = s.broker.counter + t.deepEqual(packet, expected, 'packet matches') + cb() + } +}) + +test('custom function in broker.unsubscribe', function (t) { + t.plan(3) + + const broker = aedes() + t.tearDown(broker.close.bind(broker)) + + const s = noError(setup(broker)) + connect(s, {}, function () { + broker.subscribe('hello', deliver, function () { + t.pass('subscribed') + broker.unsubscribe('hello', deliver, function () { + t.pass('unsubscribe') + s.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'word', + qos: 1, + messageId: 42 + }) + }) + }) + }) + broker.on('publish', function (packet, client) { + if (client) { + t.pass('publish') + } + }) + function deliver (packet, cb) { + t.fail('shoudl not be called') + cb() + } +}) diff --git a/test/types/index.ts b/test/types/index.ts index badacb7f..428a2071 100644 --- a/test/types/index.ts +++ b/test/types/index.ts @@ -2,7 +2,7 @@ /* eslint no-undef: 0 */ import { Server, Client, AuthenticateError } from '../../aedes' -import { IPublishPacket, ISubscribePacket, ISubscription, IUnsubscribePacket } from 'mqtt-packet' +import { IPublishPacket, ISubscription } from 'mqtt-packet' import { createServer } from 'net' const broker = Server({ @@ -122,17 +122,17 @@ broker.on('unsubscribe', (subscriptions, client) => { console.log(`client: ${client.id} subsribe`) }) -broker.subscribe('aaaa', (packet: ISubscribePacket, cb) => { +broker.subscribe('aaaa', (packet: IPublishPacket, cb) => { console.log('cmd') - console.log(packet.subscriptions) + console.log(packet.cmd) cb() }, () => { console.log('done subscribing') }) -broker.unsubscribe('aaaa', (packet: IUnsubscribePacket, cb) => { +broker.unsubscribe('aaaa', (packet: IPublishPacket, cb) => { console.log('cmd') - console.log(packet.unsubscriptions) + console.log(packet.cmd) cb() }, () => { console.log('done unsubscribing')