Skip to content

Commit

Permalink
fix: web stream encoding (#1821)
Browse files Browse the repository at this point in the history
Co-authored-by: Romuald Brillout <git@brillout.com>
  • Loading branch information
nitedani and brillout authored Aug 22, 2024
1 parent dffe6a2 commit 5d81041
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions vike/node/runtime/html/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,14 @@ async function streamReadableNodeToString(readableNode: StreamReadableNode): Pro

async function streamReadableWebToString(readableWeb: ReadableStream): Promise<string> {
const reader = readableWeb.getReader()
const decoder = new TextDecoder()

const { decode, getClosingChunk } = decodeChunks()
let str: string = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
str += decoder.decode(value, { stream: true })
str += decode(value)
}

// https://github.com/vikejs/vike/pull/1799#discussion_r1713554096
str += decoder.decode()

str += getClosingChunk()
return str
}
async function stringToStreamReadableNode(str: string): Promise<StreamReadableNode> {
Expand Down Expand Up @@ -191,15 +187,16 @@ async function streamPipeNodeToString(streamPipeNode: StreamPipeNode): Promise<s
return promise
}
function streamPipeWebToString(streamPipeWeb: StreamPipeWeb): Promise<string> {
const { decode, getClosingChunk } = decodeChunks()
let str: string = ''
let resolve: (s: string) => void
const promise = new Promise<string>((r) => (resolve = r))
const writable = new WritableStream({
write(chunk) {
assert(typeof chunk === 'string')
str += chunk
str += decode(chunk)
},
close() {
str += getClosingChunk()
resolve(str)
}
})
Expand Down Expand Up @@ -498,9 +495,7 @@ async function createStreamWrapper({
const writeChunk = (chunk: unknown) => {
assert(writableOriginal)
writableOriginal.write(chunk)
if (debug.isActivated) {
debug('data written (Node.js Writable)', getChunkAsString(chunk))
}
debugWithChunk('data written (Node.js Writable)', chunk)
}
// For libraries such as https://www.npmjs.com/package/compression
// - React calls writable.flush() when available
Expand Down Expand Up @@ -576,9 +571,7 @@ async function createStreamWrapper({
const writeChunk = (chunk: unknown) => {
assert(writerOriginal)
writerOriginal.write(encodeForWebStream(chunk))
if (debug.isActivated) {
debug('data written (Web Writable)', getChunkAsString(chunk))
}
debugWithChunk('data written (Web Writable)', chunk)
}
// Web Streams have compression built-in
// - https://developer.mozilla.org/en-US/docs/Web/API/Compression_Streams_API
Expand Down Expand Up @@ -676,13 +669,9 @@ async function createStreamWrapper({
!controllerProxyIsClosed
) {
controllerProxy.enqueue(encodeForWebStream(chunk) as any)
if (debug.isActivated) {
debug('data written (Web Readable)', getChunkAsString(chunk))
}
debugWithChunk('data written (Web Readable)', chunk)
} else {
if (debug.isActivated) {
debug('data emitted but not written (Web Readable)', getChunkAsString(chunk))
}
debugWithChunk('data emitted but not written (Web Readable)', chunk)
}
}
// Readables don't have the notion of flushing
Expand All @@ -707,9 +696,7 @@ async function createStreamWrapper({

const writeChunk = (chunk: unknown) => {
readableProxy.push(chunk)
if (debug.isActivated) {
debug('data written (Node.js Readable)', getChunkAsString(chunk))
}
debugWithChunk('data written (Node.js Readable)', chunk)
}
// Readables don't have the notion of flushing
const flushStream = null
Expand Down Expand Up @@ -966,10 +953,33 @@ function inferStreamName(stream: StreamProviderNormalized) {
assert(false)
}

function getChunkAsString(chunk: unknown): string {
function decodeChunks() {
const decoder = new TextDecoder()
const decode = (chunk: unknown): string => {
if (typeof chunk === 'string') {
return chunk
} else if (chunk instanceof Uint8Array) {
return decoder.decode(chunk, { stream: true })
} else {
assert(false)
}
}
// https://github.com/vikejs/vike/pull/1799#discussion_r1713554096
const getClosingChunk = (): string => {
return decoder.decode()
}
return { decode, getClosingChunk }
}

function debugWithChunk(msg: string, chunk: unknown): void {
if (!debug.isActivated) return

let chunkStr: string
try {
return new TextDecoder().decode(chunk as any)
chunkStr = new TextDecoder().decode(chunk as any)
} catch (err) {
return String(chunk)
chunkStr = String(chunk)
}

debug(msg, chunkStr)
}

0 comments on commit 5d81041

Please sign in to comment.