Skip to content

Commit

Permalink
Fix tee() incorrectly closing before enqueuing to the second branch
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasBuelens authored Oct 13, 2021
1 parent 02de71a commit ea03a24
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
31 changes: 25 additions & 6 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2241,6 +2241,7 @@ create them does not matter.
1. Assert: |cloneForBranch2| is a boolean.
1. Let |reader| be ? [$AcquireReadableStreamDefaultReader$](|stream|).
1. Let |reading| be false.
1. Let |readAgain| be false.
1. Let |canceled1| be false.
1. Let |canceled2| be false.
1. Let |reason1| be undefined.
Expand All @@ -2249,13 +2250,15 @@ create them does not matter.
1. Let |branch2| be undefined.
1. Let |cancelPromise| be [=a new promise=].
1. Let |pullAlgorithm| be the following steps:
1. If |reading| is true, return [=a promise resolved with=] undefined.
1. If |reading| is true,
1. Set |readAgain| to true.
1. Return [=a promise resolved with=] undefined.
1. Set |reading| to true.
1. Let |readRequest| be a [=read request=] with the following [=struct/items=]:
: [=read request/chunk steps=], given |chunk|
::
1. [=Queue a microtask=] to perform the following steps:
1. Set |reading| to false.
1. Set |readAgain| to false.
1. Let |chunk1| and |chunk2| be |chunk|.
1. If |canceled2| is false and |cloneForBranch2| is true,
1. Let |cloneResult| be [$StructuredClone$](|chunk2|).
Expand All @@ -2271,6 +2274,8 @@ create them does not matter.
1. If |canceled2| is false, perform !
[$ReadableStreamDefaultControllerEnqueue$](|branch2|.[=ReadableStream/[[controller]]=],
|chunk2|).
1. Set |reading| to false.
1. If |readAgain| is true, perform |pullAlgorithm|.

<p class="note">The microtask delay here is necessary because it takes at least a microtask to
detect errors, when we use |reader|.[=ReadableStreamGenericReader/[[closedPromise]]=] below.
Expand Down Expand Up @@ -2331,6 +2336,8 @@ create them does not matter.
{{ReadableByteStreamController}}.
1. Let |reader| be ? [$AcquireReadableStreamDefaultReader$](|stream|).
1. Let |reading| be false.
1. Let |readAgainForBranch1| be false.
1. Let |readAgainForBranch2| be false.
1. Let |canceled1| be false.
1. Let |canceled2| be false.
1. Let |reason1| be undefined.
Expand All @@ -2357,7 +2364,8 @@ create them does not matter.
: [=read request/chunk steps=], given |chunk|
::
1. [=Queue a microtask=] to perform the following steps:
1. Set |reading| to false.
1. Set |readAgainForBranch1| to false.
1. Set |readAgainForBranch2| to false.
1. Let |chunk1| and |chunk2| be |chunk|.
1. If |canceled1| is false and |canceled2| is false,
1. Let |cloneResult| be [$CloneAsUint8Array$](|chunk|).
Expand All @@ -2373,6 +2381,9 @@ create them does not matter.
1. If |canceled2| is false, perform !
[$ReadableByteStreamControllerEnqueue$](|branch2|.[=ReadableStream/[[controller]]=],
|chunk2|).
1. Set |reading| to false.
1. If |readAgainForBranch1| is true, perform |pull1Algorithm|.
1. Otherwise, if |readAgainForBranch2| is true, perform |pull2Algorithm|.

<p class="note">The microtask delay here is necessary because it takes at least a microtask to
detect errors, when we use |reader|.[=ReadableStreamGenericReader/[[closedPromise]]=] below.
Expand Down Expand Up @@ -2410,7 +2421,8 @@ create them does not matter.
: [=read-into request/chunk steps=], given |chunk|
::
1. [=Queue a microtask=] to perform the following steps:
1. Set |reading| to false.
1. Set |readAgainForBranch1| to false.
1. Set |readAgainForBranch2| to false.
1. Let |byobCanceled| be |canceled2| if |forBranch2| is true, and |canceled1| otherwise.
1. Let |otherCanceled| be |canceled2| if |forBranch2| is false, and |canceled1| otherwise.
1. If |otherCanceled| is false,
Expand All @@ -2429,6 +2441,9 @@ create them does not matter.
1. Otherwise, if |byobCanceled| is false, perform !
[$ReadableByteStreamControllerRespondWithNewView$](|byobBranch|.[=ReadableStream/[[controller]]=],
|chunk|).
1. Set |reading| to false.
1. If |readAgainForBranch1| is true, perform |pull1Algorithm|.
1. Otherwise, if |readAgainForBranch2| is true, perform |pull2Algorithm|.

