-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: check for blob/accept receipts before blob/add is concluded #1459
Changes from 14 commits
06e42e9
3ac98a3
f186499
8773f33
cbf1bf0
5a55ba2
62c32e6
bcca6c7
a089685
9764432
8ab1e9d
00326be
9b84fbd
61280e0
021f353
8382376
37f8c84
14cae1c
3212305
161e086
5e7c5c0
d0dc1ea
9b53536
89711ea
b732be4
740975e
412ea48
24258da
7ee6b58
06606b1
74e4c7a
444dd8c
6c650c5
7ad41d7
581cb82
46a11b9
1faa3dc
1f08812
c045997
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,15 @@ | ||
import { CAR } from '@ucanto/transport' | ||
import { sha256 } from 'multiformats/hashes/sha2' | ||
import { ed25519 } from '@ucanto/principal' | ||
import { conclude } from '@web3-storage/capabilities/ucan' | ||
import * as UCAN from '@web3-storage/capabilities/ucan' | ||
import { Receipt } from '@ucanto/core' | ||
import { Delegation, Receipt } from '@ucanto/core' | ||
import * as W3sBlobCapabilities from '@web3-storage/capabilities/web3.storage/blob' | ||
import * as BlobCapabilities from '@web3-storage/capabilities/blob' | ||
import * as HTTPCapabilities from '@web3-storage/capabilities/http' | ||
import { SpaceDID } from '@web3-storage/capabilities/utils' | ||
import retry, { AbortError } from 'p-retry' | ||
import { servicePrincipal, connection } from './service.js' | ||
import { receiptsEndpoint, servicePrincipal, connection } from './service.js' | ||
import { REQUEST_RETRIES } from './constants.js' | ||
|
||
/** | ||
|
@@ -26,6 +27,40 @@ function createUploadProgressHandler(url, handler) { | |
return onUploadProgress | ||
} | ||
|
||
// FIXME this code was copied over from w3up-client and modified to parameterise receiptsEndpoint | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I though I wrote this earlier, but can't find it now. Can we make w3up to use this function from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed. I think I somehow mixed it with doing the same for the receipts endpoint |
||
/** | ||
* Get a receipt for an executed task by its CID. | ||
* | ||
* @param {import('multiformats').UnknownLink} taskCid | ||
* @param {import('./types.js').RequestOptions} [options] | ||
*/ | ||
async function getReceipt(taskCid, options = {}) { | ||
// Fetch receipt from endpoint | ||
const url = new URL( | ||
taskCid.toString(), | ||
options.receiptsEndpoint ?? receiptsEndpoint | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing I'm wondering about is if there's a better way to achieve this param config and what kind implications this would have in terms of configuration in a testing environment 🤔 . Also, I'm usually wary of defaulting to production, but I see this is a pattern on the codebase and is, as such, likely to be okay here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, I think this is the pattern here of defaulting to prod. So would say is good |
||
) | ||
/* c8 ignore next */ | ||
const fetchReceipt = options.fetch ?? globalThis.fetch.bind(globalThis) | ||
const workflowResponse = await fetchReceipt(url) | ||
/* c8 ignore start */ | ||
if (!workflowResponse.ok) { | ||
throw new Error( | ||
`no receipt available for requested task ${taskCid.toString()}` | ||
) | ||
} | ||
/* c8 ignore stop */ | ||
// Get receipt from Message Archive | ||
const agentMessageBytes = new Uint8Array(await workflowResponse.arrayBuffer()) | ||
// Decode message | ||
const agentMessage = await CAR.request.decode({ | ||
body: agentMessageBytes, | ||
headers: {}, | ||
}) | ||
// Get receipt from the potential multiple receipts in the message | ||
return agentMessage.receipts.get(taskCid.toString()) | ||
} | ||
|
||
// FIXME this code has been copied over from upload-api | ||
/** | ||
* @param {import('@ucanto/interface').Invocation} concludeFx | ||
|
@@ -166,7 +201,7 @@ export function createConcludeInvocation(id, serviceDid, receipt) { | |
* The issuer needs the `blob/add` delegated capability. | ||
* @param {Blob|Uint8Array} data Blob data. | ||
* @param {import('./types.js').RequestOptions} [options] | ||
* @returns {Promise<import('multiformats').MultihashDigest>} | ||
* @returns {Promise<import('./types.js').BlobAddOk>} | ||
*/ | ||
export async function add( | ||
{ issuer, with: resource, proofs, audience }, | ||
|
@@ -303,7 +338,57 @@ export async function add( | |
}) | ||
} | ||
|
||
return multihash | ||
// Ensure the blob has been accepted | ||
const acceptReceipt = await retry( | ||
async () => { | ||
try { | ||
return await getReceipt(nextTasks.accept.task.link(), options) | ||
} catch (err) { | ||
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the correct message here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to distinguish between not there yet vs an actual error, and quit early from retry if the latter. Typically new code returns const acceptReceipt = await retry(async () => {
const res = await getReceipt(nextTasks.accept.task.link(), options)
if (res.error) {
if (res.error.name === 'ReceiptNotFound') {
// throw an error that will cause `p-retry` to retry with
throw new Error('receipt not yet available')
} else {
// bail the retry using `import { AbortError } from 'p-retry'`
throw new AbortError(new Error('failed to fetch blob/accept receipt', { cause: res.error }))
}
}
return res.ok
}) Not 100% necessary but I would then consider extracting this as a utility like: const acceptReceipt = await Receipt.poll(nextTasks.accept.task.link(), options) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. working on this 🙏 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
cause: err, | ||
}) | ||
} | ||
}, | ||
{ | ||
onFailedAttempt: console.warn, | ||
retries: options.retries ?? REQUEST_RETRIES, | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be great to test that these defaults work nicely in a real environment, as they may exist a delay in between the receipt to be there. Did you try it against staging? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have not tried this but that is a good point |
||
) | ||
|
||
// @ts-ignore Property | ||
if (!acceptReceipt?.out.ok?.site) { | ||
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { | ||
cause: 'failed to get blob/accept receipt', | ||
}) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: would be good to distinct this error, from Not Found, or Other errors There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah I'd just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
|
||
const blocks = new Map( | ||
[...acceptReceipt.iterateIPLDBlocks()].map((block) => [ | ||
`${block.cid}`, | ||
block, | ||
]) | ||
) | ||
const site = Delegation.view( | ||
{ | ||
root: /** @type {import('@ucanto/interface').UCANLink} */ ( | ||
// @ts-ignore Property | ||
acceptReceipt.out.ok.site | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think I need at all anymore, as previously the receipt acquisition method could return null. Now it's either a receipt or it throws. |
||
), | ||
blocks, | ||
}, | ||
null | ||
) | ||
/* c8 ignore next 5 */ | ||
if (!site) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would this be falsey? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh you need to not pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
throw new Error(`failed ${BlobCapabilities.add.can} invocation`, { | ||
cause: 'failed to get blob/accept receipt', | ||
}) | ||
} | ||
|
||
return { | ||
multihash, | ||
site: site.link(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should return the delegation, so that the client can invoke it if intended There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point! addressed |
||
} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,10 @@ import * as Upload from './upload.js' | |
import * as UnixFS from './unixfs.js' | ||
import * as CAR from './car.js' | ||
import { ShardingStream, defaultFileComparator } from './sharding.js' | ||
import { codec as carCodec } from '@ucanto/transport/car' | ||
|
||
export { Blob, Index, Store, Upload, UnixFS, CAR } | ||
export * from './sharding.js' | ||
export { receiptsEndpoint } from './service.js' | ||
|
||
/** | ||
* Uploads a file to the service and returns the root data CID for the | ||
|
@@ -144,9 +144,9 @@ async function uploadBlockStream( | |
async transform(car, controller) { | ||
const bytes = new Uint8Array(await car.arrayBuffer()) | ||
// Invoke blob/add and write bytes to write target | ||
const multihash = await Blob.add(conf, bytes, options) | ||
const { multihash } = await Blob.add(conf, bytes, options) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's make sure to create an issue where we can follow up to have user make the site delegation public by default by invoking the Claim delegation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. created #1486 👍 |
||
// Should this be raw instead? | ||
const cid = Link.create(carCodec.code, multihash) | ||
const cid = Link.create(CAR.code, multihash) | ||
let piece | ||
if (pieceHasher) { | ||
const multihashDigest = await pieceHasher.digest(bytes) | ||
|
@@ -199,20 +199,15 @@ async function uploadBlockStream( | |
/* c8 ignore next */ | ||
if (!root) throw new Error('missing root CID') | ||
|
||
const index = ShardedDAGIndex.create(root) | ||
for (const [i, shard] of shards.entries()) { | ||
const slices = shardIndexes[i] | ||
index.shards.set(shard.multihash, slices) | ||
} | ||
const indexBytes = await index.archive() | ||
const indexBytes = await indexShardedDAG(root, shards, shardIndexes) | ||
/* c8 ignore next 3 */ | ||
if (!indexBytes.ok) { | ||
throw new Error('failed to archive DAG index', { cause: indexBytes.error }) | ||
} | ||
|
||
// Store the index in the space | ||
const indexDigest = await Blob.add(conf, indexBytes.ok, options) | ||
const indexLink = Link.create(carCodec.code, indexDigest) | ||
const { multihash } = await Blob.add(conf, indexBytes.ok, options) | ||
const indexLink = Link.create(CAR.code, multihash) | ||
|
||
// Register the index with the service | ||
await Index.add(conf, indexLink, options) | ||
|
@@ -221,3 +216,19 @@ async function uploadBlockStream( | |
|
||
return root | ||
} | ||
|
||
/** | ||
* Indexes a sharded DAG | ||
* | ||
* @param {import('multiformats').Link} root | ||
* @param {import('./types.js').CARLink[]} shards | ||
* @param {Array<Map<import('./types.js').SliceDigest, import('./types.js').Position>>} shardIndexes | ||
*/ | ||
export async function indexShardedDAG(root, shards, shardIndexes) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be an export from the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
const index = ShardedDAGIndex.create(root) | ||
for (const [i, shard] of shards.entries()) { | ||
const slices = shardIndexes[i] | ||
index.shards.set(shard.multihash, slices) | ||
} | ||
return await index.archive() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we take this out to avoid a
upload-api
release?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed