diff --git a/README.md b/README.md index d861f5fe..1092ef59 100644 --- a/README.md +++ b/README.md @@ -142,7 +142,7 @@ Events: 2. `client`, it will be null if the message is published using [`publish`](#publish). It is by design that the broker heartbeat will be on publish event, in this case `client` is null * `ack`: when a packet published to a client is delivered successfully with QoS 1 or QoS 2, arguments: - 1. `packet` + 1. `packet`, this will be the original PUBLISH packet in QoS 1, and PUBREL in QoS 2 2. `client` * `ping`: when a [Client](#client) sends a ping, arguments: 1. `packet` diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index 762a8da2..156cfa19 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -73,15 +73,64 @@ test('publish direct to a single client QoS 1', function (t) { }) }) -test('emit a `ack` event on PUBACK for QoS 1', function (t) { - t.plan(6) +test('publish direct to a single client QoS 2', function (t) { + t.plan(3) var broker = aedes() - var messageId - var clientId + var publishCount = 0 + var nonPublishCount = 0 - broker.on('client', function (client) { - clientId = client.id + broker.on('clientReady', function (client) { + client.publish({ + topic: 'hello', + payload: Buffer.from('world'), + qos: 2 + }, function (err) { + t.error(err, 'no error') + }) + client.on('error', function (err) { + t.error(err) + }) + }) + + var s = connect(setup(broker)) + + s.outStream.on('data', function (packet) { + if (packet.cmd === 'publish') { + publishCount++ + s.inStream.write({ + cmd: 'pubrec', + messageId: packet.messageId + }) + } else { + nonPublishCount++ + s.inStream.write({ + cmd: 'pubcomp', + messageId: packet.messageId + }) + } + }) + + broker.on('closed', function () { + t.equal(publishCount, 1) + t.equal(nonPublishCount, 1) + t.end() + }) +}) + +test('emit a `ack` event on PUBACK for QoS 1 [clean=false]', function (t) { + t.plan(3) + + var broker = aedes() + var expected = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 1, + retain: false + } + + broker.on('clientReady', function (client) { client.publish({ topic: 'hello', payload: Buffer.from('world'), @@ -92,17 +141,46 @@ test('emit a `ack` event on PUBACK for QoS 1', function (t) { }) broker.once('ack', function (packet, client) { - t.equal(client.id, clientId) - t.equal(packet.messageId, messageId) - t.equal(packet.topic, 'hello') - t.equal(packet.payload.toString(), 'world') + expected.brokerId = packet.brokerId + expected.brokerCounter = packet.brokerCounter + expected.messageId = packet.messageId + t.deepEqual(packet, expected, 'ack packet is origianl packet') t.pass('got the ack event') }) - var s = connect(setup(broker)) + var s = connect(setup(broker), { clean: false }) + + s.outStream.once('data', function (packet) { + s.inStream.write({ + cmd: 'puback', + messageId: packet.messageId + }) + }) +}) + +test('emit a `ack` event on PUBACK for QoS 1 [clean=true]', function (t) { + t.plan(3) + + var broker = aedes() + + broker.on('clientReady', function (client) { + client.publish({ + topic: 'hello', + payload: Buffer.from('world'), + qos: 1 + }, function (err) { + t.error(err, 'no error') + }) + }) + + broker.once('ack', function (packet, client) { + t.equal(packet, undefined, 'ack packet is undefined') + t.pass('got the ack event') + }) + + var s = connect(setup(broker), { clean: true }) s.outStream.once('data', function (packet) { - messageId = packet.messageId s.inStream.write({ cmd: 'puback', messageId: packet.messageId @@ -110,14 +188,14 @@ test('emit a `ack` event on PUBACK for QoS 1', function (t) { }) }) -test('emit a `ack` event on PUBCOMP for QoS 2', function (t) { - t.plan(6) +test('emit a `ack` event on PUBCOMP for QoS 2 [clean=false]', function (t) { + t.plan(5) var broker = aedes() var messageId var clientId - broker.on('client', function (client) { + broker.on('clientReady', function (client) { clientId = client.id client.publish({ topic: 'hello', @@ -131,12 +209,12 @@ test('emit a `ack` event on PUBCOMP for QoS 2', function (t) { broker.once('ack', function (packet, client) { t.equal(client.id, clientId) t.equal(packet.messageId, messageId) - t.equal(packet.topic, 'hello') - t.equal(packet.payload.toString(), 'world') + t.equal(packet.cmd, 'pubrel', 'ack packet is purel') t.pass('got the ack event') + t.end() }) - var s = connect(setup(broker)) + var s = connect(setup(broker), { clean: false }) s.outStream.on('data', function (packet) { if (packet.cmd === 'publish') { @@ -154,6 +232,44 @@ test('emit a `ack` event on PUBCOMP for QoS 2', function (t) { }) }) +test('emit a `ack` event on PUBCOMP for QoS 2 [clean=true]', function (t) { + t.plan(3) + + var broker = aedes() + + broker.on('clientReady', function (client) { + client.publish({ + topic: 'hello', + payload: Buffer.from('world'), + qos: 2 + }, function (err) { + t.error(err, 'no error') + }) + }) + + broker.once('ack', function (packet, client) { + t.equal(packet, undefined, 'ack packet is undefined') + t.pass('got the ack event') + t.end() + }) + + var s = connect(setup(broker), { clean: true }) + + s.outStream.on('data', function (packet) { + if (packet.cmd === 'publish') { + s.inStream.write({ + cmd: 'pubrec', + messageId: packet.messageId + }) + } else { + s.inStream.write({ + cmd: 'pubcomp', + messageId: packet.messageId + }) + } + }) +}) + test('offline message support for direct publish', function (t) { t.plan(2) @@ -237,6 +353,44 @@ test('subscribe a client programmatically', function (t) { }) }) +test('subscribe a client programmatically - wildcard', function (t) { + t.plan(3) + + var broker = aedes() + var expected = { + cmd: 'publish', + topic: 'hello/world/1', + payload: Buffer.from('world'), + dup: false, + length: 20, + qos: 0, + retain: false + } + + broker.on('clientReady', function (client) { + client.subscribe({ + topic: '+/world/1', + qos: 0 + }, function (err) { + t.error(err, 'no error') + + broker.publish({ + topic: 'hello/world/1', + payload: Buffer.from('world'), + qos: 0 + }, function (err) { + t.error(err, 'no error') + }) + }) + }) + + var s = connect(setup(broker)) + + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + }) +}) + test('unsubscribe a client', function (t) { t.plan(2)