Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

[WIP] feat: add support for Identify Push protocol #338

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"hashlru": "^2.3.0",
"interface-connection": "~0.3.3",
"libp2p-circuit": "~0.3.6",
"libp2p-identify": "~0.7.6",
"libp2p-identify": "libp2p/js-libp2p-identify#feat/identify-push",
"moving-average": "^1.0.0",
"multiaddr": "^6.0.6",
"multistream-select": "~0.14.4",
Expand Down
27 changes: 14 additions & 13 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ const withIs = require('class-is')
const BaseConnection = require('./base')
const parallel = require('async/parallel')
const nextTick = require('async/nextTick')
const identify = require('libp2p-identify')
const errCode = require('err-code')
const { msHandle, msSelect, identifyDialer } = require('../utils')
const IdentifyService = require('libp2p-identify')
const { newStream } = require('../utils')

const observeConnection = require('../observe-connection')
const {
Expand Down Expand Up @@ -391,8 +391,10 @@ class ConnectionFSM extends BaseConnection {
if (this.switch.identify) {
this._identify((err, results) => {
if (err) {
this.log('identify errored, closing the connection', err)
return this.close(err)
}
this.log('identify successful')
this.theirPeerInfo = this.switch._peerBook.put(results.peerInfo)
})
}
Expand All @@ -413,18 +415,17 @@ class ConnectionFSM extends BaseConnection {
if (!this.muxer) {
return nextTick(callback, errCode('The connection was already closed', 'ERR_CONNECTION_CLOSED'))
}
this.muxer.newStream(async (err, conn) => {

const stream = this.muxer.newStream()
newStream(stream, IdentifyService.multicodecs.identify, (err, conn) => {
if (err) return callback(err)
const ms = new multistream.Dialer()
let results
try {
await msHandle(ms, conn)
const msConn = await msSelect(ms, identify.multicodec)
results = await identifyDialer(msConn, this.theirPeerInfo)
} catch (err) {
return callback(err)
}
callback(null, results)
this.switch.identifyService.identify(conn, this.theirPeerInfo, (err, peerInfo, observedAddr) => {
if (err) return callback(err)
callback(null, {
peerInfo,
observedAddr
})
})
})
}

Expand Down
62 changes: 32 additions & 30 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
/* eslint max-nested-callbacks: ["error", 5] */
'use strict'

const identify = require('libp2p-identify')
const multistream = require('multistream-select')
const IdentifyService = require('libp2p-identify')
const debug = require('debug')
const log = debug('libp2p:switch:conn-manager')
const once = require('once')
const ConnectionFSM = require('../connection')
const { msHandle, msSelect, identifyDialer } = require('../utils')
const { newStream } = require('../utils')

const Circuit = require('libp2p-circuit')

Expand Down Expand Up @@ -155,43 +154,39 @@ class ConnectionManager {
// 1. overload getPeerInfo
// 2. call getPeerInfo
// 3. add this conn to the pool
if (this.switch.identify) {
if (this.switch.identifyService) {
// Get the peer info from the crypto exchange
conn.getPeerInfo((err, cryptoPI) => {
if (err || !cryptoPI) {
log('crypto peerInfo wasnt found')
}

// overload peerInfo to use Identify instead
conn.getPeerInfo = async (callback) => {
const conn = muxedConn.newStream()
const ms = new multistream.Dialer()
callback = once(callback)

let results
try {
await msHandle(ms, conn)
const msConn = await msSelect(ms, identify.multicodec)
results = await identifyDialer(msConn, cryptoPI)
} catch (err) {
return muxedConn.end(() => {
callback(err, null)
})
}
conn.getPeerInfo = (callback) => {
log('running identify')
newStream(muxedConn.newStream(), IdentifyService.multicodecs.identify, (err, stream) => {
if (err) {
return muxedConn.end(() => callback(err))
}

const { peerInfo } = results
this.switch.identifyService.identify(stream, cryptoPI, (err, peerInfo, _observedAddr) => {
if (err) {
return muxedConn.end(() => callback(err))
}

if (peerInfo) {
conn.setPeerInfo(peerInfo)
}
callback(null, peerInfo)
if (peerInfo) {
conn.setPeerInfo(peerInfo)
}
callback(null, peerInfo)
})
})
}

conn.getPeerInfo((err, peerInfo) => {
/* eslint no-warning-comments: off */
if (err) {
return log('identify not successful')
}

const b58Str = peerInfo.id.toB58String()
peerInfo = this.switch._peerBook.put(peerInfo)

Expand Down Expand Up @@ -273,10 +268,17 @@ class ConnectionManager {
* @returns {void}
*/
reuse () {
this.switch.identify = true
this.switch.handle(identify.multicodec, (protocol, conn) => {
identify.listener(conn, this.switch._peerInfo)
})
if (!this.switch.identify) {
this.switch.identify = true
this.switch.identifyService = new IdentifyService({ switch: this.switch })

// Setup all handlers for identify
Object.values(IdentifyService.multicodecs).forEach(protocol => {
this.switch.handle(protocol, (protocol, connection) => {
this.switch.identifyService.handleMessage(protocol, connection)
})
})
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,33 @@ module.exports = function (_switch) {
_dial({ peerInfo, protocol, options: { useFSM: true, priority: PRIORITY_HIGH }, callback })
}

/**
* Creates a new stream to the given `peerInfo`. If a muxed connection
* does not exist to the peer, an error will be passed to the `callback`.
* @param {PeerInfo} peerInfo
* @param {String} protocol The protocol to handshake for the new stream
* @param {function(Error, Connection)} callback
* @returns {void}
*/
function newStream (peerInfo, protocol, callback) {
if (!protocol) {
return callback(new Error('a protocol must be provided to create a new stream'))
}

const connection = _switch.connection.getOne(peerInfo.id.toB58String())
if (!connection) {
return callback(new Error('no muxed connection to create stream from'))
}

connection.shake(protocol, callback)
}

return {
connect,
dial,
dialFSM,
clearBlacklist,
newStream,
BLACK_LIST_ATTEMPTS: isNaN(_switch._options.blackListAttempts) ? BLACK_LIST_ATTEMPTS : _switch._options.blackListAttempts,
BLACK_LIST_TTL: isNaN(_switch._options.blacklistTTL) ? BLACK_LIST_TTL : _switch._options.blacklistTTL,
MAX_COLD_CALLS: isNaN(_switch._options.maxColdCalls) ? MAX_COLD_CALLS : _switch._options.maxColdCalls,
Expand Down
6 changes: 6 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class Switch extends EventEmitter {
// is the Identify protocol enabled?
this.identify = false

this.identifyService = null

// Crypto details
this.crypto = plaintext

Expand Down Expand Up @@ -115,6 +117,10 @@ class Switch extends EventEmitter {
this.dialFSM = this.dialer.dialFSM
}

get peerInfo () {
return this._peerInfo
}

/**
* Returns a list of the transports peerInfo has addresses for
*
Expand Down
2 changes: 1 addition & 1 deletion src/protocol-muxer.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports = function protocolMuxer (protocols, observer) {
const handlerFunc = protocol && protocol.handlerFunc
if (handlerFunc) {
const conn = observeConn(null, protocolName, _conn, observer)
handlerFunc(protocol, conn)
handlerFunc(protocolName, conn)
}
}
}
Expand Down
24 changes: 13 additions & 11 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const Identify = require('libp2p-identify')
const Multistream = require('multistream-select')

/**
* For a given multistream, registers to handle the given connection
Expand Down Expand Up @@ -33,17 +33,19 @@ module.exports.msSelect = (multistream, protocol) => {
}

/**
* Runs identify for the given connection and verifies it against the
* PeerInfo provided
* @param {Connection} connection
* @param {PeerInfo} cryptoPeerInfo The PeerInfo determined during crypto exchange
* @returns {Promise} Resolves {peerInfo, observedAddrs}
* Takes a stream and handshakes on the given `protocol`.
* The stream for that protocol will be returned in the `callback`.
* @param {Connection} connection A connection to create a sub stream on
* @param {string} protocol The protocol to communicate on
* @param {function(Error, Stream)} callback
*/
module.exports.identifyDialer = (connection, cryptoPeerInfo) => {
return new Promise((resolve, reject) => {
Identify.dialer(connection, cryptoPeerInfo, (err, peerInfo, observedAddrs) => {
if (err) return reject(err)
resolve({ peerInfo, observedAddrs })
module.exports.newStream = (connection, protocol, callback) => {
const ms = new Multistream.Dialer()
ms.handle(connection, (err) => {
if (err) return callback(err)
ms.select(protocol, (err, stream) => {
if (err) return callback(err)
callback(null, stream)
})
})
}
Expand Down
4 changes: 2 additions & 2 deletions test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ describe('dialFSM', () => {
// Verify the dialer knows the receiver's protocols
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
...Object.values(identify.multicodecs),
protocol
]).mark()
// Verify the receiver knows the dialer's protocols
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
...Object.values(identify.multicodecs)
]).mark()

switchA.hangUp(switchB._peerInfo)
Expand Down
8 changes: 4 additions & 4 deletions test/identify.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ describe('Identify', () => {
const peerA = switchB._peerBook.get(switchA._peerInfo.id.toB58String())
expect(Array.from(peerB.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec
...Object.values(identify.multicodecs)
])
expect(Array.from(peerA.protocols)).to.eql([
multiplex.multicodec,
identify.multicodec,
...Object.values(identify.multicodecs),
'/id-test/1.0.0'
])

Expand All @@ -136,15 +136,15 @@ describe('Identify', () => {
})

it('should close connection when identify fails', (done) => {
const stub = sinon.stub(identify, 'listener').callsFake((conn) => {
const stub = sinon.stub(switchA.identifyService, 'handleMessage').callsFake((_protocol, conn) => {
conn.getObservedAddrs((err, observedAddrs) => {
if (err) { return }
observedAddrs = observedAddrs[0]

// pretend to be another peer
let publicKey = switchC._peerInfo.id.pubKey.bytes

const msgSend = identify.message.encode({
const msgSend = identify.Message.encode({
protocolVersion: 'ipfs/0.1.0',
agentVersion: 'na',
publicKey: publicKey,
Expand Down