Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dag-protobuf): cache dag pb directory structure and block indexes #147

Merged
merged 9 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
import { CID } from '@web3-storage/gateway-lib/handlers'
import { Environment as RateLimiterEnvironment } from './middleware/withRateLimit.types.ts'
import { Environment as CarParkFetchEnvironment } from './middleware/withCarParkFetch.types.ts'
import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHandler.types.ts'
import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'
import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withContentClaimsDagula.types.ts'
import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
import { Environment as EgressClientEnvironment } from './middleware/withEgressClient.types.ts'
import { Environment as GatewayIdentityEnvironment } from './middleware/withGatewayIdentity.types.ts'
import { Environment as DelegationsStorageEnvironment } from './middleware/withDelegationsStorage.types.ts'
import { Environment as LocatorEnvironment } from './middleware/withLocator.types.ts'
import { UnknownLink } from 'multiformats'
import { DIDKey } from '@ucanto/principal/ed25519'

export interface Environment
extends CarBlockEnvironment,
RateLimiterEnvironment,
ContentClaimsDagulaEnvironment,
EgressTrackerEnvironment {
extends RateLimiterEnvironment,
CarBlockEnvironment,
CarParkFetchEnvironment,
ContentClaimsDagulaEnvironment,
EgressClientEnvironment,
EgressTrackerEnvironment,
GatewayIdentityEnvironment,
DelegationsStorageEnvironment,
LocatorEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
HONEYCOMB_API_KEY: string
Expand Down
89 changes: 89 additions & 0 deletions src/middleware/withContentClaimsDagula.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Dagula } from 'dagula'
import { base58btc } from 'multiformats/bases/base58'
import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'
import * as dagPb from '@ipld/dag-pb'

/**
* @import {
Expand All @@ -16,6 +18,7 @@ import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'

/**
* Creates a dagula instance backed by content claims.
* Get operations for DAG Protobuf content are cached if the DAGPB_CONTENT_CACHE is enabled.
*
* @type {(
* Middleware<
Expand All @@ -31,6 +34,10 @@ export function withContentClaimsDagula (handler) {
const fetcher = BatchingFetcher.create(locator, ctx.fetch)
const dagula = new Dagula({
async get (cid) {
const dagPbContent = await getDagPbContent(env, fetcher, cid, ctx)
if (dagPbContent) {
return dagPbContent
}
const res = await fetcher.fetch(cid.multihash)
return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
},
Expand All @@ -46,3 +53,85 @@ export function withContentClaimsDagula (handler) {
return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula })
}
}

/**
 * Resolves a DAG Protobuf block through the KV cache.
 *
 * The cache only applies when `FF_DAGPB_CONTENT_CACHE_ENABLED` is the string
 * 'true' and the CID uses the dag-pb codec. A cache hit is returned directly.
 * On a miss the block is fetched and decoded; unless it decodes to a node with
 * an empty link list, a cache write is handed to `ctx.waitUntil` (it is not
 * awaited here).
 *
 * @param {Environment} env
 * @param {import('@web3-storage/blob-fetcher').Fetcher} fetcher
 * @param {import('multiformats').UnknownLink} cid
 * @param {import('@web3-storage/gateway-lib').Context} ctx
 * @returns {Promise<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array } | undefined>}
 *   The block, or undefined when the cache does not apply or the fetch did not succeed.
 */
async function getDagPbContent (env, fetcher, cid, ctx) {
  const cacheApplies = env.FF_DAGPB_CONTENT_CACHE_ENABLED === 'true' && cid.code === dagPb.code
  if (!cacheApplies) return undefined

  const hit = await getCachedDagPbBytes(env, cid)
  if (hit) return { cid, bytes: hit }

  const result = await fetcher.fetch(cid.multihash)
  if (!result.ok) return undefined

  const bytes = await result.ok.bytes()
  const { Links: links } = dagPb.decode(bytes)
  // Old DAG PB nodes have no links ("raw" blocks as leaves), so we don't want to cache them
  const isLeaf = Boolean(links) && links.length === 0
  if (!isLeaf) {
    ctx.waitUntil(cacheDagPbBytes(env, cid, bytes))
  }
  return { cid, bytes }
}

/**
 * Caches the DAG Protobuf content into the KV store if the content size is less than or equal to the max size.
 *
 * `FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB` must be set to a positive integer; when it is
 * unset or parses to 0/NaN nothing is cached.
 *
 * The content is cached for `FF_DAGPB_CONTENT_CACHE_TTL_SECONDS` seconds when that value
 * is at least 60 (the minimum the KV store accepts for `expirationTtl`); otherwise the
 * content is cached without an expiration.
 *
 * Write failures are logged and swallowed: caching is best-effort and must never fail a request.
 *
 * @param {Environment} env
 * @param {import('multiformats').UnknownLink} cid
 * @param {Uint8Array} bytes
 * @returns {Promise<void>}
 */
