Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Feature: PubSub 🌟 #644

Merged
merged 55 commits into from
Jan 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
d6c6d12
fix cli test typo
gavinmcdermott Nov 12, 2016
641fe1c
wip: floodsub now in api-routes, api-resources, and core-components
gavinmcdermott Nov 12, 2016
d3c4f02
Initial core tests pass
gavinmcdermott Nov 16, 2016
9026a88
Merge with master
gavinmcdermott Nov 16, 2016
9b7da97
feat(api, cli, core): add floodsub
gavinmcdermott Nov 17, 2016
adeccec
Merge branch 'master' into feature-floodsub
gavinmcdermott Nov 18, 2016
94bbc43
refactor(api, cli, core): update floodsub to a full name API
gavinmcdermott Nov 20, 2016
4ee7b5b
chore: merge upstream/master
gavinmcdermott Dec 6, 2016
5031470
chore: address CR comments
gavinmcdermott Dec 8, 2016
8084cb5
fix: fix lint issues
gavinmcdermott Dec 8, 2016
3cdfcb0
Merge pull request #610 from gavinmcdermott/feature-floodsub
daviddias Dec 8, 2016
3b365f7
update dep
daviddias Dec 8, 2016
faa6e33
Fix canceling subscription. Move subscription state to be local to th…
haadcode Dec 9, 2016
6ba150c
fix: typo in bitswap core
gavinmcdermott Dec 10, 2016
7d17a86
chore: update deps
daviddias Dec 11, 2016
ba12388
fix: no more infinite buffering of messages
daviddias Dec 11, 2016
39db7df
fix: wip - cancel relevant subscribe events
gavinmcdermott Dec 12, 2016
d2e6f6e
fix: no dangling listeners, multiple subscribers. pass all the tests …
daviddias Dec 12, 2016
deb5fd6
test: get http tests passing (except subscribe)
gavinmcdermott Dec 12, 2016
a89301b
fix cli test typo
gavinmcdermott Nov 12, 2016
9c82402
wip: floodsub now in api-routes, api-resources, and core-components
gavinmcdermott Nov 12, 2016
7f1eca4
Initial core tests pass
gavinmcdermott Nov 16, 2016
4233e53
feat(api, cli, core): add floodsub
gavinmcdermott Nov 17, 2016
186c5d2
refactor(api, cli, core): update floodsub to a full name API
gavinmcdermott Nov 20, 2016
093897b
chore: address CR comments
gavinmcdermott Dec 8, 2016
d4e9efa
fix: fix lint issues
gavinmcdermott Dec 8, 2016
580313a
update dep
daviddias Dec 8, 2016
ca9c0ea
Fix canceling subscription. Move subscription state to be local to th…
haadcode Dec 9, 2016
43045f5
fix: typo in bitswap core
gavinmcdermott Dec 10, 2016
467a9a9
chore: update deps
daviddias Dec 11, 2016
a302b25
fix: no more infinite buffering of messages
daviddias Dec 11, 2016
2d829f8
fix: wip - cancel relevant subscribe events
gavinmcdermott Dec 12, 2016
5a5a810
fix: no dangling listeners, multiple subscribers. pass all the tests …
daviddias Dec 12, 2016
82aba1e
chore: update deps
daviddias Dec 13, 2016
8d855c2
fix: subscribe returns a valid stream
gavinmcdermott Dec 16, 2016
b32af31
fix: fix lint issues; comment out pubsub cli
gavinmcdermott Dec 17, 2016
c457bf7
update to the latest floodsub
dignifiedquire Dec 18, 2016
65fe21c
Merge remote-tracking branch 'origin/master' into feat/pubsub
dignifiedquire Dec 18, 2016
ea458ee
simple cli and http tests passing
dignifiedquire Dec 18, 2016
3e29f95
new events based api done and http-api with ipfs-interface-core testing
dignifiedquire Dec 19, 2016
350a854
pubsub: fix peers implementation and test
dignifiedquire Dec 20, 2016
b5b4739
apply some cr
dignifiedquire Dec 21, 2016
6465bb5
better error handling
dignifiedquire Dec 21, 2016
d19864a
Merge branch 'master' into feat/pubsub
daviddias Dec 21, 2016
966b597
chore(deps): use interface-ipfs-core release
dignifiedquire Dec 21, 2016
c206800
chore: update deps
daviddias Dec 23, 2016
249bca1
Merge remote-tracking branch 'origin/master' into feat/pubsub
dignifiedquire Dec 23, 2016
9371b67
(wip) fix: pubsub closes its conns
daviddias Dec 30, 2016
50ae06d
feat(pubsub): update to new floodsub
daviddias Jan 9, 2017
9b1aea7
fix: let protocols initiate properly
daviddias Jan 10, 2017
aa29598
chore: update deps
daviddias Jan 10, 2017
4495b3a
fix: remove delay
daviddias Jan 10, 2017
3082c2f
chore: update deps
daviddias Jan 11, 2017
0208504
fix: setImmediate erros as well, update interface-ipfs-core
daviddias Jan 12, 2017
738c55c
Merge branch 'master' into feat/pubsub
daviddias Jan 16, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,18 @@
"aegir": "^9.3.0",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"delay": "^1.3.1",
"detect-node": "^2.0.3",
"eslint-plugin-react": "^6.9.0",
"execa": "^0.6.0",
"expose-loader": "^0.7.1",
"form-data": "^2.1.2",
"fs-pull-blob-store": "^0.4.1",
"gulp": "^3.9.1",
"interface-ipfs-core": "^0.23.0",
"interface-ipfs-core": "^0.23.3",
"ipfsd-ctl": "^0.18.1",
"left-pad": "^1.1.3",
"lodash": "^4.17.2",
"lodash": "^4.17.4",
"mocha": "^3.2.0",
"ncp": "^2.0.0",
"nexpect": "^0.5.0",
Expand All @@ -85,11 +86,12 @@
"async": "^2.1.4",
"bl": "^1.2.0",
"boom": "^4.2.0",
"debug": "^2.5.1",
"debug": "^2.6.0",
"fs-pull-blob-store": "^0.3.0",
"glob": "^7.1.1",
"hapi": "^16.1.0",
"hapi-set-header": "^1.0.2",
"hoek": "^4.1.0",
"idb-pull-blob-store": "^0.5.1",
"ipfs-api": "^12.1.4",
"ipfs-bitswap": "^0.9.0",
Expand All @@ -101,9 +103,10 @@
"ipfs-unixfs-engine": "^0.15.0",
"ipld-resolver": "^0.4.1",
"isstream": "^0.1.2",
"joi": "^10.0.6",
"libp2p-ipfs-nodejs": "^0.17.1",
"libp2p-ipfs-browser": "^0.17.3",
"libp2p-floodsub": "0.7.1",
"joi": "^10.1.0",
"libp2p-ipfs-nodejs": "^0.17.3",
"libp2p-ipfs-browser": "^0.17.4",
"lodash.flatmap": "^4.5.0",
"lodash.get": "^4.4.2",
"lodash.has": "^4.5.2",
Expand Down Expand Up @@ -132,7 +135,7 @@
"temp": "^0.8.3",
"through2": "^2.0.3",
"update-notifier": "^1.0.3",
"yargs": "^6.5.0"
"yargs": "^6.6.0"
},
"contributors": [
"Andrew de Andrade <andrew@deandrade.com.br>",
Expand Down
14 changes: 14 additions & 0 deletions src/cli/commands/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'

module.exports = {
command: 'pubsub',

description: 'pubsub commands',

builder (yargs) {
return yargs
.commandDir('pubsub')
},

handler (argv) {}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'ls',

describe: 'Get your list of subscriptions',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.ls((err, subscriptions) => {
if (err) {
throw err
}

subscriptions.forEach((sub) => {
console.log(sub)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to the logger thing @victorbjelkholm added, so that we save time in refactoring.

#495

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find the logger, @victorbjelkholm can you remind me where/how to use it?

})
})
})
}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'peers <topic>',