<p class="note">The microtask delay here is necessary because it takes at least a microtask to
detect errors, when we use |reader|.[=ReadableStreamGenericReader/[[closedPromise]]=] below.
Expand Down Expand Up @@ -2460,14 +2475,18 @@ create them does not matter.
1. Set |reading| to false.
1. Perform ! [$ReadableStreamBYOBReaderRead$](|reader|, |view|, |readIntoRequest|).
1. Let |pull1Algorithm| be the following steps:
1. If |reading| is true, return [=a promise resolved with=] undefined.
1. If |reading| is true,
1. Set |readAgainForBranch1| to true.
1. Return [=a promise resolved with=] undefined.
1. Set |reading| to true.
1. Let |byobRequest| be ! [$ReadableByteStreamControllerGetBYOBRequest$](|branch1|.[=ReadableStream/[[controller]]=]).
1. If |byobRequest| is null, perform |pullWithDefaultReader|.
1. Otherwise, perform |pullWithBYOBReader|, given |byobRequest|.[=ReadableStreamBYOBRequest/[[view]]=] and false.
1. Return [=a promise resolved with=] undefined.
1. Let |pull2Algorithm| be the following steps:
1. If |reading| is true, return [=a promise resolved with=] undefined.
1. If |reading| is true,
1. Set |readAgainForBranch2| to true.
1. Return [=a promise resolved with=] undefined.
1. Set |reading| to true.
1. Let |byobRequest| be ! [$ReadableByteStreamControllerGetBYOBRequest$](|branch2|.[=ReadableStream/[[controller]]=]).
1. If |byobRequest| is null, perform |pullWithDefaultReader|.
Expand Down
34 changes: 30 additions & 4 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
const reader = AcquireReadableStreamDefaultReader(stream);

let reading = false;
let readAgain = false;
let canceled1 = false;
let canceled2 = false;
let reason1;
Expand All @@ -359,6 +360,7 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {

function pullAlgorithm() {
if (reading === true) {
readAgain = true;
return promiseResolvedWith(undefined);
}

Expand All @@ -370,7 +372,7 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
// reader._closedPromise below), and we want errors in stream to error both branches immediately. We cannot let
// successful synchronously-available reads get ahead of asynchronously-available errors.
queueMicrotask(() => {
reading = false;
readAgain = false;
const chunk1 = chunk;
const chunk2 = chunk;

Expand All @@ -390,10 +392,14 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) {
if (canceled1 === false) {
ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1);
}

if (canceled2 === false) {
ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2);
}

reading = false;
if (readAgain === true) {
pullAlgorithm();
}
});
},
closeSteps: () => {
Expand Down Expand Up @@ -460,6 +466,8 @@ function ReadableByteStreamTee(stream) {
assert(ReadableByteStreamController.isImpl(stream._controller));

let reader = AcquireReadableStreamDefaultReader(stream);
let readAgainForBranch1 = false;
let readAgainForBranch2 = false;
let reading = false;
let canceled1 = false;
let canceled2 = false;
Expand Down Expand Up @@ -498,7 +506,8 @@ function ReadableByteStreamTee(stream) {
// reader._closedPromise below), and we want errors in stream to error both branches immediately. We cannot let
// successful synchronously-available reads get ahead of asynchronously-available errors.
queueMicrotask(() => {
reading = false;
readAgainForBranch1 = false;
readAgainForBranch2 = false;

const chunk1 = chunk;
let chunk2 = chunk;
Expand All @@ -519,6 +528,13 @@ function ReadableByteStreamTee(stream) {
if (canceled2 === false) {
ReadableByteStreamControllerEnqueue(branch2._controller, chunk2);
}

reading = false;
if (readAgainForBranch1 === true) {
pull1Algorithm();
} else if (readAgainForBranch2 === true) {
pull2Algorithm();
}
});
},
closeSteps: () => {
Expand Down Expand Up @@ -564,7 +580,8 @@ function ReadableByteStreamTee(stream) {
// reader._closedPromise below), and we want errors in stream to error both branches immediately. We cannot let
// successful synchronously-available reads get ahead of asynchronously-available errors.
queueMicrotask(() => {
reading = false;
readAgainForBranch1 = false;
readAgainForBranch2 = false;

const byobCanceled = forBranch2 ? canceled2 : canceled1;
const otherCanceled = forBranch2 ? canceled1 : canceled2;
Expand All @@ -586,6 +603,13 @@ function ReadableByteStreamTee(stream) {
} else if (byobCanceled === false) {
ReadableByteStreamControllerRespondWithNewView(byobBranch._controller, chunk);
}

reading = false;
if (readAgainForBranch1 === true) {
pull1Algorithm();
} else if (readAgainForBranch2 === true) {
pull2Algorithm();
}
});
},
closeSteps: chunk => {
Expand Down Expand Up @@ -625,6 +649,7 @@ function ReadableByteStreamTee(stream) {

function pull1Algorithm() {
if (reading === true) {
readAgainForBranch1 = true;
return promiseResolvedWith(undefined);
}

Expand All @@ -642,6 +667,7 @@ function ReadableByteStreamTee(stream) {

function pull2Algorithm() {
if (reading === true) {
readAgainForBranch2 = true;
return promiseResolvedWith(undefined);
}

Expand Down

0 comments on commit ea03a24

Please sign in to comment.