async function cacheDagPbBytes (env, cid, bytes) {
  const maxSize = env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB
    ? parseInt(env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB, 10) * 1024 * 1024
    : undefined
  if (maxSize && bytes.length <= maxSize) {
    try {
      const ttlSeconds = env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS
        ? parseInt(env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS, 10)
        : 0
      const key = getDagPbKey(cid)
      await env.DAGPB_CONTENT_CACHE.put(key, bytes, {
        // 60 seconds is itself a valid TTL, so the comparison is inclusive.
        // NaN (unparseable value) fails the comparison and falls back to no expiration.
        expirationTtl: ttlSeconds >= 60 ? ttlSeconds : undefined
      })
    } catch (/** @type {any} */ error) {
      console.error(error)
    }
  }
}

/**
 * Returns the cached DAG Protobuf bytes if they exist, otherwise returns null.
 *
 * Reads are best-effort, mirroring how cache writes are handled: if the KV lookup
 * throws, the error is logged and the lookup is treated as a cache miss so the
 * caller can still fetch the block from its origin.
 *
 * @param {Environment} env
 * @param {import('multiformats').UnknownLink} cid
 * @returns {Promise<Uint8Array | null>}
 */
async function getCachedDagPbBytes (env, cid) {
  try {
    const key = getDagPbKey(cid)
    const dagPbBytes = await env.DAGPB_CONTENT_CACHE.get(key, 'arrayBuffer')
    return dagPbBytes ? new Uint8Array(dagPbBytes) : null
  } catch (/** @type {any} */ error) {
    console.error(error)
    return null
  }
}

/**
 * Builds the KV store key for a DAG Protobuf block: the base58btc encoding
 * of the CID's multihash bytes.
 *
 * @param {import('multiformats').UnknownLink} cid
 * @returns {string}
 */
function getDagPbKey (cid) {
  const { bytes: multihashBytes } = cid.multihash
  return base58btc.encode(multihashBytes)
}
19 changes: 19 additions & 0 deletions src/middleware/withContentClaimsDagula.types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
import { KVNamespace } from '@cloudflare/workers-types'
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'

/**
 * Environment bindings and feature flags read by the content claims dagula middleware.
 */