describe: 'Get all peers subscribed to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.peers(argv.topic, (err, peers) => {
if (err) {
throw err
}

peers.forEach((peer) => {
console.log(peer)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

})
})
})
}
}
30 changes: 30 additions & 0 deletions src/cli/commands/pubsub/pub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'pub <topic> <data>',

describe: 'Publish data to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

const data = new Buffer(String(argv.data))

ipfs.pubsub.publish(argv.topic, data, (err) => {
if (err) {
throw err
}
})
})
}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/sub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'sub <topic>',

describe: 'Subscribe to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

const handler = (msg) => {
console.log(msg.data.toString())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

}

ipfs.pubsub.subscribe(argv.topic, handler, (err) => {
if (err) {
throw err
}
})
})
}
}
11 changes: 8 additions & 3 deletions src/core/components/go-offline.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
'use strict'

module.exports = function goOffline (self) {
return (cb) => {
module.exports = (self) => {
return (callback) => {
self._blockService.goOffline()
self._bitswap.stop()
self.libp2p.stop(cb)
self._pubsub.stop((err) => {
if (err) {
return callback(err)
}
self.libp2p.stop(callback)
})
}
}
30 changes: 22 additions & 8 deletions src/core/components/go-online.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,39 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')

module.exports = function goOnline (self) {
return (cb) => {
module.exports = (self) => {
return (callback) => {
series([
self.load,
self.libp2p.start
(cb) => self.load(cb),
(cb) => self.libp2p.start(cb)
], (err) => {
if (err) {
return cb(err)
return callback(err)
}

self._bitswap = new Bitswap(
self._libp2pNode,
self._repo.blockstore,
self._libp2pNode.peerBook
)
self._bitswap.start()
self._blockService.goOnline(self._bitswap)
cb()

self._pubsub = new FloodSub(self._libp2pNode)

series([
(cb) => {
self._bitswap.start()
cb()
},
(cb) => {
self._blockService.goOnline(self._bitswap)
cb()
},
(cb) => self._pubsub.start(cb) // ,
// For all of the protocols to handshake with each other
// (cb) => setTimeout(cb, 1000) // Still not decided if we want this
], callback)
})
}
}
97 changes: 97 additions & 0 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
'use strict'

const promisify = require('promisify-es6')
const setImmediate = require('async/setImmediate')

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR

module.exports = function pubsub (self) {
return {
subscribe: (topic, options, handler, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}

if (!callback) {
return new Promise((resolve, reject) => {
subscribe(topic, options, handler, (err) => {
if (err) {
return reject(err)
}
resolve()
})
})
}

subscribe(topic, options, handler, callback)
},

unsubscribe: (topic, handler) => {
const ps = self._pubsub

ps.removeListener(topic, handler)

if (ps.listenerCount(topic) === 0) {
ps.unsubscribe(topic)
}
},

publish: promisify((topic, data, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(OFFLINE_ERROR))
}

if (!Buffer.isBuffer(data)) {
return setImmediate(() => callback(new Error('data must be a Buffer')))
}

self._pubsub.publish(topic, data)
setImmediate(() => callback())
}),

ls: promisify((callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(OFFLINE_ERROR))
}

const subscriptions = Array.from(
self._pubsub.subscriptions
)

setImmediate(() => callback(null, subscriptions))
}),

peers: promisify((topic, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(OFFLINE_ERROR))
}

const peers = Array.from(self._pubsub.peers.values())
.filter((peer) => peer.topics.has(topic))
.map((peer) => peer.info.id.toB58String())

setImmediate(() => callback(null, peers))
}),

setMaxListeners (n) {
return self._pubsub.setMaxListeners(n)
}
}

function subscribe (topic, options, handler, callback) {
const ps = self._pubsub

if (ps.listenerCount(topic) === 0) {
ps.subscribe(topic)
}

ps.on(topic, handler)
setImmediate(() => callback())
}
}
3 changes: 3 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const swarm = require('./components/swarm')
const ping = require('./components/ping')
const files = require('./components/files')
const bitswap = require('./components/bitswap')
const pubsub = require('./components/pubsub')

exports = module.exports = IPFS

Expand All @@ -44,6 +45,7 @@ function IPFS (repoInstance) {
this._bitswap = null
this._blockService = new BlockService(this._repo)
this._ipldResolver = new IPLDResolver(this._blockService)
this._pubsub = null

// IPFS Core exposed components

Expand All @@ -67,4 +69,5 @@ function IPFS (repoInstance) {
this.files = files(this)
this.bitswap = bitswap(this)
this.ping = ping(this)
this.pubsub = pubsub(this)
}
Loading