diff --git a/API.md b/API.md deleted file mode 100644 index e37fa651..00000000 --- a/API.md +++ /dev/null @@ -1,82 +0,0 @@ -# API - -## Public Methods - -### `constructor(id, libp2p, datastore)` - -- `id: PeerId`, the id of the local instance. -- `libp2p: Libp2p`, instance of the local network stack. -- `blockstore: Datastore`, instance of the local database (`IpfsRepo.blockstore`) - -Create a new instance. - - -### `getStream(key)` - -- `key: Multihash|Array` - -Returns a source `pull-stream`. Values emitted are the received blocks. - -Example: - -```js -// Single block -pull( - bitswap.getStream(key), - pull.collect((err, blocks) => { - // blocks === [block] - }) -) - -// Many blocks -pull( - bitswap.getStream([key1, key2, key3]), - pull.collect((err, blocks) => { - // blocks === [block1, block2, block3] - }) -) -``` - - -> Note: This is safe guarded so that the network is not asked -> for blocks that are in the local `datastore`. - - -### `unwant(keys)` - -- `keys: Mutlihash|[]Multihash` - -Cancel previously requested keys, forcefully. That means they are removed from the -wantlist independent of how many other resources requested these keys. Callbacks -attached to `getBlock` are errored with `Error('manual unwant: key')`. - -### `cancelWants(keys)` - -- `keys: Multihash|[]Multihash` - -Cancel previously requested keys. - -### `putStream()` - -Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored. -Objects passed into here should be of the form `{data: Buffer, key: Multihash}` - -### `put(blockAndKey, cb)` - -- `blockAndKey: {data: Buffer, key: Multihash}` -- `cb: Function` - -Announce that the current node now has the block containing `data`. This will store it -in the local database and attempt to serve it to all peers that are known - to have requested it. The callback is called when we are sure that the block - is stored. - -### `wantlistForPeer(peerId)` - -- `peerId: PeerId` - -Get the wantlist for a given peer. - -### `stat()` - -Get stats about about the current state of the bitswap instance. diff --git a/README.md b/README.md index e997ac45..640af1f4 100644 --- a/README.md +++ b/README.md @@ -31,13 +31,13 @@ ### npm ```sh -> npm i ipfs-bitswap +> npm install ipfs-bitswap --save ``` ### Use in Node.js ```js -const bitswap = require('ipfs-bitswap') +const Bitswap = require('ipfs-bitswap') ``` ### Use in a browser with browserify, webpack or any other bundler @@ -45,7 +45,7 @@ const bitswap = require('ipfs-bitswap') The code published to npm that gets loaded on require is in fact a ES5 transpiled version with the right shims added. This means that you can require it and use with your favourite bundler without having to adjust asset management process. ```js -const bitswap = require('ipfs-bitswap') +const Bitswap = require('ipfs-bitswap') ``` ### Use in a browser using a script tag @@ -62,6 +62,116 @@ Loading this module through a script tag will make the `IpfsBitswap` object avai For the documentation see [API.md](API.md). +### API + +#### `new Bitswap(libp2p, blockstore)` + +- `libp2p: Libp2p`, instance of the local network stack. +- `blockstore: Blockstore`, instance of the local database (`IpfsRepo.blockstore`) + +Create a new instance. + +#### `getStream(cid)` + +- `cid: CID|Array` + +Returns a source `pull-stream`. Values emitted are the received blocks. + +Example: + +```js +// Single block +pull( + bitswap.getStream(cid), + pull.collect((err, blocks) => { + // blocks === [block] + }) +) + +// Many blocks +pull( + bitswap.getStream([cid1, cid2, cid3]), + pull.collect((err, blocks) => { + // blocks === [block1, block2, block3] + }) +) +``` + +> Note: This is safe guarded so that the network is not asked +> for blocks that are in the local `datastore`. + +#### `unwant(cids)` + +- `cids: CID|[]CID` + +Cancel previously requested keys, forcefully. That means they are removed from the +wantlist independent of how many other resources requested these keys. Callbacks +attached to `getBlock` are errored with `Error('manual unwant: key')`. + +#### `cancelWants(cids)` + +- `cid: CID|[]CID` + +Cancel previously requested keys. + +#### `putStream()` + +Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored. +Objects passed into here should be of the form `{data: Buffer, key: Multihash}` + +#### `put(blockAndCid, callback)` + +- `blockAndKey: {data: Buffer, cid: CID}` +- `callback: Function` + +Announce that the current node now has the block containing `data`. This will store it +in the local database and attempt to serve it to all peers that are known + to have requested it. The callback is called when we are sure that the block + is stored. + +#### `wantlistForPeer(peerId)` + +- `peerId: PeerId` + +Get the wantlist for a given peer. + +#### `stat()` + +Get stats about about the current state of the bitswap instance. + +## Development + +### Structure + +![](/img/architecture.png) + +```sh +» tree src +src +├── components +│   ├── decision +│   │   ├── engine.js +│   │   ├── index.js +│   │   ├── ledger.js +│   │   ├── peer-request-queue.js +│   │   └── pq.js +│   ├── network # Handles peerSet and open new conns +│   │   └── index.js +│   └── want-manager # Keeps track of all blocks the peer wants (not the others which it is connected) +│   ├── index.js +│   └── msg-queue.js # Messages to send queue, one per peer +├── constants.js +├── index.js +└── types + ├── message # (Type) message that is put in the wire + │   ├── entry.js + │   ├── index.js + │   └── message.proto.js + └── wantlist # (Type) track wanted blocks + ├── entry.js + └── index.js +``` + ## Contribute Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipfs-bitswap/issues)! diff --git a/benchmarks/index.js b/benchmarks/index.js index 73537139..2652449d 100644 --- a/benchmarks/index.js +++ b/benchmarks/index.js @@ -11,6 +11,7 @@ const Block = require('ipfs-block') const pull = require('pull-stream') const assert = require('assert') const crypto = require('crypto') +const CID = require('cids') const utils = require('../test/utils') @@ -54,6 +55,7 @@ function round (nodeArr, blockFactor, n, cb) { if (err) { return cb(err) } + const cids = keys.map((k) => new CID(k)) let d series([ // put blockFactor amount of blocks per node @@ -63,8 +65,8 @@ function round (nodeArr, blockFactor, n, cb) { const data = _.map(_.range(blockFactor), (j) => { const index = i * blockFactor + j return { - data: blocks[index].data, - key: keys[index] + block: blocks[index], + cid: cids[index] } }) each( @@ -80,7 +82,7 @@ function round (nodeArr, blockFactor, n, cb) { // fetch all blocks on every node (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { pull( - node.bitswap.getStream(keys), + node.bitswap.getStream(cids), pull.collect((err, res) => { if (err) { return callback(err) diff --git a/benchmarks/put-get.js b/benchmarks/put-get.js index 333f0353..33f9d8a8 100644 --- a/benchmarks/put-get.js +++ b/benchmarks/put-get.js @@ -7,6 +7,8 @@ const assert = require('assert') const pull = require('pull-stream') const series = require('async/series') const crypto = require('crypto') +const CID = require('cids') + const utils = require('../test/utils') const suite = new Benchmark.Suite('put-get') @@ -64,7 +66,7 @@ function put (blocks, bs, callback) { if (err) { return cb(err) } - cb(null, {key: key, data: b.data}) + cb(null, {cid: new CID(key), block: b}) }) }), bs.putStream(), @@ -76,7 +78,7 @@ function get (blocks, bs, callback) { pull( pull.values(blocks), pull.asyncMap((b, cb) => b.key(cb)), - pull.map((k) => bs.getStream(k)), + pull.map((k) => bs.getStream(new CID(k))), pull.flatten(), pull.collect((err, res) => { if (err) { diff --git a/img/architecture.monopic b/img/architecture.monopic new file mode 100644 index 00000000..75915564 Binary files /dev/null and b/img/architecture.monopic differ diff --git a/img/architecture.png b/img/architecture.png new file mode 100644 index 00000000..32b5967b Binary files /dev/null and b/img/architecture.png differ diff --git a/img/architecture.txt b/img/architecture.txt new file mode 100644 index 00000000..736815e2 --- /dev/null +++ b/img/architecture.txt @@ -0,0 +1,51 @@ + + ┌────────────────────────────────────────────────────────────────────────────┐ + │ Bitswap API │ + └────────────────────────────────────────────────────────────────────────────┘ + │ ▲ + │register wants │yields the + │and unwants │received + │ │blocks + │ │ + │ send block to other nodes┌───────────────────────────┐ + │ by adding them to their │ Decision Engine │ + │ buckets │ │ + │ ┌─────────◀├ ─ ─ ─ │ + │ │ │Ledger│ │ + │ │ └───────────────────────────┘ + │ │ ▲ + ▼ │ │ + ┌────────────────────┐ │ │ + │ │ │ │ + │ Want Manager │ │ │ + │ │ │ │ + ├───────────┬──┬─────┘ │ │ + │my wantlist│ │ │ │ + └───────────┘ │update wantlist │ │ + │messages │ receive a block │ + └─────┬───────┬───────┤ │ + │ │ │ │ + ▼ ▼ ▼ │ + ┌───────┬───────┬───────┐ │ + │Message│Message│... │ │ + │Queue/ │Queue/ │ │ │ + │peer │peer │ │ │ + └───────┴───────┴───────┘ │ + │ │ │ │ + │ │ │ │ + │ │ │ │ + └───────┴───────┴─┐ │ + │ │ + │ │ + ▼ │ + ┌────────────────────────────────────────┐ + │ Network │ + └────────────────────────────────────────┘ + │ ▲ │ ▲ + ▼ │ │ │ + ┌─────────┐ ▼ │ + │Transform│ /ipfs/bitswap/1.1.0 + └─────────┘ + │ ▲ + ▼ │ + /ipfs/bitswap/1.0.0 \ No newline at end of file diff --git a/package.json b/package.json index e22c6bd0..c68386b8 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "9.2.2", + "aegir": "9.3.0", "benchmark": "^2.1.2", "buffer-loader": "0.0.1", "chai": "^3.5.0", @@ -56,8 +56,8 @@ }, "dependencies": { "async": "^2.1.4", - "cids": "^0.3.4", - "debug": "^2.3.3", + "cids": "^0.3.5", + "debug": "^2.4.4", "heap": "^0.2.6", "ipfs-block": "^0.5.3", "lodash.debounce": "^4.0.8", @@ -68,13 +68,14 @@ "lodash.pullallwith": "^4.7.0", "lodash.uniqwith": "^4.5.0", "lodash.values": "^4.3.0", - "multihashes": "^0.3.0", + "multihashes": "^0.3.1", "protocol-buffers": "^3.2.1", "pull-defer": "^0.2.2", "pull-length-prefixed": "^1.2.0", "pull-paramap": "^1.2.1", "pull-pushable": "^2.0.1", - "pull-stream": "^3.5.0" + "pull-stream": "^3.5.0", + "varint-decoder": "^0.1.1" }, "contributors": [ "David Dias ", diff --git a/src/decision/engine.js b/src/components/decision-engine/index.js similarity index 78% rename from src/decision/engine.js rename to src/components/decision-engine/index.js index f1462351..5617a6ee 100644 --- a/src/decision/engine.js +++ b/src/components/decision-engine/index.js @@ -3,23 +3,24 @@ const debug = require('debug') const pull = require('pull-stream') const each = require('async/each') -const map = require('async/map') const waterfall = require('async/waterfall') +const map = require('async/map') const debounce = require('lodash.debounce') const uniqWith = require('lodash.uniqwith') const find = require('lodash.find') const values = require('lodash.values') const groupBy = require('lodash.groupby') const pullAllWith = require('lodash.pullallwith') +const CID = require('cids') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') -const Message = require('../message') -const Wantlist = require('../wantlist') +const Message = require('../../types/message') +const Wantlist = require('../../types/wantlist') const Ledger = require('./ledger') -module.exports = class Engine { +class DecisionEngine { constructor (blockstore, network) { this.blockstore = blockstore this.network = network @@ -36,8 +37,9 @@ module.exports = class Engine { _sendBlocks (env, cb) { const msg = new Message(false) - env.blocks.forEach((block) => { - msg.addBlockWithKey(block.block, block.key) + + env.blocks.forEach((b) => { + msg.addBlock(b.cid, b.block) }) // console.log('sending %s blocks', msg.blocks.size) @@ -57,20 +59,20 @@ module.exports = class Engine { const tasks = this._tasks this._tasks = [] const entries = tasks.map((t) => t.entry) - const keys = entries.map((e) => e.key) - const uniqKeys = uniqWith(keys, (a, b) => a.equals(b)) + const cids = entries.map((e) => e.cid) + const uniqCids = uniqWith(cids, (a, b) => a.equals(b)) const groupedTasks = groupBy(tasks, (task) => task.target.toB58String()) waterfall([ - (cb) => map(uniqKeys, (k, cb) => { + (cb) => map(uniqCids, (cid, cb) => { pull( - this.blockstore.getStream(k), + this.blockstore.getStream(cid.multihash), pull.collect((err, blocks) => { if (err) { return cb(err) } cb(null, { - key: k, + cid: cid, block: blocks[0] }) }) @@ -79,8 +81,8 @@ module.exports = class Engine { (blocks, cb) => each(values(groupedTasks), (tasks, cb) => { // all tasks have the same target const peer = tasks[0].target - const blockList = keys.map((k) => { - return find(blocks, (b) => b.key.equals(k)) + const blockList = cids.map((cid) => { + return find(blocks, (b) => b.cid.equals(cid)) }) this._sendBlocks({ @@ -91,7 +93,7 @@ module.exports = class Engine { log.error('failed to send', err) } blockList.forEach((block) => { - this.messageSent(peer, block.block, block.key) + this.messageSent(peer, block.block, block.cid) }) cb() }) @@ -105,24 +107,25 @@ module.exports = class Engine { } wantlistForPeer (peerId) { - if (!this.ledgerMap.has(peerId.toB58String())) { + const peerIdStr = peerId.toB58String() + if (!this.ledgerMap.has(peerIdStr)) { return new Map() } - return this.ledgerMap.get(peerId.toB58String()).wantlist.sortedEntries() + return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries() } peers () { return Array.from(this.ledgerMap.values()).map((l) => l.partner) } - receivedBlocks (keys) { - if (!keys.length) { + receivedBlocks (cids) { + if (!cids.length) { return } // Check all connected peers if they want the block we received for (let l of this.ledgerMap.values()) { - keys + cids .map((k) => l.wantlistContains(k)) .filter(Boolean) .forEach((e) => { @@ -162,10 +165,10 @@ module.exports = class Engine { let wants = [] for (let entry of msg.wantlist.values()) { if (entry.cancel) { - ledger.cancelWant(entry.key) + ledger.cancelWant(entry.cid) cancels.push(entry) } else { - ledger.wants(entry.key, entry.priority) + ledger.wants(entry.cid, entry.priority) wants.push(entry) } } @@ -180,15 +183,15 @@ module.exports = class Engine { pullAllWith(this._tasks, entries, (t, e) => { const sameTarget = t.target.toB58String() === id - const sameKey = t.entry.key.equals(e.key) - return sameTarget && sameKey + const sameCid = t.entry.cid.equals(e.cid) + return sameTarget && sameCid }) } _addWants (ledger, peerId, entries, cb) { each(entries, (entry, cb) => { // If we already have the block, serve it - this.blockstore.has(entry.key, (err, exists) => { + this.blockstore.has(entry.cid.multihash, (err, exists) => { if (err) { log.error('failed existence check') } else if (exists) { @@ -214,24 +217,24 @@ module.exports = class Engine { log('got block (%s bytes)', block.data.length) ledger.receivedBytes(block.data.length) - cb(null, key) + cb(null, new CID(key)) }) - }, (err, keys) => { + }, (err, cids) => { if (err) { return callback(err) } - this.receivedBlocks(keys) + this.receivedBlocks(cids) callback() }) } // Clear up all accounting things after message was sent - messageSent (peerId, block, key) { + messageSent (peerId, block, cid) { const ledger = this._findOrCreate(peerId) ledger.sentBytes(block ? block.data.length : 0) - if (key) { - ledger.wantlist.remove(key) + if (cid) { + ledger.wantlist.remove(cid) } } @@ -249,16 +252,18 @@ module.exports = class Engine { // } // // TODO: figure out how to remove all other references - // in the peerrequestqueue + // in the peer request queue } _findOrCreate (peerId) { - if (this.ledgerMap.has(peerId.toB58String())) { - return this.ledgerMap.get(peerId.toB58String()) + const peerIdStr = peerId.toB58String() + if (this.ledgerMap.has(peerIdStr)) { + return this.ledgerMap.get(peerIdStr) } const l = new Ledger(peerId) - this.ledgerMap.set(peerId.toB58String(), l) + + this.ledgerMap.set(peerIdStr, l) return l } @@ -271,3 +276,5 @@ module.exports = class Engine { this._running = false } } + +module.exports = DecisionEngine diff --git a/src/decision/ledger.js b/src/components/decision-engine/ledger.js similarity index 59% rename from src/decision/ledger.js rename to src/components/decision-engine/ledger.js index 3d98069f..a4619078 100644 --- a/src/decision/ledger.js +++ b/src/components/decision-engine/ledger.js @@ -1,8 +1,8 @@ 'use strict' -const Wantlist = require('../wantlist') +const Wantlist = require('../../types/wantlist') -module.exports = class Ledger { +class Ledger { constructor (peerId) { this.partner = peerId this.wantlist = new Wantlist() @@ -17,26 +17,28 @@ module.exports = class Ledger { } sentBytes (n) { - this.exchangeCount ++ + this.exchangeCount++ this.lastExchange = (new Date()).getTime() this.accounting.bytesSent += n } receivedBytes (n) { - this.exchangeCount ++ + this.exchangeCount++ this.lastExchange = (new Date()).getTime() this.accounting.bytesRecv += n } - wants (key, priority) { - this.wantlist.add(key, priority) + wants (cid, priority) { + this.wantlist.add(cid, priority) } - cancelWant (key) { - this.wantlist.remove(key) + cancelWant (cid) { + this.wantlist.remove(cid) } - wantlistContains (key) { - return this.wantlist.contains(key) + wantlistContains (cid) { + return this.wantlist.contains(cid) } } + +module.exports = Ledger diff --git a/src/network/index.js b/src/components/network/index.js similarity index 61% rename from src/network/index.js rename to src/components/network/index.js index 07a2b2b4..34e5589f 100644 --- a/src/network/index.js +++ b/src/components/network/index.js @@ -6,34 +6,38 @@ const pull = require('pull-stream') const pushable = require('pull-pushable') const setImmediate = require('async/setImmediate') -const Message = require('../message') -const cs = require('../constants') +const Message = require('../../types/message') +const CONSTANTS = require('../../constants') const log = debug('bitswap:network') log.error = debug('bitswap:network:error') -const PROTOCOL_IDENTIFIER = '/ipfs/bitswap/1.0.0' +const BITSWAP100 = '/ipfs/bitswap/1.0.0' +const BITSWAP110 = '/ipfs/bitswap/1.1.0' -module.exports = class Network { - constructor (libp2p, peerBook, bitswap) { +class Network { + constructor (libp2p, peerBook, bitswap, b100Only) { this.libp2p = libp2p this.peerBook = peerBook this.bitswap = bitswap this.conns = new Map() + this.b100Only = b100Only || false // increase event listener max - this.libp2p.swarm.setMaxListeners(cs.maxListeners) - this._running = false + this.libp2p.swarm.setMaxListeners(CONSTANTS.maxListeners) } start () { this._running = true // bind event listeners - this._onConnection = this._onConnection.bind(this) this._onPeerMux = this._onPeerMux.bind(this) this._onPeerMuxClosed = this._onPeerMuxClosed.bind(this) - this.libp2p.handle(PROTOCOL_IDENTIFIER, this._onConnection) + this._onConnection = this._onConnection.bind(this) + this.libp2p.handle(BITSWAP100, this._onConnection) + if (!this.b100Only) { + this.libp2p.handle(BITSWAP110, this._onConnection) + } this.libp2p.swarm.on('peer-mux-established', this._onPeerMux) this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed) @@ -47,12 +51,17 @@ module.exports = class Network { stop () { this._running = false - this.libp2p.unhandle(PROTOCOL_IDENTIFIER) - this.libp2p.swarm.removeListener('peer-mux-established', this._onPeerMux) + this.libp2p.unhandle(BITSWAP100) + if (!this.b100Only) { + this.libp2p.unhandle(BITSWAP110) + } + + this.libp2p.swarm.removeListener('peer-mux-established', this._onPeerMux) this.libp2p.swarm.removeListener('peer-mux-closed', this._onPeerMuxClosed) } + // Handles both types of bitswap messgages _onConnection (protocol, conn) { if (!this._running) { return @@ -61,7 +70,7 @@ module.exports = class Network { pull( conn, lp.decode(), - pull.asyncMap((data, cb) => Message.fromProto(data, cb)), + pull.asyncMap((data, cb) => Message.deserialize(data, cb)), pull.asyncMap((msg, cb) => { conn.getPeerInfo((err, peerInfo) => { if (err) { @@ -96,8 +105,8 @@ module.exports = class Network { } // Connect to the given peer - connectTo (peerId, cb) { - const done = (err) => setImmediate(() => cb(err)) + connectTo (peerId, callback) { + const done = (err) => setImmediate(() => callback(err)) if (!this._running) { return done(new Error('No running network')) @@ -114,9 +123,9 @@ module.exports = class Network { } // Send the given msg (instance of Message) to the given peer - sendMessage (peerId, msg, cb) { + sendMessage (peerId, msg, callback) { if (!this._running) { - return cb(new Error('No running network')) + return callback(new Error('No running network')) } const stringId = peerId.toB58String() @@ -125,27 +134,50 @@ module.exports = class Network { try { peerInfo = this.peerBook.getByB58String(stringId) } catch (err) { - return cb(err) + return callback(err) } if (this.conns.has(stringId)) { - log('connection exists') - this.conns.get(stringId).push(msg.toProto()) - return cb() + this.conns.get(stringId)(msg) + return callback() } - log('dialByPeerInfo') - this.libp2p.dialByPeerInfo(peerInfo, PROTOCOL_IDENTIFIER, (err, conn) => { - // log('dialed %s', peerInfo.id.toB58String(), err) + const msgQueue = pushable() + + // Attempt Bitswap 1.1.0 + this.libp2p.dialByPeerInfo(peerInfo, BITSWAP110, (err, conn) => { if (err) { - return cb(err) + // Attempt Bitswap 1.0.0 + this.libp2p.dialByPeerInfo(peerInfo, BITSWAP100, (err, conn) => { + if (err) { + return callback(err) + } + log('dialed %s on Bitswap 1.0.0', peerInfo.id.toB58String()) + + this.conns.set(stringId, (msg) => { + msgQueue.push(msg.serializeToBitswap100()) + }) + + this.conns.get(stringId)(msg) + + withConn(this.conns, conn) + callback() + }) + return } + log('dialed %s on Bitswap 1.1.0', peerInfo.id.toB58String()) - const msgQueue = pushable() - msgQueue.push(msg.toProto()) + this.conns.set(stringId, (msg) => { + msgQueue.push(msg.serializeToBitswap110()) + }) - this.conns.set(stringId, msgQueue) + this.conns.get(stringId)(msg) + withConn(this.conns, conn) + callback() + }) + + function withConn (conns, conn) { pull( msgQueue, lp.encode(), @@ -155,11 +187,11 @@ module.exports = class Network { log.error(err) } msgQueue.end() - this.conns.delete(stringId) + conns.delete(stringId) }) ) - - cb() - }) + } } } + +module.exports = Network diff --git a/src/wantmanager/index.js b/src/components/want-manager/index.js similarity index 64% rename from src/wantmanager/index.js rename to src/components/want-manager/index.js index a7220478..c963a819 100644 --- a/src/wantmanager/index.js +++ b/src/components/want-manager/index.js @@ -2,38 +2,38 @@ const debug = require('debug') -const Message = require('../message') -const Wantlist = require('../wantlist') -const cs = require('../constants') +const Message = require('../../types/message') +const Wantlist = require('../../types/wantlist') +const CONSTANTS = require('../../constants') const MsgQueue = require('./msg-queue') const log = debug('bitswap:wantmanager') log.error = debug('bitswap:wantmanager:error') -module.exports = class Wantmanager { +module.exports = class WantManager { constructor (network) { this.peers = new Map() - this.wl = new Wantlist() + this.wantlist = new Wantlist() this.network = network } - _addEntries (keys, cancel, force) { - const entries = keys.map((key, i) => { - return new Message.Entry(key, cs.kMaxPriority - i, cancel) + _addEntries (cids, cancel, force) { + const entries = cids.map((cid, i) => { + return new Message.Entry(cid, CONSTANTS.kMaxPriority - i, cancel) }) entries.forEach((e) => { // add changes to our wantlist if (e.cancel) { if (force) { - this.wl.removeForce(e.key) + this.wantlist.removeForce(e.cid) } else { - this.wl.remove(e.key) + this.wantlist.remove(e.cid) } } else { log('adding to wl') - this.wl.add(e.key, e.priority) + this.wantlist.add(e.cid, e.priority) } }) @@ -55,8 +55,9 @@ module.exports = class Wantmanager { // new peer, give them the full wantlist const fullwantlist = new Message(true) - for (let entry of this.wl.entries()) { - fullwantlist.addEntry(entry[1].key, entry[1].priority) + + for (let entry of this.wantlist.entries()) { + fullwantlist.addEntry(entry[1].cid, entry[1].priority) } mq.addMessage(fullwantlist) @@ -80,21 +81,21 @@ module.exports = class Wantmanager { this.peers.delete(peerId.toB58String()) } - // add all the keys to the wantlist - wantBlocks (keys) { - this._addEntries(keys, false) + // add all the cids to the wantlist + wantBlocks (cids) { + this._addEntries(cids, false) } // remove blocks of all the given keys without respecting refcounts - unwantBlocks (keys) { - log('unwant blocks: %s', keys.length) - this._addEntries(keys, true, true) + unwantBlocks (cids) { + log('unwant blocks: %s', cids.length) + this._addEntries(cids, true, true) } // cancel wanting all of the given keys - cancelWants (keys) { - log('cancel wants: %s', keys.length) - this._addEntries(keys, true) + cancelWants (cids) { + log('cancel wants: %s', cids.length) + this._addEntries(cids, true) } // Returns a list of all currently connected peers @@ -114,8 +115,8 @@ module.exports = class Wantmanager { this.timer = setInterval(() => { // resend entirew wantlist every so often const fullwantlist = new Message(true) - for (let entry of this.wl.entries()) { - fullwantlist.addEntry(entry[1].key, entry[1].priority) + for (let entry of this.wantlist.entries()) { + fullwantlist.addEntry(entry[1].cid, entry[1].priority) } this.peers.forEach((p) => { @@ -126,7 +127,7 @@ module.exports = class Wantmanager { stop () { for (let mq of this.peers.values()) { - this.disconnected(mq.id) + this.disconnected(mq.peerId) } clearInterval(this.timer) } diff --git a/src/wantmanager/msg-queue.js b/src/components/want-manager/msg-queue.js similarity index 68% rename from src/wantmanager/msg-queue.js rename to src/components/want-manager/msg-queue.js index 8a629428..3ea18ac0 100644 --- a/src/wantmanager/msg-queue.js +++ b/src/components/want-manager/msg-queue.js @@ -2,14 +2,14 @@ const debug = require('debug') const debounce = require('lodash.debounce') -const Message = require('../message') +const Message = require('../../types/message') const log = debug('bitswap:wantmanager:queue') log.error = debug('bitswap:wantmanager:queue:error') module.exports = class MsgQueue { constructor (peerId, network) { - this.id = peerId + this.peerId = peerId this.network = network this.refcnt = 1 @@ -31,14 +31,16 @@ module.exports = class MsgQueue { } _sendEntries () { - if (!this._entries.length) return + if (!this._entries.length) { + return + } const msg = new Message(false) this._entries.forEach((entry) => { if (entry.cancel) { - msg.cancel(entry.key) + msg.cancel(entry.cid) } else { - msg.addEntry(entry.key, entry.priority) + msg.addEntry(entry.cid, entry.priority) } }) this._entries = [] @@ -46,14 +48,13 @@ module.exports = class MsgQueue { } send (msg) { - this.network.connectTo(this.id, (err) => { + this.network.connectTo(this.peerId, (err) => { if (err) { - log.error('cant connect to peer %s: %s', this.id.toB58String(), err.message) + log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message) return } log('sending message') - // console.log('sending msg %s blocks, %s wants', msg.blocks.size, msg.wantlist.size) - this.network.sendMessage(this.id, msg, (err) => { + this.network.sendMessage(this.peerId, msg, (err) => { if (err) { log.error('send error: %s', err.message) return diff --git a/src/constants.js b/src/constants.js index b612d3bc..3edc1231 100644 --- a/src/constants.js +++ b/src/constants.js @@ -1,13 +1,13 @@ 'use strict' -const second = 1000 +const SECOND = 1000 module.exports = { maxProvidersPerRequest: 3, - providerRequestTimeout: 10 * second, - hasBlockTimeout: 15 * second, - provideTimeout: 15 * second, + providerRequestTimeout: 10 * SECOND, + hasBlockTimeout: 15 * SECOND, + provideTimeout: 15 * SECOND, kMaxPriority: Math.pow(2, 31) - 1, - rebroadcastDelay: 10 * second, + rebroadcastDelay: 10 * SECOND, maxListeners: 1000 } diff --git a/src/decision/index.js b/src/decision/index.js deleted file mode 100644 index 9e6bb4ad..00000000 --- a/src/decision/index.js +++ /dev/null @@ -1,5 +0,0 @@ -'use strict' - -const Engine = require('./engine') - -exports.Engine = Engine diff --git a/src/decision/pq.js b/src/decision/pq.js deleted file mode 100644 index 427e71b5..00000000 --- a/src/decision/pq.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -const Heap = require('heap') - -module.exports = class PriorityQueue { - constructor (cmp) { - this.q = new Heap((a, b) => { - return cmp(a, b) ? -1 : 1 - }) - } - - push (e) { - this.q.push(e) - } - - pop () { - return this.q.pop() - } - - update (e) { - this.q.updateItem(e) - } - - size () { - return this.q.size() - } - - isEmpty () { - return this.q.empty() - } -} diff --git a/src/index.js b/src/index.js index ad1e9557..e3831546 100644 --- a/src/index.js +++ b/src/index.js @@ -2,30 +2,30 @@ const series = require('async/series') const debug = require('debug') + const log = debug('bitswap') log.error = debug('bitswap:error') const EventEmitter = require('events').EventEmitter const pull = require('pull-stream') const paramap = require('pull-paramap') const defer = require('pull-defer/source') +const CID = require('cids') -const cs = require('./constants') -const WantManager = require('./wantmanager') -const Network = require('./network') -const decision = require('./decision') - -module.exports = class Bitwap { - constructor (p, libp2p, blockstore, peerBook) { - // the ID of the peer to act on behalf of - this.self = p +const CONSTANTS = require('./constants') +const WantManager = require('./components/want-manager') +const Network = require('./components/network') +const DecisionEngine = require('./components/decision-engine') +class Bitswap { + constructor (libp2p, blockstore, peerBook) { + this.libp2p = libp2p // the network delivers messages this.network = new Network(libp2p, peerBook, this) // local database this.blockstore = blockstore - this.engine = new decision.Engine(blockstore, this.network) + this.engine = new DecisionEngine(blockstore, this.network) // handle message sending this.wm = new WantManager(this.network) @@ -35,7 +35,7 @@ module.exports = class Bitwap { this.dupDataRecvd = 0 this.notifications = new EventEmitter() - this.notifications.setMaxListeners(cs.maxListeners) + this.notifications.setMaxListeners(CONSTANTS.maxListeners) } // handle messages received through the network @@ -46,79 +46,81 @@ module.exports = class Bitwap { log('failed to receive message', incoming) } - const iblocks = Array.from(incoming.blocks.values()) + const cidsAndBlocks = Array + .from(incoming.blocks.entries()) + .map((entry) => { + return { cid: new CID(entry[0]), block: entry[1] } + }) - if (iblocks.length === 0) { + if (cidsAndBlocks.length === 0) { return cb() } // quickly send out cancels, reduces chances of duplicate block receives - pull( - pull.values(iblocks), - pull.asyncMap((block, cb) => block.key(cb)), - pull.filter((key) => this.wm.wl.contains(key)), - pull.collect((err, keys) => { + pull.values(cidsAndBlocks), + pull.filter((cidAndBlock) => this.wm.wantlist.contains(cidAndBlock.cid)), + pull.collect((err, cidsAndBlocks) => { if (err) { return log.error(err) } - this.wm.cancelWants(keys) + const cids = cidsAndBlocks.map((entry) => entry.cid) + + this.wm.cancelWants(cids) }) ) pull( - pull.values(iblocks), + pull.values(cidsAndBlocks), paramap(this._handleReceivedBlock.bind(this, peerId), 10), pull.onEnd(cb) ) }) } - _handleReceivedBlock (peerId, block, cb) { + _handleReceivedBlock (peerId, cidAndBlock, callback) { series([ - (cb) => this._updateReceiveCounters(block, (err) => { + (cb) => this._updateReceiveCounters(cidAndBlock.block, (err) => { if (err) { // ignore, as these have been handled // in _updateReceiveCounters return cb() } + log('got block from %s', peerId.toB58String(), cidAndBlock.block.data.length) cb() }), - (cb) => block.key((err, key) => { - if (err) { - return cb(err) - } - this.put({data: block.data, key: key}, (err) => { + (cb) => { + this.put(cidAndBlock, (err) => { if (err) { log.error('receiveMessage put error: %s', err.message) } cb() }) - }) - ], cb) + } + ], callback) } - _updateReceiveCounters (block, cb) { - this.blocksRecvd ++ + _updateReceiveCounters (block, callback) { + this.blocksRecvd++ block.key((err, key) => { if (err) { - return cb(err) + return callback(err) } this.blockstore.has(key, (err, has) => { if (err) { log('blockstore.has error: %s', err.message) - return cb(err) + return callback(err) } if (has) { this.dupBlocksRecvd ++ this.dupDataRecvd += block.data.length - return cb(new Error('Already have block')) + return callback(new Error('Already have block')) } - cb() + callback() }) }) } @@ -144,16 +146,16 @@ module.exports = class Bitwap { return this.engine.wantlistForPeer(peerId) } - getStream (keys) { - if (!Array.isArray(keys)) { - return this._getStreamSingle(keys) + getStream (cids) { + if (!Array.isArray(cids)) { + return this._getStreamSingle(cids) } return pull( - pull.values(keys), - paramap((key, cb) => { + pull.values(cids), + paramap((cid, cb) => { pull( - this._getStreamSingle(key), + this._getStreamSingle(cid), pull.collect(cb) ) }), @@ -161,102 +163,107 @@ module.exports = class Bitwap { ) } - _getStreamSingle (key) { + _getStreamSingle (cid) { const unwantListeners = {} const blockListeners = {} - const keyS = key.toString() + const cidStr = cid.buffer.toString() + const unwantEvent = `unwant:${cidStr}` + const blockEvent = `block:${cidStr}` - const unwantEvent = () => `unwant:${keyS}` - const blockEvent = () => `block:${keyS}` const d = defer() const cleanupListener = () => { - if (unwantListeners[keyS]) { - this.notifications.removeListener(unwantEvent(), unwantListeners[keyS]) - delete unwantListeners[keyS] + if (unwantListeners[cidStr]) { + this.notifications.removeListener(unwantEvent, unwantListeners[cidStr]) + delete unwantListeners[cidStr] } - if (blockListeners[keyS]) { - this.notifications.removeListener(blockEvent(), blockListeners[keyS]) - delete blockListeners[keyS] + if (blockListeners[cidStr]) { + this.notifications.removeListener(blockEvent, blockListeners[cidStr]) + delete blockListeners[cidStr] } } const addListener = () => { - unwantListeners[keyS] = () => { - log(`manual unwant: ${keyS}`) + unwantListeners[cidStr] = () => { + log(`manual unwant: ${cidStr}`) cleanupListener() - this.wm.cancelWants([key]) + this.wm.cancelWants([cid]) d.resolve(pull.empty()) } - blockListeners[keyS] = (block) => { - this.wm.cancelWants([key]) - cleanupListener() + blockListeners[cidStr] = (block) => { + this.wm.cancelWants([cid]) + cleanupListener(cid) d.resolve(pull.values([block])) } - this.notifications.once(unwantEvent(), unwantListeners[keyS]) - this.notifications.once(blockEvent(), blockListeners[keyS]) + this.notifications.once(unwantEvent, unwantListeners[cidStr]) + this.notifications.once(blockEvent, blockListeners[cidStr]) } - this.blockstore.has(key, (err, exists) => { + this.blockstore.has(cid.multihash, (err, exists) => { if (err) { return d.resolve(pull.error(err)) } if (exists) { - log('already have block') - return d.resolve(this.blockstore.getStream(key)) + log('already have block: %s', cidStr) + return d.resolve(this.blockstore.getStream(cid.multihash)) } addListener() - this.wm.wantBlocks([key]) + this.wm.wantBlocks([cid]) }) return d } - // removes the given keys from the want list independent of any ref counts - unwant (keys) { - if (!Array.isArray(keys)) { - keys = [keys] + // removes the given cids from the wantlist independent of any ref counts + unwant (cids) { + if (!Array.isArray(cids)) { + cids = [cids] } - this.wm.unwantBlocks(keys) - keys.forEach((key) => { - this.notifications.emit(`unwant:${key.toString()}`) + this.wm.unwantBlocks(cids) + cids.forEach((cid) => { + this.notifications.emit(`unwant:${cid.buffer.toString()}`) }) } // removes the given keys from the want list - cancelWants (keys) { - if (!Array.isArray(keys)) { - keys = [keys] + cancelWants (cids) { + if (!Array.isArray(cids)) { + cids = [cids] } - this.wm.cancelWants(keys) + this.wm.cancelWants(cids) } putStream () { return pull( - pull.asyncMap((blockAndKey, cb) => { - this.blockstore.has(blockAndKey.key, (err, exists) => { + pull.asyncMap((blockAndCid, cb) => { + this.blockstore.has(blockAndCid.cid.multihash, (err, exists) => { if (err) { return cb(err) } - cb(null, [blockAndKey, exists]) + + cb(null, [blockAndCid, exists]) }) }), pull.filter((val) => !val[1]), pull.map((val) => { - const block = val[0] + const block = val[0].block + const cid = val[0].cid log('putting block') return pull( - pull.values([block]), + pull.values([{ + data: block.data, + key: cid.multihash + }]), this.blockstore.putStream(), pull.through(() => { log('put block') - this.notifications.emit(`block:${block.key.toString()}`, block) - this.engine.receivedBlocks([block.key]) + this.notifications.emit(`block:${cid.buffer.toString()}`, block) + this.engine.receivedBlocks([cid]) }) ) }), @@ -265,16 +272,16 @@ module.exports = class Bitwap { } // announces the existance of a block to this service - put (blockAndKey, cb) { + put (blockAndCid, callback) { pull( - pull.values([blockAndKey]), + pull.values([blockAndCid]), this.putStream(), - pull.onEnd(cb) + pull.onEnd(callback) ) } getWantlist () { - return this.wm.wl.entries() + return this.wm.wantlist.entries() } stat () { @@ -295,8 +302,10 @@ module.exports = class Bitwap { // Halt everything stop () { - this.wm.stop() + this.wm.stop(this.libp2p.peerInfo.id) this.network.stop() this.engine.stop() } } + +module.exports = Bitswap diff --git a/src/message/entry.js b/src/message/entry.js deleted file mode 100644 index 85e592c0..00000000 --- a/src/message/entry.js +++ /dev/null @@ -1,38 +0,0 @@ -'use strict' - -const WantlistEntry = require('../wantlist').Entry - -module.exports = class BitswapMessageEntry { - constructor (key, priority, cancel) { - this.entry = new WantlistEntry(key, priority) - this.cancel = Boolean(cancel) - } - - get key () { - return this.entry.key - } - - set key (val) { - this.entry.key = val - } - - get priority () { - return this.entry.priority - } - - set priority (val) { - this.entry.priority = val - } - - get [Symbol.toStringTag] () { - return `BitswapMessageEntry ${this.toB58String()} ` - } - - toB58String () { - return this.entry.toB58String() - } - - equals (other) { - return (this.cancel === other.cancel) && this.entry.equals(other.entry) - } -} diff --git a/src/message/index.js b/src/message/index.js deleted file mode 100644 index 8f0e9db1..00000000 --- a/src/message/index.js +++ /dev/null @@ -1,119 +0,0 @@ -'use strict' - -const protobuf = require('protocol-buffers') -const Block = require('ipfs-block') -const isEqualWith = require('lodash.isequalwith') -const map = require('async/map') - -const pbm = protobuf(require('./message.proto')) -const Entry = require('./entry') - -class BitswapMessage { - constructor (full) { - this.full = full - this.wantlist = new Map() - this.blocks = new Map() - } - - get empty () { - return this.blocks.size === 0 && this.wantlist.size === 0 - } - - addEntry (key, priority, cancel) { - const e = this.wantlist.get(key.toString()) - - if (e) { - e.priority = priority - e.cancel = Boolean(cancel) - } else { - this.wantlist.set(key.toString(), new Entry(key, priority, cancel)) - } - } - - addBlock (block, cb) { - block.key((err, key) => { - if (err) { - return cb(err) - } - this.blocks.set(key.toString(), block) - cb() - }) - } - - addBlockWithKey (block, key) { - this.blocks.set(key.toString(), block) - } - - cancel (key) { - const keyS = key.toString() - if (this.wantlist.has(keyS)) { - this.wantlist.delete(keyS) - } else { - this.wantlist.set(keyS, new Entry(key, 0, true)) - } - } - - toProto () { - const msg = { - wantlist: { - entries: Array.from(this.wantlist.values()).map((e) => { - return { - block: e.key, - priority: Number(e.priority), - cancel: Boolean(e.cancel) - } - }) - }, - blocks: Array.from(this.blocks.values()) - .map((b) => b.data) - } - - if (this.full) { - msg.wantlist.full = true - } - - return pbm.Message.encode(msg) - } - - equals (other) { - const cmp = (a, b) => { - if (a.equals && typeof a.equals === 'function') { - return a.equals(b) - } - } - - if (this.full !== other.full || - !isEqualWith(this.wantlist, other.wantlist, cmp) || - !isEqualWith(this.blocks, other.blocks, cmp) - ) { - return false - } - - return true - } - - get [Symbol.toStringTag] () { - const list = Array.from(this.wantlist.keys()) - const blocks = Array.from(this.blocks.keys()) - return `BitswapMessage ` - } -} - -BitswapMessage.fromProto = (raw, callback) => { - const dec = pbm.Message.decode(raw) - const m = new BitswapMessage(dec.wantlist.full) - - dec.wantlist.entries.forEach((e) => { - m.addEntry(e.block, e.priority, e.cancel) - }) - - map(dec.blocks, (b, cb) => m.addBlock(new Block(b), cb), (err) => { - if (err) { - return callback(err) - } - callback(null, m) - }) -} - -BitswapMessage.Entry = Entry -module.exports = BitswapMessage diff --git a/src/message/message.proto.js b/src/message/message.proto.js deleted file mode 100644 index 6038b42a..00000000 --- a/src/message/message.proto.js +++ /dev/null @@ -1,21 +0,0 @@ -'use strict' - -module.exports = `package bitswap.message.pb; - -message Message { - - message Wantlist { - - message Entry { - optional bytes block = 1; // the block key - optional int32 priority = 2; // the priority (normalized). default to 1 - optional bool cancel = 3; // whether this revokes an entry - } - - repeated Entry entries = 1; // a list of wantlist entries - optional bool full = 2; // whether this is the full wantlist. default to false - } - - optional Wantlist wantlist = 1; - repeated bytes blocks = 2; -}` diff --git a/src/types/message/entry.js b/src/types/message/entry.js new file mode 100644 index 00000000..079ade8a --- /dev/null +++ b/src/types/message/entry.js @@ -0,0 +1,40 @@ +'use strict' + +const WantlistEntry = require('../wantlist').Entry +const CID = require('cids') +const assert = require('assert') + +module.exports = class BitswapMessageEntry { + constructor (cid, priority, cancel) { + assert(CID.isCID(cid), 'needs valid cid') + this.entry = new WantlistEntry(cid, priority) + this.cancel = Boolean(cancel) + } + + get cid () { + return this.entry.cid + } + + set cid (cid) { + this.entry.cid = cid + } + + get priority () { + return this.entry.priority + } + + set priority (val) { + this.entry.priority = val + } + + get [Symbol.toStringTag] () { + const cidStr = this.cid.toBaseEncodedString() + + return `BitswapMessageEntry ${cidStr} ` + } + + equals (other) { + return (this.cancel === other.cancel) && + this.entry.equals(other.entry) + } +} diff --git a/src/types/message/index.js b/src/types/message/index.js new file mode 100644 index 00000000..5e9f14c3 --- /dev/null +++ b/src/types/message/index.js @@ -0,0 +1,210 @@ +'use strict' + +const protobuf = require('protocol-buffers') +const Block = require('ipfs-block') +const isEqualWith = require('lodash.isequalwith') +const assert = require('assert') +const map = require('async/map') +const CID = require('cids') +const codecName = require('multicodec/src/name-table') +const vd = require('varint-decoder') + +const pbm = protobuf(require('./message.proto')) +const Entry = require('./entry') + +class BitswapMessage { + constructor (full) { + this.full = full + this.wantlist = new Map() + this.blocks = new Map() + } + + get empty () { + return this.blocks.size === 0 && + this.wantlist.size === 0 + } + + addEntry (cid, priority, cancel) { + assert(cid && CID.isCID(cid), 'must be a valid cid') + const cidStr = cid.toBaseEncodedString() + + const entry = this.wantlist.get(cidStr) + + if (entry) { + entry.priority = priority + entry.cancel = Boolean(cancel) + } else { + this.wantlist.set(cidStr, new Entry(cid, priority, cancel)) + } + } + + addBlock (cid, block) { + assert(CID.isCID(cid), 'must be a valid cid') + const cidStr = cid.toBaseEncodedString() + this.blocks.set(cidStr, block) + } + + cancel (cid) { + assert(CID.isCID(cid), 'must be a valid cid') + const cidStr = cid.toBaseEncodedString() + this.wantlist.delete(cidStr) + this.addEntry(cid, 0, true) + } + + /* + * Serializes to Bitswap Message protobuf of + * version 1.0.0 + */ + serializeToBitswap100 () { + const msg = { + wantlist: { + entries: Array.from(this.wantlist.values()).map((entry) => { + return { + block: entry.cid.buffer, // cid + priority: Number(entry.priority), + cancel: Boolean(entry.cancel) + } + }) + }, + blocks: Array.from(this.blocks.values()) + .map((block) => block.data) + } + + if (this.full) { + msg.wantlist.full = true + } + + return pbm.Message.encode(msg) + } + + /* + * Serializes to Bitswap Message protobuf of + * version 1.1.0 + */ + serializeToBitswap110 () { + const msg = { + wantlist: { + entries: Array.from(this.wantlist.values()).map((entry) => { + return { + block: entry.cid.buffer, // cid + priority: Number(entry.priority), + cancel: Boolean(entry.cancel) + } + }) + }, + payload: [] + } + + if (this.full) { + msg.wantlist.full = true + } + + this.blocks.forEach((block, cidStr) => { + const cid = new CID(cidStr) + msg.payload.push({ + prefix: cid.prefix, + data: block.data + }) + }) + + return pbm.Message.encode(msg) + } + + equals (other) { + const cmp = (a, b) => { + if (a.equals && typeof a.equals === 'function') { + return a.equals(b) + } + } + + if (this.full !== other.full || + !isEqualWith(this.wantlist, other.wantlist, cmp) || + !isEqualWith(this.blocks, other.blocks, cmp) + ) { + return false + } + + return true + } + + get [Symbol.toStringTag] () { + const list = Array.from(this.wantlist.keys()) + const blocks = Array.from(this.blocks.keys()) + return `BitswapMessage ` + } +} + +BitswapMessage.deserialize = (raw, callback) => { + let decoded + try { + decoded = pbm.Message.decode(raw) + } catch (err) { + return setImmediate(() => callback(err)) + } + + const isFull = (decoded.wantlist && decoded.wantlist.full) || false + const msg = new BitswapMessage(isFull) + + if (decoded.wantlist) { + decoded.wantlist.entries.forEach((entry) => { + // note: entry.block is the CID here + const cid = new CID(entry.block) + msg.addEntry(cid, entry.priority, entry.cancel) + }) + } + + // Bitswap 1.0.0 + // decoded.blocks are just the byte arrays + if (decoded.blocks.length > 0) { + map(decoded.blocks, (b, cb) => { + const block = new Block(b) + block.key((err, key) => { + if (err) { + return cb(err) + } + const cid = new CID(key) + msg.addBlock(cid, block) + cb() + }) + }, (err) => { + if (err) { + return callback(err) + } + callback(null, msg) + }) + return + } + + // Bitswap 1.1.0 + if (decoded.payload.length > 0) { + map(decoded.payload, (p, cb) => { + if (!p.prefix || !p.data) { + cb() + } + const values = vd(p.prefix) + const cidVersion = values[0] + const multicodec = values[1] + const hashAlg = values[2] + // const hashLen = values[3] // We haven't need to use this so far + const block = new Block(p.data) + block.key(hashAlg, (err, multihash) => { + if (err) { + return cb(err) + } + const cid = new CID(cidVersion, codecName[multicodec.toString('16')], multihash) + msg.addBlock(cid, block) + cb() + }) + }, (err) => { + if (err) { + return callback(err) + } + callback(null, msg) + }) + return + } + callback(null, msg) +} + +BitswapMessage.Entry = Entry +module.exports = BitswapMessage diff --git a/src/types/message/message.proto.js b/src/types/message/message.proto.js new file mode 100644 index 00000000..770e6f75 --- /dev/null +++ b/src/types/message/message.proto.js @@ -0,0 +1,28 @@ +'use strict' + +// from: https://github.com/ipfs/go-ipfs/blob/master/exchange/bitswap/message/pb/message.proto + +module.exports = ` + message Message { + message Wantlist { + message Entry { + // changed from string to bytes, it makes a difference in JavaScript + optional bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0) + optional int32 priority = 2; // the priority (normalized). default to 1 + optional bool cancel = 3; // whether this revokes an entry + } + + repeated Entry entries = 1; // a list of wantlist entries + optional bool full = 2; // whether this is the full wantlist. default to false + } + + message Block { + optional bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length) + optional bytes data = 2; + } + + optional Wantlist wantlist = 1; + repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 + repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 + } +` diff --git a/src/types/wantlist/entry.js b/src/types/wantlist/entry.js new file mode 100644 index 00000000..bf774b9b --- /dev/null +++ b/src/types/wantlist/entry.js @@ -0,0 +1,42 @@ +'use strict' + +const assert = require('assert') +const CID = require('cids') + +class WantListEntry { + constructor (cid, priority) { + assert(CID.isCID(cid), 'must be valid CID') + + // Keep track of how many requests we have for this key + this._refCounter = 1 + + this.cid = cid + this.priority = priority || 1 + } + + inc () { + this._refCounter += 1 + } + + dec () { + this._refCounter = Math.max(0, this._refCounter - 1) + } + + hasRefs () { + return this._refCounter > 0 + } + + // So that console.log prints a nice description of this object + get [Symbol.toStringTag] () { + const cidStr = this.cid.toBaseEncodedString() + return `WantlistEntry ` + } + + equals (other) { + return (this._refCounter === other._refCounter) && + this.cid.equals(other.cid) && + this.priority === other.priority + } +} + +module.exports = WantListEntry diff --git a/src/types/wantlist/index.js b/src/types/wantlist/index.js new file mode 100644 index 00000000..c12f8ebe --- /dev/null +++ b/src/types/wantlist/index.js @@ -0,0 +1,65 @@ +'use strict' + +const Entry = require('./entry') + +class Wantlist { + constructor () { + this.set = new Map() + } + + get length () { + return this.set.size + } + + add (cid, priority) { + const cidStr = cid.buffer.toString() + const entry = this.set.get(cidStr) + + if (entry) { + entry.inc() + entry.priority = priority + } else { + this.set.set(cidStr, new Entry(cid, priority)) + } + } + + remove (cid) { + const cidStr = cid.buffer.toString() + const entry = this.set.get(cidStr) + + if (!entry) { + return + } + + entry.dec() + + // only delete when no refs are held + if (entry.hasRefs()) { + return + } + + this.set.delete(cidStr) + } + + removeForce (cidStr) { + if (this.set.has(cidStr)) { + this.set.delete(cidStr) + } + } + + entries () { + return this.set.entries() + } + + sortedEntries () { + return new Map(Array.from(this.set.entries()).sort()) + } + + contains (cid) { + const cidStr = cid.buffer.toString() + return this.set.get(cidStr) + } +} + +Wantlist.Entry = Entry +module.exports = Wantlist diff --git a/src/wantlist/entry.js b/src/wantlist/entry.js deleted file mode 100644 index 1af066b8..00000000 --- a/src/wantlist/entry.js +++ /dev/null @@ -1,55 +0,0 @@ -'use strict' - -const assert = require('assert') -const isUndefined = require('lodash.isundefined') -const mh = require('multihashes') - -module.exports = class WantlistEntry { - constructor (key, priority) { - assert(Buffer.isBuffer(key), 'key must be a buffer') - // Keep track of how many requests we have for this key - this._refCounter = 1 - - this._key = key - this.priority = isUndefined(priority) ? 1 : priority - this._keyB58String = '' - } - - get key () { - return this._key - } - - set key (val) { - throw new Error('immutable key') - } - - inc () { - this._refCounter += 1 - } - - dec () { - this._refCounter = Math.max(0, this._refCounter - 1) - } - - hasRefs () { - return this._refCounter > 0 - } - - toB58String () { - if (!this._keyB58String) { - this._keyB58String = mh.toB58String(this.key) - } - - return this._keyB58String - } - - get [Symbol.toStringTag] () { - return `WantlistEntry ` - } - - equals (other) { - return (this._refCounter === other._refCounter) && - this.key.equals(other.key) && - this.priority === other.priority - } -} diff --git a/src/wantlist/index.js b/src/wantlist/index.js deleted file mode 100644 index a6d4bdc1..00000000 --- a/src/wantlist/index.js +++ /dev/null @@ -1,58 +0,0 @@ -'use strict' - -const Entry = require('./entry') - -class Wantlist { - constructor () { - this.set = new Map() - } - - get length () { - return this.set.size - } - - add (key, priority) { - const e = this.set.get(key.toString()) - - if (e) { - e.inc() - e.priority = priority - } else { - this.set.set(key.toString(), new Entry(key, priority)) - } - } - - remove (key) { - const e = this.set.get(key.toString()) - - if (!e) return - - e.dec() - - // only delete when no refs are held - if (e.hasRefs()) return - - this.set.delete(key.toString()) - } - - removeForce (key) { - if (this.set.has(key.toString())) { - this.set.delete(key.toString()) - } - } - - entries () { - return this.set.entries() - } - - sortedEntries () { - return new Map(Array.from(this.set.entries()).sort()) - } - - contains (key) { - return this.set.get(key.toString()) - } -} - -Wantlist.Entry = Entry -module.exports = Wantlist diff --git a/test/browser.js b/test/browser.js index 6fbd837b..f5b811c0 100644 --- a/test/browser.js +++ b/test/browser.js @@ -65,4 +65,4 @@ const repo = { } require('./index-test')(repo) -require('./decision/engine-test')(repo) +require('./components/decision-engine/index-test')(repo) diff --git a/test/decision/engine-test.js b/test/components/decision-engine/index-test.js similarity index 51% rename from test/decision/engine-test.js rename to test/components/decision-engine/index-test.js index 2d32fc52..e09f43d8 100644 --- a/test/decision/engine-test.js +++ b/test/components/decision-engine/index-test.js @@ -12,29 +12,36 @@ const map = require('async/map') const eachSeries = require('async/eachSeries') const pull = require('pull-stream') const paramap = require('pull-paramap') +const CID = require('cids') -const Message = require('../../src/message') -const Engine = require('../../src/decision/engine') +const Message = require('../../../src/types/message') +const DecisionEngine = require('../../../src/components/decision-engine') -const mockNetwork = require('../utils').mockNetwork +const mockNetwork = require('../../utils').mockNetwork + +function messageToString (m) { + return Array.from(m[1].blocks.values()) + .map((b) => b.data.toString()) +} + +function stringifyMessages (messages) { + return _.flatten(messages.map(messageToString)) +} module.exports = (repo) => { - function newEngine (id, done) { + function newEngine (path, done) { parallel([ - (cb) => repo.create(id, cb), + (cb) => repo.create(path, cb), (cb) => PeerId.create(cb) ], (err, results) => { if (err) { return done(err) } const blockstore = results[0].blockstore - const engine = new Engine(blockstore, mockNetwork()) + const engine = new DecisionEngine(blockstore, mockNetwork()) engine.start() - done(null, { - peer: results[1], - engine - }) + done(null, { peer: results[1], engine }) }) } @@ -61,45 +68,30 @@ module.exports = (repo) => { }), paramap((block, cb) => { const m = new Message(false) - m.addBlock(block, (err) => { + block.key((err, key) => { if (err) { return cb(err) } - block.key((err, key) => { - if (err) { - return cb(err) - } - sender.engine.messageSent(receiver.peer, block, key) - receiver.engine.messageReceived(sender.peer, m, cb) - }) + const cid = new CID(key) + m.addBlock(cid, block) + sender.engine.messageSent(receiver.peer, block, cid) + receiver.engine.messageReceived(sender.peer, m, cb) }) }, 100), pull.onEnd((err) => { expect(err).to.not.exist - expect( - sender.engine.numBytesSentTo(receiver.peer) - ).to.be.above( - 0 - ) - - expect( - sender.engine.numBytesSentTo(receiver.peer) - ).to.be.eql( - receiver.engine.numBytesReceivedFrom(sender.peer) - ) - - expect( - receiver.engine.numBytesSentTo(sender.peer) - ).to.be.eql( - 0 - ) - - expect( - sender.engine.numBytesReceivedFrom(receiver.peer) - ).to.be.eql( - 0 - ) + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.be.above(0) + + expect(sender.engine.numBytesSentTo(receiver.peer)) + .to.eql(receiver.engine.numBytesReceivedFrom(sender.peer)) + + expect(receiver.engine.numBytesSentTo(sender.peer)) + .to.eql(0) + + expect(sender.engine.numBytesReceivedFrom(receiver.peer)) + .to.eql(0) done() }) @@ -115,30 +107,20 @@ module.exports = (repo) => { expect(err).to.not.exist const sanfrancisco = res[0] - const seatlle = res[1] + const seattle = res[1] const m = new Message(true) - sanfrancisco.engine.messageSent(seatlle.peer) - seatlle.engine.messageReceived(sanfrancisco.peer, m, (err) => { + sanfrancisco.engine.messageSent(seattle.peer) + seattle.engine.messageReceived(sanfrancisco.peer, m, (err) => { expect(err).to.not.exist - expect( - seatlle.peer.toHexString() - ).to.not.be.eql( - sanfrancisco.peer.toHexString() - ) - - expect( - sanfrancisco.engine.peers() - ).to.include( - seatlle.peer - ) - - expect( - seatlle.engine.peers() - ).to.include( - sanfrancisco.peer - ) + expect(seattle.peer.toHexString()) + .to.not.eql(sanfrancisco.peer.toHexString()) + + expect(sanfrancisco.engine.peers()).to.include(seattle.peer) + + expect(seattle.engine.peers()) + .to.include(sanfrancisco.peer) done() }) }) @@ -153,6 +135,35 @@ module.exports = (repo) => { [alphabet, _.difference(alphabet, vowels)] ] + function partnerWants (dEngine, values, partner, cb) { + const message = new Message(false) + const blocks = values.map((k) => new Block(k)) + + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + keys.forEach((key, i) => { + const cid = new CID(key) + message.addEntry(cid, Math.pow(2, 32) - 1 - i) + }) + + dEngine.messageReceived(partner, message, cb) + }) + } + + function partnerCancels (dEngine, values, partner, cb) { + const message = new Message(false) + const blocks = values.map((k) => new Block(k)) + + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + keys.forEach((key) => { + const cid = new CID(key) + message.cancel(cid) + }) + dEngine.messageReceived(partner, message, cb) + }) + } + repo.create('p', (err, repo) => { expect(err).to.not.exist @@ -164,58 +175,32 @@ module.exports = (repo) => { if (err) { return cb(err) } - cb(null, {data: block.data, key: key}) + cb(null, { data: block.data, key: key }) }) }), repo.blockstore.putStream(), pull.onEnd((err) => { expect(err).to.not.exist - const partnerWants = (e, keys, p, cb) => { - const add = new Message(false) - const blocks = keys.map((k) => new Block(k)) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - expect(err).to.not.exist - blocks.forEach((b, i) => { - add.addEntry(keys[i], Math.pow(2, 32) - 1 - i) - }) - - e.messageReceived(p, add, cb) - }) - } - - const partnerCancels = (e, keys, p, cb) => { - const cancels = new Message(false) - const blocks = keys.map((k) => new Block(k)) - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - expect(err).to.not.exist - keys.forEach((k) => cancels.cancel(k)) - e.messageReceived(p, cancels, cb) - }) - } - eachSeries(_.range(numRounds), (i, cb) => { + // 2 test cases + // a) want alphabet - cancel vowels + // b) want alphabet - cancels everything except vowels + eachSeries(testCases, (testcase, innerCb) => { const set = testcase[0] const cancels = testcase[1] const keeps = _.difference(set, cancels) - const messageToString = (m) => { - return Array.from(m[1].blocks.values()) - .map((b) => b.data.toString()) - } - const stringifyMessages = (messages) => { - return _.flatten(messages.map(messageToString)) - } - const network = mockNetwork(1, (res) => { const msgs = stringifyMessages(res.messages) - expect(msgs.sort()).to.be.eql(keeps.sort()) + expect(msgs.sort()).to.eql(keeps.sort()) innerCb() }) - const e = new Engine(repo.blockstore, network) - e.start() + const dEngine = new DecisionEngine(repo.blockstore, network) + dEngine.start() + let partner series([ (cb) => PeerId.create((err, id) => { @@ -225,10 +210,10 @@ module.exports = (repo) => { partner = id cb() }), - (cb) => partnerWants(e, set, partner, cb), - (cb) => partnerCancels(e, cancels, partner, cb) + (cb) => partnerWants(dEngine, set, partner, cb), + (cb) => partnerCancels(dEngine, cancels, partner, cb) ], (err) => { - if (err) throw err + expect(err).to.not.exist }) }, cb) }, done) diff --git a/test/decision/ledger.spec.js b/test/components/decision-engine/ledger.spec.js similarity index 58% rename from test/decision/ledger.spec.js rename to test/components/decision-engine/ledger.spec.js index 712ff8c0..11c2a89d 100644 --- a/test/decision/ledger.spec.js +++ b/test/components/decision-engine/ledger.spec.js @@ -4,24 +4,25 @@ const expect = require('chai').expect const PeerId = require('peer-id') -const Ledger = require('../../src/decision/ledger') +const Ledger = require('../../../src/components/decision-engine/ledger') describe('Ledger', () => { - let p + let peerId let ledger before((done) => { - PeerId.create((err, id) => { + PeerId.create((err, _peerId) => { if (err) { return done(err) } - p = id + peerId = _peerId done() }) }) + beforeEach(() => { - ledger = new Ledger(p) + ledger = new Ledger(peerId) }) it('accounts', () => { @@ -30,11 +31,10 @@ describe('Ledger', () => { ledger.receivedBytes(223432) ledger.receivedBytes(2333) - expect( - ledger.accounting - ).to.be.eql({ - bytesSent: 100 + 12000, - bytesRecv: 223432 + 2333 - }) + expect(ledger.accounting) + .to.eql({ + bytesSent: 100 + 12000, + bytesRecv: 223432 + 2333 + }) }) }) diff --git a/test/network/gen-bitswap-network.node.js b/test/components/network/gen-bitswap-network.node.js similarity index 86% rename from test/network/gen-bitswap-network.node.js rename to test/components/network/gen-bitswap-network.node.js index b0e02d42..ad868337 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/components/network/gen-bitswap-network.node.js @@ -12,7 +12,8 @@ const Block = require('ipfs-block') const Buffer = require('safe-buffer').Buffer const pull = require('pull-stream') const crypto = require('crypto') -const utils = require('../utils') +const utils = require('../../utils') +const CID = require('cids') describe('gen Bitswap network', function () { // CI is very slow @@ -40,13 +41,16 @@ describe('gen Bitswap network', function () { (cb) => { pull( pull.values(blocks), - pull.asyncMap((b, cb) => { - b.key((err, key) => { + pull.asyncMap((block, cb) => { + block.key((err, key) => { if (err) { return cb(err) } - cb(null, {data: b.data, key: key}) + cb(null, { + block: block, + cid: new CID(key) + }) }) }), node.bitswap.putStream(), @@ -55,10 +59,11 @@ describe('gen Bitswap network', function () { }, (cb) => { each(_.range(100), (i, cb) => { - map(blocks, (b, cb) => b.key(cb), (err, keys) => { + map(blocks, (block, cb) => block.key(cb), (err, keys) => { + const cids = keys.map((key) => new CID(key)) expect(err).to.not.exist pull( - node.bitswap.getStream(keys), + node.bitswap.getStream(cids), pull.collect((err, res) => { expect(err).to.not.exist expect(res).to.have.length(blocks.length) @@ -114,6 +119,7 @@ function round (nodeArr, n, cb) { if (err) { return cb(err) } + const cids = keys.map((k) => new CID(k)) let d series([ // put blockFactor amount of blocks per node @@ -123,8 +129,8 @@ function round (nodeArr, n, cb) { const data = _.map(_.range(blockFactor), (j) => { const index = i * blockFactor + j return { - data: blocks[index].data, - key: keys[index] + block: blocks[index], + cid: cids[index] } }) each( @@ -140,7 +146,7 @@ function round (nodeArr, n, cb) { // fetch all blocks on every node (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { pull( - node.bitswap.getStream(keys), + node.bitswap.getStream(cids), pull.collect((err, res) => { if (err) { return callback(err) diff --git a/test/components/network/network.node.js b/test/components/network/network.node.js new file mode 100644 index 00000000..b40fca81 --- /dev/null +++ b/test/components/network/network.node.js @@ -0,0 +1,365 @@ +/* eslint-env mocha */ +'use strict' + +const Node = require('libp2p-ipfs-nodejs') +const PeerInfo = require('peer-info') +const multiaddr = require('multiaddr') +const expect = require('chai').expect +const PeerBook = require('peer-book') +const Block = require('ipfs-block') +const lp = require('pull-length-prefixed') +const pull = require('pull-stream') +const parallel = require('async/parallel') +const CID = require('cids') + +const Network = require('../../../src/components/network') +const Message = require('../../../src/types/message') + +describe('network', () => { + let libp2pNodeA + let peerInfoA + let peerBookA + let networkA + + let libp2pNodeB + let peerInfoB + let peerBookB + let networkB + + let libp2pNodeC + let peerInfoC + let peerBookC + let networkC + + let blocks + + before((done) => { + let counter = 0 + parallel([ + (cb) => PeerInfo.create(cb), + (cb) => PeerInfo.create(cb), + (cb) => PeerInfo.create(cb) + ], (err, results) => { + if (err) { + return done(err) + } + + peerInfoA = results[0] + peerInfoB = results[1] + peerInfoC = results[2] + + blocks = ['hello', 'world'].map((b) => new Block(b)) + + const maA = multiaddr('/ip4/127.0.0.1/tcp/10100/ipfs/' + peerInfoA.id.toB58String()) + const maB = multiaddr('/ip4/127.0.0.1/tcp/10300/ipfs/' + peerInfoB.id.toB58String()) + const maC = multiaddr('/ip4/127.0.0.1/tcp/10500/ipfs/' + peerInfoC.id.toB58String()) + + peerInfoA.multiaddr.add(maA) + peerInfoB.multiaddr.add(maB) + peerInfoC.multiaddr.add(maC) + + peerBookA = new PeerBook() + peerBookB = new PeerBook() + peerBookC = new PeerBook() + + peerBookA.put(peerInfoB) + peerBookA.put(peerInfoC) + + peerBookB.put(peerInfoA) + peerBookB.put(peerInfoC) + + peerBookC.put(peerInfoA) + peerBookC.put(peerInfoB) + + libp2pNodeA = new Node(peerInfoA, peerBookA) + libp2pNodeA.start(started) + libp2pNodeB = new Node(peerInfoB, peerBookB) + libp2pNodeB.start(started) + libp2pNodeC = new Node(peerInfoC, peerBookC) + libp2pNodeC.start(started) + + function started () { + if (++counter === 3) { + done() + } + } + }) + }) + + after((done) => { + let counter = 0 + libp2pNodeA.stop(stopped) + libp2pNodeB.stop(stopped) + libp2pNodeC.stop(stopped) + + function stopped () { + if (++counter === 3) { + done() + } + } + }) + + let bitswapMockA = { + _receiveMessage: () => {}, + _receiveError: () => {}, + _onPeerConnected: () => {}, + _onPeerDisconnected: () => {} + } + + let bitswapMockB = { + _receiveMessage: () => {}, + _receiveError: () => {}, + _onPeerConnected: () => {}, + _onPeerDisconnected: () => {} + } + + let bitswapMockC = { + _receiveMessage: () => {}, + _receiveError: () => {}, + _onPeerConnected: () => {}, + _onPeerDisconnected: () => {} + } + + it('instantiate the network obj', (done) => { + networkA = new Network(libp2pNodeA, peerBookA, bitswapMockA) + networkB = new Network(libp2pNodeB, peerBookB, bitswapMockB) + // only bitswap100 + networkC = new Network(libp2pNodeC, peerBookC, bitswapMockC, true) + + expect(networkA).to.exist + expect(networkB).to.exist + expect(networkC).to.exist + + networkA.start() + networkB.start() + networkC.start() + + done() + }) + + it('connectTo fail', (done) => { + networkA.connectTo(peerInfoB.id, (err) => { + expect(err).to.exist + done() + }) + }) + + it('onPeerConnected success', (done) => { + var counter = 0 + + bitswapMockA._onPeerConnected = (peerId) => { + expect(peerId.toB58String()).to.equal(peerInfoB.id.toB58String()) + if (++counter === 2) { + finish() + } + } + + bitswapMockB._onPeerConnected = (peerId) => { + expect(peerId.toB58String()).to.equal(peerInfoA.id.toB58String()) + if (++counter === 2) { + finish() + } + } + + libp2pNodeA.dialByPeerInfo(peerInfoB, (err) => { + expect(err).to.not.exist + }) + + function finish () { + bitswapMockA._onPeerConnected = () => {} + bitswapMockB._onPeerConnected = () => {} + done() + } + }) + + it('connectTo success', (done) => { + networkA.connectTo(peerInfoB.id, (err) => { + expect(err).to.not.exist + done() + }) + }) + + it('._receiveMessage success from Bitswap 1.0.0', (done) => { + const msg = new Message(true) + const b1 = blocks[0] + const b2 = blocks[1] + + b1.key((err, key1) => { + expect(err).to.not.exist + const cid1 = new CID(key1) + msg.addEntry(cid1, 0, false) + msg.addBlock(cid1, b1) + + b2.key((err, key2) => { + expect(err).to.not.exist + const cid2 = new CID(key2) + + msg.addBlock(cid2, b2) + + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockB._receiveMessage = () => {} + bitswapMockB._receiveError = () => {} + done() + } + + bitswapMockB._receiveError = (err) => { + expect(err).to.not.exist + } + + libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.0.0', (err, conn) => { + expect(err).to.not.exist + + pull( + pull.values([ + msg.serializeToBitswap100() + ]), + lp.encode(), + conn + ) + }) + }) + }) + }) + + it('._receiveMessage success from Bitswap 1.1.0', (done) => { + const msg = new Message(true) + const b1 = blocks[0] + const b2 = blocks[1] + + b1.key((err, key1) => { + expect(err).to.not.exist + const cid1 = new CID(key1) + msg.addEntry(cid1, 0, false) + msg.addBlock(cid1, b1) + + b2.key((err, key2) => { + expect(err).to.not.exist + const cid2 = new CID(key2) + + msg.addBlock(cid2, b2) + + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockB._receiveMessage = () => {} + bitswapMockB._receiveError = () => {} + done() + } + + bitswapMockB._receiveError = (err) => { + expect(err).to.not.exist + } + + libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.1.0', (err, conn) => { + expect(err).to.not.exist + + pull( + pull.values([ + msg.serializeToBitswap110() + ]), + lp.encode(), + conn + ) + }) + }) + }) + }) + + it('.sendMessage on Bitswap 1.1.0', (done) => { + const msg = new Message(true) + const b1 = blocks[0] + const b2 = blocks[1] + + b1.key((err, key1) => { + expect(err).to.not.exist + const cid1 = new CID(key1) + msg.addEntry(cid1, 0, false) + msg.addBlock(cid1, b1) + + b2.key((err, key2) => { + expect(err).to.not.exist + const cid2 = new CID(key2) + + msg.addBlock(cid2, b2) + + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockB._receiveMessage = () => {} + bitswapMockB._receiveError = () => {} + done() + } + + bitswapMockB._receiveError = (err) => { + expect(err).to.not.exist + } + + networkA.sendMessage(peerInfoB.id, msg, (err) => { + expect(err).to.not.exist + }) + }) + }) + }) + + it('dial to peer on Bitswap 1.0.0', (done) => { + let counter = 0 + + bitswapMockA._onPeerConnected = (peerId) => { + expect(peerId.toB58String()).to.equal(peerInfoC.id.toB58String()) + if (++counter === 2) { + finish() + } + } + + bitswapMockC._onPeerConnected = (peerId) => { + expect(peerId.toB58String()).to.equal(peerInfoA.id.toB58String()) + if (++counter === 2) { + finish() + } + } + + libp2pNodeA.dialByPeerInfo(peerInfoC, (err) => { + expect(err).to.not.exist + }) + + function finish () { + bitswapMockA._onPeerConnected = () => {} + bitswapMockC._onPeerConnected = () => {} + networkA.connectTo(peerInfoC.id, done) + } + }) + + it('.sendMessage on Bitswap 1.1.0', (done) => { + const msg = new Message(true) + const b1 = blocks[0] + const b2 = blocks[1] + + b1.key((err, key1) => { + expect(err).to.not.exist + const cid1 = new CID(key1) + msg.addEntry(cid1, 0, false) + msg.addBlock(cid1, b1) + + b2.key((err, key2) => { + expect(err).to.not.exist + const cid2 = new CID(key2) + + msg.addBlock(cid2, b2) + + bitswapMockC._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.eql(msgReceived) + bitswapMockC._receiveMessage = () => {} + bitswapMockC._receiveError = () => {} + done() + } + + bitswapMockC._receiveError = (err) => { + expect(err).to.not.exist + } + + networkA.sendMessage(peerInfoC.id, msg, (err) => { + expect(err).to.not.exist + }) + }) + }) + }) +}) diff --git a/test/components/wantmanager/index.spec.js b/test/components/wantmanager/index.spec.js new file mode 100644 index 00000000..cd0b09e0 --- /dev/null +++ b/test/components/wantmanager/index.spec.js @@ -0,0 +1,97 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const PeerId = require('peer-id') +const parallel = require('async/parallel') +const series = require('async/series') +const map = require('async/map') +const Block = require('ipfs-block') +const CID = require('cids') + +const cs = require('../../../src/constants') +const Message = require('../../../src/types/message') +const WantManager = require('../../../src/components/want-manager') + +const mockNetwork = require('../../utils').mockNetwork + +describe('WantManager', () => { + it('sends wantlist to all connected peers', (done) => { + let cids + let blocks + + parallel([ + (cb) => PeerId.create(cb), + (cb) => PeerId.create(cb), + (cb) => { + const data = ['1', '2', '3'] + blocks = data.map((d) => new Block(d)) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + if (err) { + return done(err) + } + cids = keys.map((key) => new CID(key)) + cb() + }) + } + ], (err, peerIds) => { + if (err) { + return done(err) + } + + const peer1 = peerIds[0] + const peer2 = peerIds[1] + const cid1 = cids[0] + const cid2 = cids[1] + const cid3 = cids[2] + + let wantManager + + const network = mockNetwork(6, (calls) => { + expect(calls.connects).to.have.length(6) + const m1 = new Message(true) + + m1.addEntry(cid1, cs.kMaxPriority) + m1.addEntry(cid2, cs.kMaxPriority - 1) + + const m2 = new Message(false) + + m2.cancel(cid2) + + const m3 = new Message(false) + + m3.addEntry(cid3, cs.kMaxPriority) + + const msgs = [m1, m1, m2, m2, m3, m3] + + calls.messages.forEach((m, i) => { + expect(m[0]).to.be.eql(calls.connects[i]) + expect(m[1].equals(msgs[i])).to.be.eql(true) + }) + + wantManager = null + done() + }) + + wantManager = new WantManager(network) + + wantManager.run() + wantManager.wantBlocks([cid1, cid2]) + + wantManager.connected(peer1) + wantManager.connected(peer2) + + series([ + (cb) => setTimeout(cb, 200), + (cb) => { + wantManager.cancelWants([cid2]) + cb() + }, + (cb) => setTimeout(cb, 200) + ], (err) => { + expect(err).to.not.exist + wantManager.wantBlocks([cid3]) + }) + }) + }) +}) diff --git a/test/components/wantmanager/msg-queue.spec.js b/test/components/wantmanager/msg-queue.spec.js new file mode 100644 index 00000000..5c4215cd --- /dev/null +++ b/test/components/wantmanager/msg-queue.spec.js @@ -0,0 +1,110 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const PeerId = require('peer-id') +const map = require('async/map') +const parallel = require('async/parallel') +const Block = require('ipfs-block') +const CID = require('cids') + +const Message = require('../../../src/types/message') +const MsgQueue = require('../../../src/components/want-manager/msg-queue') + +describe('MessageQueue', () => { + let peerId + let blocks + let cids + + before((done) => { + parallel([ + (cb) => { + PeerId.create((err, _peerId) => { + expect(err).to.not.exist + peerId = _peerId + cb() + }) + }, + (cb) => { + const data = ['1', '2', '3', '4', '5', '6'] + blocks = data.map((d) => new Block(d)) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + if (err) { + return done(err) + } + cids = keys.map((key) => new CID(key)) + cb() + }) + } + ], done) + }) + + it('connects and sends messages', (done) => { + const msg = new Message(true) + const cid1 = cids[0] + const cid2 = cids[1] + const cid3 = cids[2] + const cid4 = cids[3] + const cid5 = cids[4] + const cid6 = cids[5] + + msg.addEntry(cid1, 3) + msg.addEntry(cid2, 1) + + const messages = [] + const connects = [] + let i = 0 + + const finish = () => { + i++ + if (i === 2) { + expect(connects).to.be.eql([peerId, peerId]) + + const m1 = new Message(false) + m1.addEntry(cid3, 1) + m1.addEntry(cid4, 2) + m1.cancel(cid5) + m1.cancel(cid6) + + expect( + messages + ).to.be.eql([ + [peerId, msg], + [peerId, m1] + ]) + + done() + } + } + + const network = { + connectTo (p, cb) { + connects.push(p) + cb() + }, + sendMessage (p, msg, cb) { + messages.push([p, msg]) + cb() + finish() + } + } + + const mq = new MsgQueue(peerId, network) + + expect(mq.refcnt).to.equal(1) + + const batch1 = [ + new Message.Entry(cid3, 1, false), + new Message.Entry(cid4, 2, false) + ] + + const batch2 = [ + new Message.Entry(cid5, 1, true), + new Message.Entry(cid6, 2, true) + ] + + mq.addEntries(batch1) + mq.addEntries(batch2) + mq.addMessage(msg) + }) +}) diff --git a/test/index-test.js b/test/index-test.js index 86142a06..fd15cd83 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -14,8 +14,9 @@ const PeerId = require('peer-id') const Block = require('ipfs-block') const PeerBook = require('peer-book') const pull = require('pull-stream') +const CID = require('cids') -const Message = require('../src/message') +const Message = require('../src/types/message') const Bitswap = require('../src') const utils = require('./utils') @@ -35,6 +36,7 @@ module.exports = (repo) => { describe('bitswap', () => { let store let blocks + let cids let ids before((done) => { @@ -51,7 +53,13 @@ module.exports = (repo) => { blocks = results[1] ids = results[2] - done() + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + if (err) { + return done(err) + } + cids = keys.map((key) => new CID(key)) + done() + }) }) }) @@ -61,82 +69,83 @@ module.exports = (repo) => { describe('receive message', () => { it('simple block message', (done) => { - const me = ids[0] const book = new PeerBook() - const bs = new Bitswap(me, libp2pMock, store, book) + const bs = new Bitswap(libp2pMock, store, book) bs.start() const other = ids[1] + const b1 = blocks[0] const b2 = blocks[1] + const cid1 = cids[0] + const cid2 = cids[1] + const msg = new Message(false) - each([b1, b2], (b, cb) => msg.addBlock(b, cb), (err) => { - expect(err).to.not.exist + msg.addBlock(cid1, b1) + msg.addBlock(cid2, b2) - bs._receiveMessage(other, msg, (err) => { - if (err) { - throw err - } + bs._receiveMessage(other, msg, (err) => { + if (err) { + throw err + } - expect(bs.blocksRecvd).to.be.eql(2) - expect(bs.dupBlocksRecvd).to.be.eql(0) + expect(bs.blocksRecvd).to.equal(2) + expect(bs.dupBlocksRecvd).to.equal(0) - pull( - pull.values([b1, b2]), - pull.asyncMap((b, cb) => b.key(cb)), - pull.map((key) => store.getStream(key)), - pull.flatten(), - pull.collect((err, blocks) => { - if (err) return done(err) - - expect(blocks[0].data).to.be.eql(b1.data) - expect(blocks[1].data).to.be.eql(b2.data) - done() - }) - ) - }) + pull( + pull.values([cid1, cid2]), + pull.map((cid) => store.getStream(cid.multihash)), + pull.flatten(), + pull.collect((err, blocks) => { + if (err) { + return done(err) + } + + expect(blocks[0].data).to.eql(b1.data) + expect(blocks[1].data).to.eql(b2.data) + done() + }) + ) }) }) it('simple want message', (done) => { - const me = ids[0] const book = new PeerBook() - const bs = new Bitswap(me, libp2pMock, store, book) + const bs = new Bitswap(libp2pMock, store, book) bs.start() const other = ids[1] - const b1 = blocks[2] - const b2 = blocks[3] + const cid1 = cids[0] + const cid2 = cids[1] + const msg = new Message(false) - parallel([ - (cb) => b1.key(cb), - (cb) => b2.key(cb) - ], (err, keys) => { - expect(err).to.not.exist - msg.addEntry(keys[0], 1, false) - msg.addEntry(keys[1], 1, false) + msg.addEntry(cid1, 1, false) + msg.addEntry(cid2, 1, false) - bs._receiveMessage(other, msg, (err) => { - expect(err).to.not.exist + bs._receiveMessage(other, msg, (err) => { + expect(err).to.not.exist - expect(bs.blocksRecvd).to.be.eql(0) - expect(bs.dupBlocksRecvd).to.be.eql(0) + expect(bs.blocksRecvd).to.be.eql(0) + expect(bs.dupBlocksRecvd).to.be.eql(0) - const wl = bs.wantlistForPeer(other) + const wl = bs.wantlistForPeer(other) - expect(wl.has(keys[0].toString())).to.be.eql(true) - expect(wl.has(keys[1].toString())).to.be.eql(true) + expect(wl.has(cid1.buffer.toString())).to.eql(true) + expect(wl.has(cid2.buffer.toString())).to.eql(true) - done() - }) + done() }) }) it('multi peer', (done) => { - const me = ids[0] const book = new PeerBook() - const bs = new Bitswap(me, libp2pMock, store, book) + const bs = new Bitswap(libp2pMock, store, book) + + let others + let blocks + let cids + bs.start() parallel([ @@ -147,21 +156,32 @@ module.exports = (repo) => { return done(err) } - const others = results[0] - const blocks = results[1] + others = results[0] + blocks = results[1] + + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + if (err) { + return done(err) + } + cids = keys.map((key) => new CID(key)) + test() + }) + }) + function test () { map(_.range(5), (i, cb) => { - const m = new Message(false) - each( - [blocks[i], blocks[5 + i]], - (b, cb) => m.addBlock(b, cb), - (err) => { - if (err) { - return cb(err) - } - cb(null, m) - } - ) + const msg = new Message(false) + + each([ + { block: blocks[i], cid: cids[i] }, + { block: blocks[5 + i], cid: cids[5 + i] } + ], (blockAndCid, cb) => { + msg.addBlock(blockAndCid.cid, blockAndCid.block) + cb() + }, (err) => { + expect(err).to.not.exist + cb(null, msg) + }) }, (err, messages) => { expect(err).to.not.exist let i = 0 @@ -169,22 +189,24 @@ module.exports = (repo) => { const msg = messages[i] i++ bs._receiveMessage(other, msg, (err) => { - if (err) return cb(err) + expect(err).to.not.exist hasBlocks(msg, store, cb) }) }, done) }) - }) + } }) }) describe('getStream', () => { it('block exists locally', (done) => { - const me = ids[0] const block = blocks[4] + const cid = cids[4] + pull( - pull.values([block]), - pull.asyncMap(blockToStore), + pull.values([ + { data: block.data, key: cid.multihash } + ]), store.putStream(), pull.onEnd((err) => { if (err) { @@ -192,19 +214,16 @@ module.exports = (repo) => { } const book = new PeerBook() - const bs = new Bitswap(me, libp2pMock, store, book) + const bs = new Bitswap(libp2pMock, store, book) pull( - pull.values([block]), - pull.asyncMap((b, cb) => b.key(cb)), - pull.map((key) => bs.getStream(key)), - pull.flatten(), + bs.getStream(cid), pull.collect((err, res) => { if (err) { return done(err) } - expect(res[0].data).to.be.eql(block.data) + expect(res[0].data).to.eql(block.data) done() }) ) @@ -213,85 +232,47 @@ module.exports = (repo) => { }) it('blocks exist locally', (done) => { - const me = ids[0] const b1 = blocks[5] const b2 = blocks[6] const b3 = blocks[7] + const cid1 = cids[5] + const cid2 = cids[6] + const cid3 = cids[7] pull( - pull.values([b1, b2, b3]), - pull.asyncMap(blockToStore), + pull.values([ + { data: b1.data, key: cid1.multihash }, + { data: b2.data, key: cid2.multihash }, + { data: b3.data, key: cid3.multihash } + ]), store.putStream(), pull.onEnd((err) => { expect(err).to.not.exist const book = new PeerBook() - const bs = new Bitswap(me, libp2pMock, store, book) + const bs = new Bitswap(libp2pMock, store, book) pull( - pull.values([b1, b2, b3]), - pull.asyncMap((b, cb) => b.key(cb)), - pull.collect((err, keys) => { + bs.getStream([cid1, cid2, cid3]), + pull.collect((err, res) => { expect(err).to.not.exist - pull( - bs.getStream(keys), - pull.collect((err, res) => { - expect(err).to.not.exist - - expect(res[0].data).to.be.eql(b1.data) - expect(res[1].data).to.be.eql(b2.data) - expect(res[2].data).to.be.eql(b3.data) - done() - }) - ) + + expect(res[0].data).to.eql(b1.data) + expect(res[1].data).to.eql(b2.data) + expect(res[2].data).to.eql(b3.data) + done() }) ) }) ) }) - // Not sure if I understand what is going on here - // test fails because now the network is not properly mocked - // what are these net.stores and mockNet.bitswaps? - it.skip('block is retrived from peer', (done) => { - const block = blocks[8] - - let mockNet - waterfall([ - (cb) => utils.createMockNet(repo, 2, cb), - (net, cb) => { - mockNet = net - net.stores[1].put(block, cb) - }, - (val, cb) => { - mockNet.bitswaps[0]._onPeerConnected(mockNet.ids[1]) - mockNet.bitswaps[1]._onPeerConnected(mockNet.ids[0]) - pull( - pull.values([block]), - pull.asyncMap((b, cb) => b.key(cb)), - pull.map((key) => mockNet.bitswaps[0].getStream(key)), - pull.flatten(), - pull.collect((err, res) => { - if (err) { - return cb(err) - } - cb(null, res[0]) - }) - ) - }, - (res, cb) => { - expect(res).to.be.eql(block) - cb() - } - ], done) - }) - it('block is added locally afterwards', (done) => { - const me = ids[0] const block = blocks[9] const book = new PeerBook() - const bs = new Bitswap(me, libp2pMock, store, book) + const bs = new Bitswap(libp2pMock, store, book) const net = utils.mockNetwork() + bs.network = net bs.wm.network = net bs.engine.network = net @@ -299,8 +280,9 @@ module.exports = (repo) => { block.key((err, key) => { expect(err).to.not.exist + const cid = new CID(key) pull( - bs.getStream(key), + bs.getStream(cid), pull.collect((err, res) => { expect(err).to.not.exist expect(res[0].data).to.be.eql(block.data) @@ -310,8 +292,8 @@ module.exports = (repo) => { setTimeout(() => { bs.put({ - data: block.data, - key: key + block: block, + cid: cid }, () => {}) }, 200) }) @@ -360,7 +342,7 @@ module.exports = (repo) => { start () {}, stop () {} } - bs1 = new Bitswap(me, libp2pMock, store, new PeerBook()) + bs1 = new Bitswap(libp2pMock, store, new PeerBook()) utils.applyNetwork(bs1, n1) bs1.start() @@ -370,7 +352,7 @@ module.exports = (repo) => { (cb) => repo.create('world', cb), (repo, cb) => { store2 = repo.blockstore - bs2 = new Bitswap(other, libp2pMock, store2, new PeerBook()) + bs2 = new Bitswap(libp2pMock, store2, new PeerBook()) utils.applyNetwork(bs2, n2) bs2.start() bs1._onPeerConnected(other) @@ -378,8 +360,9 @@ module.exports = (repo) => { block.key((err, key) => { expect(err).to.not.exist + const cid = new CID(key) pull( - bs1.getStream(key), + bs1.getStream(cid), pull.collect((err, res) => { expect(err).to.not.exist cb(null, res[0]) @@ -388,14 +371,15 @@ module.exports = (repo) => { setTimeout(() => { bs2.put({ - data: block.data, - key: key + block: block, + cid: cid }) }, 1000) }) }, (res, cb) => { - expect(res).to.be.eql(res) + // TODO: Ask Fridel if this is what he really meant + expect(res).to.eql(res) cb() } ], done) @@ -404,8 +388,7 @@ module.exports = (repo) => { describe('stat', () => { it('has initial stats', () => { - const me = ids[0] - const bs = new Bitswap(me, libp2pMock, {}, new PeerBook()) + const bs = new Bitswap(libp2pMock, {}, new PeerBook()) const stats = bs.stat() expect(stats).to.have.property('wantlist') @@ -418,8 +401,7 @@ module.exports = (repo) => { describe('unwant', () => { it('removes blocks that are wanted multiple times', (done) => { - const me = ids[0] - const bs = new Bitswap(me, libp2pMock, store, new PeerBook()) + const bs = new Bitswap(libp2pMock, store, new PeerBook()) bs.start() const b = blocks[11] @@ -433,7 +415,7 @@ module.exports = (repo) => { b.key((err, key) => { expect(err).to.not.exist pull( - bs.getStream(key), + bs.getStream(new CID(key)), pull.collect((err, res) => { expect(err).to.not.exist expect(res).to.be.empty @@ -441,7 +423,7 @@ module.exports = (repo) => { }) ) pull( - bs.getStream(key), + bs.getStream(new CID(key)), pull.collect((err, res) => { expect(err).to.not.exist expect(res).to.be.empty @@ -449,7 +431,7 @@ module.exports = (repo) => { }) ) - setTimeout(() => bs.unwant(key), 10) + setTimeout(() => bs.unwant(new CID(key)), 10) }) }) }) @@ -470,13 +452,3 @@ function hasBlocks (msg, store, cb) { }) }, cb) } - -function blockToStore (b, cb) { - b.key((err, key) => { - if (err) { - return cb(err) - } - - cb(null, {data: b.data, key: key}) - }) -} diff --git a/test/message.spec.js b/test/message.spec.js deleted file mode 100644 index 1dfa2208..00000000 --- a/test/message.spec.js +++ /dev/null @@ -1,222 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect -const Block = require('ipfs-block') -const protobuf = require('protocol-buffers') -const mh = require('multihashes') -const series = require('async/series') -const map = require('async/map') -const pbm = protobuf(require('../src/message/message.proto')) - -const BitswapMessage = require('../src/message') - -describe('BitswapMessage', () => { - let blocks - let keys - - before((done) => { - const data = [ - 'foo', - 'hello', - 'world' - ] - blocks = data.map((d) => new Block(d)) - map(blocks, (b, cb) => b.key(cb), (err, res) => { - if (err) { - return done(err) - } - keys = res - done() - }) - }) - - it('go interop', (done) => { - const goEncoded = new Buffer('CioKKAoiEiAs8k26X7CjDiboOyrFueKeGxYeXB+nQl5zBDNik4uYJBAKGAA=', 'base64') - - const m = new BitswapMessage(false) - m.addEntry(mh.fromB58String('QmRN6wdp1S2A5EtjW9A3M1vKSBuQQGcgvuhoMUoEz4iiT5'), 10) - - BitswapMessage.fromProto(goEncoded, (err, res) => { - expect(err).to.not.exist - expect(res).to.be.eql(m) - - expect( - m.toProto() - ).to.be.eql( - goEncoded - ) - done() - }) - }) - - it('append wanted', () => { - const key = keys[1] - const m = new BitswapMessage(true) - m.addEntry(key, 1) - - expect( - pbm.Message.decode(m.toProto()).wantlist.entries[0] - ).to.be.eql({ - block: key, - priority: 1, - cancel: false - }) - }) - - it('encodes blocks', (done) => { - const block = blocks[1] - const m = new BitswapMessage(true) - m.addBlock(block, (err) => { - expect(err).to.not.exist - expect( - pbm.Message.decode(m.toProto()).blocks - ).to.be.eql([ - block.data - ]) - done() - }) - }) - - it('new message fromProto', (done) => { - const raw = pbm.Message.encode({ - wantlist: { - entries: [{ - block: new Buffer('hello'), - cancel: false - }], - full: true - }, - blocks: ['hello', 'world'] - }) - - BitswapMessage.fromProto(raw, (err, protoMessage) => { - expect(err).to.not.exist - expect( - protoMessage.full - ).to.be.eql( - true - ) - expect( - Array.from(protoMessage.wantlist) - ).to.be.eql([ - [(new Buffer('hello')).toString(), new BitswapMessage.Entry(new Buffer('hello'), 0, false)] - ]) - - const b1 = blocks[1] - const b2 = blocks[2] - const k1 = keys[1] - const k2 = keys[2] - - expect( - Array.from(protoMessage.blocks).map((b) => [b[0], b[1].data]) - ).to.be.eql([ - [k1.toString(), b1.data], - [k2.toString(), b2.data] - ]) - - done() - }) - }) - - it('duplicates', (done) => { - const b = blocks[0] - const key = keys[0] - const m = new BitswapMessage(true) - - m.addEntry(key, 1) - m.addEntry(key, 1) - - expect(m.wantlist.size).to.be.eql(1) - series([ - (cb) => m.addBlock(b, cb), - (cb) => m.addBlock(b, cb) - ], (err) => { - expect(err).to.not.exist - expect(m.blocks.size).to.be.eql(1) - done() - }) - }) - - it('empty', () => { - const m = new BitswapMessage(true) - - expect( - m.empty - ).to.be.eql( - true - ) - }) - - it('non full message', () => { - const m = new BitswapMessage(false) - - expect( - pbm.Message.decode(m.toProto()).wantlist.full - ).to.be.eql( - false - ) - }) - - describe('.equals', () => { - it('true, same message', (done) => { - const b = blocks[0] - const key = keys[0] - const m1 = new BitswapMessage(true) - const m2 = new BitswapMessage(true) - - m1.addEntry(key, 1) - m2.addEntry(key, 1) - - series([ - (cb) => m1.addBlock(b, cb), - (cb) => m2.addBlock(b, cb) - ], (err) => { - expect(err).to.not.exist - expect(m1.equals(m2)).to.be.eql(true) - done() - }) - }) - - it('false, different entries', (done) => { - const b = blocks[0] - const key = keys[0] - const m1 = new BitswapMessage(true) - const m2 = new BitswapMessage(true) - - m1.addEntry(key, 1) - m2.addEntry(key, 2) - - series([ - (cb) => m1.addBlock(b, cb), - (cb) => m2.addBlock(b, cb) - ], (err) => { - expect(err).to.not.exist - expect(m1.equals(m2)).to.be.eql(false) - done() - }) - }) - }) - - describe('Entry', () => { - it('exposes the wantlist entry properties', () => { - const entry = new BitswapMessage.Entry(new Buffer('hello'), 5, false) - - expect(entry).to.have.property('key') - expect(entry).to.have.property('priority', 5) - - expect(entry).to.have.property('cancel', false) - }) - - it('allows setting properties on the wantlist entry', () => { - const entry = new BitswapMessage.Entry(new Buffer('hello'), 5, false) - - expect(entry.entry).to.have.property('key') - expect(entry.entry).to.have.property('priority', 5) - - entry.priority = 2 - - expect(entry.entry).to.have.property('priority', 2) - }) - }) -}) diff --git a/test/network/network.node.js b/test/network/network.node.js deleted file mode 100644 index 0c5fc707..00000000 --- a/test/network/network.node.js +++ /dev/null @@ -1,203 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const Node = require('libp2p-ipfs-nodejs') -const PeerInfo = require('peer-info') -const multiaddr = require('multiaddr') -const expect = require('chai').expect -const PeerBook = require('peer-book') -const Block = require('ipfs-block') -const lp = require('pull-length-prefixed') -const pull = require('pull-stream') -const parallel = require('async/parallel') -const series = require('async/series') - -const Network = require('../../src/network') -const Message = require('../../src/message') - -describe('network', () => { - let libp2pNodeA - let libp2pNodeB - let peerInfoA - let peerInfoB - let peerBookA - let peerBookB - let networkA - let networkB - let blocks - - before((done) => { - let counter = 0 - parallel([ - (cb) => PeerInfo.create(cb), - (cb) => PeerInfo.create(cb) - ], (err, results) => { - if (err) { - return done(err) - } - - peerInfoA = results[0] - peerInfoB = results[1] - blocks = ['hello', 'world'].map((b) => new Block(b)) - - peerInfoA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/10100/ipfs/' + peerInfoA.id.toB58String())) - peerInfoB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/10500/ipfs/' + peerInfoB.id.toB58String())) - - peerBookA = new PeerBook() - peerBookB = new PeerBook() - - peerBookA.put(peerInfoB) - peerBookB.put(peerInfoA) - - libp2pNodeA = new Node(peerInfoA, peerBookA) - libp2pNodeA.start(started) - libp2pNodeB = new Node(peerInfoB, peerBookB) - libp2pNodeB.start(started) - - function started () { - if (++counter === 2) { - done() - } - } - }) - }) - - after((done) => { - let counter = 0 - libp2pNodeA.stop(stopped) - libp2pNodeB.stop(stopped) - - function stopped () { - if (++counter === 2) { - done() - } - } - }) - - let bitswapMockA = { - _receiveMessage: () => {}, - _receiveError: () => {}, - _onPeerConnected: () => {}, - _onPeerDisconnected: () => {} - } - - let bitswapMockB = { - _receiveMessage: () => {}, - _receiveError: () => {}, - _onPeerConnected: () => {}, - _onPeerDisconnected: () => {} - } - - it('instantiate the network obj', (done) => { - networkA = new Network(libp2pNodeA, peerBookA, bitswapMockA) - networkB = new Network(libp2pNodeB, peerBookB, bitswapMockB) - expect(networkA).to.exist - expect(networkB).to.exist - - networkA.start() - networkB.start() - done() - }) - - it('connectTo fail', (done) => { - networkA.connectTo(peerInfoB.id, (err) => { - expect(err).to.exist - done() - }) - }) - - it('onPeerConnected success', (done) => { - var counter = 0 - - bitswapMockA._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(peerInfoB.id.toB58String()) - if (++counter === 2) { - finish() - } - } - - bitswapMockB._onPeerConnected = (peerId) => { - expect(peerId.toB58String()).to.equal(peerInfoA.id.toB58String()) - if (++counter === 2) { - finish() - } - } - - libp2pNodeA.dialByPeerInfo(peerInfoB, (err) => { - expect(err).to.not.exist - }) - - function finish () { - bitswapMockA._onPeerConnected = () => {} - bitswapMockB._onPeerConnected = () => {} - done() - } - }) - - it('connectTo success', (done) => { - networkA.connectTo(peerInfoB.id, (err) => { - expect(err).to.not.exist - done() - }) - }) - - it('_receiveMessage success', (done) => { - const msg = new Message(true) - const b = blocks[0] - - b.key((err, key) => { - expect(err).to.not.exist - msg.addEntry(key, 0, false) - - series([ - (cb) => msg.addBlock(b, cb), - (cb) => msg.addBlock(blocks[1], cb) - ], (err) => { - expect(err).to.not.exist - bitswapMockB._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.deep.equal(msgReceived) - bitswapMockB._receiveMessage = () => {} - bitswapMockB._receiveError = () => {} - done() - } - - bitswapMockB._receiveError = (err) => { - expect(err).to.not.exist - } - - libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.0.0', (err, conn) => { - expect(err).to.not.exist - - pull( - pull.values([msg.toProto()]), - lp.encode(), - conn - ) - }) - }) - }) - }) - - it('sendMessage', (done) => { - const msg = new Message(true) - blocks[0].key((err, key) => { - expect(err).to.not.exist - msg.addEntry(key, 0, false) - series([ - (cb) => msg.addBlock(blocks[0], cb), - (cb) => msg.addBlock(blocks[1], cb) - ], (err) => { - expect(err).to.not.exist - bitswapMockB._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.deep.equal(msgReceived) - bitswapMockB._receiveMessage = () => {} - done() - } - - networkA.sendMessage(peerInfoB.id, msg, (err) => { - expect(err).to.not.exist - }) - }) - }) - }) -}) diff --git a/test/node.js b/test/node.js index 1147b46a..91ec1c08 100644 --- a/test/node.js +++ b/test/node.js @@ -38,6 +38,6 @@ const repo = { } require('./index-test')(repo) -require('./decision/engine-test')(repo) -require('./network/network.node.js') -require('./network/gen-bitswap-network.node.js') +require('./components/decision-engine/index-test')(repo) +require('./components/network/network.node.js') +require('./components/network/gen-bitswap-network.node.js') diff --git a/test/test-data/serialized-from-go/bitswap110-message-full-wantlist b/test/test-data/serialized-from-go/bitswap110-message-full-wantlist new file mode 100644 index 00000000..daa992c8 Binary files /dev/null and b/test/test-data/serialized-from-go/bitswap110-message-full-wantlist differ diff --git a/test/test-data/serialized-from-go/bitswap110-message-one-block b/test/test-data/serialized-from-go/bitswap110-message-one-block new file mode 100644 index 00000000..4a436137 Binary files /dev/null and b/test/test-data/serialized-from-go/bitswap110-message-one-block differ diff --git a/test/types/message.spec.js b/test/types/message.spec.js new file mode 100644 index 00000000..27578f9c --- /dev/null +++ b/test/types/message.spec.js @@ -0,0 +1,281 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const expect = require('chai').expect +const Block = require('ipfs-block') +const protobuf = require('protocol-buffers') +const map = require('async/map') +const pbm = protobuf(require('../../src/types/message/message.proto')) +const CID = require('cids') + +const loadFixture = require('aegir/fixtures') +const testDataPath = '../test-data/serialized-from-go' +const rawMessageFullWantlist = loadFixture(__dirname, testDataPath + '/bitswap110-message-full-wantlist') +const rawMessageOneBlock = loadFixture(__dirname, testDataPath + '/bitswap110-message-one-block') + +const BitswapMessage = require('../../src/types/message') + +describe('BitswapMessage', () => { + let blocks + let cids + + before((done) => { + const data = ['foo', 'hello', 'world'] + blocks = data.map((d) => new Block(d)) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + if (err) { + return done(err) + } + cids = keys.map((key) => new CID(key)) + done() + }) + }) + + it('.addEntry - want block', () => { + const cid = cids[1] + const msg = new BitswapMessage(true) + msg.addEntry(cid, 1) + const serialized = msg.serializeToBitswap100() + + expect(pbm.Message.decode(serialized).wantlist.entries[0]).to.be.eql({ + block: cid.buffer, + priority: 1, + cancel: false + }) + }) + + it('.serializeToBitswap100', () => { + const block = blocks[1] + const cid = cids[1] + const msg = new BitswapMessage(true) + msg.addBlock(cid, block) + const serialized = msg.serializeToBitswap100() + expect(pbm.Message.decode(serialized).blocks).to.eql([block.data]) + }) + + it('.serializeToBitswap110', () => { + const block = blocks[1] + const cid = cids[1] + const msg = new BitswapMessage(true) + msg.addBlock(cid, block) + + const serialized = msg.serializeToBitswap110() + const decoded = pbm.Message.decode(serialized) + + expect(decoded.payload[0].data).to.eql(block.data) + }) + + it('.deserialize a Bitswap100 Message', (done) => { + const cid0 = cids[0] + const cid1 = cids[1] + const cid2 = cids[2] + + const b1 = blocks[1] + const b2 = blocks[2] + + const raw = pbm.Message.encode({ + wantlist: { + entries: [{ + block: cid0.buffer, + cancel: false + }], + full: true + }, + blocks: [ + b1.data, + b2.data + ] + }) + + BitswapMessage.deserialize(raw, (err, msg) => { + expect(err).to.not.exist + expect(msg.full).to.equal(true) + expect(Array.from(msg.wantlist)) + .to.eql([[ + cid0.toBaseEncodedString(), + new BitswapMessage.Entry(cid0, 0, false) + ]]) + + expect(Array.from(msg.blocks).map((b) => [b[0], b[1].data])) + .to.eql([ + [cid1.toBaseEncodedString(), b1.data], + [cid2.toBaseEncodedString(), b2.data] + ]) + + done() + }) + }) + + it('.deserialize a Bitswap110 Message', (done) => { + const cid0 = cids[0] + const cid1 = cids[1] + const cid2 = cids[2] + + const b1 = blocks[1] + const b2 = blocks[2] + + const raw = pbm.Message.encode({ + wantlist: { + entries: [{ + block: cid0.buffer, + cancel: false + }], + full: true + }, + payload: [{ + data: b1.data, + prefix: cid1.prefix + }, { + data: b2.data, + prefix: cid2.prefix + }] + }) + + BitswapMessage.deserialize(raw, (err, msg) => { + expect(err).to.not.exist + expect(msg.full).to.equal(true) + expect(Array.from(msg.wantlist)) + .to.eql([[ + cid0.toBaseEncodedString(), + new BitswapMessage.Entry(cid0, 0, false) + ]]) + + expect(Array.from(msg.blocks).map((b) => [b[0], b[1].data])) + .to.eql([ + [cid1.toBaseEncodedString(), b1.data], + [cid2.toBaseEncodedString(), b2.data] + ]) + + done() + }) + }) + + it('duplicates', (done) => { + const b = blocks[0] + const cid = cids[0] + const m = new BitswapMessage(true) + + m.addEntry(cid, 1) + m.addEntry(cid, 1) + + expect(m.wantlist.size).to.be.eql(1) + m.addBlock(cid, b) + m.addBlock(cid, b) + expect(m.blocks.size).to.be.eql(1) + done() + }) + + it('.empty', () => { + const m = new BitswapMessage(true) + expect(m.empty).to.equal(true) + }) + + it('non full wantlist message', () => { + const msg = new BitswapMessage(false) + const serialized = msg.serializeToBitswap100() + + expect(pbm.Message.decode(serialized).wantlist.full).to.equal(false) + }) + + describe('.equals', () => { + it('true, same message', (done) => { + const b = blocks[0] + const cid = cids[0] + const m1 = new BitswapMessage(true) + const m2 = new BitswapMessage(true) + + m1.addEntry(cid, 1) + m2.addEntry(cid, 1) + + m1.addBlock(cid, b) + m2.addBlock(cid, b) + expect(m1.equals(m2)).to.equal(true) + done() + }) + + it('false, different entries', (done) => { + const b = blocks[0] + const cid = cids[0] + const m1 = new BitswapMessage(true) + const m2 = new BitswapMessage(true) + + m1.addEntry(cid, 100) + m2.addEntry(cid, 3750) + + m1.addBlock(cid, b) + m2.addBlock(cid, b) + expect(m1.equals(m2)).to.equal(false) + done() + }) + }) + + describe('BitswapMessageEntry', () => { + it('exposes the wantlist entry properties', () => { + const cid = cids[0] + const entry = new BitswapMessage.Entry(cid, 5, false) + + expect(entry).to.have.property('cid') + expect(entry).to.have.property('priority', 5) + + expect(entry).to.have.property('cancel', false) + }) + + it('allows setting properties on the wantlist entry', () => { + const cid1 = cids[0] + const cid2 = cids[1] + + const entry = new BitswapMessage.Entry(cid1, 5, false) + + expect(entry.entry).to.have.property('cid') + expect(entry.entry).to.have.property('priority', 5) + + entry.cid = cid2 + entry.priority = 2 + + expect(entry.entry).to.have.property('cid') + expect(entry.entry.cid.equals(cid2)) + expect(entry.entry).to.have.property('priority', 2) + }) + }) + + describe('go interop', () => { + it('bitswap 1.0.0 message', (done) => { + const goEncoded = new Buffer('CioKKAoiEiAs8k26X7CjDiboOyrFueKeGxYeXB+nQl5zBDNik4uYJBAKGAA=', 'base64') + + const msg = new BitswapMessage(false) + const cid = new CID('QmRN6wdp1S2A5EtjW9A3M1vKSBuQQGcgvuhoMUoEz4iiT5') + msg.addEntry(cid, 10) + + BitswapMessage.deserialize(goEncoded, (err, res) => { + expect(err).to.not.exist + expect(res).to.eql(msg) + expect(msg.serializeToBitswap100()).to.eql(goEncoded) + done() + }) + }) + + describe.skip('bitswap 1.1.0 message', () => { + // TODO check with whyrusleeping the quality of the raw protobufs + // deserialization is just failing on the first and the second has a + // payload but empty + it('full wantlist message', (done) => { + BitswapMessage.deserialize(rawMessageFullWantlist, (err, message) => { + expect(err).to.not.exist + // TODO + // check the deserialised message + done() + }) + }) + + it('one block message', (done) => { + BitswapMessage.deserialize(rawMessageOneBlock, (err, message) => { + expect(err).to.not.exist + // TODO + // check the deserialised message + done() + }) + }) + }) + }) +}) diff --git a/test/wantlist.spec.js b/test/types/wantlist.spec.js similarity index 51% rename from test/wantlist.spec.js rename to test/types/wantlist.spec.js index 473fb7c4..3534f23a 100644 --- a/test/wantlist.spec.js +++ b/test/types/wantlist.spec.js @@ -4,8 +4,9 @@ const expect = require('chai').expect const Block = require('ipfs-block') const map = require('async/map') +const CID = require('cids') -const Wantlist = require('../src/wantlist') +const Wantlist = require('../../src/types/wantlist') describe('Wantlist', () => { let wm @@ -25,11 +26,15 @@ describe('Wantlist', () => { const b1 = blocks[0] const b2 = blocks[1] - map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + map([ + b1, + b2 + ], + (b, cb) => b.key(cb), + (err, keys) => { expect(err).to.not.exist - wm.add(keys[0], 2) - wm.add(keys[1], 1) - + wm.add(new CID(keys[0]), 2) + wm.add(new CID(keys[1]), 1) expect(wm).to.have.length(2) done() }) @@ -41,9 +46,8 @@ describe('Wantlist', () => { b.key((err, key) => { expect(err).to.not.exist - wm.add(key, 1) - wm.remove(key) - + wm.add(new CID(key), 1) + wm.remove(new CID(key)) expect(wm).to.have.length(0) done() }) @@ -53,24 +57,31 @@ describe('Wantlist', () => { const b1 = blocks[0] const b2 = blocks[1] - map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + map([ + b1, + b2 + ], + (b, cb) => b.key(cb), + (err, keys) => { expect(err).to.not.exist - wm.add(keys[0], 1) - wm.add(keys[1], 2) + const cid1 = new CID(keys[0]) + const cid2 = new CID(keys[1]) + + wm.add(cid1, 1) + wm.add(cid2, 2) expect(wm).to.have.length(2) - wm.remove(keys[1]) + wm.remove(cid2) expect(wm).to.have.length(1) - wm.add(keys[0], 2) - wm.remove(keys[0]) + wm.add(cid1, 2) + wm.remove(cid1) expect(wm).to.have.length(1) - wm.remove(keys[0]) - + wm.remove(cid1) expect(wm).to.have.length(0) done() }) @@ -81,9 +92,10 @@ describe('Wantlist', () => { b.key((err, key) => { expect(err).to.not.exist - wm.add(key, 1) - wm.remove(key) - wm.remove(key) + const cid = new CID(key) + wm.add(cid, 1) + wm.remove(cid) + wm.remove(cid) expect(wm).to.have.length(0) done() @@ -95,13 +107,15 @@ describe('Wantlist', () => { const b = blocks[0] b.key((err, key) => { expect(err).to.not.exist - wm.add(key, 2) + const cid = new CID(key) + wm.add(cid, 2) expect( Array.from(wm.entries()) - ).to.be.eql([ - [key.toString(), new Wantlist.Entry(key, 2)] - ]) + ).to.be.eql([[ + cid.buffer.toString(), + new Wantlist.Entry(cid, 2) + ]]) done() }) }) @@ -110,16 +124,24 @@ describe('Wantlist', () => { const b1 = blocks[0] const b2 = blocks[1] - map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + map([ + b1, + b2 + ], + (b, cb) => b.key(cb), + (err, keys) => { expect(err).to.not.exist - wm.add(keys[1], 1) - wm.add(keys[0], 1) + const cid1 = new CID(keys[0]) + const cid2 = new CID(keys[1]) + + wm.add(cid1, 1) + wm.add(cid2, 1) expect( Array.from(wm.sortedEntries()) ).to.be.eql([ - [keys[0].toString(), new Wantlist.Entry(keys[0], 1)], - [keys[1].toString(), new Wantlist.Entry(keys[1], 1)] + [cid1.buffer.toString(), new Wantlist.Entry(cid1, 1)], + [cid2.buffer.toString(), new Wantlist.Entry(cid2, 1)] ]) done() }) @@ -128,17 +150,38 @@ describe('Wantlist', () => { it('contains', (done) => { const b1 = blocks[0] const b2 = blocks[1] - map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + + map([ + b1, + b2 + ], + (b, cb) => b.key(cb), + (err, keys) => { expect(err).to.not.exist - wm.add(keys[0], 2) + const cid1 = new CID(keys[0]) + const cid2 = new CID(keys[1]) - expect( - wm.contains(keys[0]) - ).to.exist + wm.add(cid1, 2) + + expect(wm.contains(cid1)).to.exist + expect(wm.contains(cid2)).to.not.exist + done() + }) + }) + + it('with cidV1', (done) => { + const b = blocks[0] + b.key((err, key) => { + expect(err).to.not.exist + const cid = new CID(1, 'dag-pb', key) + wm.add(cid, 2) expect( - wm.contains(keys[1]) - ).to.not.exist + Array.from(wm.entries()) + ).to.be.eql([[ + cid.buffer.toString(), + new Wantlist.Entry(cid, 2) + ]]) done() }) }) diff --git a/test/utils.js b/test/utils.js index 207a8105..84868ca9 100644 --- a/test/utils.js +++ b/test/utils.js @@ -59,7 +59,7 @@ exports.createMockNet = (repo, count, cb) => { const ids = results[1] const hexIds = ids.map((id) => id.toHexString()) - const bitswaps = _.range(count).map((i) => new Bitswap(ids[i], {}, stores[i])) + const bitswaps = _.range(count).map((i) => new Bitswap({}, stores[i])) const networks = _.range(count).map((i) => { return { connectTo (id, cb) { @@ -153,7 +153,7 @@ exports.genBitswapNetwork = (n, callback) => { // create every BitSwap function createBitswaps () { netArray.forEach((net) => { - net.bitswap = new Bitswap(net.peerInfo, net.libp2p, net.repo.blockstore, net.peerBook) + net.bitswap = new Bitswap(net.libp2p, net.repo.blockstore, net.peerBook) }) establishLinks() } diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js deleted file mode 100644 index 57ac69fd..00000000 --- a/test/wantmanager/index.spec.js +++ /dev/null @@ -1,73 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect -const PeerId = require('peer-id') -const parallel = require('async/parallel') -const series = require('async/series') - -const cs = require('../../src/constants') -const Message = require('../../src/message') -const Wantmanager = require('../../src/wantmanager') - -const mockNetwork = require('../utils').mockNetwork - -describe('Wantmanager', () => { - it('sends wantlist to all connected peers', (done) => { - parallel([ - (cb) => PeerId.create(cb), - (cb) => PeerId.create(cb) - ], (err, peers) => { - if (err) { - return done(err) - } - - const peer1 = peers[0] - const peer2 = peers[1] - - let wm - const network = mockNetwork(6, (calls) => { - expect(calls.connects).to.have.length(6) - const m1 = new Message(true) - m1.addEntry(new Buffer('hello'), cs.kMaxPriority) - m1.addEntry(new Buffer('world'), cs.kMaxPriority - 1) - - const m2 = new Message(false) - m2.cancel(new Buffer('world')) - - const m3 = new Message(false) - m3.addEntry(new Buffer('foo'), cs.kMaxPriority) - - const msgs = [m1, m1, m2, m2, m3, m3] - - calls.messages.forEach((m, i) => { - expect(m[0]).to.be.eql(calls.connects[i]) - expect(m[1].equals(msgs[i])).to.be.eql(true) - }) - - wm = null - done() - }) - - wm = new Wantmanager(network) - - wm.run() - wm.wantBlocks([new Buffer('hello'), new Buffer('world')]) - - wm.connected(peer1) - wm.connected(peer2) - - series([ - (cb) => setTimeout(cb, 200), - (cb) => { - wm.cancelWants([new Buffer('world')]) - cb() - }, - (cb) => setTimeout(cb, 200) - ], (err) => { - expect(err).to.not.exist - wm.wantBlocks([new Buffer('foo')]) - }) - }) - }) -}) diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js deleted file mode 100644 index a1596139..00000000 --- a/test/wantmanager/msg-queue.spec.js +++ /dev/null @@ -1,80 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect -const PeerId = require('peer-id') - -const Message = require('../../src/message') -const MsgQueue = require('../../src/wantmanager/msg-queue') - -describe('MsgQueue', () => { - it('connects and sends messages', (done) => { - PeerId.create((err, id) => { - if (err) { - return done(err) - } - - const msg = new Message(true) - msg.addEntry(new Buffer('hello world'), 3) - msg.addEntry(new Buffer('foo bar'), 1) - - const messages = [] - const connects = [] - let i = 0 - const finish = () => { - i++ - if (i === 2) { - expect( - connects - ).to.be.eql([ - id, id - ]) - - const m1 = new Message(false) - m1.addEntry(new Buffer('hello'), 1) - m1.addEntry(new Buffer('world'), 2) - m1.cancel(new Buffer('foo')) - m1.cancel(new Buffer('bar')) - - expect( - messages - ).to.be.eql([ - [id, msg], - [id, m1] - ]) - - done() - } - } - - const network = { - connectTo (p, cb) { - connects.push(p) - cb() - }, - sendMessage (p, msg, cb) { - messages.push([p, msg]) - cb() - finish() - } - } - const mq = new MsgQueue(id, network) - - expect(mq.refcnt).to.be.eql(1) - - const batch1 = [ - new Message.Entry(new Buffer('hello'), 1, false), - new Message.Entry(new Buffer('world'), 2, false) - ] - - const batch2 = [ - new Message.Entry(new Buffer('foo'), 1, true), - new Message.Entry(new Buffer('bar'), 2, true) - ] - - mq.addEntries(batch1) - mq.addEntries(batch2) - mq.addMessage(msg) - }) - }) -})