Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

fix: make pubsub.unsubscribe async and alter pubsub.subscribe signature #260

Merged
merged 8 commits into from
May 11, 2018
45 changes: 29 additions & 16 deletions SPEC/PUBSUB.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ pubsub API

##### `Go` **WIP**

##### `JavaScript` - ipfs.pubsub.subscribe(topic, options, handler, callback)
##### `JavaScript` - ipfs.pubsub.subscribe(options, callback)

- `topic: string`
- `options: Object` - (Optional), might contain the following properties:
- `discover`: type: Boolean - Will use the DHT to find other peers.
- `handler: (msg) => ()` - Event handler which will be called with a message object everytime one is received. The `msg` has the format `{from: string, seqno: Buffer, data: Buffer, topicIDs: Array<string>}`.
- `callback: (Error) => ()` (Optional) Called once the subscription is established.
- `options: Object`: Object containing the following properties:
- `topic: string`
- `discover: Boolean` - Will use the DHT to find other peers.
- `handler: (msg) => ()` - Event handler which will be called with a message object everytime one is received. The `msg` has the format `{from: string, seqno: Buffer, data: Buffer, topicIDs: Array<string>}`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handler moves into the options object? That's a bit of a weird pattern, much nicer when it was separated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm torn here. The thing is that promisify'ing things forces a weird arg check case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @victorbjelkholm on this one, I'd prefer that the implementation choices in js-ipfs didn't affect the public API.

i.e. I don't think promisify should dictate what our public API looks like even if it means handling some difficult argument combinations.

IMHO the most intuitive API would be this:

ipfs.pubsub.subscribe(topic, handler, options, callback)
ipfs.pubsub.unsubscribe(topic, handler, callback)

Most important and non-optional arguments first, followed by optional args.

That said, if you just renamed the options arg to something like details then that would be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe your proposal of:

ipfs.pubsub.subscribe(topic, handler, options, callback)
ipfs.pubsub.unsubscribe(topic, handler, callback)

Pretty much solves the main issue. I like it!

- `callback: (Error) => ()` Called once the subscription is established.

If no `callback` is passed, a [promise][] is returned.

Expand All @@ -28,7 +28,10 @@ const receiveMsg = (msg) => {
console.log(msg.data.toString())
}

ipfs.pubsub.subscribe(topic, receiveMsg)
ipfs.pubsub.subscribe({
topic: topic,
handler: receiveMsg
})
```

A great source of [examples][] can be found in the tests for this API.
Expand All @@ -39,10 +42,14 @@ A great source of [examples][] can be found in the tests for this API.

##### `Go` **WIP**

##### `JavaScript` - `ipfs.pubsub.unsubscribe(topic, handler)`
##### `JavaScript` - `ipfs.pubsub.unsubscribe(options, callback)`

- `options: Object`: Object containing the following properties:
- `topic: string` - The topic to unsubscribe from
- `handler: (msg) => ()` - The handler to remove.
- `callback: (Error) => ()` (Optional) Called once the unsubscribe is done.

- `topic: string` - The topic to unsubscribe from
- `handler: (msg) => ()` - The handler to remove.
If no `callback` is passed, a [promise][] is returned.

This works like `EventEmitter.removeListener`, as that only the `handler` passed to a `subscribe` call before is removed from listening. The underlying subscription will only be canceled once all listeners for a topic have been removed.

Expand All @@ -55,12 +62,18 @@ const receiveMsg = (msg) => {
console.log(msg.toString())
}

ipfs.pubsub.subscribe(topic, receiveMsg)

setTimeout(() => {
// unsubscribe a second later
ipfs.pubsub.unsubscribe(topic, receiveMsg)
}, 1000)
ipfs.pubsub.subscribe({
topic: topic,
handler: receiveMsg
}, () => {
setTimeout(() => {
// unsubscribe a second later
ipfs.pubsub.unsubscribe({
topic: topic,
handler: receiveMsg
})
}, 1000)
})
```

