Skip to content

Commit

Permalink
feat(upload-api)!: integrate agent store for idempotence & invocation…
Browse files Browse the repository at this point in the history
…/receipt persistence (storacha#1444)

This is a draft that I could not finished on Friday, but I thought I'd
share in case @vasco-santos or @alanshaw are looking into related stuff
on Monday.

PR introduces AgentStore that is used for writing ucanto AgentMessage's
from which receipts and invocations get indexed. This store is then
consulted by a service during invocations to provide idempotence or more
simply to return receipt without executing a task if we already have
one.

Main goal here is to take out custom logic out of w3infra and to remove
need for manual receipt / invocation storing in the handlers.


⚠️ It is worth calling out that this is significantly changes how things
work. In the past sending same invocation used to re-run the handler,
with this change that is no longer the case and you'll get a same
receipt back (which is a neat way to lookup receipts actually). However
that also means that bunch of our tests were affected. I don't expect it
would affect real world cases however, because same invocation send
second later will have unique CID (due to how we generate expiry)
however it does affect tests because we don't wait a second between
calls. That is why nonce's were added in a lot of places, but that is
not something I expect users would have to do.

---------

Co-authored-by: Joao Andrade <sousa.andrade.joao@gmail.com>
Co-authored-by: Alan Shaw <alan.shaw@protocol.ai>
Co-authored-by: Vasco Santos <santos.vasco10@gmail.com>
  • Loading branch information
4 people authored May 16, 2024
1 parent 92f3017 commit c9bf33e
Show file tree
Hide file tree
Showing 56 changed files with 9,874 additions and 7,354 deletions.
4 changes: 3 additions & 1 deletion packages/access-client/src/agent-use-cases.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,19 @@ export async function requestAccess(access, account, capabilities) {
* @param {AccessAgent} access
* @param {API.DID} [audienceOfClaimedDelegations] - audience of claimed delegations. defaults to access.connection.id.did()
* @param {object} opts
* @param {string} [opts.nonce] - nonce to use for the claim
* @param {boolean} [opts.addProofs] - whether to addProof to access agent
* @returns
*/
export async function claimAccess(
access,
audienceOfClaimedDelegations = access.connection.id.did(),
{ addProofs = false } = {}
{ addProofs = false, nonce } = {}
) {
const res = await access.invokeAndExecute(Access.claim, {
audience: access.connection.id,
with: audienceOfClaimedDelegations,
nonce,
})
if (res.out.error) {
throw res.out.error
Expand Down
6 changes: 5 additions & 1 deletion packages/access-client/src/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ export class Agent {
}),
issuer: this.issuer,
proofs: [...proofs],
nonce: options.nonce,
})

return /** @type {API.IssuedInvocationView<API.InferInvokedCapability<CAP>>} */ (
Expand All @@ -592,13 +593,16 @@ export class Agent {
* Get Space information from Access service
*
* @param {API.URI<"did:">} [space]
* @param {object} [options]
* @param {string} [options.nonce]
*/
async getSpaceInfo(space) {
async getSpaceInfo(space, options) {
const _space = space || this.currentSpace()
if (!_space) {
throw new Error('No space selected, you need pass a resource.')
}
const inv = await this.invokeAndExecute(Capabilities.info, {
...options,
with: _space,
})

Expand Down
2 changes: 1 addition & 1 deletion packages/filecoin-api/src/storefront/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
UpdatableAndQueryableStore,
Queue,
ServiceConfig,
StoreGetError
StoreGetError,
} from '../types.js'

export type PieceStore = UpdatableAndQueryableStore<
Expand Down
1 change: 0 additions & 1 deletion packages/upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@
"is-subset": "^0.1.1",
"mocha": "^10.2.0",
"one-webcrypto": "git://github.com/web3-storage/one-webcrypto",
"p-defer": "^4.0.1",
"typescript": "5.2.2"
},
"eslintConfig": {
Expand Down
91 changes: 89 additions & 2 deletions packages/upload-api/src/blob/accept.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ import * as DID from '@ipld/dag-ucan/did'
import * as W3sBlob from '@web3-storage/capabilities/web3.storage/blob'
import { Assert } from '@web3-storage/content-claims/capability'
import { create as createLink } from 'multiformats/link'
import { Invocation } from '@ucanto/core'
import * as Digest from 'multiformats/hashes/digest'
import { code as rawCode } from 'multiformats/codecs/raw'
import * as API from '../types.js'
import { AllocatedMemoryHadNotBeenWrittenTo } from './lib.js'
import {
AllocatedMemoryHadNotBeenWrittenTo,
UnsupportedCapability,
} from './lib.js'
import * as HTTP from '@web3-storage/capabilities/http'

/**
* @param {API.W3ServiceContext} context
Expand All @@ -16,6 +21,13 @@ export function blobAcceptProvider(context) {
return Server.provideAdvanced({
capability: W3sBlob.accept,
handler: async ({ capability }) => {
// Only service principal can perform an allocation
if (capability.with !== context.id.did()) {
return {
error: new UnsupportedCapability({ capability }),
}
}

const { blob, space } = capability.nb
// If blob is not stored, we must fail
const hasBlob = await context.blobsStorage.has(blob.digest)
Expand All @@ -29,7 +41,9 @@ export function blobAcceptProvider(context) {

const digest = Digest.decode(blob.digest)
const content = createLink(rawCode, digest)
const createUrl = await context.blobsStorage.createDownloadUrl(digest.bytes)
const createUrl = await context.blobsStorage.createDownloadUrl(
digest.bytes
)
if (createUrl.error) {
return createUrl
}
Expand All @@ -55,3 +69,76 @@ export function blobAcceptProvider(context) {
},
})
}

/**
* Polls `blob/accept` task whenever we receive a receipt. It may error if passed
* receipt is for `http/put` task that refers to the `blob/allocate` that we
* are unable to find.
*
* @param {API.ConcludeServiceContext} context
* @param {API.Receipt} receipt
* @returns {Promise<API.Result<{}, API.StorageGetError>>}
*/
export const poll = async (context, receipt) => {
const ran = Invocation.isInvocation(receipt.ran)
? { ok: receipt.ran }
: await context.agentStore.invocations.get(receipt.ran)

// If can not find an invocation for this receipt there is nothing to do here,
// if it was receipt for `http/put` we would have invocation record.
if (!ran.ok) {
return { ok: {} }
}

// Detect if this receipt is for an `http/put` invocation
const put = /** @type {?API.HTTPPut} */ (
ran.ok.capabilities.find(({ can }) => can === HTTP.put.can)
)

// If it's not an http/put invocation nothing to do here.
if (put == null) {
return { ok: {} }
}

// Otherwise we are going to lookup allocation corresponding to this http/put
// in order to issue blob/accept.
const [, allocation] = /** @type {API.UCANAwait} */ (put.nb.url)['ucan/await']
const result = await context.agentStore.invocations.get(allocation)
// If could not find blob/allocate invocation there is something wrong in
// the system and we return error so it could be propagated to the user. It is
// not a proper solution as user can not really do anything, but still seems
// better than silently ignoring, this way user has a chance to report a
// problem. Client test could potentially also catch errors.
if (result.error) {
return result
}

const [allocate] = /** @type {[API.BlobAllocate]} */ (result.ok.capabilities)

// If this is a receipt for the http/put we will perform blob/accept.
const blobAccept = await W3sBlob.accept.invoke({
issuer: context.id,
audience: context.id,
with: context.id.did(),
nb: {
blob: put.nb.body,
space: allocate.nb.space,
_put: {
'ucan/await': ['.out.ok', receipt.ran.link()],
},
},
// ⚠️ We need invocation to be deterministic which is why we use exact
// same as it is on allocation which will guarantee that expiry is the
// same regardless when we received `http/put` receipt.
//
// ℹ️ This works around the fact that we index receipts by invocation link
// as opposed to task link which would not care about the expiration.
expiration: result.ok.expiration,
})

// We do not care about the result we just want receipt to be issued and
// stored.
await blobAccept.execute(context.getServiceConnection())

return { ok: {} }
}
Loading

0 comments on commit c9bf33e

Please sign in to comment.