diff --git a/src/components/decision-engine/index.js b/src/components/decision-engine/index.js index ed42f319..f4b6d507 100644 --- a/src/components/decision-engine/index.js +++ b/src/components/decision-engine/index.js @@ -3,6 +3,7 @@ const debug = require('debug') const pull = require('pull-stream') const each = require('async/each') +const eachSeries = require('async/eachSeries') const waterfall = require('async/waterfall') const map = require('async/map') const debounce = require('lodash.debounce') @@ -19,6 +20,8 @@ const Message = require('../../types/message') const Wantlist = require('../../types/wantlist') const Ledger = require('./ledger') +const MAX_MESSAGE_SIZE = 512 * 1024 + class DecisionEngine { constructor (blockstore, network) { this.blockstore = blockstore @@ -35,14 +38,42 @@ class DecisionEngine { } _sendBlocks (env, cb) { + // split into messges of max 512 * 1024 bytes + const blocks = env.blocks + const total = blocks.reduce((acc, b) => { + return acc + b.block.data.byteLength + }, 0) + + if (total < MAX_MESSAGE_SIZE) { + return this._sendSafeBlocks(env.peer, blocks, cb) + } + + let size = 0 + let batch = [] + + eachSeries(blocks, (b, cb) => { + batch.push(b) + size += b.block.data.byteLength + + if (size >= MAX_MESSAGE_SIZE) { + const nextBatch = batch.slice() + batch = [] + this._sendSafeBlocks(env.peer, nextBatch, cb) + } else { + cb() + } + }, cb) + } + + _sendSafeBlocks (peer, blocks, cb) { const msg = new Message(false) - env.blocks.forEach((b) => { + blocks.forEach((b) => { msg.addBlock(b.cid, b.block) }) // console.log('sending %s blocks', msg.blocks.size) - this.network.sendMessage(env.peer, msg, (err) => { + this.network.sendMessage(peer, msg, (err) => { if (err) { log('sendblock error: %s', err.message) } diff --git a/src/components/network/index.js b/src/components/network/index.js index 34e5589f..79b54e69 100644 --- a/src/components/network/index.js +++ b/src/components/network/index.js @@ -3,7 +3,6 @@ const debug = require('debug') const lp = require('pull-length-prefixed') const pull = require('pull-stream') -const pushable = require('pull-pushable') const setImmediate = require('async/setImmediate') const Message = require('../../types/message') @@ -19,7 +18,6 @@ class Network { this.libp2p = libp2p this.peerBook = peerBook this.bitswap = bitswap - this.conns = new Map() this.b100Only = b100Only || false // increase event listener max @@ -129,7 +127,7 @@ class Network { } const stringId = peerId.toB58String() - log('sendMessage to %s', stringId) + log('sendMessage to %s', stringId, msg) let peerInfo try { peerInfo = this.peerBook.getByB58String(stringId) @@ -137,61 +135,63 @@ class Network { return callback(err) } - if (this.conns.has(stringId)) { - this.conns.get(stringId)(msg) - return callback() - } - - const msgQueue = pushable() - - // Attempt Bitswap 1.1.0 - this.libp2p.dialByPeerInfo(peerInfo, BITSWAP110, (err, conn) => { + this._dialPeer(peerInfo, (err, conn, protocol) => { if (err) { - // Attempt Bitswap 1.0.0 - this.libp2p.dialByPeerInfo(peerInfo, BITSWAP100, (err, conn) => { - if (err) { - return callback(err) - } - log('dialed %s on Bitswap 1.0.0', peerInfo.id.toB58String()) - - this.conns.set(stringId, (msg) => { - msgQueue.push(msg.serializeToBitswap100()) - }) - - this.conns.get(stringId)(msg) - - withConn(this.conns, conn) - callback() - }) - return + return callback(err) } - log('dialed %s on Bitswap 1.1.0', peerInfo.id.toB58String()) - this.conns.set(stringId, (msg) => { - msgQueue.push(msg.serializeToBitswap110()) + let serialized + switch (protocol) { + case BITSWAP100: + serialized = msg.serializeToBitswap100() + break + case BITSWAP110: + serialized = msg.serializeToBitswap110() + break + default: + return callback(new Error('Unkown protocol: ' + protocol)) + } + writeMessage(conn, serialized, (err) => { + if (err) { + log(err) + } }) - - this.conns.get(stringId)(msg) - - withConn(this.conns, conn) callback() }) + } - function withConn (conns, conn) { - pull( - msgQueue, - lp.encode(), - conn, - pull.onEnd((err) => { - if (err) { - log.error(err) - } - msgQueue.end() - conns.delete(stringId) - }) - ) + _dialPeer (peerInfo, callback) { + // dialByPeerInfo throws if no network is there + try { + // Attempt Bitswap 1.1.0 + this.libp2p.dialByPeerInfo(peerInfo, BITSWAP110, (err, conn) => { + if (err) { + // Attempt Bitswap 1.0.0 + this.libp2p.dialByPeerInfo(peerInfo, BITSWAP100, (err, conn) => { + if (err) { + return callback(err) + } + + callback(null, conn, BITSWAP100) + }) + return + } + + callback(null, conn, BITSWAP110) + }) + } catch (err) { + return callback(err) } } } +function writeMessage (conn, msg, callback) { + pull( + pull.values([msg]), + lp.encode(), + conn, + pull.onEnd(callback) + ) +} + module.exports = Network diff --git a/test/components/decision-engine/index-test.js b/test/components/decision-engine/index-test.js index f5e11f08..0b5408cd 100644 --- a/test/components/decision-engine/index-test.js +++ b/test/components/decision-engine/index-test.js @@ -29,7 +29,7 @@ function stringifyMessages (messages) { } module.exports = (repo) => { - function newEngine (path, done) { + function newEngine (path, done, net) { parallel([ (cb) => repo.create(path, cb), (cb) => PeerId.create(cb) @@ -38,7 +38,7 @@ module.exports = (repo) => { return done(err) } const blockstore = results[0].blockstore - const engine = new DecisionEngine(blockstore, mockNetwork()) + const engine = new DecisionEngine(blockstore, net || mockNetwork()) engine.start() done(null, { peer: results[1], engine }) @@ -221,5 +221,49 @@ module.exports = (repo) => { ) }) }) + + it('splits large block messages', (done) => { + const data = _.range(10).map((i) => { + const b = new Buffer(1024 * 256) + b.fill(i) + return b + }) + const blocks = _.range(10).map((i) => { + return new Block(data[i]) + }) + + const net = mockNetwork(5, (res) => { + expect(res.messages).to.have.length(5) + done() + }) + + parallel([ + (cb) => newEngine('sf', cb, net), + (cb) => map(blocks, (b, cb) => b.key(cb), cb) + ], (err, res) => { + expect(err).to.not.exist + const sf = res[0].engine + const cids = res[1].map((c) => new CID(c)) + const id = res[0].peer + + pull( + pull.values(blocks.map((b, i) => ({ + data: b.data, key: cids[i].multihash + }))), + sf.blockstore.putStream(), + pull.onEnd((err) => { + expect(err).to.not.exist + const msg = new Message(false) + cids.forEach((c, i) => { + msg.addEntry(c, Math.pow(2, 32) - 1 - i) + }) + + sf.messageReceived(id, msg, (err) => { + expect(err).to.not.exist + }) + }) + ) + }) + }) }) }