Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(helper/streaming): Avoid attaching AbortSignal on newer versions of Bun #3859

Merged
merged 2 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/helper/streaming/sse.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ describe('SSE Streaming helper', () => {
})

it('Check streamSSE Response if aborted by abort signal', async () => {
// Emulate an old version of Bun (version 1.1.0) for this specific test case
// @ts-expect-error Bun is not typed
global.Bun = {
version: '1.1.0',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To pass the test, specify the old version at the global.

}
const ac = new AbortController()
const req = new Request('http://localhost/', { signal: ac.signal })
const c = new Context(req)
Expand Down
17 changes: 11 additions & 6 deletions src/helper/streaming/sse.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Context } from '../../context'
import { HtmlEscapedCallbackPhase, resolveCallback } from '../../utils/html'
import { StreamingApi } from '../../utils/stream'
import { isOldBunVersion } from './utils'

export interface SSEMessage {
data: string | Promise<string>
Expand Down Expand Up @@ -61,6 +62,7 @@ const run = async (
}

const contextStash: WeakMap<ReadableStream, Context> = new WeakMap<ReadableStream, Context>()

export const streamSSE = (
c: Context,
cb: (stream: SSEStreamingApi) => Promise<void>,
Expand All @@ -69,12 +71,15 @@ export const streamSSE = (
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable, readable)

// bun does not cancel response stream when request is canceled, so detect abort by signal
c.req.raw.signal.addEventListener('abort', () => {
if (!stream.closed) {
stream.abort()
}
})
// Until Bun v1.1.27, Bun didn't call cancel() on the ReadableStream for Response objects from Bun.serve()
if (isOldBunVersion()) {
c.req.raw.signal.addEventListener('abort', () => {
if (!stream.closed) {
stream.abort()
}
})
}

// in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming
contextStash.set(stream.responseReadable, c)

Expand Down
15 changes: 15 additions & 0 deletions src/helper/streaming/stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Context } from '../../context'
import { isOldBunVersion } from './utils'

Check warning on line 2 in src/helper/streaming/stream.test.ts

View workflow job for this annotation

GitHub Actions / Main

'isOldBunVersion' is defined but never used
import { stream } from '.'

describe('Basic Streaming Helper', () => {
Expand Down Expand Up @@ -47,6 +48,11 @@
})

it('Check stream Response if aborted by abort signal', async () => {
// Emulate an old version of Bun (version 1.1.0) for this specific test case
// @ts-expect-error Bun is not typed
global.Bun = {
version: '1.1.0',
}
const ac = new AbortController()
const req = new Request('http://localhost/', { signal: ac.signal })
const c = new Context(req)
Expand All @@ -69,9 +75,16 @@
expect(value).toEqual(new Uint8Array([0]))
ac.abort()
expect(aborted).toBeTruthy()
// @ts-expect-error Bun is not typed
delete global.Bun
})

it('Check stream Response if pipe is aborted by abort signal', async () => {
// Emulate an old version of Bun (version 1.1.0) for this specific test case
// @ts-expect-error Bun is not typed
global.Bun = {
version: '1.1.0',
}
const ac = new AbortController()
const req = new Request('http://localhost/', { signal: ac.signal })
const c = new Context(req)
Expand All @@ -91,6 +104,8 @@
ac.abort()
await pReading
expect(aborted).toBeTruthy()
// @ts-expect-error Bun is not typed
delete global.Bun
})

it('Check stream Response if error occurred', async () => {
Expand Down
17 changes: 11 additions & 6 deletions src/helper/streaming/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { Context } from '../../context'
import { StreamingApi } from '../../utils/stream'
import { isOldBunVersion } from './utils'

const contextStash: WeakMap<ReadableStream, Context> = new WeakMap<ReadableStream, Context>()

export const stream = (
c: Context,
cb: (stream: StreamingApi) => Promise<void>,
Expand All @@ -10,12 +12,15 @@ export const stream = (
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable, readable)

// bun does not cancel response stream when request is canceled, so detect abort by signal
c.req.raw.signal.addEventListener('abort', () => {
if (!stream.closed) {
stream.abort()
}
})
// Until Bun v1.1.27, Bun didn't call cancel() on the ReadableStream for Response objects from Bun.serve()
if (isOldBunVersion()) {
c.req.raw.signal.addEventListener('abort', () => {
if (!stream.closed) {
stream.abort()
}
})
}

// in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming
contextStash.set(stream.responseReadable, c)
;(async () => {
Expand Down
11 changes: 11 additions & 0 deletions src/helper/streaming/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export let isOldBunVersion = (): boolean => {
// @ts-expect-error @types/bun is not installed
const version: string = typeof Bun !== 'undefined' ? Bun.version : undefined
if (version === undefined) {
return false
}
const result = version.startsWith('1.1') || version.startsWith('1.0') || version.startsWith('0.')
// Avoid running this check on every call
isOldBunVersion = () => result
return result
}
Loading