Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check for blob/accept receipts before blob/add is concluded #1459

Merged
merged 39 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
06e42e9
fix: check for blob/accept receipts before blob/add is concluded
joaosa May 14, 2024
3ac98a3
fix: address passing receipts endpoint
joaosa May 15, 2024
f186499
fix: return the site multihash for blob/add
joaosa May 21, 2024
8773f33
feat: return the location commitment for blob/add
joaosa May 23, 2024
cbf1bf0
fix: address tests to return location commitment for blob/add
joaosa May 23, 2024
5a55ba2
fix: address index/add tests
joaosa May 23, 2024
62c32e6
fix: use generators for getting receipts in upload-client tests
joaosa May 24, 2024
bcca6c7
fix: use generators for getting receipts in w3up-client tests
joaosa May 27, 2024
a089685
fix: pass w3up-client tests
joaosa May 28, 2024
9764432
chore: bump content-claims to 5.0.0
joaosa May 30, 2024
8ab1e9d
chore: cleanup receipts endpoint setup
joaosa May 30, 2024
00326be
fix: relock deps
joaosa May 30, 2024
9b84fbd
chore: remove unneeded generator from helper
joaosa May 30, 2024
61280e0
feat: wrap blob add response
joaosa May 30, 2024
021f353
Merge branch 'main' into fix/blob-add-cli
joaosa May 31, 2024
8382376
chore: reuse getReceipt code
joaosa Jun 3, 2024
37f8c84
fix: address tests to use space/blob/*
joaosa Jun 3, 2024
14cae1c
chore: add receipts server to upload-client
joaosa Jun 3, 2024
3212305
chore: extract receipts polling
joaosa Jun 3, 2024
161e086
chore: remove receipt mocking from upload-client tests
joaosa Jun 3, 2024
5e7c5c0
chore: remove receipt mocking from w3up-client tests
joaosa Jun 3, 2024
d0dc1ea
chore: add a test to cover failing to get a receipt
joaosa Jun 3, 2024
9b53536
chore: do not cover options
joaosa Jun 3, 2024
89711ea
fix: load receipt fixtures correctly
joaosa Jun 3, 2024
b732be4
chore: revert code change
joaosa Jun 4, 2024
740975e
chore: distinguish receipt errors
joaosa Jun 4, 2024
412ea48
chore: propagate the receipt not found error
joaosa Jun 4, 2024
24258da
fix: filter out getting the receipt not found error
joaosa Jun 4, 2024
7ee6b58
chore: remove redundant error
joaosa Jun 4, 2024
06606b1
chore: add receipt missing error
joaosa Jun 4, 2024
74e4c7a
fix: return blob/add location commitment delegation
joaosa Jun 4, 2024
444dd8c
chore: readd whitespace to avoid release
joaosa Jun 4, 2024
6c650c5
fix: test blob/add location commitment
joaosa Jun 4, 2024
7ad41d7
chore: cleanup error propagation
joaosa Jun 4, 2024
581cb82
chore: improve error desc
joaosa Jun 4, 2024
46a11b9
chore: simplify response check
joaosa Jun 4, 2024
1faa3dc
chore: remove unneeded error check
joaosa Jun 4, 2024
1f08812
chore: move indexShardedDAG
joaosa Jun 4, 2024
c045997
chore: break down the receipt class
joaosa Jun 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/upload-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"test:node": "hundreds -r html -r text mocha 'test/**/!(*.browser).test.js' -n experimental-vm-modules -n no-warnings",
"test:browser": "playwright-test 'test/**/!(*.node).test.js'",
"mock": "run-p mock:*",
"mock:receipts-server": "PORT=9201 node test/helpers/receipts-server.js",
"mock:bucket-200": "PORT=9200 STATUS=200 node test/helpers/bucket-server.js",
"mock:bucket-401": "PORT=9400 STATUS=400 node test/helpers/bucket-server.js",
"mock:bucket-500": "PORT=9500 STATUS=500 node test/helpers/bucket-server.js",
Expand Down Expand Up @@ -94,6 +95,7 @@
"@types/varint": "^6.0.1",
"@ucanto/principal": "^9.0.1",
"@ucanto/server": "^10.0.0",
"@web3-storage/content-claims": "^5.0.0",
"@web3-storage/eslint-config-w3up": "workspace:^",
"assert": "^2.0.0",
"blockstore-core": "^3.0.0",
Expand Down
38 changes: 35 additions & 3 deletions packages/upload-client/src/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import { sha256 } from 'multiformats/hashes/sha2'
import { ed25519 } from '@ucanto/principal'
import { conclude } from '@web3-storage/capabilities/ucan'
import * as UCAN from '@web3-storage/capabilities/ucan'
import { Receipt } from '@ucanto/core'
import { Delegation, Receipt } from '@ucanto/core'
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob'
import * as BlobCapabilities from '@web3-storage/capabilities/blob'
import * as HTTPCapabilities from '@web3-storage/capabilities/http'
import { SpaceDID } from '@web3-storage/capabilities/utils'
import retry, { AbortError } from 'p-retry'
import { servicePrincipal, connection } from './service.js'
import { REQUEST_RETRIES } from './constants.js'
import { Receipt as ReceiptPoller } from './receipts.js'

