From 7a3621151e1552e7fcb7574729d3405b37a3f571 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Tue, 20 Aug 2019 17:57:37 +0100
Subject: [PATCH 1/9] fix: Do not load all of a DAG into memory when pinning

Given a `CID`, the `dag._getRecursive` method returns a list of all
descendants of the node with the passed `CID`. This can cause enormous
memory usage when importing large datasets.

Where this method is invoked, the results are either a) discarded or
b) used to calculate the `CID`s of the nodes, which is bad for memory
*and* CPU usage.

This PR removes the buffering and `CID` recalculation for a nice
speedup when adding large datasets.

fixes #2310
---
 package.json               |  1 -
 src/core/components/dag.js | 34 -----------------
 src/core/components/pin.js | 77 +++++++++++++++++++++++++-------------
 3 files changed, 50 insertions(+), 62 deletions(-)

diff --git a/package.json b/package.json
index 68ff78cf34..1055590065 100644
--- a/package.json
+++ b/package.json
@@ -118,7 +118,6 @@
     "is-pull-stream": "~0.0.0",
     "is-stream": "^2.0.0",
     "iso-url": "~0.4.6",
-    "just-flatten-it": "^2.1.0",
     "just-safe-set": "^2.1.0",
     "kind-of": "^6.0.2",
     "libp2p": "~0.25.4",
diff --git a/src/core/components/dag.js b/src/core/components/dag.js
index 014d0f3c9b..0f2f90a563 100644
--- a/src/core/components/dag.js
+++ b/src/core/components/dag.js
@@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
 const CID = require('cids')
 const pull = require('pull-stream')
 const iterToPull = require('async-iterator-to-pull-stream')
-const mapAsync = require('async/map')
 const setImmediate = require('async/setImmediate')
-const flattenDeep = require('just-flatten-it')
 const errCode = require('err-code')
 const multicodec = require('multicodec')

@@ -180,38 +178,6 @@ module.exports = function dag (self) {
         iterToPull(self._ipld.tree(cid, path, options)),
         pull.collect(callback)
       )
-    }),
-
-    // TODO - use IPLD selectors once they are implemented
-    _getRecursive: promisify((multihash, options, callback) => {
-      // gets flat array of all DAGNodes in tree given by multihash
-
-      if (typeof options === 'function') {
-        callback = options
-        options = {}
-      }
-
-      options = options || {}
-
-      let cid
-
-      try {
-        cid = new CID(multihash)
-      } catch (err) {
-        return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID')))
-      }
-
-      self.dag.get(cid, '', options, (err, res) => {
-        if (err) { return callback(err) }
-
-        mapAsync(res.value.Links, (link, cb) => {
-          self.dag._getRecursive(link.Hash, options, cb)
-        }, (err, nodes) => {
-          // console.log('nodes:', nodes)
-          if (err) return callback(err)
-          callback(null, flattenDeep([res.value, nodes]))
-        })
-      })
     })
   }
 }
diff --git a/src/core/components/pin.js b/src/core/components/pin.js
index c988a70026..1096318560 100644
--- a/src/core/components/pin.js
+++ b/src/core/components/pin.js
@@ -2,7 +2,7 @@
 'use strict'

 const promisify = require('promisify-es6')
-const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
+const { DAGNode, DAGLink } = require('ipld-dag-pb')
 const CID = require('cids')
 const map = require('async/map')
 const mapSeries = require('async/mapSeries')
@@ -12,6 +12,7 @@ const eachLimit = require('async/eachLimit')
 const waterfall = require('async/waterfall')
 const detectLimit = require('async/detectLimit')
 const setImmediate = require('async/setImmediate')
+const queue = require('async/queue')
 const { Key } = require('interface-datastore')
 const errCode = require('err-code')
 const multibase = require('multibase')
@@ -52,30 +53,49 @@ module.exports = (self) => {
   const recursiveKeys = () =>
     Array.from(recursivePins).map(key => new CID(key).buffer)

-  function getIndirectKeys (callback) {
-    const indirectKeys = new Set()
-    eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
-      dag._getRecursive(multihash, (err, nodes) => {
+  function walkDag ({ cid, onCid = () => {} }, cb) {
+    const q = queue(function ({ cid }, done) {
+      dag.get(cid, { preload: false }, function (err, result) {
         if (err) {
-          return cb(err)
+          return done(err)
         }

-        map(nodes, (node, cb) => util.cid(util.serialize(node), {
-          cidVersion: 0
-        }).then(cid => cb(null, cid), cb), (err, cids) => {
-          if (err) {
-            return cb(err)
-          }
+        onCid(cid)

-          cids
-            .map(cid => cid.toString())
-            // recursive pins pre-empt indirect pins
-            .filter(key => !recursivePins.has(key))
-            .forEach(key => indirectKeys.add(key))
+        if (result.value.Links) {
+          q.push(result.value.Links.map(link => ({
+            cid: link.Hash
+          })))
+        }

-          cb()
-        })
+        done()
       })
+    }, concurrencyLimit)
+    q.drain = () => {
+      cb()
+    }
+    q.error = (err) => {
+      q.kill()
+      cb(err)
+    }
+    q.push({ cid })
+  }
+
+  function getIndirectKeys (callback) {
+    const indirectKeys = new Set()
+    eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
+      // load every hash in the graph
+      walkDag({
+        cid: new CID(multihash),
+        onCid: (cid) => {
+          cid = cid.toString()
+
+          // recursive pins pre-empt indirect pins
+          if (!recursivePins.has(cid)) {
+            indirectKeys.add(cid)
+          }
+        }
+      }, cb)
     }, (err) => {
       if (err) { return callback(err) }
       callback(null, Array.from(indirectKeys))
@@ -184,7 +204,9 @@ module.exports = (self) => {

         // verify that each hash can be pinned
         map(mhs, (multihash, cb) => {
-          const key = toB58String(multihash)
+          const cid = new CID(multihash)
+          const key = cid.toBaseEncodedString()
+
           if (recursive) {
             if (recursivePins.has(key)) {
               // it's already pinned recursively
@@ -193,11 +215,10 @@ module.exports = (self) => {

             // entire graph of nested links should be pinned,
             // so make sure we have all the objects
-            dag._getRecursive(key, { preload: options.preload }, (err) => {
-              if (err) { return cb(err) }
-              // found all objects, we can add the pin
-              return cb(null, key)
-            })
+            walkDag({
+              dag,
+              cid
+            }, (err) => cb(err, key))
           } else {
             if (recursivePins.has(key)) {
               // recursive supersedes direct, can't have both
@@ -209,8 +230,10 @@ module.exports = (self) => {
            }

            // make sure we have the object
-            dag.get(new CID(multihash), { preload: options.preload }, (err) => {
-              if (err) { return cb(err) }
+            dag.get(cid, (err) => {
+              if (err) {
+                return cb(err)
+              }
              // found the object, we can add the pin
              return cb(null, key)
            })

From 7f0673163d965a40e46ab86a864c740541227ef5 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Tue, 20 Aug 2019 18:28:26 +0100
Subject: [PATCH 2/9] chore: because removing a dep has made the bundle bigger?

---
 .aegir.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.aegir.js b/.aegir.js
index 6d0ed4b2dd..25570c174a 100644
--- a/.aegir.js
+++ b/.aegir.js
@@ -8,7 +8,7 @@ const ipfsdServer = IPFSFactory.createServer()
 const preloadNode = MockPreloadNode.createNode()

 module.exports = {
-  bundlesize: { maxSize: '689kB' },
+  bundlesize: { maxSize: '756KB' },
   webpack: {
     resolve: {
       mainFields: ['browser', 'main'],

From c7e9f8569b4773c72eb0bf75743a10a0fd97d19a Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Tue, 20 Aug 2019 18:30:37 +0100
Subject: [PATCH 3/9] chore: pass in preload args again

---
 src/core/components/pin.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core/components/pin.js b/src/core/components/pin.js
index 1096318560..2eaef96d1a 100644
--- a/src/core/components/pin.js
+++ b/src/core/components/pin.js
@@ -230,7 +230,7 @@ module.exports = (self) => {
            }

            // make sure we have the object
-            dag.get(cid, (err) => {
+            dag.get(cid, { preload: options.preload }, (err) => {
              if (err) {
                return cb(err)

From 74c5c8bdea42de7c76df1cd6c640be5c24d762bf Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 21 Aug 2019 09:27:52 +0100
Subject: [PATCH 4/9] chore: respect preload option

---
 src/core/components/pin.js | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/core/components/pin.js b/src/core/components/pin.js
index 2eaef96d1a..70e8e4af54 100644
--- a/src/core/components/pin.js
+++ b/src/core/components/pin.js
@@ -53,9 +53,9 @@ module.exports = (self) => {
   const recursiveKeys = () =>
     Array.from(recursivePins).map(key => new CID(key).buffer)

-  function walkDag ({ cid, onCid = () => {} }, cb) {
+  function walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
     const q = queue(function ({ cid }, done) {
-      dag.get(cid, { preload: false }, function (err, result) {
+      dag.get(cid, { preload }, function (err, result) {
         if (err) {
           return done(err)
         }
@@ -217,7 +217,8 @@ module.exports = (self) => {
             // entire graph of nested links should be pinned,
             // so make sure we have all the objects
             walkDag({
               dag,
-              cid
+              cid,
+              preload: options.preload
             }, (err) => cb(err, key))
           } else {
             if (recursivePins.has(key)) {

From c3e8094512b3da9692a7c123245b3223576c4c87 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 21 Aug 2019 09:28:32 +0100
Subject: [PATCH 5/9] chore: increase bundle size to something symbolic-but-bad

---
 .aegir.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.aegir.js b/.aegir.js
index 25570c174a..c3b16d5569 100644
--- a/.aegir.js
+++ b/.aegir.js
@@ -8,7 +8,7 @@ const ipfsdServer = IPFSFactory.createServer()
 const preloadNode = MockPreloadNode.createNode()

 module.exports = {
-  bundlesize: { maxSize: '756KB' },
+  bundlesize: { maxSize: '1MB' },
   webpack: {
     resolve: {
       mainFields: ['browser', 'main'],

From cc8ef7633fa79c920e5f45852b78246e1621a271 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 21 Aug 2019 10:37:52 +0100
Subject: [PATCH 6/9] chore: reset bundle size

---
 .aegir.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.aegir.js b/.aegir.js
index c3b16d5569..25570c174a 100644
--- a/.aegir.js
+++ b/.aegir.js
@@ -8,7 +8,7 @@ const ipfsdServer = IPFSFactory.createServer()
 const preloadNode = MockPreloadNode.createNode()

 module.exports = {
-  bundlesize: { maxSize: '1MB' },
+  bundlesize: { maxSize: '756KB' },
   webpack: {
     resolve: {
       mainFields: ['browser', 'main'],

From 60b4f52c09de4a182f596617d2559ed4667da88d Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 21 Aug 2019 11:19:10 +0100
Subject: [PATCH 7/9] chore: disable bundle check temporarily

---
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.travis.yml b/.travis.yml
index f7eef6a3ce..cf551e6e12 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,7 +25,7 @@ jobs:
   include:
     - stage: check
       script:
-        - npx aegir build --bundlesize
+        # - npx aegir build --bundlesize
         - npx aegir dep-check -- -i wrtc -i electron-webrtc
         - npm run lint

From 72d2f322309d2a3ab0fe22e0b49518279f8c08db Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 21 Aug 2019 14:59:37 +0100
Subject: [PATCH 8/9] chore: revert bundlesize increase

---
 .aegir.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.aegir.js b/.aegir.js
index 25570c174a..6d0ed4b2dd 100644
--- a/.aegir.js
+++ b/.aegir.js
@@ -8,7 +8,7 @@ const ipfsdServer = IPFSFactory.createServer()
 const preloadNode = MockPreloadNode.createNode()

 module.exports = {
-  bundlesize: { maxSize: '756KB' },
+  bundlesize: { maxSize: '689kB' },
   webpack: {
     resolve: {
       mainFields: ['browser', 'main'],

From 7177bb7b7a8340a134e0e616fed57da3dfac8340 Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Wed, 21 Aug 2019 15:02:53 +0100
Subject: [PATCH 9/9] chore: pass preload option when listing pins

---
 src/core/components/pin.js | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/core/components/pin.js b/src/core/components/pin.js
index 70e8e4af54..a41cba72d4 100644
--- a/src/core/components/pin.js
+++ b/src/core/components/pin.js
@@ -81,12 +81,13 @@ module.exports = (self) => {
     q.push({ cid })
   }

-  function getIndirectKeys (callback) {
+  function getIndirectKeys ({ preload }, callback) {
     const indirectKeys = new Set()
     eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
       // load every hash in the graph
       walkDag({
         cid: new CID(multihash),
+        preload: preload || false,
         onCid: (cid) => {
           cid = cid.toString()

@@ -398,7 +399,7 @@ module.exports = (self) => {
           )
         }
         if (type === types.indirect || type === types.all) {
-          getIndirectKeys((err, indirects) => {
+          getIndirectKeys(options, (err, indirects) => {
             if (err) { return callback(err) }
             pins = pins
               // if something is pinned both directly and indirectly,
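
Note, as a reading aid only: the traversal these patches introduce is easier to follow outside the diff. The sketch below mirrors the shape of the `walkDag` helper added in patches 1 and 4; the in-memory `dag` stub, the fixture node map and the `concurrencyLimit` value are invented here for illustration and are not part of js-ipfs.

'use strict'

// Sketch of the queue-based DAG walk from patches 1 and 4. The `dag` object,
// `nodes` fixture and `concurrencyLimit` below are illustrative stand-ins.
const queue = require('async/queue')

const concurrencyLimit = 8

// Hypothetical fixture: CID string -> node whose Links point at other CIDs
const nodes = {
  root: { value: { Links: [{ Hash: 'a' }, { Hash: 'b' }] } },
  a: { value: { Links: [{ Hash: 'c' }] } },
  b: { value: { Links: [] } },
  c: { value: { Links: [] } }
}

// Stub with the same callback shape as dag.get(cid, options, callback)
const dag = {
  get (cid, options, callback) {
    setImmediate(() => callback(null, nodes[cid]))
  }
}

// Walk with bounded concurrency: visit a node, hand its CID to onCid,
// queue its links, and never buffer the whole graph in memory
function walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
  const q = queue(function ({ cid }, done) {
    dag.get(cid, { preload }, function (err, result) {
      if (err) {
        return done(err)
      }

      onCid(cid)

      if (result.value.Links) {
        q.push(result.value.Links.map(link => ({ cid: link.Hash })))
      }

      done()
    })
  }, concurrencyLimit)
  q.drain = () => cb()
  q.error = (err) => {
    q.kill()
    cb(err)
  }
  q.push({ cid })
}

walkDag({ cid: 'root', onCid: (cid) => console.log('visited', cid) }, (err) => {
  if (err) throw err
  console.log('walk complete')
})

Because child links are pushed back onto the same bounded queue, at most `concurrencyLimit` blocks are in flight at any time and nothing retains the full node list, which is what replaces the old `_getRecursive` buffering.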