Skip to content

Commit

Permalink
feat: send events immediately sends response to client
Browse files Browse the repository at this point in the history
**BREAKING CHANGE**

Closes #534
  • Loading branch information
kitsonk committed Feb 25, 2024
1 parent bc88315 commit a3f0076
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 28 deletions.
2 changes: 1 addition & 1 deletion context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ Deno.test({
name: "context.sendEvents()",
async fn() {
const context = new Context(createMockApp(), createMockNativeRequest(), {});
const sse = context.sendEvents();
const sse = await context.sendEvents();
assertEquals((context.app as any).listeners, ["close"]);
sse.dispatchComment(`hello world`);
await sse.close();
Expand Down
38 changes: 15 additions & 23 deletions context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

import type { Application, State } from "./application.ts";
import {
assert,
createHttpError,
type ErrorStatus,
type HttpErrorOptions,
KeyStack,
SecureCookieMap,
ServerSentEventStreamTarget,
type ServerSentEventTarget,
type ServerSentEventTargetOptions,
} from "./deps.ts";
Expand Down Expand Up @@ -248,30 +246,24 @@ export class Context<
return send(this, path, sendOptions);
}

/** Convert the connection to stream events, returning an event target for
* sending server sent events. Events dispatched on the returned target will
* be sent to the client and be available in the client's `EventSource` that
* initiated the connection.
/** Convert the connection to stream events, resolving with an event target
* for sending server sent events. Events dispatched on the returned target
* will be sent to the client and be available in the client's `EventSource`
* that initiated the connection.
*
* **Note** the body needs to be returned to the client to be able to
* dispatch events, so dispatching events within the middleware will delay
* sending the body back to the client.
*
* This will set the response body and update response headers to support
* sending SSE events. Additional middleware should not modify the body.
* Invoking this will cause the a response to be sent to the client
* immediately to initialize the stream of events, and therefore any further
* changes to the response, like headers will not reach the client.
*/
sendEvents(options?: ServerSentEventTargetOptions): ServerSentEventTarget {
async sendEvents(
options?: ServerSentEventTargetOptions,
): Promise<ServerSentEventTarget> {
if (!this.#sse) {
assert(this.response.writable, "The response is not writable.");
const sse = this.#sse = new ServerSentEventStreamTarget(options);
this.app.addEventListener("close", () => sse.close());
const [bodyInit, { headers }] = sse.asResponseInit({
const sse = this.#sse = await this.request.sendEvents(options, {
headers: this.response.headers,
});
this.response.body = bodyInit;
if (headers instanceof Headers) {
this.response.headers = headers;
}
this.app.addEventListener("close", () => sse.close());
this.respond = false;
}
return this.#sse;
}
Expand All @@ -298,8 +290,8 @@ export class Context<
* `false`. If the socket cannot be upgraded, this method will throw. */
upgrade(options?: UpgradeWebSocketOptions): WebSocket {
if (!this.#socket) {
this.#socket = this.request.upgrade(options);
this.app.addEventListener("close", () => this.#socket?.close());
const socket = this.#socket = this.request.upgrade(options);
this.app.addEventListener("close", () => socket.close());
this.respond = false;
}
return this.#socket;
Expand Down
9 changes: 5 additions & 4 deletions examples/sseServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,25 @@ router
})
// for any clients that request the `/sse` endpoint, we will send a message
// every 2 seconds.
.get("/sse", (ctx: Context) => {
.get("/sse", async (ctx: Context) => {
ctx.assert(
ctx.request.accepts("text/event-stream"),
Status.UnsupportedMediaType,
);
const connection = ctx.request.ip;
const target = ctx.sendEvents();
const sse = await ctx.sendEvents();
console.log(`${green("SSE connect")} ${cyan(connection)}`);
let counter = 0;
const id = setInterval(() => {
const evt = new ServerSentEvent(
"message",
{ data: { hello: "world" }, id: counter++ },
);
target.dispatchEvent(evt);
sse.dispatchEvent(evt);
console.log("dispatched");
}, 2000);
target.addEventListener("close", () => {
sse.dispatchMessage({ hello: "world" });
sse.addEventListener("close", () => {
console.log(`${green("SSE disconnect")} ${cyan(connection)}`);
clearInterval(id);
});
Expand Down
19 changes: 19 additions & 0 deletions request.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright 2018-2024 the oak authors. All rights reserved. MIT license.

import { Body } from "./body.ts";
import { ServerSentEventStreamTarget } from "./deps.ts";
import {
accepts,
acceptsEncodings,
acceptsLanguages,
type HTTPMethods,
ServerSentEventTarget,
type ServerSentEventTargetOptions,
UserAgent,
} from "./deps.ts";
import type { ServerRequest, UpgradeWebSocketOptions } from "./types.ts";
Expand Down Expand Up @@ -233,6 +236,22 @@ export class Request {
return acceptsLanguages(this.#serverRequest);
}

/** Take the current request and initiate server sent event connection.
*
* > ![WARNING]
* > This is not intended for direct use, as it will not manage the target in
* > the overall context or ensure that additional middleware does not attempt
* > to respond to the request.
*/
async sendEvents(
options?: ServerSentEventTargetOptions,
init?: RequestInit,
): Promise<ServerSentEventTarget> {
const sse = new ServerSentEventStreamTarget(options);
await this.#serverRequest.respond(sse.asResponse(init));
return sse;
}

/** Take the current request and upgrade it to a web socket, returning a web
* standard `WebSocket` object.
*
Expand Down

0 comments on commit a3f0076

Please sign in to comment.