/**
* @param {string} url
Expand Down Expand Up @@ -166,7 +167,7 @@ export function createConcludeInvocation(id, serviceDid, receipt) {
* The issuer needs the `blob/add` delegated capability.
* @param {Blob|Uint8Array} data Blob data.
* @param {import('./types.js').RequestOptions} [options]
* @returns {Promise<import('multiformats').MultihashDigest>}
* @returns {Promise<import('./types.js').BlobAddOk>}
*/
export async function add(
{ issuer, with: resource, proofs, audience },
Expand Down Expand Up @@ -303,7 +304,38 @@ export async function add(
})
}

return multihash
// Ensure the blob has been accepted
const acceptReceipt = await new ReceiptPoller(options).poll(
nextTasks.accept.task.link()
)

const blocks = new Map(
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [
`${block.cid}`,
block,
])
)
const site = Delegation.view(
{
root: /** @type {import('@ucanto/interface').UCANLink} */ (
// @ts-ignore Property
acceptReceipt.out.ok.site
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should check for error and then you don't need to ts-ignore here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I need at all anymore, as previously the receipt acquisition method could return null. Now it's either a receipt or it throws.

),
blocks,
},
null
)
/* c8 ignore next 5 */
if (!site) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this be falsey?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh you need to not pass null above (pass undefined or simply omit) for view(...) to throw in the case where the root does not exist in the passed blocks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

throw new Error(`failed ${BlobCapabilities.add.can} invocation`, {
cause: 'failed to get blob/accept receipt delegation view',
})
}

return {
multihash,
site,
}
}

/**
Expand Down
34 changes: 23 additions & 11 deletions packages/upload-client/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import * as Upload from './upload.js'
import * as UnixFS from './unixfs.js'
import * as CAR from './car.js'
import { ShardingStream, defaultFileComparator } from './sharding.js'
import { codec as carCodec } from '@ucanto/transport/car'

export { Blob, Index, Store, Upload, UnixFS, CAR }
export * from './sharding.js'
export { receiptsEndpoint } from './service.js'
export { Receipt } from './receipts.js'

/**
* Uploads a file to the service and returns the root data CID for the
Expand Down Expand Up @@ -144,9 +145,9 @@ async function uploadBlockStream(
async transform(car, controller) {
const bytes = new Uint8Array(await car.arrayBuffer())
// Invoke blob/add and write bytes to write target
const multihash = await Blob.add(conf, bytes, options)
const { multihash } = await Blob.add(conf, bytes, options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make sure to create an issue where we can follow up to have user make the site delegation public by default by invoking the Claim delegation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created #1486 👍

// Should this be raw instead?
const cid = Link.create(carCodec.code, multihash)
const cid = Link.create(CAR.code, multihash)
let piece
if (pieceHasher) {
const multihashDigest = await pieceHasher.digest(bytes)
Expand Down Expand Up @@ -199,20 +200,15 @@ async function uploadBlockStream(
/* c8 ignore next */
if (!root) throw new Error('missing root CID')

