feat: move access-api delegation bytes out of d1 and into r2 #578

Merged
merged 27 commits into from
Mar 23, 2023
Changes from 14 commits
Commits
1e83d67
start delegations store
gobengo Mar 20, 2023
198e32d
test a bit
gobengo Mar 20, 2023
963a0d4
start delegations v3 table
gobengo Mar 21, 2023
e957d94
DbDelegationsStorageWithR2 reads/writes to r2
gobengo Mar 22, 2023
83c9c49
add comment explaining v2-only test
gobengo Mar 22, 2023
e65dd19
add todo
gobengo Mar 22, 2023
b491acb
todo
gobengo Mar 22, 2023
000c704
fix issues from conflicting wrangler, miniflare, @miniflare/r2 versio…
gobengo Mar 22, 2023
044f76b
access-api adds @miniflare/r2 as devDependency, fixing tsc
gobengo Mar 22, 2023
eeab7fd
rm unused files
gobengo Mar 22, 2023
a39b780
fix depcheck for access-api
gobengo Mar 22, 2023
bd42ee4
Merge branch 'main' into 571-delegation-bytes-out-d1
gobengo Mar 22, 2023
568d095
chore: loosen wrangler/miniflare semvers to range (#597)
gobengo Mar 22, 2023
0186d6a
models only depend on table types that they need
gobengo Mar 22, 2023
9fce623
chore: upgrade kysely-d1 and clean up db-related things due to d1 lib…
gobengo Mar 22, 2023
51e2835
feat: access-api env.models.delegations uses DbDelegationsStorageWith…
gobengo Mar 22, 2023
8acaa8d
fix: DbDelegationsStorageWithR2 saving to desired r2 key (#600)
gobengo Mar 22, 2023
ecbe77d
chore: rename ACCESS_API_R2 binding to DELEGATIONS, and configure wra…
gobengo Mar 23, 2023
c77d0cd
Update packages/access-api/src/models/delegations.js
gobengo Mar 23, 2023
37b05c1
Update packages/access-api/src/models/delegations.js
gobengo Mar 23, 2023
7f616d8
Update packages/access-api/test/delegations-storage.test.js
gobengo Mar 23, 2023
6fa5390
prettier
gobengo Mar 23, 2023
fe26c70
remove delegations_v2 schema and DbDelegationsStorage (keep newer DbD…
gobengo Mar 23, 2023
e379699
assert cid string kind in d1 cid column and r2 key
gobengo Mar 23, 2023
7d017e7
improve car parsing/validating in DbDelegationsStorageWithR2 rowToDel…
gobengo Mar 23, 2023
e2da517
env.DELEGATIONS -> env.DELEGATIONS_BUCKET
gobengo Mar 23, 2023
3261215
rename r2 buckets to w3up-delegations-{env}-{i}
gobengo Mar 23, 2023
3 changes: 1 addition & 2 deletions package.json
@@ -22,8 +22,7 @@
     "lint-staged": "^13.2.0",
     "prettier": "2.8.3",
     "typedoc-plugin-markdown": "^3.14.0",
-    "typescript": "4.9.5",
-    "wrangler": "^2.12.3"
+    "typescript": "4.9.5"
   },
   "prettier": {
     "trailingComma": "es5",
19 changes: 19 additions & 0 deletions packages/access-api/migrations/0007_add_delegations_v3.sql
@@ -0,0 +1,19 @@
-- Migration number: 0007 2023-03-20T23:48:40.469Z

/*
goal: add a new table to store delegations in
which doesn't have a 'bytes' column.

context: we're going to start storing bytes outside of the database (e.g. in R2)
*/

CREATE TABLE
IF NOT EXISTS delegations_v3 (
cid TEXT NOT NULL PRIMARY KEY,
audience TEXT NOT NULL,
issuer TEXT NOT NULL,
expiration TEXT,
inserted_at TEXT NOT NULL DEFAULT (strftime ('%Y-%m-%dT%H:%M:%fZ', 'now')),
updated_at TEXT NOT NULL DEFAULT (strftime ('%Y-%m-%dT%H:%M:%fZ', 'now')),
UNIQUE (cid)
);
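
The `inserted_at` and `updated_at` defaults above use SQLite's `strftime` with `%f` (seconds with a millisecond fraction) and `'now'` (UTC), so stored values look like `2023-03-20T23:48:40.469Z`, the same shape as `Date.prototype.toISOString()` output. A minimal sketch, assuming rows come back with these columns as plain strings; the `row` values and the `parseTimestamps` helper are invented for illustration and are not part of this PR:

```javascript
// Hypothetical delegations_v3 row; every value here is made up.
const row = {
  cid: 'example-cid',
  audience: 'did:example:audience',
  issuer: 'did:example:issuer',
  expiration: null,
  inserted_at: '2023-03-20T23:48:40.469Z',
  updated_at: '2023-03-20T23:48:40.469Z',
}

// Hypothetical helper: convert the two timestamp columns to Date objects.
function parseTimestamps(r) {
  return {
    ...r,
    inserted_at: new Date(r.inserted_at),
    updated_at: new Date(r.updated_at),
  }
}

const parsed = parseTimestamps(row)
// Round-tripping through Date leaves the stored string unchanged.
console.log(parsed.inserted_at.toISOString() === row.inserted_at)
```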
7 changes: 4 additions & 3 deletions packages/access-api/package.json
@@ -38,9 +38,10 @@
     "toucan-js": "^2.7.0"
   },
   "devDependencies": {
-    "@cloudflare/workers-types": "^3.18.0",
+    "@cloudflare/workers-types": "^3.19.0",
     "@databases/split-sql-query": "^1.0.3",
     "@databases/sql": "^3.2.0",
+    "@miniflare/r2": "^2.12.1",
     "@sentry/cli": "2.7.0",
     "@types/assert": "^1.5.6",
     "@types/git-rev-sync": "^2.0.0",
@@ -55,14 +56,14 @@
     "git-rev-sync": "^3.0.2",
     "hd-scripts": "^4.0.0",
     "is-subset": "^0.1.1",
-    "miniflare": "^2.11.0",
+    "miniflare": "^2.12.1",
     "mocha": "^10.2.0",
     "p-wait-for": "^5.0.0",
     "process": "^0.11.10",
     "readable-stream": "^4.2.0",
     "sade": "^1.8.1",
     "typescript": "4.9.5",
-    "wrangler": "^2.8.0"
+    "wrangler": "^2.13.0"
   },
   "eslintConfig": {
     "extends": [
11 changes: 0 additions & 11 deletions packages/access-api/src/bindings.d.ts
@@ -1,9 +1,4 @@
 import type { Logging } from '@web3-storage/worker-utils/logging'
-import type {
-  AccountTable,
-  DelegationTable,
-  SpaceTable,
-} from '@web3-storage/access/types'
 import type { Handler as _Handler } from '@web3-storage/worker-utils/router'
 import { Spaces } from './models/spaces.js'
 import { Validations } from './models/validations.js'
@@ -108,9 +103,3 @@ export interface ModuleWorker {
 export interface D1ErrorRaw extends Error {
   cause: Error & { code: string }
 }
-
-export interface D1Schema {
-  spaces: SpaceTable
-  accounts: AccountTable
-  delegations: DelegationTable
-}
20 changes: 7 additions & 13 deletions packages/access-api/src/models/accounts.js
Contributor

Can you avoid changing this file unless it's needed? It would conflict with my PR, which also changes this file.

@@ -1,35 +1,29 @@
// eslint-disable-next-line no-unused-vars
import * as Ucanto from '@ucanto/interface'
import { Kysely } from 'kysely'
import { D1Dialect } from 'kysely-d1'
import { GenericPlugin } from '../utils/d1.js'

/**
* @typedef {import('@web3-storage/access/src/types.js').DelegationRecord} DelegationRecord
*/

/**
* Accounts
*/
export class Accounts {
/**
*
* @param {D1Database} d1
*/
constructor(d1) {
/** @type {GenericPlugin<DelegationRecord>} */
const objectPlugin = new GenericPlugin({
// eslint-disable-next-line unicorn/no-null
expires_at: (v) => (typeof v === 'string' ? new Date(v) : null),
inserted_at: (v) => new Date(v),
updated_at: (v) => new Date(v),
})
-    this.d1 = /** @type {Kysely<import('../bindings').D1Schema>} */ (
-      new Kysely({
-        dialect: new D1Dialect({ database: d1 }),
-        plugins: [objectPlugin],
-      })
-    )
+    this.d1 =
+      /** @type {Kysely<{ accounts: import('@web3-storage/access/src/types.js').AccountTable }>} */ (
+        new Kysely({
+          dialect: new D1Dialect({ database: d1 }),
+          plugins: [objectPlugin],
+        })
+      )
}

/**
197 changes: 185 additions & 12 deletions packages/access-api/src/models/delegations.js
@@ -5,19 +5,24 @@ import {
 } from '@web3-storage/access/encoding'

 /**
- * @typedef {import('@web3-storage/access/src/types').DelegationTable} DelegationRow
+ * @typedef {import('../types/access-api-cf-db').R2Bucket} R2Bucket
+ */
+
+/**
+ * @template {import('../types/access-api-cf-db').DelegationsV2Table | import('../types/access-api-cf-db').DelegationsV3Table} DelegationRow
  * @typedef {Omit<DelegationRow, 'inserted_at'|'updated_at'|'expires_at'>} DelegationRowUpdate
  */

 /**
- * @typedef Tables
- * @property {DelegationRow} delegations_v2
+ * @typedef V2Tables
+ * @property {import('../types/access-api-cf-db').DelegationsV2Table} delegations_v2
  */

 /**
- * @typedef {import("../types/database").Database<Tables>} DelegationsDatabase
+ * @typedef {import("../types/database").Database<V2Tables>} DelegationsDatabase
  */

+// @todo remove reference to v2
 export const delegationsTable = /** @type {const} */ ('delegations_v2')

 /**
@@ -43,11 +48,7 @@ export class DbDelegationsStorage {
   }

   async count() {
-    const { size } = await this.#db
-      .selectFrom(this.#tables.delegations)
-      .select((e) => e.fn.count('cid').as('size'))
-      .executeTakeFirstOrThrow()
-    return BigInt(size)
+    return count(this.#db, this.#tables.delegations)
   }

   /**
@@ -116,14 +117,18 @@ class UnexpectedDelegation extends Error {
 }

 /**
- * @param {Pick<DelegationRow, 'bytes'>} row
+ * @param {Pick<import('../types/access-api-cf-db').DelegationsV2Table, 'bytes'>} row
  * @returns {Ucanto.Delegation}
  */
 function rowToDelegation(row) {
   /** @type {Ucanto.Delegation[]} */
   let delegations = []
+  // kysely/sqlite/d1/various-miniflare-versions sometimes only give an Array here.
+  // (GenericPlugin tries to ensure Uint8Array, but it can't reliably detect when to cast Array -> Uint8Array)
+  const rowBytes =
+    row.bytes instanceof Uint8Array ? row.bytes : new Uint8Array(row.bytes)
   try {
-    delegations = bytesToDelegations(row.bytes)
+    delegations = bytesToDelegations(rowBytes)
   } catch (error) {
     if (
       typeof error === 'object' &&
@@ -152,7 +157,7 @@ function rowToDelegation(row) {

 /**
  * @param {Ucanto.Delegation} d
- * @returns {DelegationRowUpdate}
+ * @returns {DelegationRowUpdate<import('../types/access-api-cf-db').DelegationsV2Table>}
  */
 export function createDelegationRowUpdate(d) {
   return {
@@ -163,6 +168,17 @@ export function createDelegationRowUpdate(d) {
}
}

/**
* @param {Ucanto.Delegation} d
*/
export function createDelegationRowUpdateV3(d) {
return {
cid: d.cid.toV1().toString(),
Contributor

No need to cast it to v1: only DAG-PB nodes can be in V0 and delegations can be encoded in DAG-PB.
Looks like CIDs will retain original base encoding when parsed, so if you want to normalize you should pass base32 as an argument. Alternatively, do something like CID.decode(d.cid.bytes).toString().

Contributor Author

"and delegations can be encoded in DAG-PB."

Is this a typo?

Contributor Author

"Looks like CIDs will retain original base encoding when parsed so if you want to normalize you should pass base32 as an argument."

Good catch. I thought it would normalize to base32 for all v1. I will do it explicitly.

Contributor Author

I added tests asserting the CID kinds for the cid column in D1 as well as the CID part of R2 keys: e379699#diff-80f191e42400b9b26178ff477cf9edac5ac2e8106246b9be714368ed719525baR134

audience: d.audience.did(),
issuer: d.issuer.did(),
}
}
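
The review thread on the `cid` field above is about making sure one consistent string form of the CID ends up in both the D1 `cid` column and the R2 key. A minimal sketch of the kind of shape check a test could make, assuming base32 is the desired form (multibase prefix `b`, lowercase alphabet `a-z2-7`). The helpers `looksLikeBase32` and `cidPartOfKey` are hypothetical and do not come from this PR or from any CID library:

```javascript
// Hypothetical check (not from the PR): does `s` look like a multibase
// base32-lowercase string, i.e. prefix "b" followed by characters a-z / 2-7?
function looksLikeBase32(s) {
  return /^b[a-z2-7]+$/.test(s)
}

// Hypothetical helper mirroring the `<cid>.car` shape that
// delegationCarFileKeyer builds: strip the suffix to get the CID part.
function cidPartOfKey(key) {
  return key.endsWith('.car') ? key.slice(0, -'.car'.length) : key
}

// Sample strings; only their shape matters for this sketch.
const base32Sample = 'bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi'
const base58Sample = 'QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR'

console.log(looksLikeBase32(base32Sample)) // true
console.log(looksLikeBase32(base58Sample)) // false
console.log(looksLikeBase32(cidPartOfKey(`${base32Sample}.car`))) // true
```

A shape check like this only catches a wrong base encoding; it does not validate that the string is a well-formed CID.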

/**
* @param {Array<number> | Buffer | unknown} sqlValue - value from kysely 'bytes' table - in node it could be a Buffer. In cloudflare it might be an Array
* @returns {ArrayBuffer|undefined} - undefined if unable to convert
@@ -179,3 +195,160 @@ export function delegationsTableBytesToArrayBuffer(sqlValue) {
return Uint8Array.from(sqlValue)
}
}
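
The code comments in this file note that a `bytes` column value may arrive as a `Buffer` in Node.js or as a plain `Array` in Cloudflare. A minimal sketch of normalizing those shapes to a `Uint8Array`; the `toUint8Array` helper is hypothetical and is not the PR's `delegationsTableBytesToArrayBuffer`:

```javascript
// Hypothetical helper: coerce a value read from a 'bytes' column to Uint8Array.
function toUint8Array(sqlValue) {
  if (sqlValue instanceof Uint8Array) return sqlValue // also covers Node.js Buffer
  if (sqlValue instanceof ArrayBuffer) return new Uint8Array(sqlValue)
  if (Array.isArray(sqlValue)) return Uint8Array.from(sqlValue)
  return undefined // unknown shape: let the caller decide how to fail
}

const fromArray = toUint8Array([1, 2, 3])
const fromBuffer = toUint8Array(new ArrayBuffer(4))
const fromString = toUint8Array('not bytes')

console.log(fromArray instanceof Uint8Array, fromBuffer.length, fromString)
```

Returning `undefined` for unrecognized input follows the documented return type of `delegationsTableBytesToArrayBuffer`; throwing instead would be an equally reasonable choice.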

/**
* @typedef {`delegations_v3`} DelegationsTableWithoutBytesName
*/

/** @type {DelegationsTableWithoutBytesName} */
export const delegationsV3Table = `delegations_v3`

/**
* @typedef {import('../types/access-api-cf-db').AccessApiD1TablesV2} AccessApiD1TablesV2
* @typedef {import('../types/access-api-cf-db').AccessApiD1TablesV3} AccessApiD1TablesV3
*/

export class DbDelegationsStorageWithR2 {
// @todo abstract away R2 specifics into DagStore: ~AsyncMap<CID, Ucanto.Delegation>
/** @type {R2Bucket} */
#dags
/** @type {import('../types/database').Database<AccessApiD1TablesV3>} */
#db
/** @type {keyof AccessApiD1TablesV3} */
#delegationsTableName = delegationsV3Table

/**
* @param {import('../types/database').Database<AccessApiD1TablesV3>} db
* @param {R2Bucket} dags
* @param {keyof AccessApiD1TablesV3} delegationsTableName
*/
// eslint-disable-next-line no-useless-constructor
constructor(db, dags, delegationsTableName = delegationsV3Table) {
this.#db = db
this.#delegationsTableName = delegationsTableName
this.#dags = dags
}

/**
* store items
*
* @param {Array<Ucanto.Delegation>} delegations
* @returns {Promise<void>}
*/
async putMany(...delegations) {
if (delegations.length === 0) {
return
}
await writeDelegations(this.#dags, delegations)
Contributor Author

@gobengo, Mar 22, 2023

The R2 write might succeed here and the insert then fail, in which case we will be storing more than is necessary in the R2 bucket. I wrote it this way so that we err on that side and never end up with a delegation.cid in D1 that we are unable to dereference (CID -> bytes) from R2.

If D1 had access to SQL transactions, we might be able to use them here to do the write to R2 after trying the SQL insert inside a transaction. It doesn't, so I think this is the best we can do without a much more involved two-phase commit (2PC).
(At that point, IMHO, we should be looking beyond D1, either to a SQL setup with transactions or away from SQL entirely.)

const values = delegations.map((d) => createDelegationRowUpdateV3(d))
await this.#db
.insertInto(this.#delegationsTableName)
.values(values)
.onConflict((oc) => oc.column('cid').doNothing())
.executeTakeFirst()
Contributor

Nit: Why executeTakeFirst?

}
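
The ordering the author describes for `putMany` (write bytes to R2 first, then insert the D1 row) trades possible orphaned objects for the guarantee that a row never points at missing bytes. A minimal sketch of that trade-off, using in-memory stand-ins invented here (`FakeBucket`, `FakeIndex`, `putBytesThenRow`); these are not the R2, D1, or kysely APIs:

```javascript
// In-memory stand-in for an object store (invented for this sketch).
class FakeBucket {
  constructor() {
    this.objects = new Map()
  }
  async put(key, value) {
    this.objects.set(key, value)
  }
}

// In-memory stand-in for the index table; can be told to fail every insert.
class FakeIndex {
  constructor({ failInserts = false } = {}) {
    this.rows = new Map()
    this.failInserts = failInserts
  }
  async insert(row) {
    if (this.failInserts) throw new Error('insert failed')
    if (!this.rows.has(row.cid)) this.rows.set(row.cid, row) // do nothing on conflict
  }
}

async function putBytesThenRow(bucket, index, { cid, bytes }) {
  await bucket.put(cid, bytes) // step 1: bytes become dereferenceable
  await index.insert({ cid }) // step 2: only now can a row refer to them
}

async function demo() {
  const bucket = new FakeBucket()
  const index = new FakeIndex({ failInserts: true })
  let failed = false
  try {
    await putBytesThenRow(bucket, index, { cid: 'example-cid', bytes: new Uint8Array([1]) })
  } catch {
    failed = true
  }
  // One orphaned object is left behind, but no row points at missing bytes.
  return { failed, orphanedObjects: bucket.objects.size, danglingRows: index.rows.size }
}
```

Reversing the two steps would flip the failure mode: a failed object write could leave a row whose bytes cannot be read.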

/** @returns {Promise<bigint>} */
async count() {
return count(this.#db, this.#delegationsTableName)
}

/**
* @param {import('../types/delegations').Query} query
*/
async *find(query) {
const { audience } = query
const delegations = this.#delegationsTableName
const selection = await this.#db
.selectFrom(delegations)
.selectAll()
.where(`${delegations}.audience`, '=', audience)
Contributor

Nit: It would be a good idea to allow filtering by expiry as well.

Contributor

It might also be a good idea to make audience an optional field on the query.

Contributor Author

@gobengo, Mar 23, 2023

Let's add filtering by expiry as a follow-up. We discussed it previously in relation to #526.
(I think adding the column as a migration should be easy.)

.execute()
for await (const row of selection) {
yield this.#rowToDelegation(row)
Contributor

Nit: Should we perhaps not throw on the first error, and instead try to yield as much as we can and then throw?

Contributor Author

I'd prefer to fail fast. AFAICT there is no good reason to keep going if this errors; something is very wrong.

}
}
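
The fail-fast behavior the author prefers for `find` means a consumer receives every item yielded before the first failure and then sees the error. A minimal sketch with an invented loader and a `Map` standing in for the store; none of these names come from the PR:

```javascript
// Hypothetical loader: resolves a row to a value, or throws when it is missing.
async function loadRow(row, store) {
  const value = store.get(row.cid)
  if (value === undefined) {
    throw new Error(`failed to read car bytes for cid ${row.cid}`)
  }
  return value
}

// Fail fast: the first failing row ends the iteration with an error.
async function* findFailFast(rows, store) {
  for (const row of rows) {
    yield await loadRow(row, store)
  }
}

// Helper for the demo: gather what was yielded before any error.
async function collect(iterable) {
  const seen = []
  try {
    for await (const item of iterable) seen.push(item)
  } catch (error) {
    return { seen, error }
  }
  return { seen, error: undefined }
}

const store = new Map([
  ['a', 'A'],
  ['c', 'C'],
])
const rows = [{ cid: 'a' }, { cid: 'b' }, { cid: 'c' }]
const pending = collect(findFailFast(rows, store))
```

Here `pending` resolves with `seen` equal to `['A']` and an error for `b`; row `c` is never loaded.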

/**
* @param {Pick<import('../types/access-api-cf-db').DelegationsV3Table, 'cid'>} row
* @param {R2Bucket} dags
* @returns {Promise<Ucanto.Delegation>}
*/
async #rowToDelegation(row, dags = this.#dags) {
const { cid } = row
const carBytesR2 = await dags.get(cid.toString())
if (!carBytesR2) {
throw new Error(`failed to read car bytes for cid ${cid.toString()}`)
}
// @todo stream car reading
const carBytes = new Uint8Array(await carBytesR2.arrayBuffer())
const delegations = bytesToDelegations(carBytes)
if (delegations.length !== 1) {
throw new Error(
`expected 1 delegation in CAR, but got ${delegations.length}`
)
}
Contributor

Nit: We could actually create a delegation from the CAR regardless of the number of roots, as long as it contains a block with that CID.

Here I would instead find the delegation that matches the desired CID, and fail if it is not found.

Contributor Author

I changed this to just check that delegations contains an entry whose cid equals the parsed row.cid.

const [delegation] = delegations
return delegation
}
}
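
The reviewer's suggestion for `#rowToDelegation`, and the author's follow-up, is to select the delegation whose CID matches the row rather than requiring exactly one delegation in the CAR. A minimal sketch of that selection, assuming each parsed delegation exposes `cid.toString()`; the `selectDelegation` helper and the fake objects are invented for illustration and are not the code from the later commit:

```javascript
// Hypothetical helper: pick the delegation whose CID string equals `cid`.
function selectDelegation(delegations, cid) {
  const match = delegations.find((d) => d.cid.toString() === cid)
  if (!match) {
    throw new Error(`no delegation with cid ${cid} found in CAR`)
  }
  return match
}

// Fake delegations: only the `cid.toString()` shape matters here.
const fake = (id) => ({ cid: { toString: () => id }, id })
const parsedFromCar = [fake('cid-a'), fake('cid-b')]

const picked = selectDelegation(parsedFromCar, 'cid-b')
console.log(picked.id)
```

Comparing string forms only works if both sides use the same base encoding, which is the normalization concern raised in the review of `createDelegationRowUpdateV3`.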

/**
* @typedef {import('../types/access-api-cf-db').DelegationsV3Table} DelegationsV3Table
* @typedef {import('../types/access-api-cf-db').DelegationsV2Table} DelegationsV2Table
*/

/**
* @template {string} TableName
* @template {Record<TableName, DelegationsV3Table|DelegationsV2Table>} Tables
* @param {import('../types/database').Database<Tables>} db
* @param {TableName} delegationsTable
* @returns {Promise<bigint>} - count of table
*/
async function count(db, delegationsTable) {
const { size } = await db
.selectFrom(delegationsTable)
.select((e) => e.fn.count('cid').as('size'))
.executeTakeFirstOrThrow()
return BigInt(size)
}

/**
* @param {Ucanto.Delegation} ucan
*/
function delegationCarFileKeyer(ucan) {
return `${ucan.cid.toString()}.car`
}

/**
* @param {R2Bucket} bucket
* @param {Iterable<Ucanto.Delegation>} delegations
* @param {(d: Ucanto.Delegation) => string} keyer - builds k/v key strings for each delegation
*/
async function writeDelegations(
bucket,
delegations,
keyer = delegationCarFileKeyer
) {
return writeEntries(
bucket,
[...delegations].map((delegation) => {
const key = delegation.cid.toString()
const carBytes = delegationsToBytes([delegation])
const value = carBytes
return /** @type {[key: string, value: Uint8Array]} */ ([key, value])
})
)
}

/**
* @param {R2Bucket} bucket
* @param {Iterable<readonly [key: string, value: Uint8Array ]>} entries
*/
async function writeEntries(bucket, entries) {
await Promise.all(
[...entries].map(async ([key, value]) => {
return bucket.put(key, value)
})
)
}
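
In `writeDelegations` as shown in this diff, the `keyer` argument is accepted but the key is built directly from `delegation.cid.toString()`, and `#rowToDelegation` reads with the bare CID string as well, so `delegationCarFileKeyer` is not applied. Commit 8acaa8d in the list above ("fix: DbDelegationsStorageWithR2 saving to desired r2 key (#600)") appears to address the key; its diff is not included in this view. A minimal sketch of keeping the write and read paths on one key function, using an in-memory bucket invented for this example (its `get` returns the stored bytes directly, which is a simplification and not the R2 API):

```javascript
// Hypothetical key helper mirroring delegationCarFileKeyer: "<cid>.car".
const carKey = (cid) => `${cid}.car`

// In-memory stand-in for a bucket (invented for this sketch).
class MemoryBucket {
  #objects = new Map()
  async put(key, value) {
    this.#objects.set(key, value)
  }
  async get(key) {
    return this.#objects.get(key) ?? null
  }
}

// Both paths derive the key through the same function.
async function writeCar(bucket, cid, bytes, keyer = carKey) {
  await bucket.put(keyer(cid), bytes)
}

async function readCar(bucket, cid, keyer = carKey) {
  const bytes = await bucket.get(keyer(cid))
  if (!bytes) throw new Error(`failed to read car bytes for cid ${cid}`)
  return bytes
}

async function roundTrip() {
  const bucket = new MemoryBucket()
  await writeCar(bucket, 'example-cid', new Uint8Array([1, 2, 3]))
  const viaKeyer = await readCar(bucket, 'example-cid')
  const viaBareCid = await bucket.get('example-cid') // different key, so null
  return { length: viaKeyer.length, viaBareCid }
}
```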