From a3f0076153233eb6551cbf5e487400453e963806 Mon Sep 17 00:00:00 2001 From: Kitson Kelly Date: Sun, 25 Feb 2024 19:39:21 +1100 Subject: [PATCH] feat: send events immediately sends response to client **BREAKING CHANGE** Closes #534 --- context.test.ts | 2 +- context.ts | 38 +++++++++++++++----------------------- examples/sseServer.ts | 9 +++++---- request.ts | 19 +++++++++++++++++++ 4 files changed, 40 insertions(+), 28 deletions(-) diff --git a/context.test.ts b/context.test.ts index 9a8baa9f..7bbafeb7 100644 --- a/context.test.ts +++ b/context.test.ts @@ -362,7 +362,7 @@ Deno.test({ name: "context.sendEvents()", async fn() { const context = new Context(createMockApp(), createMockNativeRequest(), {}); - const sse = context.sendEvents(); + const sse = await context.sendEvents(); assertEquals((context.app as any).listeners, ["close"]); sse.dispatchComment(`hello world`); await sse.close(); diff --git a/context.ts b/context.ts index f5ec23f1..b39e7bd2 100644 --- a/context.ts +++ b/context.ts @@ -2,13 +2,11 @@ import type { Application, State } from "./application.ts"; import { - assert, createHttpError, type ErrorStatus, type HttpErrorOptions, KeyStack, SecureCookieMap, - ServerSentEventStreamTarget, type ServerSentEventTarget, type ServerSentEventTargetOptions, } from "./deps.ts"; @@ -248,30 +246,24 @@ export class Context< return send(this, path, sendOptions); } - /** Convert the connection to stream events, returning an event target for - * sending server sent events. Events dispatched on the returned target will - * be sent to the client and be available in the client's `EventSource` that - * initiated the connection. + /** Convert the connection to stream events, resolving with an event target + * for sending server sent events. Events dispatched on the returned target + * will be sent to the client and be available in the client's `EventSource` + * that initiated the connection. * - * **Note** the body needs to be returned to the client to be able to - * dispatch events, so dispatching events within the middleware will delay - * sending the body back to the client. - * - * This will set the response body and update response headers to support - * sending SSE events. Additional middleware should not modify the body. + * Invoking this will cause the a response to be sent to the client + * immediately to initialize the stream of events, and therefore any further + * changes to the response, like headers will not reach the client. */ - sendEvents(options?: ServerSentEventTargetOptions): ServerSentEventTarget { + async sendEvents( + options?: ServerSentEventTargetOptions, + ): Promise { if (!this.#sse) { - assert(this.response.writable, "The response is not writable."); - const sse = this.#sse = new ServerSentEventStreamTarget(options); - this.app.addEventListener("close", () => sse.close()); - const [bodyInit, { headers }] = sse.asResponseInit({ + const sse = this.#sse = await this.request.sendEvents(options, { headers: this.response.headers, }); - this.response.body = bodyInit; - if (headers instanceof Headers) { - this.response.headers = headers; - } + this.app.addEventListener("close", () => sse.close()); + this.respond = false; } return this.#sse; } @@ -298,8 +290,8 @@ export class Context< * `false`. If the socket cannot be upgraded, this method will throw. */ upgrade(options?: UpgradeWebSocketOptions): WebSocket { if (!this.#socket) { - this.#socket = this.request.upgrade(options); - this.app.addEventListener("close", () => this.#socket?.close()); + const socket = this.#socket = this.request.upgrade(options); + this.app.addEventListener("close", () => socket.close()); this.respond = false; } return this.#socket; diff --git a/examples/sseServer.ts b/examples/sseServer.ts index ee0cd8a2..ed7227bc 100644 --- a/examples/sseServer.ts +++ b/examples/sseServer.ts @@ -43,13 +43,13 @@ router }) // for any clients that request the `/sse` endpoint, we will send a message // every 2 seconds. - .get("/sse", (ctx: Context) => { + .get("/sse", async (ctx: Context) => { ctx.assert( ctx.request.accepts("text/event-stream"), Status.UnsupportedMediaType, ); const connection = ctx.request.ip; - const target = ctx.sendEvents(); + const sse = await ctx.sendEvents(); console.log(`${green("SSE connect")} ${cyan(connection)}`); let counter = 0; const id = setInterval(() => { @@ -57,10 +57,11 @@ router "message", { data: { hello: "world" }, id: counter++ }, ); - target.dispatchEvent(evt); + sse.dispatchEvent(evt); console.log("dispatched"); }, 2000); - target.addEventListener("close", () => { + sse.dispatchMessage({ hello: "world" }); + sse.addEventListener("close", () => { console.log(`${green("SSE disconnect")} ${cyan(connection)}`); clearInterval(id); }); diff --git a/request.ts b/request.ts index e1e83e17..526b3184 100644 --- a/request.ts +++ b/request.ts @@ -1,11 +1,14 @@ // Copyright 2018-2024 the oak authors. All rights reserved. MIT license. import { Body } from "./body.ts"; +import { ServerSentEventStreamTarget } from "./deps.ts"; import { accepts, acceptsEncodings, acceptsLanguages, type HTTPMethods, + ServerSentEventTarget, + type ServerSentEventTargetOptions, UserAgent, } from "./deps.ts"; import type { ServerRequest, UpgradeWebSocketOptions } from "./types.ts"; @@ -233,6 +236,22 @@ export class Request { return acceptsLanguages(this.#serverRequest); } + /** Take the current request and initiate server sent event connection. + * + * > ![WARNING] + * > This is not intended for direct use, as it will not manage the target in + * > the overall context or ensure that additional middleware does not attempt + * > to respond to the request. + */ + async sendEvents( + options?: ServerSentEventTargetOptions, + init?: RequestInit, + ): Promise { + const sse = new ServerSentEventStreamTarget(options); + await this.#serverRequest.respond(sse.asResponse(init)); + return sse; + } + /** Take the current request and upgrade it to a web socket, returning a web * standard `WebSocket` object. *