Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: performance improvements #107

Merged
merged 20 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"release-minor": "aegir release --type minor --docs -t node",
"release-major": "aegir release --type major --docs -t node",
"coverage": "aegir coverage",
"coverage-publish": "aegir-coverage publish"
"coverage-publish": "aegir-coverage publish",
"sim": "node test/simulation/index.js"
},
"pre-push": [
"lint",
Expand Down
8 changes: 2 additions & 6 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ exports.PROVIDERS_VALIDITY = 24 * hour

exports.PROVIDERS_CLEANUP_INTERVAL = hour

exports.READ_MESSAGE_TIMEOUT = minute
exports.READ_MESSAGE_TIMEOUT = 10 * second

// The number of records that will be retrieved on a call to getMany()
exports.GET_MANY_RECORD_COUNT = 16
Expand All @@ -34,16 +34,12 @@ exports.K = 20
// Alpha is the concurrency for asynchronous requests
exports.ALPHA = 3

// Number of disjoint query paths to use
// This is set to K/2 per the S/Kademlia paper
exports.DISJOINT_PATHS = 10

exports.maxMessageSize = 2 << 22 // 4MB

exports.defaultRandomWalk = {
enabled: true,
queriesPerPeriod: 1,
interval: 5 * minute,
timeout: 30 * second,
timeout: 10 * second,
delay: 10 * second
}
40 changes: 33 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class KadDHT extends EventEmitter {
* @param {Switch} sw libp2p-switch instance
* @param {object} options DHT options
* @param {number} options.kBucketSize k-bucket size (default 20)
* @param {number} options.concurrency alpha concurrency of queries (default 3)
* @param {Datastore} options.datastore datastore (default MemoryDatastore)
* @param {object} options.validators validators object with namespace as keys and function(key, record, callback)
* @param {object} options.selectors selectors object with namespace as keys and function(key, records)
Expand All @@ -74,12 +75,25 @@ class KadDHT extends EventEmitter {
*/
this.kBucketSize = options.kBucketSize || c.K

/**
* ALPHA concurrency at which each query path with run, defaults to 3
* @type {number}
*/
this.concurrency = options.concurrency || c.ALPHA

/**
* Number of disjoint query paths to use
* This is set to `kBucketSize`/2 per the S/Kademlia paper
* @type {number}
*/
this.disjointPaths = Math.ceil(this.kBucketSize / 2)

/**
* Number of closest peers to return on kBucket search, default 20
*
* @type {number}
*/
this.ncp = options.ncp || c.K
this.ncp = options.ncp || this.kBucketSize
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add docs to options.ncp ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, looking through the code we actually only use dht.ncp in 1 spot. I kind of think we should just get rid of it altogether and replace that occurrence with dht.kBucketSize. While it could potentially provide some finer grained control of the number of results we're returning, it's not doing that and I think it adds more confusion than it's worth right now. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ncp is now gone in favor of kBucketSize


/**
* The routing table.
Expand Down Expand Up @@ -321,7 +335,7 @@ class KadDHT extends EventEmitter {
waterfall([
(cb) => utils.convertBuffer(key, cb),
(id, cb) => {
const rtp = this.routingTable.closestPeers(id, c.ALPHA)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeing as this code is the same for every query, maybe it should be part of the Query itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

const rtp = this.routingTable.closestPeers(id, this.kBucketSize)

this._log('peers in rt: %d', rtp.length)
if (rtp.length === 0) {
Expand Down Expand Up @@ -412,7 +426,7 @@ class KadDHT extends EventEmitter {
return callback(err)
}

const tablePeers = this.routingTable.closestPeers(id, c.ALPHA)
const tablePeers = this.routingTable.closestPeers(id, this.kBucketSize)

const q = new Query(this, key, () => {
// There is no distinction between the disjoint paths,
Expand Down Expand Up @@ -442,7 +456,7 @@ class KadDHT extends EventEmitter {

waterfall([
(cb) => utils.sortClosestPeers(Array.from(res.finalSet), id, cb),
(sorted, cb) => cb(null, sorted.slice(0, c.K))
(sorted, cb) => cb(null, sorted.slice(0, this.kBucketSize))
], callback)
})
})
Expand Down Expand Up @@ -527,6 +541,7 @@ class KadDHT extends EventEmitter {
provide (key, callback) {
this._log('provide: %s', key.toBaseEncodedString())

const errors = []
waterfall([
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
(cb) => this.getClosestPeers(key.buffer, cb),
Expand All @@ -536,10 +551,21 @@ class KadDHT extends EventEmitter {

each(peers, (peer, cb) => {
this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
this.network.sendMessage(peer, msg, cb)
this.network.sendMessage(peer, msg, (err) => {
if (err) errors.push(err)
cb()
})
}, cb)
}
], (err) => callback(err))
], (err) => {
if (errors.length) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should help mitigate #105. While the provide will still technically fail, we'll at least have attempted to provide to all peers.

// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
err = errcode(`Failed to provide to ${errors.length} of ${this.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED', { errors })
}
callback(err)
})
}

/**
Expand Down Expand Up @@ -613,7 +639,7 @@ class KadDHT extends EventEmitter {
waterfall([
(cb) => utils.convertPeerId(id, cb),
(key, cb) => {
const peers = this.routingTable.closestPeers(key, c.ALPHA)
const peers = this.routingTable.closestPeers(key, this.kBucketSize)

if (peers.length === 0) {
return cb(errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED'))
Expand Down
11 changes: 5 additions & 6 deletions src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ module.exports = (dht) => ({
* Try to fetch a given record by from the local datastore.
* Returns the record iff it is still valid, meaning
* - it was either authored by this node, or
* - it was receceived less than `MAX_RECORD_AGE` ago.
* - it was received less than `MAX_RECORD_AGE` ago.
*
* @param {Buffer} key
* @param {function(Error, Record)} callback
Expand Down Expand Up @@ -171,7 +171,7 @@ module.exports = (dht) => ({
*
* @param {Buffer} key
* @param {PeerId} peer
* @param {function(Error)} callback
* @param {function(Error, Array<PeerInfo>)} callback
* @returns {void}
*
* @private
Expand All @@ -185,13 +185,12 @@ module.exports = (dht) => ({

const out = msg.closerPeers
.filter((pInfo) => !dht._isSelf(pInfo.id))
.map((pInfo) => dht.peerBook.put(pInfo))

callback(null, out)
})
},
/**
* Is the given peer id the peer id?
* Is the given peer id our PeerId?
*
* @param {PeerId} other
* @returns {bool}
Expand All @@ -206,7 +205,7 @@ module.exports = (dht) => ({
*
* @param {PeerId} peer
* @param {PeerId} target
* @param {function(Error)} callback
* @param {function(Error, Message)} callback
* @returns {void}
*
* @private
Expand Down Expand Up @@ -522,7 +521,7 @@ module.exports = (dht) => ({
}
})

const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA)
const peers = dht.routingTable.closestPeers(key.buffer, dht.kBucketSize)

timeout((cb) => query.run(peers, cb), providerTimeout)((err) => {
query.stop()
Expand Down
Loading