From 89f85bfa685976bb701c1501d4885778f3b59a0a Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 19 Nov 2019 09:42:32 -0600 Subject: [PATCH 1/3] refactor: async it --- package.json | 13 +- src/content-routing/index.js | 10 +- src/index.js | 67 +++++----- src/network.js | 141 ++++++++++----------- src/peer-routing/index.js | 24 ++-- src/query/path.js | 1 - src/query/workerQueue.js | 2 +- src/rpc/handlers/add-provider.js | 2 +- src/rpc/handlers/get-providers.js | 6 +- src/rpc/handlers/get-value.js | 4 +- src/rpc/index.js | 103 +++++++--------- src/utils.js | 10 ++ test/kad-dht.spec.js | 163 +++++++++++++++---------- test/multiple-nodes.spec.js | 46 ++++--- test/network.spec.js | 138 +++++++++++++-------- test/query.spec.js | 94 +++++++------- test/query/index.spec.js | 9 +- test/random-walk.spec.js | 8 +- test/rpc/handlers/add-provider.spec.js | 4 +- test/rpc/handlers/get-value.spec.js | 1 + test/rpc/handlers/put-value.spec.js | 2 +- test/rpc/index.spec.js | 103 +++++++++------- test/utils/index.js | 100 +++++++-------- test/utils/test-dht.js | 130 +++++++++++++++----- 24 files changed, 651 insertions(+), 530 deletions(-) diff --git a/package.json b/package.json index edefaebc..b2db184b 100644 --- a/package.json +++ b/package.json @@ -47,8 +47,11 @@ "hashlru": "^2.3.0", "heap": "~0.2.6", "interface-datastore": "~0.8.0", + "it-length-prefixed": "^3.0.0", + "it-pipe": "^1.1.0", "k-bucket": "^5.0.0", "libp2p-crypto": "~0.17.1", + "libp2p-interfaces": "^0.1.5", "libp2p-record": "~0.7.0", "multihashes": "~0.4.15", "multihashing-async": "~0.8.0", @@ -57,26 +60,22 @@ "p-queue": "^6.2.1", "p-timeout": "^3.2.0", "p-times": "^2.1.0", + "paramap-it": "^0.1.1", "peer-id": "~0.13.5", "peer-info": "~0.17.0", "promise-to-callback": "^1.0.0", - "promisify-es6": "^1.0.3", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.3", - "pull-stream": "^3.6.14", "varint": "^5.0.0", "xor-distance": "^2.0.0" }, "devDependencies": { "aegir": "^20.4.1", + "async-iterator-all": "^1.0.0", "chai": "^4.2.0", "datastore-level": "~0.12.1", "delay": "^4.3.0", "dirty-chai": "^2.0.1", - "interface-connection": "~0.3.3", - "libp2p-mplex": "~0.8.5", - "libp2p-switch": "~0.42.7", - "libp2p-tcp": "~0.13.0", + "it-pair": "^1.0.0", "lodash": "^4.17.11", "lodash.random": "^3.2.0", "lodash.range": "^3.2.0", diff --git a/src/content-routing/index.js b/src/content-routing/index.js index 6b95aaba..8f351574 100644 --- a/src/content-routing/index.js +++ b/src/content-routing/index.js @@ -82,10 +82,10 @@ module.exports = (dht) => { provs.forEach((id) => { let info - if (dht.peerBook.has(id)) { - info = dht.peerBook.get(id) + if (dht.peerStore.has(id)) { + info = dht.peerStore.get(id) } else { - info = dht.peerBook.put(new PeerInfo(id)) + info = dht.peerStore.put(new PeerInfo(id)) } out.push(info) }) @@ -110,7 +110,7 @@ module.exports = (dht) => { dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length) provs.forEach((prov) => { - pathProviders.push(dht.peerBook.put(prov)) + pathProviders.push(dht.peerStore.put(prov)) }) // hooray we have all that we want @@ -131,7 +131,7 @@ module.exports = (dht) => { providerTimeout ) } catch (err) { - if (err !== pTimeout.TimeoutError) { + if (err.name !== pTimeout.TimeoutError.name) { throw err } } finally { diff --git a/src/index.js b/src/index.js index 97bb2469..fca9e5b6 100644 --- a/src/index.js +++ b/src/index.js @@ -40,9 +40,10 @@ class KadDHT extends EventEmitter { /** * Create a new KadDHT. * @param {Object} props - * @param {Switch} props.sw libp2p-switch instance + * @param {Dialer} props.dialer libp2p dialer instance * @param {PeerInfo} props.peerInfo peer's peerInfo - * @param {Object} props.registrar registrar for libp2p protocols + * @param {PeerStore} props.peerStore libp2p peerStore + * @param {Object} props.registrar libp2p registrar instance * @param {function} props.registrar.handle * @param {function} props.registrar.register * @param {function} props.registrar.unregister @@ -54,7 +55,10 @@ class KadDHT extends EventEmitter { * @param {randomWalkOptions} options.randomWalk randomWalk options */ constructor ({ - sw, + dialer, + peerInfo, + peerStore, + registrar, datastore = new MemoryDatastore(), kBucketSize = c.K, concurrency = c.ALPHA, @@ -63,14 +67,31 @@ class KadDHT extends EventEmitter { randomWalk = {} }) { super() - assert(sw, 'libp2p-kad-dht requires a instance of Switch') + assert(dialer, 'libp2p-kad-dht requires a instance of Dialer') /** - * Local reference to the libp2p-switch instance - * - * @type {Switch} + * Local reference to the libp2p dialer instance + * @type {Dialer} + */ + this.dialer = dialer + + /** + * Local peer info + * @type {PeerInfo} + */ + this.peerInfo = peerInfo + + /** + * Local peer info + * @type {PeerStore} + */ + this.peerStore = peerStore + + /** + * Local peer info + * @type {Registrar} */ - this.switch = sw + this.registrar = registrar /** * k-bucket size @@ -141,6 +162,8 @@ class KadDHT extends EventEmitter { */ this._queryManager = new QueryManager() + this._running = false + // DHT components this.contentFetching = contentFetching(this) this.contentRouting = contentRouting(this) @@ -155,22 +178,6 @@ class KadDHT extends EventEmitter { return this._running } - /** - * Local peer (yourself) - * @type {PeerInfo} - */ - get peerInfo () { - return this.switch._peerInfo - } - - /** - * Peerbook - * @type {PeerBook} - */ - get peerBook () { - return this.switch._peerBook - } - /** * Start listening to incoming connections. * @returns {Promise} @@ -312,10 +319,10 @@ class KadDHT extends EventEmitter { const ids = this.routingTable.closestPeers(key, this.kBucketSize) return ids.map((p) => { - if (this.peerBook.has(p)) { - return this.peerBook.get(p) + if (this.peerStore.has(p)) { + return this.peerStore.get(p) } - return this.peerBook.put(new PeerInfo(p)) + return this.peerStore.put(new PeerInfo(p)) }) } @@ -390,7 +397,7 @@ class KadDHT extends EventEmitter { } /** - * Add the peer to the routing table and update it in the peerbook. + * Add the peer to the routing table and update it in the peerStore. * * @param {PeerInfo} peer * @returns {Promise} @@ -398,7 +405,6 @@ class KadDHT extends EventEmitter { */ async _add (peer) { - peer = this.peerBook.put(peer) await this.routingTable.add(peer.id) } @@ -455,7 +461,7 @@ class KadDHT extends EventEmitter { * Query a particular peer for the value for the given key. * It will either return the value or a list of closer peers. * - * Note: The peerbook is updated with new addresses found for the given peer. + * Note: The peerStore is updated with new addresses found for the given peer. * * @param {PeerId} peer * @param {Buffer} key @@ -518,3 +524,4 @@ class KadDHT extends EventEmitter { } module.exports = KadDHT +module.exports.multicodec = c.PROTOCOL_DHT diff --git a/src/network.js b/src/network.js index f0963bf2..9a1020c5 100644 --- a/src/network.js +++ b/src/network.js @@ -1,11 +1,12 @@ 'use strict' -const pull = require('pull-stream') +const errcode = require('err-code') + +const pipe = require('it-pipe') +const lp = require('it-length-prefixed') const pTimeout = require('p-timeout') -const lp = require('pull-length-prefixed') -const promisify = require('promisify-es6') -const errcode = require('err-code') +const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') const rpc = require('./rpc') const c = require('./constants') @@ -32,38 +33,46 @@ class Network { /** * Start the network. - * @returns {void} + * @returns {Promise} */ - start () { + async start () { if (this._running) { return } - // TODO add a way to check if switch has started or not + + // TODO remove: add a way to check if switch has started or not if (!this.dht.isStarted) { throw errcode(new Error('Can not start network'), 'ERR_CANNOT_START_NETWORK') } this._running = true - // handle incoming connections - this.dht.switch.handle(c.PROTOCOL_DHT, this._rpc) + // Incoming streams + this.dht.registrar.handle(c.PROTOCOL_DHT, this._rpc) - // handle new connections - this.dht.switch.on('peer-mux-established', this._onPeerConnected) + // register protocol with topology + const topology = new MulticodecTopology({ + multicodecs: c.PROTOCOL_DHT, + handlers: { + onConnect: this._onPeerConnected, + onDisconnect: () => {} + } + }) + this._registrarId = await this.dht.registrar.register(topology) } /** * Stop all network activity. - * @returns {void} + * @returns {Promise} */ - stop () { + async stop () { if (!this.dht.isStarted && !this.isStarted) { return } this._running = false - this.dht.switch.removeListener('peer-mux-established', this._onPeerConnected) - this.dht.switch.unhandle(c.PROTOCOL_DHT) + // unregister protocol and handlers + await this.dht.registrar.unregister(this._registrarId) } /** @@ -86,24 +95,18 @@ class Network { } /** - * Handle new connections in the switch. - * - * @param {PeerInfo} peer - * @returns {Promise} + * Registrar notifies a connection successfully with dht protocol. * @private + * @param {PeerInfo} peerInfo remote peer info + * @param {Connection} conn connection to the peer + * @returns {Promise} */ - async _onPeerConnected (peer) { - if (!this.isConnected) { - return this._log.error('Network is offline') - } + async _onPeerConnected (peerInfo, conn) { + await this.dht._add(peerInfo) + this._log('added to the routing table: %s', peerInfo.id.toB58String()) - const conn = await promisify(cb => this.dht.switch.dial(peer, c.PROTOCOL_DHT, cb))() - - // TODO: conn.close() - pull(pull.empty(), conn) - - await this.dht._add(peer) - this._log('added to the routing table: %s', peer.id.toB58String()) + // Open a stream with the connected peer + await conn.newStream(c.PROTOCOL_DHT) } /** @@ -111,7 +114,6 @@ class Network { * @async * @param {PeerId} to - The peer that should receive a message * @param {Message} msg - The message to send. - * @param {function(Error, Message)} callback * @returns {Promise} */ async sendRequest (to, msg) { @@ -122,8 +124,9 @@ class Network { this._log('sending to: %s', to.toB58String()) - const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))() - return this._writeReadMessage(conn, msg.serialize()) + const { stream } = await this.dht.dialer.dialProtocol(to, c.PROTOCOL_DHT) + + return this._writeReadMessage(stream, msg.serialize()) } /** @@ -140,8 +143,8 @@ class Network { this._log('sending to: %s', to.toB58String()) - const conn = await promisify(cb => this.dht.switch.dial(to, c.PROTOCOL_DHT, cb))() - return this._writeMessage(conn, msg.serialize()) + const { stream } = await this.dht.dialer.dialProtocol(to, c.PROTOCOL_DHT) + return this._writeMessage(stream, msg.serialize()) } /** @@ -151,10 +154,10 @@ class Network { * * @param {Connection} conn - the connection to use * @param {Buffer} msg - the message to send - * @returns {Message} + * @returns {Promise} * @private */ - _writeReadMessage (conn, msg) { + async _writeReadMessage (conn, msg) { // eslint-disable-line require-await return pTimeout( writeReadMessage(conn, msg), this.readMessageTimeout @@ -170,47 +173,35 @@ class Network { * @private */ _writeMessage (conn, msg) { - return new Promise((resolve, reject) => { - pull( - pull.values([msg]), - lp.encode(), - conn, - pull.onEnd((err) => { - if (err) return reject(err) - resolve() - }) - ) - }) + return pipe( + [msg], + lp.encode(), + conn + ) } } -function writeReadMessage (conn, msg) { - return new Promise((resolve, reject) => { - pull( - pull.values([msg]), - lp.encode(), - conn, - pull.filter((msg) => msg.length < c.maxMessageSize), - lp.decode(), - pull.collect((err, res) => { - if (err) { - return reject(err) - } - if (res.length === 0) { - return reject(errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED')) - } - - let response - try { - response = Message.deserialize(res[0]) - } catch (err) { - return reject(errcode(err, 'ERR_FAILED_DESERIALIZE_RESPONSE')) - } - - resolve(response) - }) - ) - }) +async function writeReadMessage (conn, msg) { + const res = await pipe( + [msg], + lp.encode(), + conn, + utils.itFilter( + (msg) => msg.length < c.maxMessageSize + ), + lp.decode(), + async source => { + for await (const chunk of source) { + return chunk.slice() + } + } + ) + + if (res.length === 0) { + throw errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED') + } + + return Message.deserialize(res) } module.exports = Network diff --git a/src/peer-routing/index.js b/src/peer-routing/index.js index f6d5e3e6..12f82407 100644 --- a/src/peer-routing/index.js +++ b/src/peer-routing/index.js @@ -24,11 +24,11 @@ module.exports = (dht) => { dht._log('findPeerLocal %s', peer.toB58String()) const p = await dht.routingTable.find(peer) - if (!p || !dht.peerBook.has(p)) { + if (!p || !dht.peerStore.has(p)) { return } - return dht.peerBook.get(p) + return dht.peerStore.get(p) } /** @@ -57,7 +57,7 @@ module.exports = (dht) => { return msg.closerPeers .filter((pInfo) => !dht._isSelf(pInfo.id)) - .map((pInfo) => dht.peerBook.put(pInfo)) + .map((pInfo) => dht.peerStore.put(pInfo)) } /** @@ -128,9 +128,9 @@ module.exports = (dht) => { // sanity check const match = peers.find((p) => p.isEqual(id)) - if (match && dht.peerBook.has(id)) { - dht._log('found in peerbook') - return dht.peerBook.get(id) + if (match && dht.peerStore.has(id)) { + dht._log('found in peerStore') + return dht.peerStore.get(id) } // query the network @@ -169,7 +169,7 @@ module.exports = (dht) => { result.paths.forEach((result) => { if (result.success) { success = true - dht.peerBook.put(result.peer) + dht.peerStore.put(result.peer) } }) dht._log('findPeer %s: %s', id.toB58String(), success) @@ -177,7 +177,7 @@ module.exports = (dht) => { if (!success) { throw errcode(new Error('No peer found'), 'ERR_NOT_FOUND') } - return dht.peerBook.get(id) + return dht.peerStore.get(id) }, /** @@ -226,15 +226,15 @@ module.exports = (dht) => { // local check let info - if (dht.peerBook.has(peer)) { - info = dht.peerBook.get(peer) + if (dht.peerStore.has(peer)) { + info = dht.peerStore.get(peer) if (info && info.id.pubKey) { dht._log('getPublicKey: found local copy') return info.id.pubKey } } else { - info = dht.peerBook.put(new PeerInfo(peer)) + info = dht.peerStore.put(new PeerInfo(peer)) } // try the node directly @@ -249,7 +249,7 @@ module.exports = (dht) => { } info.id = new PeerId(peer.id, null, pk) - dht.peerBook.put(info) + dht.peerStore.put(info) return pk } diff --git a/src/query/path.js b/src/query/path.js index 4a215fd9..8da48775 100644 --- a/src/query/path.js +++ b/src/query/path.js @@ -38,7 +38,6 @@ class Path { /** * Add a peer to the set of peers that are used to intialize the path. - * * @param {PeerId} peer */ addInitialPeer (peer) { diff --git a/src/query/workerQueue.js b/src/query/workerQueue.js index 3eb91d08..f03f9533 100644 --- a/src/query/workerQueue.js +++ b/src/query/workerQueue.js @@ -244,7 +244,7 @@ class WorkerQueue { if (this.dht._isSelf(closer.id)) { return } - closer = this.dht.peerBook.put(closer) + closer = this.dht.peerStore.put(closer) this.dht._peerDiscovered(closer) await this.path.addPeerToQuery(closer.id) })) diff --git a/src/rpc/handlers/add-provider.js b/src/rpc/handlers/add-provider.js index ec3a6dd9..a95961d2 100644 --- a/src/rpc/handlers/add-provider.js +++ b/src/rpc/handlers/add-provider.js @@ -44,7 +44,7 @@ module.exports = (dht) => { log('received provider %s for %s (addrs %s)', peer.id.toB58String(), cid.toBaseEncodedString(), pi.multiaddrs.toArray().map((m) => m.toString())) if (!dht._isSelf(pi.id)) { - dht.peerBook.put(pi) + dht.peerStore.put(pi) return dht.providers.addProvider(cid, pi.id) } }) diff --git a/src/rpc/handlers/get-providers.js b/src/rpc/handlers/get-providers.js index d8cfd984..fc5208a2 100644 --- a/src/rpc/handlers/get-providers.js +++ b/src/rpc/handlers/get-providers.js @@ -35,11 +35,11 @@ module.exports = (dht) => { ]) const providers = peers.map((p) => { - if (dht.peerBook.has(p)) { - return dht.peerBook.get(p) + if (dht.peerStore.has(p)) { + return dht.peerStore.get(p) } - return dht.peerBook.put(new PeerInfo(p)) + return dht.peerStore.put(new PeerInfo(p)) }) if (has) { diff --git a/src/rpc/handlers/get-value.js b/src/rpc/handlers/get-value.js index f0f8729c..ee9524a1 100644 --- a/src/rpc/handlers/get-value.js +++ b/src/rpc/handlers/get-value.js @@ -35,8 +35,8 @@ module.exports = (dht) => { if (dht._isSelf(id)) { info = dht.peerInfo - } else if (dht.peerBook.has(id)) { - info = dht.peerBook.get(id) + } else if (dht.peerStore.has(id)) { + info = dht.peerStore.get(id) } if (info && info.id.pubKey) { diff --git a/src/rpc/index.js b/src/rpc/index.js index d7cb8846..cdac664c 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -1,7 +1,9 @@ 'use strict' -const pull = require('pull-stream') -const lp = require('pull-length-prefixed') +const pipe = require('it-pipe') +const lp = require('it-length-prefixed') +const paramap = require('paramap-it') +const PeerInfo = require('peer-info') const Message = require('../message') const handlers = require('./handlers') @@ -14,14 +16,16 @@ module.exports = (dht) => { /** * Process incoming DHT messages. - * * @param {PeerInfo} peer * @param {Message} msg * @returns {Promise} * * @private */ - async function handleMessage (peer, msg) { + async function handleMessage (peer, msg) { // eslint-disable-line + // get handler & exectue it + const handler = getMessageHandler(msg.type) + try { await dht._add(peer) } catch (err) { @@ -29,9 +33,6 @@ module.exports = (dht) => { log.error(err) } - // get handler & exectue it - const handler = getMessageHandler(msg.type) - if (!handler) { log.error(`no handler found for message type: ${msg.type}`) return @@ -41,63 +42,43 @@ module.exports = (dht) => { } /** - * Handle incoming streams from the Switch, on the dht protocol. - * - * @param {string} protocol - * @param {Connection} conn - * @returns {void} + * Handle incoming streams on the dht protocol. + * @param {Object} props + * @param {string} props.protocol + * @param {DuplexStream} props.stream + * @param {Connection} props.connection connection + * @returns {Promise} */ - return function protocolHandler (protocol, conn) { - conn.getPeerInfo((err, peer) => { - if (err) { - log.error('Failed to get peer info') - log.error(err) - return - } + return async function onIncomingStream ({ protocol, stream, connection }) { + const peerInfo = await PeerInfo.create(connection.remotePeer) + peerInfo.protocols.add(protocol) - log('from: %s', peer.id.toB58String()) + try { + await dht._add(peerInfo) + } catch (err) { + log.error(err) + } - pull( - conn, - lp.decode(), - pull.filter((msg) => msg.length < c.maxMessageSize), - pull.map((rawMsg) => { - let msg - try { - msg = Message.deserialize(rawMsg) - } catch (err) { - log.error('failed to read incoming message', err) - return - } + const idB58Str = peerInfo.id.toB58String() + log('from: %s', idB58Str) - return msg - }), - pull.filter(Boolean), - pull.asyncMap(async (msg, cb) => { - let response - try { - response = await handleMessage(peer, msg) - } catch (err) { - cb(err) - } - cb(null, response) - }), - // Not all handlers will return a response - pull.filter(Boolean), - pull.map((response) => { - let msg - try { - msg = response.serialize() - } catch (err) { - log.error('failed to send message', err) - return - } - return msg - }), - pull.filter(Boolean), - lp.encode(), - conn - ) - }) + await pipe( + stream.source, + lp.decode(), + utils.itFilter( + (msg) => msg.length < c.maxMessageSize + ), + source => paramap(source, rawMsg => { + const msg = Message.deserialize(rawMsg.slice()) + return handleMessage(peerInfo, msg) + }), + // Not all handlers will return a response + utils.itFilter(Boolean), + source => paramap(source, response => { + return response.serialize() + }), + lp.encode(), + stream.sink + ) } } diff --git a/src/utils.js b/src/utils.js index 53d1477c..20fa015d 100644 --- a/src/utils.js +++ b/src/utils.js @@ -11,6 +11,16 @@ const { Record } = require('libp2p-record') const PeerId = require('peer-id') const errcode = require('err-code') +exports.itFilter = function (predicate) { + return source => (async function * () { + for await (const msg of source) { + if (predicate(msg)) { + yield msg + } + } + })() +} + /** * Creates a DHT ID by hashing a given buffer. * diff --git a/test/kad-dht.spec.js b/test/kad-dht.spec.js index c0e2709c..66ef978f 100644 --- a/test/kad-dht.spec.js +++ b/test/kad-dht.spec.js @@ -20,11 +20,7 @@ const Message = require('../src/message') const createPeerInfo = require('./utils/create-peer-info') const createValues = require('./utils/create-values') const TestDHT = require('./utils/test-dht') -const { - connect, - countDiffPeers, - createDHT -} = require('./utils') +const { countDiffPeers } = require('./utils') describe('KadDHT', () => { let peerInfos @@ -43,18 +39,28 @@ describe('KadDHT', () => { }) describe('create', () => { - it('simple', () => { - const dht = createDHT(peerInfos[0], { + let tdht + + beforeEach(() => { + tdht = new TestDHT() + }) + + afterEach(() => { + return tdht.teardown() + }) + + it('simple', async () => { + const [dht] = await tdht.spawn(1, { kBucketSize: 5 }) - expect(dht).to.have.property('peerInfo').eql(peerInfos[0]) + expect(dht).to.have.property('peerInfo') expect(dht).to.have.property('kBucketSize', 5) expect(dht).to.have.property('routingTable') }) - it('with validators and selectors', () => { - const dht = createDHT(peerInfos[0], { + it('with validators and selectors', async () => { + const [dht] = await tdht.spawn(1, { validators: { ipns: { func: () => { } } }, @@ -63,7 +69,7 @@ describe('KadDHT', () => { } }) - expect(dht).to.have.property('peerInfo').eql(peerInfos[0]) + expect(dht).to.have.property('peerInfo') expect(dht).to.have.property('routingTable') expect(dht.validators).to.have.property('ipns') expect(dht.selectors).to.have.property('ipns') @@ -71,8 +77,18 @@ describe('KadDHT', () => { }) describe('start and stop', () => { + let tdht + + beforeEach(() => { + tdht = new TestDHT() + }) + + afterEach(() => { + return tdht.teardown() + }) + it('simple with defaults', async () => { - const dht = createDHT(peerInfos[0]) + const [dht] = await tdht.spawn(1) sinon.spy(dht.network, 'start') sinon.spy(dht.randomWalk, 'start') @@ -90,7 +106,7 @@ describe('KadDHT', () => { }) it('random-walk disabled', async () => { - const dht = createDHT(peerInfos[0], { + const [dht] = await tdht.spawn(1, { randomWalk: { enabled: false } }) @@ -110,14 +126,14 @@ describe('KadDHT', () => { }) it('should not fail when already started', async () => { - const dht = createDHT(peerInfos[0]) + const [dht] = await tdht.spawn(1) await dht.start() await dht.start() }) - it('should not fail to stop when was not started', () => { - const dht = createDHT(peerInfos[0]) + it('should not fail to stop when was not started', async () => { + const [dht] = await tdht.spawn(1) dht.stop() }) @@ -134,7 +150,7 @@ describe('KadDHT', () => { const [dhtA, dhtB] = await tdht.spawn(2) // Connect nodes - await connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) // Exchange data through the dht await dhtA.put(key, value) @@ -160,11 +176,10 @@ describe('KadDHT', () => { const stub = sinon.stub(dhtD, '_verifyRecordLocally').rejects(error) await Promise.all([ - connect(dhtA, dhtB), - connect(dhtA, dhtC), - connect(dhtA, dhtD) + tdht.connect(dhtA, dhtB), + tdht.connect(dhtA, dhtC), + tdht.connect(dhtA, dhtD) ]) - // DHT operations await dhtA.put(key, value, { minPeers: 2 }) const res = await dhtB.get(key, { timeout: 1000 }) @@ -190,9 +205,9 @@ describe('KadDHT', () => { const stub2 = sinon.stub(dhtC, '_verifyRecordLocally').rejects(error) await Promise.all([ - connect(dhtA, dhtB), - connect(dhtA, dhtC), - connect(dhtA, dhtD) + tdht.connect(dhtA, dhtB), + tdht.connect(dhtA, dhtC), + tdht.connect(dhtA, dhtD) ]) // DHT operations @@ -223,8 +238,8 @@ describe('KadDHT', () => { const stub = sinon.stub(dhtC, '_verifyRecordLocally').rejects(error) await Promise.all([ - connect(dhtA, dhtB), - connect(dhtA, dhtC) + tdht.connect(dhtA, dhtB), + tdht.connect(dhtA, dhtC) ]) // DHT operations @@ -248,7 +263,7 @@ describe('KadDHT', () => { const tdht = new TestDHT() const [dhtA, dhtB] = await tdht.spawn(2) - await connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) // DHT operations await dhtA.put(key, value) @@ -276,7 +291,7 @@ describe('KadDHT', () => { } }) - await connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) // DHT operations await dhtA.put(key, value) @@ -295,7 +310,7 @@ describe('KadDHT', () => { const tdht = new TestDHT() const [dhtA, dhtB] = await tdht.spawn(2) - await connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) try { await dhtA.put(key, value) @@ -324,7 +339,7 @@ describe('KadDHT', () => { await dhtB.put(key, valueB) // Connect peers - await connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) // Get values const resA = await dhtA.get(key, { timeout: 1000 }) @@ -352,9 +367,9 @@ describe('KadDHT', () => { // Connect all await Promise.all([ - connect(dhts[0], dhts[1]), - connect(dhts[1], dhts[2]), - connect(dhts[2], dhts[3]) + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]), + tdht.connect(dhts[2], dhts[3]) ]) // DHT operations @@ -370,7 +385,8 @@ describe('KadDHT', () => { const value = Buffer.from('world') const rec = new Record(key, value) - const dht = createDHT(peerInfos[0]) + const tdht = new TestDHT() + const [dht] = await tdht.spawn(1) await dht.start() const stubs = [ @@ -404,9 +420,9 @@ describe('KadDHT', () => { // connect peers await Promise.all([ - connect(dhts[0], dhts[1]), - connect(dhts[1], dhts[2]), - connect(dhts[2], dhts[3]) + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]), + tdht.connect(dhts[2], dhts[3]) ]) // provide values @@ -448,8 +464,8 @@ describe('KadDHT', () => { // Connect await Promise.all([ - connect(dhts[0], dhts[1]), - connect(dhts[1], dhts[2]) + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]) ]) await Promise.all(dhts.map((dht) => dht.provide(val.cid))) @@ -479,9 +495,9 @@ describe('KadDHT', () => { // Connect all await Promise.all([ - connect(dhts[0], dhts[1]), - connect(dhts[1], dhts[2]), - connect(dhts[2], dhts[3]) + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]), + tdht.connect(dhts[2], dhts[3]) ]) const ids = dhts.map((d) => d.peerInfo.id) @@ -530,7 +546,7 @@ describe('KadDHT', () => { } } - await Promise.all(conns.map((conn) => connect(dhtsById.get(conn[0]), dhtsById.get(conn[1])))) + await Promise.all(conns.map((conn) => tdht.connect(dhtsById.get(conn[0]), dhtsById.get(conn[1])))) // Get the alpha (3) closest peers to the key from the origin's // routing table @@ -578,7 +594,7 @@ describe('KadDHT', () => { const dhts = await tdht.spawn(nDHTs) await pMapSeries(dhts, async (_, index) => { - await connect(dhts[index], dhts[(index + 1) % dhts.length]) + await tdht.connect(dhts[index], dhts[(index + 1) % dhts.length]) }) const res = await dhts[1].getClosestPeers(Buffer.from('foo')) @@ -596,7 +612,7 @@ describe('KadDHT', () => { const dhts = await tdht.spawn(2) const ids = dhts.map((d) => d.peerInfo.id) - dhts[0].peerBook.put(dhts[1].peerInfo) + dhts[0].peerStore.put(dhts[1].peerInfo) const key = await dhts[0].getPublicKey(ids[1]) expect(key).to.eql(dhts[1].peerInfo.id.pubKey) @@ -616,12 +632,10 @@ describe('KadDHT', () => { const ids = dhts.map((d) => d.peerInfo.id) - await connect(dhts[0], dhts[1]) + await tdht.connect(dhts[0], dhts[1]) // remove the pub key to be sure it is fetched - const p = dhts[0].peerBook.get(ids[1]) - p.id._pubKey = null - dhts[0].peerBook.put(p, true) + dhts[0].peerStore.put(dhts[1].peerInfo, true) const key = await dhts[0].getPublicKey(ids[1]) expect(key.equals(dhts[1].peerInfo.id.pubKey)).to.eql(true) @@ -631,20 +645,30 @@ describe('KadDHT', () => { }) describe('internals', () => { + let tdht + + beforeEach(() => { + tdht = new TestDHT() + }) + + afterEach(() => { + return tdht.teardown() + }) + it('_nearestPeersToQuery', async () => { - const dht = createDHT(peerInfos[0]) + const [dht] = await tdht.spawn(1) - dht.peerBook.put(peerInfos[1]) + dht.peerStore.put(peerInfos[1]) await dht._add(peerInfos[1]) const res = await dht._nearestPeersToQuery({ key: 'hello' }) expect(res).to.be.eql([peerInfos[1]]) }) it('_betterPeersToQuery', async () => { - const dht = createDHT(peerInfos[0]) + const [dht] = await tdht.spawn(1) - dht.peerBook.put(peerInfos[1]) - dht.peerBook.put(peerInfos[2]) + dht.peerStore.put(peerInfos[1]) + dht.peerStore.put(peerInfos[2]) await dht._add(peerInfos[1]) await dht._add(peerInfos[2]) @@ -654,8 +678,18 @@ describe('KadDHT', () => { }) describe('_checkLocalDatastore', () => { + let tdht + + beforeEach(() => { + tdht = new TestDHT() + }) + + afterEach(() => { + return tdht.teardown() + }) + it('allow a peer record from store if recent', async () => { - const dht = createDHT(peerInfos[0]) + const [dht] = await tdht.spawn(1) const record = new Record( Buffer.from('hello'), @@ -671,7 +705,7 @@ describe('KadDHT', () => { }) it('delete entries received from peers that have expired', async () => { - const dht = createDHT(peerInfos[0]) + const [dht] = await tdht.spawn(1) const record = new Record( Buffer.from('hello'), @@ -696,9 +730,10 @@ describe('KadDHT', () => { }) }) - it('_verifyRecordLocally', () => { - const dht = createDHT(peerInfos[0]) - dht.peerBook.put(peerInfos[1]) + it('_verifyRecordLocally', async () => { + const [dht] = await tdht.spawn(1) + + dht.peerStore.put(peerInfos[1]) const record = new Record( Buffer.from('hello'), @@ -743,7 +778,7 @@ describe('KadDHT', () => { const [dhtA, dhtB] = await tdht.spawn(2) const stub = sinon.stub(dhtA, '_getValueOrPeers').rejects(error) - await connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) try { await dhtA.get(Buffer.from('/v/hello'), { timeout: 1000 }) @@ -765,7 +800,7 @@ describe('KadDHT', () => { const [dhtA, dhtB] = await tdht.spawn(2) const stub = sinon.stub(dhtA, '_getValueOrPeers').rejects(error) - await connect(dhtA, dhtB) + await tdht.connect(dhtA, dhtB) try { await dhtA.get(Buffer.from('/v/hello'), { timeout: 1000 }) @@ -786,9 +821,9 @@ describe('KadDHT', () => { const ids = dhts.map((d) => d.peerInfo.id) await Promise.all([ - connect(dhts[0], dhts[1]), - connect(dhts[1], dhts[2]), - connect(dhts[2], dhts[3]) + tdht.connect(dhts[0], dhts[1]), + tdht.connect(dhts[1], dhts[2]), + tdht.connect(dhts[2], dhts[3]) ]) const stub = sinon.stub(dhts[0].routingTable, 'closestPeers').returns([]) diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index c0d611a6..3e006426 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -5,7 +5,6 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const { connect } = require('./utils') const TestDHT = require('./utils/test-dht') describe('multiple nodes', () => { @@ -14,23 +13,20 @@ describe('multiple nodes', () => { let dhts // spawn nodes - before(async function () { + beforeEach(async function () { this.timeout(10 * 1000) tdht = new TestDHT() dhts = await tdht.spawn(n) - }) - // connect nodes - before(function () { // all nodes except the last one const range = Array.from(Array(n - 1).keys()) // connect the last one with the others one by one - return Promise.all(range.map((i) => connect(dhts[n - 1], dhts[i]))) + return Promise.all(range.map((i) => tdht.connect(dhts[n - 1], dhts[i]))) }) - after(function () { + afterEach(function () { this.timeout(10 * 1000) return tdht.teardown() @@ -44,13 +40,13 @@ describe('multiple nodes', () => { await dhts[7].put(key, value) const res = await Promise.all([ - dhts[0].get(key, { maxTimeout: 1000 }), - dhts[1].get(key, { maxTimeout: 1000 }), - dhts[2].get(key, { maxTimeout: 1000 }), - dhts[3].get(key, { maxTimeout: 1000 }), - dhts[4].get(key, { maxTimeout: 1000 }), - dhts[5].get(key, { maxTimeout: 1000 }), - dhts[6].get(key, { maxTimeout: 1000 }) + dhts[0].get(key, { timeout: 1000 }), + dhts[1].get(key, { timeout: 1000 }), + dhts[2].get(key, { timeout: 1000 }), + dhts[3].get(key, { timeout: 1000 }), + dhts[4].get(key, { timeout: 1000 }), + dhts[5].get(key, { timeout: 1000 }), + dhts[6].get(key, { timeout: 1000 }) ]) expect(res[0]).to.eql(Buffer.from('world')) @@ -70,13 +66,13 @@ describe('multiple nodes', () => { await dhts[1].put(key, value) const res = await Promise.all([ - dhts[0].get(key, { maxTimeout: 1000 }), - dhts[2].get(key, { maxTimeout: 1000 }), - dhts[3].get(key, { maxTimeout: 1000 }), - dhts[4].get(key, { maxTimeout: 1000 }), - dhts[5].get(key, { maxTimeout: 1000 }), - dhts[6].get(key, { maxTimeout: 1000 }), - dhts[7].get(key, { maxTimeout: 1000 }) + dhts[0].get(key, { timeout: 1000 }), + dhts[2].get(key, { timeout: 1000 }), + dhts[3].get(key, { timeout: 1000 }), + dhts[4].get(key, { timeout: 1000 }), + dhts[5].get(key, { timeout: 1000 }), + dhts[6].get(key, { timeout: 1000 }), + dhts[7].get(key, { timeout: 1000 }) ]) expect(res[0]).to.eql(Buffer.from('world')) @@ -100,10 +96,10 @@ describe('multiple nodes', () => { await dhts[4].put(key, Buffer.from('world4')) const res = await Promise.all([ - dhts[3].get(key, { maxTimeout: 2000 }), - dhts[4].get(key, { maxTimeout: 2000 }), - dhts[5].get(key, { maxTimeout: 2000 }), - dhts[6].get(key, { maxTimeout: 2000 }) + dhts[4].get(key, { timeout: 2000 }), + dhts[5].get(key, { timeout: 2000 }), + dhts[6].get(key, { timeout: 2000 }), + dhts[7].get(key, { timeout: 2000 }) ]) expect(res[0]).to.eql(result) diff --git a/test/network.spec.js b/test/network.spec.js index 929721bc..39cafc17 100644 --- a/test/network.spec.js +++ b/test/network.spec.js @@ -4,45 +4,47 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const Connection = require('interface-connection').Connection -const pull = require('pull-stream') -const lp = require('pull-length-prefixed') + +const pair = require('it-pair') +const pipe = require('it-pipe') +const delay = require('delay') +const lp = require('it-length-prefixed') const pDefer = require('p-defer') -const PeerBook = require('peer-book') -const Switch = require('libp2p-switch') -const TCP = require('libp2p-tcp') -const Mplex = require('libp2p-mplex') -const KadDHT = require('../src') const Message = require('../src/message') -const createPeerInfo = require('./utils/create-peer-info') +const TestDHT = require('./utils/test-dht') describe('Network', () => { let dht - let peerInfos + let tdht before(async function () { this.timeout(10 * 1000) - peerInfos = await createPeerInfo(3) - - const sw = new Switch(peerInfos[0], new PeerBook()) - sw.transport.add('tcp', new TCP()) - sw.connection.addStreamMuxer(Mplex) - sw.connection.reuse() - dht = new KadDHT({ sw }) - - await sw.start() - await dht.start() + tdht = new TestDHT() + ;[dht] = await tdht.spawn(1) }) - after(() => Promise.all([ - dht.stop(), - dht.switch.stop() - ])) + after(() => tdht.teardown()) describe('sendRequest', () => { - it('send and response', async () => { + it('send and response echo', async () => { + const msg = new Message(Message.TYPES.PING, Buffer.from('hello'), 0) + + // mock dial + dht.dialer.dialProtocol = (peer, protocol) => { + expect(protocol).to.eql('/ipfs/kad/1.0.0') + + return { + stream: pair() // {source, sink} streams that are internally connected + } + } + + const response = await dht.network.sendRequest(dht.peerInfo.id, msg) + expect(response.type).to.eql(Message.TYPES.PING) + }) + + it('send and response different messages', async () => { const defer = pDefer() let i = 0 const finish = () => { @@ -54,29 +56,50 @@ describe('Network', () => { const msg = new Message(Message.TYPES.PING, Buffer.from('hello'), 0) // mock it - dht.switch.dial = (peer, protocol, callback) => { + dht.dialer.dialProtocol = async (peer, protocol) => { expect(protocol).to.eql('/ipfs/kad/1.0.0') const msg = new Message(Message.TYPES.FIND_NODE, Buffer.from('world'), 0) - const rawConn = { - source: pull( - pull.values([msg.serialize()]), - lp.encode() - ), - sink: pull( + const data = [] + await pipe( + [msg.serialize()], + lp.encode(), + async source => { + for await (const chunk of source) { + data.push(chunk.slice()) + } + } + ) + + const source = (function * () { + const array = data + + while (array.length) { + yield array.shift() + } + })() + + const sink = async source => { + const res = [] + await pipe( + source, lp.decode(), - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(Message.deserialize(res[0]).type).to.eql(Message.TYPES.PING) - finish() - }) + async source => { + for await (const chunk of source) { + res.push(chunk.slice()) + } + } ) + expect(Message.deserialize(res[0]).type).to.eql(Message.TYPES.PING) + finish() + } + + return { + stream: { source, sink } } - const conn = new Connection(rawConn) - callback(null, conn) } - const response = await dht.network.sendRequest(peerInfos[0].id, msg) + const response = await dht.network.sendRequest(dht.peerInfo.id, msg) expect(response.type).to.eql(Message.TYPES.FIND_NODE) finish() @@ -96,28 +119,37 @@ describe('Network', () => { const msg = new Message(Message.TYPES.PING, Buffer.from('hello'), 0) // mock it - dht.switch.dial = (peer, protocol, callback) => { + dht.dialer.dialProtocol = (peer, protocol) => { expect(protocol).to.eql('/ipfs/kad/1.0.0') - const rawConn = { - // hanging - source: (end, cb) => {}, - sink: pull( + + const source = (async function * () { // eslint-disable-line require-yield + await delay(1000) + })() + + const sink = async source => { + const res = [] + await pipe( + source, lp.decode(), - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(Message.deserialize(res[0]).type).to.eql(Message.TYPES.PING) - finish() - }) + async source => { + for await (const chunk of source) { + res.push(chunk.slice()) + } + } ) + expect(Message.deserialize(res[0]).type).to.eql(Message.TYPES.PING) + finish() + } + + return { + stream: { source, sink } } - const conn = new Connection(rawConn) - callback(null, conn) } dht.network.readMessageTimeout = 100 try { - await dht.network.sendRequest(peerInfos[0].id, msg) + await dht.network.sendRequest(dht.peerInfo.id, msg) } catch (err) { expect(err).to.exist() expect(err.message).to.match(/timed out/) diff --git a/test/query.spec.js b/test/query.spec.js index fb04e9b1..d22db399 100644 --- a/test/query.spec.js +++ b/test/query.spec.js @@ -4,45 +4,39 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const PeerBook = require('peer-book') -const Switch = require('libp2p-switch') -const TCP = require('libp2p-tcp') -const Mplex = require('libp2p-mplex') const pDefer = require('p-defer') const delay = require('delay') -const DHT = require('../src') const Query = require('../src/query') +const kadUtils = require('../src/utils') const createPeerInfo = require('./utils/create-peer-info') +const TestDHT = require('./utils/test-dht') const createDisjointTracks = require('./utils/create-disjoint-tracks') -const kadUtils = require('../src/utils') - -const createDHT = async (peerInfos) => { - const sw = new Switch(peerInfos[0], new PeerBook()) - sw.transport.add('tcp', new TCP()) - sw.connection.addStreamMuxer(Mplex) - sw.connection.reuse() - const d = new DHT({ sw }) - - await d.start() - return d -} describe('Query', () => { let peerInfos + let tdht let dht before(async () => { peerInfos = await createPeerInfo(40) - dht = await createDHT(peerInfos) + }) + + beforeEach(async () => { + tdht = new TestDHT() + ;[dht] = await tdht.spawn(1) + }) + + afterEach(() => { + return tdht.teardown() }) it('simple run', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} let i = 0 const queryFunc = async (p) => { // eslint-disable-line require-await @@ -69,10 +63,10 @@ describe('Query', () => { }) it('does not return an error if only some queries error', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} let i = 0 const visited = [] @@ -103,10 +97,10 @@ describe('Query', () => { }) it('returns an error if all queries error', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} const queryFunc = async (p) => { throw new Error('fail') } // eslint-disable-line require-await const q = new Query(dht, peer.id.id, () => queryFunc) @@ -123,7 +117,7 @@ describe('Query', () => { }) it('returns empty run if initial peer list is empty', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo const queryFunc = async (p) => {} const q = new Query(dht, peer.id.id, () => queryFunc) @@ -135,10 +129,10 @@ describe('Query', () => { }) it('only closerPeers', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} const queryFunc = async (p) => { // eslint-disable-line require-await return { @@ -153,10 +147,10 @@ describe('Query', () => { }) it('only closerPeers concurrent', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} // 1 -> 8 // 2 -> 4 -> 5 @@ -201,10 +195,10 @@ describe('Query', () => { }) it('early success', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} // 1 -> 2 -> 3 -> 4 const topology = { @@ -244,11 +238,11 @@ describe('Query', () => { it('all queries stop after shutdown', async () => { const deferShutdown = pDefer() - const dhtA = await createDHT(peerInfos) - const peer = peerInfos[0] + const [dhtA] = await tdht.spawn(1) + const peer = dht.peerInfo // mock this so we can dial non existing peers - dhtA.switch.dial = (peer, callback) => callback() + dhtA.dialer.dial = (peer) => {} // 1 -> 2 -> 3 -> 4 const topology = { @@ -301,11 +295,11 @@ describe('Query', () => { }) it('queries run after shutdown return immediately', async () => { - const dhtA = await createDHT(peerInfos) - const peer = peerInfos[0] + const [dhtA] = await tdht.spawn(1) + const peer = dht.peerInfo // mock this so we can dial non existing peers - dhtA.switch.dial = (peer, callback) => callback() + dhtA.dialer.dial = (peer, callback) => callback() // 1 -> 2 -> 3 const topology = { @@ -335,11 +329,11 @@ describe('Query', () => { }) it('disjoint path values', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo const values = ['v0', 'v1'].map(Buffer.from) // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} // 1 -> 2 -> 3 (v0) // 4 -> 5 (v1) @@ -390,11 +384,11 @@ describe('Query', () => { }) it('disjoint path values with early completion', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo const values = ['v0', 'v1'].map(Buffer.from) // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} // 1 -> 2 (delay) -> 3 // 4 -> 5 [query complete] @@ -459,11 +453,11 @@ describe('Query', () => { }) it('disjoint path continue other paths after error on one path', async () => { - const peer = peerInfos[0] + const peer = dht.peerInfo const values = ['v0', 'v1'].map(Buffer.from) // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} // 1 -> 2 (delay) -> 3 [pathComplete] // 4 -> 5 [error] -> 6 @@ -527,10 +521,10 @@ describe('Query', () => { it('stop after finding k closest peers', async () => { // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} - // Sort peers by distance from peerInfos[0] - const peerZeroDhtKey = await kadUtils.convertPeerId(peerInfos[0].id) + // Sort peers by distance from dht.peerInfo + const peerZeroDhtKey = await kadUtils.convertPeerId(dht.peerInfo.id) const peerIds = peerInfos.map(pi => pi.id) const sorted = await kadUtils.sortClosestPeers(peerIds, peerZeroDhtKey) @@ -586,7 +580,7 @@ describe('Query', () => { return { closerPeers } } - const q = new Query(dht, peerInfos[0].id.id, () => queryFunc) + const q = new Query(dht, dht.peerInfo.id.id, () => queryFunc) const res = await q.run(initial) // Should query 19 peers, then find some peers closer to the key, and @@ -638,7 +632,7 @@ describe('Query', () => { } = await createDisjointTracks(samplePeerInfos, goodLength) // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} let badEndVisited = false let targetVisited = false @@ -668,10 +662,10 @@ describe('Query', () => { it('should discover closer peers', () => { const discoverDefer = pDefer() - const peer = peerInfos[0] + const peer = dht.peerInfo // mock this so we can dial non existing peers - dht.switch.dial = (peer, callback) => callback() + dht.dialer.dial = () => {} const queryFunc = async (p) => { // eslint-disable-line require-await return { diff --git a/test/query/index.spec.js b/test/query/index.spec.js index 1fd3f453..7d178315 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -46,11 +46,14 @@ describe('Query', () => { }) before('create a dht', () => { + const peerStore = new PeerBook() dht = new DHT({ - sw: { + dialer: { _peerInfo: ourPeerInfo, - _peerBook: new PeerBook() - } + _peerBook: peerStore + }, + peerStore, + peerInfo: ourPeerInfo }) }) diff --git a/test/random-walk.spec.js b/test/random-walk.spec.js index db154bf6..180e4485 100644 --- a/test/random-walk.spec.js +++ b/test/random-walk.spec.js @@ -13,7 +13,6 @@ const { AssertionError } = require('assert') const TestDHT = require('./utils/test-dht') const { bootstrap, - connect, waitForWellFormedTables } = require('./utils') @@ -263,7 +262,8 @@ describe('Random Walk', () => { }) it('manual operation', async function () { - this.timeout(20 * 1000) + const timeout = 20 * 1000 + this.timeout(timeout) const nDHTs = 20 const tdht = new TestDHT() @@ -272,11 +272,11 @@ describe('Random Walk', () => { const dhts = await tdht.spawn(nDHTs) await Promise.all( - Array.from({ length: nDHTs }).map((_, i) => connect(dhts[i], dhts[(i + 1) % nDHTs])) + Array.from({ length: nDHTs }).map((_, i) => tdht.connect(dhts[i], dhts[(i + 1) % nDHTs])) ) bootstrap(dhts) - await waitForWellFormedTables(dhts, 7, 0, 20 * 1000) + await waitForWellFormedTables(dhts, 7, 0, timeout) return tdht.teardown() }) diff --git a/test/rpc/handlers/add-provider.spec.js b/test/rpc/handlers/add-provider.spec.js index 4070dbd1..5ab9b35b 100644 --- a/test/rpc/handlers/add-provider.spec.js +++ b/test/rpc/handlers/add-provider.spec.js @@ -80,7 +80,7 @@ describe('rpc - handlers - AddProvider', () => { expect(provs).to.have.length(1) expect(provs[0].id).to.eql(provider.id.id) - const bookEntry = dht.peerBook.get(provider.id) + const bookEntry = dht.peerStore.get(provider.id) // Favour peerInfo from payload over peerInfo from sender expect(bookEntry.multiaddrs.toArray()).to.eql( @@ -100,7 +100,7 @@ describe('rpc - handlers - AddProvider', () => { const provs = await dht.providers.getProviders(cid) - expect(dht.peerBook.has(provider.id)).to.equal(false) + expect(dht.peerStore.has(provider.id)).to.equal(false) expect(provs).to.have.length(1) expect(provs[0].id).to.eql(provider.id.id) }) diff --git a/test/rpc/handlers/get-value.spec.js b/test/rpc/handlers/get-value.spec.js index b0ad3cd4..417b7c10 100644 --- a/test/rpc/handlers/get-value.spec.js +++ b/test/rpc/handlers/get-value.spec.js @@ -87,6 +87,7 @@ describe('rpc - handlers - GetValue', () => { const msg = new Message(T, key, 0) + dht.peerStore.put(other) await dht._add(other) const response = await handler(dht)(peers[0], msg) expect(response.record).to.exist() diff --git a/test/rpc/handlers/put-value.spec.js b/test/rpc/handlers/put-value.spec.js index f46913fe..3548616d 100644 --- a/test/rpc/handlers/put-value.spec.js +++ b/test/rpc/handlers/put-value.spec.js @@ -5,7 +5,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const Record = require('libp2p-record').Record +const { Record } = require('libp2p-record') const delay = require('delay') const Message = require('../../../src/message') diff --git a/test/rpc/index.spec.js b/test/rpc/index.spec.js index a4c7d8d9..97cc2b6a 100644 --- a/test/rpc/index.spec.js +++ b/test/rpc/index.spec.js @@ -4,69 +4,82 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const pull = require('pull-stream') -const lp = require('pull-length-prefixed') -const Connection = require('interface-connection').Connection -const PeerBook = require('peer-book') -const Switch = require('libp2p-switch') -const TCP = require('libp2p-tcp') -const Mplex = require('libp2p-mplex') +const pDefer = require('p-defer') +const pipe = require('it-pipe') +const lp = require('it-length-prefixed') const Message = require('../../src/message') -const KadDHT = require('../../src') const rpc = require('../../src/rpc') const createPeerInfo = require('../utils/create-peer-info') +const TestDHT = require('../utils/test-dht') describe('rpc', () => { let peerInfos + let tdht before(async () => { peerInfos = await createPeerInfo(2) + tdht = new TestDHT() }) - describe('protocolHandler', () => { - it('calls back with the response', (done) => { - const sw = new Switch(peerInfos[0], new PeerBook()) - sw.transport.add('tcp', new TCP()) - sw.connection.addStreamMuxer(Mplex) - sw.connection.reuse() - const dht = new KadDHT({ - sw, - kBucketSize: 5 - }) + it('calls back with the response', async () => { + const defer = pDefer() + const [dht] = await tdht.spawn(1) - dht.peerBook.put(peerInfos[1]) + dht.peerStore.put(peerInfos[1]) - const msg = new Message(Message.TYPES.GET_VALUE, Buffer.from('hello'), 5) + const msg = new Message(Message.TYPES.GET_VALUE, Buffer.from('hello'), 5) - const conn = makeConnection(msg, peerInfos[1], (err, res) => { - expect(err).to.not.exist() - expect(res).to.have.length(1) - const msg = Message.deserialize(res[0]) - expect(msg).to.have.property('key').eql(Buffer.from('hello')) - expect(msg).to.have.property('closerPeers').eql([]) + const validateMessage = (res) => { + const msg = Message.deserialize(res[0]) + expect(msg).to.have.property('key').eql(Buffer.from('hello')) + expect(msg).to.have.property('closerPeers').eql([]) + defer.resolve() + } - done() - }) + const data = [] + await pipe( + [msg.serialize()], + lp.encode(), + async source => { + for await (const chunk of source) { + data.push(chunk.slice()) + } + } + ) + + const duplexStream = { + source: function * () { + const array = data + + while (array.length) { + yield array.shift() + } + }, + sink: async (source) => { + const res = [] + await pipe( + source, + lp.decode(), + async source => { + for await (const chunk of source) { + res.push(chunk.slice()) + } + } + ) + validateMessage(res) + } + } - rpc(dht)('protocol', conn) + rpc(dht)({ + protocol: 'protocol', + stream: duplexStream, + connection: { + remotePeer: peerInfos[1].id + } }) + + return defer.promise }) }) - -function makeConnection (msg, info, callback) { - const rawConn = { - source: pull( - pull.values([msg.serialize()]), - lp.encode() - ), - sink: pull( - lp.decode(), - pull.collect(callback) - ) - } - const conn = new Connection(rawConn) - conn.setPeerInfo(info) - return conn -} diff --git a/test/utils/index.js b/test/utils/index.js index 889d88df..6d9b6f00 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -3,17 +3,54 @@ const delay = require('delay') const pRetry = require('p-retry') const pTimeout = require('p-timeout') -const promisify = require('promisify-es6') +const DuplexPair = require('it-pair/duplex') -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const PeerBook = require('peer-book') -const Mplex = require('libp2p-mplex') -const Switch = require('libp2p-switch') -const TCP = require('libp2p-tcp') - -const DHT = require('../../src') const { sortClosestPeers } = require('../../src/utils') + +const createMockRegistrar = (registrarRecord) => ({ + handle: (multicodec, handler) => { + const rec = registrarRecord[multicodec] || {} + + registrarRecord[multicodec] = { + ...rec, + handler + } + }, + register: ({ multicodecs, _onConnect, _onDisconnect }) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + onConnect: _onConnect, + onDisconnect: _onDisconnect + } + + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] + } +}) + +exports.createMockRegistrar = createMockRegistrar + +const ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) + }, + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] +} + +exports.ConnectionPair = ConnectionPair + /** * Like `sortClosestPeers`, expect it takes and returns `PeerInfo`s * @@ -31,51 +68,6 @@ exports.sortClosestPeerInfos = async (peers, target) => { }) } -const createDHT = (peerInfo, props = {}) => { - const sw = new Switch(peerInfo, new PeerBook()) - sw.transport.add('tcp', new TCP()) - sw.connection.addStreamMuxer(Mplex) - sw.connection.reuse() - return new DHT({ sw, ...props }) -} - -exports.createDHT = createDHT - -exports.createAndStartDHT = async (peerInfo, props) => { - const dht = createDHT(peerInfo, props) - await dht.start() - return dht -} - -// connect two dhts -const connectNoSync = async (a, b) => { - const publicPeerId = new PeerId(b.peerInfo.id.id, null, b.peerInfo.id.pubKey) - const target = new PeerInfo(publicPeerId) - target.multiaddrs = b.peerInfo.multiaddrs - await promisify(cb => a.switch.dial(target, cb))() -} - -const find = (a, b) => { - return pRetry(async () => { - const match = await a.routingTable.find(b.peerInfo.id) - - if (!match) { - await delay(100) - throw new Error('not found') - } - - return match - }, { retries: 50 }) -} - -// connect two dhts and wait for them to have each other -// in their routing table -exports.connect = async (a, b) => { - await connectNoSync(a, b) - await find(a, b) - await find(b, a) -} - exports.bootstrap = (dhts) => { dhts.forEach((dht) => { dht.randomWalk._walk(1, 10000) diff --git a/test/utils/test-dht.js b/test/utils/test-dht.js index d6b66109..ad9a2b86 100644 --- a/test/utils/test-dht.js +++ b/test/utils/test-dht.js @@ -1,23 +1,23 @@ 'use strict' const PeerBook = require('peer-book') -const Switch = require('libp2p-switch') -const TCP = require('libp2p-tcp') -const Mplex = require('libp2p-mplex') - -const createPeerInfo = require('./create-peer-info') +const pRetry = require('p-retry') +const delay = require('delay') const KadDHT = require('../../src') +const { PROTOCOL_DHT } = require('../../src/constants') + +const createPeerInfo = require('./create-peer-info') +const { + createMockRegistrar, + ConnectionPair +} = require('.') class TestDHT { constructor () { this.nodes = [] } - spawnConnected (length, options) { - - } - spawn (length, options = {}) { return Promise.all( Array.from({ length }) @@ -26,6 +26,8 @@ class TestDHT { } async _spawnOne (index, options = {}) { + const regRecord = {} + // Disable random walk by default for more controlled testing options = { randomWalk: { @@ -38,41 +40,107 @@ class TestDHT { const port = index !== undefined ? 8000 + index : 0 p.multiaddrs.add(`/ip4/127.0.0.1/tcp/${port}/p2p/${p.id.toB58String()}`) - // p.multiaddrs.add(`/ip4/127.0.0.1/tcp/0`) - const sw = new Switch(p, new PeerBook()) - sw.transport.add('tcp', new TCP()) - sw.connection.addStreamMuxer(Mplex) - sw.connection.reuse() + const dial = async (peer, protocol) => { + const remotePeerB58 = peer.toB58String() + const remoteDht = this.nodes.find( + (node) => node.peerInfo.id.toB58String() === remotePeerB58 + ) + + const localOnConnect = regRecord[PROTOCOL_DHT].onConnect + const remoteOnConnect = remoteDht.regRecord[PROTOCOL_DHT].onConnect + + const remoteHandler = remoteDht.regRecord[PROTOCOL_DHT].handler + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + await localOnConnect(remoteDht.peerInfo, c1) + await remoteOnConnect(p, c0) + + await remoteHandler({ + protocol: protocol, + stream: c0.stream, + connection: { + remotePeer: p.id + } + }) + + return { + stream: c1.stream + } + } const dht = new KadDHT({ - sw, + dialer: { + dial, + dialProtocol: dial + }, + registrar: createMockRegistrar(regRecord), + peerStore: new PeerBook(), + peerInfo: p, + validators: { + v: { + func () { + return Promise.resolve(true) + }, + sign: false + }, + v2: { + func () { + return Promise.resolve(true) + }, + sign: false + } + }, + selectors: { + v: () => 0 + }, ...options }) - dht.validators.v = { - func (key, publicKey) { - return Promise.resolve(true) - }, - sign: false - } - - dht.validators.v2 = dht.validators.v // added to simulate just validators available - - dht.selectors.v = (k, records) => 0 - - await sw.start() await dht.start() + dht.regRecord = regRecord this.nodes.push(dht) return dht } + async connect (dhtA, dhtB) { + const onConnectA = dhtA.regRecord[PROTOCOL_DHT].onConnect + const onConnectB = dhtB.regRecord[PROTOCOL_DHT].onConnect + + const [c0, c1] = ConnectionPair() + + // Notice peers of connection + await onConnectA(dhtB.peerInfo, c0) + await onConnectB(dhtA.peerInfo, c1) + + return Promise.all([ + pRetry(async () => { + const match = await dhtA.routingTable.find(dhtB.peerInfo.id) + + if (!match) { + await delay(100) + throw new Error('not found') + } + + return match + }, { retries: 50 }), + pRetry(async () => { + const match = await dhtB.routingTable.find(dhtA.peerInfo.id) + + if (!match) { + await delay(100) + throw new Error('not found') + } + + return match + }, { retries: 50 }) + ]) + } + async teardown () { - await Promise.all(this.nodes.map(async (node) => { - await node.stop() - await node.switch.stop() - })) + await Promise.all(this.nodes.map((node) => node.stop())) this.nodes = [] } } From 44054b261eb01aa912d2ffbeb3d6e9ad50192764 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 22 Nov 2019 12:56:30 -0600 Subject: [PATCH 2/3] chore: apply suggestions from code review Co-Authored-By: dirkmc Co-Authored-By: Jacob Heun --- package.json | 4 ++-- src/index.js | 4 ++-- src/network.js | 27 +++++++++++---------------- src/rpc/index.js | 39 +++++++++++++++++++-------------------- test/query/index.spec.js | 5 +---- test/rpc/index.spec.js | 29 ++++++++--------------------- test/utils/index.js | 4 ++-- test/utils/to-buffer.js | 15 +++++++++++++++ 8 files changed, 60 insertions(+), 67 deletions(-) create mode 100644 test/utils/to-buffer.js diff --git a/package.json b/package.json index b2db184b..f5eb00a5 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,6 @@ "abort-controller": "^3.0.0", "async": "^2.6.2", "base32.js": "~0.1.0", - "chai-checkmark": "^1.0.1", "cids": "~0.7.1", "debug": "^4.1.1", "err-code": "^2.0.0", @@ -60,11 +59,11 @@ "p-queue": "^6.2.1", "p-timeout": "^3.2.0", "p-times": "^2.1.0", - "paramap-it": "^0.1.1", "peer-id": "~0.13.5", "peer-info": "~0.17.0", "promise-to-callback": "^1.0.0", "protons": "^1.0.1", + "streaming-iterables": "^4.1.1", "varint": "^5.0.0", "xor-distance": "^2.0.0" }, @@ -72,6 +71,7 @@ "aegir": "^20.4.1", "async-iterator-all": "^1.0.0", "chai": "^4.2.0", + "chai-checkmark": "^1.0.1", "datastore-level": "~0.12.1", "delay": "^4.3.0", "dirty-chai": "^2.0.1", diff --git a/src/index.js b/src/index.js index fca9e5b6..b4de0a6d 100644 --- a/src/index.js +++ b/src/index.js @@ -67,7 +67,7 @@ class KadDHT extends EventEmitter { randomWalk = {} }) { super() - assert(dialer, 'libp2p-kad-dht requires a instance of Dialer') + assert(dialer, 'libp2p-kad-dht requires an instance of Dialer') /** * Local reference to the libp2p dialer instance @@ -82,7 +82,7 @@ class KadDHT extends EventEmitter { this.peerInfo = peerInfo /** - * Local peer info + * Local PeerStore * @type {PeerStore} */ this.peerStore = peerStore diff --git a/src/network.js b/src/network.js index 9a1020c5..a76b7965 100644 --- a/src/network.js +++ b/src/network.js @@ -40,7 +40,6 @@ class Network { return } - // TODO remove: add a way to check if switch has started or not if (!this.dht.isStarted) { throw errcode(new Error('Can not start network'), 'ERR_CANNOT_START_NETWORK') } @@ -52,7 +51,7 @@ class Network { // register protocol with topology const topology = new MulticodecTopology({ - multicodecs: c.PROTOCOL_DHT, + multicodecs: [c.PROTOCOL_DHT], handlers: { onConnect: this._onPeerConnected, onDisconnect: () => {} @@ -98,15 +97,11 @@ class Network { * Registrar notifies a connection successfully with dht protocol. * @private * @param {PeerInfo} peerInfo remote peer info - * @param {Connection} conn connection to the peer * @returns {Promise} */ - async _onPeerConnected (peerInfo, conn) { + async _onPeerConnected (peerInfo) { await this.dht._add(peerInfo) this._log('added to the routing table: %s', peerInfo.id.toB58String()) - - // Open a stream with the connected peer - await conn.newStream(c.PROTOCOL_DHT) } /** @@ -152,40 +147,40 @@ class Network { * If no response is received after the specified timeout * this will error out. * - * @param {Connection} conn - the connection to use + * @param {DuplexIterable} stream - the stream to use * @param {Buffer} msg - the message to send * @returns {Promise} * @private */ - async _writeReadMessage (conn, msg) { // eslint-disable-line require-await + async _writeReadMessage (stream, msg) { // eslint-disable-line require-await return pTimeout( - writeReadMessage(conn, msg), + writeReadMessage(stream, msg), this.readMessageTimeout ) } /** - * Write a message to the given connection. + * Write a message to the given stream. * - * @param {Connection} conn - the connection to use + * @param {DuplexIterable} stream - the stream to use * @param {Buffer} msg - the message to send * @returns {Promise} * @private */ - _writeMessage (conn, msg) { + _writeMessage (stream, msg) { return pipe( [msg], lp.encode(), - conn + stream ) } } -async function writeReadMessage (conn, msg) { +async function writeReadMessage (stream, msg) { const res = await pipe( [msg], lp.encode(), - conn, + stream, utils.itFilter( (msg) => msg.length < c.maxMessageSize ), diff --git a/src/rpc/index.js b/src/rpc/index.js index cdac664c..c494877e 100644 --- a/src/rpc/index.js +++ b/src/rpc/index.js @@ -2,7 +2,6 @@ const pipe = require('it-pipe') const lp = require('it-length-prefixed') -const paramap = require('paramap-it') const PeerInfo = require('peer-info') const Message = require('../message') @@ -22,15 +21,14 @@ module.exports = (dht) => { * * @private */ - async function handleMessage (peer, msg) { // eslint-disable-line - // get handler & exectue it + async function handleMessage (peer, msg) { + // get handler & execute it const handler = getMessageHandler(msg.type) try { await dht._add(peer) } catch (err) { - log.error('Failed to update the kbucket store') - log.error(err) + log.error('Failed to update the kbucket store', err) } if (!handler) { @@ -44,14 +42,12 @@ module.exports = (dht) => { /** * Handle incoming streams on the dht protocol. * @param {Object} props - * @param {string} props.protocol * @param {DuplexStream} props.stream * @param {Connection} props.connection connection * @returns {Promise} */ - return async function onIncomingStream ({ protocol, stream, connection }) { + return async function onIncomingStream ({ stream, connection }) { const peerInfo = await PeerInfo.create(connection.remotePeer) - peerInfo.protocols.add(protocol) try { await dht._add(peerInfo) @@ -65,18 +61,21 @@ module.exports = (dht) => { await pipe( stream.source, lp.decode(), - utils.itFilter( - (msg) => msg.length < c.maxMessageSize - ), - source => paramap(source, rawMsg => { - const msg = Message.deserialize(rawMsg.slice()) - return handleMessage(peerInfo, msg) - }), - // Not all handlers will return a response - utils.itFilter(Boolean), - source => paramap(source, response => { - return response.serialize() - }), + source => (async function * () { + for await (const msg of source) { + // Check message size + if (msg.length < c.maxMessageSize) { + // handle the message + const desMessage = Message.deserialize(msg.slice()) + const res = await handleMessage(peerInfo, desMessage) + + // Not all handlers will return a response + if (res) { + yield res.serialize() + } + } + } + })(), lp.encode(), stream.sink ) diff --git a/test/query/index.spec.js b/test/query/index.spec.js index 7d178315..aa92f699 100644 --- a/test/query/index.spec.js +++ b/test/query/index.spec.js @@ -48,10 +48,7 @@ describe('Query', () => { before('create a dht', () => { const peerStore = new PeerBook() dht = new DHT({ - dialer: { - _peerInfo: ourPeerInfo, - _peerBook: peerStore - }, + dialer: {}, peerStore, peerInfo: ourPeerInfo }) diff --git a/test/rpc/index.spec.js b/test/rpc/index.spec.js index 97cc2b6a..d4e59a28 100644 --- a/test/rpc/index.spec.js +++ b/test/rpc/index.spec.js @@ -7,12 +7,14 @@ const expect = chai.expect const pDefer = require('p-defer') const pipe = require('it-pipe') const lp = require('it-length-prefixed') +const { collect } = require('streaming-iterables') const Message = require('../../src/message') const rpc = require('../../src/rpc') const createPeerInfo = require('../utils/create-peer-info') const TestDHT = require('../utils/test-dht') +const toBuffer = require('../utils/to-buffer') describe('rpc', () => { let peerInfos @@ -38,35 +40,20 @@ describe('rpc', () => { defer.resolve() } - const data = [] - await pipe( + const source = await pipe( [msg.serialize()], lp.encode(), - async source => { - for await (const chunk of source) { - data.push(chunk.slice()) - } - } + collect ) const duplexStream = { - source: function * () { - const array = data - - while (array.length) { - yield array.shift() - } - }, + source, sink: async (source) => { - const res = [] - await pipe( + const res = await pipe( source, lp.decode(), - async source => { - for await (const chunk of source) { - res.push(chunk.slice()) - } - } + toBuffer, // Ensure we have buffers here for validateMessage to consume + collect ) validateMessage(res) } diff --git a/test/utils/index.js b/test/utils/index.js index 6d9b6f00..1350af3e 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -3,7 +3,7 @@ const delay = require('delay') const pRetry = require('p-retry') const pTimeout = require('p-timeout') -const DuplexPair = require('it-pair/duplex') +const duplexPair = require('it-pair/duplex') const { sortClosestPeers } = require('../../src/utils') @@ -35,7 +35,7 @@ const createMockRegistrar = (registrarRecord) => ({ exports.createMockRegistrar = createMockRegistrar const ConnectionPair = () => { - const [d0, d1] = DuplexPair() + const [d0, d1] = duplexPair() return [ { diff --git a/test/utils/to-buffer.js b/test/utils/to-buffer.js new file mode 100644 index 00000000..7865c0ba --- /dev/null +++ b/test/utils/to-buffer.js @@ -0,0 +1,15 @@ +'use strict' +/** + * Converts BufferList messages to Buffers + * @param {*} source + * @returns {AsyncGenerator} + */ +const toBuffer = (source) => { + return (async function * () { + for await (const chunk of source) { + yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() + } + })() +} + +module.exports = toBuffer From a0a850093ff0422530773dc83b6d5e9a0518ff11 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 22 Nov 2019 16:26:41 -0600 Subject: [PATCH 3/3] fix: use libp2p dialer --- src/network.js | 13 +++++++++---- test/network.spec.js | 23 ++++++++++++----------- test/utils/test-dht.js | 14 ++++++++------ 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/src/network.js b/src/network.js index a76b7965..b23efb3e 100644 --- a/src/network.js +++ b/src/network.js @@ -117,9 +117,11 @@ class Network { throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE') } - this._log('sending to: %s', to.toB58String()) + const id = to.toB58String() + this._log('sending to: %s', id) - const { stream } = await this.dht.dialer.dialProtocol(to, c.PROTOCOL_DHT) + const conn = await this.dht.dialer.connectToPeer(to) + const { stream } = await conn.newStream(c.PROTOCOL_DHT) return this._writeReadMessage(stream, msg.serialize()) } @@ -136,9 +138,12 @@ class Network { throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE') } - this._log('sending to: %s', to.toB58String()) + const id = to.toB58String() + this._log('sending to: %s', id) + + const conn = await this.dht.dialer.connectToPeer(to) + const { stream } = await conn.newStream(c.PROTOCOL_DHT) - const { stream } = await this.dht.dialer.dialProtocol(to, c.PROTOCOL_DHT) return this._writeMessage(stream, msg.serialize()) } diff --git a/test/network.spec.js b/test/network.spec.js index 39cafc17..8c0a9fb5 100644 --- a/test/network.spec.js +++ b/test/network.spec.js @@ -32,11 +32,11 @@ describe('Network', () => { const msg = new Message(Message.TYPES.PING, Buffer.from('hello'), 0) // mock dial - dht.dialer.dialProtocol = (peer, protocol) => { - expect(protocol).to.eql('/ipfs/kad/1.0.0') - + dht.dialer.connectToPeer = () => { return { - stream: pair() // {source, sink} streams that are internally connected + newStream: () => { + return { stream: pair() } // {source, sink} streams that are internally connected + } } } @@ -56,8 +56,7 @@ describe('Network', () => { const msg = new Message(Message.TYPES.PING, Buffer.from('hello'), 0) // mock it - dht.dialer.dialProtocol = async (peer, protocol) => { - expect(protocol).to.eql('/ipfs/kad/1.0.0') + dht.dialer.connectToPeer = async () => { const msg = new Message(Message.TYPES.FIND_NODE, Buffer.from('world'), 0) const data = [] @@ -95,7 +94,9 @@ describe('Network', () => { } return { - stream: { source, sink } + newStream: () => { + return { stream: { source, sink } } + } } } @@ -119,9 +120,7 @@ describe('Network', () => { const msg = new Message(Message.TYPES.PING, Buffer.from('hello'), 0) // mock it - dht.dialer.dialProtocol = (peer, protocol) => { - expect(protocol).to.eql('/ipfs/kad/1.0.0') - + dht.dialer.connectToPeer = () => { const source = (async function * () { // eslint-disable-line require-yield await delay(1000) })() @@ -142,7 +141,9 @@ describe('Network', () => { } return { - stream: { source, sink } + newStream: () => { + return { stream: { source, sink } } + } } } diff --git a/test/utils/test-dht.js b/test/utils/test-dht.js index ad9a2b86..0acf3ec6 100644 --- a/test/utils/test-dht.js +++ b/test/utils/test-dht.js @@ -27,6 +27,7 @@ class TestDHT { async _spawnOne (index, options = {}) { const regRecord = {} + const peerStore = new PeerBook() // Disable random walk by default for more controlled testing options = { @@ -41,7 +42,7 @@ class TestDHT { p.multiaddrs.add(`/ip4/127.0.0.1/tcp/${port}/p2p/${p.id.toB58String()}`) - const dial = async (peer, protocol) => { + const connectToPeer = async (peer) => { const remotePeerB58 = peer.toB58String() const remoteDht = this.nodes.find( (node) => node.peerInfo.id.toB58String() === remotePeerB58 @@ -58,7 +59,7 @@ class TestDHT { await remoteOnConnect(p, c0) await remoteHandler({ - protocol: protocol, + protocol: PROTOCOL_DHT, stream: c0.stream, connection: { remotePeer: p.id @@ -66,17 +67,18 @@ class TestDHT { }) return { - stream: c1.stream + newStream: () => { + return { stream: c1.stream } + } } } const dht = new KadDHT({ dialer: { - dial, - dialProtocol: dial + connectToPeer }, registrar: createMockRegistrar(regRecord), - peerStore: new PeerBook(), + peerStore, peerInfo: p, validators: { v: {