const index = ShardedDAGIndex.create(root)
for (const [i, shard] of shards.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}
const indexBytes = await index.archive()
const indexBytes = await indexShardedDAG(root, shards, shardIndexes)
/* c8 ignore next 3 */
if (!indexBytes.ok) {
throw new Error('failed to archive DAG index', { cause: indexBytes.error })
}

// Store the index in the space
const indexDigest = await Blob.add(conf, indexBytes.ok, options)
const indexLink = Link.create(carCodec.code, indexDigest)
const { multihash } = await Blob.add(conf, indexBytes.ok, options)
const indexLink = Link.create(CAR.code, multihash)

// Register the index with the service
await Index.add(conf, indexLink, options)
Expand All @@ -221,3 +217,19 @@ async function uploadBlockStream(

return root
}

/**
* Indexes a sharded DAG
*
* @param {import('multiformats').Link} root
* @param {import('./types.js').CARLink[]} shards
* @param {Array<Map<import('./types.js').SliceDigest, import('./types.js').Position>>} shardIndexes
*/
export async function indexShardedDAG(root, shards, shardIndexes) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be an export from the blob-index utils.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

const index = ShardedDAGIndex.create(root)
for (const [i, shard] of shards.entries()) {
const slices = shardIndexes[i]
index.shards.set(shard.multihash, slices)
}
return await index.archive()
}
128 changes: 128 additions & 0 deletions packages/upload-client/src/receipts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import retry, { AbortError } from 'p-retry'
import { CAR } from '@ucanto/transport'
import { receiptsEndpoint } from './service.js'
import { REQUEST_RETRIES } from './constants.js'

export class ReceiptNotFound extends Error {
/**
* @param {import('multiformats').UnknownLink} taskCid
*/
constructor(taskCid) {
super()
this.taskCid = taskCid
}

/* c8 ignore start */
get reason() {
return `receipt not found for task ${this.taskCid}`
joaosa marked this conversation as resolved.
Show resolved Hide resolved
}
/* c8 ignore end */

get name() {
return 'ReceiptNotFound'
}
}

export class ReceiptMissing extends Error {
/**
* @param {import('multiformats').UnknownLink} taskCid
*/
constructor(taskCid) {
super()
this.taskCid = taskCid
}

/* c8 ignore start */
get reason() {
return `receipt missing for task ${this.taskCid}`
}
/* c8 ignore end */

get name() {
return 'ReceiptMissing'
}
}

