Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

fix: improve connection tracking #318

Merged
merged 2 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ class IncomingConnectionFSM extends BaseConnection {
this.emit('muxed', this.conn)
})
this._state.on('DISCONNECTING', () => {
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was unnecessary. Incoming connections don't cause the connect to happen until after the are muxed, which will now be handled by the switch.connection.add() logic.

this._state('done')
})
}
Expand Down
13 changes: 3 additions & 10 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,6 @@ class ConnectionFSM extends BaseConnection {
_onDisconnecting () {
this.log('disconnecting from %s', this.theirB58Id, Boolean(this.muxer))

// Issue disconnects on both Peers
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}

this.switch.connection.remove(this)

delete this.switch.conns[this.theirB58Id]
Expand All @@ -284,7 +279,6 @@ class ConnectionFSM extends BaseConnection {
tasks.push((cb) => {
this.muxer.end(() => {
delete this.muxer
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
cb()
})
})
Expand Down Expand Up @@ -325,13 +319,13 @@ class ConnectionFSM extends BaseConnection {
return this.close(maybeUnexpectedEnd(err))
}

const conn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)

this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, conn, this.theirPeerInfo.id, (err) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found an edge case here, where if the connection was terminated while encryption was being negotiated, this.conn would get resolved and would later throw an error when secio went to set the inner connection. This avoids that.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

const observedConn = observeConnection(null, this.switch.crypto.tag, _conn, this.switch.observer)
const encryptedConn = this.switch.crypto.encrypt(this.ourPeerInfo.id, observedConn, this.theirPeerInfo.id, (err) => {
if (err) {
return this.close(err)
}

this.conn = encryptedConn
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
Expand Down Expand Up @@ -392,7 +386,6 @@ class ConnectionFSM extends BaseConnection {
this.switch.protocolMuxer(null)(conn)
})

this.switch.emit('peer-mux-established', this.theirPeerInfo)
this._didUpgrade(null)

// Run identify on the connection
Expand Down
44 changes: 29 additions & 15 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ConnectionManager {
// Only add it if it's not there
if (!this.get(connection)) {
this.connections[connection.theirB58Id].push(connection)
this.switch.emit('peer-mux-established', connection.theirPeerInfo)
}
}

Expand Down Expand Up @@ -78,14 +79,26 @@ class ConnectionManager {
* @returns {void}
*/
remove (connection) {
if (!this.connections[connection.theirB58Id]) return
// No record of the peer, disconnect it
if (!this.connections[connection.theirB58Id]) {
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
return
}

for (let i = 0; i < this.connections[connection.theirB58Id].length; i++) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the code in this class could be simpler with Set and Map - suggestion for a future PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, a Map would allow for handling the two dimensional array pretty cleanly.

if (this.connections[connection.theirB58Id][i] === connection) {
this.connections[connection.theirB58Id].splice(i, 1)
return
break
}
}

// The peer is fully disconnected
if (this.connections[connection.theirB58Id].length === 0) {
delete this.connections[connection.theirB58Id]
connection.theirPeerInfo.disconnect()
this.switch.emit('peer-mux-closed', connection.theirPeerInfo)
}
}

/**
Expand Down Expand Up @@ -175,6 +188,7 @@ class ConnectionManager {
return log('identify not successful')
}
const b58Str = peerInfo.id.toB58String()
peerInfo = this.switch._peerBook.put(peerInfo)

const connection = new ConnectionFSM({
_switch: this.switch,
Expand All @@ -185,24 +199,24 @@ class ConnectionManager {
})
this.switch.connection.add(connection)

if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
// of the available multiaddrs from the other peer as the one
// I'm connected to as we really can't be sure at the moment
// TODO add this consideration to the connection abstraction!
peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
} else {
// for the case of websockets in the browser, where peers have
// no addr, use just their IPFS id
peerInfo.connect(`/ipfs/${b58Str}`)
// Only update if it's not already connected
if (!peerInfo.isConnected()) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
// of the available multiaddrs from the other peer as the one
// I'm connected to as we really can't be sure at the moment
// TODO add this consideration to the connection abstraction!
peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
} else {
// for the case of websockets in the browser, where peers have
// no addr, use just their IPFS id
peerInfo.connect(`/ipfs/${b58Str}`)
}
}
peerInfo = this.switch._peerBook.put(peerInfo)

muxedConn.once('close', () => {
connection.close()
})

this.switch.emit('peer-mux-established', peerInfo)
})
})
}
Expand Down
11 changes: 4 additions & 7 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const ConnectionFSM = require('../connection')
const { DIAL_ABORTED, ERR_BLACKLISTED } = require('../errors')
const Connection = require('interface-connection').Connection
const nextTick = require('async/nextTick')
const once = require('once')
const debug = require('debug')
Expand Down Expand Up @@ -45,10 +44,8 @@ function createConnectionWithProtocol ({ protocol, connection, callback }) {
return callback(err)
}

const proxyConnection = new Connection()
proxyConnection.setPeerInfo(connection.theirPeerInfo)
proxyConnection.setInnerConn(conn)
callback(null, proxyConnection)
conn.setPeerInfo(connection.theirPeerInfo)
callback(null, conn)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proxyConnection was superfluous here, so I got rid of it.

})
}

Expand Down Expand Up @@ -192,6 +189,8 @@ class Queue {
conn: null
})

this.switch.connection.add(connectionFSM)

// Add control events and start the dialer
connectionFSM.once('connected', () => connectionFSM.protect())
connectionFSM.once('private', () => connectionFSM.encrypt())
Expand Down Expand Up @@ -252,15 +251,13 @@ class Queue {
// If we're not muxed yet, add listeners
connectionFSM.once('muxed', () => {
this.blackListCount = 0 // reset blacklisting on good connections
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
})

connectionFSM.once('unmuxed', () => {
this.blackListCount = 0
this.switch.connection.add(connectionFSM)
queuedDial.connection = connectionFSM
createConnectionWithProtocol(queuedDial)
next()
Expand Down
1 change: 0 additions & 1 deletion src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ class TransportManager {
}

peerInfo.connect(success.multiaddr)
this.switch._peerBook.put(peerInfo)
callback(null, success.conn)
})
}
Expand Down
8 changes: 4 additions & 4 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ describe('dialFSM', () => {

// Expect 4 `peer-mux-established` events
expect(4).checks(() => {
// Expect 4 `peer-mux-closed`, plus 1 hangup
expect(5).checks(() => {
// Expect 2 `peer-mux-closed`, plus 1 hangup
expect(3).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
switchA.removeAllListeners('peer-mux-established')
Expand Down Expand Up @@ -286,8 +286,8 @@ describe('dialFSM', () => {
switchA.handle(protocol, (_, conn) => { pull(conn, conn) })
switchB.handle(protocol, (_, conn) => { pull(conn, conn) })

// 4 close checks and 1 hangup check
expect(5).checks(() => {
// 2 close checks and 1 hangup check
expect(2).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')
// restart the node for subsequent tests
Expand Down