From 730f67fd9b9ed337b915f98503ab2cb6bcbaaf8d Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 8 May 2019 17:31:43 +0800 Subject: [PATCH] feat: GC --- src/cli/commands/repo/gc.js | 31 ++++++-- src/core/components/gc.js | 150 ++++++++++++++++++++++++++++++++++++ src/core/components/repo.js | 7 +- 3 files changed, 177 insertions(+), 11 deletions(-) create mode 100644 src/core/components/gc.js diff --git a/src/cli/commands/repo/gc.js b/src/cli/commands/repo/gc.js index ec3b547e93..0f7bfff29a 100644 --- a/src/cli/commands/repo/gc.js +++ b/src/cli/commands/repo/gc.js @@ -1,16 +1,37 @@ 'use strict' +const { print } = require('../../utils') + module.exports = { command: 'gc', describe: 'Perform a garbage collection sweep on the repo.', - builder: {}, + builder: { + quiet: { + alias: 'q', + desc: 'Write minimal output', + type: 'boolean', + default: false + }, + 'stream-errors': { + desc: 'Output individual errors thrown when deleting blocks.', + type: 'boolean', + default: false + } + }, - handler (argv) { - argv.resolve((async () => { - const ipfs = await argv.getIpfs() - await ipfs.repo.gc() + handler ({ getIpfs, quiet, streamErrors, resolve }) { + resolve((async () => { + const ipfs = await getIpfs() + const res = await ipfs.repo.gc() + for (const r of res) { + if (res.err) { + streamErrors && print(res.err, true, true) + } else { + print((quiet ? '' : 'Removed ') + r.cid) + } + } })()) } } diff --git a/src/core/components/gc.js b/src/core/components/gc.js new file mode 100644 index 0000000000..5f93530c04 --- /dev/null +++ b/src/core/components/gc.js @@ -0,0 +1,150 @@ +'use strict' + +const promisify = require('promisify-es6') +const CID = require('cids') +const base32 = require('base32.js') +const parallel = require('async/parallel') +const map = require('async/map') + +module.exports = function gc (self) { + return promisify(async (opts, callback) => { + if (typeof opts === 'function') { + callback = opts + opts = {} + } + + const start = Date.now() + self.log(`GC: Creating set of marked blocks`) + + parallel([ + // Get all blocks from the blockstore + (cb) => self._repo.blocks.query({ keysOnly: true }, cb), + // Mark all blocks that are being used + (cb) => createColoredSet(self, cb) + ], (err, [blocks, coloredSet]) => { + if (err) { + self.log(`GC: Error - ${err.message}`) + return callback(err) + } + + // Delete blocks that are not being used + deleteUnmarkedBlocks(self, coloredSet, blocks, start, (err, res) => { + err && self.log(`GC: Error - ${err.message}`) + callback(err, res) + }) + }) + }) +} + +// TODO: make global constants +const { Key } = require('interface-datastore') +const pinDataStoreKey = new Key('/local/pins') +const MFS_ROOT_KEY = new Key('/local/filesroot') + +function createColoredSet (ipfs, callback) { + parallel([ + // "Empty block" used by the pinner + (cb) => cb(null, ['QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n']), + + // All pins, direct and indirect + (cb) => ipfs.pin.ls((err, pins) => { + if (err) { + return cb(new Error(`Could not list pinned blocks: ${err.message}`)) + } + ipfs.log(`GC: Found ${pins.length} pinned blocks`) + cb(null, pins.map(p => p.hash)) + }), + + // Blocks used internally by the pinner + (cb) => ipfs._repo.datastore.get(pinDataStoreKey, (err, mh) => { + if (err) { + if (err.code === 'ERR_NOT_FOUND') { + ipfs.log(`GC: No pinned blocks`) + return cb(null, []) + } + return cb(new Error(`Could not get pin sets root from datastore: ${err.message}`)) + } + + const cid = new CID(mh) + ipfs.dag.get(cid, '', { preload: false }, (err, obj) => { + // TODO: Handle not found? + if (err) { + return cb(new Error(`Could not get pin sets from store: ${err.message}`)) + } + + // The pinner stores an object that has two links to pin sets: + // 1. The directly pinned CIDs + // 2. The recursively pinned CIDs + cb(null, [cid.toString(), ...obj.value.links.map(l => l.cid.toString())]) + }) + }), + + // The MFS root and all its descendants + (cb) => ipfs._repo.datastore.get(MFS_ROOT_KEY, (err, mh) => { + if (err) { + if (err.code === 'ERR_NOT_FOUND') { + ipfs.log(`GC: No blocks in MFS`) + return cb(null, []) + } + return cb(new Error(`Could not get MFS root from datastore: ${err.message}`)) + } + + getDescendants(ipfs, new CID(mh), cb) + }) + ], (err, res) => callback(err, !err && new Set(res.flat()))) +} + +function getDescendants (ipfs, cid, callback) { + // TODO: Make sure we don't go out to the network + ipfs.refs(cid, { recursive: true }, (err, refs) => { + if (err) { + return callback(new Error(`Could not get MFS root descendants from store: ${err.message}`)) + } + ipfs.log(`GC: Found ${refs.length} MFS blocks`) + callback(null, [cid.toString(), ...refs.map(r => r.ref)]) + }) +} + +function deleteUnmarkedBlocks (ipfs, coloredSet, blocks, start, callback) { + // Iterate through all blocks and find those that are not in the marked set + // The blocks variable has the form { { key: Key() }, { key: Key() }, ... } + const unreferenced = [] + const res = [] + for (const { key: k } of blocks) { + try { + const cid = dsKeyToCid(k) + if (!coloredSet.has(cid.toString())) { + unreferenced.push(cid) + } + } catch (err) { + res.push({ err: new Error(`Could not convert block with key '${k}' to CID: ${err.message}`) }) + } + } + + const msg = `GC: Marked set has ${coloredSet.size} blocks. Blockstore has ${blocks.length} blocks. ` + + `Deleting ${unreferenced.length} blocks.` + ipfs.log(msg) + + // TODO: limit concurrency + map(unreferenced, (cid, cb) => { + // Delete blocks from blockstore + ipfs._repo.blocks.delete(cid, (err) => { + const res = { + cid: cid.toString(), + err: err && new Error(`Could not delete block with CID ${cid}: ${err.message}`) + } + cb(null, res) + }) + }, (_, delRes) => { + ipfs.log(`GC: Complete (${Date.now() - start}ms)`) + + callback(null, res.concat(delRes)) + }) +} + +function dsKeyToCid (key) { + // Block key is of the form / + const decoder = new base32.Decoder() + const buff = decoder.write(key.toString().slice(1)).finalize() + return new CID(buff) +} diff --git a/src/core/components/repo.js b/src/core/components/repo.js index 23116d8cf5..d88ad887a3 100644 --- a/src/core/components/repo.js +++ b/src/core/components/repo.js @@ -39,12 +39,7 @@ module.exports = function repo (self) { }), gc: promisify((options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback(new Error('Not implemented')) + require('./gc')(self)(options, callback) }), stat: promisify((options, callback) => {