Skip to content

Commit

Permalink
Re-implement Cache gateway using new storage system (#562)
Browse files Browse the repository at this point in the history
* Bump to `ava@5.2.0` and disable worker threads when debugging

This attempts to fix an issue where tests would pass, but AVA would
hang. Step-through debugging doesn't seem to work when the
`workerThreads` option is enabled either.

* Re-implement Cache gateway using new storage system

Closes DEVX-590
  • Loading branch information
mrbbot committed Oct 31, 2023
1 parent bb3f01c commit 4aee223
Show file tree
Hide file tree
Showing 13 changed files with 512 additions and 337 deletions.
10 changes: 10 additions & 0 deletions packages/miniflare/src/plugins/cache/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { CfHeader } from "../shared/constants";

/**
 * HTTP status codes carried by the cache error classes in this module.
 * Members are listed in ascending numeric order.
 */
enum Status {
  /** 404: used for "not found" style failures. */
  NotFound = 404,
  /** 413: payload too large. */
  PayloadTooLarge = 413,
  /** 416: the requested byte range cannot be satisfied. */
  RangeNotSatisfiable = 416,
  /** 504: status used to signal a cache miss. */
  CacheMiss = 504,
}
Expand Down Expand Up @@ -52,3 +53,12 @@ export class CacheMiss extends CacheError {
);
}
}

/**
 * Cache error produced when the byte range(s) requested for a cached entry
 * cannot be satisfied. Carries status 416, a `Content-Range` header that
 * reports only the entry's total size, and a cache status header of "HIT".
 */
export class RangeNotSatisfiable extends CacheError {
  /**
   * @param size Total size reported in the `Content-Range` header.
   */
  constructor(size: number) {
    super(
      Status.RangeNotSatisfiable,
      "Range not satisfiable",
      [
        // Unsatisfied-range form: no byte positions, only the total size
        ["Content-Range", "bytes */" + size],
        [CfHeader.CacheStatus, "HIT"],
      ]
    );
  }
}
275 changes: 169 additions & 106 deletions packages/miniflare/src/plugins/cache/gateway.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
import assert from "assert";
import crypto from "crypto";
import http from "http";
import { AddressInfo } from "net";
import { ReadableStream, TransformStream } from "stream/web";
import CachePolicy from "http-cache-semantics";
import { Headers, HeadersInit, Request, Response, fetch } from "../../http";
import { Clock, Log, millisToSeconds } from "../../shared";
import { Clock, DeferredPromise, Log } from "../../shared";
import { Storage } from "../../storage";
import {
InclusiveRange,
KeyValueStorage,
MultipartReadableStream,
} from "../../storage2";
import { isSitesRequest } from "../kv";
import { _getRangeResponse } from "../shared";
import { CacheMiss, PurgeFailure, StorageFailure } from "./errors";
import { _parseRanges } from "../shared";
import {
CacheMiss,
PurgeFailure,
RangeNotSatisfiable,
StorageFailure,
} from "./errors";

/** Metadata stored with each cached response body. */
interface CacheMetadata {
// Response headers as `[name, value]`-style string arrays; `put` builds this
// with `Object.entries(headers)` and `match` feeds it to `new Headers(...)`
headers: string[][];
// HTTP status code of the cached response
status: number;
// Size of the stored body; taken from `Content-Length` when it parses as a
// number, otherwise counted while the body streams. Needed by `match` to
// parse `Range` headers.
size: number;
}

