feat!: generate sharded DAG index on client and invoke w index/add (s…
…toracha#1451)

This PR generates index data for blocks as CAR shards are constructed.
Once all shards have been successfully sent, a sharded DAG index is
encoded and stored (using `blob/add`) and then `index/add` is invoked
with the CID of the index as a parameter.
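
The flow described above can be sketched with in-memory stand-ins. This is a minimal illustration, not the client's implementation: `blobAdd`, `indexAdd`, the fake digest strings and the JSON encoding are all hypothetical (the real index is encoded as an archive and addressed by a multihash digest).

```javascript
// Hypothetical in-memory stand-ins; none of these names come from the client.
const stored = new Map() // pretend service storage, keyed by a fake digest
const registeredIndexes = []

// Stand-in for `blob/add`: stores bytes and returns a fake digest string.
function blobAdd(bytes) {
  const digest = `digest-${stored.size}`
  stored.set(digest, bytes)
  return digest
}

// Stand-in for `index/add`: records the identifier of the stored index.
function indexAdd(indexDigest) {
  registeredIndexes.push(indexDigest)
}

// Two pretend CAR shards, each carrying the slices (block -> [offset, length])
// collected while the shard was constructed.
const shards = [
  { bytes: new Uint8Array(100), slices: new Map([['block-a', [59, 41]]]) },
  { bytes: new Uint8Array(80), slices: new Map([['block-b', [59, 21]]]) },
]

// 1. Send every shard, remembering its digest alongside its slices.
const index = { content: 'root-cid', shards: new Map() }
for (const shard of shards) {
  const digest = blobAdd(shard.bytes)
  // The shard itself is also recorded as a slice spanning the whole CAR.
  shard.slices.set(digest, [0, shard.bytes.length])
  index.shards.set(digest, shard.slices)
}

// 2. Encode and store the index (JSON here is only a placeholder encoding).
const encodedIndex = new TextEncoder().encode(
  JSON.stringify([...index.shards].map(([d, s]) => [d, [...s]]))
)
const indexDigest = blobAdd(encodedIndex)

// 3. Register the index with the service.
indexAdd(indexDigest)
```

The ordering is the point of the sketch: the index is only stored and registered after every shard has been sent.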

BREAKING CHANGE: delegated capabilities required to use `uploadFile`, `uploadDirectory` and `uploadCAR` have changed. In order to use these methods your agent will now need to be delegated `blob/add`, `index/add`, `filecoin/offer` and `upload/add` capabilities. Note: no code changes are required.
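
One way to see which delegations are affected is to compare a list of granted abilities against the four listed above. The helper below is a hypothetical sketch and not part of any package in this repository; it assumes a simplified wildcard model in which `ns/*` covers every ability in that namespace.

```javascript
// Capabilities the commit message lists as required for the upload helpers.
const REQUIRED = ['blob/add', 'index/add', 'filecoin/offer', 'upload/add']

// Hypothetical helper: true if `granted` covers `required`, treating `*` and
// `ns/*` as wildcards (a simplified model, assumed for illustration).
function covers(granted, required) {
  if (granted === '*' || granted === required) return true
  return granted.endsWith('/*') && required.startsWith(granted.slice(0, -1))
}

// Returns the required abilities that no granted ability covers.
function missingCapabilities(granted) {
  return REQUIRED.filter((req) => !granted.some((g) => covers(g, req)))
}

// A delegation that satisfied the previous requirements.
const oldDelegation = ['store/add', 'upload/add']
// A delegation that uses namespace wildcards for the new capabilities.
const newDelegation = ['blob/*', 'index/*', 'filecoin/offer', 'upload/add']

const missingOld = missingCapabilities(oldDelegation)
const missingNew = missingCapabilities(newDelegation)
console.log(missingOld) // blob/add, index/add and filecoin/offer are missing
console.log(missingNew) // nothing is missing
```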
Alan Shaw authored May 15, 2024
1 parent eefd885 commit a6d9026
Showing 29 changed files with 994 additions and 338 deletions.
1 change: 1 addition & 0 deletions packages/access-client/src/access.js
@@ -325,6 +325,7 @@ export const toCapabilities = (access) => {
export const spaceAccess = {
'space/*': {},
'blob/*': {},
'index/*': {},
'store/*': {},
'upload/*': {},
'access/*': {},
4 changes: 3 additions & 1 deletion packages/upload-api/src/blob/accept.js
@@ -35,7 +35,9 @@ export function blobAcceptProvider(context) {
const url =
/** @type {API.URI<'https:'>} */
(
`https://w3s.link/ipfs/${content}?format=raw&origin=${encodeURIComponent(`r2://${R2_REGION}/${R2_BUCKET}`)}`
`https://w3s.link/ipfs/${content}?format=raw&origin=${encodeURIComponent(
`r2://${R2_REGION}/${R2_BUCKET}`
)}`
)

const locationClaim = await Assert.location.delegate({
12 changes: 12 additions & 0 deletions packages/upload-api/test/storage/blobs-storage.js
@@ -135,6 +135,18 @@ export class BlobsStorage {
*/
async stream(digest) {
const key = this.#bucketPath(digest)
if (!this.server) {
const url = new URL(key, this.baseURL)
const res = await fetch(url.toString())
if (res.status === 404) return error(new BlobNotFound(digest))
if (!res.ok || !res.body) {
throw new Error(
`serverless blob storage failed to fetch from: ${url} status: ${res.status}`
)
}
return ok(res.body)
}

const bytes = this.content.get(key)
if (!bytes) return error(new BlobNotFound(digest))

134 changes: 96 additions & 38 deletions packages/upload-client/README.md
@@ -77,16 +77,23 @@ const cid = await uploadDirectory(conf, [
The buffering API loads all data into memory so is suitable only for small files. The root data CID is derived from the data before any transfer to the service takes place.

```js
import { UnixFS, CAR, Store, Upload } from '@web3-storage/upload-client'
import { UnixFS, CAR, Blob, Index, Upload } from '@web3-storage/upload-client'
import * as BlobIndexUtil from '@web3-storage/blob-index/util'
import * as Link from 'multiformats/link'

// Encode a file as a DAG, get back a root data CID and a set of blocks
const { cid, blocks } = await UnixFS.encodeFile(file)
// Encode the DAG as a CAR file
const car = await CAR.encode(blocks, cid)
// Store the CAR file to the service
const carCID = await Store.add(conf, car)
const carDigest = await Blob.add(conf, car)
// Create an index
const index = await BlobIndexUtil.fromShardArchives(cid, [new Uint8Array(await car.arrayBuffer())])
// Store the index to the service
const indexDigest = await Blob.add(conf, (await index.archive()).ok)
await Index.add(conf, Link.create(CAR.code, indexDigest))
// Register an "upload" - a root CID contained within the passed CAR file(s)
await Upload.add(conf, cid, [carCID])
await Upload.add(conf, cid, [Link.create(CAR.code, carDigest)])
```

#### Streaming API
@@ -97,11 +104,14 @@ This API offers streaming DAG generation, allowing CAR "shards" to be sent to th
import {
UnixFS,
ShardingStream,
Store,
Blob,
Index,
Upload,
} from '@web3-storage/upload-client'
import { ShardedDAGIndex } from '@web3-storage/blob-index'

let rootCID
const carCIDs = []
const shardIndexes = []
// Encode a file as a DAG, get back a readable stream of blocks.
await UnixFS.createFileEncoderStream(file)
// Pipe blocks to a stream that yields CAR files - shards of the DAG.
@@ -111,13 +121,29 @@ await UnixFS.createFileEncoderStream(file)
.pipeTo(
new WritableStream({
async write (car) {
const carCID = await Store.add(conf, car)
carCIDs.push(carCID)
const carDigest = await Blob.add(conf, car)
carCIDs.push(Link.create(CAR.code, carDigest))

// add the CAR shard itself to the slices
car.slices.set(carDigest, [0, car.size])
shardIndexes.push(car.slices)

rootCID = rootCID || car.roots[0]
},
})
)

// Combine the shard indexes to create the complete DAG index
const index = ShardedDAGIndex.create(rootCID)
for (const [i, shard] of carCIDs.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}

// Store the index to the service
const indexDigest = await Blob.add(conf, (await index.archive()).ok)
await Index.add(conf, Link.create(CAR.code, indexDigest))

// Register an "upload" - a root CID contained within the passed CAR file(s)
await Upload.add(conf, rootCID, carCIDs)
```
@@ -135,12 +161,13 @@ await Upload.add(conf, rootCID, carCIDs)
- [`uploadDirectory`](#uploaddirectory)
- [`uploadFile`](#uploadfile)
- [`uploadCAR`](#uploadcar)
- [`Blob.add`](#blobadd)
- [`Blob.list`](#bloblist)
- [`Blob.remove`](#blobremove)
- [`CAR.BlockStream`](#carblockstream)
- [`CAR.encode`](#carencode)
- [`Index.add`](#indexadd)
- [`ShardingStream`](#shardingstream)
- [`Store.add`](#storeadd)
- [`Store.list`](#storelist)
- [`Store.remove`](#storeremove)
- [`UnixFS.createDirectoryEncoderStream`](#unixfscreatedirectoryencoderstream)
- [`UnixFS.createFileEncoderStream`](#unixfscreatefileencoderstream)
- [`UnixFS.encodeDirectory`](#unixfsencodedirectory)
@@ -178,7 +205,7 @@ function uploadDirectory(

Uploads a directory of files to the service and returns the root data CID for the generated DAG. All files are added to a container directory, with paths in file names preserved.

Required delegated capability proofs: `store/add`, `upload/add`
Required delegated capability proofs: `blob/add`, `index/add`, `upload/add`, `filecoin/offer`

More information: [`InvocationConfig`](#invocationconfig), [`ShardStoredCallback`](#shardstoredcallback)

@@ -200,7 +227,7 @@ function uploadFile(

Uploads a file to the service and returns the root data CID for the generated DAG.

Required delegated capability proofs: `store/add`, `upload/add`
Required delegated capability proofs: `blob/add`, `index/add`, `upload/add`, `filecoin/offer`

More information: [`InvocationConfig`](#invocationconfig)

@@ -221,12 +248,58 @@ function uploadCAR(
): Promise<CID>
```

Uploads a CAR file to the service. The difference between this function and [Store.add](#storeadd) is that the CAR file is automatically sharded and an "upload" is registered (see [`Upload.add`](#uploadadd)), linking the individual shards. Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.
Uploads a CAR file to the service. The difference between this function and [Blob.add](#blobadd) is that the CAR file is automatically sharded, an index is generated, uploaded and registered (see [`Index.add`](#indexadd)) and finally an "upload" is registered (see [`Upload.add`](#uploadadd)), linking the individual shards. Use the `onShardStored` callback to obtain the CIDs of the CAR file shards.

Required delegated capability proofs: `store/add`, `upload/add`
Required delegated capability proofs: `blob/add`, `index/add`, `upload/add`, `filecoin/offer`

More information: [`InvocationConfig`](#invocationconfig), [`ShardStoredCallback`](#shardstoredcallback)

### `Blob.add`

```ts
function add(
conf: InvocationConfig,
blob: Blob,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<MultihashDigest>
```

Store a blob to the service.

Required delegated capability proofs: `blob/add`

More information: [`InvocationConfig`](#invocationconfig)

### `Blob.list`

```ts
function list(
conf: InvocationConfig,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<ListResponse<BlobListResult>>
```

List blobs stored in the space.

Required delegated capability proofs: `blob/list`

More information: [`InvocationConfig`](#invocationconfig)

### `Blob.remove`

```ts
function remove(
conf: InvocationConfig,
digest: MultihashDigest,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<void>
```

Remove a stored blob by multihash digest.

Required delegated capability proofs: `blob/remove`

More information: [`InvocationConfig`](#invocationconfig)

### `CAR.BlockStream`

```ts
@@ -252,46 +325,31 @@ const { cid, blocks } = await UnixFS.encodeFile(new Blob(['data']))
const car = await CAR.encode(blocks, cid)
```

### `ShardingStream`

```ts
class ShardingStream extends TransformStream<Block, CARFile>
```

Shard a set of blocks into a set of CAR files. The last block written to the stream is assumed to be the DAG root and becomes the CAR root CID for the last CAR output.

More information: [`CARFile`](#carfile)

### `Store.list`
### `Index.add`

```ts
function list(
function add(
conf: InvocationConfig,
index: CID,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<ListResponse<StoreListResult>>
): Promise<IndexAddResponse>
```

List CAR files stored by the issuer.
Register an "index" with the service. The `index` CID should be the CID of a CAR file containing an index as defined by [w3-index](https://github.com/w3s-project/specs/blob/main/w3-index.md).

Required delegated capability proofs: `store/list`
Required delegated capability proofs: `index/add`

More information: [`InvocationConfig`](#invocationconfig)

### `Store.remove`
### `ShardingStream`

```ts
function remove(
conf: InvocationConfig,
link: CID,
options: { retries?: number; signal?: AbortSignal } = {}
): Promise<void>
class ShardingStream extends TransformStream<Block, CARFile>
```

Remove a stored CAR file by CAR CID.

Required delegated capability proofs: `store/remove`
Shard a set of blocks into a set of CAR files. The last block written to the stream is assumed to be the DAG root and becomes the CAR root CID for the last CAR output.

More information: [`InvocationConfig`](#invocationconfig)
More information: [`CARFile`](#carfile)

### `UnixFS.createDirectoryEncoderStream`

3 changes: 2 additions & 1 deletion packages/upload-client/package.json
@@ -75,10 +75,11 @@
"@ipld/dag-cbor": "^9.0.6",
"@ipld/dag-ucan": "^3.4.0",
"@ipld/unixfs": "^2.1.1",
"@ucanto/core": "^10.0.1",
"@ucanto/client": "^9.0.1",
"@ucanto/core": "^10.0.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/transport": "^9.1.1",
"@web3-storage/blob-index": "workspace:^",
"@web3-storage/capabilities": "workspace:^",
"@web3-storage/data-segment": "^5.1.0",
"@web3-storage/filecoin-client": "workspace:^",
13 changes: 10 additions & 3 deletions packages/upload-client/src/car.js
@@ -6,8 +6,10 @@ import varint from 'varint'
* @typedef {import('@ipld/unixfs').Block} Block
*/

export const code = 0x0202

/** Byte length of a CBOR encoded CAR header with zero roots. */
const NO_ROOTS_HEADER_LENGTH = 17
const NO_ROOTS_HEADER_LENGTH = 18

/** @param {import('./types.js').AnyLink} [root] */
export function headerEncodingLength(root) {
Expand All @@ -18,10 +20,15 @@ export function headerEncodingLength(root) {
}

/** @param {Block} block */
export function blockEncodingLength(block) {
export function blockHeaderEncodingLength(block) {
const payloadLength = block.cid.bytes.length + block.bytes.length
const varintLength = varint.encodingLength(payloadLength)
return varintLength + payloadLength
return varintLength + block.cid.bytes.length
}

/** @param {Block} block */
export function blockEncodingLength(block) {
return blockHeaderEncodingLength(block) + block.bytes.length
}
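
Splitting the block length into a header part and a data part makes it possible to compute where a block's data begins inside a CAR, which is presumably what the index slices need. The sketch below works one example through. `varintLength` is a local stand-in for `varint.encodingLength`, and the 36-byte CID and 200-byte payload are illustrative assumptions. The header constant of 18 is consistent with a 17-byte DAG-CBOR map (`roots` empty, `version` 1) plus a one-byte varint length prefix, although the commit does not state that reasoning.

```javascript
// Minimal unsigned varint length, standing in for `varint.encodingLength`.
function varintLength(n) {
  let length = 1
  while (n >= 0x80) {
    n = Math.floor(n / 0x80)
    length++
  }
  return length
}

// Same arithmetic as the diff, for a block shaped { cid: { bytes }, bytes }.
function blockHeaderEncodingLength(block) {
  const payloadLength = block.cid.bytes.length + block.bytes.length
  return varintLength(payloadLength) + block.cid.bytes.length
}

function blockEncodingLength(block) {
  return blockHeaderEncodingLength(block) + block.bytes.length
}

// Assumed example block: a CIDv1 with a sha2-256 multihash is 36 bytes
// (1 version + 1 codec + 2 multihash prefix + 32 digest), plus 200 data bytes.
const block = { cid: { bytes: new Uint8Array(36) }, bytes: new Uint8Array(200) }

const NO_ROOTS_HEADER_LENGTH = 18
// The first block frame starts right after the CAR header.
const frameOffset = NO_ROOTS_HEADER_LENGTH
// The block's data starts after the frame's varint and CID.
const dataOffset = frameOffset + blockHeaderEncodingLength(block)

console.log(blockHeaderEncodingLength(block)) // 38: 2-byte varint + 36-byte CID
console.log(blockEncodingLength(block)) // 238
console.log(dataOffset) // 56
```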

/**
63 changes: 63 additions & 0 deletions packages/upload-client/src/dag-index.js
@@ -0,0 +1,63 @@
import * as IndexCapabilities from '@web3-storage/capabilities/index'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry from 'p-retry'
import { servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'

/**
* Register an "index" with the service. The issuer needs the `index/add`
* delegated capability.
*
* Required delegated capability proofs: `index/add`
*
* @param {import('./types.js').InvocationConfig} conf Configuration
* for the UCAN invocation. An object with `issuer`, `with` and `proofs`.
*
* The `issuer` is the signing authority that is issuing the UCAN
* invocation(s). It is typically the user _agent_.
*
* The `with` is the resource the invocation applies to. It is typically the
* DID of a space.
*
* The `proofs` are a set of capability delegations that prove the issuer
* has the capability to perform the action.
*
* The issuer needs the `index/add` delegated capability.
* @param {import('./types.js').CARLink} index Index to store.
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('./types.js').IndexAddSuccess>}
*/
export async function add(
{ issuer, with: resource, proofs, audience },
index,
options = {}
) {
/* c8 ignore next */
const conn = options.connection ?? connection
const result = await retry(
async () => {
return await IndexCapabilities.add
.invoke({
issuer,
/* c8 ignore next */
audience: audience ?? servicePrincipal,
with: SpaceDID.from(resource),
nb: { index },
proofs,
})
.execute(conn)
},
{
onFailedAttempt: console.warn,
retries: options.retries ?? REQUEST_RETRIES,
}
)

if (!result.out.ok) {
throw new Error(`failed ${IndexCapabilities.add.can} invocation`, {
cause: result.out.error,
})
}

return result.out.ok
}
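
The result handling at the end of `add` can be exercised in isolation. The sketch below is a hypothetical helper, not code from this commit: `unwrap` and the fake receipts are invented for illustration, and it assumes `IndexCapabilities.add.can` is the string `index/add`.

```javascript
// Hypothetical helper mirroring the result check at the end of `add`.
function unwrap(result, can) {
  if (!result.out.ok) {
    // Attach the service error as the cause so callers can inspect it.
    throw new Error(`failed ${can} invocation`, { cause: result.out.error })
  }
  return result.out.ok
}

// Fake receipts shaped like `{ out: { ok } }` or `{ out: { error } }`.
const success = { out: { ok: { added: true } } }
const failure = { out: { error: { name: 'Unauthorized' } } }

// A successful receipt yields the `ok` value.
const value = unwrap(success, 'index/add')

// A failed receipt throws, keeping the original error on `cause`.
let caught
try {
  unwrap(failure, 'index/add')
} catch (err) {
  caught = err
}
```

Keeping the service error on `cause` lets a caller distinguish failure reasons without parsing the message string.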