diff --git a/packages/ipfs-core/package.json b/packages/ipfs-core/package.json index 33e53a5a77..f8b21885a9 100644 --- a/packages/ipfs-core/package.json +++ b/packages/ipfs-core/package.json @@ -105,6 +105,7 @@ "it-last": "^1.0.4", "it-map": "^1.0.4", "it-merge": "^1.0.2", + "it-parallel": "^1.0.0", "it-peekable": "^1.0.2", "it-pipe": "^1.1.0", "it-pushable": "^1.4.2", @@ -128,7 +129,6 @@ "pako": "^1.0.2", "parse-duration": "^1.0.0", "peer-id": "^0.15.1", - "streaming-iterables": "^6.0.0", "timeout-abort-controller": "^1.1.1", "uint8arrays": "^3.0.0" }, diff --git a/packages/ipfs-core/src/components/block/rm.js b/packages/ipfs-core/src/components/block/rm.js index 4554e14b52..5143dc2839 100644 --- a/packages/ipfs-core/src/components/block/rm.js +++ b/packages/ipfs-core/src/components/block/rm.js @@ -1,6 +1,8 @@ import errCode from 'err-code' -import { parallelMap, filter } from 'streaming-iterables' +import parallel from 'it-parallel' +import map from 'it-map' +import filter from 'it-filter' import { pipe } from 'it-pipe' import { cleanCid } from './utils.js' import { withTimeoutOption } from 'ipfs-core-utils/with-timeout-option' @@ -27,30 +29,33 @@ export function createRm ({ repo }) { try { yield * pipe( cids, - parallelMap(BLOCK_RM_CONCURRENCY, async cid => { - cid = cleanCid(cid) + source => map(source, cid => { + return async () => { + cid = cleanCid(cid) - /** @type {import('ipfs-core-types/src/block').RmResult} */ - const result = { cid } + /** @type {import('ipfs-core-types/src/block').RmResult} */ + const result = { cid } - try { - const has = await repo.blocks.has(cid) + try { + const has = await repo.blocks.has(cid) - if (!has) { - throw errCode(new Error('block not found'), 'ERR_BLOCK_NOT_FOUND') - } + if (!has) { + throw errCode(new Error('block not found'), 'ERR_BLOCK_NOT_FOUND') + } - await repo.blocks.delete(cid) - } catch (/** @type {any} */ err) { - if (!options.force) { - err.message = `cannot remove ${cid}: ${err.message}` - result.error = err + await repo.blocks.delete(cid) + } catch (/** @type {any} */ err) { + if (!options.force) { + err.message = `cannot remove ${cid}: ${err.message}` + result.error = err + } } - } - return result + return result + } }), - filter(() => !options.quiet) + source => parallel(source, BLOCK_RM_CONCURRENCY), + source => filter(source, () => !options.quiet) ) } finally { release()