function getExpiration(clock: Clock, req: Request, res: Response) {
Expand Down Expand Up @@ -92,26 +103,27 @@ function parseUTCDate(value: string): number {
return utcDateRegexp.test(value) ? Date.parse(value) : NaN;
}

// Lifted from Miniflare 2
function getMatchResponse(
reqHeaders: Headers,
resStatus: number,
resHeaders: Headers,
resBody: Uint8Array
): Response {
/** Cached entry as handed to `getMatchResponse` to build the final response. */
interface CachedResponse {
// Stored status code; overwritten with 206 when `ranges` is non-empty
status: number;
// Response headers; mutated in place for range responses
headers: Headers;
// Byte ranges to return; empty when the request had no `Range` header
ranges: InclusiveRange[];
// Body to return; asserted not to be a plain `ReadableStream` (i.e. to be
// multipart) when more than one range is requested
body: ReadableStream<Uint8Array> | MultipartReadableStream;
// Size of the complete stored body, used as the total in `Content-Range`
totalSize: number;
}
function getMatchResponse(reqHeaders: Headers, res: CachedResponse): Response {
// If `If-None-Match` is set, perform a conditional request:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
const reqIfNoneMatchHeader = reqHeaders.get("If-None-Match");
const resETagHeader = resHeaders.get("ETag");
const resETagHeader = res.headers.get("ETag");
if (reqIfNoneMatchHeader !== null && resETagHeader !== null) {
const resETag = parseETag(resETagHeader);
if (resETag !== undefined) {
if (reqIfNoneMatchHeader.trim() === "*") {
return new Response(null, { status: 304, headers: resHeaders });
return new Response(null, { status: 304, headers: res.headers });
}
for (const reqIfNoneMatch of reqIfNoneMatchHeader.split(",")) {
if (resETag === parseETag(reqIfNoneMatch)) {
return new Response(null, { status: 304, headers: resHeaders });
return new Response(null, { status: 304, headers: res.headers });
}
}
}
Expand All @@ -120,150 +132,201 @@ function getMatchResponse(
// If `If-Modified-Since` is set, perform a conditional request:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Modified-Since
const reqIfModifiedSinceHeader = reqHeaders.get("If-Modified-Since");
const resLastModifiedHeader = resHeaders.get("Last-Modified");
const resLastModifiedHeader = res.headers.get("Last-Modified");
if (reqIfModifiedSinceHeader !== null && resLastModifiedHeader !== null) {
const reqIfModifiedSince = parseUTCDate(reqIfModifiedSinceHeader);
const resLastModified = parseUTCDate(resLastModifiedHeader);
// Comparison of NaN's (invalid dates), will always result in `false`
if (resLastModified <= reqIfModifiedSince) {
return new Response(null, { status: 304, headers: resHeaders });
return new Response(null, { status: 304, headers: res.headers });
}
}

// If `Range` is set, return a partial response:
// If `Range` was set, return a partial response:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range
const reqRangeHeader = reqHeaders.get("Range");
if (reqRangeHeader !== null) {
return _getRangeResponse(reqRangeHeader, resStatus, resHeaders, resBody);
if (res.ranges.length > 0) {
res.status = 206; // Partial Content
if (res.ranges.length > 1) {
assert(!(res.body instanceof ReadableStream)); // assert(isMultipart)
res.headers.set("Content-Type", res.body.multipartContentType);
} else {
const { start, end } = res.ranges[0];
res.headers.set(
"Content-Range",
`bytes ${start}-${end}/${res.totalSize}`
);
res.headers.set("Content-Length", `${end - start + 1}`);
}
}

// Otherwise, return the full response
return new Response(resBody, { status: resStatus, headers: resHeaders });
if (!(res.body instanceof ReadableStream)) res.body = res.body.body;
return new Response(res.body, { status: res.status, headers: res.headers });
}

class CacheResponse {
constructor(
readonly metadata: CacheMetadata,
readonly value: Uint8Array
) {}
toResponse(): Response {
return new Response(this.value, {
status: this.metadata.status,
headers: this.metadata.headers,
});
/** @internal */
export class _HttpParser {
private static INSTANCE: _HttpParser;
static get(): _HttpParser {
_HttpParser.INSTANCE ??= new _HttpParser();
return _HttpParser.INSTANCE;
}
}

interface ParsedHttpResponse {
headers: Headers;
status: number;
body: Uint8Array;
}
class HttpParser {
readonly server: http.Server;
readonly responses: Map<string, Uint8Array> = new Map();
readonly connected: Promise<void>;
private static INSTANCE: HttpParser;
static get(): HttpParser {
HttpParser.INSTANCE ??= new HttpParser();
return HttpParser.INSTANCE;
}
readonly #responses: Map<string, ReadableStream<Uint8Array>> = new Map();
readonly #ready: Promise<URL>;

private constructor() {
this.server = http.createServer(this.listen.bind(this)).unref();
this.connected = new Promise((accept) => {
this.server.listen(0, "localhost", accept);
const server = http.createServer(this.#listen).unref();
this.#ready = new Promise((resolve) => {
server.listen(0, "localhost", () => {
const address = server.address();
assert(address !== null && typeof address === "object");
resolve(new URL(`http://localhost:${address.port}`));
});
});
}
private listen(request: http.IncomingMessage, response: http.ServerResponse) {
assert(request.url !== undefined);
assert(response.socket !== null);
const array = this.responses.get(request.url);
assert(array !== undefined);

#listen: http.RequestListener = async (req, res) => {
assert(req.url !== undefined);
assert(res.socket !== null);
const stream = this.#responses.get(req.url);
assert(stream !== undefined);
// Write response to parse directly to underlying socket
response.socket.write(array);
response.socket.end();
}
public async parse(response: Uint8Array): Promise<ParsedHttpResponse> {
await this.connected;
for await (const chunk of stream) res.socket.write(chunk);
res.socket.end();
};

async parse(response: ReadableStream<Uint8Array>): Promise<Response> {
const baseURL = await this.#ready;
// Since multiple parses can be in-flight at once, an identifier is needed
const id = `/${crypto.randomBytes(16).toString("hex")}`;
this.responses.set(id, response);
const address = this.server.address()! as AddressInfo;
this.#responses.set(id, response);
try {
const parsedResponse = await fetch(
`http://localhost:${address.port}${id}`
);
const body = await parsedResponse.arrayBuffer();
return {
headers: parsedResponse.headers,
status: parsedResponse.status,
body: new Uint8Array(body),
};
return await fetch(new URL(id, baseURL));
} finally {
this.responses.delete(id);
this.#responses.delete(id);
}
}
}

/**
 * Pass-through transform stream that tallies the bytes flowing through it.
 * Every chunk is forwarded unchanged; when the stream is flushed, `size`
 * resolves with the summed `byteLength` of all chunks seen.
 */
class SizingStream extends TransformStream<Uint8Array, Uint8Array> {
  /** Resolves with the total byte count once the stream has been flushed. */
  readonly size: Promise<number>;

  constructor() {
    let byteCount = 0;
    const deferredSize = new DeferredPromise<number>();
    super({
      transform: (chunk, controller) => {
        // Count first, then hand the untouched chunk downstream
        byteCount += chunk.byteLength;
        controller.enqueue(chunk);
      },
      flush: () => {
        deferredSize.resolve(byteCount);
      },
    });
    this.size = deferredSize;
  }
}

export class CacheGateway {
private readonly storage: KeyValueStorage<CacheMetadata>;

constructor(
private readonly log: Log,
private readonly storage: Storage,
log: Log,
legacyStorage: Storage,
private readonly clock: Clock
) {}
) {
const storage = legacyStorage.getNewStorage();
this.storage = new KeyValueStorage(storage, clock);
}

async match(request: Request, cacheKey?: string): Promise<Response> {
// Never cache Workers Sites requests, so we always return on-disk files
if (isSitesRequest(request)) throw new CacheMiss();

cacheKey ??= request.url;
const cached = await this.storage.get<CacheMetadata>(cacheKey);

let resHeaders: Headers | undefined;
let resRanges: InclusiveRange[] | undefined;

const cached = await this.storage.get(cacheKey, ({ size, headers }) => {
resHeaders = new Headers(headers);
const contentType = resHeaders.get("Content-Type");

// Need size from metadata to parse `Range` header
const rangeHeader = request.headers.get("Range");
if (rangeHeader !== null) {
resRanges = _parseRanges(rangeHeader, size);
if (resRanges === undefined) throw new RangeNotSatisfiable(size);
}

return {
ranges: resRanges,
contentLength: size,
contentType: contentType ?? undefined,
};
});
if (cached?.metadata === undefined) throw new CacheMiss();

const response = new CacheResponse(
cached.metadata,
cached.value
).toResponse();
response.headers.set("CF-Cache-Status", "HIT");

return getMatchResponse(
request.headers,
cached.metadata.status,
response.headers,
cached.value
);
// Should've constructed headers when we extracted range options (the only
// time we don't do this is when the entry isn't found, or expired, in which
// case, we just threw a `CacheMiss`)
assert(resHeaders !== undefined);
resHeaders.set("CF-Cache-Status", "HIT");
resRanges ??= [];

return getMatchResponse(request.headers, {
status: cached.metadata.status,
headers: resHeaders,
ranges: resRanges,
body: cached.value,
totalSize: cached.metadata.size,
});
}

async put(
request: Request,
value: Uint8Array,
value: ReadableStream<Uint8Array>,
cacheKey?: string
): Promise<Response> {
// Never cache Workers Sites requests, so we always return on-disk files
// Never cache Workers Sites requests, so we always return on-disk files.
if (isSitesRequest(request)) return new Response(null, { status: 204 });

const response = await HttpParser.get().parse(value);
const response = await _HttpParser.get().parse(value);
let body = response.body;
assert(body !== null);

const { storable, expiration, headers } = getExpiration(
this.clock,
request,
new Response(response.body, {
status: response.status,
headers: response.headers,
})
response
);
if (!storable) {
throw new StorageFailure();
}
if (!storable) throw new StorageFailure();

cacheKey ??= request.url;
await this.storage.put<CacheMetadata>(cacheKey, {
value: response.body,
expiration: millisToSeconds(this.clock() + expiration),
metadata: {
headers: Object.entries(headers),
status: response.status,
},

// If we know the size, avoid passing the body through a transform stream to
// count it (trusting `workerd` to send correct value here).
// Safety of `!`: `parseInt(null)` is `NaN`
const contentLength = parseInt(response.headers.get("Content-Length")!);
let sizePromise: Promise<number>;
if (Number.isNaN(contentLength)) {
const stream = new SizingStream();
body = body.pipeThrough(stream);
sizePromise = stream.size;
} else {
sizePromise = Promise.resolve(contentLength);
}

const metadata: Promise<CacheMetadata> = sizePromise.then((size) => ({
headers: Object.entries(headers),
status: response.status,
size,
}));

await this.storage.put({
key: cacheKey,
value: body,
expiration: this.clock() + expiration,
metadata,
});
return new Response(null, { status: 204 });
}
Expand Down
1 change: 1 addition & 0 deletions packages/miniflare/src/plugins/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,4 @@ export const CACHE_PLUGIN: Plugin<
};

export * from "./gateway";
export { _RemoveTransformEncodingChunkedStream } from "./router";
Loading

0 comments on commit 4aee223

Please sign in to comment.