Skip to content

Commit

Permalink
feat(utils): introduces stream utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
feugy committed Nov 25, 2022
1 parent b6413d2 commit b2fe9d0
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 67 deletions.
1 change: 1 addition & 0 deletions packages/primitives/type-definitions/fetch.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export class Request extends globalThis.Request {

export class Response extends globalThis.Response {
readonly headers: Headers
static json(data: any, init?: ResponseInit): Response
}

declare const fetchImplementation: typeof fetch
Expand Down
73 changes: 10 additions & 63 deletions packages/utils/src/edge-to-node/handler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { IncomingMessage, ServerResponse } from 'http'
import { Readable } from 'stream'
import type { WebHandler, NodeHandler } from '../types'
import { transformToOugoingHeaders } from './headers'
import { transformToReadable } from './stream'

export function transformToNode(webHandler: WebHandler): NodeHandler {
return (request: IncomingMessage, response: ServerResponse) => {
// TODO supports the second parameter
// TODO map incoming message
// @ts-ignore TODO map IncompingMessage into Request
const maybePromise = webHandler(request)
const mapResponse = buildResponseMapper(response)
Expand Down Expand Up @@ -40,66 +40,13 @@ function buildResponseMapper(serverResponse: ServerResponse) {
serverResponse.end()
return
}
buildStreamFromReadableStream(webResponse.body).pipe(serverResponse)
}
}

/**
* Code adapted from Node's stream.Readable.fromWeb()
* @see https://github.com/nodejs/node/blob/bd462ad81bc30e547e52e699ee3b6fa3d7c882c9/lib/internal/webstreams/adapters.js#L458
*/
function buildStreamFromReadableStream(readableStream: ReadableStream) {
const reader = readableStream.getReader()
let closed = false

const readable = new Readable({
objectMode: false,
read() {
reader.read().then(
(chunk: any) => {
if (chunk.done) {
readable.push(null)
} else {
readable.push(chunk.value)
}
},
(error: any) => readable.destroy(error)
)
},

destroy(error: any, callback: (arg0: any) => void) {
function done() {
try {
callback(error)
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => {
throw error
})
}
}

if (!closed) {
reader.cancel(error).then(done, done)
return
}
done()
},
})

reader.closed.then(
() => {
closed = true
},
(error: any) => {
closed = true
readable.destroy(error)
if ('getReader' in webResponse.body) {
transformToReadable(webResponse.body).pipe(serverResponse)
} else if ('pipe' in webResponse.body) {
// @ts-ignore TODO @shniz how could the web response body have a pipe operator?
webResponse.body.pipe(serverResponse)
} else {
serverResponse.end(webResponse.body)
}
)

return readable
}
}
1 change: 1 addition & 0 deletions packages/utils/src/edge-to-node/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './handler'
export * from './headers'
export * from './stream'
76 changes: 76 additions & 0 deletions packages/utils/src/edge-to-node/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { Readable } from 'node:stream'

interface FromWebOptions {
objectMode?: boolean
highWaterMark?: number
encoding?: BufferEncoding
signal?: AbortSignal
}

