feat: add migration 9 to migrate pins to the datastore and back (#15)
Stores pins in the datastore for much faster access.

Borrows the pin-set code from js-ipfs to perform the reverse migration.
achingbrain authored Jul 21, 2020
1 parent ca69f8b commit 2b14578
Showing 12 changed files with 650 additions and 49 deletions.
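For context, the migration added by this commit is applied through the migrator API documented in the README changes below (`.migrate(path, toVersion, {ignoreLock, repoOptions, onProgress, isDryRun})`). A minimal usage sketch follows; the package name and the contents of `repoOptions` are assumptions, not part of this commit.

```js
// Usage sketch only (not part of this commit), based on the .migrate() API
// documented in this repo's README. Package name and repoOptions are assumptions.
const migrations = require('ipfs-repo-migrations')

async function upgradeTo9 (repoPath, repoOptions) {
  await migrations.migrate(repoPath, 9, {
    repoOptions,
    onProgress: (migration, counter, totalMigrations) => {
      console.log(`applied migration ${counter} of ${totalMigrations}: ${migration.description}`)
    }
  })
}
```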
5 changes: 4 additions & 1 deletion .aegir.js
@@ -4,7 +4,10 @@ module.exports = {
webpack: {
node: {
// this is needed until level stops using node buffers in browser code
Buffer: true
Buffer: true,

// needed by cbor, binary-parse-stream and nofilter
stream: true
}
}
}
22 changes: 15 additions & 7 deletions README.md
@@ -1,4 +1,4 @@
# Migration tool for JS IPFS Repo
# Migration tool for JS IPFS Repo <!-- omit in toc -->

[![Travis CI](https://flat.badgen.net/travis/ipfs/js-ipfs-repo-migrations)](https://travis-ci.com/ipfs/js-ipfs-repo-migrations)
[![codecov](https://codecov.io/gh/ipfs/js-ipfs-repo-migrations/branch/master/graph/badge.svg)](https://codecov.io/gh/ipfs/js-ipfs-repo-migrations)
@@ -15,30 +15,36 @@
This package is inspired by the [go-ipfs repo migration tool](https://github.com/ipfs/fs-repo-migrations/)

## Lead Maintainer
## Lead Maintainer <!-- omit in toc -->

[Adam Uhlíř](https://github.com/auhau/)
[Alex Potsides](http://github.com/achingbrain)

## Table of Contents
## Table of Contents <!-- omit in toc -->

- [Background](#background)
- [Install](#install)
- [npm](#npm)
- [Use in Node.js](#use-in-nodejs)
- [Use in a browser with browserify, webpack or any other bundler](#use-in-a-browser-with-browserify-webpack-or-any-other-bundler)
- [Use in a browser Using a script tag](#use-in-a-browser-using-a-script-tag)
- [Usage](#usage)
- [API](#api)
- [`.migrate(path, toVersion, {ignoreLock, repoOptions, onProgress, isDryRun}) -> Promise<void>`](#migratepath-toversion-ignorelock-repooptions-onprogress-isdryrun---promisevoid)
- [`onProgress(migration, counter, totalMigrations)`](#onprogressmigration-counter-totalmigrations)
- [`.revert(path, toVersion, {ignoreLock, repoOptions, onProgress, isDryRun}) -> Promise<void>`](#revertpath-toversion-ignorelock-repooptions-onprogress-isdryrun---promisevoid)
- [`getLatestMigrationVersion() -> int`](#getlatestmigrationversion---int)
- [CLI](#cli)
- [Creating a new migration](#creating-a-new-migration)
- [Architecture of a migration](#architecture-of-a-migration)
- [`.migrate(repoPath, repoOptions)`](#migraterepopath-repooptions)
- [`.revert(repoPath, repoOptions)`](#revertrepopath-repooptions)
- [Browser vs. NodeJS environments](#browser-vs-nodejs-environments)
- [Guidelines](#guidelines)
- [Integration with js-ipfs](#integration-with-js-ipfs)
- [Tests](#tests)
- [Empty migrations](#empty-migrations)
- [Migrations matrix](#migrations-matrix)
- [Developer](#developer)
- [Module versioning notes](#module-versioning-notes)
- [Module versioning notes](#module-versioning-notes)
- [Contribute](#contribute)
- [License](#license)

@@ -266,7 +272,9 @@ This will create an empty migration with the next version.

| IPFS repo version | JS IPFS version |
| -----------------: |:----------------:|
| 7 | v0.0.0 - latest |
| 7 | v0.0.0 |
| 8 | v0.48.0 |
| 9 | v0.49.0 |

## Developer

107 changes: 107 additions & 0 deletions migrations/migration-9/index.js
@@ -0,0 +1,107 @@
'use strict'

const CID = require('cids')
const { Buffer } = require('buffer')
const dagpb = require('ipld-dag-pb')
const cbor = require('cbor')
const multicodec = require('multicodec')
const multibase = require('multibase')
const pinset = require('./pin-set')
const { createStore, cidToKey, PIN_DS_KEY, PinTypes } = require('./utils')

async function pinsToDatastore (blockstore, datastore, pinstore) {
const mh = await datastore.get(PIN_DS_KEY)
const cid = new CID(mh)

const pinRootBuf = await blockstore.get(cidToKey(cid))
const pinRoot = dagpb.util.deserialize(pinRootBuf)

for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.recursive)) {
const pin = {}

if (cid.version !== 0) {
pin.version = cid.version
}

if (cid.codec !== 'dag-pb') {
pin.codec = multicodec.getNumber(cid.codec)
}

await pinstore.put(cidToKey(cid), cbor.encode(pin))
}

for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.direct)) {
const pin = {
depth: 0
}

if (cid.version !== 0) {
pin.version = cid.version
}

if (cid.codec !== 'dag-pb') {
pin.codec = multicodec.getNumber(cid.codec)
}

await pinstore.put(cidToKey(cid), cbor.encode(pin))
}

// remove the old pin root block and the datastore pointer to it
await blockstore.delete(cidToKey(cid))
await datastore.delete(PIN_DS_KEY)
}

async function pinsToDAG (blockstore, datastore, pinstore) {
let recursivePins = []
let directPins = []

for await (const { key, value } of pinstore.query({})) {
const pin = cbor.decode(value)
// the pinstore key ends in the base32-encoded multihash of the pinned CID -
// rebuild the CID, restoring any non-default version/codec recorded in the pin
const cid = new CID(pin.version || 0, pin.codec && multicodec.getName(pin.codec) || 'dag-pb', multibase.decode('b' + key.toString().split('/').pop()))

if (pin.depth === 0) {
directPins.push(cid)
} else {
recursivePins.push(cid)
}
}

const pinRoot = new dagpb.DAGNode(Buffer.alloc(0), [
await pinset.storeSet(blockstore, PinTypes.recursive, recursivePins),
await pinset.storeSet(blockstore, PinTypes.direct, directPins)
])
const buf = pinRoot.serialize()
const cid = await dagpb.util.cid(buf, {
cidVersion: 0,
hashAlg: multicodec.SHA2_256,
})
await blockstore.put(cidToKey(cid), buf)
await datastore.put(PIN_DS_KEY, cid.multihash)
}

async function process (repoPath, options, fn) {
const blockstore = await createStore(repoPath, 'blocks', options)
const datastore = await createStore(repoPath, 'datastore', options)
const pinstore = await createStore(repoPath, 'pins', options)

await blockstore.open()
await datastore.open()
await pinstore.open()

try {
await fn(blockstore, datastore, pinstore)
} finally {
await pinstore.close()
await datastore.close()
await blockstore.close()
}
}

module.exports = {
version: 9,
description: 'Migrates pins to datastore',
migrate: (repoPath, options = {}) => {
return process(repoPath, options, pinsToDatastore)
},
revert: (repoPath, options = {}) => {
return process(repoPath, options, pinsToDAG)
}
}
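After this migration runs, each pin lives in the `pins` store as a cbor-encoded record keyed by the pinned CID's multihash: recursive pins store an empty object (plus `version`/`codec` when they differ from the CIDv0/dag-pb defaults), and direct pins add `depth: 0`. A small read-back sketch, reusing the query shape from `pinsToDAG` above; the `pinstore` argument is assumed to come from `createStore(repoPath, 'pins', options)` as in `process()`.

```js
// Illustrative sketch only - lists the pin records written by pinsToDatastore above
const cbor = require('cbor')

async function listPins (pinstore) {
  for await (const { key, value } of pinstore.query({})) {
    const pin = cbor.decode(value)

    // direct pins were written with depth: 0; recursive pins have no depth field
    const type = pin.depth === 0 ? 'direct' : 'recursive'
    console.log(`${key.toString()}: ${type}`)
  }
}
```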
205 changes: 205 additions & 0 deletions migrations/migration-9/pin-set.js
@@ -0,0 +1,205 @@
'use strict'

const CID = require('cids')
const protobuf = require('protons')
const fnv1a = require('fnv1a')
const varint = require('varint')
const dagpb = require('ipld-dag-pb')
const { DAGNode, DAGLink } = dagpb
const multicodec = require('multicodec')
const pbSchema = require('./pin.proto')
const { Buffer } = require('buffer')
const { cidToKey, DEFAULT_FANOUT, MAX_ITEMS, EMPTY_KEY } = require('./utils')

const pb = protobuf(pbSchema)

function toB58String (hash) {
return new CID(hash).toBaseEncodedString()
}

function readHeader (rootNode) {
// rootNode.data should be a buffer of the format:
// < varint(headerLength) | header | itemData... >
const rootData = rootNode.Data
const hdrLength = varint.decode(rootData)
const vBytes = varint.decode.bytes

if (vBytes <= 0) {
throw new Error('Invalid Set header length')
}

if (vBytes + hdrLength > rootData.length) {
throw new Error('Impossibly large set header length')
}

const hdrSlice = rootData.slice(vBytes, hdrLength + vBytes)
const header = pb.Set.decode(hdrSlice)

if (header.version !== 1) {
throw new Error(`Unsupported Set version: ${header.version}`)
}

if (header.fanout > rootNode.Links.length) {
throw new Error('Impossibly large fanout')
}

return {
header: header,
data: rootData.slice(hdrLength + vBytes)
}
}

function hash (seed, key) {
const buf = Buffer.alloc(4)
buf.writeUInt32LE(seed, 0)
const data = Buffer.concat([
buf, Buffer.from(toB58String(key))
])
return fnv1a(data.toString('binary'))
}

async function * walkItems (blockstore, node) {
const pbh = readHeader(node)
let idx = 0

for (const link of node.Links) {
if (idx < pbh.header.fanout) {
// the first pbh.header.fanout links are fanout bins
// if a fanout bin is not 'empty', dig into and walk its DAGLinks
const linkHash = link.Hash

if (!EMPTY_KEY.equals(linkHash.buffer)) {
// walk the links of this fanout bin
const buf = await blockstore.get(cidToKey(linkHash))
const node = dagpb.util.deserialize(buf)

yield * walkItems(blockstore, node)
}
} else {
// otherwise, the link is a pin
yield link.Hash
}

idx++
}
}

async function * loadSet (blockstore, rootNode, name) {
const link = rootNode.Links.find(l => l.Name === name)

if (!link) {
throw new Error('No link found with name ' + name)
}

const buf = await blockstore.get(cidToKey(link.Hash))
const node = dagpb.util.deserialize(buf)

yield * walkItems(blockstore, node)
}

function storeItems (blockstore, items) {
return storePins(items, 0)

async function storePins (pins, depth) {
const pbHeader = pb.Set.encode({
version: 1,
fanout: DEFAULT_FANOUT,
seed: depth
})
const headerBuf = Buffer.concat([
Buffer.from(varint.encode(pbHeader.length)), pbHeader
])
const fanoutLinks = []

for (let i = 0; i < DEFAULT_FANOUT; i++) {
fanoutLinks.push(new DAGLink('', 1, EMPTY_KEY))
}

if (pins.length <= MAX_ITEMS) {
const nodes = pins
.map(item => {
return ({
link: new DAGLink('', 1, item.key),
data: item.data || Buffer.alloc(0)
})
})
// sorting makes any ordering of `pins` produce the same DAGNode
.sort((a, b) => Buffer.compare(a.link.Hash.buffer, b.link.Hash.buffer))

const rootLinks = fanoutLinks.concat(nodes.map(item => item.link))
const rootData = Buffer.concat(
[headerBuf].concat(nodes.map(item => item.data))
)

return new DAGNode(rootData, rootLinks)
} else {
// If the array of pins is > MAX_ITEMS, we:
// - distribute the pins among `DEFAULT_FANOUT` bins
// - create a DAGNode for each bin
// - add each pin as a DAGLink to that bin
// - create a root DAGNode
// - add each bin as a DAGLink
// - return that root DAGNode
// (using go-ipfs' "wasteful but simple" approach for consistency)
// https://github.com/ipfs/go-ipfs/blob/master/pin/set.go#L57

const bins = pins.reduce((bins, pin) => {
const n = hash(depth, pin.key) % DEFAULT_FANOUT
bins[n] = n in bins ? bins[n].concat([pin]) : [pin]
return bins
}, [])

let idx = 0
for (const bin of bins) {
const child = await storePins(bin, depth + 1)

await storeChild(child, idx)

idx++
}

return new DAGNode(headerBuf, fanoutLinks)
}

async function storeChild (child, binIdx) {
const buf = dagpb.util.serialize(child)
const cid = await dagpb.util.cid(buf, {
cidVersion: 0,
hashAlg: multicodec.SHA2_256,
})
await blockstore.put(cidToKey(cid), buf)

fanoutLinks[binIdx] = new DAGLink('', child.size, cid)
}
}
}

async function storeSet (blockstore, type, cids) {
const rootNode = await storeItems(blockstore, cids.map(cid => {
return {
key: cid,
data: null
}
}))
const buf = rootNode.serialize()
const cid = await dagpb.util.cid(buf, {
cidVersion: 0,
hashAlg: multicodec.SHA2_256
})

await blockstore.put(cidToKey(cid), buf)

return new DAGLink(type, rootNode.size, cid)
}

module.exports = {
loadSet,
storeSet
}
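These two exports are consumed by `index.js` above: `storeSet` persists a list of CIDs as a pin-set and returns a `DAGLink` to hang off a new pin root, while `loadSet` walks a named set back out of the blockstore. A hedged round-trip sketch; the `blockstore` and `pinRoot` values are assumed to come from `createStore` and `dagpb.util.deserialize` as in the migration.

```js
// Usage sketch only - not part of this commit
const pinset = require('./pin-set')
const { PinTypes } = require('./utils')

async function roundTrip (blockstore, pinRoot, cids) {
  // persist `cids` as the recursive pin-set; the DAGLink returned would be
  // attached to a new pin root DAGNode, as pinsToDAG does in index.js
  const link = await pinset.storeSet(blockstore, PinTypes.recursive, cids)

  // walk an existing pin root's recursive set, yielding one CID per pin
  for await (const cid of pinset.loadSet(blockstore, pinRoot, PinTypes.recursive)) {
    console.log(cid.toString())
  }

  return link
}
```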