-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: check for blob/accept receipts before blob/add is concluded #1459
Changes from 33 commits
06e42e9
3ac98a3
f186499
8773f33
cbf1bf0
5a55ba2
62c32e6
bcca6c7
a089685
9764432
8ab1e9d
00326be
9b84fbd
61280e0
021f353
8382376
37f8c84
14cae1c
3212305
161e086
5e7c5c0
d0dc1ea
9b53536
89711ea
b732be4
740975e
412ea48
24258da
7ee6b58
06606b1
74e4c7a
444dd8c
6c650c5
7ad41d7
581cb82
46a11b9
1faa3dc
1f08812
c045997
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,14 +2,15 @@ import { sha256 } from 'multiformats/hashes/sha2' | |
import { ed25519 } from '@ucanto/principal' | ||
import { conclude } from '@web3-storage/capabilities/ucan' | ||
import * as UCAN from '@web3-storage/capabilities/ucan' | ||
import { Receipt } from '@ucanto/core' | ||
import { Delegation, Receipt } from '@ucanto/core' | ||
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' | ||
import * as BlobCapabilities from '@web3-storage/capabilities/blob' | ||
import * as HTTPCapabilities from '@web3-storage/capabilities/http' | ||
import { SpaceDID } from '@web3-storage/capabilities/utils' | ||
import retry, { AbortError } from 'p-retry' | ||
import { servicePrincipal, connection } from './service.js' | ||
import { REQUEST_RETRIES } from './constants.js' | ||
import { Receipt as ReceiptPoller } from './receipts.js' | ||
|
||
/** | ||
* @param {string} url | ||
|
@@ -166,7 +167,7 @@ export function createConcludeInvocation(id, serviceDid, receipt) { | |
* The issuer needs the `blob/add` delegated capability. | ||
* @param {Blob|Uint8Array} data Blob data. | ||
* @param {import('./types.js').RequestOptions} [options] | ||
* @returns {Promise<import('multiformats').MultihashDigest>} | ||
* @returns {Promise<import('./types.js').BlobAddOk>} | ||
*/ | ||
export async function add( | ||
{ issuer, with: resource, proofs, audience }, | ||
|
@@ -303,7 +304,38 @@ export async function add( | |
}) | ||
} | ||
|
||
return multihash | ||
// Ensure the blob has been accepted | ||
const acceptReceipt = await new ReceiptPoller(options).poll( | ||
nextTasks.accept.task.link() | ||
) | ||
|
||
const blocks = new Map( | ||
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [ | ||
`${block.cid}`, | ||
block, | ||
]) | ||
) | ||
const site = Delegation.view( | ||
{ | ||
root: /** @type {import('@ucanto/interface').UCANLink} */ ( | ||
// @ts-ignore Property | ||
acceptReceipt.out.ok.site | ||
), | ||
blocks, | ||
}, | ||
null | ||
) | ||
/* c8 ignore next 5 */ | ||
if (!site) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would this be falsey? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh you need to not pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { | ||
cause: 'failed to get blob/accept receipt delegation view', | ||
}) | ||
} | ||
|
||
return { | ||
multihash, | ||
site, | ||
} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,11 @@ import * as Upload from './upload.js' | |
import * as UnixFS from './unixfs.js' | ||
import * as CAR from './car.js' | ||
import { ShardingStream, defaultFileComparator } from './sharding.js' | ||
import { codec as carCodec } from '@ucanto/transport/car' | ||
|
||
export { Blob, Index, Store, Upload, UnixFS, CAR } | ||
export * from './sharding.js' | ||
export { receiptsEndpoint } from './service.js' | ||
export { Receipt } from './receipts.js' | ||
|
||
/** | ||
* Uploads a file to the service and returns the root data CID for the | ||
|
@@ -144,9 +145,9 @@ async function uploadBlockStream( | |
async transform(car, controller) { | ||
const bytes = new Uint8Array(await car.arrayBuffer()) | ||
// Invoke blob/add and write bytes to write target | ||
const multihash = await Blob.add(conf, bytes, options) | ||
const { multihash } = await Blob.add(conf, bytes, options) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's make sure to create an issue where we can follow up to have user make the site delegation public by default by invoking the Claim delegation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. created #1486 👍 |
||
// Should this be raw instead? | ||
const cid = Link.create(carCodec.code, multihash) | ||
const cid = Link.create(CAR.code, multihash) | ||
let piece | ||
if (pieceHasher) { | ||
const multihashDigest = await pieceHasher.digest(bytes) | ||
|
@@ -199,20 +200,15 @@ async function uploadBlockStream( | |
/* c8 ignore next */ | ||
if (!root) throw new Error('missing root CID') | ||
|
||
const index = ShardedDAGIndex.create(root) | ||
for (const [i, shard] of shards.entries()) { | ||
const slices = shardIndexes[i] | ||
index.shards.set(shard.multihash, slices) | ||
} | ||
const indexBytes = await index.archive() | ||
const indexBytes = await indexShardedDAG(root, shards, shardIndexes) | ||
/* c8 ignore next 3 */ | ||
if (!indexBytes.ok) { | ||
throw new Error('failed to archive DAG index', { cause: indexBytes.error }) | ||
} | ||
|
||
// Store the index in the space | ||
const indexDigest = await Blob.add(conf, indexBytes.ok, options) | ||
const indexLink = Link.create(carCodec.code, indexDigest) | ||
const { multihash } = await Blob.add(conf, indexBytes.ok, options) | ||
const indexLink = Link.create(CAR.code, multihash) | ||
|
||
// Register the index with the service | ||
await Index.add(conf, indexLink, options) | ||
|
@@ -221,3 +217,19 @@ async function uploadBlockStream( | |
|
||
return root | ||
} | ||
|
||
/** | ||
* Indexes a sharded DAG | ||
* | ||
* @param {import('multiformats').Link} root | ||
* @param {import('./types.js').CARLink[]} shards | ||
* @param {Array<Map<import('./types.js').SliceDigest, import('./types.js').Position>>} shardIndexes | ||
*/ | ||
export async function indexShardedDAG(root, shards, shardIndexes) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be an export from the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
const index = ShardedDAGIndex.create(root) | ||
for (const [i, shard] of shards.entries()) { | ||
const slices = shardIndexes[i] | ||
index.shards.set(shard.multihash, slices) | ||
} | ||
return await index.archive() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
import retry, { AbortError } from 'p-retry' | ||
import { CAR } from '@ucanto/transport' | ||
import { receiptsEndpoint } from './service.js' | ||
import { REQUEST_RETRIES } from './constants.js' | ||
|
||
export class ReceiptNotFound extends Error { | ||
/** | ||
* @param {import('multiformats').UnknownLink} taskCid | ||
*/ | ||
constructor(taskCid) { | ||
super() | ||
this.taskCid = taskCid | ||
} | ||
|
||
/* c8 ignore start */ | ||
get reason() { | ||
return `receipt not found for task ${this.taskCid}` | ||
joaosa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
/* c8 ignore end */ | ||
|
||
get name() { | ||
return 'ReceiptNotFound' | ||
} | ||
} | ||
|
||
export class ReceiptMissing extends Error { | ||
/** | ||
* @param {import('multiformats').UnknownLink} taskCid | ||
*/ | ||
constructor(taskCid) { | ||
super() | ||
this.taskCid = taskCid | ||
} | ||
|
||
/* c8 ignore start */ | ||
get reason() { | ||
return `receipt missing for task ${this.taskCid}` | ||
} | ||
/* c8 ignore end */ | ||
|
||
get name() { | ||
return 'ReceiptMissing' | ||
} | ||
} | ||
|
||
export class Receipt { | ||
/** | ||
* @param {import('./types.js').RequestOptions} [options] | ||
*/ | ||
constructor(options = {}) { | ||
/* c8 ignore start */ | ||
this.receiptsEndpoint = options.receiptsEndpoint ?? receiptsEndpoint | ||
this.retries = options.retries ?? REQUEST_RETRIES | ||
this.fetch = options.fetch ?? globalThis.fetch.bind(globalThis) | ||
/* c8 ignore stop */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: IMHO it's overkill to use a class here, since there's no state that changes for the lifetime of the instance and in usage you're never storing the instance to a variable (i.e. You could just export 2 methods, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alright sounds good. addressed as well |
||
} | ||
|
||
/** | ||
* Polls for a receipt for an executed task by its CID. | ||
* | ||
* @param {import('multiformats').UnknownLink} taskCid | ||
* @returns {Promise<import('@ucanto/interface').Receipt>} | ||
*/ | ||
async poll(taskCid) { | ||
return await retry( | ||
async () => { | ||
const res = await this.get(taskCid) | ||
if (res.error) { | ||
// @ts-ignore | ||
if (res.error.name === 'ReceiptNotFound') { | ||
// throw an error that will cause `p-retry` to retry with | ||
throw new ReceiptNotFound(taskCid) | ||
joaosa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
throw new AbortError( | ||
new Error('failed to fetch blob/accept receipt', { | ||
cause: res.error, | ||
}) | ||
) | ||
} | ||
} | ||
return res.ok | ||
}, | ||
{ | ||
onFailedAttempt: console.warn, | ||
/* c8 ignore next */ | ||
retries: this.retries ?? REQUEST_RETRIES, | ||
} | ||
) | ||
} | ||
|
||
/** | ||
* Get a receipt for an executed task by its CID. | ||
* | ||
* @param {import('multiformats').UnknownLink} taskCid | ||
* @returns {Promise<import('@ucanto/client').Result<import('@ucanto/interface').Receipt, Error>>} | ||
*/ | ||
async get(taskCid) { | ||
// Fetch receipt from endpoint | ||
const url = new URL(taskCid.toString(), this.receiptsEndpoint) | ||
const workflowResponse = await this.fetch(url) | ||
/* c8 ignore start */ | ||
if (!workflowResponse.ok && workflowResponse.status === 404) { | ||
joaosa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return { | ||
error: new ReceiptNotFound(taskCid), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't this only be on not response ok + Status 404? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true! addressed |
||
} | ||
} | ||
/* c8 ignore stop */ | ||
// Get receipt from Message Archive | ||
const agentMessageBytes = new Uint8Array( | ||
await workflowResponse.arrayBuffer() | ||
) | ||
// Decode message | ||
const agentMessage = await CAR.request.decode({ | ||
body: agentMessageBytes, | ||
headers: {}, | ||
}) | ||
// Get receipt from the potential multiple receipts in the message | ||
const receipt = agentMessage.receipts.get(taskCid.toString()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: It's kinda odd that this endpoint returns an I wonder if this should just always return an agent message (but with no receipts in it if not found) and then you wouldn't have to check for HTTP 404 status as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is because we in the past decided to redirect to the workflow object in S3, so there is really no way to know it exists, unless we would do redundant HEAD Request... We will need to UCANify the gets for workflows/receipts and, we may do changes at that point to the endpoint There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah ok gotcha! |
||
if (!receipt) { | ||
return { | ||
error: new ReceiptMissing(taskCid), | ||
} | ||
} | ||
return { | ||
ok: receipt, | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should check for
error
and then you don't need tots-ignore
here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I need at all anymore, as previously the receipt acquisition method could return null. Now it's either a receipt or it throws.