From 3f5fdb3d1a59ebcd83a0f4f3241f127bbe8c9829 Mon Sep 17 00:00:00 2001 From: Natalie Bravo Date: Tue, 11 Feb 2025 08:00:36 -0300 Subject: [PATCH] refactor: remove UCAN stream handler and insert deltas directly into space-diff table (#458) This PR addresses the objectives outlined in the issue https://github.com/storacha/project-tracking/issues/304 Please refer to the issue for detailed context. **TODO**: - [x] Remove `ucan-stream` (billing) - [x] Insert space usage delta into the diff table in `register` - [x] Insert space usage delta into the diff table in `deregister` - [x] Use `TransactWriteItems` to ensure atomic write transactions - [x] Ensure tests are passing --- billing/functions/ucan-stream.js | 115 ----- billing/lib/api.ts | 2 - billing/lib/ucan-stream.js | 174 ------- billing/test/helpers/context.js | 18 - billing/test/lib.ucan-stream.spec.js | 4 - billing/test/lib/api.ts | 7 - billing/test/lib/ucan-stream.js | 247 ---------- package-lock.json | 27 +- stacks/billing-stack.js | 36 +- .../functions/ucan-invocation-router.js | 2 +- upload-api/package.json | 2 +- upload-api/stores/blob-registry.js | 423 ++++++++++++------ upload-api/test/helpers/ucan.js | 6 +- 13 files changed, 310 insertions(+), 753 deletions(-) delete mode 100644 billing/functions/ucan-stream.js delete mode 100644 billing/lib/ucan-stream.js delete mode 100644 billing/test/lib.ucan-stream.spec.js delete mode 100644 billing/test/lib/ucan-stream.js diff --git a/billing/functions/ucan-stream.js b/billing/functions/ucan-stream.js deleted file mode 100644 index d0f30160..00000000 --- a/billing/functions/ucan-stream.js +++ /dev/null @@ -1,115 +0,0 @@ -import * as Sentry from '@sentry/serverless' -import { toString, fromString } from 'uint8arrays' -import * as Link from 'multiformats/link' -import { LRUCache } from 'lru-cache' -import { createSpaceDiffStore } from '../tables/space-diff.js' -import { createConsumerStore } from '../tables/consumer.js' -import { expect } from './lib.js' -import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../lib/ucan-stream.js' -import { mustGetEnv } from '../../lib/env.js' - -Sentry.AWSLambda.init({ - environment: process.env.SST_STAGE, - dsn: process.env.SENTRY_DSN, - tracesSampleRate: 0 -}) - -/** - * @typedef {{ - * spaceDiffTable?: string - * consumerTable?: string - * region?: 'us-west-2'|'us-east-2' - * }} CustomHandlerContext - */ - -export const handler = Sentry.AWSLambda.wrapHandler( - /** - * @param {import('aws-lambda').KinesisStreamEvent} event - * @param {import('aws-lambda').Context} context - */ - async (event, context) => { - /** @type {CustomHandlerContext|undefined} */ - const customContext = context?.clientContext?.Custom - const spaceDiffTable = customContext?.spaceDiffTable ?? mustGetEnv('SPACE_DIFF_TABLE_NAME') - const consumerTable = customContext?.consumerTable ?? mustGetEnv('CONSUMER_TABLE_NAME') - const region = customContext?.region ?? mustGetEnv('AWS_REGION') - - const messages = parseUcanStreamEvent(event) - const deltas = findSpaceUsageDeltas(messages) - if (!deltas.length) { - console.log("No messages found that contain space usage deltas", "capabilities", messages[0].value.att.map((att) => att.can), "resources", messages[0].value.att.map((att) => att.with) ) - return - } - console.log(`Storing ${deltas.length} space usage deltas`) - - const consumerStore = createConsumerStore({ region }, { tableName: consumerTable }) - const spaceDiffStore = createSpaceDiffStore({ region }, { tableName: spaceDiffTable }) - const ctx = { spaceDiffStore, consumerStore: withConsumerListCache(consumerStore) } - expect(await storeSpaceUsageDeltas(deltas, ctx), 'storing space usage deltas') - } -) - -/** - * @param {import('aws-lambda').KinesisStreamEvent} event - * @returns {import('../lib/api.js').UcanStreamMessage[]} - */ -const parseUcanStreamEvent = event => { - const batch = event.Records.map(r => fromString(r.kinesis.data, 'base64')) - return batch.map(b => { - const json = JSON.parse(toString(b, 'utf8')) - if (json.type === 'receipt') { - return { - type: 'receipt', - value: { ...json.value, cid: Link.parse(json.value.cid) }, - carCid: Link.parse(json.carCid), - invocationCid: Link.parse(json.invocationCid), - out: json.out, - ts: new Date(json.ts) - } - } else if (json.type === 'workflow') { - return { - type: 'workflow', - value: { ...json.value, cid: Link.parse(json.value.cid) }, - carCid: Link.parse(json.carCid), - ts: new Date(json.ts) - } - } else { - throw new Error(`unknown message type: ${json.type}`) - } - }) -} - -/** - * This means that if a subscription for a space changes, there's a 5 minute - * (max) period where writes may be attributed to the previous subscription. - * - * This happens very infrequently, and DynamoDB is _already_ eventually - * consistent on read so we're just pushing out this delay a little more to - * be able to process data for spaces with frequent writes a lot quicker. - */ -const CONSUMER_LIST_CACHE_TTL = 1000 * 60 * 5 -const CONSUMER_LIST_CACHE_MAX = 10_000 - -/** - * @param {import('../lib/api.js').ConsumerStore} consumerStore - * @returns {import('../lib/api.js').ConsumerStore} - */ -const withConsumerListCache = (consumerStore) => { - /** @type {LRUCache>>} */ - const cache = new LRUCache({ - max: CONSUMER_LIST_CACHE_MAX, - ttl: CONSUMER_LIST_CACHE_TTL - }) - return { - ...consumerStore, - async list (key, options) { - const cacheKeySuffix = options ? `?cursor=${options.cursor}&size=${options.size}` : '' - const cacheKey = `${key.consumer}${cacheKeySuffix}` - const cached = cache.get(cacheKey) - if (cached) return cached - const res = await consumerStore.list(key, options) - if (res.ok) cache.set(cacheKey, res) - return res - } - } -} diff --git a/billing/lib/api.ts b/billing/lib/api.ts index 0b07b5c9..6a9248bd 100644 --- a/billing/lib/api.ts +++ b/billing/lib/api.ts @@ -336,8 +336,6 @@ export interface UcanWorkflowMessage exte type: 'workflow' } -export type UcanStreamMessage = UcanWorkflowMessage | UcanReceiptMessage - // Utility //////////////////////////////////////////////////////////////////// export type ConsumerDID = DID diff --git a/billing/lib/ucan-stream.js b/billing/lib/ucan-stream.js deleted file mode 100644 index 455e8425..00000000 --- a/billing/lib/ucan-stream.js +++ /dev/null @@ -1,174 +0,0 @@ -import * as BlobCaps from '@storacha/capabilities/blob' -import * as SpaceBlobCaps from '@storacha/capabilities/space/blob' -import * as StoreCaps from '@storacha/capabilities/store' -import * as DID from '@ipld/dag-ucan/did' - -/** - * Filters UCAN stream messages that are receipts for invocations that alter - * the store size for a resource and extracts the relevant information about - * the delta. - * - * @param {import('./api.js').UcanStreamMessage[]} messages - * @returns {import('./api.js').UsageDelta[]} - */ -export const findSpaceUsageDeltas = messages => { - const deltas = [] - for (const message of messages) { - if (!isReceipt(message)) continue - - /** @type {import('@ucanto/interface').DID|undefined} */ - let resource - /** @type {number|undefined} */ - let size - if (isReceiptForCapability(message, BlobCaps.accept) && isBlobAcceptSuccess(message.out)) { - resource = DID.decode(message.value.att[0].nb?.space ?? new Uint8Array()).did() - size = message.value.att[0].nb?.blob.size - } else if (isReceiptForCapability(message, SpaceBlobCaps.remove) && isSpaceBlobRemoveSuccess(message.out)) { - resource = /** @type {import('@ucanto/interface').DID} */ (message.value.att[0].with) - size = -message.out.ok.size - // TODO: remove me LEGACY store/add - } else if (isReceiptForCapability(message, StoreCaps.add) && isStoreAddSuccess(message.out)) { - resource = /** @type {import('@ucanto/interface').DID} */ (message.value.att[0].with) - size = message.out.ok.allocated - // TODO: remove me LEGACY store/remove - } else if (isReceiptForCapability(message, StoreCaps.remove) && isStoreRemoveSuccess(message.out)) { - resource = /** @type {import('@ucanto/interface').DID} */ (message.value.att[0].with) - size = -message.out.ok.size - } - - // Is message is a repeat store/add for the same shard or not a valid - // store/add or store/remove receipt? - if (resource == null || size == 0 || size == null) { - continue - } - - /** @type {import('./api.js').UsageDelta} */ - const delta = { - resource, - cause: message.invocationCid, - delta: size, - // TODO: use receipt timestamp per https://github.com/web3-storage/w3up/issues/970 - receiptAt: message.ts - } - deltas.push(delta) - } - return deltas -} - -/** - * Attributes a raw usage delta to a customer and stores the collected - * information in the space diff store. - * - * Space diffs are keyed by `customer`, `provider`, `space` and `cause` so - * multiple calls to this function with the same data must not add _another_ - * record to the store. - * - * @param {import('./api.js').UsageDelta[]} deltas - * @param {{ - * spaceDiffStore: import('./api.js').SpaceDiffStore - * consumerStore: import('./api.js').ConsumerStore - * }} ctx - */ -export const storeSpaceUsageDeltas = async (deltas, ctx) => { - const spaceDiffResults = await Promise.all(deltas.map(async delta => { - console.log(`Processing delta for ${delta.resource}`) - const consumerList = await ctx.consumerStore.list({ consumer: delta.resource }) - if (consumerList.error) { - console.error(`Error listing consumers for ${delta.resource}: ${consumerList.error}`) - return consumerList - } - - const diffs = [] - // There should only be one subscription per provider, but in theory you - // could have multiple providers for the same consumer (space). - const consumers = consumerList.ok.results - console.log(`Found ${consumers.length} consumers for ${delta.resource}`) - for (const consumer of consumers) { - diffs.push({ - provider: consumer.provider, - subscription: consumer.subscription, - space: delta.resource, - cause: delta.cause, - delta: delta.delta, - receiptAt: delta.receiptAt, - insertedAt: new Date() - }) - } - console.log(`Total diffs found for ${delta.resource}: ${diffs.length}`) - return { ok: diffs, error: undefined } - })) - - const spaceDiffs = [] - for (const res of spaceDiffResults) { - if (res.error) { - console.error(`Error while processing space diffs: ${res.error}`) - return res - } - spaceDiffs.push(...res.ok) - } - - if (spaceDiffs.length === 0) { - return { ok: 'no space diffs to store', error: undefined } - } - - console.log(`Total space diffs to store: ${spaceDiffs.length}`) - return ctx.spaceDiffStore.batchPut(spaceDiffs) -} - -/** - * @param {import('./api.js').UcanStreamMessage} m - * @returns {m is import('./api.js').UcanReceiptMessage} - */ -const isReceipt = m => m.type === 'receipt' - -/** - * @param {import('@ucanto/interface').Result} r - * @returns {r is { ok: import('@web3-storage/capabilities/types').BlobAcceptSuccess }} - */ -const isBlobAcceptSuccess = (r) => - !r.error && - r.ok != null && - typeof r.ok === 'object' && - 'site' in r.ok && - typeof r.ok.site === 'object' - -/** - * @param {import('@ucanto/interface').Result} r - * @returns {r is { ok: import('@storacha/capabilities/types').SpaceBlobRemoveSuccess }} - */ -const isSpaceBlobRemoveSuccess = r => - !r.error && - r.ok != null && - typeof r.ok === 'object' && - 'size' in r.ok && - (typeof r.ok.size === 'number') - -/** - * @param {import('@ucanto/interface').Result} r - * @returns {r is { ok: import('@storacha/capabilities/types').StoreAddSuccess }} - */ -const isStoreAddSuccess = r => - !r.error && - r.ok != null && - typeof r.ok === 'object' && - 'status' in r.ok && - (r.ok.status === 'done' || r.ok.status === 'upload') - -/** - * @param {import('@ucanto/interface').Result} r - * @returns {r is { ok: import('@storacha/capabilities/types').StoreRemoveSuccess }} - */ -const isStoreRemoveSuccess = r => - !r.error && - r.ok != null && - typeof r.ok === 'object' && - 'size' in r.ok - -/** - * @template {import('@ucanto/interface').Ability} Can - * @template {import('@ucanto/interface').Unit} Caveats - * @param {import('./api.js').UcanReceiptMessage} m - * @param {import('@ucanto/interface').TheCapabilityParser>} cap - * @returns {m is import('./api.js').UcanReceiptMessage<[import('@ucanto/interface').Capability]>} - */ -const isReceiptForCapability = (m, cap) => m.value.att.some(c => c.can === cap.can) diff --git a/billing/test/helpers/context.js b/billing/test/helpers/context.js index a8f98822..f50488ca 100644 --- a/billing/test/helpers/context.js +++ b/billing/test/helpers/context.js @@ -148,24 +148,6 @@ export const createStripeTestContext = async () => { return { customerStore } } -export const createUCANStreamTestContext = async () => { - await createAWSServices() - - const spaceDiffTableName = await createTable(awsServices.dynamo.client, spaceDiffTableProps, 'space-diff-') - const spaceDiffStore = createSpaceDiffStore(awsServices.dynamo.client, { tableName: spaceDiffTableName }) - const consumerTableName = await createTable(awsServices.dynamo.client, consumerTableProps, 'consumer-') - const consumerStore = { - ...createConsumerStore(awsServices.dynamo.client, { tableName: consumerTableName }), - ...createStorePutterClient(awsServices.dynamo.client, { - tableName: consumerTableName, - validate: validateConsumer, // assume test data is valid - encode: encodeConsumer - }) - } - - return { consumerStore, spaceDiffStore } -} - /** * @returns {Promise} */ diff --git a/billing/test/lib.ucan-stream.spec.js b/billing/test/lib.ucan-stream.spec.js deleted file mode 100644 index 30265d75..00000000 --- a/billing/test/lib.ucan-stream.spec.js +++ /dev/null @@ -1,4 +0,0 @@ -import * as UCANStream from './lib/ucan-stream.js' -import { bindTestContext, createUCANStreamTestContext } from './helpers/context.js' - -export const test = bindTestContext(UCANStream.test, createUCANStreamTestContext) diff --git a/billing/test/lib/api.ts b/billing/test/lib/api.ts index 317a8834..700ef108 100644 --- a/billing/test/lib/api.ts +++ b/billing/test/lib/api.ts @@ -47,12 +47,6 @@ export interface StripeTestContext { customerStore: CustomerStore } -export interface UCANStreamTestContext { - spaceDiffStore: SpaceDiffStore - consumerStore: ConsumerStore & StorePutter -} - - export interface EgressTrafficTestContext extends Context { egressTrafficQueue: EgressTrafficQueue & QueueRemover egressTrafficQueueUrl: string @@ -74,7 +68,6 @@ export type TestContext = & CustomerBillingQueueTestContext & SpaceBillingQueueTestContext & StripeTestContext - & UCANStreamTestContext & EgressTrafficTestContext /** QueueRemover can remove items from the head of the queue. */ diff --git a/billing/test/lib/ucan-stream.js b/billing/test/lib/ucan-stream.js deleted file mode 100644 index faecbc42..00000000 --- a/billing/test/lib/ucan-stream.js +++ /dev/null @@ -1,247 +0,0 @@ -import { Schema } from '@ucanto/core' -import * as BlobCaps from '@storacha/capabilities/blob' -import * as SpaceBlobCaps from '@storacha/capabilities/space/blob' -import * as StoreCaps from '@storacha/capabilities/store' -import { parse as parseDID, decode as decodeDID } from '@ipld/dag-ucan/did' -import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../../lib/ucan-stream.js' -import { randomConsumer } from '../helpers/consumer.js' -import { randomLink } from '../helpers/dag.js' -import { randomDID, randomDIDKey } from '../helpers/did.js' - -/** @type {import('./api.js').TestSuite} */ -export const test = { - 'should filter UCANs': async (/** @type {import('entail').assert} */ assert, ctx) => { - /** @type {import('../../lib/api.js').UcanStreamMessage[]} */ - const invocations = [{ - type: 'workflow', - carCid: randomLink(), - value: { - att: [{ - with: await randomDID(), - can: StoreCaps.list.can - }], - aud: await randomDID(), - cid: randomLink() - }, - ts: new Date() - }] - - const shard = randomLink() - - /** - * @type {import('../../lib/api.js').UcanReceiptMessage<[ - * | import('@storacha/capabilities/types').BlobAccept - * | import('@storacha/capabilities/types').SpaceBlobRemove - * | import('@storacha/capabilities/types').StoreAdd - * | import('@storacha/capabilities/types').StoreRemove - * ]>[]} - */ - const receipts = [{ - type: 'receipt', - carCid: randomLink(), - invocationCid: randomLink(), - value: { - att: [{ - with: await randomDIDKey(), - can: BlobCaps.accept.can, - nb: { - _put: { - "ucan/await": [ - ".out.ok", - randomLink() - ] - }, - blob: { - digest: randomLink().multihash.bytes, - size: 138 - }, - space: parseDID(await randomDIDKey()) - } - }], - aud: await randomDID(), - cid: randomLink() - }, - out: { ok: { site: randomLink() } }, - ts: new Date() - }, { - type: 'receipt', - carCid: randomLink(), - invocationCid: randomLink(), - value: { - att: [{ - with: await randomDIDKey(), - can: SpaceBlobCaps.remove.can, - nb: { - digest: randomLink().multihash.bytes - } - }], - aud: await randomDID(), - cid: randomLink() - }, - out: { ok: { size: 138 } }, - ts: new Date() - }, { - type: 'receipt', - carCid: randomLink(), - invocationCid: randomLink(), - value: { - att: [{ - with: await randomDIDKey(), - can: StoreCaps.add.can, - nb: { - // @ts-expect-error different CID type per dep versions - link: shard, - size: 138 - } - }], - aud: await randomDID(), - cid: randomLink() - }, - out: { ok: { status: 'upload', allocated: 138 } }, - ts: new Date() - }, { - type: 'receipt', - carCid: randomLink(), - invocationCid: randomLink(), - value: { - att: [{ - with: await randomDIDKey(), - can: StoreCaps.remove.can, - // @ts-expect-error different CID type per dep versions - nb: { link: shard } - }], - aud: await randomDID(), - cid: randomLink() - }, - out: { ok: { size: 138 } }, - ts: new Date() - }] - - const deltas = findSpaceUsageDeltas([...invocations, ...receipts]) - assert.equal(deltas.length, receipts.length) - - // ensure we have a delta for every receipt - for (const r of receipts) { - assert.ok(deltas.some(d => ( - d.cause.toString() === r.invocationCid.toString() && - // resource for blob accept is found in the caveats - (r.value.att[0].can === BlobCaps.accept.can - ? d.resource === decodeDID(r.value.att[0].nb.space).did() - : d.resource === r.value.att[0].with) - ))) - } - }, - 'should store space diffs': async (/** @type {import('entail').assert} */ assert, ctx) => { - const consumer = await randomConsumer() - - await ctx.consumerStore.put(consumer) - - const from = new Date() - - /** @type {import('../../lib/api.js').UcanReceiptMessage<[import('@storacha/capabilities/types').StoreAdd]>[]} */ - const receipts = [{ - type: 'receipt', - carCid: randomLink(), - invocationCid: randomLink(), - value: { - att: [{ - with: Schema.did({ method: 'key' }).from(consumer.consumer), - can: StoreCaps.add.can, - nb: { - // @ts-expect-error different CID type per dep versions - link: randomLink(), - size: 138 - } - }], - aud: consumer.provider, - cid: randomLink() - }, - out: { ok: { status: 'upload', allocated: 138 } }, - ts: new Date(from.getTime() + 1) - }, { - type: 'receipt', - carCid: randomLink(), - invocationCid: randomLink(), - value: { - att: [{ - with: Schema.did({ method: 'key' }).from(consumer.consumer), - can: StoreCaps.add.can, - nb: { - // @ts-expect-error different CID type per dep versions - link: randomLink(), - size: 1138 - } - }], - aud: consumer.provider, - cid: randomLink() - }, - out: { ok: { status: 'upload', allocated: 1138 } }, - ts: new Date(from.getTime() + 2) - }] - - const deltas = findSpaceUsageDeltas(receipts) - const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx) - assert.ok(storeDeltasRes.ok) - - const res = await ctx.spaceDiffStore.list({ - provider: consumer.provider, - space: consumer.consumer, - from - }, { size: receipts.length }) - assert.ok(res.ok) - assert.equal(res.ok.results.length, receipts.length) - - // ensure we have a diff for every receipt - for (const r of receipts) { - assert.ok(res.ok.results.some(d => ( - d.cause.toString() === r.invocationCid.toString() && - d.provider === consumer.provider && - d.space === r.value.att[0].with && - d.subscription === consumer.subscription && - d.delta === r.value.att[0].nb?.size - ))) - } - }, - 'should filter non-allocating store/add messages': async (/** @type {import('entail').assert} */ assert, ctx) => { - const consumer = await randomConsumer() - - await ctx.consumerStore.put(consumer) - - const from = new Date() - - /** @type {import('../../lib/api.js').UcanReceiptMessage<[import('@storacha/capabilities/types').StoreAdd]>[]} */ - const receipts = [{ - type: 'receipt', - carCid: randomLink(), - invocationCid: randomLink(), - value: { - att: [{ - with: Schema.did({ method: 'key' }).from(consumer.consumer), - can: StoreCaps.add.can, - nb: { - // @ts-expect-error different CID type per dep versions - link: randomLink(), - size: 138 - } - }], - aud: consumer.provider, - cid: randomLink() - }, - // allocated: 0 indicates this shard was previously stored in this space - out: { ok: { status: 'upload', allocated: 0 } }, - ts: new Date(from.getTime() + 1) - }] - - const deltas = findSpaceUsageDeltas(receipts) - const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx) - assert.equal(storeDeltasRes.ok, 'no space diffs to store') - - const res = await ctx.spaceDiffStore.list({ - provider: consumer.provider, - space: consumer.consumer, - from - }, { size: 1000 }) - assert.ok(res.ok) - assert.equal(res.ok.results.length, 0) - } -} diff --git a/package-lock.json b/package-lock.json index 0e932ffd..5a285ffe 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21738,10 +21738,9 @@ } }, "node_modules/@storacha/access": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/@storacha/access/-/access-1.0.1.tgz", - "integrity": "sha512-mcsONnMR8C9mCtd8AKfR4m0QoXrGvt9Qo/oEnpig0VFMIu46129UW/8BsjqWAiZPK7a9R4ic7iWasIMxXQQSeA==", - "license": "(Apache-2.0 OR MIT)", + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@storacha/access/-/access-1.0.2.tgz", + "integrity": "sha512-BoLF7BQSJhHPOQ6ln8JJgekEJOBKutWfzAGzZxvAHWsbtlKLwwqEJDupMLnlVQOBclRbHUFrssNhgSh+0Uz4Sg==", "dependencies": { "@ipld/car": "^5.1.1", "@ipld/dag-ucan": "^3.4.5", @@ -21760,7 +21759,15 @@ "multiformats": "^13.3.1", "p-defer": "^4.0.0", "type-fest": "^4.9.0", - "uint8arrays": "^4.0.6" + "uint8arrays": "^5.1.0" + } + }, + "node_modules/@storacha/access/node_modules/uint8arrays": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", + "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", + "dependencies": { + "multiformats": "^13.0.0" } }, "node_modules/@storacha/blob-index": { @@ -21959,11 +21966,11 @@ "license": "MIT" }, "node_modules/@storacha/upload-api": { - "version": "1.3.2", - "resolved": "https://registry.npmjs.org/@storacha/upload-api/-/upload-api-1.3.2.tgz", - "integrity": "sha512-LV1GZfP3LJRCY7pWwwuv7OwmB9sdXTnUjX1dh7Gf8bZ2ABh+TIw4BNJTqzdY+2XG4exe1W9644UjDVCqo7c81A==", + "version": "1.3.3", + "resolved": "https://registry.npmjs.org/@storacha/upload-api/-/upload-api-1.3.3.tgz", + "integrity": "sha512-+WRzBtWfYO8WrQHspbR8GYLCqXqPcMo/J3o/Hsrh9de/MCn2dhgHBCyYMAx1ZzTg8cGPuaSz03fZj2MSTVkwGw==", "dependencies": { - "@storacha/access": "^1.0.1", + "@storacha/access": "^1.0.2", "@storacha/blob-index": "^1.0.1", "@storacha/capabilities": "^1.2.1", "@storacha/did-mailto": "^1.0.1", @@ -63358,7 +63365,7 @@ "@storacha/did-mailto": "^1.0.1", "@storacha/indexing-service-client": "^2.0.0", "@storacha/one-webcrypto": "^1.0.1", - "@storacha/upload-api": "^1.3.2", + "@storacha/upload-api": "^1.3.3", "@ucanto/client": "^9.0.1", "@ucanto/core": "^10.2.1", "@ucanto/interface": "^10.1.1", diff --git a/stacks/billing-stack.js b/stacks/billing-stack.js index c87fa969..1bb16e0d 100644 --- a/stacks/billing-stack.js +++ b/stacks/billing-stack.js @@ -1,7 +1,6 @@ import { use, Cron, Queue, Function, Config, Api } from 'sst/constructs' -import { StartingPosition, FilterCriteria, FilterRule } from 'aws-cdk-lib/aws-lambda' +import { StartingPosition } from 'aws-cdk-lib/aws-lambda' import { Duration } from 'aws-cdk-lib' -import { UcanInvocationStack } from './ucan-invocation-stack.js' import { BillingDbStack } from './billing-db-stack.js' import { UploadDbStack } from './upload-db-stack.js' import { setupSentry, getCustomDomain } from './config.js' @@ -91,39 +90,6 @@ export function BillingStack ({ stack, app }) { schedule: 'cron(0 0 28 * ? *)' // https://crontab.guru/#0_0_1_*_* }) - const { ucanStream } = use(UcanInvocationStack) - - // Lambda that receives UCAN stream events and writes diffs to spaceSizeDiffTable - const ucanStreamHandler = new Function(stack, 'ucan-stream-handler', { - permissions: [spaceDiffTable, consumerTable], - handler: 'billing/functions/ucan-stream.handler', - environment: { - SPACE_DIFF_TABLE_NAME: spaceDiffTable.tableName, - CONSUMER_TABLE_NAME: consumerTable.tableName - } - }) - - ucanStream.addConsumers(stack, { - ucanStreamHandler: { - function: ucanStreamHandler, - cdk: { - eventSource: { - batchSize: 25, // max dynamo BatchWriteItems size - bisectBatchOnError: true, - startingPosition: StartingPosition.LATEST, - filters: [ - FilterCriteria.filter({ - data: { - type: FilterRule.isEqual('receipt') - } - }) - ], - parallelizationFactor: 10 - }, - } - } - }) - // Lambda that sends usage table records to Stripe for invoicing. const usageTableHandler = new Function(stack, 'usage-table-handler', { permissions: [spaceSnapshotTable, spaceDiffTable], diff --git a/upload-api/functions/ucan-invocation-router.js b/upload-api/functions/ucan-invocation-router.js index 93520ab7..89449849 100644 --- a/upload-api/functions/ucan-invocation-router.js +++ b/upload-api/functions/ucan-invocation-router.js @@ -195,7 +195,7 @@ export async function ucanInvocationRouter(request) { }, }) - const blobRegistry = createBlobRegistry(AWS_REGION, blobRegistryTableName, metrics, options) + const blobRegistry = createBlobRegistry(AWS_REGION, blobRegistryTableName, spaceDiffTableName, consumerTableName, metrics, options) const allocationBlobRegistry = createAllocationTableBlobRegistry(blobRegistry, AWS_REGION, allocationTableName, options) const delegationBucket = createDelegationsStore(r2DelegationBucketEndpoint, r2DelegationBucketAccessKeyId, r2DelegationBucketSecretAccessKey, r2DelegationBucketName) const subscriptionTable = createSubscriptionTable(AWS_REGION, subscriptionTableName, options) diff --git a/upload-api/package.json b/upload-api/package.json index 6a0c19ca..9f3d5eb1 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -20,7 +20,7 @@ "@storacha/did-mailto": "^1.0.1", "@storacha/indexing-service-client": "^2.0.0", "@storacha/one-webcrypto": "^1.0.1", - "@storacha/upload-api": "^1.3.2", + "@storacha/upload-api": "^1.3.3", "@ucanto/client": "^9.0.1", "@ucanto/core": "^10.2.1", "@ucanto/interface": "^10.1.1", diff --git a/upload-api/stores/blob-registry.js b/upload-api/stores/blob-registry.js index 7ddfdc8b..0ba89cf3 100644 --- a/upload-api/stores/blob-registry.js +++ b/upload-api/stores/blob-registry.js @@ -3,13 +3,16 @@ import { PutItemCommand, QueryCommand, DeleteItemCommand, + TransactWriteItemsCommand, } from '@aws-sdk/client-dynamodb' -import { marshall, unmarshall } from '@aws-sdk/util-dynamodb' import { ok, error } from '@ucanto/core' -import { EntryNotFound, EntryExists } from '@storacha/upload-api/blob' -import { base58btc } from 'multiformats/bases/base58' import * as Link from 'multiformats/link' import * as Digest from 'multiformats/hashes/digest' +import { base58btc } from 'multiformats/bases/base58' +import { marshall, unmarshall } from '@aws-sdk/util-dynamodb' +import { EntryNotFound, EntryExists } from '@storacha/upload-api/blob' +import { createConsumerStore } from '@storacha/upload-service-infra-billing/tables/consumer.js' + import { getDynamoClient } from '../../lib/aws/dynamo.js' import { METRICS_NAMES, SPACE_METRICS_NAMES } from '../constants.js' @@ -17,7 +20,9 @@ import { METRICS_NAMES, SPACE_METRICS_NAMES } from '../constants.js' /** * @param {string} region - * @param {string} tableName + * @param {string} blobRegistryTableName + * @param {string} spaceDiffTableName + * @param {string} consumerTableName * @param {{ * space: import('../types.js').SpaceMetricsStore * admin: import('../types.js').MetricsStore @@ -26,164 +31,306 @@ import { METRICS_NAMES, SPACE_METRICS_NAMES } from '../constants.js' * @param {string} [options.endpoint] * @returns {BlobAPI.Registry} */ -export const createBlobRegistry = (region, tableName, metrics, options = {}) => { +export const createBlobRegistry = ( + region, + blobRegistryTableName, + spaceDiffTableName, + consumerTableName, + metrics, + options = {} +) => { const dynamoDb = getDynamoClient({ region, endpoint: options.endpoint }) - return useBlobRegistry(dynamoDb, tableName, metrics) + return useBlobRegistry( + dynamoDb, + blobRegistryTableName, + spaceDiffTableName, + consumerTableName, + metrics + ) } /** * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoDb - * @param {string} tableName + * @param {string} blobRegistryTableName + * @param {string} spaceDiffTableName + * @param {string} consumerTableName * @param {{ * space: import('../types.js').SpaceMetricsStore * admin: import('../types.js').MetricsStore * }} metrics * @returns {BlobAPI.Registry} */ -export const useBlobRegistry = (dynamoDb, tableName, metrics) => ({ - /** @type {BlobAPI.Registry['find']} */ - async find (space, digest) { - const key = getKey(space, digest) - const cmd = new GetItemCommand({ - TableName: tableName, - Key: key, - }) - - const response = await dynamoDb.send(cmd) - if (!response.Item) { - return error(new EntryNotFound()) +export const useBlobRegistry = ( + dynamoDb, + blobRegistryTableName, + spaceDiffTableName, + consumerTableName, + metrics +) => { + /** + * @typedef {object} DeltaInfo + * @property {import('@storacha/upload-api').DID} space - The space DID that changed size + * @property {string} cause - UCAN invocation that caused the size change. + * @property {number} delta - The size of the blob. + * @property {string} receiptAt - The ISO 8601 timestamp indicating when the receipt was created. + * @property {string} insertedAt - The ISO 8601 timestamp indicating when the entry was inserted. + */ + + const buildSpaceDiffs = async (/** @type DeltaInfo */ deltaInfo) => { + const consumerStore = createConsumerStore( + dynamoDb, + { tableName: consumerTableName } + ) + const consumerList = await consumerStore.list({ consumer: deltaInfo.space }) + if (consumerList.error) { + console.error( + `Error listing consumers for ${deltaInfo.space}: ${consumerList.error}` + ) + return consumerList } - const raw = unmarshall(response.Item) - return ok({ - blob: { - digest: Digest.decode(base58btc.decode(raw.digest)), - size: raw.size - }, - cause: Link.parse(raw.cause).toV1(), - insertedAt: new Date(raw.insertedAt) - }) - }, - - /** @type {BlobAPI.Registry['register']} */ - register: async ({ space, blob, cause }) => { - const item = { - space, - digest: base58btc.encode(blob.digest.bytes), - size: blob.size, - cause: cause.toString(), - insertedAt: new Date().toISOString(), + const diffs = [] + // There should only be one subscription per provider, but in theory you + // could have multiple providers for the same consumer (space). + const consumers = /** @type Record[] */ (consumerList.ok?.results) + console.log(`Found ${consumers.length} consumers for space ${deltaInfo.space}`) + for (const consumer of consumers) { + diffs.push({ + pk:`${consumer.provider}#${deltaInfo.space}`, + sk:`${deltaInfo.receiptAt}#${deltaInfo.cause}`, + provider: consumer.provider, + subscription: consumer.subscription, + ...deltaInfo + }) } - const cmd = new PutItemCommand({ - TableName: tableName, - Item: marshall(item, { removeUndefinedValues: true }), - ConditionExpression: 'attribute_not_exists(#S) AND attribute_not_exists(#D)', - ExpressionAttributeNames: { '#S': 'space', '#D': 'digest' } - }) - - try { - await dynamoDb.send(cmd) - await Promise.all([ - metrics.space.incrementTotals({ - [SPACE_METRICS_NAMES.BLOB_ADD_TOTAL]: [{ space, value: 1 }], - [SPACE_METRICS_NAMES.BLOB_ADD_SIZE_TOTAL]: [{ space, value: blob.size }] - }), - metrics.admin.incrementTotals({ - [METRICS_NAMES.BLOB_ADD_TOTAL]: 1, - [METRICS_NAMES.BLOB_ADD_SIZE_TOTAL]: blob.size - }) - ]) - } catch (/** @type {any} */ err) { - if (err.name === 'ConditionalCheckFailedException') { - return error(new EntryExists()) + console.log( + `Total diffs found for space ${deltaInfo.space}: ${diffs.length}` + ) + return { ok: diffs, error: undefined } + } + + return { + /** @type {BlobAPI.Registry['find']} */ + async find(space, digest) { + const key = getKey(space, digest) + const cmd = new GetItemCommand({ + TableName: blobRegistryTableName, + Key: key, + }) + + const response = await dynamoDb.send(cmd) + if (!response.Item) { + return error(new EntryNotFound()) } - return error(err) - } - return ok({}) - }, - /** @type {BlobAPI.Registry['deregister']} */ - async deregister(space, digest) { - const key = getKey(space, digest) - const cmd = new DeleteItemCommand({ - TableName: tableName, - Key: key, - ConditionExpression: 'attribute_exists(#S) AND attribute_exists(#D)', - ExpressionAttributeNames: { '#S': 'space', '#D': 'digest' }, - ReturnValues: 'ALL_OLD' - }) + const raw = unmarshall(response.Item) + return ok({ + blob: { + digest: Digest.decode(base58btc.decode(raw.digest)), + size: raw.size, + }, + cause: Link.parse(raw.cause).toV1(), + insertedAt: new Date(raw.insertedAt), + }) + }, + + /** @type {BlobAPI.Registry['register']} */ + register: async ({ space, blob, cause }) => { + /** @type {import('@aws-sdk/client-dynamodb').TransactWriteItem[]} */ + const transactWriteItems = [] + const dateNow = new Date().toISOString() + + const blobItem = { + space, + digest: base58btc.encode(blob.digest.bytes), + size: blob.size, + cause: cause.toString(), + insertedAt: dateNow + } - try { - const res = await dynamoDb.send(cmd) + transactWriteItems.push({ + Put: { + TableName: blobRegistryTableName, + Item: marshall(blobItem, { removeUndefinedValues: true }), + ConditionExpression: + 'attribute_not_exists(#S) AND attribute_not_exists(#D)', + ExpressionAttributeNames: { '#S': 'space', '#D': 'digest' } + } + }) + + console.log(`Processing delta for space ${space}`) + + const spaceDiffResults = await buildSpaceDiffs({ + space, + cause: cause.toString(), + delta: blob.size, + receiptAt: dateNow, // TODO: What exactly is the receipt timestamp? Previously, it was generated when the receipt was sent to the stream. + insertedAt: dateNow + }) + + try { + if (spaceDiffResults.error) { + throw new Error( + `Error while processing space diffs: ${spaceDiffResults.error}` + ) + } + + for (const diffItem of spaceDiffResults.ok ?? []) { + transactWriteItems.push({ + Put: { + TableName: spaceDiffTableName, + Item: marshall(diffItem, { removeUndefinedValues: true }) + } + }) + } + + const transactWriteCommand = new TransactWriteItemsCommand({ + TransactItems: transactWriteItems, + }) - if (!res.Attributes) { - throw new Error('missing return values') + await dynamoDb.send(transactWriteCommand) + await Promise.all([ + metrics.space.incrementTotals({ + [SPACE_METRICS_NAMES.BLOB_ADD_TOTAL]: [{ space, value: 1 }], + [SPACE_METRICS_NAMES.BLOB_ADD_SIZE_TOTAL]: [{ space, value: blob.size }] + }), + metrics.admin.incrementTotals({ + [METRICS_NAMES.BLOB_ADD_TOTAL]: 1, + [METRICS_NAMES.BLOB_ADD_SIZE_TOTAL]: blob.size + }) + ]) + } catch (/** @type {any} */ err) { + if (err.name === 'ConditionalCheckFailedException') { + return error(new EntryExists()) + } + return error(err) } - const raw = unmarshall(res.Attributes) - const size = Number(raw.size) - - await Promise.all([ - metrics.space.incrementTotals({ - [SPACE_METRICS_NAMES.BLOB_REMOVE_TOTAL]: [{ space, value: 1 }], - [SPACE_METRICS_NAMES.BLOB_REMOVE_SIZE_TOTAL]: [{ space, value: size }] - }), - metrics.admin.incrementTotals({ - [METRICS_NAMES.BLOB_REMOVE_TOTAL]: 1, - [METRICS_NAMES.BLOB_REMOVE_SIZE_TOTAL]: size - }) - ]) return ok({}) - } catch (/** @type {any} */ err) { - if (err.name === 'ConditionalCheckFailedException') { - return error(new EntryNotFound()) - } - return error(err) - } - }, + }, + + /** @type {BlobAPI.Registry['deregister']} */ + async deregister({space, digest, cause}) { + try { + /** @type {import('@aws-sdk/client-dynamodb').TransactWriteItem[]} */ + const transactWriteItems = [] + const key = getKey(space, digest) + + const getItemCmd = new GetItemCommand({ + TableName: blobRegistryTableName, + Key: key + }) - /** @type {BlobAPI.Registry['entries']} */ - entries: async (space, options = {}) => { - const exclusiveStartKey = options.cursor - ? marshall({ space, digest: options.cursor }) - : undefined + const itemToDelete = await dynamoDb.send(getItemCmd) - const cmd = new QueryCommand({ - TableName: tableName, - Limit: options.size || 20, - KeyConditions: { - space: { - ComparisonOperator: 'EQ', - AttributeValueList: [{ S: space }], - }, - }, - ExclusiveStartKey: exclusiveStartKey, - AttributesToGet: ['digest', 'size', 'cause', 'insertedAt'], - }) - const response = await dynamoDb.send(cmd) + if (!itemToDelete.Item) { + throw new Error('Item does not exist!') + } - const results = - response.Items?.map((i) => toEntry(unmarshall(i))) ?? [] - const firstDigest = results[0] ? base58btc.encode(results[0].blob.digest.bytes) : undefined - // Get cursor of the item where list operation stopped (inclusive). - // This value can be used to start a new operation to continue listing. - const lastKey = - response.LastEvaluatedKey && unmarshall(response.LastEvaluatedKey) - const lastDigest = lastKey ? lastKey.digest : undefined + const blob = unmarshall(itemToDelete.Item) + const blobSize = Number(blob.size) - const before = firstDigest - const after = lastDigest + transactWriteItems.push({ + Delete: { + TableName: blobRegistryTableName, + Key: key, + ConditionExpression: + 'attribute_exists(#S) AND attribute_exists(#D)', + ExpressionAttributeNames: { '#S': 'space', '#D': 'digest' } + } + }) - return { - ok: { - size: results.length, - before, - after, - cursor: after, - results, + console.log(`Processing delta for space ${space}`) + + const dateNow = new Date().toISOString() + const spaceDiffResults = await buildSpaceDiffs({ + space, + cause: cause.toString(), + delta: blobSize, + receiptAt: dateNow, + insertedAt: dateNow + }) + + if (spaceDiffResults.error) { + throw new Error(`Error while processing space diffs: ${spaceDiffResults.error}`) + } + + for (const diffItem of spaceDiffResults.ok ?? []) { + transactWriteItems.push({ + Put: { + TableName: spaceDiffTableName, + Item: marshall(diffItem, { removeUndefinedValues: true }), + }, + }) + } + + const transactWriteCommand = new TransactWriteItemsCommand({ + TransactItems: transactWriteItems, + }) + + await dynamoDb.send(transactWriteCommand) + await Promise.all([ + metrics.space.incrementTotals({ + [SPACE_METRICS_NAMES.BLOB_REMOVE_TOTAL]: [{ space, value: 1 }], + [SPACE_METRICS_NAMES.BLOB_REMOVE_SIZE_TOTAL]: [{ space, value: blobSize }] + }), + metrics.admin.incrementTotals({ + [METRICS_NAMES.BLOB_REMOVE_TOTAL]: 1, + [METRICS_NAMES.BLOB_REMOVE_SIZE_TOTAL]: blobSize + }) + ]) + return ok({}) + } catch (/** @type {any} */ err) { + if (err.name === 'ConditionalCheckFailedException') { + return error(new EntryNotFound()) + } + return error(err) } - } - }, -}) + }, + + /** @type {BlobAPI.Registry['entries']} */ + entries: async (space, options = {}) => { + const exclusiveStartKey = options.cursor + ? marshall({ space, digest: options.cursor }) + : undefined + + const cmd = new QueryCommand({ + TableName: blobRegistryTableName, + Limit: options.size || 20, + KeyConditions: { + space: { + ComparisonOperator: 'EQ', + AttributeValueList: [{ S: space }], + }, + }, + ExclusiveStartKey: exclusiveStartKey, + AttributesToGet: ['digest', 'size', 'cause', 'insertedAt'], + }) + const response = await dynamoDb.send(cmd) + + const results = + response.Items?.map((i) => toEntry(unmarshall(i))) ?? [] + const firstDigest = results[0] ? base58btc.encode(results[0].blob.digest.bytes) : undefined + // Get cursor of the item where list operation stopped (inclusive). + // This value can be used to start a new operation to continue listing. + const lastKey = + response.LastEvaluatedKey && unmarshall(response.LastEvaluatedKey) + const lastDigest = lastKey ? lastKey.digest : undefined + + const before = firstDigest + const after = lastDigest + + return { + ok: { + size: results.length, + before, + after, + cursor: after, + results, + } + } + }, + } +} /** * Upgrade from the db representation @@ -280,7 +427,7 @@ export const useAllocationTableBlobRegistry = (registry, dynamoDb, tableName) => }, /** @type {BlobAPI.Registry['deregister']} */ - async deregister(space, digest) { + async deregister({space, digest, cause}) { const key = getAllocationTableKey(space, digest) const cmd = new DeleteItemCommand({ TableName: tableName, @@ -295,7 +442,7 @@ export const useAllocationTableBlobRegistry = (registry, dynamoDb, tableName) => if (!res.Attributes) { throw new Error('missing return values') } - return registry.deregister(space, digest) + return registry.deregister({space, digest, cause}) } catch (/** @type {any} */ err) { if (err.name === 'ConditionalCheckFailedException') { return error(new EntryNotFound()) diff --git a/upload-api/test/helpers/ucan.js b/upload-api/test/helpers/ucan.js index cd46ca22..593a0e76 100644 --- a/upload-api/test/helpers/ucan.js +++ b/upload-api/test/helpers/ucan.js @@ -37,6 +37,7 @@ import { create as createStorageProvider } from './external-services/storage-pro import { create as createBlobRetriever } from '../../external-services/blob-retriever.js' import { create as createIndexingServiceClient } from './external-services/indexing-service.js' import { useStorageProviderTable } from '../../tables/storage-provider.js' +import { spaceDiffTableProps } from '@storacha/upload-service-infra-billing/tables/space-diff.js' export { API } @@ -235,9 +236,12 @@ export async function executionContextToUcantoTestServerContext(t) { space: useSpaceMetricsStore(dynamo, spaceMetricsTableName), admin: useAdminMetricsStore(dynamo, adminMetricsTableName) } + const consumerTableName = await createTable(dynamo, consumerTableProps) const blobRegistry = useBlobRegistry( dynamo, await createTable(dynamo, blobRegistryTableProps), + await createTable(dynamo, spaceDiffTableProps), + consumerTableName, metrics ) const allocationTableName = await createTable(dynamo, allocationTableProps) @@ -297,7 +301,7 @@ export async function executionContextToUcantoTestServerContext(t) { ); const consumerTable = useConsumerTable( dynamo, - await createTable(dynamo, consumerTableProps) + consumerTableName ); const spaceMetricsTable = useSpaceMetricsTable( dynamo,