Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

fix: drop connection when stream ends unexpectedly #262

Merged
merged 2 commits into from
May 31, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 103 additions & 57 deletions src/dial.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,53 @@ const log = debug('libp2p:switch:dial')

const getPeerInfo = require('./get-peer-info')
const observeConnection = require('./observe-connection')
const UNEXPECTED_END = 'Unexpected end of input from reader.'

/**
* Uses the given MultistreamDialer to select the protocol matching the given key
*
* A helper method to catch errors from pull streams ending unexpectedly
* Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
*
* @param {MultistreamDialer} msDialer a multistream.Dialer
* @param {string} key The key type to select
* @param {function(Error)} callback Used for standard async flow
* @param {function(Error)} abort A callback to be used for ending the connection outright
* @returns {void}
*/
function selectSafe (msDialer, key, callback, abort) {
msDialer.select(key, (err, conn) => {
if (err === true) {
return abort(new Error(UNEXPECTED_END))
}

callback(err, conn)
})
}

/**
* Uses the given MultistreamDialer to handle the given connection
*
* A helper method to catch errors from pull streams ending unexpectedly
* Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged
*
* @param {MultistreamDialer} msDialer
* @param {Connection} connection The connection to handle
* @param {function(Error)} callback Used for standard async flow
* @param {function(Error)} abort A callback to be used for ending the connection outright
* @returns {void}
*/
function handleSafe (msDialer, connection, callback, abort) {
msDialer.handle(connection, (err) => {
// Repackage errors from pull-streams ending unexpectedly.
// Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
if (err === true) {
return abort(new Error(UNEXPECTED_END))
}

callback(err)
})
}

/**
* Manages dialing to another peer, including muxer upgrades
Expand Down Expand Up @@ -55,6 +102,11 @@ class Dialer {
cb(null)
}
], (err, connection) => {
if ((err && err.message === UNEXPECTED_END) || err === true) {
log('Connection dropped for %s', this.peerInfo.id.toB58String())
return this.callback(null, null)
}

this.callback(err, connection)
})

Expand Down Expand Up @@ -159,12 +211,7 @@ class Dialer {
}

connection.setPeerInfo(this.peerInfo)

waterfall([
(cb) => {
this._attemptMuxerUpgrade(connection, b58Id, cb)
}
], (err, muxer) => {
this._attemptMuxerUpgrade(connection, b58Id, (err, muxer) => {
if (err && !this.protocol) {
this.switch.conns[b58Id] = connection
return callback(null, null)
Expand All @@ -176,7 +223,7 @@ class Dialer {
}

callback(null, muxer)
})
}, callback)
}

/**
Expand All @@ -188,60 +235,62 @@ class Dialer {
* @param {Connection} connection
* @param {string} b58Id
* @param {function(Error, Connection)} callback
* @param {function(Error, Connection)} abort A callback to be used for ending the connection outright
* @returns {void}
*/
_attemptMuxerUpgrade (connection, b58Id, callback) {
_attemptMuxerUpgrade (connection, b58Id, callback, abort) {
const muxers = Object.keys(this.switch.muxers)

if (muxers.length === 0) {
return callback(new Error('no muxers available'))
}

// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler
const nextMuxer = (key) => {
log('selecting %s', key)
msDialer.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
return callback(new Error('could not upgrade to stream muxing'))
}
const msDialer = new multistream.Dialer()
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(new Error('multistream not supported'))
}

return nextMuxer(muxers.shift())
}
// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler
const nextMuxer = (key) => {
log('selecting %s', key)
selectSafe(msDialer, key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
return callback(new Error('could not upgrade to stream muxing'))
}

const muxedConn = this.switch.muxers[key].dialer(conn)
this.switch.muxedConns[b58Id] = {}
this.switch.muxedConns[b58Id].muxer = muxedConn
return nextMuxer(muxers.shift())
}

muxedConn.once('close', () => {
delete this.switch.muxedConns[b58Id]
this.peerInfo.disconnect()
this.switch._peerInfo.disconnect()
setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo))
})
const muxedConn = this.switch.muxers[key].dialer(conn)
this.switch.muxedConns[b58Id] = {}
this.switch.muxedConns[b58Id].muxer = muxedConn

// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
conn.setPeerInfo(this.peerInfo)
this.switch.protocolMuxer(null)(conn)
})
muxedConn.once('close', () => {
delete this.switch.muxedConns[b58Id]
this.peerInfo.disconnect()
this.switch._peerInfo.disconnect()
setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo))
})

setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo))
// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
conn.setPeerInfo(this.peerInfo)
this.switch.protocolMuxer(null)(conn)
})

callback(null, muxedConn)
})
}
setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo))

const msDialer = new multistream.Dialer()
msDialer.handle(connection, (err) => {
if (err) {
return callback(new Error('multistream not supported'))
callback(null, muxedConn)
}, abort)
}

nextMuxer(muxers.shift())
})
}, abort)
}

/**
Expand Down Expand Up @@ -307,16 +356,15 @@ class Dialer {
*/
_encryptConnection (connection, callback) {
const msDialer = new multistream.Dialer()

msDialer.handle(connection, (err) => {
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(err)
}

const myId = this.switch._peerInfo.id
log('selecting crypto: %s', this.switch.crypto.tag)

msDialer.select(this.switch.crypto.tag, (err, _conn) => {
selectSafe(msDialer, this.switch.crypto.tag, (err, _conn) => {
if (err) {
return callback(err)
}
Expand All @@ -331,8 +379,8 @@ class Dialer {
encryptedConnection.setPeerInfo(this.peerInfo)
callback(null, encryptedConnection)
})
})
})
}, callback)
}, callback)
}

/**
Expand All @@ -350,17 +398,15 @@ class Dialer {
}

const msDialer = new multistream.Dialer()
msDialer.handle(connection, (err) => {
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(err)
}
msDialer.select(this.protocol, (err, conn) => {
if (err) {
return callback(err)
}
callback(null, conn)
})
})

selectSafe(msDialer, this.protocol, (err, conn) => {
callback(err, conn)
}, callback)
}, callback)
}
}

Expand Down