export class Receipt {
/**
* @param {import('./types.js').RequestOptions} [options]
*/
constructor(options = {}) {
/* c8 ignore start */
this.receiptsEndpoint = options.receiptsEndpoint ?? receiptsEndpoint
this.retries = options.retries ?? REQUEST_RETRIES
this.fetch = options.fetch ?? globalThis.fetch.bind(globalThis)
/* c8 ignore stop */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: IMHO it's overkill to use a class here, since there's no state that changes for the lifetime of the instance and in usage you're never storing the instance to a variable (i.e. await new ReceiptPoller(options).poll(...)).

You could just export 2 methods, poll and get, pass them options objects and import like import * as Receipt from './receipts.js'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright sounds good. addressed as well

}

/**
* Polls for a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @returns {Promise<import('@ucanto/interface').Receipt>}
*/
async poll(taskCid) {
return await retry(
async () => {
const res = await this.get(taskCid)
if (res.error) {
// @ts-ignore
if (res.error.name === 'ReceiptNotFound') {
// throw an error that will cause `p-retry` to retry with
throw new ReceiptNotFound(taskCid)
joaosa marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new AbortError(
new Error('failed to fetch blob/accept receipt', {
cause: res.error,
})
)
}
}
return res.ok
},
{
onFailedAttempt: console.warn,
/* c8 ignore next */
retries: this.retries ?? REQUEST_RETRIES,
}
)
}

/**
* Get a receipt for an executed task by its CID.
*
* @param {import('multiformats').UnknownLink} taskCid
* @returns {Promise<import('@ucanto/client').Result<import('@ucanto/interface').Receipt, Error>>}
*/
async get(taskCid) {
// Fetch receipt from endpoint
const url = new URL(taskCid.toString(), this.receiptsEndpoint)
const workflowResponse = await this.fetch(url)
/* c8 ignore start */
if (!workflowResponse.ok && workflowResponse.status === 404) {
joaosa marked this conversation as resolved.
Show resolved Hide resolved
return {
error: new ReceiptNotFound(taskCid),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this only be on not response ok + Status 404?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true! addressed

}
}
/* c8 ignore stop */
// Get receipt from Message Archive
const agentMessageBytes = new Uint8Array(
await workflowResponse.arrayBuffer()
)
// Decode message
const agentMessage = await CAR.request.decode({
body: agentMessageBytes,
headers: {},
})
// Get receipt from the potential multiple receipts in the message
const receipt = agentMessage.receipts.get(taskCid.toString())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It's kinda odd that this endpoint returns an AgentMessage but is not a ucanto endpoint...

I wonder if this should just always return an agent message (but with no receipts in it if not found) and then you wouldn't have to check for HTTP 404 status as well.

Copy link
Contributor

@vasco-santos vasco-santos Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because we in the past decided to redirect to the workflow object in S3, so there is really no way to know it exists, unless we would do redundant HEAD Request... We will need to UCANify the gets for workflows/receipts and, we may do changes at that point to the endpoint

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok gotcha!

if (!receipt) {
return {
error: new ReceiptMissing(taskCid),
}
}
return {
ok: receipt,
}
}
}
1 change: 1 addition & 0 deletions packages/upload-client/src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as DID from '@ipld/dag-ucan/did'

export const serviceURL = new URL('https://up.web3.storage')
export const servicePrincipal = DID.parse('did:web:web3.storage')
export const receiptsEndpoint = 'https://up.web3.storage/receipt/'

/** @type {import('@ucanto/interface').ConnectionView<import('./types.js').Service>} */
export const connection = connect({
Expand Down
16 changes: 15 additions & 1 deletion packages/upload-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import type {
FetchOptions as IpfsUtilsFetchOptions,
ProgressStatus as XHRProgressStatus,
} from 'ipfs-utils/src/types.js'
import { Link, UnknownLink, Version, MultihashHasher } from 'multiformats'
import {
MultihashDigest,
Link,
UnknownLink,
Version,
MultihashHasher,
} from 'multiformats'
import { Block } from '@ipld/unixfs'
import {
ServiceMethod,
Expand All @@ -12,6 +18,8 @@ import {
DID,
Principal,
Failure,
Delegation,
Capabilities,
} from '@ucanto/interface'
import {
UCANConclude,
Expand Down Expand Up @@ -307,6 +315,7 @@ export interface RequestOptions
UploadProgressTrackable {
fetch?: typeof globalThis.fetch
nonce?: string
receiptsEndpoint?: string
}

export interface ListRequestOptions extends RequestOptions, Pageable {}
Expand Down Expand Up @@ -374,3 +383,8 @@ export interface FileLike extends BlobLike {
*/
name: string
}

export interface BlobAddOk {
multihash: MultihashDigest
site: Delegation<Capabilities>
}
Loading
Loading