Skip to content

Commit

Permalink
refactor: core async (#478)
Browse files Browse the repository at this point in the history
* refactor: cleanup core

test: auto dial on startup

* fix: make hangup work properly

* chore: fix lint

* chore: apply suggestions from code review

Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio>
  • Loading branch information
jacobheun and vasco-santos committed Nov 21, 2019
1 parent 4adee59 commit 2d93eaf
Show file tree
Hide file tree
Showing 14 changed files with 526 additions and 215 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "^0.19.0",
"libp2p-gossipsub": "^0.1.0",
"libp2p-kad-dht": "^0.15.3",
"libp2p-kad-dht": "libp2p/js-libp2p-kad-dht#refactor/async-iterators",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.9.1",
"libp2p-pnet": "~0.1.0",
Expand Down
91 changes: 65 additions & 26 deletions src/dht.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,82 @@
'use strict'

const nextTick = require('async/nextTick')
const errCode = require('err-code')
const promisify = require('promisify-es6')

const { messages, codes } = require('./errors')

module.exports = (node) => {
return {
put: promisify((key, value, callback) => {
if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
}
module.exports = (node, DHT, config) => {
const dht = new DHT({
dialer: {
dial: (peer, options) => node.dial(peer, options),
dialProtocol: (peer, protocols, options) => {
const recordedPeer = node.peerStore.get(peer.toB58String())

node._dht.put(key, value, callback)
}),
get: promisify((key, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
if (recordedPeer) {
peer = recordedPeer
}
return node.dialProtocol(peer, protocols, options)
}
},
peerInfo: node.peerInfo,
peerStore: node.peerStore,
registrar: node.registrar,
datastore: this.datastore,
...config
})

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
return {
/**
* Store the given key/value pair in the DHT.
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put
* @returns {Promise<void>}
*/
put: (key, value, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.get(key, options, callback)
}),
getMany: promisify((key, nVals, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
return dht.put(key, value, options)
},

/**
* Get the value to the given key.
* Times out after 1 minute by default.
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
get: (key, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

if (!node._dht) {
return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED))
return dht.get(key, options)
},

/**
* Get the `n` values to the given key without sorting.
* @param {Buffer} key
* @param {number} nVals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
getMany: (key, nVals, options) => {
if (!node.isStarted() || !dht.isStarted) {
throw errCode(new Error(messages.NOT_STARTED_YET), codes.DHT_NOT_STARTED)
}

node._dht.getMany(key, nVals, options, callback)
})
return dht.getMany(key, nVals, options)
},

_dht: dht,

start: () => dht.start(),

stop: () => dht.stop()
}
}
1 change: 1 addition & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ exports.messages = {
exports.codes = {
DHT_DISABLED: 'ERR_DHT_DISABLED',
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
Expand Down
7 changes: 2 additions & 5 deletions src/get-peer-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,15 @@ function getPeerInfoRemote (peer, libp2p) {
try {
peerInfo = getPeerInfo(peer, libp2p.peerStore)
} catch (err) {
return Promise.reject(errCode(
new Error(`${peer} is not a valid peer type`),
'ERR_INVALID_PEER_TYPE'
))
throw errCode(err, 'ERR_INVALID_PEER_TYPE')
}

// If we don't have an address for the peer, attempt to find it
if (peerInfo.multiaddrs.size < 1) {
return libp2p.peerRouting.findPeer(peerInfo.id)
}

return Promise.resolve(peerInfo)
return peerInfo
}

module.exports = {
Expand Down
Loading

0 comments on commit 2d93eaf

Please sign in to comment.