Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race condition when requesting the same block twice #214

Merged
merged 12 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions benchmarks/put-get.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

const Benchmark = require('benchmark')
const assert = require('assert')
const all = require('async-iterator-all')
const all = require('it-all')
const drain = require('it-drain')
const makeBlock = require('../test/utils/make-block')
const genBitswapNetwork = require('../test/utils/mocks').genBitswapNetwork

Expand All @@ -24,7 +25,7 @@ const blockSizes = [10, 1024, 10 * 1024]
suite.add(`put-get ${n} blocks of size ${k}`, async (defer) => {
const blocks = await makeBlock(n, k)

await bitswap.putMany(blocks)
await drain(bitswap.putMany(blocks))

const res = await all(bitswap.getMany(blocks.map(block => block.cid)))

Expand Down
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^21.10.1",
"async-iterator-all": "^1.0.0",
"aegir": "^22.0.0",
"benchmark": "^2.1.4",
"buffer": "^5.6.0",
"chai": "^4.2.0",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"ipfs-repo": "^2.0.0",
"ipfs-repo": "^3.0.1",
"ipfs-utils": "^2.2.0",
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"libp2p": "^0.27.0",
"libp2p-kad-dht": "^0.18.3",
"libp2p-mplex": "^0.9.2",
Expand All @@ -71,10 +70,13 @@
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^3.3.2"
"uuid": "^8.0.0"
},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^1.1.0",
"bignumber.js": "^9.0.0",
"cids": "~0.8.0",
"debug": "^4.1.0",
Expand Down
164 changes: 95 additions & 69 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const Stats = require('./stats')
const AbortController = require('abort-controller')
const anySignal = require('any-signal')

const defaultOptions = {
statsEnabled: false,
Expand Down Expand Up @@ -101,9 +103,14 @@ class Bitswap {
this._log('received block')

const has = await this.blockstore.has(block.cid)

this._updateReceiveCounters(peerId.toB58String(), block, has)

if (has || !wasWanted) {
if (wasWanted) {
this._sendHaveBlockNotifications(block)
}

return
}

Expand Down Expand Up @@ -176,65 +183,88 @@ class Bitswap {
* blockstore it is returned, otherwise the block is added to the wantlist and returned once another node sends it to us.
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<Block>}
*/
async get (cid) {
for await (const block of this.getMany([cid])) {
return block
async get (cid, options = {}) { // eslint-disable-line require-await
const fetchFromNetwork = (cid, options) => {
// add it to the want list - n.b. later we will abort the AbortSignal
// so no need to remove the blocks from the wantlist after we have it
this.wm.wantBlocks([cid], options)

return this.notifications.wantBlock(cid, options)
}
}

/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Iterable<CID>} cids
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids) {
let pendingStart = cids.length
const wantList = []
let promptedNetwork = false

const fetchFromNetwork = async (cid) => {
wantList.push(cid)
const loadOrFetchFromNetwork = async (cid, options) => {
try {
// have to await here as we want to handle ERR_NOT_FOUND
const block = await this.blockstore.get(cid, options)

const blockP = this.notifications.wantBlock(cid)
return block
} catch (err) {
if (err.code !== 'ERR_NOT_FOUND') {
throw err
}

if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
if (!promptedNetwork) {
promptedNetwork = true

const block = await blockP
this.wm.cancelWants([cid])
this.network.findAndConnect(cid)
.catch((err) => this._log.error(err))
}

return block
// we don't have the block locally so fetch it from the network
return fetchFromNetwork(cid, options)
}
}

for (const cid of cids) {
const has = await this.blockstore.has(cid)
pendingStart--
if (has) {
if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
yield this.blockstore.get(cid)
// depending on implementation it's possible for blocks to come in while
// we do the async operations to get them from the blockstore leading to
// a race condition, so register for incoming block notifications as well
// as trying to get it from the datastore
const controller = new AbortController()
const signal = anySignal([options.signal, controller.signal])

const block = await Promise.race([
this.notifications.wantBlock(cid, {
signal
}),
loadOrFetchFromNetwork(cid, {
signal
})
])

continue
}
// since we have the block we can now remove our listener
controller.abort()

if (!promptedNetwork) {
promptedNetwork = true
this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err))
}
return block
}

// we don't have the block locally so fetch it from the network
yield fetchFromNetwork(cid)
/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {AsyncIterator<CID>} cids
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids, options = {}) {
for await (const cid of cids) {
yield this.get(cid, options)
}
}

/**
* Removes the given CIDs from the wantlist independent of any ref counts
* Removes the given CIDs from the wantlist independent of any ref counts.
*
* This will cause all outstanding promises for a given block to reject.
*
* If you want to cancel the want for a block without doing that, pass an
* AbortSignal in to `.get` or `.getMany` and abort it.
*
* @param {Iterable<CID>} cids
* @returns {void}
Expand All @@ -249,7 +279,9 @@ class Bitswap {
}

/**
* Removes the given keys from the want list
* Removes the given keys from the want list. This may cause pending promises
* for blocks to never resolve. If you wish these promises to abort instead
* call `unwant(cids)` instead.
*
* @param {Iterable<CID>} cids
* @returns {void}
Expand All @@ -269,45 +301,39 @@ class Bitswap {
* @returns {Promise<void>}
*/
async put (block) { // eslint-disable-line require-await
return this.putMany([block])
await this.blockstore.put(block)
this._sendHaveBlockNotifications(block)
}

/**
* Put the given blocks to the underlying blockstore and
* send it to nodes that have it them their wantlist.
*
* @param {AsyncIterable<Block>|Iterable<Block>} blocks
* @returns {Promise<void>}
* @param {AsyncIterable<Block>} blocks
* @returns {AsyncIterable<Block>}
*/
async putMany (blocks) { // eslint-disable-line require-await
const self = this

// Add any new blocks to the blockstore
const newBlocks = []
await this.blockstore.putMany(async function * () {
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}
async * putMany (blocks) { // eslint-disable-line require-await
for await (const block of this.blockstore.putMany(blocks)) {
this._sendHaveBlockNotifications(block)

yield block
newBlocks.push(block)
}
}())

// Notify engine that we have new blocks
this.engine.receivedBlocks(newBlocks)

// Notify listeners that we have received the new blocks
for (const block of newBlocks) {
this.notifications.hasBlock(block)
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
self._log.error('Failed to provide: %s', err.message)
})
yield block
}
}

