Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update streaming callable API #1652

Merged
merged 5 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- Add an authPolicy callback to CallableOptions for reusable auth middleware as well as helper auth policies (#1650)
- Multiple breaking changes to the not-yet-announced streaming feature for Callable Functions (#1652)
12 changes: 6 additions & 6 deletions spec/common/providers/https.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ describe("onCallHandler", () => {
cors: { origin: true, methods: "POST" },
},
(req, resp) => {
resp.write("hello");
resp.sendChunk("hello");
return "world";
},
"gcfv2"
Expand Down Expand Up @@ -840,10 +840,10 @@ describe("onCallHandler", () => {
{
cors: { origin: true, methods: "POST" },
},
(req, resp) => {
resp.write("initial message");
mockReq.emit("close");
resp.write("should not be sent");
async (req, resp) => {
await resp.sendChunk("initial message");
await mockReq.emit("close");
await resp.sendChunk("should not be sent");
return "done";
},
"gcfv2"
Expand Down Expand Up @@ -908,7 +908,7 @@ describe("onCallHandler", () => {
},
async (resp, res) => {
await new Promise((resolve) => setTimeout(resolve, 3_000));
res.write("hello");
res.sendChunk("hello");
await new Promise((resolve) => setTimeout(resolve, 3_000));
return "done";
},
Expand Down
5 changes: 4 additions & 1 deletion spec/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ export function runHandler(
}
}

public write(writeBody: any) {
public write(writeBody: any, cb?: () => void) {
this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody;
if (cb) {
setImmediate(cb);
}
return true;
}

Expand Down
75 changes: 46 additions & 29 deletions src/common/providers/https.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,30 @@ export interface CallableRequest<T = any> {
* The raw request handled by the callable.
*/
rawRequest: Request;

/**
* Whether this is a streaming request.
* Code can be optimized by not trying to generate a stream of chunks to
* call response.sendChunk on if request.acceptsStreaming is false.
* It is always safe, however, to call response.sendChunk as this will
* noop if acceptsStreaming is false.
*/
acceptsStreaming: boolean;
}

/**
* CallableProxyResponse exposes subset of express.Response object
* to allow writing partial, streaming responses back to the client.
* CallableProxyResponse allows streaming response chunks and listening to signals
* triggered in events such as a disconnect.
*/
export interface CallableProxyResponse {
export interface CallableResponse<T = unknown> {
/**
* Writes a chunk of the response body to the client. This method can be called
* multiple times to stream data progressively.
* Returns a promise of whether the data was written. This can be false, for example,
* if the request was not a streaming request. Rejects if there is a network error.
*/
write: express.Response["write"];
/**
* Indicates whether the client has requested and can handle streaming responses.
* This should be checked before attempting to stream data to avoid compatibility issues.
*/
acceptsStreaming: boolean;
sendChunk: (chunk: T) => Promise<boolean>;

/**
* An AbortSignal that is triggered when the client disconnects or the
* request is terminated prematurely.
Expand Down Expand Up @@ -586,13 +593,9 @@ async function checkTokens(
auth: "INVALID",
};

await Promise.all([
Promise.resolve().then(async () => {
verifications.auth = await checkAuthToken(req, ctx);
}),
Promise.resolve().then(async () => {
verifications.app = await checkAppCheckToken(req, ctx, options);
}),
[verifications.auth, verifications.app] = await Promise.all([
checkAuthToken(req, ctx),
checkAppCheckToken(req, ctx, options),
]);

const logPayload = {
Expand Down Expand Up @@ -697,9 +700,9 @@ async function checkAppCheckToken(
}

type v1CallableHandler = (data: any, context: CallableContext) => any | Promise<any>;
type v2CallableHandler<Req, Res> = (
type v2CallableHandler<Req, Res, Stream> = (
request: CallableRequest<Req>,
response?: CallableProxyResponse
response?: CallableResponse<Stream>
) => Res;

/** @internal **/
Expand All @@ -718,9 +721,9 @@ export interface CallableOptions<T = any> {
}

/** @internal */
export function onCallHandler<Req = any, Res = any>(
export function onCallHandler<Req = any, Res = any, Stream = unknown>(
options: CallableOptions<Req>,
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
handler: v1CallableHandler | v2CallableHandler<Req, Res, Stream>,
version: "gcfv1" | "gcfv2"
): (req: Request, res: express.Response) => Promise<void> {
const wrapped = wrapOnCallHandler(options, handler, version);
Expand All @@ -739,9 +742,9 @@ function encodeSSE(data: unknown): string {
}

/** @internal */
function wrapOnCallHandler<Req = any, Res = any>(
function wrapOnCallHandler<Req = any, Res = any, Stream = unknown>(
options: CallableOptions<Req>,
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
handler: v1CallableHandler | v2CallableHandler<Req, Res, Stream>,
version: "gcfv1" | "gcfv2"
): (req: Request, res: express.Response) => Promise<void> {
return async (req: Request, res: express.Response): Promise<void> => {
Expand Down Expand Up @@ -855,27 +858,41 @@ function wrapOnCallHandler<Req = any, Res = any>(
const arg: CallableRequest<Req> = {
...context,
data,
acceptsStreaming,
};

const responseProxy: CallableProxyResponse = {
write(chunk): boolean {
const responseProxy: CallableResponse<Stream> = {
sendChunk(chunk: Stream): Promise<boolean> {
// if client doesn't accept sse-protocol, response.write() is no-op.
if (!acceptsStreaming) {
return false;
return Promise.resolve(false);
}
// if connection is already closed, response.write() is no-op.
if (abortController.signal.aborted) {
return false;
return Promise.resolve(false);
}
const formattedData = encodeSSE({ message: chunk });
const wrote = res.write(formattedData);
let resolve: (wrote: boolean) => void;
let reject: (err: Error) => void;
const p = new Promise<boolean>((res, rej) => {
resolve = res;
reject = rej;
});
const wrote = res.write(formattedData, (error) => {
if (error) {
reject(error);
return;
}
resolve(wrote);
});

// Reset heartbeat timer after successful write
if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) {
scheduleHeartbeat();
}
return wrote;

return p;
},
acceptsStreaming,
signal: abortController.signal,
};
if (acceptsStreaming) {
Expand Down
38 changes: 27 additions & 11 deletions src/v2/providers/https.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { isDebugFeatureEnabled } from "../../common/debug";
import { ResetValue } from "../../common/options";
import {
CallableRequest,
CallableProxyResponse,
CallableResponse,
FunctionsErrorCode,
HttpsError,
onCallHandler,
Expand Down Expand Up @@ -258,12 +258,17 @@ export type HttpsFunction = ((
/**
* Creates a callable method for clients to call using a Firebase SDK.
*/
export interface CallableFunction<T, Return> extends HttpsFunction {
export interface CallableFunction<T, Return, Stream = unknown> extends HttpsFunction {
/** Executes the handler function with the provided data as input. Used for unit testing.
* @param data - An input for the handler function.
* @returns The output of the handler function.
*/
run(data: CallableRequest<T>): Return;
run(request: CallableRequest<T>): Return;

stream(
request: CallableRequest<T>,
response: CallableResponse<Stream>
): { stream: AsyncIterator<Stream>; output: Return };
}

/**
Expand Down Expand Up @@ -387,22 +392,22 @@ export function onRequest(
* @param handler - A function that takes a {@link https.CallableRequest}.
* @returns A function that you can export and deploy.
*/
export function onCall<T = any, Return = any | Promise<any>>(
export function onCall<T = any, Return = any | Promise<any>, Stream = unknown>(
opts: CallableOptions<T>,
handler: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>>;
handler: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>, Stream>;

/**
* Declares a callable method for clients to call using a Firebase SDK.
* @param handler - A function that takes a {@link https.CallableRequest}.
* @returns A function that you can export and deploy.
*/
export function onCall<T = any, Return = any | Promise<any>>(
handler: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
export function onCall<T = any, Return = any | Promise<any>, Stream = unknown>(
handler: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>>;
export function onCall<T = any, Return = any | Promise<any>>(
export function onCall<T = any, Return = any | Promise<any>, Stream = unknown>(
optsOrHandler: CallableOptions<T> | ((request: CallableRequest<T>) => Return),
handler?: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
handler?: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>> {
let opts: CallableOptions;
if (arguments.length === 1) {
Expand All @@ -421,7 +426,7 @@ export function onCall<T = any, Return = any | Promise<any>>(
}

// fix the length of handler to make the call to handler consistent
const fixedLen = (req: CallableRequest<T>, resp?: CallableProxyResponse) => handler(req, resp);
const fixedLen = (req: CallableRequest<T>, resp?: CallableResponse<Stream>) => handler(req, resp);
let func: any = onCallHandler(
{
cors: { origin, methods: "POST" },
Expand Down Expand Up @@ -474,6 +479,17 @@ export function onCall<T = any, Return = any | Promise<any>>(
callableTrigger: {},
};

// TODO: in the next major version, do auth/appcheck in these helper methods too.
func.run = withInit(handler);
func.stream = () => {
return {
stream: {
next(): Promise<IteratorResult<Stream>> {
return Promise.reject("Coming soon");
},
},
output: Promise.reject("Coming soon"),
};
};
return func;
}
Loading