From 1d19c4fc5587f845257f3d30593e202c3e6cdcf6 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 11 Dec 2019 11:00:06 +0000 Subject: [PATCH] perf: expose importer concurrency controls when adding files (#2637) * perf: expose importer concurrency controls when adding files Adds two new arguments to the cli & http interface: `--file-import-concurrency` and `--block-write-concurrency` See https://github.com/ipfs/js-ipfs-unixfs-importer/pull/41 for futher discussion. * chore: update deps, remove unused * fix: pass args from http * fix: hard code file concurrency for http requests * fix: fix up chunker parsing tests * chore: use ipfs-http-client branch temporarily * chore: increase bundlesize by 1kb * chore: update dep * chore: increase bundle size again --- .aegir.js | 2 +- package.json | 7 +++---- src/cli/commands/add.js | 15 ++++++++++++++- src/core/components/block.js | 2 +- src/core/components/dag.js | 2 +- src/core/components/files-mfs.js | 2 +- .../files-regular/add-async-iterator.js | 3 +-- src/core/components/files-regular/add.js | 2 +- src/core/components/files-regular/cat.js | 2 +- src/core/components/files-regular/get.js | 2 +- src/core/components/files-regular/ls.js | 2 +- src/core/components/files-regular/refs-local.js | 2 +- src/core/components/files-regular/refs.js | 2 +- src/core/components/files-regular/utils.js | 6 ++---- src/core/runtime/add-from-fs-nodejs.js | 2 +- src/http/api/resources/block.js | 2 +- src/http/api/resources/config.js | 2 +- src/http/api/resources/dag.js | 2 +- src/http/api/resources/files-regular.js | 10 +++++++++- src/http/api/resources/object.js | 2 +- test/core/files-regular-utils.js | 12 ++++++------ 21 files changed, 50 insertions(+), 33 deletions(-) diff --git a/.aegir.js b/.aegir.js index 1d3c14e693..c0e3476112 100644 --- a/.aegir.js +++ b/.aegir.js @@ -9,7 +9,7 @@ const preloadNode = MockPreloadNode.createNode() const echoServer = EchoServer.createServer() module.exports = { - bundlesize: { maxSize: '651kB' }, + bundlesize: { maxSize: '652kB' }, webpack: { resolve: { mainFields: ['browser', 'main'], diff --git a/package.json b/package.json index 0300c5b8d1..6d2acd2849 100644 --- a/package.json +++ b/package.json @@ -66,8 +66,6 @@ "@hapi/joi": "^15.0.0", "abort-controller": "^3.0.0", "array-shuffle": "^1.0.1", - "async-iterator-all": "^1.0.0", - "async-iterator-first": "^1.0.0", "async-iterator-to-pull-stream": "^1.3.0", "async-iterator-to-stream": "^1.1.0", "base32.js": "~0.1.0", @@ -100,14 +98,14 @@ "ipfs-bitswap": "^0.26.0", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.16.0", - "ipfs-http-client": "^40.0.1", + "ipfs-http-client": "^40.1.0", "ipfs-http-response": "~0.4.0", "ipfs-mfs": "^0.13.2", "ipfs-multipart": "^0.2.0", "ipfs-repo": "^0.30.0", "ipfs-unixfs": "~0.1.16", "ipfs-unixfs-exporter": "^0.38.0", - "ipfs-unixfs-importer": "^0.40.0", + "ipfs-unixfs-importer": "^0.42.0", "ipfs-utils": "~0.4.0", "ipld": "~0.25.0", "ipld-bitcoin": "~0.3.0", @@ -123,6 +121,7 @@ "is-pull-stream": "~0.0.0", "is-stream": "^2.0.0", "iso-url": "~0.4.6", + "it-all": "^1.0.1", "it-pipe": "^1.0.1", "it-to-stream": "^0.1.1", "jsondiffpatch": "~0.3.11", diff --git a/src/cli/commands/add.js b/src/cli/commands/add.js index 2e1c8cd413..e60e0c78c0 100644 --- a/src/cli/commands/add.js +++ b/src/cli/commands/add.js @@ -49,10 +49,20 @@ module.exports = { default: false, describe: 'Only chunk and hash, do not write' }, + 'block-write-concurrency': { + type: 'integer', + default: 10, + describe: 'After a file has been chunked, this controls how many chunks to hash and add to the block store concurrently' + }, chunker: { default: 'size-262144', describe: 'Chunking algorithm to use, formatted like [size-{size}, rabin, rabin-{avg}, rabin-{min}-{avg}-{max}]' }, + 'file-import-concurrency': { + type: 'integer', + default: 50, + describe: 'How many files to import at once' + }, 'enable-sharding-experiment': { type: 'boolean', default: false @@ -130,7 +140,10 @@ module.exports = { wrapWithDirectory: argv.wrapWithDirectory, pin: argv.pin, chunker: argv.chunker, - preload: argv.preload + preload: argv.preload, + nonatomic: argv.nonatomic, + fileImportConcurrency: argv.fileImportConcurrency, + blockWriteConcurrency: argv.blockWriteConcurrency } if (options.enableShardingExperiment && argv.isDaemonOn()) { diff --git a/src/core/components/block.js b/src/core/components/block.js index 87b00fc52c..59f6603213 100644 --- a/src/core/components/block.js +++ b/src/core/components/block.js @@ -5,7 +5,7 @@ const multihashing = require('multihashing-async') const CID = require('cids') const callbackify = require('callbackify') const errCode = require('err-code') -const all = require('async-iterator-all') +const all = require('it-all') const { PinTypes } = require('./pin/pin-manager') module.exports = function block (self) { diff --git a/src/core/components/dag.js b/src/core/components/dag.js index fd704e8139..898ba84ccf 100644 --- a/src/core/components/dag.js +++ b/src/core/components/dag.js @@ -2,7 +2,7 @@ const callbackify = require('callbackify') const CID = require('cids') -const all = require('async-iterator-all') +const all = require('it-all') const errCode = require('err-code') const multicodec = require('multicodec') diff --git a/src/core/components/files-mfs.js b/src/core/components/files-mfs.js index 9d621ad60a..2a17d9e39c 100644 --- a/src/core/components/files-mfs.js +++ b/src/core/components/files-mfs.js @@ -5,7 +5,7 @@ const isPullStream = require('is-pull-stream') const toPullStream = require('async-iterator-to-pull-stream') const toReadableStream = require('async-iterator-to-stream') const pullStreamToAsyncIterator = require('pull-stream-to-async-iterator') -const all = require('async-iterator-all') +const all = require('it-all') const nodeify = require('promise-nodeify') const PassThrough = require('stream').PassThrough const pull = require('pull-stream/pull') diff --git a/src/core/components/files-regular/add-async-iterator.js b/src/core/components/files-regular/add-async-iterator.js index e138a1cd66..562d35c039 100644 --- a/src/core/components/files-regular/add-async-iterator.js +++ b/src/core/components/files-regular/add-async-iterator.js @@ -22,8 +22,7 @@ module.exports = function (self) { : Infinity }, options, { strategy: 'balanced', - chunker: chunkerOptions.chunker, - chunkerOptions: chunkerOptions.chunkerOptions + ...chunkerOptions }) // CID v0 is for multihashes encoded with sha2-256 diff --git a/src/core/components/files-regular/add.js b/src/core/components/files-regular/add.js index dcf9e7bf52..d79bcfc39f 100644 --- a/src/core/components/files-regular/add.js +++ b/src/core/components/files-regular/add.js @@ -1,6 +1,6 @@ 'use strict' -const all = require('async-iterator-all') +const all = require('it-all') module.exports = function (self) { // can't use callbackify because if `data` is a pull stream diff --git a/src/core/components/files-regular/cat.js b/src/core/components/files-regular/cat.js index 656946ad03..020e2848c3 100644 --- a/src/core/components/files-regular/cat.js +++ b/src/core/components/files-regular/cat.js @@ -1,7 +1,7 @@ 'use strict' const callbackify = require('callbackify') -const all = require('async-iterator-all') +const all = require('it-all') module.exports = function (self) { return callbackify.variadic(async function cat (ipfsPath, options) { diff --git a/src/core/components/files-regular/get.js b/src/core/components/files-regular/get.js index 58e0434dfc..dd0d1243c1 100644 --- a/src/core/components/files-regular/get.js +++ b/src/core/components/files-regular/get.js @@ -1,7 +1,7 @@ 'use strict' const callbackify = require('callbackify') -const all = require('async-iterator-all') +const all = require('it-all') module.exports = function (self) { return callbackify.variadic(async function get (ipfsPath, options) { // eslint-disable-line require-await diff --git a/src/core/components/files-regular/ls.js b/src/core/components/files-regular/ls.js index 9ae4a71a97..db35300791 100644 --- a/src/core/components/files-regular/ls.js +++ b/src/core/components/files-regular/ls.js @@ -1,7 +1,7 @@ 'use strict' const callbackify = require('callbackify') -const all = require('async-iterator-all') +const all = require('it-all') module.exports = function (self) { return callbackify.variadic(async function ls (ipfsPath, options) { // eslint-disable-line require-await diff --git a/src/core/components/files-regular/refs-local.js b/src/core/components/files-regular/refs-local.js index 799b384e30..8812d97c8e 100644 --- a/src/core/components/files-regular/refs-local.js +++ b/src/core/components/files-regular/refs-local.js @@ -1,7 +1,7 @@ 'use strict' const callbackify = require('callbackify') -const all = require('async-iterator-all') +const all = require('it-all') module.exports = function (self) { return callbackify.variadic(async function refsLocal (ipfsPath, options) { // eslint-disable-line require-await diff --git a/src/core/components/files-regular/refs.js b/src/core/components/files-regular/refs.js index 4876457606..3e3f490197 100644 --- a/src/core/components/files-regular/refs.js +++ b/src/core/components/files-regular/refs.js @@ -1,7 +1,7 @@ 'use strict' const callbackify = require('callbackify') -const all = require('async-iterator-all') +const all = require('it-all') module.exports = function (self) { return callbackify.variadic(async function refs (ipfsPath, options) { // eslint-disable-line require-await diff --git a/src/core/components/files-regular/utils.js b/src/core/components/files-regular/utils.js index 876d0b0d48..8cd05f45d4 100644 --- a/src/core/components/files-regular/utils.js +++ b/src/core/components/files-regular/utils.js @@ -45,14 +45,12 @@ const parseChunkerString = (chunker) => { } return { chunker: 'fixed', - chunkerOptions: { - maxChunkSize: size - } + maxChunkSize: size } } else if (chunker.startsWith('rabin')) { return { chunker: 'rabin', - chunkerOptions: parseRabinString(chunker) + ...parseRabinString(chunker) } } else { throw new Error(`Unrecognized chunker option: ${chunker}`) diff --git a/src/core/runtime/add-from-fs-nodejs.js b/src/core/runtime/add-from-fs-nodejs.js index 33bc3954e2..8a30815135 100644 --- a/src/core/runtime/add-from-fs-nodejs.js +++ b/src/core/runtime/add-from-fs-nodejs.js @@ -2,7 +2,7 @@ const callbackify = require('callbackify') const globSource = require('ipfs-utils/src/files/glob-source') -const all = require('async-iterator-all') +const all = require('it-all') module.exports = self => { return callbackify.variadic(async (...args) => { // eslint-disable-line require-await diff --git a/src/http/api/resources/block.js b/src/http/api/resources/block.js index c88b25b15f..ebad45251b 100644 --- a/src/http/api/resources/block.js +++ b/src/http/api/resources/block.js @@ -7,7 +7,7 @@ const multibase = require('multibase') const Boom = require('@hapi/boom') const { cidToString } = require('../../../utils/cid') const debug = require('debug') -const all = require('async-iterator-all') +const all = require('it-all') const streamResponse = require('../../utils/stream-response') const log = debug('ipfs:http-api:block') log.error = debug('ipfs:http-api:block:error') diff --git a/src/http/api/resources/config.js b/src/http/api/resources/config.js index bd45c00150..604506671b 100644 --- a/src/http/api/resources/config.js +++ b/src/http/api/resources/config.js @@ -9,7 +9,7 @@ const multipart = require('ipfs-multipart') const Boom = require('@hapi/boom') const Joi = require('@hapi/joi') const { profiles } = require('../../../core/components/config') -const all = require('async-iterator-all') +const all = require('it-all') exports.getOrSet = { // pre request handler that parses the args and returns `key` & `value` which are assigned to `request.pre.args` diff --git a/src/http/api/resources/dag.js b/src/http/api/resources/dag.js index 436382bc38..ff7fd32c01 100644 --- a/src/http/api/resources/dag.js +++ b/src/http/api/resources/dag.js @@ -11,7 +11,7 @@ const debug = require('debug') const { cidToString } = require('../../../utils/cid') -const all = require('async-iterator-all') +const all = require('it-all') const log = debug('ipfs:http-api:dag') log.error = debug('ipfs:http-api:dag:error') diff --git a/src/http/api/resources/files-regular.js b/src/http/api/resources/files-regular.js index bc963ce3d1..4c466f6959 100644 --- a/src/http/api/resources/files-regular.js +++ b/src/http/api/resources/files-regular.js @@ -159,6 +159,8 @@ exports.add = { 'only-hash': Joi.boolean(), pin: Joi.boolean().default(true), 'wrap-with-directory': Joi.boolean(), + 'file-import-concurrency': Joi.number().integer().min(0).default(50), + 'block-write-concurrency': Joi.number().integer().min(0).default(10), chunker: Joi.string(), trickle: Joi.boolean(), preload: Joi.boolean().default(true) @@ -218,7 +220,13 @@ exports.add = { pin: request.query.pin, chunker: request.query.chunker, trickle: request.query.trickle, - preload: request.query.preload + preload: request.query.preload, + + // this has to be hardcoded to 1 because we can only read one file + // at a time from a http request and we have to consume it completely + // before we can read the next file + fileImportConcurrency: 1, + blockWriteConcurrency: request.query['block-write-concurrency'] }) }, async function (source) { diff --git a/src/http/api/resources/object.js b/src/http/api/resources/object.js index 6d1bd660ee..db3e97d6db 100644 --- a/src/http/api/resources/object.js +++ b/src/http/api/resources/object.js @@ -2,7 +2,7 @@ const CID = require('cids') const multipart = require('ipfs-multipart') -const all = require('async-iterator-all') +const all = require('it-all') const dagPB = require('ipld-dag-pb') const { DAGNode, DAGLink } = dagPB const Joi = require('@hapi/joi') diff --git a/test/core/files-regular-utils.js b/test/core/files-regular-utils.js index d8520cf583..b9c9c93a92 100644 --- a/test/core/files-regular-utils.js +++ b/test/core/files-regular-utils.js @@ -20,27 +20,27 @@ describe('files-regular/utils', () => { it('parses a fixed size string', () => { const options = utils.parseChunkerString('size-512') expect(options.chunker).to.equal('fixed') - expect(options.chunkerOptions.maxChunkSize).to.equal(512) + expect(options.maxChunkSize).to.equal(512) }) it('parses a rabin string without size', () => { const options = utils.parseChunkerString('rabin') expect(options.chunker).to.equal('rabin') - expect(options.chunkerOptions.avgChunkSize).to.equal(262144) + expect(options.avgChunkSize).to.equal(262144) }) it('parses a rabin string with only avg size', () => { const options = utils.parseChunkerString('rabin-512') expect(options.chunker).to.equal('rabin') - expect(options.chunkerOptions.avgChunkSize).to.equal(512) + expect(options.avgChunkSize).to.equal(512) }) it('parses a rabin string with min, avg, and max', () => { const options = utils.parseChunkerString('rabin-42-92-184') expect(options.chunker).to.equal('rabin') - expect(options.chunkerOptions.minChunkSize).to.equal(42) - expect(options.chunkerOptions.avgChunkSize).to.equal(92) - expect(options.chunkerOptions.maxChunkSize).to.equal(184) + expect(options.minChunkSize).to.equal(42) + expect(options.avgChunkSize).to.equal(92) + expect(options.maxChunkSize).to.equal(184) }) it('throws an error for unsupported chunker type', () => {