Skip to content
This repository has been archived by the owner on Oct 1, 2021. It is now read-only.

Commit

Permalink
feat: report migration progress (#33)
Browse files Browse the repository at this point in the history
Changes the `onProgress` option to receive feedback on how far each
migration has progressed with a percent complete indicator and a
message to show the user.

If an `onProgress` option is passed, migrations will get a bit slower
as we need to calculate the total volume of work before starting a
migration in order to work out the percent complete.

Also removes datastores from runtime dependencies as these should only
ever be passed in as config.

Fixes #32

BREAKING CHANGES:

- The signature of the `onProgress` callback has changed
  • Loading branch information
achingbrain authored Aug 15, 2020
1 parent 1ba2a18 commit 051c0a4
Show file tree
Hide file tree
Showing 22 changed files with 269 additions and 355 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ This package is inspired by the [go-ipfs repo migration tool](https://github.com
- [Usage](#usage)
- [API](#api)
- [`.migrate(path, repoOptions, toVersion, {ignoreLock, onProgress, isDryRun}) -> Promise<void>`](#migratepath-repooptions-toversion-ignorelock-onprogress-isdryrun---promisevoid)
- [`onProgress(migration, counter, totalMigrations)`](#onprogressmigration-counter-totalmigrations)
- [`onProgress(versionFrom, versionTo, percent, message)`](#onprogressversionfrom-versionto-percent-message)
- [`.revert(path, repoOptions, toVersion, {ignoreLock, onProgress, isDryRun}) -> Promise<void>`](#revertpath-repooptions-toversion-ignorelock-onprogress-isdryrun---promisevoid)
- [`getLatestMigrationVersion() -> int`](#getlatestmigrationversion---int)
- [Creating a new migration](#creating-a-new-migration)
Expand Down Expand Up @@ -121,7 +121,7 @@ Executes a forward migration to a specific version, or to the latest version if
* `options.onProgress` (function, optional) - callback that is called during the execution of each migration to report progress.
* `options.isDryRun` (bool, optional) - flag that indicates if it is a dry run that should give the same output as running a migration but without making any actual changes.

#### `onProgress(migration, counter, totalMigrations)`
#### `onProgress(versionFrom, versionTo, percent, message)`

Signature of the progress callback.

Expand Down
58 changes: 23 additions & 35 deletions migrations/migration-8/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,11 @@

const CID = require('cids')
const Key = require('interface-datastore').Key
const core = require('datastore-core')
const ShardingStore = core.ShardingDatastore
const mb = require('multibase')
const utils = require('../../src/utils')
const log = require('debug')('ipfs-repo-migrations:migration-8')
const uint8ArrayToString = require('uint8arrays/to-string')

// This function in js-ipfs-repo defaults to not using sharding
// but the default value of the options.sharding is true hence this
// function defaults to use sharding.
async function maybeWithSharding (filestore, options) {
if (options.sharding === false) {
return filestore
}

const shard = new core.shard.NextToLast(2)

return ShardingStore.createOrOpen(filestore, shard)
}
const { createStore } = require('../../src/utils')
const length = require('it-length')

function keyToMultihash (key) {
const buf = mb.decode(`b${key.toString().slice(1)}`)
Expand All @@ -46,44 +32,46 @@ function keyToCid (key) {
return new Key(`/${uint8ArrayToString(multihash)}`.toUpperCase(), false)
}

async function process (repoPath, options, keyFunction){
const { StorageBackend, storageOptions } = utils.getDatastoreAndOptions(options, 'blocks')
async function process (repoPath, repoOptions, onProgress, keyFunction) {
const blockstore = await createStore(repoPath, 'blocks', repoOptions)
await blockstore.open()

let blockCount

const baseStore = new StorageBackend(`${repoPath}/blocks`, storageOptions)
await baseStore.open()
const store = await maybeWithSharding(baseStore, storageOptions)
await store.open()
if (onProgress) {
blockCount = await length(blockstore.query({ keysOnly: true }))
}

try {
let counter = 0

for await (const block of store.query({})) {
for await (const block of blockstore.query({})) {
const newKey = keyFunction(block.key)
counter += 1

// If the Key is base32 CIDv0 then there's nothing to do
if(newKey.toString() !== block.key.toString()) {
counter += 1
log(`Migrating Block from ${block.key} to ${newKey}`)
await blockstore.delete(block.key)
await blockstore.put(newKey, block.value)

log(`Migrating Block from ${block.key.toString()} to ${newKey.toString()}`)
await store.delete(block.key)
await store.put(newKey, block.value)
if (onProgress) {
onProgress((counter / blockCount) * 100, `Migrated Block from ${block.key} to ${newKey}`)
}
}
}

log(`Changed ${ counter } blocks`)
} finally {
await store.close()
await baseStore.close()
await blockstore.close()
}
}

module.exports = {
version: 8,
description: 'Transforms key names into base32 encoding and converts Block store to use bare multihashes encoded as base32',
migrate: (repoPath, options = {}) => {
return process(repoPath, options, keyToMultihash)
migrate: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, keyToMultihash)
},
revert: (repoPath, options = {}) => {
return process(repoPath, options, keyToCid)
revert: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, keyToCid)
}
}
54 changes: 41 additions & 13 deletions migrations/migration-9/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@ const cbor = require('cbor')
const multicodec = require('multicodec')
const multibase = require('multibase')
const pinset = require('./pin-set')
const { createStore, cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')
const { createStore } = require('../../src/utils')
const { cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')
const length = require('it-length')

async function pinsToDatastore (blockstore, datastore, pinstore) {
async function pinsToDatastore (blockstore, datastore, pinstore, onProgress) {
const mh = await datastore.get(PIN_DS_KEY)
const cid = new CID(mh)

const pinRootBuf = await blockstore.get(cidToKey(cid))
const pinRoot = dagpb.util.deserialize(pinRootBuf)

const pinCount = (await length(pinset.loadSet(blockstore, pinRoot, PinTypes.recursive))) + (await length(pinset.loadSet(blockstore, pinRoot, PinTypes.direct)))
let counter = 0

for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.recursive)) {
counter++
const pin = {
depth: Infinity
}
Expand All @@ -29,9 +34,12 @@ async function pinsToDatastore (blockstore, datastore, pinstore) {
}

await pinstore.put(cidToKey(cid), cbor.encode(pin))

onProgress((counter / pinCount) * 100, `Migrated recursive pin ${cid}`)
}

for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.direct)) {
counter++
const pin = {
depth: 0
}
Expand All @@ -45,27 +53,47 @@ async function pinsToDatastore (blockstore, datastore, pinstore) {
}

await pinstore.put(cidToKey(cid), cbor.encode(pin))

onProgress((counter / pinCount) * 100, `Migrated direct pin ${cid}`)
}

await blockstore.delete(cidToKey(cid))
await datastore.delete(PIN_DS_KEY)
}

async function pinsToDAG (blockstore, datastore, pinstore) {
async function pinsToDAG (blockstore, datastore, pinstore, onProgress) {
let recursivePins = []
let directPins = []

let pinCount

if (onProgress) {
pinCount = await length(pinstore.query({ keysOnly: true }))
}

let counter = 0

for await (const { key, value } of pinstore.query({})) {
counter++
const pin = cbor.decode(value)
const cid = new CID(pin.version || 0, pin.codec && multicodec.getName(pin.codec) || 'dag-pb', multibase.decode('b' + key.toString().split('/').pop()))

if (pin.depth === 0) {
if (onProgress) {
onProgress((counter / pinCount) * 100, `Reverted direct pin ${cid}`)
}

directPins.push(cid)
} else {
if (onProgress) {
onProgress((counter / pinCount) * 100, `Reverted recursive pin ${cid}`)
}

recursivePins.push(cid)
}
}

onProgress(100, 'Updating pin root')
const pinRoot = new dagpb.DAGNode(new Uint8Array(), [
await pinset.storeSet(blockstore, PinTypes.recursive, recursivePins),
await pinset.storeSet(blockstore, PinTypes.direct, directPins)
Expand All @@ -79,17 +107,17 @@ async function pinsToDAG (blockstore, datastore, pinstore) {
await datastore.put(PIN_DS_KEY, cid.multihash)
}

async function process (repoPath, options, fn) {
const blockstore = await createStore(repoPath, 'blocks', options)
const datastore = await createStore(repoPath, 'datastore', options)
const pinstore = await createStore(repoPath, 'pins', options)
async function process (repoPath, repoOptions, onProgress, fn) {
const blockstore = await createStore(repoPath, 'blocks', repoOptions)
const datastore = await createStore(repoPath, 'datastore', repoOptions)
const pinstore = await createStore(repoPath, 'pins', repoOptions)

await blockstore.open()
await datastore.open()
await pinstore.open()

try {
await fn(blockstore, datastore, pinstore)
await fn(blockstore, datastore, pinstore, onProgress)
} finally {
await pinstore.close()
await datastore.close()
Expand All @@ -100,10 +128,10 @@ async function process (repoPath, options, fn) {
module.exports = {
version: 9,
description: 'Migrates pins to datastore',
migrate: (repoPath, options = {}) => {
return process(repoPath, options, pinsToDatastore)
migrate: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, pinsToDatastore)
},
revert: (repoPath, options = {}) => {
return process(repoPath, options, pinsToDAG)
revert: (repoPath, repoOptions, onProgress) => {
return process(repoPath, repoOptions, onProgress, pinsToDAG)
}
}
26 changes: 0 additions & 26 deletions migrations/migration-9/utils.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
'use strict'

const core = require('datastore-core')
const ShardingStore = core.ShardingDatastore
const utils = require('../../src/utils')
const multibase = require('multibase')
const { Key } = require('interface-datastore')
const multihashes = require('multihashing-async').multihash
Expand All @@ -21,34 +18,11 @@ function cidToKey (cid) {
return new Key(`/${multibase.encoding('base32upper').encode(cid.multihash)}`)
}

// This function in js-ipfs-repo defaults to not using sharding
// but the default value of the options.sharding is true hence this
// function defaults to use sharding.
async function maybeWithSharding (filestore, options) {
if (options.sharding === false) {
return filestore
}

const shard = new core.shard.NextToLast(2)

return ShardingStore.createOrOpen(filestore, shard)
}

const createStore = async (location, name, options) => {
const { StorageBackend, storageOptions } = utils.getDatastoreAndOptions(options, name)

let store = new StorageBackend(`${location}/${name}`, storageOptions)
store = maybeWithSharding(store, storageOptions)

return store
}

module.exports = {
PIN_DS_KEY,
DEFAULT_FANOUT,
MAX_ITEMS,
EMPTY_KEY,
PinTypes,
createStore,
cidToKey
}
10 changes: 4 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,11 @@
"cbor": "^5.0.2",
"cids": "^1.0.0",
"datastore-core": "^2.0.0",
"datastore-fs": "^2.0.0",
"datastore-level": "^2.0.0",
"debug": "^4.1.0",
"fnv1a": "^1.0.1",
"interface-datastore": "^2.0.0",
"ipld-dag-pb": "^0.20.0",
"it-length": "0.0.2",
"multibase": "^3.0.0",
"multicodec": "^2.0.0",
"multihashing-async": "^2.0.0",
Expand All @@ -59,11 +58,10 @@
"varint": "^5.0.0"
},
"devDependencies": {
"aegir": "^25.0.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"aegir": "^26.0.0",
"datastore-car": "^1.2.0",
"dirty-chai": "^2.0.1",
"datastore-fs": "^2.0.1",
"datastore-level": "^2.0.0",
"it-all": "^1.0.2",
"just-safe-set": "^2.1.0",
"ncp": "^2.0.0",
Expand Down
Loading

0 comments on commit 051c0a4

Please sign in to comment.