A great source of [examples][] can be found in the tests for this API.
Expand Down
85 changes: 41 additions & 44 deletions js/src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const series = require('async/series')
const each = require('async/each')
const waterfall = require('async/waterfall')
const parallel = require('async/parallel')
const whilst = require('async/whilst')
Expand Down Expand Up @@ -137,12 +138,12 @@ module.exports = (common) => {
expect(msg).to.have.property('topicIDs').eql([topic])
expect(msg).to.have.property('from', ipfs1.peerId.id)

ipfs1.pubsub.unsubscribe(topic, handler)

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
ipfs1.pubsub.unsubscribe(topic, handler, () => {
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
})
})
}

Expand All @@ -163,12 +164,12 @@ module.exports = (common) => {
expect(msg).to.have.property('topicIDs').eql([topic])
expect(msg).to.have.property('from', ipfs1.peerId.id)

ipfs1.pubsub.unsubscribe(topic, handler)

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
ipfs1.pubsub.unsubscribe(topic, handler, () => {
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
})
})
}

Expand All @@ -189,12 +190,12 @@ module.exports = (common) => {
expect(msg).to.have.property('topicIDs').eql([topic])
expect(msg).to.have.property('from', ipfs1.peerId.id)

ipfs1.pubsub.unsubscribe(topic, handler)

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
ipfs1.pubsub.unsubscribe(topic, handler, () => {
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.empty()
check()
})
})
}

Expand All @@ -211,14 +212,10 @@ module.exports = (common) => {
const handler1 = (msg) => {
expect(msg.data.toString()).to.eql('hello')

ipfs1.pubsub.unsubscribe(topic, handler1)

series([
(cb) => ipfs1.pubsub.unsubscribe(topic, handler1, cb)
(cb) => ipfs1.pubsub.ls(cb),
(cb) => {
ipfs1.pubsub.unsubscribe(topic, handler2)
cb()
},
(cb) => ipfs1.pubsub.unsubscribe(topic, handler2, cb),
(cb) => ipfs1.pubsub.ls(cb)
], (err, res) => {
expect(err).to.not.exist()
Expand Down Expand Up @@ -251,8 +248,7 @@ module.exports = (common) => {

const handler = (msg) => {
expect(msg.data.toString()).to.eql('hi')
ipfs1.pubsub.unsubscribe(topic, handler)
check()
ipfs1.pubsub.unsubscribe(topic, handler, check)
}

ipfs1.pubsub.subscribe(topic, {
Expand Down Expand Up @@ -389,8 +385,7 @@ module.exports = (common) => {
expect(err).to.not.exist()
expect(topics).to.be.eql([topic])

ipfs1.pubsub.unsubscribe(topic, sub1)
done()
ipfs1.pubsub.unsubscribe(topic, sub1, done)
})
})
})
Expand All @@ -414,11 +409,8 @@ module.exports = (common) => {
ipfs1.pubsub.ls((err, list) => {
expect(err).to.not.exist()

expect(
list.sort()
).to.be.eql(
topics.map((t) => t.name).sort()
)
expect(list.sort())
.to.eql(topics.map((t) => t.name).sort())

topics.forEach((t) => {
ipfs1.pubsub.unsubscribe(t.name, t.handler)
Expand All @@ -439,9 +431,11 @@ module.exports = (common) => {
topic = getTopic()
})

afterEach(() => {
ipfs1.pubsub.unsubscribe(topic, sub1)
ipfs2.pubsub.unsubscribe(topic, sub2)
afterEach((done) => {
parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb)
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb)
], done)
})

it('receive messages from different node', (done) => {
Expand Down Expand Up @@ -673,14 +667,17 @@ module.exports = (common) => {
},
(err) => {
expect(err).to.not.exist()
handlers.forEach((handler) => {
ipfs1.pubsub.unsubscribe(someTopic, handler)
})

ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.eql([])
done()
each(
handlers,
(handler, cb) => ipfs1.pubsub.unsubscribe(someTopic, handler, cb)
(err) => {
expect(err).to.not.exist()
ipfs1.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.eql([])
done()
})
})
})
}
)
Expand Down