Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: performance improvements #107

Merged
merged 20 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"release-minor": "aegir release --type minor --docs -t node",
"release-major": "aegir release --type major --docs -t node",
"coverage": "aegir coverage",
"coverage-publish": "aegir-coverage publish"
"coverage-publish": "aegir-coverage publish",
"sim": "node test/simulation/index.js"
},
"pre-push": [
"lint",
Expand Down
8 changes: 2 additions & 6 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ exports.PROVIDERS_VALIDITY = 24 * hour

exports.PROVIDERS_CLEANUP_INTERVAL = hour

exports.READ_MESSAGE_TIMEOUT = minute
exports.READ_MESSAGE_TIMEOUT = 10 * second

// The number of records that will be retrieved on a call to getMany()
exports.GET_MANY_RECORD_COUNT = 16
Expand All @@ -34,16 +34,12 @@ exports.K = 20
// Alpha is the concurrency for asynchronous requests
exports.ALPHA = 3

// Number of disjoint query paths to use
// This is set to K/2 per the S/Kademlia paper
exports.DISJOINT_PATHS = 10

exports.maxMessageSize = 2 << 22 // 4MB

exports.defaultRandomWalk = {
enabled: true,
queriesPerPeriod: 1,
interval: 5 * minute,
timeout: 30 * second,
timeout: 10 * second,
delay: 10 * second
}
37 changes: 28 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class KadDHT extends EventEmitter {
* @param {Switch} sw libp2p-switch instance
* @param {object} options DHT options
* @param {number} options.kBucketSize k-bucket size (default 20)
* @param {number} options.concurrency alpha concurrency of queries (default 3)
* @param {Datastore} options.datastore datastore (default MemoryDatastore)
* @param {object} options.validators validators object with namespace as keys and function(key, record, callback)
* @param {object} options.selectors selectors object with namespace as keys and function(key, records)
Expand Down Expand Up @@ -75,11 +76,17 @@ class KadDHT extends EventEmitter {
this.kBucketSize = options.kBucketSize || c.K

/**
* Number of closest peers to return on kBucket search, default 20
*
* ALPHA concurrency at which each query path with run, defaults to 3
* @type {number}
*/
this.concurrency = options.concurrency || c.ALPHA

/**
* Number of disjoint query paths to use
* This is set to `kBucketSize`/2 per the S/Kademlia paper
* @type {number}
*/
this.ncp = options.ncp || c.K
this.disjointPaths = Math.ceil(this.kBucketSize / 2)

/**
* The routing table.
Expand Down Expand Up @@ -321,7 +328,7 @@ class KadDHT extends EventEmitter {
waterfall([
(cb) => utils.convertBuffer(key, cb),
(id, cb) => {
const rtp = this.routingTable.closestPeers(id, c.ALPHA)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing as this code is the same for every query, maybe it should be part of the Query itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

const rtp = this.routingTable.closestPeers(id, this.kBucketSize)

this._log('peers in rt: %d', rtp.length)
if (rtp.length === 0) {
Expand Down Expand Up @@ -412,7 +419,7 @@ class KadDHT extends EventEmitter {
return callback(err)
}

const tablePeers = this.routingTable.closestPeers(id, c.ALPHA)
const tablePeers = this.routingTable.closestPeers(id, this.kBucketSize)

const q = new Query(this, key, () => {
// There is no distinction between the disjoint paths,
Expand Down Expand Up @@ -442,7 +449,7 @@ class KadDHT extends EventEmitter {

waterfall([
(cb) => utils.sortClosestPeers(Array.from(res.finalSet), id, cb),
(sorted, cb) => cb(null, sorted.slice(0, c.K))
(sorted, cb) => cb(null, sorted.slice(0, this.kBucketSize))
], callback)
})
})
Expand Down Expand Up @@ -527,6 +534,7 @@ class KadDHT extends EventEmitter {
provide (key, callback) {
this._log('provide: %s', key.toBaseEncodedString())

const errors = []
waterfall([
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
(cb) => this.getClosestPeers(key.buffer, cb),
Expand All @@ -536,10 +544,21 @@ class KadDHT extends EventEmitter {

each(peers, (peer, cb) => {
this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
this.network.sendMessage(peer, msg, cb)
this.network.sendMessage(peer, msg, (err) => {
if (err) errors.push(err)
cb()
})
}, cb)
}
], (err) => callback(err))
], (err) => {
if (errors.length) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should help mitigate #105. While the provide will still technically fail, we'll at least have attempted to provide to all peers.

// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
err = errcode(`Failed to provide to ${errors.length} of ${this.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED', { errors })
}
callback(err)
})
}

/**
Expand Down Expand Up @@ -613,7 +632,7 @@ class KadDHT extends EventEmitter {
waterfall([
(cb) => utils.convertPeerId(id, cb),
(key, cb) => {
const peers = this.routingTable.closestPeers(key, c.ALPHA)
const peers = this.routingTable.closestPeers(key, this.kBucketSize)

if (peers.length === 0) {
return cb(errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED'))
Expand Down
13 changes: 6 additions & 7 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module.exports = (dht) => ({
}
let ids
try {
ids = dht.routingTable.closestPeers(key, dht.ncp)
ids = dht.routingTable.closestPeers(key, dht.kBucketSize)
} catch (err) {
return callback(err)
}
Expand Down Expand Up @@ -82,7 +82,7 @@ module.exports = (dht) => ({
* Try to fetch a given record by from the local datastore.
* Returns the record iff it is still valid, meaning
* - it was either authored by this node, or
* - it was receceived less than `MAX_RECORD_AGE` ago.
* - it was received less than `MAX_RECORD_AGE` ago.
*
* @param {Buffer} key
* @param {function(Error, Record)} callback
Expand Down Expand Up @@ -171,7 +171,7 @@ module.exports = (dht) => ({
*
* @param {Buffer} key
* @param {PeerId} peer
* @param {function(Error)} callback
* @param {function(Error, Array<PeerInfo>)} callback
* @returns {void}
*
* @private
Expand All @@ -185,13 +185,12 @@ module.exports = (dht) => ({

const out = msg.closerPeers
.filter((pInfo) => !dht._isSelf(pInfo.id))
.map((pInfo) => dht.peerBook.put(pInfo))

callback(null, out)
})
},
/**
* Is the given peer id the peer id?
* Is the given peer id our PeerId?
*
* @param {PeerId} other
* @returns {bool}
Expand All @@ -206,7 +205,7 @@ module.exports = (dht) => ({
*
* @param {PeerId} peer
* @param {PeerId} target
* @param {function(Error)} callback
* @param {function(Error, Message)} callback
* @returns {void}
*
* @private
Expand Down Expand Up @@ -522,7 +521,7 @@ module.exports = (dht) => ({
}
})

const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA)
const peers = dht.routingTable.closestPeers(key.buffer, dht.kBucketSize)

timeout((cb) => query.run(peers, cb), providerTimeout)((err) => {
query.stop()
Expand Down
Loading