Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(decision-engine): split large block messages #113

Merged
merged 5 commits into from
Feb 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions src/components/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const debug = require('debug')
const pull = require('pull-stream')
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const waterfall = require('async/waterfall')
const map = require('async/map')
const debounce = require('lodash.debounce')
Expand All @@ -19,6 +20,8 @@ const Message = require('../../types/message')
const Wantlist = require('../../types/wantlist')
const Ledger = require('./ledger')

const MAX_MESSAGE_SIZE = 512 * 1024

class DecisionEngine {
constructor (blockstore, network) {
this.blockstore = blockstore
Expand All @@ -35,14 +38,42 @@ class DecisionEngine {
}

_sendBlocks (env, cb) {
// split into messges of max 512 * 1024 bytes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, because of https://github.com/ipfs/go-ipfs/blob/master/exchange/bitswap/network/ipfs_impl.go#L186

We have a limit to the maximum message size you can send.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, though that is higher than the multiplex limit you implemented in go. So I'm not sure if you could run into the same issue

const blocks = env.blocks
const total = blocks.reduce((acc, b) => {
return acc + b.block.data.byteLength
}, 0)

if (total < MAX_MESSAGE_SIZE) {
return this._sendSafeBlocks(env.peer, blocks, cb)
}

let size = 0
let batch = []

eachSeries(blocks, (b, cb) => {
batch.push(b)
size += b.block.data.byteLength

if (size >= MAX_MESSAGE_SIZE) {
const nextBatch = batch.slice()
batch = []
this._sendSafeBlocks(env.peer, nextBatch, cb)
} else {
cb()
}
}, cb)
}

_sendSafeBlocks (peer, blocks, cb) {
const msg = new Message(false)

env.blocks.forEach((b) => {
blocks.forEach((b) => {
msg.addBlock(b.cid, b.block)
})

// console.log('sending %s blocks', msg.blocks.size)
this.network.sendMessage(env.peer, msg, (err) => {
this.network.sendMessage(peer, msg, (err) => {
if (err) {
log('sendblock error: %s', err.message)
}
Expand Down
98 changes: 49 additions & 49 deletions src/components/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const debug = require('debug')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const setImmediate = require('async/setImmediate')

const Message = require('../../types/message')
Expand All @@ -19,7 +18,6 @@ class Network {
this.libp2p = libp2p
this.peerBook = peerBook
this.bitswap = bitswap
this.conns = new Map()
this.b100Only = b100Only || false

// increase event listener max
Expand Down Expand Up @@ -129,69 +127,71 @@ class Network {
}

const stringId = peerId.toB58String()
log('sendMessage to %s', stringId)
log('sendMessage to %s', stringId, msg)
let peerInfo
try {
peerInfo = this.peerBook.getByB58String(stringId)
} catch (err) {
return callback(err)
}

if (this.conns.has(stringId)) {
this.conns.get(stringId)(msg)
return callback()
}

const msgQueue = pushable()

// Attempt Bitswap 1.1.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP110, (err, conn) => {
this._dialPeer(peerInfo, (err, conn, protocol) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}
log('dialed %s on Bitswap 1.0.0', peerInfo.id.toB58String())

this.conns.set(stringId, (msg) => {
msgQueue.push(msg.serializeToBitswap100())
})

this.conns.get(stringId)(msg)

withConn(this.conns, conn)
callback()
})
return
return callback(err)
}
log('dialed %s on Bitswap 1.1.0', peerInfo.id.toB58String())

this.conns.set(stringId, (msg) => {
msgQueue.push(msg.serializeToBitswap110())
let serialized
switch (protocol) {
case BITSWAP100:
serialized = msg.serializeToBitswap100()
break
case BITSWAP110:
serialized = msg.serializeToBitswap110()
break
default:
return callback(new Error('Unkown protocol: ' + protocol))
}
writeMessage(conn, serialized, (err) => {
if (err) {
log(err)
}
})

this.conns.get(stringId)(msg)

withConn(this.conns, conn)
callback()
})
}

function withConn (conns, conn) {
pull(
msgQueue,
lp.encode(),
conn,
pull.onEnd((err) => {
if (err) {
log.error(err)
}
msgQueue.end()
conns.delete(stringId)
})
)
_dialPeer (peerInfo, callback) {
// dialByPeerInfo throws if no network is there
try {
// Attempt Bitswap 1.1.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP110, (err, conn) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dialByPeerInfo(peerInfo, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}

callback(null, conn, BITSWAP100)
})
return
}

callback(null, conn, BITSWAP110)
})
} catch (err) {
return callback(err)
}
}
}

function writeMessage (conn, msg, callback) {
pull(
pull.values([msg]),
lp.encode(),
conn,
pull.onEnd(callback)
)
}

module.exports = Network
48 changes: 46 additions & 2 deletions test/components/decision-engine/index-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function stringifyMessages (messages) {
}

module.exports = (repo) => {
function newEngine (path, done) {
function newEngine (path, done, net) {
parallel([
(cb) => repo.create(path, cb),
(cb) => PeerId.create(cb)
Expand All @@ -38,7 +38,7 @@ module.exports = (repo) => {
return done(err)
}
const blockstore = results[0].blockstore
const engine = new DecisionEngine(blockstore, mockNetwork())
const engine = new DecisionEngine(blockstore, net || mockNetwork())
engine.start()

done(null, { peer: results[1], engine })
Expand Down Expand Up @@ -221,5 +221,49 @@ module.exports = (repo) => {
)
})
})

it('splits large block messages', (done) => {
const data = _.range(10).map((i) => {
const b = new Buffer(1024 * 256)
b.fill(i)
return b
})
const blocks = _.range(10).map((i) => {
return new Block(data[i])
})

const net = mockNetwork(5, (res) => {
expect(res.messages).to.have.length(5)
done()
})

parallel([
(cb) => newEngine('sf', cb, net),
(cb) => map(blocks, (b, cb) => b.key(cb), cb)
], (err, res) => {
expect(err).to.not.exist
const sf = res[0].engine
const cids = res[1].map((c) => new CID(c))
const id = res[0].peer

pull(
pull.values(blocks.map((b, i) => ({
data: b.data, key: cids[i].multihash
}))),
sf.blockstore.putStream(),
pull.onEnd((err) => {
expect(err).to.not.exist
const msg = new Message(false)
cids.forEach((c, i) => {
msg.addEntry(c, Math.pow(2, 32) - 1 - i)
})

sf.messageReceived(id, msg, (err) => {
expect(err).to.not.exist
})
})
)
})
})
})
}