Skip to content

Commit

Permalink
stream: improve readable webstream pipeTo
Browse files Browse the repository at this point in the history
PR-URL: nodejs#49690
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Moshe Atlow <moshe@atlow.co.il>
  • Loading branch information
rluvaton authored and alexfernandez committed Nov 1, 2023
1 parent a434390 commit cdc6f45
Showing 1 changed file with 34 additions and 17 deletions.
51 changes: 34 additions & 17 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const {
ObjectCreate,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
PromisePrototypeThen,
PromiseResolve,
PromiseReject,
Expand Down Expand Up @@ -1351,7 +1350,9 @@ function readableStreamPipeTo(

const promise = createDeferredPromise();

let currentWrite = PromiseResolve();
const state = {
currentWrite: PromiseResolve(),
};

// The error here can be undefined. The rejected arg
// tells us that the promise must be rejected even
Expand All @@ -1368,9 +1369,9 @@ function readableStreamPipeTo(
}

async function waitForCurrentWrite() {
const write = currentWrite;
const write = state.currentWrite;
await write;
if (write !== currentWrite)
if (write !== state.currentWrite)
await waitForCurrentWrite();
}

Expand Down Expand Up @@ -1461,20 +1462,14 @@ function readableStreamPipeTo(
async function step() {
if (shuttingDown)
return true;

await writer[kState].ready.promise;
return new Promise((resolve, reject) => {
readableStreamDefaultReaderRead(
reader,
{
[kChunk](chunk) {
currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
setPromiseHandled(currentWrite);
resolve(false);
},
[kClose]: () => resolve(true),
[kError]: reject,
});
});

const promise = createDeferredPromise();
// eslint-disable-next-line no-use-before-define
readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));

return promise.promise;
}

async function run() {
Expand Down Expand Up @@ -1536,6 +1531,28 @@ function readableStreamPipeTo(
return promise.promise;
}

class PipeToReadableStreamReadRequest {
constructor(writer, state, promise) {
this.writer = writer;
this.state = state;
this.promise = promise;
}

[kChunk](chunk) {
this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk);
setPromiseHandled(this.state.currentWrite);
this.promise.resolve(false);
}

[kClose]() {
this.promise.resolve(true);
}

[kError](error) {
this.promise.reject(error);
}
}

function readableStreamTee(stream, cloneForBranch2) {
if (isReadableByteStreamController(stream[kState].controller)) {
return readableByteStreamTee(stream);
Expand Down

0 comments on commit cdc6f45

Please sign in to comment.