Repo storage migration #443
Merged 7 commits on Dec 29, 2022
10 changes: 9 additions & 1 deletion packages/common/src/util.ts
@@ -67,7 +67,7 @@ export const asyncFilter = async <T>(
export const isErrnoException = (
err: unknown,
): err is NodeJS.ErrnoException => {
- return !!err && 'code' in err
+ return !!err && err['code']
}

export const errHasMsg = (err: unknown, msg: string): boolean => {
@@ -84,3 +84,11 @@ export const chunkArray = <T>(arr: T[], chunkSize: number): T[][] => {
return acc
}, [] as T[][])
}

export const range = (num: number): number[] => {
const nums: number[] = []
for (let i = 0; i < num; i++) {
nums.push(i)
}
return nums
}
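The new range helper pairs with the existing chunkArray (whose closing lines are visible in the hunk above) to batch the migration's inserts. A standalone sketch of both follows; chunkArray's body is a reconstruction, assuming a reduce-based implementation, since only its tail appears in this diff:

```typescript
// range: 0..num-1, as added in this PR.
const range = (num: number): number[] => {
  const nums: number[] = []
  for (let i = 0; i < num; i++) {
    nums.push(i)
  }
  return nums
}

// chunkArray: split an array into fixed-size batches. The body here is a
// reconstruction (assumed reduce-based); only its tail appears in the diff.
const chunkArray = <T>(arr: T[], chunkSize: number): T[][] => {
  return arr.reduce((acc, cur, i) => {
    const chunkIndex = Math.floor(i / chunkSize)
    if (!acc[chunkIndex]) {
      acc[chunkIndex] = []
    }
    acc[chunkIndex].push(cur)
    return acc
  }, [] as T[][])
}

// range(5) → [0, 1, 2, 3, 4]
// chunkArray(range(5), 2) → [[0, 1], [2, 3], [4]]
```

This is the shape the migration below relies on when it splits commit rows into batches of 500 before inserting.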
@@ -1,9 +1,15 @@
import { chunkArray } from '@atproto/common'
import { MemoryBlockstore } from '@atproto/repo'
import { Kysely } from 'kysely'
import { CID } from 'multiformats/cid'
import DatabaseSchema from '../database-schema'
import { RepoCommitBlock } from '../tables/repo-commit-block'
import { RepoCommitHistory } from '../tables/repo-commit-history'

const commitBlockTable = 'repo_commit_block'
const commitHistoryTable = 'repo_commit_history'

- export async function up(db: Kysely<unknown>): Promise<void> {
+ export async function up(db: DatabaseSchema): Promise<void> {
await db.schema
.createTable(commitBlockTable)
.addColumn('commit', 'varchar', (col) => col.notNull())
@@ -16,6 +22,70 @@ export async function up(db: Kysely<unknown>): Promise<void> {
.addColumn('prev', 'varchar')
.addPrimaryKeyConstraint(`${commitHistoryTable}_pkey`, ['commit', 'prev'])
.execute()

const migrateUser = async (did: string, root: CID) => {
const userBlocks = await db
.selectFrom('ipld_block')
.innerJoin(
'ipld_block_creator as creator',
'creator.cid',
'ipld_block.cid',
)
.where('creator.did', '=', did)
.select(['ipld_block.cid as cid', 'ipld_block.content as content'])
.execute()
const storage = new MemoryBlockstore()
userBlocks.forEach((row) => {
storage.putBlock(CID.parse(row.cid), row.content)
})

const commitData = await storage.getCommits(root, null)
if (!commitData) return

const commitBlock: RepoCommitBlock[] = []
const commitHistory: RepoCommitHistory[] = []

for (let i = 0; i < commitData.length; i++) {
const commit = commitData[i]
const prev = commitData[i - 1]
commit.blocks.forEach((_bytes, cid) => {
commitBlock.push({
commit: commit.root.toString(),
block: cid.toString(),
})
})
commitHistory.push({
commit: commit.root.toString(),
prev: prev ? prev.root.toString() : null,
})
}
const promises: Promise<unknown>[] = []
chunkArray(commitBlock, 500).forEach((batch) => {
promises.push(
db
.insertInto('repo_commit_block')
.values(batch)
.onConflict((oc) => oc.doNothing())
.execute(),
)
})
chunkArray(commitHistory, 500).forEach((batch) => {
promises.push(
db
.insertInto('repo_commit_history')
.values(batch)
.onConflict((oc) => oc.doNothing())
.execute(),
)
})
return Promise.all(promises)
}

const userRoots = await db.selectFrom('repo_root').selectAll().execute()

await Promise.all(
userRoots.map((row) => migrateUser(row.did, CID.parse(row.root))),
)
}

export async function down(db: Kysely<unknown>): Promise<void> {
34 changes: 33 additions & 1 deletion packages/pds/src/sql-repo-storage.ts
@@ -1,4 +1,4 @@
- import { CommitData, RepoStorage } from '@atproto/repo'
+ import { CommitBlockData, CommitData, RepoStorage } from '@atproto/repo'
import BlockMap from '@atproto/repo/src/block-map'
import { chunkArray } from '@atproto/common'
import { CID } from 'multiformats/cid'
@@ -228,6 +228,38 @@ export class SqlRepoStorage extends RepoStorage {
return res.map((row) => CID.parse(row.commit)).reverse()
}

async getCommits(
latest: CID,
earliest: CID | null,
): Promise<CommitBlockData[] | null> {
const commitPath = await this.getCommitPath(latest, earliest)
if (!commitPath) return null
const commitStrs = commitPath.map((commit) => commit.toString())
if (commitStrs.length < 1) return []
const res = await this.db.db
.selectFrom('repo_commit_block')
.innerJoin('ipld_block', 'ipld_block.cid', 'repo_commit_block.block')
.select([
'repo_commit_block.commit',
'ipld_block.cid',
'ipld_block.content',
])
.where('commit', 'in', commitStrs)
.execute()
const sortedBlocks: { [commit: string]: BlockMap } = {}
res.forEach((row) => {
if (!sortedBlocks[row.commit]) {
sortedBlocks[row.commit] = new BlockMap()
}
sortedBlocks[row.commit].set(CID.parse(row.cid), row.content)
})
return commitPath.map((commit) => ({
root: commit,
prev: null,
blocks: sortedBlocks[commit.toString()] || new BlockMap(),
}))
}

async destroy(): Promise<void> {
throw new Error('Destruction of SQL repo storage not allowed at runtime')
}
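The core of getCommits above is a group-by step: flat (commit, block) rows come back from a single joined query and are re-bucketed per commit, in commit-path order, with an empty bucket for any commit that had no rows. A simplified standalone sketch of that step (hypothetical names; plain string arrays stand in for BlockMap):

```typescript
type Row = { commit: string; cid: string }

// Bucket flat query rows by commit, then emit one entry per commit in
// path order, defaulting to an empty bucket when a commit had no rows.
const groupByCommit = (rows: Row[], commitPath: string[]) => {
  const sorted: { [commit: string]: string[] } = {}
  rows.forEach((row) => {
    if (!sorted[row.commit]) {
      sorted[row.commit] = []
    }
    sorted[row.commit].push(row.cid)
  })
  return commitPath.map((commit) => ({
    root: commit,
    blocks: sorted[commit] ?? [],
  }))
}

// groupByCommit(
//   [{ commit: 'a', cid: 'x' }, { commit: 'b', cid: 'y' }, { commit: 'a', cid: 'z' }],
//   ['a', 'b', 'c'],
// ) → [{ root: 'a', blocks: ['x', 'z'] }, { root: 'b', blocks: ['y'] }, { root: 'c', blocks: [] }]
```

One joined query plus an in-memory group-by keeps the method at a single round trip regardless of commit-path length.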
@@ -0,0 +1 @@
{"repoData":{"com.example.posts":{"3jks3gib5622x":{"name":"6iNxhasFMmadjASg2FpfPH4NfQ44zTQQkSZnLoMH4gLdXuQ8tYsY1o74gHeaX2svzWKckFSDpRVANy3mAkVj1yoGBn23p3vLJjds"},"3jks3gib5632x":{"name":"mVA1o5LSgC9ePvuretNJo6uhA5yqMAJAxqXAmEobTxNmCTgkYWFGGFwQpY74ZLZeyUD57iJ7uXhibBwvPYVHQR13jn4YEdM7N7JS"},"3jks3gib5642x":{"name":"rzRdoEhdv1Q6aZDQEDK9rc6ZQjrKgEZoCYGc9oSq676zGgN2UprKQ52Qt3LrbFh2TntVPH967ecpYRK8PQypo8oHc6exwRUEzufp"},"3jks3gib65e2x":{"name":"aFqx5794xq8ab6Xfd3atpW498QtNDCiYRBLJZ9DTT8p57f3BLCu1Nh2ZSm5mTjumWu7Fhi3EzfdocR8n8BpQoHbwESAyE7aXvpEk"},"3jks3gib65f2x":{"name":"YPkwuUT1q8JRokdm2fNGe8DVrcUVD1HvQgzETBeHbnGGdU8hFASSNzsjk7jV7oXtEi788q1QvChdVHzc9PZXxbhQfqLeiWWXL4GW"},"3jks3gib65g2x":{"name":"Kq2ZiiLH14BHPngQ6x19CZdicvqnPqjtnwCy6S34sNDKLLCYvr3QteuW8QJDW9XnRYJWJ5dCXm5SFD4s7bvMEbJrXcdBGtejBX3E"},"3jks3gib65h2x":{"name":"vahSKmUoajxbKttaA2nx8Aeuoh9SSMvG79joH6dkeEX6Aafm7ZnvCv1iqt1xK5Y9X1YBjSBJfN3JrT8vyDHAgNVTpqF3863B112y"},"3jks3gib65i2x":{"name":"nNBv2oFmpMmmLNoaCDh1ERTzgrR7VGvbGNV1EnaHjup52f75BCxWnd7KnxueqyZPep1MdPCAD6jpDWrHFYBY4QAS6xKhH1wdU1FQ"},"3jks3gib65j2x":{"name":"iotWfn9vsQ682fuz1z2zg1tuujmkNnnkSNjtoFqK5ydfhDcgypP2URqSwhM4GLc3bqmhUvb4XsRrueB1EWuDcvQ8aVzS1PnnuRrD"},"3jks3gib65k2x":{"name":"Fwoy8DgbJNaWxo2F4N7CXodTNxNVP7M57JWp13KRUVbMWobDomWGpYs3UL8ik8u6nKjbLLcWUiab2ccSQPMzftiK5KxEuumKcNZc"},"3jks3gibb3m2x":{"name":"BGLARtQNMayNVb61St3oVZbUxwKmDe2XouYhR7ujgJ6FxB3WJ7Qbjv5LdLMPnKdTnZKd5fWZVhNDRvCEtdUF9cxdpsiCxT8zqKCn"},"3jks3gibb3n2x":{"name":"bUs9qNwp5hpF2GYKvcbfCaGv1Pyszwty5rBkBTFvaLhdFS2f1kw4R5p1DaawBbiDoD9hPSaagaJoWyHwjqgHY53E2dURGKLru4R6"},"3jks3gibb3o2x":{"name":"oumukY1UVkd3Twoc1G4Nv7xkFJF3kEHhzm7Pt9CjJJGEoLD4kUCEFC3C7haiHUmaASkycpoHxQYpiNRm4mv2JoHFhEPqVaCPK9n7"},"3jks3gibb3p2x":{"name":"yF78rrq3RDf5PizXVPxFEc97HNv5jBnPF7RqVsSKXBc9K7M6M8KfM1tSSeCivq6yE5kaMe71gxrK86H8FCJEJevXxBn1QmUtMSKX"},"3jks3gibb3q2x":{"name":"AB2J4KsQq8FTqVsXzbejX1ZXJEyedH6GuSH59uFr7C8wP1TqVqRzPNW65Rpd5WtWoHUhEDaUccJRgWKJbDqb6uvBDrpe2BSRHsb8"}},"com.example.likes":{"3jks3gib65l2x":{"name":
"99rYFFmorU83gn1UgmyTLuGHfTmgBbGj1hqAoSwMTCDb5Yyk7pUZin8Wc7H7HuRzWTsrziYHKuYWrmGatmcdSpnhB96BovxWfQ9B"},"3jks3gib65m2x":{"name":"e5GfS1ZgenSKWP34KrCurykMigqJvooEAqgzXnppA74QprfuEdrbtwZMAijhTCifMzGXWRYRD6smKA6kVU5tZk8ZVjZJ1N7sc1MX"},"3jks3gib65n2x":{"name":"tnzAvrhCJddmsym2dzUcc4cdpLMVqrHx6vYjLj9Yj2g8RXFVumgZctYy7yx395zthTRdZrzFijDnuRTJe9jGfvemVc77NaWazcJ3"},"3jks3gib65o2x":{"name":"dKHLaYHKu5H3PnFRtuQvEotXqeY7He86AtP6E1rdBkYxVEJ2M4o5gHJuPk6azwP3MM27wnmMMhMJavL4dUCHBjmdJ7UtSeb6Azu2"},"3jks3gib65p2x":{"name":"wM1pv9iUnXEdpsRBKGHX4DMsS4PSun55fYEKEDxeYUU8gscVuigLiEwwHqW5dsETuFQUayuBQYrh3m6FSzDokcvG8PFmtN7o5uky"},"3jks3gib65q2x":{"name":"nVzbGDBiPF87uD7oUoFMrkJm7d1y64m9ANZ299NtUHLcT5wAnYjp5NFsmjCkFHkCo4UHwjhVinwKfCecmayz4j4hhTeDxc9zdRq7"},"3jks3gib65r2x":{"name":"xwcJa1jcTDqcKfTZawpGeBFW4f9zqFGG4oVEfezAQ2FB8jEPk8gd5cA8QHUA5adg1hbgeHBhXiM1o1V5uBYuoCccRJjzi9yYd8xy"},"3jks3gib65s2x":{"name":"qM4gBR58feygtT5Ghvud67JGoZaakWNmMovfaQySEVYuHxkZe5DQsmqsMa5yjSZ7UXbGj5yDaz4P5yRnZ3U2RR4xJ4381BerAqrx"},"3jks3gib65t2x":{"name":"D3VQ6wDGY5WsBeQv8aoaiiGFmXXbZ2QEnNjF4BVww6u7qb6ZPk2QnS7EjSSBzdZs7siE1CcKABaQNKuzGUV5vV7JNC9RubJFw2p8"},"3jks3gib65u2x":{"name":"BEXjK5hdsP9HSjELCrhU8NFNXtfJy52aMZutY2WEanAFS4eePZ2WPCCtEDNCt9pRq2Ah3YDyU2zs8D5wXigKnZUKNTRQX8tF3mub"},"3jks3gibc2y2x":{"name":"cZoEJ1QsEP9UCg5vAkrEhnjYm4FSKcgqk9d4Vo9ZE9hx5gVNSH9M6CFdW8TQsFG7VCjm2Jr38DnNQtn3SRWFkeaZAWV9Qm831eJ4"},"3jks3gibc2z2x":{"name":"BCNX7hEYb1W2XPhGnFuodR5Rdv7kHnwBBvnftaCQVUcyd6GBThCLb9tKaz8GRyCBJTy4emwu6Qu8LMaYdhDdRVTNQXfpeg8KUq9m"},"3jks3gibc322x":{"name":"EJQRfPSkbw2xzJwznzYMeCy4QSMki2jMG8anvrFriRjRHbFafyR6JuxkHWr8hTXcHroJZRzV3imqKcw6g97RdakLMwvmTzLHQBBh"},"3jks3gibc332x":{"name":"63xdcVPSvzZpzFHxoRacdFxq2EEKjX4EkHsLS1mtxkBhjdJhXhtjJsgQRhBXb1J1AVgHKSwrjHfqBaejLiEeCfbwUKJu7RmN3MRQ"},"3jks3gibc342x":{"name":"C5pUg3XXKhKdCmTfHmX74K3REx1bxBM8DBe9JtaPwH2bLY3FAn8RyTB432MXVEn7n3xtYssN3Nacyk2KisJrAXBiZm22CDGvsvjw"}}},"commitPath":["bafyreifuzhgzo6udzsgctl4wvp7ixninvpwnjvimiy7udrmggnboz2ozky","bafyreibhji7ousajscnaktaj
wje7wbatwky3lnmgknbih772azeksb6e2a","bafyreifmjipk3n3zfrnw25iz7ryd2d2fuah45vvqe3as7kalgggtsh74eq"]}
73 changes: 73 additions & 0 deletions packages/pds/tests/migrations/repo-sync-data/migration.test.ts
@@ -0,0 +1,73 @@
import { MemoryBlockstore, Repo } from '@atproto/repo'
import BlockMap from '@atproto/repo/src/block-map'
import fs from 'fs/promises'
import { CID } from 'multiformats/cid'
import { Database } from '../../../src'
import SqlRepoStorage from '../../../src/sql-repo-storage'

type TestData = {
repoData: Record<string, Record<string, unknown>>
commitPath: string[]
}

describe('repo sync data migration', () => {
let db: Database
const did = 'did:key:zDnaegoVtoy9Na3arc4QAu7Su2ZCaNQVsZNqqnCR8fjVgBrFQ'
let data: TestData

beforeAll(async () => {
await fs.copyFile(
'tests/migrations/repo-sync-data/pre-migration.sqlite',
'tests/migrations/repo-sync-data/test.sqlite',
)
db = Database.sqlite('tests/migrations/repo-sync-data/test.sqlite')
await db.migrator.migrateTo('_20221221T013010374Z')
const readData = await fs.readFile(
'tests/migrations/repo-sync-data/expected-data.json',
)
if (!readData) {
throw new Error('could not read test data')
}
data = JSON.parse(readData.toString())
})

afterAll(async () => {
await db.close()
await fs.rm('tests/migrations/repo-sync-data/test.sqlite')
})

it('migrated correctly', async () => {
const storage = new SqlRepoStorage(db, did)
const root = await storage.getHead()
if (!root) {
throw new Error('could not get repo root')
}
const commitLog = await storage.getCommitPath(root, null)
if (!commitLog) {
throw new Error('could not get commit log')
}
const commitLogStr = commitLog.map((cid) => cid.toString())
expect(commitLogStr).toEqual(data.commitPath)

const res = await db.db
.selectFrom('repo_commit_block')
.innerJoin('ipld_block', 'ipld_block.cid', 'repo_commit_block.block')
.where('repo_commit_block.commit', 'in', commitLogStr)
.select(['ipld_block.cid as cid', 'ipld_block.content as content'])
.execute()
const blockmap = new BlockMap()
res.forEach((row) => {
blockmap.set(CID.parse(row.cid), row.content)
})
const blockstore = new MemoryBlockstore()
blockstore.putMany(blockmap)
const repo = await Repo.load(blockstore, root)
for (const collName of Object.keys(data.repoData)) {
const collData = data.repoData[collName]
for (const rkey of Object.keys(collData)) {
const record = await repo.getRecord(collName, rkey)
expect(record).toEqual(collData[rkey])
}
}
})
})
Binary file not shown.
59 changes: 58 additions & 1 deletion packages/pds/tests/sql-repo-storage.test.ts
@@ -1,4 +1,5 @@
- import { valueToIpldBlock } from '@atproto/common'
+ import { range, valueToIpldBlock } from '@atproto/common'
+ import BlockMap from '@atproto/repo/src/block-map'
import { Database } from '../src'
import SqlRepoStorage from '../src/sql-repo-storage'
import { CloseFn, runTestServer } from './_util'
@@ -54,4 +55,60 @@ describe('sql repo storage', () => {

expect(cidA.equals(cidB)).toBe(true)
})

it('applies commits', async () => {
const did = 'did:key:zQ3shtxV1FrJfhqE1dvxYRcCknWNjHc3c5X1y3ZSoPDi2aur3'
const blocks = await Promise.all(
range(10).map((num) => valueToIpldBlock({ my: `block-${num}` })),
)
const commits = await Promise.all(
range(2).map((num) => valueToIpldBlock({ my: `commit-${num}` })),
)
const blocks0 = new BlockMap()
blocks0.set(commits[0].cid, commits[0].bytes)
blocks.slice(0, 5).forEach((block) => {
blocks0.set(block.cid, block.bytes)
})
const blocks1 = new BlockMap()
blocks1.set(commits[1].cid, commits[1].bytes)
blocks.slice(5, 10).forEach((block) => {
blocks1.set(block.cid, block.bytes)
})
await db.transaction(async (dbTxn) => {
const storage = new SqlRepoStorage(dbTxn, did)
await storage.applyCommit({
root: commits[0].cid,
prev: null,
blocks: blocks0,
})
await storage.applyCommit({
root: commits[1].cid,
prev: commits[0].cid,
blocks: blocks1,
})
})

const storage = new SqlRepoStorage(db, did)
const head = await storage.getHead()
if (!head) {
throw new Error('could not get repo head')
}
expect(head.toString()).toEqual(commits[1].cid.toString())
const commitPath = await storage.getCommitPath(head, null)
if (!commitPath) {
throw new Error('could not get commit path')
}
expect(commitPath.length).toBe(2)
expect(commitPath[0].equals(commits[0].cid)).toBeTruthy()
expect(commitPath[1].equals(commits[1].cid)).toBeTruthy()
const commitData = await storage.getCommits(head, null)
if (!commitData) {
throw new Error('could not get commit data')
}
expect(commitData.length).toBe(2)
expect(commitData[0].root.equals(commits[0].cid)).toBeTruthy()
expect(commitData[0].blocks.equals(blocks0)).toBeTruthy()
expect(commitData[1].root.equals(commits[1].cid)).toBeTruthy()
expect(commitData[1].blocks.equals(blocks1)).toBeTruthy()
})
})
19 changes: 19 additions & 0 deletions packages/repo/src/block-map.ts
@@ -1,5 +1,6 @@
import { valueToIpldBlock } from '@atproto/common'
import { CID } from 'multiformats/cid'
import * as uint8arrays from 'uint8arrays'

export class BlockMap {
private map: Map<string, Uint8Array> = new Map()
@@ -43,6 +44,24 @@ export class BlockMap {
this.set(cid, bytes)
})
}

get size(): number {
return this.map.size
}

equals(other: BlockMap): boolean {
if (this.size !== other.size) {
return false
}
for (const entry of this.entries()) {
const otherBytes = other.get(entry.cid)
if (!otherBytes) return false
if (!uint8arrays.equals(entry.bytes, otherBytes)) {
return false
}
Comment on lines +59 to +61

Collaborator: At first glance it struck me that this check wasn't needed. But I think I follow: is the idea that the block-map itself doesn't enforce how bytes map to a cid, so its usage allows different bytes to be behind the same cid in two different block-maps (and therefore we need to check the bytes in equals())?

Collaborator (Author): Yeah, that was my thinking.

Another line of thought is that we prevent this from happening in the first place (with an integrity check on set). But I kind of like BlockMap being a relatively dumb class. I almost didn't want to put add on it (which serializes some data & calculates a CID), but it made all the consuming code so much simpler that I did.

Relatedly, I've realized that we don't enforce integrity checks correctly on our blockstore (I'd assumed that the car reader does this, but it doesn't), so I've been putting some thought today into ensuring that blockstores have correct CID mappings.
}
return true
}
}

type Entry = {
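The review thread above floats an alternative: enforce bytes-to-cid integrity on set rather than comparing bytes in equals(). A minimal sketch of that idea follows, using a plain sha-256 hex digest as a stand-in key, since real CIDs also carry a codec and multihash; CheckedBlockMap and its methods are hypothetical names, not part of the actual BlockMap API:

```typescript
import { createHash } from 'node:crypto'

// Hypothetical content-addressed map that rejects mismatched key/bytes
// pairs on set, so an equals() could compare keys alone.
class CheckedBlockMap {
  private map: Map<string, Uint8Array> = new Map()

  // sha-256 hex digest as a simplified stand-in for a CID.
  static digest(bytes: Uint8Array): string {
    return createHash('sha256').update(bytes).digest('hex')
  }

  // Verify the bytes actually hash to the key before storing them.
  set(key: string, bytes: Uint8Array): void {
    if (CheckedBlockMap.digest(bytes) !== key) {
      throw new Error(`bytes do not hash to key: ${key}`)
    }
    this.map.set(key, bytes)
  }

  get(key: string): Uint8Array | undefined {
    return this.map.get(key)
  }
}
```

The trade-off discussed in the thread applies: hashing on every set costs cycles, while the merged PR keeps BlockMap deliberately dumb and pays for a byte comparison in equals() instead.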
17 changes: 17 additions & 0 deletions packages/repo/src/mst/mst.ts
@@ -9,6 +9,7 @@ import { BlockWriter } from '@ipld/car/api'
import * as util from './util'
import MstWalker from './walker'
import BlockMap from '../block-map'
import CidSet from '../cid-set'

/**
* This is an implementation of a Merkle Search Tree (MST)
@@ -731,6 +732,22 @@ export class MST implements DataStore {
return nodes
}

// Walks tree & returns all cids
async allCids(): Promise<CidSet> {
const cids = new CidSet()
const entries = await this.getEntries()
for (const entry of entries) {
if (entry.isLeaf()) {
cids.add(entry.value)
} else {
const subtreeCids = await entry.allCids()
cids.addSet(subtreeCids)
}
}
cids.add(await this.getPointer())
return cids
}

// Walks tree & returns all leaves
async leaves() {
const leaves: Leaf[] = []