/**
* Code adapted from Node's stream.Readable.fromWeb()
* @see https://github.com/nodejs/node/blob/bd462ad81bc30e547e52e699ee3b6fa3d7c882c9/lib/internal/webstreams/adapters.js#L458
*/
export function transformToReadable(
webStream: ReadableStream,
options: FromWebOptions = {}
) {
const reader = webStream.getReader()
let closed = false
const { highWaterMark, encoding, objectMode = false, signal } = options

const readable = new Readable({
objectMode,
highWaterMark,
encoding,
// @ts-ignore signal exist only since Node@17
signal,
read() {
reader.read().then(
(chunk: any) => {
if (chunk.done) {
readable.push(null)
} else {
readable.push(chunk.value)
}
},
(error: any) => readable.destroy(error)
)
},

destroy(error: any, callback: (arg0: any) => void) {
function done() {
try {
callback(error)
} catch (error) {
// In a next tick because this is happening within
// a promise context, and if there are any errors
// thrown we don't want those to cause an unhandled
// rejection. Let's just escape the promise and
// handle it separately.
process.nextTick(() => {
throw error
})
}
}

if (!closed) {
reader.cancel(error).then(done, done)
return
}
done()
},
})

reader.closed.then(
() => {
closed = true
},
(error: any) => {
closed = true
readable.destroy(error)
}
)

return readable
}
60 changes: 56 additions & 4 deletions packages/utils/test/edge-to-node/handler.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import * as primitives from '@edge-runtime/primitives'
import { createServer, Server } from 'node:http'
import { Response } from '@edge-runtime/primitives'
import { createServer, type Server } from 'node:http'
import type { AddressInfo } from 'node:net'
import { Readable } from 'node:stream'
import { transformToNode } from '../../src'
import { WebHandler } from '../../src/types'

Expand Down Expand Up @@ -35,7 +38,9 @@ describe('transformToNode()', () => {
}
})
)
const response = await fetch(`http://localhost:${server?.address()?.port}`)
const response = await fetch(
`http://localhost:${(server?.address() as AddressInfo)?.port}`
)

// extract response content to ease expectations
const headers: Record<string, string> = {}
Expand Down Expand Up @@ -113,6 +118,19 @@ describe('transformToNode()', () => {
})
})

it('returns an async json response', async () => {
const json = { works: 'just right' }
const response = await invokeWebHandler(() =>
Promise.resolve(Response.json(json))
)
expect(response).toMatchObject({
status: 200,
statusText: 'OK',
headers: { 'content-type': 'application/json' },
json,
})
})

it('can configure response headers', async () => {
const response = await invokeWebHandler(() => {
const response = new Response()
Expand All @@ -129,7 +147,7 @@ describe('transformToNode()', () => {
const data = ['lorem', 'ipsum', 'nec', 'mergitur']

const response = await invokeWebHandler(
() =>
async () =>
new Response(
new ReadableStream({
start(controller) {
Expand All @@ -147,9 +165,43 @@ describe('transformToNode()', () => {
})
)
)
expect(response).toMatchObject({ status: 200, text: data.join('') })
})

it('returns a stream body', async () => {
const stream = Readable.from(
(async function* () {
yield 'hello'
await new Promise((resolve) => setTimeout(resolve, 200))
yield ' world'
})()
)
const response = await invokeWebHandler(
() =>
({
status: 200,
body: stream,
headers: new Headers(),
} as unknown as Response)
)
expect(response).toMatchObject({
status: 200,
text: data.join(''),
statusText: 'OK',
text: 'hello world',
})
})

it('returns a buffer body', async () => {
const text = 'blah'

const response = await invokeWebHandler(
() =>
({
status: 200,
body: Buffer.from(text),
headers: new Headers(),
} as unknown as Response)
)
expect(response).toMatchObject({ status: 200, statusText: 'OK', text })
})
})
30 changes: 30 additions & 0 deletions packages/utils/test/edge-to-node/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { ReadableStream } from '@edge-runtime/primitives'
import { Readable } from 'node:stream'
import { transformToReadable } from '../../src'

describe('transformToReadable()', () => {
it('handles a web ReadableStream', async () => {
const readableStream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
controller.enqueue(encoder.encode('hello'))
await new Promise((resolve) => setTimeout(resolve, 200))
controller.enqueue(encoder.encode(' world'))
controller.close()
},
})

const readable = transformToReadable(readableStream)
expect((await transformToBuffer(readable)).toString()).toEqual(
'hello world'
)
})
})

async function transformToBuffer(stream: Readable) {
const buffers = []
for await (const data of stream) {
buffers.push(data)
}
return Buffer.concat(buffers)
}

0 comments on commit b2fe9d0

Please sign in to comment.