diff --git a/src/connection/index.js b/src/connection/index.js index d43743398f..1bfe13990d 100644 --- a/src/connection/index.js +++ b/src/connection/index.js @@ -95,7 +95,8 @@ class ConnectionFSM extends BaseConnection { UPGRADING: { // Attempting to upgrade the connection with muxers stop: 'CONNECTED', // If we cannot mux, stop upgrading done: 'MUXED', - error: 'ERRORED' + error: 'ERRORED', + disconnect: 'DISCONNECTING' }, MUXED: { disconnect: 'DISCONNECTING' diff --git a/src/dialer/index.js b/src/dialer/index.js index 8464d8eabc..00675a5fbb 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -12,7 +12,8 @@ const { module.exports = function (_switch) { const dialQueueManager = new DialQueueManager(_switch) - _switch.state.on('STOPPING:enter', abort) + _switch.state.on('STARTED:enter', start) + _switch.state.on('STOPPING:enter', stop) /** * @param {DialRequest} dialRequest @@ -34,14 +35,24 @@ module.exports = function (_switch) { dialQueueManager.add({ peerInfo, protocol, useFSM, callback }) } + /** + * Starts the `DialQueueManager` + * + * @param {function} callback + */ + function start (callback) { + dialQueueManager.start() + callback() + } + /** * Aborts all dials that are queued. This should * only be used when the Switch is being stopped * * @param {function} callback */ - function abort (callback) { - dialQueueManager.abort() + function stop (callback) { + dialQueueManager.stop() callback() } @@ -77,7 +88,6 @@ module.exports = function (_switch) { return { dial, dialFSM, - abort, clearBlacklist, BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts, BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL, diff --git a/src/dialer/queueManager.js b/src/dialer/queueManager.js index f9508507f1..6199e0c972 100644 --- a/src/dialer/queueManager.js +++ b/src/dialer/queueManager.js @@ -22,6 +22,7 @@ class DialQueueManager { this._queues = {} this.switch = _switch this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR) + this.start() } /** @@ -66,13 +67,21 @@ class DialQueueManager { this._cleanInterval.reschedule(QUARTER_HOUR) } + /** + * Allows the `DialQueueManager` to execute dials + */ + start () { + this.isRunning = true + } + /** * Iterates over all items in the DialerQueue * and executes there callback with an error. * * This causes the entire DialerQueue to be drained */ - abort () { + stop () { + this.isRunning = false // Clear the general queue this._queue.clear() // Clear the cold call queue @@ -140,6 +149,8 @@ class DialQueueManager { * Will execute up to `MAX_PARALLEL_DIALS` dials */ run () { + if (!this.isRunning) return + if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) { let nextQueue = { done: true } // Check the queue first and fall back to the cold call queue diff --git a/src/index.js b/src/index.js index ceda303380..b32f8fdf3d 100644 --- a/src/index.js +++ b/src/index.js @@ -232,7 +232,8 @@ class Switch extends EventEmitter { }, (err) => { if (err) { log.error(err) - return this.emit('error', err) + this.emit('error', err) + return this.state('stop') } this.state('done') }) @@ -250,7 +251,10 @@ class Switch extends EventEmitter { (cb) => { each(this.transports, (transport, cb) => { each(transport.listeners, (listener, cb) => { - listener.close(cb) + listener.close((err) => { + if (err) log.error(err) + cb() + }) }, cb) }, cb) }, diff --git a/test/circuit-relay.node.js b/test/circuit-relay.node.js index 2806700d66..abd52117b5 100644 --- a/test/circuit-relay.node.js +++ b/test/circuit-relay.node.js @@ -24,124 +24,126 @@ const switchOptions = { } describe(`circuit`, function () { - let swarmA // TCP and WS - let swarmB // WS - let swarmC // no transports - let dialSpyA + describe('basic', () => { + let swarmA // TCP and WS + let swarmB // WS + let swarmC // no transports + let dialSpyA - before((done) => createInfos(3, (err, infos) => { - expect(err).to.not.exist() + before((done) => createInfos(3, (err, infos) => { + expect(err).to.not.exist() - const peerA = infos[0] - const peerB = infos[1] - const peerC = infos[2] + const peerA = infos[0] + const peerB = infos[1] + const peerC = infos[2] - peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001') - peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws') + peerA.multiaddrs.add('/ip4/0.0.0.0/tcp/9001') + peerB.multiaddrs.add('/ip4/127.0.0.1/tcp/9002/ws') - swarmA = new Swarm(peerA, new PeerBook(), switchOptions) - swarmB = new Swarm(peerB, new PeerBook()) - swarmC = new Swarm(peerC, new PeerBook()) + swarmA = new Swarm(peerA, new PeerBook(), switchOptions) + swarmB = new Swarm(peerB, new PeerBook()) + swarmC = new Swarm(peerC, new PeerBook()) - swarmA.transport.add('tcp', new TCP()) - swarmA.transport.add('ws', new WS()) - swarmB.transport.add('ws', new WS()) + swarmA.transport.add('tcp', new TCP()) + swarmA.transport.add('ws', new WS()) + swarmB.transport.add('ws', new WS()) - dialSpyA = sinon.spy(swarmA.transport, 'dial') + dialSpyA = sinon.spy(swarmA.transport, 'dial') - done() - })) + done() + })) - after((done) => { - parallel([ - (cb) => swarmA.stop(cb), - (cb) => swarmB.stop(cb) - ], done) - }) + after((done) => { + parallel([ + (cb) => swarmA.stop(cb), + (cb) => swarmB.stop(cb) + ], done) + }) - it('circuit not enabled and all transports failed', (done) => { - swarmA.dial(swarmC._peerInfo, (err, conn) => { - expect(err).to.exist() - expect(err).to.match(/Circuit not enabled and all transports failed to dial peer/) - expect(conn).to.not.exist() - done() + it('circuit not enabled and all transports failed', (done) => { + swarmA.dial(swarmC._peerInfo, (err, conn) => { + expect(err).to.exist() + expect(err).to.match(/Circuit not enabled and all transports failed to dial peer/) + expect(conn).to.not.exist() + done() + }) }) - }) - it('.enableCircuitRelay', () => { - swarmA.connection.enableCircuitRelay({ enabled: true }) - expect(Object.keys(swarmA.transports).length).to.equal(3) + it('.enableCircuitRelay', () => { + swarmA.connection.enableCircuitRelay({ enabled: true }) + expect(Object.keys(swarmA.transports).length).to.equal(3) - swarmB.connection.enableCircuitRelay({ enabled: true }) - expect(Object.keys(swarmB.transports).length).to.equal(2) - }) + swarmB.connection.enableCircuitRelay({ enabled: true }) + expect(Object.keys(swarmB.transports).length).to.equal(2) + }) - it('listed on the transports map', () => { - expect(swarmA.transports.Circuit).to.exist() - expect(swarmB.transports.Circuit).to.exist() - }) + it('listed on the transports map', () => { + expect(swarmA.transports.Circuit).to.exist() + expect(swarmB.transports.Circuit).to.exist() + }) - it('add /p2p-circuit addrs on start', (done) => { - parallel([ - (cb) => swarmA.start(cb), - (cb) => swarmB.start(cb) - ], (err) => { - expect(err).to.not.exist() - expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() - .includes(`/p2p-circuit`)).length).to.be.at.least(3) - // ensure swarmA has had 0.0.0.0 replaced in the addresses - expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() - .includes(`/0.0.0.0`)).length).to.equal(0) - expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString() - .includes(`/p2p-circuit`)).length).to.be.at.least(2) - done() + it('add /p2p-circuit addrs on start', (done) => { + parallel([ + (cb) => swarmA.start(cb), + (cb) => swarmB.start(cb) + ], (err) => { + expect(err).to.not.exist() + expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() + .includes(`/p2p-circuit`)).length).to.be.at.least(3) + // ensure swarmA has had 0.0.0.0 replaced in the addresses + expect(swarmA._peerInfo.multiaddrs.toArray().filter((a) => a.toString() + .includes(`/0.0.0.0`)).length).to.equal(0) + expect(swarmB._peerInfo.multiaddrs.toArray().filter((a) => a.toString() + .includes(`/p2p-circuit`)).length).to.be.at.least(2) + done() + }) }) - }) - it('dial circuit only once', (done) => { - swarmA._peerInfo.multiaddrs.clear() - swarmA._peerInfo.multiaddrs - .add(`/dns4/wrtc-star.discovery.libp2p.io/tcp/443/wss/p2p-webrtc-star`) + it('dial circuit only once', (done) => { + swarmA._peerInfo.multiaddrs.clear() + swarmA._peerInfo.multiaddrs + .add(`/dns4/wrtc-star.discovery.libp2p.io/tcp/443/wss/p2p-webrtc-star`) - swarmA.dial(swarmC._peerInfo, (err, conn) => { - expect(err).to.exist() - expect(err).to.match(/No available transports to dial peer/) - expect(conn).to.not.exist() - expect(dialSpyA.callCount).to.be.eql(1) - done() + swarmA.dial(swarmC._peerInfo, (err, conn) => { + expect(err).to.exist() + expect(err).to.match(/No available transports to dial peer/) + expect(conn).to.not.exist() + expect(dialSpyA.callCount).to.be.eql(1) + done() + }) }) - }) - it('dial circuit last', (done) => { - const peerC = swarmC._peerInfo - peerC.multiaddrs.clear() - peerC.multiaddrs.add(`/p2p-circuit/ipfs/ABCD`) - peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9998/ipfs/ABCD`) - peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9999/ws/ipfs/ABCD`) - - swarmA.dial(peerC, (err, conn) => { - expect(err).to.exist() - expect(conn).to.not.exist() - expect(dialSpyA.lastCall.args[0]).to.be.eql('Circuit') - done() + it('dial circuit last', (done) => { + const peerC = swarmC._peerInfo + peerC.multiaddrs.clear() + peerC.multiaddrs.add(`/p2p-circuit/ipfs/ABCD`) + peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9998/ipfs/ABCD`) + peerC.multiaddrs.add(`/ip4/127.0.0.1/tcp/9999/ws/ipfs/ABCD`) + + swarmA.dial(peerC, (err, conn) => { + expect(err).to.exist() + expect(conn).to.not.exist() + expect(dialSpyA.lastCall.args[0]).to.be.eql('Circuit') + done() + }) }) - }) - it('should not try circuit if no transports enabled', (done) => { - swarmC.dial(swarmA._peerInfo, (err, conn) => { - expect(err).to.exist() - expect(conn).to.not.exist() + it('should not try circuit if no transports enabled', (done) => { + swarmC.dial(swarmA._peerInfo, (err, conn) => { + expect(err).to.exist() + expect(conn).to.not.exist() - expect(err).to.match(/No transports registered, dial not possible/) - done() + expect(err).to.match(/No transports registered, dial not possible/) + done() + }) }) - }) - it('should not dial circuit if other transport succeed', (done) => { - swarmA.dial(swarmB._peerInfo, (err) => { - expect(err).not.to.exist() - expect(dialSpyA.lastCall.args[0]).to.not.be.eql('Circuit') - done() + it('should not dial circuit if other transport succeed', (done) => { + swarmA.dial(swarmB._peerInfo, (err) => { + expect(err).not.to.exist() + expect(dialSpyA.lastCall.args[0]).to.not.be.eql('Circuit') + done() + }) }) }) diff --git a/test/dial-fsm.node.js b/test/dial-fsm.node.js index 7e71b29cbb..e7ddd653e7 100644 --- a/test/dial-fsm.node.js +++ b/test/dial-fsm.node.js @@ -7,6 +7,7 @@ const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(require('chai-checkmark')) chai.use(dirtyChai) +const sinon = require('sinon') const PeerBook = require('peer-book') const parallel = require('async/parallel') const WS = require('libp2p-websockets') @@ -348,16 +349,24 @@ describe('dialFSM', () => { switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err, connFSM) => { expect(err).to.not.exist() - connFSM._state.on('UPGRADING:enter', (cb) => { - expect(2).checks(done) + // 2 conn aborts, 1 close, and 1 stop + expect(4).checks(done) + + connFSM.once('close', (err) => { + expect(err).to.not.exist().mark() + }) + + sinon.stub(connFSM, '_onUpgrading').callsFake(() => { switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => { - expect(err).to.exist().mark() + expect(err.code).to.eql('DIAL_ABORTED').mark() }) switchA.dialFSM(switchB._peerInfo, '/abort-queue/1.0.0', (err) => { - expect(err).to.exist().mark() + expect(err.code).to.eql('DIAL_ABORTED').mark() }) - switchA.stop(cb) + switchA.stop((err) => { + expect(err).to.not.exist().mark() + }) }) }) })