Skip to content

Commit

Permalink
refactor: update network to use async libp2p (#206)
Browse files Browse the repository at this point in the history
* chore: update package.json

* refactor: cleanup network.js

* refactor(test): create libp2p util

* refactor(test): remove pull-* usage

* refactor: use async peer-id

* refactor: fix peer store usage

* refactor: peerbook to peerstore

* refactor: clean up bitswap protocol stream creation

* refactor(network): cleanup findAndConnect and await message decode

* refactor(protobuf): clean up Message importing

* test(protobuf): clean up Message importing

* chore: use latest dht

* fix: networking cleanup and add bufferlist slice

* test: cleanup network tests a bit

* chore: update libp2p dep

* chore: fix lint

* fix: check connections directly form the registrar

* test: clean up gen bitswap network tests

* test: bump unwant delay

* test: add delay before getting block

* chore: bump libp2p to latest pre release

* test: add delay before stat processing

* test(fix): correct libp2p datastore usage

* test: listen before the event might fire

* chore: remove unused deps

* fix: catch errors on connection and break reading on error

* chore: update multicodec dependency

* chore: update to libp2p rc

* chore: update libp2p to 0.27 release

Co-authored-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
2 people authored and dirkmc committed Jan 28, 2020
1 parent aa1c635 commit 57cd476
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 264 deletions.
27 changes: 15 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,25 @@
"async-iterator-all": "^1.0.0",
"benchmark": "^2.1.4",
"chai": "^4.2.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"ipfs-repo": "^0.28.0",
"libp2p": "^0.26.1",
"libp2p-kad-dht": "^0.16.0",
"libp2p-mplex": "^0.8.0",
"libp2p-secio": "~0.11.1",
"libp2p-tcp": "^0.13.0",
"ipfs-repo": "^0.28.2",
"libp2p": "^0.27.0",
"libp2p-kad-dht": "^0.18.3",
"libp2p-mplex": "^0.9.2",
"libp2p-secio": "^0.12.1",
"libp2p-tcp": "^0.14.2",
"lodash.difference": "^4.5.0",
"lodash.flatten": "^4.4.0",
"lodash.range": "^3.2.0",
"lodash.without": "^4.4.0",
"ncp": "^2.0.0",
"p-defer": "^3.0.0",
"p-event": "^4.1.0",
"p-wait-for": "^3.1.0",
"peer-book": "~0.9.0",
"peer-id": "^0.12.2",
"peer-info": "~0.15.1",
"peer-id": "^0.13.5",
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"safe-buffer": "^5.1.2",
Expand All @@ -71,17 +74,17 @@
},
"dependencies": {
"bignumber.js": "^9.0.0",
"callbackify": "^1.1.0",
"cids": "~0.7.0",
"debug": "^4.1.0",
"ipfs-block": "~0.8.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
"moving-average": "^1.0.0",
"multicodec": "~0.5.7",
"multicodec": "^1.0.0",
"multihashing-async": "^0.8.0",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.1",
"pull-stream": "^3.6.9",
"streaming-iterables": "^4.1.1",
"varint-decoder": "~0.1.1"
},
"pre-push": [
Expand Down
142 changes: 69 additions & 73 deletions src/network.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const callbackify = require('callbackify')
const lp = require('it-length-prefixed')
const pipe = require('it-pipe')

const Message = require('./types/message')
const CONSTANTS = require('./constants')
Expand All @@ -17,78 +16,85 @@ class Network {
options = options || {}
this.libp2p = libp2p
this.bitswap = bitswap
this.b100Only = options.b100Only || false
this.protocols = [BITSWAP100]
if (!options.b100Only) {
// Latest bitswap first
this.protocols.unshift(BITSWAP110)
}

this._stats = stats
this._running = false
}

start () {
this._running = true
// bind event listeners
this._onPeerConnect = this._onPeerConnect.bind(this)
this._onPeerDisconnect = this._onPeerDisconnect.bind(this)

this._onConnection = this._onConnection.bind(this)
this.libp2p.handle(BITSWAP100, this._onConnection)
if (!this.b100Only) { this.libp2p.handle(BITSWAP110, this._onConnection) }
}

start () {
this._running = true
this.libp2p.handle(this.protocols, this._onConnection)

this.libp2p.on('peer:connect', this._onPeerConnect)
this.libp2p.on('peer:disconnect', this._onPeerDisconnect)

// All existing connections are like new ones for us
this.libp2p.peerBook
.getAllArray()
.filter((peer) => peer.isConnected())
.forEach((peer) => this._onPeerConnect((peer)))
for (const peer of this.libp2p.peerStore.peers.values()) {
if (this.libp2p.registrar.getConnection(peer)) {
this._onPeerConnect(peer)
}
}
}

stop () {
this._running = false

this.libp2p.unhandle(BITSWAP100)
if (!this.b100Only) { this.libp2p.unhandle(BITSWAP110) }
// Unhandle both, libp2p doesn't care if it's not already handled
this.libp2p.unhandle(this.protocols)

this.libp2p.removeListener('peer:connect', this._onPeerConnect)
this.libp2p.removeListener('peer:disconnect', this._onPeerDisconnect)
}

// Handles both types of bitswap messgages
_onConnection (protocol, conn) {
/**
* Handles both types of incoming bitswap messages
* @private
* @param {object} param0
* @param {string} param0.protocol The protocol the stream is running
* @param {Stream} param0.stream A duplex iterable stream
* @param {Connection} param0.connection A libp2p Connection
* @returns {void}
*/
async _onConnection ({ protocol, stream, connection }) {
if (!this._running) { return }
this._log('incomming new bitswap connection: %s', protocol)

pull(
conn,
lp.decode(),
pull.asyncMap((data, cb) => callbackify(Message.deserialize)(data, cb)),
pull.asyncMap((msg, cb) => {
conn.getPeerInfo((err, peerInfo) => {
if (err) {
return cb(err)
}
this._log('incoming new bitswap %s connection from %s', protocol, connection.remotePeer.toB58String())

callbackify(this.bitswap._receiveMessage.bind(this.bitswap))(peerInfo.id, msg, cb)
})
}),
pull.onEnd((err) => {
this._log('ending connection')
if (err) {
this.bitswap._receiveError(err)
try {
await pipe(
stream,
lp.decode(),
async (source) => {
for await (const data of source) {
try {
const message = await Message.deserialize(data.slice())
this.bitswap._receiveMessage(connection.remotePeer, message)
} catch (err) {
this.bitswap._receiveError(err)
break
}
}
}
})
)
)
} catch (err) {
this._log(err)
}
}

_onPeerConnect (peerInfo) {
if (!this._running) { return }

this.bitswap._onPeerConnected(peerInfo.id)
}

_onPeerDisconnect (peerInfo) {
if (!this._running) { return }

this.bitswap._onPeerDisconnected(peerInfo.id)
}

Expand Down Expand Up @@ -116,9 +122,12 @@ class Network {
* @returns {void}
*/
async findAndConnect (cid) {
const provs = await this.findProviders(cid, CONSTANTS.maxProvidersPerRequest)
this._log('connecting to providers', provs.map((p) => p.id.toB58String()))
await Promise.all(provs.map((p) => this.connectTo(p)))
const connectAttempts = []
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest)) {
this._log('connecting to providers', provider.id.toB58String())
connectAttempts.push(this.connectTo(provider))
}
await Promise.all(connectAttempts)
}

async provide (cid) {
Expand All @@ -130,10 +139,10 @@ class Network {
async sendMessage (peer, msg) {
if (!this._running) throw new Error('network isn\'t running')

const stringId = peer.toB58String() ? peer.toB58String() : peer.id.toB58String()
const stringId = peer.toB58String()
this._log('sendMessage to %s', stringId, msg)

const { conn, protocol } = await this._dialPeer(peer)
const { stream, protocol } = await this._dialPeer(peer)

let serialized
switch (protocol) {
Expand All @@ -148,7 +157,7 @@ class Network {
}

// Note: Don't wait for writeMessage() to complete
writeMessage(conn, serialized, this._log)
writeMessage(stream, serialized, this._log)

this._updateSentStats(peer, msg.blocks)
}
Expand All @@ -168,20 +177,8 @@ class Network {
}

// Dial to the peer and try to use the most recent Bitswap
async _dialPeer (peer) {
try {
// Attempt Bitswap 1.1.0
return {
conn: await this.libp2p.dialProtocol(peer, BITSWAP110),
protocol: BITSWAP110
}
} catch (err) {
// Attempt Bitswap 1.0.0
return {
conn: await this.libp2p.dialProtocol(peer, BITSWAP100),
protocol: BITSWAP100
}
}
_dialPeer (peer) {
return this.libp2p.dialProtocol(peer, [BITSWAP110, BITSWAP100])
}

_updateSentStats (peer, blocks) {
Expand All @@ -194,17 +191,16 @@ class Network {
}
}

function writeMessage (conn, msg, log) {
pull(
pull.values([msg]),
lp.encode(),
conn.conn,
pull.onEnd((err) => {
if (err) {
log(err)
}
})
)
async function writeMessage (stream, msg, log) {
try {
await pipe(
[msg],
lp.encode(),
stream
)
} catch (err) {
log(err)
}
}

module.exports = Network
9 changes: 4 additions & 5 deletions src/types/message/index.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
'use strict'

const protons = require('protons')
const Block = require('ipfs-block')
const CID = require('cids')
const { getName } = require('multicodec')
const vd = require('varint-decoder')
const multihashing = require('multihashing-async')
const { isMapEqual } = require('../../utils')
const pbm = protons(require('./message.proto'))
const { Message } = require('./message.proto')
const Entry = require('./entry')

class BitswapMessage {
Expand Down Expand Up @@ -69,7 +68,7 @@ class BitswapMessage {
msg.wantlist.full = true
}

return pbm.Message.encode(msg)
return Message.encode(msg)
}

/*
Expand Down Expand Up @@ -101,7 +100,7 @@ class BitswapMessage {
})
})

return pbm.Message.encode(msg)
return Message.encode(msg)
}

equals (other) {
Expand All @@ -123,7 +122,7 @@ class BitswapMessage {
}

BitswapMessage.deserialize = async (raw) => {
const decoded = pbm.Message.decode(raw)
const decoded = Message.decode(raw)

const isFull = (decoded.wantlist && decoded.wantlist.full) || false
const msg = new BitswapMessage(isFull)
Expand Down
6 changes: 3 additions & 3 deletions src/types/message/message.proto.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use strict'

const protons = require('protons')
// from: https://github.com/ipfs/go-ipfs/blob/master/exchange/bitswap/message/pb/message.proto

module.exports = `
module.exports = protons(`
message Message {
message Wantlist {
message Entry {
Expand All @@ -25,4 +25,4 @@ module.exports = `
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
}
`
`)
5 changes: 2 additions & 3 deletions test/bitswap-mock-internals.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const PeerId = require('peer-id')
const promisify = require('promisify-es6')
const all = require('async-iterator-all')
const Message = require('../src/types/message')
const Bitswap = require('../src')
Expand Down Expand Up @@ -347,7 +346,7 @@ describe('bitswap with mocks', function () {
bs.get(b.cid)
])

setTimeout(() => bs.unwant(b.cid), 10)
setTimeout(() => bs.unwant(b.cid), 1e3)

const res = await p
expect(res[1]).to.not.exist()
Expand All @@ -357,7 +356,7 @@ describe('bitswap with mocks', function () {
describe('ledgerForPeer', () => {
it('returns null for unknown peer', async () => {
const bs = new Bitswap(mockLibp2pNode(), repo.blocks)
const id = await promisify(PeerId.create)({ bits: 512 })
const id = await PeerId.create({ bits: 512 })
const ledger = bs.ledgerForPeer(id)
expect(ledger).to.equal(null)
})
Expand Down
Loading

0 comments on commit 57cd476

Please sign in to comment.