Skip to content

Commit

Permalink
feat(middleware): add R2 interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Jan 4, 2025
1 parent 53c7e28 commit ea1e6b8
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 58 deletions.
14 changes: 8 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"@ucanto/client": "^9.0.1",
"@ucanto/principal": "^9.0.1",
"@ucanto/transport": "^9.1.1",
"@web3-storage/blob-fetcher": "^2.4.3",
"@web3-storage/blob-fetcher": "^2.4.4-rc.0",
"@web3-storage/capabilities": "^17.4.1",
"@web3-storage/gateway-lib": "^5.1.2",
"dagula": "^8.0.0",
Expand Down
4 changes: 3 additions & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import {
withLocator,
withUcanInvocationHandler,
withDelegationsStorage,
withDelegationStubs
withDelegationStubs,
withCarParkFetch
} from './middleware/index.js'
import { instrument } from '@microlabs/otel-cf-workers'
import { NoopSpanProcessor } from '@opentelemetry/sdk-trace-base'
Expand Down Expand Up @@ -69,6 +70,7 @@ const middleware = composeMiddleware(
withParsedIpfsUrl,
withAuthToken,
withLocator,
withCarParkFetch,

// TODO: replace this with a handler to fetch the real delegations
withDelegationStubs,
Expand Down
1 change: 1 addition & 0 deletions src/middleware/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export { withRateLimit } from './withRateLimit.js'
export { withVersionHeader } from './withVersionHeader.js'
export { withAuthorizedSpace } from './withAuthorizedSpace.js'
export { withLocator } from './withLocator.js'
export { withCarParkFetch } from './withCarParkFetch.js'
export { withEgressTracker } from './withEgressTracker.js'
export { withEgressClient } from './withEgressClient.js'
export { withDelegationStubs } from './withDelegationStubs.js'
Expand Down
48 changes: 48 additions & 0 deletions src/middleware/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/* eslint-env browser */
/* global FixedLengthStream */
// @ts-expect-error no types
import httpRangeParse from 'http-range-parse'

/**
 * Parse the value of an HTTP `Range` header into an R2 range descriptor.
 *
 * Only a single range in the `bytes` unit is accepted. A suffix range becomes
 * `{ suffix }`; any other range becomes `{ offset, length }`, with `length`
 * left `undefined` when the header gives no last byte position.
 *
 * @param {string} value
 * @returns {import("@cloudflare/workers-types").R2Range}
 * @throws {Error} If the header holds multiple ranges or a unit other than `bytes`.
 */
export function parseRange (value) {
  const parsed = httpRangeParse(value)
  if (parsed.ranges) {
    throw new Error('Multipart ranges not supported')
  }
  if (parsed.unit !== 'bytes') {
    throw new Error(`Unsupported range unit: ${parsed.unit}`)
  }
  if (parsed.suffix != null) {
    return { suffix: parsed.suffix }
  }
  // last is an inclusive byte position, hence the +1 when converting to a length
  const length = parsed.last != null ? parsed.last - parsed.first + 1 : undefined
  return { offset: parsed.first, length }
}

/**
 * Build an HTTP response that streams the body of an R2 object.
 *
 * Without a range the response is a 200 carrying the full object size as
 * `Content-Length`. With a range it is a 206 with a `Content-Range` header and
 * a `Content-Length` equal to the number of bytes in the range.
 *
 * The range is clamped to the bounds of the object, following HTTP range
 * semantics: a suffix longer than the object selects the whole object, and a
 * last byte position past the end is treated as the final byte. Without the
 * clamp the headers would advertise more bytes than the object contains.
 * NOTE(review): this assumes R2 truncates the returned body the same way for
 * an over-long range - confirm against the R2 documentation.
 *
 * @param {import("@cloudflare/workers-types").R2ObjectBody} obj
 * @param {import("@cloudflare/workers-types").R2Range | undefined} range
 * @param {Headers} [headers] Headers to send; `Content-Range` and
 * `Content-Length` are set on this object.
 * @returns {Response}
 */
export const toResponse = (obj, range, headers) => {
  const status = range ? 206 : 200
  headers = headers || new Headers({})
  const lastByte = obj.size - 1
  let contentLength = obj.size
  if (range) {
    let first, last
    if ('suffix' in range) {
      // a suffix longer than the object means "the whole object"
      first = Math.max(0, obj.size - range.suffix)
      last = lastByte
    } else {
      first = range.offset || 0
      // never advertise bytes past the end of the object
      last = range.length != null ? Math.min(first + range.length - 1, lastByte) : lastByte
    }
    headers.set('Content-Range', `bytes ${first}-${last}/${obj.size}`)
    // both positions are inclusive
    contentLength = last - first + 1
  }
  headers.set('Content-Length', contentLength.toString())

  // @ts-expect-error ReadableStream types incompatible
  return new Response(obj.body.pipeThrough(new FixedLengthStream(contentLength)), { status, headers })
}
50 changes: 5 additions & 45 deletions src/middleware/withCarBlockHandler.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
/* eslint-env browser */
/* global FixedLengthStream */

import { CAR_CODE } from '../constants.js'
import { HttpError } from '@web3-storage/gateway-lib/util'
// @ts-expect-error no types
import httpRangeParse from 'http-range-parse'
import { base58btc } from 'multiformats/bases/base58'
import { parseRange, toResponse } from './utils.js'

/**
* @import { R2Bucket, KVNamespace, RateLimit } from '@cloudflare/workers-types'
* @import { R2Bucket, KVNamespace, RateLimit, R2Range } from '@cloudflare/workers-types'
* @import {
* IpfsUrlContext,
* Middleware,
Expand All @@ -19,8 +15,6 @@ import { base58btc } from 'multiformats/bases/base58'
* @import { Environment } from './withCarBlockHandler.types.js'
*/

/** @typedef {{ offset: number, length?: number } | { offset?: number, length: number } | { suffix: number }} Range */

/**
* Middleware that will serve CAR files if a CAR codec is found in the path
* CID. If the CID is not a CAR CID it delegates to the next middleware.
Expand Down Expand Up @@ -92,7 +86,7 @@ export async function handleCarBlock (request, env, ctx) {
})
}

/** @type {Range|undefined} */
/** @type {R2Range|undefined} */
let range
if (request.headers.has('range')) {
try {
Expand All @@ -108,32 +102,13 @@ export async function handleCarBlock (request, env, ctx) {
}
if (!obj) throw new HttpError('CAR not found', { status: 404 })

const status = range ? 206 : 200
const headers = new Headers({
return toResponse(obj, range, new Headers({
'Content-Type': 'application/vnd.ipld.car; version=1;',
'X-Content-Type-Options': 'nosniff',
'Cache-Control': 'public, max-age=29030400, immutable',
'Content-Disposition': `attachment; filename="${dataCid}.car"`,
Etag: etag
})

let contentLength = obj.size
if (range) {
let first, last
if ('suffix' in range) {
first = obj.size - range.suffix
last = obj.size - 1
} else {
first = range.offset || 0
last = range.length != null ? first + range.length - 1 : obj.size - 1
}
headers.set('Content-Range', `bytes ${first}-${last}/${obj.size}`)
contentLength = last - first
}
headers.set('Content-Length', contentLength.toString())

// @ts-expect-error ReadableStream types incompatible
return new Response(obj.body.pipeThrough(new FixedLengthStream(contentLength)), { status, headers })
}))
}

/** @param {import('multiformats').UnknownLink} cid */
Expand All @@ -144,18 +119,3 @@ const toBlobKey = digest => {
const mhStr = base58btc.encode(digest.bytes)
return `${mhStr}/${mhStr}.blob`
}

/**
 * Turn the value of an HTTP `Range` header into a range object.
 *
 * Accepts a single `bytes` range only. Suffix ranges yield `{ suffix }`; all
 * other ranges yield `{ offset, length }`, where `length` is `undefined` if no
 * last byte position was given.
 *
 * @param {string} value
 * @returns {Range}
 * @throws {Error} For multipart ranges or units other than `bytes`.
 */
function parseRange (value) {
  const { ranges, unit, suffix, first, last } = httpRangeParse(value)
  if (ranges) {
    throw new Error('Multipart ranges not supported')
  }
  if (unit !== 'bytes') {
    throw new Error(`Unsupported range unit: ${unit}`)
  }
  if (suffix == null) {
    // first/last are inclusive positions, so the span is last - first + 1 bytes
    return { offset: first, length: last == null ? undefined : last - first + 1 }
  }
  return { suffix }
}
66 changes: 66 additions & 0 deletions src/middleware/withCarParkFetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { withSimpleSpan } from '@web3-storage/blob-fetcher/tracing/tracing'
import { parseRange, toResponse } from './utils.js'
/**
* @import {
* Middleware,
* Context as MiddlewareContext
* } from '@web3-storage/gateway-lib'
* @import {
* CarParkFetchContext,
* CarParkFetchEnvironment
* } from './withCarParkFetch.types.js'
*/

/**
 * Adds {@link CarParkFetchContext.fetch} to the context. This version of fetch
 * reads directly from the R2 CARPARK bucket when the requested URL starts with
 * `env.CARPARK_PUBLIC_BUCKET_URL`; every other request is passed to the global
 * fetch unchanged. When `env.CARPARK_PUBLIC_BUCKET_URL` is not set, the global
 * fetch itself is placed on the context.
 *
 * @type {Middleware<CarParkFetchContext, MiddlewareContext, CarParkFetchEnvironment>}
 */
export function withCarParkFetch (handler) {
  return async (request, env, ctx) => {
    const bucketUrl = env.CARPARK_PUBLIC_BUCKET_URL
    // if carpark public bucket is not set, just use default
    if (!bucketUrl) {
      return handler(request, env, { ...ctx, fetch: globalThis.fetch })
    }

    /**
     * Fetch that serves CARPARK URLs from the R2 binding.
     *
     * @param {globalThis.RequestInfo | URL} input
     * @param {globalThis.RequestInit} [init]
     * @returns {Promise<globalThis.Response>}
     */
    const fetch = async (input, init) => {
      const urlString = input instanceof Request ? input.url : input instanceof URL ? input.toString() : input
      // anything not addressed to CARPARK goes over the network as usual
      if (!urlString.startsWith(bucketUrl)) {
        return globalThis.fetch(input, init)
      }

      // extract carpark key from request: strip the bucket URL and a leading slash
      let key = urlString.slice(bucketUrl.length)
      key = key[0] === '/' ? key.slice(1) : key

      // extract headers from request
      const headers = input instanceof Request ? input.headers : init?.headers || {}
      // extract range header
      const rangeHeader = (new Headers(headers)).get('Range')

      /** @type {import('@cloudflare/workers-types').R2Range|undefined} */
      let range
      if (rangeHeader) {
        try {
          // parse the Range of THIS fetch call, not of the incoming gateway
          // request, which is a different request with its own (or no) range
          range = parseRange(rangeHeader)
        } catch (err) {
          // range forms parseRange rejects are left to the real endpoint
          return globalThis.fetch(input, init)
        }
      }

      // fetch directly from carpark
      const resp = await withSimpleSpan('carPark.get', env.CARPARK.get, env.CARPARK)(key, { range })

      // return a fetch response object from the CARPARK response
      return resp == null ? new Response(null, { status: 404 }) : toResponse(resp, range)
    }

    return handler(request, env, { ...ctx, fetch })
  }
}
14 changes: 14 additions & 0 deletions src/middleware/withCarParkFetch.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {
Environment as MiddlewareEnvironment,
Context as MiddlewareContext,
} from '@web3-storage/gateway-lib'
import { R2Bucket } from '@cloudflare/workers-types'

/**
 * Environment bindings read by the `withCarParkFetch` middleware.
 */
export interface CarParkFetchEnvironment extends MiddlewareEnvironment {
/** R2 bucket the middleware reads from directly for requests addressed to the public bucket URL. */
CARPARK: R2Bucket
/**
 * URL prefix that identifies requests targeting CARPARK. Optional: when it is
 * not set, `withCarParkFetch` puts the global `fetch` on the context instead.
 */
CARPARK_PUBLIC_BUCKET_URL?: string
}

/**
 * Context produced by the `withCarParkFetch` middleware.
 */
export interface CarParkFetchContext extends MiddlewareContext {
/** A fetch-compatible function; `withCarParkFetch` sets it to either the global `fetch` or a wrapper that reads CARPARK URLs from R2. */
fetch: typeof globalThis.fetch
}
7 changes: 4 additions & 3 deletions src/middleware/withContentClaimsDagula.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'
* Middleware,
* } from '@web3-storage/gateway-lib'
* @import { LocatorContext } from './withLocator.types.js'
* @import { CarParkFetchContext } from './withCarParkFetch.types.js'
* @import { Environment } from './withContentClaimsDagula.types.js'
*/

Expand All @@ -18,16 +19,16 @@ import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'
*
* @type {(
* Middleware<
* BlockContext & DagContext & UnixfsContext & IpfsUrlContext & LocatorContext,
* IpfsUrlContext & LocatorContext,
* BlockContext & DagContext & UnixfsContext & IpfsUrlContext & LocatorContext & CarParkFetchContext,
* IpfsUrlContext & LocatorContext & CarParkFetchContext,
* Environment
* >
* )}
*/
export function withContentClaimsDagula (handler) {
return async (request, env, ctx) => {
const { locator } = ctx
const fetcher = BatchingFetcher.create(locator)
const fetcher = BatchingFetcher.create(locator, ctx.fetch)
const dagula = new Dagula({
async get (cid) {
const res = await fetcher.fetch(cid.multihash)
Expand Down
4 changes: 2 additions & 2 deletions test/miniflare/freeway.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ describe('freeway', () => {

const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${shards[0]}`, {
headers: {
Range: `bytes=${rootBlock.blockOffset}-${rootBlock.blockOffset + rootBlock.blockLength}`
Range: `bytes=${rootBlock.blockOffset}-${rootBlock.blockOffset + rootBlock.blockLength - 1}`
}
})
assert(res.ok)
Expand All @@ -349,7 +349,7 @@ describe('freeway', () => {
const contentLength = parseInt(res.headers.get('Content-Length') ?? '0')
assert(contentLength)
assert.equal(contentLength, rootBlock.bytes.length)
assert.equal(res.headers.get('Content-Range'), `bytes ${rootBlock.blockOffset}-${rootBlock.blockOffset + rootBlock.blockLength}/${obj.size}`)
assert.equal(res.headers.get('Content-Range'), `bytes ${rootBlock.blockOffset}-${rootBlock.blockOffset + rootBlock.blockLength - 1}/${obj.size}`)
assert.equal(res.headers.get('Content-Type'), 'application/vnd.ipld.car; version=1;')
assert.equal(res.headers.get('Etag'), `"${shards[0]}"`)
})
Expand Down

0 comments on commit ea1e6b8

Please sign in to comment.