/**
* Sends notifications about the arrival of a block
*
* @param {Block} block
*/
_sendHaveBlockNotifications (block) {
this.notifications.hasBlock(block)
this.engine.receivedBlocks([block])
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
this._log.error('Failed to provide: %s', err.message)
})
}

/**
* Get the current list of wants.
*
Expand Down
39 changes: 27 additions & 12 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,17 @@ class Network {
*
* @param {CID} cid
* @param {number} maxProviders
* @returns {Promise<Result<Array>>}
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {AsyncIterable<PeerInfo>}
*/
findProviders (cid, maxProviders) {
findProviders (cid, maxProviders, options = {}) {
return this.libp2p.contentRouting.findProviders(
cid,
{
maxTimeout: CONSTANTS.providerRequestTimeout,
maxNumProviders: maxProviders
maxNumProviders: maxProviders,
signal: options.signal
}
)
}
Expand All @@ -121,19 +124,29 @@ class Network {
* Find the providers of a given `cid` and connect to them.
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {void}
*/
async findAndConnect (cid) {
async findAndConnect (cid, options) {
const connectAttempts = []
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest)) {
for await (const provider of this.findProviders(cid, CONSTANTS.maxProvidersPerRequest, options)) {
this._log('connecting to providers', provider.id.toB58String())
connectAttempts.push(this.connectTo(provider))
connectAttempts.push(this.connectTo(provider, options))
}
await Promise.all(connectAttempts)
}

async provide (cid) {
await this.libp2p.contentRouting.provide(cid)
/**
* Tell the network we can provide content for the passed CID
*
* @param {CID} cid
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<void>}
*/
async provide (cid, options) {
await this.libp2p.contentRouting.provide(cid, options)
}

// Connect to the given peer
Expand Down Expand Up @@ -169,19 +182,21 @@ class Network {
* Connects to another peer
*
* @param {PeerInfo|PeerId|Multiaddr} peer
* @returns {Promise.<Connection>}
* @param {Object} options
* @param {AbortSignal} options.abortSignal
* @returns {Promise<Connection>}
*/
async connectTo (peer) { // eslint-disable-line require-await
async connectTo (peer, options) { // eslint-disable-line require-await
if (!this._running) {
throw new Error('network isn\'t running')
}

return this.libp2p.dial(peer)
return this.libp2p.dial(peer, options)
}

// Dial to the peer and try to use the most recent Bitswap
_dialPeer (peer) {
return this.libp2p.dialProtocol(peer, [BITSWAP110, BITSWAP100])
return this.libp2p.dialProtocol(peer, [BITSWAP120, BITSWAP110, BITSWAP100])
}

_updateSentStats (peer, blocks) {
Expand Down
Loading