Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

fix: Do not load all of a DAG into memory when pinning #2372

Merged
merged 9 commits into from
Aug 21, 2019
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
include:
- stage: check
script:
- npx aegir build --bundlesize
# - npx aegir build --bundlesize
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: we need a sister PR that re-enables this after it is merged.

- npx aegir dep-check -- -i wrtc -i electron-webrtc
- npm run lint

Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
"is-pull-stream": "~0.0.0",
"is-stream": "^2.0.0",
"iso-url": "~0.4.6",
"just-flatten-it": "^2.1.0",
"just-safe-set": "^2.1.0",
"kind-of": "^6.0.2",
"libp2p": "~0.25.4",
Expand Down
34 changes: 0 additions & 34 deletions src/core/components/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ const promisify = require('promisify-es6')
const CID = require('cids')
const pull = require('pull-stream')
const iterToPull = require('async-iterator-to-pull-stream')
const mapAsync = require('async/map')
const setImmediate = require('async/setImmediate')
const flattenDeep = require('just-flatten-it')
const errCode = require('err-code')
const multicodec = require('multicodec')

Expand Down Expand Up @@ -180,38 +178,6 @@ module.exports = function dag (self) {
iterToPull(self._ipld.tree(cid, path, options)),
pull.collect(callback)
)
}),

// TODO - use IPLD selectors once they are implemented
_getRecursive: promisify((multihash, options, callback) => {
  // Fetches the node identified by `multihash` and then, recursively, the
  // target of every entry in that node's `Links`. The callback receives one
  // flat array: the root value first, followed by the flattened results
  // gathered for each link.

  // `options` is optional: (multihash, callback) is an accepted call form
  if (typeof options === 'function') {
    callback = options
    options = {}
  } else if (!options) {
    options = {}
  }

  let rootCid

  try {
    rootCid = new CID(multihash)
  } catch (invalidCid) {
    // deliver the failure through setImmediate instead of calling back inline
    return setImmediate(() => callback(errCode(invalidCid, 'ERR_INVALID_CID')))
  }

  self.dag.get(rootCid, '', options, (getErr, result) => {
    if (getErr) {
      return callback(getErr)
    }

    const node = result.value

    // resolve the subtree hanging off a single link
    const fetchLinked = (link, done) => {
      self.dag._getRecursive(link.Hash, options, done)
    }

    // merge this node with everything found beneath it
    const onLinkedFetched = (mapErr, linked) => {
      if (mapErr) {
        return callback(mapErr)
      }

      callback(null, flattenDeep([node, linked]))
    }

    mapAsync(node.Links, fetchLinked, onLinkedFetched)
  })
})
}
}
81 changes: 53 additions & 28 deletions src/core/components/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
'use strict'

const promisify = require('promisify-es6')
const { DAGNode, DAGLink, util } = require('ipld-dag-pb')
const { DAGNode, DAGLink } = require('ipld-dag-pb')
const CID = require('cids')
const map = require('async/map')
const mapSeries = require('async/mapSeries')
Expand All @@ -12,6 +12,7 @@ const eachLimit = require('async/eachLimit')
const waterfall = require('async/waterfall')
const detectLimit = require('async/detectLimit')
const setImmediate = require('async/setImmediate')
const queue = require('async/queue')
const { Key } = require('interface-datastore')
const errCode = require('err-code')
const multibase = require('multibase')
Expand Down Expand Up @@ -52,30 +53,50 @@ module.exports = (self) => {
const recursiveKeys = () =>
Array.from(recursivePins).map(key => new CID(key).buffer)

function getIndirectKeys (callback) {
const indirectKeys = new Set()
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
dag._getRecursive(multihash, (err, nodes) => {
function walkDag ({ cid, preload = false, onCid = () => {} }, cb) {
const q = queue(function ({ cid }, done) {
dag.get(cid, { preload }, function (err, result) {
if (err) {
return cb(err)
return done(err)
}

map(nodes, (node, cb) => util.cid(util.serialize(node), {
cidVersion: 0
}).then(cid => cb(null, cid), cb), (err, cids) => {
if (err) {
return cb(err)
}
onCid(cid)

cids
.map(cid => cid.toString())
// recursive pins pre-empt indirect pins
.filter(key => !recursivePins.has(key))
.forEach(key => indirectKeys.add(key))
if (result.value.Links) {
q.push(result.value.Links.map(link => ({
cid: link.Hash
})))
}

cb()
})
done()
})
}, concurrencyLimit)
q.drain = () => {
cb()
}
q.error = (err) => {
q.kill()
cb(err)
}
q.push({ cid })
}

function getIndirectKeys ({ preload }, callback) {
const indirectKeys = new Set()
eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => {
// load every hash in the graph
walkDag({
cid: new CID(multihash),
preload: preload || false,
onCid: (cid) => {
cid = cid.toString()

// recursive pins pre-empt indirect pins
if (!recursivePins.has(cid)) {
indirectKeys.add(cid)
}
}
}, cb)
}, (err) => {
if (err) { return callback(err) }
callback(null, Array.from(indirectKeys))
Expand Down Expand Up @@ -184,7 +205,9 @@ module.exports = (self) => {

// verify that each hash can be pinned
map(mhs, (multihash, cb) => {
const key = toB58String(multihash)
const cid = new CID(multihash)
const key = cid.toBaseEncodedString()

if (recursive) {
if (recursivePins.has(key)) {
// it's already pinned recursively
Expand All @@ -193,11 +216,11 @@ module.exports = (self) => {

// entire graph of nested links should be pinned,
// so make sure we have all the objects
dag._getRecursive(key, { preload: options.preload }, (err) => {
if (err) { return cb(err) }
// found all objects, we can add the pin
return cb(null, key)
})
walkDag({
dag,
cid,
preload: options.preload
}, (err) => cb(err, key))
} else {
if (recursivePins.has(key)) {
// recursive supersedes direct, can't have both
Expand All @@ -209,8 +232,10 @@ module.exports = (self) => {
}

// make sure we have the object
dag.get(new CID(multihash), { preload: options.preload }, (err) => {
if (err) { return cb(err) }
dag.get(cid, { preload: options.preload }, (err) => {
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
if (err) {
return cb(err)
}
// found the object, we can add the pin
return cb(null, key)
})
Expand Down Expand Up @@ -374,7 +399,7 @@ module.exports = (self) => {
)
}
if (type === types.indirect || type === types.all) {
getIndirectKeys((err, indirects) => {
getIndirectKeys(options, (err, indirects) => {
if (err) { return callback(err) }
pins = pins
// if something is pinned both directly and indirectly,
Expand Down