export interface Environment extends MiddlewareEnvironment {
CONTENT_CLAIMS_SERVICE_URL?: string
/**
 * The KV namespace that stores the DAGPB content cache.
 */
DAGPB_CONTENT_CACHE: KVNamespace
/**
 * The number that represents when to expire the key-value pair in seconds from now.
 * The minimum value is 60 seconds. Any value less than 60 seconds will not be used,
 * in which case the key-value pair is stored without an expiration.
 */
FF_DAGPB_CONTENT_CACHE_TTL_SECONDS?: string
/**
 * The maximum size of the key-value pair in MB, parsed as an integer.
 * The minimum value is 1 MB. Any value less than 1MB will not be used.
 * When the value is unset or not usable, nothing is written to the cache.
 */
FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB?: string
/**
 * The flag that enables the DAGPB content cache.
 * The cache is only active when the value is exactly the string 'true'.
 */
FF_DAGPB_CONTENT_CACHE_ENABLED: string
}
108 changes: 103 additions & 5 deletions test/miniflare/freeway.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, before, beforeEach, after, it } from 'node:test'
import assert from 'node:assert'
import { randomBytes } from 'node:crypto'
import { Miniflare } from 'miniflare'
import { Log, LogLevel, Miniflare } from 'miniflare'
import * as Link from 'multiformats/link'
import { sha256 } from 'multiformats/hashes/sha2'
import * as raw from 'multiformats/codecs/raw'
Expand Down Expand Up @@ -49,17 +49,26 @@ describe('freeway', () => {
const { port } = server.address()
url = new URL(`http://127.0.0.1:${port}`)
miniflare = new Miniflare({
host: '127.0.0.1',
port: 8787,
inspectorPort: 9898,
log: new Log(LogLevel.INFO),
cache: false, // Disable Worker Global Cache to test cache middlewares
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't disable this global cache in tests, requests won't reach the Content Claims Dagula middleware, which is where the KV cache lives.

bindings: {
CONTENT_CLAIMS_SERVICE_URL: claimsService.url.toString(),
CARPARK_PUBLIC_BUCKET_URL: url.toString(),
GATEWAY_SERVICE_DID: 'did:example:gateway'
GATEWAY_SERVICE_DID: 'did:example:gateway',
DAGPB_CONTENT_CACHE: 'DAGPB_CONTENT_CACHE',
FF_DAGPB_CONTENT_CACHE_ENABLED: 'true',
FF_DAGPB_CONTENT_CACHE_TTL_SECONDS: 300,
FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB: 2
},
inspectorPort: 9898,
scriptPath: 'dist/worker.mjs',
modules: true,
compatibilityFlags: ['nodejs_compat'],
compatibilityDate: '2024-09-23',
r2Buckets: ['CARPARK']
r2Buckets: ['CARPARK'],
kvNamespaces: ['DAGPB_CONTENT_CACHE']
})

bucket = await miniflare.getR2Bucket('CARPARK')
Expand All @@ -70,10 +79,15 @@ describe('freeway', () => {
builder = new Builder(bucket)
})

beforeEach(() => {
beforeEach(async () => {
claimsService.resetCallCount()
claimsService.resetClaims()
bucketService.resetCallCount()
const dagpbCache = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
const keys = await dagpbCache.list()
for (const key of keys.keys) {
await dagpbCache.delete(key.name)
}
})

after(() => {
Expand Down Expand Up @@ -481,4 +495,88 @@ describe('freeway', () => {
}
}))
})

it('should be faster to get a file in a directory when the protobuf directory structure is cached', async () => {
  // Generate 3 files wrapped in a folder, >2MB each to force a unixfs file header block (dag protobuf)
  const input = [
    new File([randomBytes(2_050_550)], 'data.txt'),
    new File([randomBytes(2_050_550)], 'image.png'),
    new File([randomBytes(2_050_550)], 'image2.png')
  ]
  // Adding to the builder will generate the unixfs file header block
  const { root, shards } = await builder.add(input)
  assert.equal(root.code, 112, 'Root should be a protobuf directory code 112')

  // Generate claims for the shards
  for (const shard of shards) {
    const location = new URL(toBlobKey(shard.multihash), url)
    const res = await fetch(location)
    assert(res.body)
    const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
    claimsService.addClaims(claims)
  }

  /**
   * Requests the third file through the gateway and measures the time until
   * the response is available (the body is read by the caller, outside of the
   * measured interval).
   */
  const timedRequest = async () => {
    const startedAt = performance.now()
    const response = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}/${input[2].name}`, {
      headers: {
        'Cache-Control': 'no-cache'
      }
    })
    if (!response.ok) assert.fail(`unexpected response: ${await response.text()}`)
    const elapsed = performance.now() - startedAt
    return { response, elapsed }
  }

  // Check that the cache is empty
  const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
  const cachedContent1 = await dagpb.list()
  assert.equal(cachedContent1.keys.length, 0, 'Cache should be empty')

  // First request adds the file to the cache, so it takes longer
  const first = await timedRequest()
  assertBlobEqual(input[2], await first.response.blob())

  const cachedContent2 = await dagpb.list()
  assert(cachedContent2.keys.length > 0, 'Cache should have one or more keys')

  // Second request retrieves the file from the cache, so it should take less time than the first request
  const second = await timedRequest()
  assertBlobEqual(input[2], await second.response.blob())
  assert(second.elapsed < first.elapsed, 'Second request should take less time than the first request')
})

it('should not cache content if it is not dag protobuf content', async () => {
  // Generate a single small file (256 bytes) and do not wrap it in a folder, so the root is a raw block
  const input = new File([randomBytes(256)], 'data.txt')
  const { root, shards } = await builder.add(input)
  assert.equal(root.code, 85, 'Root should be a raw file code 85')

  // Generate claims for the shards
  for (const shard of shards) {
    const location = new URL(toBlobKey(shard.multihash), url)
    const res = await fetch(location)
    assert(res.body)
    const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
    claimsService.addClaims(claims)
  }

  // Check that the cache is empty
  const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
  const cachedBefore = await dagpb.list()
  assert.equal(cachedBefore.keys.length, 0, 'Cache should be empty')

  // It should not add the file to the cache, because it is not dag protobuf content
  const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}`, {
    headers: {
      'Cache-Control': 'no-cache'
    }
  })
  if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`)
  assertBlobEqual(input, await res.blob())

  // List the namespace again: reusing the pre-request listing would make this assertion vacuous
  const cachedAfter = await dagpb.list()
  assert.equal(cachedAfter.keys.length, 0, 'Cache should be empty')
})
})
Loading
Loading