Skip to content

Commit

Permalink
refactor: core async (#478)
Browse files Browse the repository at this point in the history
* refactor: cleanup core

test: auto dial on startup

* fix: make hangup work properly

* chore: fix lint

* chore: apply suggestions from code review

Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio>
  • Loading branch information
jacobheun and vasco-santos committed Nov 21, 2019
1 parent 17946fb commit 480373d
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 37 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "^0.19.0",
"libp2p-gossipsub": "^0.1.0",
"libp2p-kad-dht": "^0.15.3",
"libp2p-kad-dht": "libp2p/js-libp2p-kad-dht#refactor/async-iterators",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.9.1",
"libp2p-pnet": "~0.1.0",
Expand Down
91 changes: 65 additions & 26 deletions src/dht.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,82 @@
'use strict'

const nextTick = require('async/nextTick')
const errCode = require('err-code')
const promisify = require('promisify-es6')

const { messages, codes } = require('./errors')

module.exports = (node) => {
return {
put: promisify((key, value, callback) => {
if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}
module.exports = (node, DHT, config) => {
const dht = new DHT({
dialer: {
dial: (peer, options) => node.dial(peer, options),
dialProtocol: (peer, protocols, options) => {
const recordedPeer = node.peerStore.get(peer.toB58String())

node._dht.put(key, value, callback)
}),
get: promisify((key, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
if (recordedPeer) {
peer = recordedPeer
}
return node.dialProtocol(peer, protocols, options)
}
},
peerInfo: node.peerInfo,
peerStore: node.peerStore,
registrar: node.registrar,
datastore: this.datastore,
...config
})

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
return {
/**
* Store the given key/value pair in the DHT.
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
put: (key, value, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.get(key, options, callback)
}),
getMany: promisify((key, nVals, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
return dht.put(key, value, options)
},

/**
* Get the value to the given key.
* Times out after 1 minute by default.
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
get: (key, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
return dht.get(key, options)
},

/**
* Get the `n` values to the given key without sorting.
* @param {Buffer} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
getMany: (key, nVals, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.getMany(key, nVals, options, callback)
})
return dht.getMany(key, nVals, options)
},

_dht: dht,

start: () => dht.start(),

stop: () => dht.stop()
}
}
1 change: 1 addition & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ exports.messages = {
exports.codes = {
DHT_DISABLED: 'ERR_DHT_DISABLED',
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
Expand Down
15 changes: 7 additions & 8 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,8 @@ class Libp2p extends EventEmitter {
}

// dht provided components (peerRouting, contentRouting, dht)
if (this._config.dht.enabled) {
const DHT = this._modules.dht

this._dht = new DHT(this._switch, {
datastore: this.datastore,
...this._config.dht
})
if (this._modules.dht) {
this._dht = dht(this, this._modules.dht, this._config.dht)
}

// start pubsub
Expand All @@ -136,7 +131,6 @@ class Libp2p extends EventEmitter {
// peer and content routing will automatically get modules from _modules and _dht
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this.dht = dht(this)

this._peerDiscovered = this._peerDiscovered.bind(this)
}
Expand Down Expand Up @@ -186,6 +180,7 @@ class Libp2p extends EventEmitter {

try {
this.pubsub && await this.pubsub.stop()
this._dht && await this._dht.stop()
await this.transportManager.close()
} catch (err) {
if (err) {
Expand Down Expand Up @@ -312,6 +307,10 @@ class Libp2p extends EventEmitter {
if (this._config.pubsub.enabled) {
this.pubsub && this.pubsub.start()
}

if (this._config.dht.enabled) {
this._dht && this._dht.start()
}
}

/**
Expand Down
21 changes: 19 additions & 2 deletions src/peer-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,28 @@ class PeerStore extends EventEmitter {
* Stores the peerInfo of a new peer.
* If already exist, its info is updated.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
put (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

let peer
// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
this.update(peerInfo)
peer = this.update(peerInfo)
} else {
this.add(peerInfo)
peer = this.add(peerInfo)

// Emit the new peer found
this.emit('peer', peerInfo)
}
return peer
}

/**
* Add a new peer to the store.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
add (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
Expand Down Expand Up @@ -86,11 +90,13 @@ class PeerStore extends EventEmitter {
})

this.peers.set(peerInfo.id.toB58String(), peerProxy)
return peerProxy
}

/**
* Updates an already known peer.
* @param {PeerInfo} peerInfo
* @return {PeerInfo}
*/
update (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
Expand Down Expand Up @@ -148,6 +154,8 @@ class PeerStore extends EventEmitter {
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
recorded.id.pubKey = peerInfo.id.pubKey
}

return recorded
}

/**
Expand All @@ -165,6 +173,15 @@ class PeerStore extends EventEmitter {
return undefined
}

/**
* Has the info to the given id.
* @param {string} peerId b58str id
* @returns {boolean}
*/
has (peerId) {
return this.peers.has(peerId)
}

/**
* Removes the Peer with the matching `peerId` from the PeerStore
* @param {string} peerId b58str id
Expand Down
92 changes: 92 additions & 0 deletions test/dht/configuration.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai

const mergeOptions = require('merge-options')
const multiaddr = require('multiaddr')

const { create } = require('../../src')
const { baseOptions, subsystemOptions } = require('./utils')
const peerUtils = require('../utils/creators/peer')

const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')

describe('DHT subsystem is configurable', () => {
let libp2p

afterEach(async () => {
libp2p && await libp2p.stop()
})

it('should not exist if no module is provided', async () => {
libp2p = await create(baseOptions)
expect(libp2p._dht).to.not.exist()
})

it('should exist if the module is provided', async () => {
libp2p = await create(subsystemOptions)
expect(libp2p._dht).to.exist()
})

it('should start and stop by default once libp2p starts', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo
})

libp2p = await create(customOptions)
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(true)

await libp2p.stop()
expect(libp2p._dht._dht.isStarted).to.equal(false)
})

it('should not start if disabled once libp2p starts', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo,
config: {
dht: {
enabled: false
}
}
})

libp2p = await create(customOptions)
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(false)
})

it('should allow a manual start', async () => {
const [peerInfo] = await peerUtils.createPeerInfoFromFixture(1)
peerInfo.multiaddrs.add(listenAddr)

const customOptions = mergeOptions(subsystemOptions, {
peerInfo,
config: {
dht: {
enabled: false
}
}
})

libp2p = await create(customOptions)
await libp2p.start()
expect(libp2p._dht._dht.isStarted).to.equal(false)

await libp2p._dht.start()
expect(libp2p._dht._dht.isStarted).to.equal(true)
})
})
Loading

0 comments on commit 480373d

Please sign in to comment.