From 12fb0d7d63cf0b05b33c1746ecaa4f0ee7fc46b1 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 24 Apr 2019 11:02:54 +0200 Subject: [PATCH 01/20] fix: add basic timeout to individual query func calls --- src/constants.js | 3 +++ src/query.js | 11 +++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/constants.js b/src/constants.js index 73ba3cba..0fefe0eb 100644 --- a/src/constants.js +++ b/src/constants.js @@ -34,6 +34,9 @@ exports.K = 20 // Alpha is the concurrency for asynchronous requests exports.ALPHA = 3 +// How long individual query messages are allowed to take +exports.MAX_MESSAGE_TIMEOUT = 5 * minute + // Number of disjoint query paths to use // This is set to K/2 per the S/Kademlia paper exports.DISJOINT_PATHS = 10 diff --git a/src/query.js b/src/query.js index 7e3cdf9a..61044a07 100644 --- a/src/query.js +++ b/src/query.js @@ -4,6 +4,7 @@ const EventEmitter = require('events') const waterfall = require('async/waterfall') const each = require('async/each') const queue = require('async/queue') +const timeout = require('async/timeout') const mh = require('multihashes') const c = require('./constants') @@ -81,6 +82,7 @@ class Query { */ _onStart () { this.running = true + this._startTime = Date.now() this._log('query:start') // Register this query so we can stop it if the DHT stops @@ -91,7 +93,7 @@ class Query { * Called when the run completes (even if there's an error). */ _onComplete () { - this._log('query:done') + this._log(`query:done in ${Date.now() - this._startTime}ms`) // Ensure worker queues for all paths are stopped at the end of the query this.stop() @@ -339,7 +341,7 @@ class Path { */ constructor (run, queryFunc) { this.run = run - this.queryFunc = queryFunc + this.queryFunc = timeout(queryFunc, c.MAX_MESSAGE_TIMEOUT) // TODO: make configurable this.initialPeers = [] } @@ -456,6 +458,7 @@ class WorkerQueue { * @param {Error} err */ stop (err) { + this.log('worker:stop') if (!this.running) { return } @@ -495,6 +498,10 @@ class WorkerQueue { this.path.peersToQuery.length > 0) { this.queue.push(this.path.peersToQuery.dequeue()) } + + if (this.queue.length() < 1) { + this.log('queue is empty and cant be filled') + } } /** From bf152e0f2fbf8e56800121afb0e2155449342b4e Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 24 Apr 2019 11:29:41 +0200 Subject: [PATCH 02/20] fix: per s/kademlia start with K peers, not Alpha --- src/index.js | 6 +++--- src/private.js | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/index.js b/src/index.js index 6b519944..032f6d45 100644 --- a/src/index.js +++ b/src/index.js @@ -321,7 +321,7 @@ class KadDHT extends EventEmitter { waterfall([ (cb) => utils.convertBuffer(key, cb), (id, cb) => { - const rtp = this.routingTable.closestPeers(id, c.ALPHA) + const rtp = this.routingTable.closestPeers(id, c.K) this._log('peers in rt: %d', rtp.length) if (rtp.length === 0) { @@ -412,7 +412,7 @@ class KadDHT extends EventEmitter { return callback(err) } - const tablePeers = this.routingTable.closestPeers(id, c.ALPHA) + const tablePeers = this.routingTable.closestPeers(id, c.K) const q = new Query(this, key, () => { // There is no distinction between the disjoint paths, @@ -613,7 +613,7 @@ class KadDHT extends EventEmitter { waterfall([ (cb) => utils.convertPeerId(id, cb), (key, cb) => { - const peers = this.routingTable.closestPeers(key, c.ALPHA) + const peers = this.routingTable.closestPeers(key, c.K) if (peers.length === 0) { return cb(errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED')) diff --git a/src/private.js b/src/private.js index 70f72cb1..b0ba0d16 100644 --- a/src/private.js +++ b/src/private.js @@ -522,7 +522,7 @@ module.exports = (dht) => ({ } }) - const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA) + const peers = dht.routingTable.closestPeers(key.buffer, c.K) timeout((cb) => query.run(peers, cb), providerTimeout)((err) => { query.stop() From 81c26d90d9e92ba0d3c6f93008babd1e4a6f3d38 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 24 Apr 2019 14:01:11 +0200 Subject: [PATCH 03/20] fix: properly use kBucketSize option when provided --- src/constants.js | 9 +++------ src/index.js | 10 +++++----- src/private.js | 4 ++-- src/query.js | 6 +++--- test/kad-dht.spec.js | 12 ++++++------ 5 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/constants.js b/src/constants.js index 0fefe0eb..6c3b4933 100644 --- a/src/constants.js +++ b/src/constants.js @@ -23,7 +23,7 @@ exports.PROVIDERS_VALIDITY = 24 * hour exports.PROVIDERS_CLEANUP_INTERVAL = hour -exports.READ_MESSAGE_TIMEOUT = minute +exports.READ_MESSAGE_TIMEOUT = 30 * second // The number of records that will be retrieved on a call to getMany() exports.GET_MANY_RECORD_COUNT = 16 @@ -34,12 +34,9 @@ exports.K = 20 // Alpha is the concurrency for asynchronous requests exports.ALPHA = 3 -// How long individual query messages are allowed to take -exports.MAX_MESSAGE_TIMEOUT = 5 * minute - // Number of disjoint query paths to use // This is set to K/2 per the S/Kademlia paper -exports.DISJOINT_PATHS = 10 +exports.DISJOINT_PATHS = exports.K / 2 exports.maxMessageSize = 2 << 22 // 4MB @@ -47,6 +44,6 @@ exports.defaultRandomWalk = { enabled: true, queriesPerPeriod: 1, interval: 5 * minute, - timeout: 30 * second, + timeout: 10 * second, delay: 10 * second } diff --git a/src/index.js b/src/index.js index 032f6d45..188fce42 100644 --- a/src/index.js +++ b/src/index.js @@ -79,7 +79,7 @@ class KadDHT extends EventEmitter { * * @type {number} */ - this.ncp = options.ncp || c.K + this.ncp = options.ncp || this.kBucketSize /** * The routing table. @@ -321,7 +321,7 @@ class KadDHT extends EventEmitter { waterfall([ (cb) => utils.convertBuffer(key, cb), (id, cb) => { - const rtp = this.routingTable.closestPeers(id, c.K) + const rtp = this.routingTable.closestPeers(id, this.kBucketSize) this._log('peers in rt: %d', rtp.length) if (rtp.length === 0) { @@ -412,7 +412,7 @@ class KadDHT extends EventEmitter { return callback(err) } - const tablePeers = this.routingTable.closestPeers(id, c.K) + const tablePeers = this.routingTable.closestPeers(id, this.kBucketSize) const q = new Query(this, key, () => { // There is no distinction between the disjoint paths, @@ -442,7 +442,7 @@ class KadDHT extends EventEmitter { waterfall([ (cb) => utils.sortClosestPeers(Array.from(res.finalSet), id, cb), - (sorted, cb) => cb(null, sorted.slice(0, c.K)) + (sorted, cb) => cb(null, sorted.slice(0, this.kBucketSize)) ], callback) }) }) @@ -613,7 +613,7 @@ class KadDHT extends EventEmitter { waterfall([ (cb) => utils.convertPeerId(id, cb), (key, cb) => { - const peers = this.routingTable.closestPeers(key, c.K) + const peers = this.routingTable.closestPeers(key, this.kBucketSize) if (peers.length === 0) { return cb(errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED')) diff --git a/src/private.js b/src/private.js index b0ba0d16..ea7712d1 100644 --- a/src/private.js +++ b/src/private.js @@ -82,7 +82,7 @@ module.exports = (dht) => ({ * Try to fetch a given record by from the local datastore. * Returns the record iff it is still valid, meaning * - it was either authored by this node, or - * - it was receceived less than `MAX_RECORD_AGE` ago. + * - it was received less than `MAX_RECORD_AGE` ago. * * @param {Buffer} key * @param {function(Error, Record)} callback @@ -522,7 +522,7 @@ module.exports = (dht) => ({ } }) - const peers = dht.routingTable.closestPeers(key.buffer, c.K) + const peers = dht.routingTable.closestPeers(key.buffer, dht.kBucketSize) timeout((cb) => query.run(peers, cb), providerTimeout)((err) => { query.stop() diff --git a/src/query.js b/src/query.js index 61044a07..1619cdd6 100644 --- a/src/query.js +++ b/src/query.js @@ -4,7 +4,6 @@ const EventEmitter = require('events') const waterfall = require('async/waterfall') const each = require('async/each') const queue = require('async/queue') -const timeout = require('async/timeout') const mh = require('multihashes') const c = require('./constants') @@ -94,6 +93,7 @@ class Query { */ _onComplete () { this._log(`query:done in ${Date.now() - this._startTime}ms`) + this._log(`${this.run.errors.length} of ${this.run.peersSeen.size} peers errored (${this.run.errors.length / this.run.peersSeen.size * 100}% fail rate)`) // Ensure worker queues for all paths are stopped at the end of the query this.stop() @@ -278,7 +278,7 @@ class Run extends EventEmitter { // Convert the key into a DHT key by hashing it utils.convertBuffer(this.query.key, (err, dhtKey) => { - this.peersQueried = new PeerDistanceList(dhtKey, c.K) + this.peersQueried = new PeerDistanceList(dhtKey, this.query.dht.kBucketSize) for (const cb of this.awaitingKey) { cb(err) @@ -341,7 +341,7 @@ class Path { */ constructor (run, queryFunc) { this.run = run - this.queryFunc = timeout(queryFunc, c.MAX_MESSAGE_TIMEOUT) // TODO: make configurable + this.queryFunc = queryFunc this.initialPeers = [] } diff --git a/test/kad-dht.spec.js b/test/kad-dht.spec.js index 6fcdd570..fd44d9fa 100644 --- a/test/kad-dht.spec.js +++ b/test/kad-dht.spec.js @@ -737,7 +737,7 @@ describe('KadDHT', () => { // Get the alpha (3) closest peers to the key from the origin's // routing table const rtablePeers = guy.routingTable.closestPeers(rtval, c.ALPHA) - expect(rtablePeers).to.have.length(3) + expect(rtablePeers).to.have.length(c.ALPHA) // The set of peers used to initiate the query (the closest alpha // peers to the key that the origin knows about) @@ -768,13 +768,13 @@ describe('KadDHT', () => { expect(out.filter((p) => !rtableSet[p.toB58String()])) .to.not.be.empty() - // Expect that there were 20 peers found - expect(out).to.have.length(20) + // Expect that there were kValue peers found + expect(out).to.have.length(c.K) - // The expected closest 20 peers to the key - const exp = actualClosest.slice(0, 20) + // The expected closest kValue peers to the key + const exp = actualClosest.slice(0, c.K) - // Expect the 20 peers found to be the 20 closest connected peers + // Expect the kValue peers found to be the kValue closest connected peers // to the key expect(countDiffPeers(exp, out)).to.eql(0) From 57133a34727ba446951f454f037c72ccb5971e4b Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 24 Apr 2019 14:13:39 +0200 Subject: [PATCH 04/20] fix: make disjoint paths a function of kBucketSize/2 --- src/constants.js | 6 +----- src/index.js | 7 +++++++ src/query.js | 4 +++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/constants.js b/src/constants.js index 6c3b4933..fa850687 100644 --- a/src/constants.js +++ b/src/constants.js @@ -32,11 +32,7 @@ exports.GET_MANY_RECORD_COUNT = 16 exports.K = 20 // Alpha is the concurrency for asynchronous requests -exports.ALPHA = 3 - -// Number of disjoint query paths to use -// This is set to K/2 per the S/Kademlia paper -exports.DISJOINT_PATHS = exports.K / 2 +exports.ALPHA = 6 exports.maxMessageSize = 2 << 22 // 4MB diff --git a/src/index.js b/src/index.js index 188fce42..bb964506 100644 --- a/src/index.js +++ b/src/index.js @@ -74,6 +74,13 @@ class KadDHT extends EventEmitter { */ this.kBucketSize = options.kBucketSize || c.K + /** + * Number of disjoint query paths to use + * This is set to `kBucketSize`/2 per the S/Kademlia paper + * @type {number} + */ + this.disjointPaths = Math.ceil(this.kBucketSize / 2) + /** * Number of closest peers to return on kBucket search, default 20 * diff --git a/src/query.js b/src/query.js index 1619cdd6..666359b8 100644 --- a/src/query.js +++ b/src/query.js @@ -71,6 +71,8 @@ class Query { } this.run = new Run(this) + + this._log(`query running with K=${this.dht.kBucketSize}, A=${c.ALPHA}, D=${Math.min(this.dht.disjointPaths, peers.length)}`) this.run.once('start', this._onStart) this.run.once('complete', this._onComplete) this.run.execute(peers, callback) @@ -166,7 +168,7 @@ class Run extends EventEmitter { const paths = [] // array of states per disjoint path // Create disjoint paths - const numPaths = Math.min(c.DISJOINT_PATHS, peers.length) + const numPaths = Math.min(this.query.dht.disjointPaths, peers.length) for (let i = 0; i < numPaths; i++) { paths.push(new Path(this, this.query.makePath(i, numPaths))) } From 6ce1e1bfe08f528f952f85225adfd7a0c706d567 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 15:01:56 +0200 Subject: [PATCH 05/20] refactor: split up query classes for easier testing --- src/query.js | 638 --------------------------------------- src/query/index.js | 116 +++++++ src/query/path.js | 77 +++++ src/query/run.js | 220 ++++++++++++++ src/query/workerQueue.js | 241 +++++++++++++++ 5 files changed, 654 insertions(+), 638 deletions(-) delete mode 100644 src/query.js create mode 100644 src/query/index.js create mode 100644 src/query/path.js create mode 100644 src/query/run.js create mode 100644 src/query/workerQueue.js diff --git a/src/query.js b/src/query.js deleted file mode 100644 index 666359b8..00000000 --- a/src/query.js +++ /dev/null @@ -1,638 +0,0 @@ -'use strict' - -const EventEmitter = require('events') -const waterfall = require('async/waterfall') -const each = require('async/each') -const queue = require('async/queue') -const mh = require('multihashes') - -const c = require('./constants') -const PeerQueue = require('./peer-queue') -const PeerDistanceList = require('./peer-distance-list') -const utils = require('./utils') - -/** - * Divide peers up into disjoint paths (subqueries). Any peer can only be used once over all paths. - * Within each path, query peers from closest to farthest away. - */ -class Query { - /** - * User-supplied function to set up an individual disjoint path. Per-path - * query state should be held in this function's closure. - * @typedef {makePath} function - * @param {number} pathNum - Numeric index from zero to numPaths - 1 - * @returns {queryFunc} - Function to call on each peer in the query - */ - - /** - * Query function. - * @typedef {queryFunc} function - * @param {PeerId} next - Peer to query - * @param {function(Error, Object)} callback - Query result callback - */ - - /** - * Create a new query. The makePath function is called once per disjoint path, so that per-path - * variables can be created in that scope. makePath then returns the actual query function (queryFunc) to - * use when on that path. - * - * @param {DHT} dht - DHT instance - * @param {Buffer} key - * @param {makePath} makePath - Called to set up each disjoint path. Must return the query function. - */ - constructor (dht, key, makePath) { - this.dht = dht - this.key = key - this.makePath = makePath - this._log = utils.logger(this.dht.peerInfo.id, 'query:' + mh.toB58String(key)) - - this.running = false - - this._onStart = this._onStart.bind(this) - this._onComplete = this._onComplete.bind(this) - } - - /** - * Run this query, start with the given list of peers first. - * - * @param {Array} peers - * @param {function(Error, Object)} callback - * @returns {void} - */ - run (peers, callback) { - if (!this.dht._queryManager.running) { - this._log.error('Attempt to run query after shutdown') - return callback(null, { finalSet: new Set(), paths: [] }) - } - - if (peers.length === 0) { - this._log.error('Running query with no peers') - return callback(null, { finalSet: new Set(), paths: [] }) - } - - this.run = new Run(this) - - this._log(`query running with K=${this.dht.kBucketSize}, A=${c.ALPHA}, D=${Math.min(this.dht.disjointPaths, peers.length)}`) - this.run.once('start', this._onStart) - this.run.once('complete', this._onComplete) - this.run.execute(peers, callback) - } - - /** - * Called when the run starts. - */ - _onStart () { - this.running = true - this._startTime = Date.now() - this._log('query:start') - - // Register this query so we can stop it if the DHT stops - this.dht._queryManager.queryStarted(this) - } - - /** - * Called when the run completes (even if there's an error). - */ - _onComplete () { - this._log(`query:done in ${Date.now() - this._startTime}ms`) - this._log(`${this.run.errors.length} of ${this.run.peersSeen.size} peers errored (${this.run.errors.length / this.run.peersSeen.size * 100}% fail rate)`) - - // Ensure worker queues for all paths are stopped at the end of the query - this.stop() - } - - /** - * Stop the query. - */ - stop () { - if (!this.running) { - return - } - - this.run.removeListener('start', this._onStart) - this.run.removeListener('complete', this._onComplete) - - this.running = false - this.run && this.run.stop() - this.dht._queryManager.queryCompleted(this) - } -} - -/** - * Manages a single run of the query. - */ -class Run extends EventEmitter { - /** - * Creates a Run. - * - * @param {Query} query - */ - constructor (query) { - super() - - this.query = query - - this.running = false - this.workers = [] - - // The peers that have been queried (including error responses) - this.peersSeen = new Set() - // The errors received when querying peers - this.errors = [] - // The closest K peers that have been queried successfully - // (this member is initialized when the worker queues start) - this.peersQueried = null - } - - /** - * Stop all the workers - */ - stop () { - if (!this.running) { - return - } - - this.running = false - for (const worker of this.workers) { - worker.stop() - } - } - - /** - * Execute the run with the given initial set of peers. - * - * @param {Array} peers - * @param {function(Error, Object)} callback - */ - execute (peers, callback) { - const paths = [] // array of states per disjoint path - - // Create disjoint paths - const numPaths = Math.min(this.query.dht.disjointPaths, peers.length) - for (let i = 0; i < numPaths; i++) { - paths.push(new Path(this, this.query.makePath(i, numPaths))) - } - - // Assign peers to paths round-robin style - peers.forEach((peer, i) => { - paths[i % numPaths].addInitialPeer(peer) - }) - - // Execute the query along each disjoint path - // each(paths, (path, cb) => path.execute(cb), (err) => { - this.executePaths(paths, (err) => { - if (err) { - return callback(err) - } - - const res = { - // The closest K peers we were able to query successfully - finalSet: new Set(this.peersQueried.peers), - paths: [] - } - - // Collect the results from each completed path - for (const path of paths) { - if (path.res && (path.res.pathComplete || path.res.queryComplete)) { - path.res.success = true - res.paths.push(path.res) - } - } - - callback(err, res) - }) - } - - /** - * Execute all paths through the DHT. - * - * @param {Array} paths - * @param {function(Error)} callback - */ - executePaths (paths, callback) { - this.running = true - - this.emit('start') - each(paths, (path, cb) => path.execute(cb), (err) => { - // Ensure all workers are stopped - this.stop() - - // Completed the Run - this.emit('complete') - - if (err) { - return callback(err) - } - - // If all queries errored out, something is seriously wrong, so callback - // with an error - if (this.errors.length === this.peersSeen.size) { - return callback(this.errors[0]) - } - - callback() - }) - } - - /** - * Initialize the list of queried peers, then start a worker queue for the - * given path. - * - * @param {Path} path - * @param {function(Error)} callback - */ - workerQueue (path, callback) { - this.init(() => this.startWorker(path, callback)) - } - - /** - * Create and start a worker queue for a particular path. - * - * @param {Path} path - * @param {function(Error)} callback - */ - startWorker (path, callback) { - const worker = new WorkerQueue(this.query.dht, this, path, this.query._log) - this.workers.push(worker) - worker.execute(callback) - } - - /** - * Initialize the list of closest peers we've queried - this is shared by all - * paths in the run. - * - * @param {function(Error)} callback - * @returns {void} - */ - init (callback) { - if (this.peersQueried) { - return callback() - } - - // We only want to initialize it once for the run, and then inform each - // path worker that it's ready - if (this.awaitingKey) { - this.awaitingKey.push(callback) - return - } - - this.awaitingKey = [callback] - - // Convert the key into a DHT key by hashing it - utils.convertBuffer(this.query.key, (err, dhtKey) => { - this.peersQueried = new PeerDistanceList(dhtKey, this.query.dht.kBucketSize) - - for (const cb of this.awaitingKey) { - cb(err) - } - this.awaitingKey = undefined - }) - } - - /** - * If we've queried K peers, and the remaining peers in the queues are all - * further from the key than the peers we've already queried, then we should - * stop querying. - * - * @param {function(Error, boolean)} callback - * @returns {void} - */ - continueQuerying (callback) { - // If we haven't queried K peers yet, keep going - if (this.peersQueried.length < this.peersQueried.capacity) { - return callback(null, true) - } - - // Get all the peers that are currently being queried. - // Note that this function gets called right after a peer has been popped - // off the head of the closest peers queue so it will include that peer. - let running = [] - for (const worker of this.workers) { - const peerIds = worker.queue.workersList().map(i => i.data) - running = running.concat(peerIds) - } - - // Check if any of the peers that are currently being queried are closer - // to the key than the peers we've already queried - this.peersQueried.anyCloser(running, (err, someCloser) => { - if (err) { - return callback(err) - } - - // Some are closer, keep going - if (someCloser) { - return callback(null, true) - } - - // None are closer, so we can stop querying - this.stop() - callback(null, false) - }) - } -} - -/** - * Manages a single Path through the DHT. - */ -class Path { - /** - * Creates a Path. - * - * @param {Run} run - * @param {queryFunc} queryFunc - */ - constructor (run, queryFunc) { - this.run = run - this.queryFunc = queryFunc - this.initialPeers = [] - } - - /** - * Add a peer to the set of peers that are used to intialize the path. - * - * @param {PeerId} peer - */ - addInitialPeer (peer) { - this.initialPeers.push(peer) - } - - /** - * Execute the path. - * - * @param {function(Error)} callback - */ - execute (callback) { - waterfall([ - // Create a queue of peers ordered by distance from the key - (cb) => PeerQueue.fromKey(this.run.query.key, cb), - // Add initial peers to the queue - (q, cb) => { - this.peersToQuery = q - each(this.initialPeers, this.addPeerToQuery.bind(this), cb) - }, - // Start processing the queue - (cb) => { - this.run.workerQueue(this, cb) - } - ], callback) - } - - /** - * Add a peer to the peers to be queried. - * - * @param {PeerId} peer - * @param {function(Error)} callback - * @returns {void} - * @private - */ - addPeerToQuery (peer, callback) { - // Don't add self - if (this.run.query.dht._isSelf(peer)) { - return callback() - } - - // The paths must be disjoint, meaning that no two paths in the Query may - // traverse the same peer - if (this.run.peersSeen.has(peer)) { - return callback() - } - - this.peersToQuery.enqueue(peer, callback) - } -} - -class WorkerQueue { - /** - * Creates a new WorkerQueue. - * - * @param {DHT} dht - * @param {Run} run - * @param {Object} path - * @param {function} log - */ - constructor (dht, run, path, log) { - this.dht = dht - this.run = run - this.path = path - this.log = log - - this.concurrency = c.ALPHA - this.queue = this.setupQueue() - } - - /** - * Create the underlying async queue. - * - * @returns {Object} - */ - setupQueue () { - const q = queue(this.processNext.bind(this), this.concurrency) - - // If there's an error, stop the worker - q.error = (err) => { - this.log.error('queue', err) - this.stop(err) - } - - // When all peers in the queue have been processed, stop the worker - q.drain = () => { - this.log('queue:drain') - this.stop() - } - - // When a space opens up in the queue, add some more peers - q.unsaturated = () => { - if (this.running) { - this.log('queue:unsaturated') - this.fill() - } - } - - q.buffer = 0 - - return q - } - - /** - * Stop the worker, optionally providing an error to pass to the worker's - * callback. - * - * @param {Error} err - */ - stop (err) { - this.log('worker:stop') - if (!this.running) { - return - } - - this.running = false - this.queue.kill() - this.callbackFn(err) - } - - /** - * Use the queue from async to keep `concurrency` amount items running - * per path. - * - * @param {function(Error)} callback - */ - execute (callback) { - this.running = true - this.callbackFn = callback - this.fill() - } - - /** - * Add peers to the worker queue until there are enough to satisfy the - * worker queue concurrency. - * Note that we don't want to take any more than those required to satisfy - * concurrency from the peers-to-query queue, because we always want to - * query the closest peers to the key first, and new peers are continously - * being added to the peers-to-query queue. - */ - fill () { - this.log('queue:fill') - - // Note: - // - queue.running(): number of items that are currently running - // - queue.length(): the number of items that are waiting to be run - while (this.queue.running() + this.queue.length() < this.concurrency && - this.path.peersToQuery.length > 0) { - this.queue.push(this.path.peersToQuery.dequeue()) - } - - if (this.queue.length() < 1) { - this.log('queue is empty and cant be filled') - } - } - - /** - * Process the next peer in the queue - * - * @param {PeerId} peer - * @param {function(Error)} cb - * @returns {void} - */ - processNext (peer, cb) { - if (!this.running) { - return cb() - } - - // The paths must be disjoint, meaning that no two paths in the Query may - // traverse the same peer - if (this.run.peersSeen.has(peer)) { - return cb() - } - - // Check if we've queried enough peers already - this.run.continueQuerying((err, continueQuerying) => { - if (!this.running) { - return cb() - } - - if (err) { - return cb(err) - } - - // If we've queried enough peers, bail out - if (!continueQuerying) { - return cb() - } - - // Check if another path has queried this peer in the mean time - if (this.run.peersSeen.has(peer)) { - return cb() - } - this.run.peersSeen.add(peer) - - // Execute the query on the next peer - this.log('queue:work') - this.execQuery(peer, (err, state) => { - // Ignore response after worker killed - if (!this.running) { - return cb() - } - - this.log('queue:work:done', err, state) - if (err) { - return cb(err) - } - - // If query is complete, stop all workers. - // Note: run.stop() calls stop() on all the workers, which kills the - // queue and calls callbackFn() - if (state && state.queryComplete) { - this.log('query:complete') - this.run.stop() - return cb() - } - - // If path is complete, just stop this worker. - // Note: this.stop() kills the queue and calls callbackFn() - if (state && state.pathComplete) { - this.stop() - return cb() - } - - // Otherwise, process next peer - cb() - }) - }) - } - - /** - * Execute a query on the next peer. - * - * @param {PeerId} peer - * @param {function(Error)} callback - * @returns {void} - * @private - */ - execQuery (peer, callback) { - this.path.queryFunc(peer, (err, res) => { - // If the run has completed, bail out - if (!this.running) { - return callback() - } - - if (err) { - this.run.errors.push(err) - return callback() - } - - // Add the peer to the closest peers we have successfully queried - this.run.peersQueried.add(peer, (err) => { - if (err) { - return callback(err) - } - - // If the query indicates that this path or the whole query is complete - // set the path result and bail out - if (res.pathComplete || res.queryComplete) { - this.path.res = res - return callback(null, { - pathComplete: res.pathComplete, - queryComplete: res.queryComplete - }) - } - - // If there are closer peers to query, add them to the queue - if (res.closerPeers && res.closerPeers.length > 0) { - return each(res.closerPeers, (closer, cb) => { - // don't add ourselves - if (this.dht._isSelf(closer.id)) { - return cb() - } - closer = this.dht.peerBook.put(closer) - this.dht._peerDiscovered(closer) - this.path.addPeerToQuery(closer.id, cb) - }, callback) - } - - callback() - }) - }) - } -} - -module.exports = Query diff --git a/src/query/index.js b/src/query/index.js new file mode 100644 index 00000000..5275bf48 --- /dev/null +++ b/src/query/index.js @@ -0,0 +1,116 @@ +'use strict' + +const mh = require('multihashes') + +const c = require('../constants') +const utils = require('../utils') +const Run = require('./run') + +/** + * Divide peers up into disjoint paths (subqueries). Any peer can only be used once over all paths. + * Within each path, query peers from closest to farthest away. + */ +class Query { + /** + * User-supplied function to set up an individual disjoint path. Per-path + * query state should be held in this function's closure. + * @typedef {makePath} function + * @param {number} pathNum - Numeric index from zero to numPaths - 1 + * @returns {queryFunc} - Function to call on each peer in the query + */ + + /** + * Query function. + * @typedef {queryFunc} function + * @param {PeerId} next - Peer to query + * @param {function(Error, Object)} callback - Query result callback + */ + + /** + * Create a new query. The makePath function is called once per disjoint path, so that per-path + * variables can be created in that scope. makePath then returns the actual query function (queryFunc) to + * use when on that path. + * + * @param {DHT} dht - DHT instance + * @param {Buffer} key + * @param {makePath} makePath - Called to set up each disjoint path. Must return the query function. + */ + constructor (dht, key, makePath) { + this.dht = dht + this.key = key + this.makePath = makePath + this._log = utils.logger(this.dht.peerInfo.id, 'query:' + mh.toB58String(key)) + + this.running = false + + this._onStart = this._onStart.bind(this) + this._onComplete = this._onComplete.bind(this) + } + + /** + * Run this query, start with the given list of peers first. + * + * @param {Array} peers + * @param {function(Error, Object)} callback + * @returns {void} + */ + run (peers, callback) { + if (!this.dht._queryManager.running) { + this._log.error('Attempt to run query after shutdown') + return callback(null, { finalSet: new Set(), paths: [] }) + } + + if (peers.length === 0) { + this._log.error('Running query with no peers') + return callback(null, { finalSet: new Set(), paths: [] }) + } + + this.run = new Run(this) + + this._log(`query running with K=${this.dht.kBucketSize}, A=${c.ALPHA}, D=${Math.min(this.dht.disjointPaths, peers.length)}`) + this.run.once('start', this._onStart) + this.run.once('complete', this._onComplete) + this.run.execute(peers, callback) + } + + /** + * Called when the run starts. + */ + _onStart () { + this.running = true + this._startTime = Date.now() + this._log('query:start') + + // Register this query so we can stop it if the DHT stops + this.dht._queryManager.queryStarted(this) + } + + /** + * Called when the run completes (even if there's an error). + */ + _onComplete () { + // Ensure worker queues for all paths are stopped at the end of the query + this.stop() + } + + /** + * Stop the query. + */ + stop () { + this._log(`query:done in ${Date.now() - this._startTime}ms`) + this._log(`${this.run.errors.length} of ${this.run.peersSeen.size} peers errored (${this.run.errors.length / this.run.peersSeen.size * 100}% fail rate)`) + + if (!this.running) { + return + } + + this.run.removeListener('start', this._onStart) + this.run.removeListener('complete', this._onComplete) + + this.running = false + this.run && this.run.stop() + this.dht._queryManager.queryCompleted(this) + } +} + +module.exports = Query diff --git a/src/query/path.js b/src/query/path.js new file mode 100644 index 00000000..77e9defc --- /dev/null +++ b/src/query/path.js @@ -0,0 +1,77 @@ +'use strict' + +const each = require('async/each') +const waterfall = require('async/waterfall') +const PeerQueue = require('../peer-queue') + +/** + * Manages a single Path through the DHT. + */ +class Path { + /** + * Creates a Path. + * + * @param {Run} run + * @param {queryFunc} queryFunc + */ + constructor (run, queryFunc) { + this.run = run + this.queryFunc = queryFunc + this.initialPeers = [] + } + + /** + * Add a peer to the set of peers that are used to intialize the path. + * + * @param {PeerId} peer + */ + addInitialPeer (peer) { + this.initialPeers.push(peer) + } + + /** + * Execute the path. + * + * @param {function(Error)} callback + */ + execute (callback) { + waterfall([ + // Create a queue of peers ordered by distance from the key + (cb) => PeerQueue.fromKey(this.run.query.key, cb), + // Add initial peers to the queue + (q, cb) => { + this.peersToQuery = q + each(this.initialPeers, this.addPeerToQuery.bind(this), cb) + }, + // Start processing the queue + (cb) => { + this.run.workerQueue(this, cb) + } + ], callback) + } + + /** + * Add a peer to the peers to be queried. + * + * @param {PeerId} peer + * @param {function(Error)} callback + * @returns {void} + * @private + */ + addPeerToQuery (peer, callback) { + // Don't add self + if (this.run.query.dht._isSelf(peer)) { + return callback() + } + + // The paths must be disjoint, meaning that no two paths in the Query may + // traverse the same peer + if (this.run.peersSeen.has(peer)) { + return callback() + } + + this.peersToQuery.enqueue(peer, callback) + } +} + +module.exports = Path diff --git a/src/query/run.js b/src/query/run.js new file mode 100644 index 00000000..df8f1da6 --- /dev/null +++ b/src/query/run.js @@ -0,0 +1,220 @@ +'use strict' + +const PeerDistanceList = require('../peer-distance-list') +const EventEmitter = require('events') +const each = require('async/each') +const Path = require('./path') +const WorkerQueue = require('./workerQueue') +const utils = require('../utils') + +/** + * Manages a single run of the query. + */ +class Run extends EventEmitter { + /** + * Creates a Run. + * + * @param {Query} query + */ + constructor (query) { + super() + + this.query = query + + this.running = false + this.workers = [] + + // The peers that have been queried (including error responses) + this.peersSeen = new Set() + // The errors received when querying peers + this.errors = [] + // The closest K peers that have been queried successfully + // (this member is initialized when the worker queues start) + this.peersQueried = null + } + + /** + * Stop all the workers + */ + stop () { + if (!this.running) { + return + } + + this.running = false + for (const worker of this.workers) { + worker.stop() + } + } + + /** + * Execute the run with the given initial set of peers. + * + * @param {Array} peers + * @param {function(Error, Object)} callback + */ + execute (peers, callback) { + const paths = [] // array of states per disjoint path + + // Create disjoint paths + const numPaths = Math.min(this.query.dht.disjointPaths, peers.length) + for (let i = 0; i < numPaths; i++) { + paths.push(new Path(this, this.query.makePath(i, numPaths))) + } + + // Assign peers to paths round-robin style + peers.forEach((peer, i) => { + paths[i % numPaths].addInitialPeer(peer) + }) + + // Execute the query along each disjoint path + // each(paths, (path, cb) => path.execute(cb), (err) => { + this.executePaths(paths, (err) => { + if (err) { + return callback(err) + } + + const res = { + // The closest K peers we were able to query successfully + finalSet: new Set(this.peersQueried.peers), + paths: [] + } + + // Collect the results from each completed path + for (const path of paths) { + if (path.res && (path.res.pathComplete || path.res.queryComplete)) { + path.res.success = true + res.paths.push(path.res) + } + } + + callback(err, res) + }) + } + + /** + * Execute all paths through the DHT. + * + * @param {Array} paths + * @param {function(Error)} callback + */ + executePaths (paths, callback) { + this.running = true + + this.emit('start') + each(paths, (path, cb) => path.execute(cb), (err) => { + // Ensure all workers are stopped + this.stop() + + // Completed the Run + this.emit('complete') + + if (err) { + return callback(err) + } + + // If all queries errored out, something is seriously wrong, so callback + // with an error + if (this.errors.length === this.peersSeen.size) { + return callback(this.errors[0]) + } + + callback() + }) + } + + /** + * Initialize the list of queried peers, then start a worker queue for the + * given path. + * + * @param {Path} path + * @param {function(Error)} callback + */ + workerQueue (path, callback) { + this.init(() => this.startWorker(path, callback)) + } + + /** + * Create and start a worker queue for a particular path. + * + * @param {Path} path + * @param {function(Error)} callback + */ + startWorker (path, callback) { + const worker = new WorkerQueue(this.query.dht, this, path, this.query._log) + this.workers.push(worker) + worker.execute(callback) + } + + /** + * Initialize the list of closest peers we've queried - this is shared by all + * paths in the run. + * + * @param {function(Error)} callback + * @returns {void} + */ + init (callback) { + if (this.peersQueried) { + return callback() + } + + // We only want to initialize it once for the run, and then inform each + // path worker that it's ready + if (this.awaitingKey) { + this.awaitingKey.push(callback) + return + } + + this.awaitingKey = [callback] + + // Convert the key into a DHT key by hashing it + utils.convertBuffer(this.query.key, (err, dhtKey) => { + this.peersQueried = new PeerDistanceList(dhtKey, this.query.dht.kBucketSize) + + for (const cb of this.awaitingKey) { + cb(err) + } + this.awaitingKey = undefined + }) + } + + /** + * If we've queried K peers, and the remaining peers in the given `worker`'s queue + * are all further from the key than the peers we've already queried, then we should + * stop querying on that `worker`. + * + * @param {WorkerQueue} worker + * @param {function(Error, boolean)} callback + * @returns {void} + */ + continueQuerying (worker, callback) { + // If we haven't queried K peers yet, keep going + if (this.peersQueried.length < this.peersQueried.capacity) { + return callback(null, true) + } + + // Get all the peers that are currently being queried. + // Note that this function gets called right after a peer has been popped + // off the head of the closest peers queue so it will include that peer. + const running = worker.queue.workersList().map(i => i.data) + + + // Check if any of the peers that are currently being queried are closer + // to the key than the peers we've already queried + this.peersQueried.anyCloser(running, (err, someCloser) => { + if (err) { + return callback(err) + } + + // Some are closer, the worker should keep going + if (someCloser) { + return callback(null, true) + } + + // None are closer, the worker can stop + callback(null, false) + }) + } +} + +module.exports = Run diff --git a/src/query/workerQueue.js b/src/query/workerQueue.js new file mode 100644 index 00000000..5c98650e --- /dev/null +++ b/src/query/workerQueue.js @@ -0,0 +1,241 @@ +'use strict' + +const each = require('async/each') +const queue = require('async/queue') +const c = require('../constants') + +class WorkerQueue { + /** + * Creates a new WorkerQueue. + * + * @param {DHT} dht + * @param {Run} run + * @param {Object} path + * @param {function} log + */ + constructor (dht, run, path, log) { + this.dht = dht + this.run = run + this.path = path + this.log = log + + this.concurrency = c.ALPHA + this.queue = this.setupQueue() + } + + /** + * Create the underlying async queue. + * + * @returns {Object} + */ + setupQueue () { + const q = queue(this.processNext.bind(this), this.concurrency) + + // If there's an error, stop the worker + q.error = (err) => { + this.log.error('queue', err) + this.stop(err) + } + + // When all peers in the queue have been processed, stop the worker + q.drain = () => { + this.log('queue:drain') + this.stop() + } + + // When a space opens up in the queue, add some more peers + q.unsaturated = () => { + if (this.running) { + // this.log('queue:unsaturated') + this.fill() + } + } + + q.buffer = 0 + + return q + } + + /** + * Stop the worker, optionally providing an error to pass to the worker's + * callback. + * + * @param {Error} err + */ + stop (err) { + this.log('worker:stop') + if (!this.running) { + return + } + + this.running = false + this.queue.kill() + this.callbackFn(err) + } + + /** + * Use the queue from async to keep `concurrency` amount items running + * per path. + * + * @param {function(Error)} callback + */ + execute (callback) { + this.running = true + this.callbackFn = callback + this.fill() + } + + /** + * Add peers to the worker queue until there are enough to satisfy the + * worker queue concurrency. + * Note that we don't want to take any more than those required to satisfy + * concurrency from the peers-to-query queue, because we always want to + * query the closest peers to the key first, and new peers are continously + * being added to the peers-to-query queue. + */ + fill () { + // this.log('queue:fill') + + // Note: + // - queue.running(): number of items that are currently running + // - queue.length(): the number of items that are waiting to be run + while (this.queue.running() + this.queue.length() < this.concurrency && + this.path.peersToQuery.length > 0) { + this.queue.push(this.path.peersToQuery.dequeue()) + } + + if (this.queue.running() === 0 && this.queue.length() === 0 && this.path.peersToQuery.length < 1) { + this.log('queue is empty') + } + } + + /** + * Process the next peer in the queue + * + * @param {PeerId} peer + * @param {function(Error)} cb + * @returns {void} + */ + processNext (peer, cb) { + if (!this.running) { + return cb() + } + + // The paths must be disjoint, meaning that no two paths in the Query may + // traverse the same peer + if (this.run.peersSeen.has(peer)) { + return cb() + } + + // Check if we've queried enough peers already + this.run.continueQuerying(this, (err, continueQuerying) => { + if (!this.running) { + return cb() + } + + if (err) { + return cb(err) + } + + // If we've queried enough peers, bail out + if (!continueQuerying) { + return cb() + } + + // Check if another path has queried this peer in the mean time + if (this.run.peersSeen.has(peer)) { + return cb() + } + this.run.peersSeen.add(peer) + + // Execute the query on the next peer + // this.log('queue:work') + this.execQuery(peer, (err, state) => { + // Ignore response after worker killed + if (!this.running) { + return cb() + } + + // this.log('queue:work:done', err, state) + if (err) { + return cb(err) + } + + // If query is complete, stop all workers. + // Note: run.stop() calls stop() on all the workers, which kills the + // queue and calls callbackFn() + if (state && state.queryComplete) { + this.log('query:complete') + this.run.stop() + return cb() + } + + // If path is complete, just stop this worker. + // Note: this.stop() kills the queue and calls callbackFn() + if (state && state.pathComplete) { + this.stop() + return cb() + } + + // Otherwise, process next peer + cb() + }) + }) + } + + /** + * Execute a query on the next peer. + * + * @param {PeerId} peer + * @param {function(Error)} callback + * @returns {void} + * @private + */ + execQuery (peer, callback) { + this.path.queryFunc(peer, (err, res) => { + // If the run has completed, bail out + if (!this.running) { + return callback() + } + + if (err) { + this.run.errors.push(err) + return callback() + } + + // Add the peer to the closest peers we have successfully queried + this.run.peersQueried.add(peer, (err) => { + if (err) { + return callback(err) + } + + // If the query indicates that this path or the whole query is complete + // set the path result and bail out + if (res.pathComplete || res.queryComplete) { + this.path.res = res + return callback(null, { + pathComplete: res.pathComplete, + queryComplete: res.queryComplete + }) + } + + // If there are closer peers to query, add them to the queue + if (res.closerPeers && res.closerPeers.length > 0) { + return each(res.closerPeers, (closer, cb) => { + // don't add ourselves + if (this.dht._isSelf(closer.id)) { + return cb() + } + closer = this.dht.peerBook.put(closer) + this.dht._peerDiscovered(closer) + this.path.addPeerToQuery(closer.id, cb) + }, callback) + } + + callback() + }) + }) + } +} + +module.exports = WorkerQueue From a1f9e2bbdbab37331cc429e68c6b5a16eb3bd7d8 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 15:03:06 +0200 Subject: [PATCH 06/20] test: add test to verify paths stop --- test/query/index.spec.js | 110 +++++++++++++++++++++++++++ test/utils/create-disjoint-tracks.js | 36 +-------- test/utils/create-peer-info.js | 6 +- test/utils/index.js | 25 ++++++ 4 files changed, 144 insertions(+), 33 deletions(-) create mode 100644 test/query/index.spec.js create mode 100644 test/utils/index.js diff --git a/test/query/index.spec.js b/test/query/index.spec.js new file mode 100644 index 00000000..56656638 --- /dev/null +++ b/test/query/index.spec.js @@ -0,0 +1,110 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-checkmark')) +const expect = chai.expect +const sinon = require('sinon') +const each = require('async/each') + +const Query = require('../../src/query') +const Path = require('../../src/query/path') +const Run = require('../../src/query/run') +const DHT = require('../../src') +const createPeerInfo = require('../utils/create-peer-info') +const { sortClosestPeerInfos } = require('../utils') +const { convertBuffer } = require('../../src/utils') +const NUM_IDS = 101 + +describe('Query', () => { + let peers_all + let ourPeerInfo + before((done) => { + createPeerInfo(NUM_IDS, (err, peers) => { + ourPeerInfo = peers.shift() + peers_all = peers + done(err) + }) + }) + + describe('get closest peers', () => { + let targetKey = { + key: Buffer.from('A key to find'), + dhtKey: null + } + let sortedPeers + let dht + + before('get sorted peers', (done) => { + convertBuffer(targetKey.key, (err, dhtKey) => { + if (err) return done(err) + targetKey.dhtKey = dhtKey + + sortClosestPeerInfos(peers_all, targetKey.dhtKey, (err, peers) => { + sortedPeers = peers + done(err) + }) + }) + }) + + before('create a dht', () => { + dht = new DHT({ + _peerInfo: ourPeerInfo + }) + }) + + afterEach(() => { + sinon.restore() + }) + + it('should end paths when they have no closer peers to whats already been queried', (done) => { + const PATHS = 5 + sinon.stub(dht, 'disjointPaths').value(PATHS) + sinon.stub(dht._queryManager, 'running').value(true) + let querySpy = sinon.stub().callsArgWith(1, null, {}) + + let query = new Query(dht, targetKey.key, () => querySpy) + + let run = new Run(query) + run.init(() => { + // Add the sorted peers into 5 paths. This will weight + // the paths with increasingly further peers + let sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) + let peersPerPath = sortedPeerIds.length / PATHS + let paths = [...new Array(PATHS)].map((_, index) => { + let path = new Path(run, query.makePath()) + let start = index * peersPerPath + let peers = sortedPeerIds.slice(start, start + peersPerPath) + peers.forEach(p => path.addInitialPeer(p)) + return path + }) + + // Get the peers of the 2nd closest path, and remove the path + // We don't want to execute it. Just add its peers to peers we've + // already queried. + let queriedPeers = paths.splice(1, 1)[0].initialPeers + each(queriedPeers, (peerId, cb) => { + run.peersQueried.add(peerId, cb) + }, (err) => { + if (err) return done(err) + + // Run the 4 paths + run.executePaths(paths, (err) => { + expect(err).to.not.exist() + // The resulting peers should all be from path 0 as it had the closest + expect(run.peersQueried.peers).to.eql(paths[0].initialPeers) + // The query should ONLY have been called on path 0 as it + // was the only path to contain closer peers that what we + // pre populated `run.peersQueried` with + expect(querySpy.callCount).to.eql(20) + const queriedPeers = querySpy.getCalls().map(call => call.args[0]) + expect(queriedPeers).to.eql(paths[0].initialPeers) + done() + }) + }) + }) + }) + }) +}) + diff --git a/test/utils/create-disjoint-tracks.js b/test/utils/create-disjoint-tracks.js index 8bec7db8..fa578f0a 100644 --- a/test/utils/create-disjoint-tracks.js +++ b/test/utils/create-disjoint-tracks.js @@ -1,38 +1,10 @@ 'use strict' -const multihashing = require('multihashing-async') -const distance = require('xor-distance') const waterfall = require('async/waterfall') -const map = require('async/map') - -function convertPeerId (peer, callback) { - multihashing.digest(peer.id, 'sha2-256', callback) -} - -function sortClosestPeers (peers, target, callback) { - map(peers, (peer, cb) => { - convertPeerId(peer, (err, id) => { - if (err) { - return cb(err) - } - - cb(null, { - peer: peer, - distance: distance(id, target) - }) - }) - }, (err, distances) => { - if (err) { - return callback(err) - } - - callback(null, distances.sort(xorCompare).map((d) => d.peer)) - }) -} - -function xorCompare (a, b) { - return distance.compare(a.distance, b.distance) -} +const { + convertPeerId, + sortClosestPeers +} = require('../../src/utils') /* * Given an array of peerInfos, decide on a target, start peers, and diff --git a/test/utils/create-peer-info.js b/test/utils/create-peer-info.js index 40973deb..b32afacc 100644 --- a/test/utils/create-peer-info.js +++ b/test/utils/create-peer-info.js @@ -4,7 +4,11 @@ const times = require('async/times') const PeerId = require('peer-id') const PeerInfo = require('peer-info') -// Creates multiple PeerInfos +/** + * Creates multiple PeerInfos + * @param {number} n The number of `PeerInfo` to create + * @param {function(Error, Array)} callback + */ function createPeerInfo (n, callback) { times(n, (i, cb) => PeerId.create({ bits: 512 }, cb), (err, ids) => { if (err) { return callback(err) } diff --git a/test/utils/index.js b/test/utils/index.js new file mode 100644 index 00000000..af96067c --- /dev/null +++ b/test/utils/index.js @@ -0,0 +1,25 @@ +'use strict' + +const { sortClosestPeers } = require('../../src/utils') + +/** + * Like `sortClosestPeers`, expect it takes and returns `PeerInfo`s + * + * @param {Array} peers + * @param {Buffer} target + * @param {function(Error, Array)} callback + * @returns {void} + */ +exports.sortClosestPeerInfos = (peers, target, callback) => { + sortClosestPeers(peers.map(peerInfo => peerInfo.id), target, (err, sortedPeerIds) => { + if (err) return callback(err) + + const sortedPeerInfos = sortedPeerIds.map((peerId) => { + return peers.find((peerInfo) => { + return peerInfo.id.isEqual(peerId) + }) + }) + + callback(null, sortedPeerInfos) + }) +} From e49045d525a467a70d33951a783b4b168e321170 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 15:09:26 +0200 Subject: [PATCH 07/20] chore: fix linting --- src/query/run.js | 1 - test/query/index.spec.js | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/query/run.js b/src/query/run.js index df8f1da6..5e5a6e6e 100644 --- a/src/query/run.js +++ b/src/query/run.js @@ -198,7 +198,6 @@ class Run extends EventEmitter { // off the head of the closest peers queue so it will include that peer. const running = worker.queue.workersList().map(i => i.data) - // Check if any of the peers that are currently being queried are closer // to the key than the peers we've already queried this.peersQueried.anyCloser(running, (err, someCloser) => { diff --git a/test/query/index.spec.js b/test/query/index.spec.js index 56656638..c32c59bf 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -1,4 +1,5 @@ /* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ 'use strict' const chai = require('chai') @@ -18,12 +19,12 @@ const { convertBuffer } = require('../../src/utils') const NUM_IDS = 101 describe('Query', () => { - let peers_all + let peerInfos let ourPeerInfo before((done) => { createPeerInfo(NUM_IDS, (err, peers) => { ourPeerInfo = peers.shift() - peers_all = peers + peerInfos = peers done(err) }) }) @@ -41,7 +42,7 @@ describe('Query', () => { if (err) return done(err) targetKey.dhtKey = dhtKey - sortClosestPeerInfos(peers_all, targetKey.dhtKey, (err, peers) => { + sortClosestPeerInfos(peerInfos, targetKey.dhtKey, (err, peers) => { sortedPeers = peers done(err) }) @@ -107,4 +108,3 @@ describe('Query', () => { }) }) }) - From 2d3bacd315b1e6c2be1638273be6df9b71bb4571 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 17:33:01 +0200 Subject: [PATCH 08/20] fix: make alpha 3 again, reduce rpc message timeout --- src/constants.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/constants.js b/src/constants.js index fa850687..e0afec7b 100644 --- a/src/constants.js +++ b/src/constants.js @@ -23,7 +23,7 @@ exports.PROVIDERS_VALIDITY = 24 * hour exports.PROVIDERS_CLEANUP_INTERVAL = hour -exports.READ_MESSAGE_TIMEOUT = 30 * second +exports.READ_MESSAGE_TIMEOUT = 10 * second // The number of records that will be retrieved on a call to getMany() exports.GET_MANY_RECORD_COUNT = 16 @@ -32,7 +32,7 @@ exports.GET_MANY_RECORD_COUNT = 16 exports.K = 20 // Alpha is the concurrency for asynchronous requests -exports.ALPHA = 6 +exports.ALPHA = 3 exports.maxMessageSize = 2 << 22 // 4MB From 81a558c5462cc523f30d9fe4e56be4ccbfbbe4fb Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 17:33:57 +0200 Subject: [PATCH 09/20] refactor: remove unneeded peerBook.put and update jsdocs --- src/private.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/private.js b/src/private.js index ea7712d1..a8418a3e 100644 --- a/src/private.js +++ b/src/private.js @@ -171,7 +171,7 @@ module.exports = (dht) => ({ * * @param {Buffer} key * @param {PeerId} peer - * @param {function(Error)} callback + * @param {function(Error, Array)} callback * @returns {void} * * @private @@ -185,13 +185,12 @@ module.exports = (dht) => ({ const out = msg.closerPeers .filter((pInfo) => !dht._isSelf(pInfo.id)) - .map((pInfo) => dht.peerBook.put(pInfo)) callback(null, out) }) }, /** - * Is the given peer id the peer id? + * Is the given peer id our PeerId? * * @param {PeerId} other * @returns {bool} @@ -206,7 +205,7 @@ module.exports = (dht) => ({ * * @param {PeerId} peer * @param {PeerId} target - * @param {function(Error)} callback + * @param {function(Error, Message)} callback * @returns {void} * * @private From 75ec7551430fc4aef97569606ce49e4582ef361f Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 17:36:26 +0200 Subject: [PATCH 10/20] fix: stop the worker if it is done querying --- src/query/workerQueue.js | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/query/workerQueue.js b/src/query/workerQueue.js index 5c98650e..837183a6 100644 --- a/src/query/workerQueue.js +++ b/src/query/workerQueue.js @@ -46,7 +46,7 @@ class WorkerQueue { // When a space opens up in the queue, add some more peers q.unsaturated = () => { if (this.running) { - // this.log('queue:unsaturated') + this.log('queue:unsaturated') this.fill() } } @@ -63,13 +63,13 @@ class WorkerQueue { * @param {Error} err */ stop (err) { - this.log('worker:stop') if (!this.running) { return } this.running = false this.queue.kill() + this.log('worker:stop, %d workers still running', this.run.workers.filter(w => w.running).length) this.callbackFn(err) } @@ -94,7 +94,7 @@ class WorkerQueue { * being added to the peers-to-query queue. */ fill () { - // this.log('queue:fill') + this.log('queue:fill') // Note: // - queue.running(): number of items that are currently running @@ -103,10 +103,6 @@ class WorkerQueue { this.path.peersToQuery.length > 0) { this.queue.push(this.path.peersToQuery.dequeue()) } - - if (this.queue.running() === 0 && this.queue.length() === 0 && this.path.peersToQuery.length < 1) { - this.log('queue is empty') - } } /** @@ -137,8 +133,9 @@ class WorkerQueue { return cb(err) } - // If we've queried enough peers, bail out + // If we've queried enough peers, stop the queue if (!continueQuerying) { + this.stop() return cb() } @@ -149,14 +146,14 @@ class WorkerQueue { this.run.peersSeen.add(peer) // Execute the query on the next peer - // this.log('queue:work') + this.log('queue:work') this.execQuery(peer, (err, state) => { // Ignore response after worker killed if (!this.running) { return cb() } - // this.log('queue:work:done', err, state) + this.log('queue:work:done', err, state) if (err) { return cb(err) } From e21967fa5dac233a9c5731674aac979fe8b0b69a Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 17:37:57 +0200 Subject: [PATCH 11/20] test: add spy on continueQuering to its call count --- test/query/index.spec.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/query/index.spec.js b/test/query/index.spec.js index c32c59bf..2f50c9b6 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -13,6 +13,7 @@ const Query = require('../../src/query') const Path = require('../../src/query/path') const Run = require('../../src/query/run') const DHT = require('../../src') +const c = require('../../src/constants') const createPeerInfo = require('../utils/create-peer-info') const { sortClosestPeerInfos } = require('../utils') const { convertBuffer } = require('../../src/utils') @@ -90,15 +91,22 @@ describe('Query', () => { }, (err) => { if (err) return done(err) + const continueSpy = sinon.spy(run, 'continueQuerying') + // Run the 4 paths run.executePaths(paths, (err) => { expect(err).to.not.exist() // The resulting peers should all be from path 0 as it had the closest expect(run.peersQueried.peers).to.eql(paths[0].initialPeers) + + // Continue should be called on all `peersPerPath` queries of the first path, + // plus ALPHA (concurrency) of the other 3 paths + expect(continueSpy.callCount).to.eql(peersPerPath + (3 * c.ALPHA)) + // The query should ONLY have been called on path 0 as it // was the only path to contain closer peers that what we // pre populated `run.peersQueried` with - expect(querySpy.callCount).to.eql(20) + expect(querySpy.callCount).to.eql(peersPerPath) const queriedPeers = querySpy.getCalls().map(call => call.args[0]) expect(queriedPeers).to.eql(paths[0].initialPeers) done() From 909f44c6adbd50ba4a2dcac50928b57c1f5aa088 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 25 Apr 2019 17:38:24 +0200 Subject: [PATCH 12/20] docs: update jsdocs on utils --- src/utils.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils.js b/src/utils.js index 900cb139..d053dc7f 100644 --- a/src/utils.js +++ b/src/utils.js @@ -94,11 +94,11 @@ exports.decodeBase32 = (raw) => { } /** - * Sort peers by distance to the given `id`. + * Sort peers by distance to the given `target`. * * @param {Array} peers * @param {Buffer} target - * @param {function(Error, )} callback + * @param {function(Error, Array)} callback * @returns {void} */ exports.sortClosestPeers = (peers, target, callback) => { From a0eab2cf2d768c0f3a68b463729f2b1e67205dff Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 26 Apr 2019 10:43:25 +0200 Subject: [PATCH 13/20] fix: make queries quicker but sloppier --- src/query/workerQueue.js | 7 ++--- test/query/index.spec.js | 62 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/query/workerQueue.js b/src/query/workerQueue.js index 837183a6..658cb40f 100644 --- a/src/query/workerQueue.js +++ b/src/query/workerQueue.js @@ -46,7 +46,6 @@ class WorkerQueue { // When a space opens up in the queue, add some more peers q.unsaturated = () => { if (this.running) { - this.log('queue:unsaturated') this.fill() } } @@ -94,8 +93,6 @@ class WorkerQueue { * being added to the peers-to-query queue. */ fill () { - this.log('queue:fill') - // Note: // - queue.running(): number of items that are currently running // - queue.length(): the number of items that are waiting to be run @@ -133,7 +130,9 @@ class WorkerQueue { return cb(err) } - // If we've queried enough peers, stop the queue + // No peer we're querying is closer stop the queue + // This will cause queries that may potentially result in + // closer nodes to be ended, but it reduces overall query time if (!continueQuerying) { this.stop() return cb() diff --git a/test/query/index.spec.js b/test/query/index.spec.js index 2f50c9b6..9cbe7793 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -8,6 +8,7 @@ chai.use(require('chai-checkmark')) const expect = chai.expect const sinon = require('sinon') const each = require('async/each') +const PeerBook = require('peer-book') const Query = require('../../src/query') const Path = require('../../src/query/path') @@ -52,7 +53,8 @@ describe('Query', () => { before('create a dht', () => { dht = new DHT({ - _peerInfo: ourPeerInfo + _peerInfo: ourPeerInfo, + _peerBook: new PeerBook() }) }) @@ -114,5 +116,63 @@ describe('Query', () => { }) }) }) + + it('should continue querying if the path has a closer peer', (done) => { + sinon.stub(dht, 'disjointPaths').value(1) + sinon.stub(dht._queryManager, 'running').value(true) + + let querySpy = sinon.stub().callsArgWith(1, null, {}) + let query = new Query(dht, targetKey.key, () => querySpy) + + let run = new Run(query) + run.init(() => { + let sortedPeerIds = sortedPeers.map(peerInfo => peerInfo.id) + + // Take the top 15 peers and peers 20 - 25 to seed `run.peersQueried` + // This leaves us with only 16 - 19 as closer peers + let queriedPeers = [ + ...sortedPeerIds.slice(0, 15), + ...sortedPeerIds.slice(20, 25) + ] + + let path = new Path(run, query.makePath()) + // Give the path a closet peer and 15 further peers + let pathPeers = [ + ...sortedPeerIds.slice(15, 16), // 1 closer + ...sortedPeerIds.slice(80, 95) + ] + + pathPeers.forEach(p => path.addInitialPeer(p)) + const returnPeers = sortedPeers.slice(16, 20) + // When the second query happens, which is a further peer, + // return peers 16 - 19 + querySpy.onCall(1).callsArgWith(1, null, { + closerPeers: returnPeers + }) + + each(queriedPeers, (peerId, cb) => { + run.peersQueried.add(peerId, cb) + }, (err) => { + if (err) return done(err) + + // Run the path + run.executePaths([path], (err) => { + expect(err).to.not.exist() + + // Querying will stop after the first ALPHA peers are queried + expect(querySpy.callCount).to.eql(c.ALPHA) + + // We'll only get the 1 closest peer from `pathPeers`. + // The worker will be stopped before the `returnedPeers` + // are processed and queried. + expect(run.peersQueried.peers).to.eql([ + ...sortedPeerIds.slice(0, 16), + ...sortedPeerIds.slice(20, 24) + ]) + done() + }) + }) + }) + }) }) }) From 6e7ba6c72fd9c12c6465de5a2f7bfeb3f100a81c Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 26 Apr 2019 12:43:16 +0200 Subject: [PATCH 14/20] fix: dont error provide until all peers have been attempted --- src/index.js | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/index.js b/src/index.js index bb964506..398ed168 100644 --- a/src/index.js +++ b/src/index.js @@ -534,6 +534,7 @@ class KadDHT extends EventEmitter { provide (key, callback) { this._log('provide: %s', key.toBaseEncodedString()) + const errors = [] waterfall([ (cb) => this.providers.addProvider(key, this.peerInfo.id, cb), (cb) => this.getClosestPeers(key.buffer, cb), @@ -543,10 +544,21 @@ class KadDHT extends EventEmitter { each(peers, (peer, cb) => { this._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String()) - this.network.sendMessage(peer, msg, cb) + this.network.sendMessage(peer, msg, (err) => { + if (err) errors.push(err) + cb() + }) }, cb) } - ], (err) => callback(err)) + ], (err) => { + if (errors.length) { + // This should be infrequent. This means a peer we previously connected + // to failed to exchange the provide message. If getClosestPeers was an + // iterator, we could continue to pull until we announce to kBucketSize peers. + err = errcode(`Failed to provide to ${errors.length} of ${this.kBucketSize} peers`, 'ERR_SOME_PROVIDES_FAILED', { errors }) + } + callback(err) + }) } /** From d9831bb003d9aa4a0f2a3026882f5eeda282f44e Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 26 Apr 2019 13:16:11 +0200 Subject: [PATCH 15/20] fix: add timeout to query func --- src/query/path.js | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/query/path.js b/src/query/path.js index 77e9defc..8b9dba47 100644 --- a/src/query/path.js +++ b/src/query/path.js @@ -1,9 +1,16 @@ 'use strict' const each = require('async/each') +const timeout = require('async/timeout') const waterfall = require('async/waterfall') const PeerQueue = require('../peer-queue') +// TODO: Temporary until parallel dial in Switch have a proper +// timeout. Requires async/await refactor of transports and +// dial abort logic. This gives us 30s to complete the `queryFunc`. +// This should help reduce the high end call times of queries +const QUERY_FUNC_TIMEOUT = 30e3 + /** * Manages a single Path through the DHT. */ @@ -16,8 +23,17 @@ class Path { */ constructor (run, queryFunc) { this.run = run - this.queryFunc = queryFunc + this.queryFunc = timeout(queryFunc, QUERY_FUNC_TIMEOUT) + + /** + * @type {Array} + */ this.initialPeers = [] + + /** + * @type {PeerQueue} + */ + this.peersToQuery = null } /** From 61a8e2622cc034fe0ac84f9f9e65cc3b3c88560c Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Fri, 26 Apr 2019 19:42:23 +0200 Subject: [PATCH 16/20] test: add a simulation --- package.json | 3 +- test/simulation/README.md | 12 +++ test/simulation/index.js | 205 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 test/simulation/README.md create mode 100644 test/simulation/index.js diff --git a/package.json b/package.json index 48864ee7..94fd6d40 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,8 @@ "release-minor": "aegir release --type minor --docs -t node", "release-major": "aegir release --type major --docs -t node", "coverage": "aegir coverage", - "coverage-publish": "aegir-coverage publish" + "coverage-publish": "aegir-coverage publish", + "sim": "node test/simulation/index.js" }, "pre-push": [ "lint", diff --git a/test/simulation/README.md b/test/simulation/README.md new file mode 100644 index 00000000..69d13111 --- /dev/null +++ b/test/simulation/README.md @@ -0,0 +1,12 @@ +# Simulation + +This simulation code is designed to allow us to test DHT behaviors without hitting the network. + +## Run +The simulation can be run from project root via the npm `sim` script: + +```bash +npm run sim +``` + +There is a `VERBOSE` property that can be set to `true`for additional logging. This should probably become a flag if the sim is expanded. diff --git a/test/simulation/index.js b/test/simulation/index.js new file mode 100644 index 00000000..e247699f --- /dev/null +++ b/test/simulation/index.js @@ -0,0 +1,205 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ +'use strict' +const { promisify } = require('util') +const PeerBook = require('peer-book') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const multihashes = require('multihashes') + +const RoutingTable = require('../../src/routing') +const Message = require('../../src/message') +const utils = require('../../src/utils') +const testUtils = require('../../test/utils') +const DHT = require('../../src') + +const convertBuffer = promisify(utils.convertBuffer) +const sortClosestPeerInfos = promisify(testUtils.sortClosestPeerInfos) + +const NUM_PEERS = 10e3 // Peers to create, not including us +const LATENCY_DEAD_NODE = 120e3 // How long dead nodes should take before erroring +const NUM_DEAD_NODES = Math.floor(NUM_PEERS * 0.3) // 30% undialable +const MAX_PEERS_KNOWN = Math.min(500, NUM_PEERS) // max number of peers a node should be aware of (capped at NUM_PEERS) +const MIN_PEERS_KNOWN = 10 // min number of peers a node should be aware of +const LATENCY_MIN = 100 // min time a good peer should take to respond +const LATENCY_MAX = 10e3 // max time a good peer should take to respond +const KValue = 20 // k Bucket size +const QUERY_KEY = Buffer.from('a key to search for') +const RUNS = 3 // How many times the simulation should run +const VERBOSE = false // If true, some additional logs will run + +let dhtKey +let network +let peers +let ourPeerInfo + +// Execute the simulation +;(async () => { + console.log('Starting setup...') + await setup() + + console.log('Total Nodes=%d, Dead Nodes=%d, Max Siblings per Peer=%d', NUM_PEERS, NUM_DEAD_NODES, MAX_PEERS_KNOWN) + console.log('Starting %d runs...', RUNS) + for (var i = 0; i < RUNS; i++) { + await GetClosestPeersSimulation() + } + + process.exit() +})() + +/** + * Setup the data for the test + */ +async function setup () { + dhtKey = await convertBuffer(QUERY_KEY) + peers = await createPeers(NUM_PEERS + 1) + ourPeerInfo = peers.shift() + + // Create the network + network = await MockNetwork(peers) +} + +async function GetClosestPeersSimulation () { + const dht = new DHT({ + _peerInfo: ourPeerInfo, + _peerBook: new PeerBook(), + handle: () => {}, + on: () => {} + }, { + kBucketSize: KValue, + randomWalk: { + enabled: false + } + }) + + // Add random peers to our table + let ourPeers = randomMembers(peers, randomInteger(MIN_PEERS_KNOWN, MAX_PEERS_KNOWN)) + for (const peer of ourPeers) { + await promisify((peer, callback) => dht._add(peer, callback))(peer) + } + + dht.network.sendRequest = (to, message, callback) => { + const networkPeer = network.peers[to.toB58String()] + let response = null + + if (networkPeer.routingTable) { + response = new Message(message.type, Buffer.alloc(0), message.clusterLevel) + response.closerPeers = networkPeer.routingTable.closestPeers(dhtKey, KValue).map(peerId => { + return new PeerInfo(peerId) + }) + } + + VERBOSE && console.log(`sendRequest latency:${networkPeer.latency} peerId:${to.toB58String()} closestPeers:${response ? response.closerPeers.length : null}`) + + return setTimeout(() => { + if (response) { + return callback(null, response) + } + callback(new Error('ERR_TIMEOUT')) + }, networkPeer.latency) + } + + // Start the dht + await promisify((callback) => dht.start(callback))() + + const startTime = Date.now() + const closestPeers = await new Promise((resolve, reject) => { + dht.getClosestPeers(QUERY_KEY, (err, res) => { + if (err) return reject(err) + resolve(res) + }) + }) + const runTime = Date.now() - startTime + + const actualClosest = await sortClosestPeerInfos(peers, dhtKey) + + const foundIds = closestPeers.map(peerId => peerId.toB58String()) + const topIds = actualClosest.slice(0, 20).map(peerInfo => peerInfo.id.toB58String()) + + const intersection = foundIds.filter(value => topIds.includes(value)) + + console.log('Found %d of the top %d peers in %d ms', intersection.length, KValue, runTime) +} + +/** + * Create `num` PeerInfos + * @param {integer} num How many peers to create + * @returns {Array} + */ +function createPeers (num) { + const crypto = require('crypto') + const peers = [...new Array(num)].map(() => { + return new PeerInfo( + PeerId.createFromB58String( + multihashes.toB58String(crypto.randomBytes(34)) + ) + ) + }) + + return peers +} + +/** + * Creates a mock network + * @param {Array} peers + * @returns {Network} + */ +async function MockNetwork (peers) { + let network = { + peers: {} + } + + // Make nodes dead + for (const peer of peers.slice(0, NUM_DEAD_NODES)) { + network.peers[peer.id.toB58String()] = { + latency: LATENCY_DEAD_NODE + } + } + + // Give the remaining nodes: + for (const peer of peers.slice(NUM_DEAD_NODES)) { + let netPeer = network.peers[peer.id.toB58String()] = { + // dial latency + latency: randomInteger(LATENCY_MIN, LATENCY_MAX), + // random sibling peers from the full list + routingTable: new RoutingTable(peer.id, KValue) + } + const siblings = randomMembers(peers, randomInteger(MIN_PEERS_KNOWN, MAX_PEERS_KNOWN)) + for (const peer of siblings) { + await promisify((callback) => netPeer.routingTable.add(peer.id, callback))() + } + } + + return network +} + +/** + * Returns a random integer between `min` and `max` + * @param {number} min + * @param {number} max + * @returns {int} + */ +function randomInteger (min, max) { + return Math.floor(Math.random() * (max - min)) + min +} + +/** + * Return a unique array of random `num` members from `list` + * @param {Array} list array to pull random members from + * @param {number} num number of random members to get + * @returns {Array} + */ +function randomMembers (list, num) { + let randomMembers = [] + + if (list.length < num) throw new Error(`cant get random members, ${num} is less than ${list.length}`) + + while (randomMembers.length < num) { + const randomMember = list[Math.floor(Math.random() * list.length)] + if (!randomMembers.includes(randomMember)) { + randomMembers.push(randomMember) + } + } + + return randomMembers +} From 1e63a30287ea130bfa236ea63c088b9ef437e01b Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Sat, 27 Apr 2019 12:34:39 +0200 Subject: [PATCH 17/20] feat: allow configuration of alpha concurrency --- src/index.js | 7 +++++++ src/query/index.js | 3 +-- src/query/workerQueue.js | 3 +-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/index.js b/src/index.js index 398ed168..db1139c7 100644 --- a/src/index.js +++ b/src/index.js @@ -48,6 +48,7 @@ class KadDHT extends EventEmitter { * @param {Switch} sw libp2p-switch instance * @param {object} options DHT options * @param {number} options.kBucketSize k-bucket size (default 20) + * @param {number} options.concurrency alpha concurrency of queries (default 3) * @param {Datastore} options.datastore datastore (default MemoryDatastore) * @param {object} options.validators validators object with namespace as keys and function(key, record, callback) * @param {object} options.selectors selectors object with namespace as keys and function(key, records) @@ -74,6 +75,12 @@ class KadDHT extends EventEmitter { */ this.kBucketSize = options.kBucketSize || c.K + /** + * ALPHA concurrency at which each query path with run, defaults to 6 + * @type {number} + */ + this.concurrency = options.concurrency || c.ALPHA + /** * Number of disjoint query paths to use * This is set to `kBucketSize`/2 per the S/Kademlia paper diff --git a/src/query/index.js b/src/query/index.js index 5275bf48..ebf9ac8b 100644 --- a/src/query/index.js +++ b/src/query/index.js @@ -2,7 +2,6 @@ const mh = require('multihashes') -const c = require('../constants') const utils = require('../utils') const Run = require('./run') @@ -67,7 +66,7 @@ class Query { this.run = new Run(this) - this._log(`query running with K=${this.dht.kBucketSize}, A=${c.ALPHA}, D=${Math.min(this.dht.disjointPaths, peers.length)}`) + this._log(`query running with K=${this.dht.kBucketSize}, A=${this.dht.concurrency}, D=${Math.min(this.dht.disjointPaths, peers.length)}`) this.run.once('start', this._onStart) this.run.once('complete', this._onComplete) this.run.execute(peers, callback) diff --git a/src/query/workerQueue.js b/src/query/workerQueue.js index 658cb40f..d1cdfc98 100644 --- a/src/query/workerQueue.js +++ b/src/query/workerQueue.js @@ -2,7 +2,6 @@ const each = require('async/each') const queue = require('async/queue') -const c = require('../constants') class WorkerQueue { /** @@ -19,7 +18,7 @@ class WorkerQueue { this.path = path this.log = log - this.concurrency = c.ALPHA + this.concurrency = this.dht.concurrency this.queue = this.setupQueue() } From a1c989015f834e811d9bd6f122c39f8e57be99d4 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Sat, 27 Apr 2019 12:35:08 +0200 Subject: [PATCH 18/20] test(sim): add common peer check --- test/simulation/index.js | 58 +++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 10 deletions(-) diff --git a/test/simulation/index.js b/test/simulation/index.js index e247699f..b3fd6e6c 100644 --- a/test/simulation/index.js +++ b/test/simulation/index.js @@ -24,6 +24,7 @@ const MIN_PEERS_KNOWN = 10 // min number of peers a node should be aware of const LATENCY_MIN = 100 // min time a good peer should take to respond const LATENCY_MAX = 10e3 // max time a good peer should take to respond const KValue = 20 // k Bucket size +const ALPHA = 6 // alpha concurrency const QUERY_KEY = Buffer.from('a key to search for') const RUNS = 3 // How many times the simulation should run const VERBOSE = false // If true, some additional logs will run @@ -32,18 +33,33 @@ let dhtKey let network let peers let ourPeerInfo +let sortedPeers // Peers in the network sorted by closeness to QUERY_KEY +let topIds // Closest 20 peerIds in the network // Execute the simulation ;(async () => { console.log('Starting setup...') await setup() + sortedPeers = await sortClosestPeerInfos(peers, dhtKey) + topIds = sortedPeers.slice(0, 20).map(peerInfo => peerInfo.id.toB58String()) + const topIdFilter = (value) => topIds.includes(value) + console.log('Total Nodes=%d, Dead Nodes=%d, Max Siblings per Peer=%d', NUM_PEERS, NUM_DEAD_NODES, MAX_PEERS_KNOWN) - console.log('Starting %d runs...', RUNS) + console.log('Starting %d runs with concurrency %d...', RUNS, ALPHA) + let topRunIds = [] for (var i = 0; i < RUNS; i++) { - await GetClosestPeersSimulation() + const { closestPeers, runTime } = await GetClosestPeersSimulation() + const foundIds = closestPeers.map(peerId => peerId.toB58String()) + const intersection = foundIds.filter(topIdFilter) + topRunIds.push(intersection) + + console.log('Found %d of the top %d peers in %d ms', intersection.length, KValue, runTime) } + const commonTopIds = getCommonMembers(topRunIds) + console.log('All runs found %d common peers', commonTopIds.length) + process.exit() })() @@ -59,6 +75,15 @@ async function setup () { network = await MockNetwork(peers) } +/** + * @typedef ClosestPeersSimResult + * @property {Array} closestPeers + * @property {number} runTime Time in ms the query took + */ + +/** + * @returns {ClosestPeersSimResult} + */ async function GetClosestPeersSimulation () { const dht = new DHT({ _peerInfo: ourPeerInfo, @@ -67,6 +92,7 @@ async function GetClosestPeersSimulation () { on: () => {} }, { kBucketSize: KValue, + concurrency: ALPHA, randomWalk: { enabled: false } @@ -111,14 +137,7 @@ async function GetClosestPeersSimulation () { }) const runTime = Date.now() - startTime - const actualClosest = await sortClosestPeerInfos(peers, dhtKey) - - const foundIds = closestPeers.map(peerId => peerId.toB58String()) - const topIds = actualClosest.slice(0, 20).map(peerInfo => peerInfo.id.toB58String()) - - const intersection = foundIds.filter(value => topIds.includes(value)) - - console.log('Found %d of the top %d peers in %d ms', intersection.length, KValue, runTime) + return { closestPeers, runTime } } /** @@ -203,3 +222,22 @@ function randomMembers (list, num) { return randomMembers } + +/** + * Finds the common members of all arrays + * @param {Array} arrays An array of arrays to find common members + * @returns {Array} + */ +function getCommonMembers (arrays) { + return arrays.shift().reduce(function (accumulator, val1) { + if (accumulator.indexOf(val1) === -1 && + arrays.every(function (val2) { + return val2.indexOf(val1) !== -1 + }) + ) { + accumulator.push(val1) + } + + return accumulator + }, []) +} From fa203c733f1bd2f928c393696db1b1d473272463 Mon Sep 17 00:00:00 2001 From: dirkmc Date: Mon, 29 Apr 2019 17:19:08 +0200 Subject: [PATCH 19/20] fix: apply suggestions from code review Co-Authored-By: jacobheun --- src/index.js | 2 +- src/query/run.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/index.js b/src/index.js index db1139c7..9999eb16 100644 --- a/src/index.js +++ b/src/index.js @@ -76,7 +76,7 @@ class KadDHT extends EventEmitter { this.kBucketSize = options.kBucketSize || c.K /** - * ALPHA concurrency at which each query path with run, defaults to 6 + * ALPHA concurrency at which each query path with run, defaults to 3 * @type {number} */ this.concurrency = options.concurrency || c.ALPHA diff --git a/src/query/run.js b/src/query/run.js index 5e5a6e6e..0b4240f0 100644 --- a/src/query/run.js +++ b/src/query/run.js @@ -68,7 +68,6 @@ class Run extends EventEmitter { }) // Execute the query along each disjoint path - // each(paths, (path, cb) => path.execute(cb), (err) => { this.executePaths(paths, (err) => { if (err) { return callback(err) From 9c886f105146b1943cc33f8a0908b43818f9266b Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 8 May 2019 11:11:29 +0200 Subject: [PATCH 20/20] fix: remove ncp in favor of kBucketSize --- src/index.js | 7 ------- src/private.js | 2 +- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/index.js b/src/index.js index 9999eb16..234b0ccc 100644 --- a/src/index.js +++ b/src/index.js @@ -88,13 +88,6 @@ class KadDHT extends EventEmitter { */ this.disjointPaths = Math.ceil(this.kBucketSize / 2) - /** - * Number of closest peers to return on kBucket search, default 20 - * - * @type {number} - */ - this.ncp = options.ncp || this.kBucketSize - /** * The routing table. * diff --git a/src/private.js b/src/private.js index a8418a3e..da1d66eb 100644 --- a/src/private.js +++ b/src/private.js @@ -34,7 +34,7 @@ module.exports = (dht) => ({ } let ids try { - ids = dht.routingTable.closestPeers(key, dht.ncp) + ids = dht.routingTable.closestPeers(key, dht.kBucketSize) } catch (err) { return callback(err) }