Skip to content

Commit

Permalink
ReadableStream: Reject pending reads when releasing reader
Browse files Browse the repository at this point in the history
Previously, calling releaseLock() on ReadableStreamDefaultReader or
ReadableStreamBYOBReader while there are pending read requests would
throw a TypeError. The specification has been changed[1] to allow this
case, and to reject such pending read requests with a TypeError
instead.

[1] whatwg/streams#1168

Bug: 1287273
Change-Id: Id4013571212e20b0d6ecccdcf68cd6d3927d38b2
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3750760
Commit-Queue: Nidhi Jaju <nidhijaju@chromium.org>
Reviewed-by: Adam Rice <ricea@chromium.org>
Cr-Commit-Position: refs/heads/main@{#1023012}
NOKEYCHECK=True
GitOrigin-RevId: 1e39b24fb910a075f4d337e6bced80f117dfbd7a
  • Loading branch information
nidhijaju authored and copybara-github committed Jul 12, 2022
1 parent 0595e91 commit deb688d
Show file tree
Hide file tree
Showing 27 changed files with 391 additions and 1,028 deletions.
226 changes: 188 additions & 38 deletions blink/renderer/core/streams/readable_byte_stream_controller.cc

Large diffs are not rendered by default.

26 changes: 24 additions & 2 deletions blink/renderer/core/streams/readable_byte_stream_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ReadableByteStreamController : public ReadableStreamController {
void Trace(Visitor*) const;
};

enum class ReaderType { kDefault, kBYOB };
enum class ReaderType { kDefault, kBYOB, kNone };

// https://streams.spec.whatwg.org/#pull-into-descriptor
struct PullIntoDescriptor final
Expand Down Expand Up @@ -106,7 +106,7 @@ class ReadableByteStreamController : public ReadableStreamController {
size_t bytes_filled;
const size_t element_size;
const ViewConstructorType view_constructor;
const ReaderType reader_type;
ReaderType reader_type;

void Trace(Visitor*) const;
};
Expand All @@ -131,12 +131,26 @@ class ReadableByteStreamController : public ReadableStreamController {
size_t byte_offset,
size_t byte_length);

// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue
static void EnqueueClonedChunkToQueue(ReadableByteStreamController*,
DOMArrayBuffer*,
size_t byte_offset,
size_t byte_length);

// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue
static void EnqueueDetachedPullIntoToQueue(ReadableByteStreamController*,
PullIntoDescriptor*);

// https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue
static void ProcessPullIntoDescriptorsUsingQueue(
ScriptState*,
ReadableByteStreamController*,
ExceptionState&);

// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue
static void ProcessReadRequestsUsingQueue(ScriptState*,
ReadableByteStreamController*);

// https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed
static void CallPullIfNeeded(ScriptState*, ReadableByteStreamController*);

Expand Down Expand Up @@ -196,6 +210,11 @@ class ReadableByteStreamController : public ReadableStreamController {
static bool FillPullIntoDescriptorFromQueue(ReadableByteStreamController*,
PullIntoDescriptor*);

// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue
static void FillReadRequestFromQueue(ScriptState*,
ReadableByteStreamController*,
StreamPromiseResolver* read_request);

// https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into
static void PullInto(ScriptState*,
ReadableByteStreamController*,
Expand Down Expand Up @@ -255,6 +274,9 @@ class ReadableByteStreamController : public ReadableStreamController {
// https://streams.spec.whatwg.org/#rbs-controller-private-pull
StreamPromiseResolver* PullSteps(ScriptState*) override;

// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
void ReleaseSteps() override;

// autoAllocateChunkSize is encoded as 0 when it is undefined
size_t auto_allocate_chunk_size_ = 0u;
Member<ReadableStreamBYOBRequest> byob_request_;
Expand Down
108 changes: 58 additions & 50 deletions blink/renderer/core/streams/readable_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,18 +701,31 @@ class ReadableStream::PipeToEngine final
// a. Perform ! WritableStreamDefaultWriterRelease(writer).
WritableStreamDefaultWriter::Release(script_state_, writer_);

// b. Perform ! ReadableStreamReaderGenericRelease(reader).
ReadableStreamGenericReader::GenericRelease(script_state_, reader_);
// b. If reader implements ReadableStreamBYOBReader, perform !
// ReadableStreamBYOBReaderRelease(reader).
if (reader_->IsBYOBReader()) {
ReadableStreamGenericReader* reader = reader_;
ReadableStreamBYOBReader* byob_reader =
To<ReadableStreamBYOBReader>(reader);
ReadableStreamBYOBReader::Release(script_state_, byob_reader);
} else {
// c. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
DCHECK(reader_->IsDefaultReader());
ReadableStreamGenericReader* reader = reader_;
ReadableStreamDefaultReader* default_reader =
To<ReadableStreamDefaultReader>(reader);
ReadableStreamDefaultReader::Release(script_state_, default_reader);
}

// TODO(ricea): Implement signal.
// c. If signal is not undefined, remove abortAlgorithm from signal.
// d. If signal is not undefined, remove abortAlgorithm from signal.

v8::Local<v8::Value> error;
if (error_maybe.ToLocal(&error)) {
// d. If error was given, reject promise with error.
// e. If error was given, reject promise with error.
promise_->Reject(script_state_, error);
} else {
// e. Otherwise, resolve promise with undefined.
// f. Otherwise, resolve promise with undefined.
promise_->ResolveWithUndefined(script_state_);
}
}
Expand Down Expand Up @@ -1907,20 +1920,26 @@ v8::Local<v8::Promise> ReadableStream::Cancel(ScriptState* script_state,
// 6. If reader is not undefined and reader implements
// ReadableStreamBYOBReader,
if (reader && reader->IsBYOBReader()) {
// a. For each readIntoRequest of reader.[[readIntoRequests]],
// a. Let readIntoRequests be reader.[[readIntoRequests]].
ReadableStreamBYOBReader* byob_reader =
To<ReadableStreamBYOBReader>(reader);
HeapDeque<Member<ReadableStreamBYOBReader::ReadIntoRequest>>
read_into_requests;
read_into_requests.Swap(byob_reader->read_into_requests_);

// b. Set reader.[[readIntoRequests]] to an empty list.
// This is not required since we've already called Swap().

// c. For each readIntoRequest of readIntoRequests,
for (ReadableStreamBYOBReader::ReadIntoRequest* request :
byob_reader->read_into_requests_) {
read_into_requests) {
// i. Perform readIntoRequest's close steps, given undefined.
request->CloseSteps(script_state, nullptr);
}
// b. Set reader.[[readIntoRequests]] to an empty list.
byob_reader->read_into_requests_.clear();
}

// 7. Let sourceCancelPromise be ! stream.[[readableStreamController]].
// [[CancelSteps]](reason).
// 7. Let sourceCancelPromise be !
// stream.[[controller]].[[CancelSteps]](reason).
v8::Local<v8::Promise> source_cancel_promise =
stream->readable_stream_controller_->CancelSteps(script_state, reason);

Expand All @@ -1934,8 +1953,8 @@ v8::Local<v8::Promise> ReadableStream::Cancel(ScriptState* script_state,
v8::Local<v8::Value>) override {}
};

// 8. Return the result of transforming sourceCancelPromise with a
// fulfillment handler that returns undefined.
// 8. Return the result of reacting to sourceCancelPromise with a
// fulfillment step that returns undefined.
return StreamThenPromise(
script_state->GetContext(), source_cancel_promise,
MakeGarbageCollected<ScriptFunction>(
Expand All @@ -1962,16 +1981,23 @@ void ReadableStream::Close(ScriptState* script_state, ReadableStream* stream) {
if (ExecutionContext::From(script_state)->IsContextDestroyed())
return;

// 5. If ! IsReadableStreamDefaultReader(reader) is true,
// 5. Resolve reader.[[closedPromise]] with undefined.
reader->closed_promise_->ResolveWithUndefined(script_state);

// 6. If reader implements ReadableStreamDefaultReader,
if (reader->IsDefaultReader()) {
// a. Repeat for each readRequest that is an element of reader.
// [[readRequests]],
// a. Let readRequests be reader.[[readRequests]].
HeapDeque<Member<StreamPromiseResolver>> requests;
requests.Swap(To<ReadableStreamDefaultReader>(reader)->read_requests_);
bool for_author_code =
To<ReadableStreamDefaultReader>(reader)->for_author_code_;
// b. Set reader.[[readRequests]] to an empty list.`
// This is not required since we've already called Swap()

// c. Repeat for each readRequest that is an element of reader.
// [[readRequests]],
for (StreamPromiseResolver* promise : requests) {
// i. Resolve readRequest.[[promise]] with !
// i. Resolve readRequest.[[promise]] with !
// ReadableStreamCreateReadResult(undefined, true, reader.
// [[forAuthorCode]]).
promise->Resolve(
Expand All @@ -1980,12 +2006,7 @@ void ReadableStream::Close(ScriptState* script_state, ReadableStream* stream) {
v8::Undefined(script_state->GetIsolate()), true,
for_author_code));
}

// b. Set reader.[[readRequests]] to an empty List.
// This is not required since we've already called Swap().
}
// 6. Resolve reader.[[closedPromise]] with undefined.
reader->closed_promise_->ResolveWithUndefined(script_state);
}

v8::Local<v8::Value> ReadableStream::CreateReadResult(
Expand Down Expand Up @@ -2056,42 +2077,29 @@ void ReadableStream::Error(ScriptState* script_state,
return;
}

// 6. If ! IsReadableStreamDefaultReader(reader) is true,
// a. Repeat for each readRequest that is an element of reader.
// [[readRequests]],
// 6. Reject reader.[[closedPromise]] with e.
reader->closed_promise_->Reject(script_state, e);

// 7. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
reader->closed_promise_->MarkAsHandled(isolate);

// 8. If reader implements ReadableStreamDefaultReader,
if (reader->IsDefaultReader()) {
// a. Perform ! ReadableStreamDefaultReaderErrorReadRequests(reader, e).
ReadableStreamDefaultReader* default_reader =
To<ReadableStreamDefaultReader>(reader);
for (StreamPromiseResolver* promise : default_reader->read_requests_) {
// i. Reject readRequest.[[promise]] with e.
promise->Reject(script_state, e);
}

// b. Set reader.[[readRequests]] to a new empty List.
default_reader->read_requests_.clear();
}

// 7. Otherwise,
else {
ReadableStreamDefaultReader::ErrorReadRequests(script_state, default_reader,
e);
} else {
// 9. Otherwise,
// a. Assert: reader implements ReadableStreamBYOBReader.
DCHECK(reader->IsBYOBReader());
// b. For each readIntoRequest of reader.[[readIntoRequests]],
// b. Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
ReadableStreamBYOBReader* byob_reader =
To<ReadableStreamBYOBReader>(reader);
for (ReadableStreamBYOBReader::ReadIntoRequest* request :
byob_reader->read_into_requests_) {
// i. Perform readIntoRequests' error steps, given e.
request->ErrorSteps(script_state, e);
}
// c. Set reader.[[readIntoRequests]] to a new empty list.
byob_reader->read_into_requests_.clear();
ReadableStreamBYOBReader::ErrorReadIntoRequests(script_state, byob_reader,
e);
}

// 8. Reject reader.[[closedPromise]] with e.
reader->closed_promise_->Reject(script_state, e);

// 9. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
reader->closed_promise_->MarkAsHandled(isolate);
}

void ReadableStream::FulfillReadIntoRequest(ScriptState* script_state,
Expand Down
43 changes: 33 additions & 10 deletions blink/renderer/core/streams/readable_stream_byob_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "third_party/blink/renderer/platform/bindings/script_state.h"
#include "third_party/blink/renderer/platform/bindings/to_v8.h"
#include "third_party/blink/renderer/platform/bindings/v8_binding.h"
#include "third_party/blink/renderer/platform/bindings/v8_throw_exception.h"

namespace blink {

Expand Down Expand Up @@ -166,6 +167,36 @@ void ReadableStreamBYOBReader::Read(ScriptState* script_state,
}
}

void ReadableStreamBYOBReader::ErrorReadIntoRequests(
ScriptState* script_state,
ReadableStreamBYOBReader* reader,
v8::Local<v8::Value> e) {
// https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests
// 1. Let readIntoRequests be reader.[[readIntoRequests]].
// 2. Set reader.[[readIntoRequests]] to a new empty list.
// 3. For each readIntoRequest of readIntoRequests,
for (ReadableStreamBYOBReader::ReadIntoRequest* request :
reader->read_into_requests_) {
// a. Perform readIntoRequest’s error steps, given e.
request->ErrorSteps(script_state, e);
}
reader->read_into_requests_.clear();
}

void ReadableStreamBYOBReader::Release(ScriptState* script_state,
ReadableStreamBYOBReader* reader) {
// https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease
// 1. Perform ! ReadableStreamReaderGenericRelease(reader).
ReadableStreamGenericReader::GenericRelease(script_state, reader);

// 2. Let e be a new TypeError exception.
v8::Local<v8::Value> e = V8ThrowException::CreateTypeError(
script_state->GetIsolate(), "Releasing BYOB reader");

// 3. Perform ! ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e).
ErrorReadIntoRequests(script_state, reader, e);
}

void ReadableStreamBYOBReader::releaseLock(ScriptState* script_state,
ExceptionState& exception_state) {
// https://streams.spec.whatwg.org/#byob-reader-release-lock
Expand All @@ -174,16 +205,8 @@ void ReadableStreamBYOBReader::releaseLock(ScriptState* script_state,
return;
}

// 2. If this.[[readIntoRequests]] is not empty, throw a TypeError exception.
if (read_into_requests_.size() > 0) {
exception_state.ThrowTypeError(
"Cannot release a readable stream reader when it still has outstanding "
"read() calls that have not yet settled");
return;
}

// 3. Perform ! ReadableStreamReaderGenericRelease(this).
GenericRelease(script_state, this);
// 2. Perform ! ReadableStreamBYOBReaderRelease(this).
Release(script_state, this);
}

void ReadableStreamBYOBReader::SetUpBYOBReader(
Expand Down
12 changes: 12 additions & 0 deletions blink/renderer/core/streams/readable_stream_byob_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,25 @@ class CORE_EXPORT ReadableStreamBYOBReader
Member<StreamPromiseResolver> resolver_;
};

//
// Readable stream reader abstract operations
//

// https://streams.spec.whatwg.org/#readable-stream-byob-reader-read
static void Read(ScriptState*,
ReadableStreamBYOBReader*,
NotShared<DOMArrayBufferView> view,
ReadIntoRequest*,
ExceptionState&);

// https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreadererrorreadintorequests
static void ErrorReadIntoRequests(ScriptState*,
ReadableStreamBYOBReader*,
v8::Local<v8::Value> e);

// https://streams.spec.whatwg.org/#abstract-opdef-readablestreambyobreaderrelease
static void Release(ScriptState*, ReadableStreamBYOBReader*);

HeapDeque<Member<ReadIntoRequest>> read_into_requests_;
};

Expand Down
3 changes: 3 additions & 0 deletions blink/renderer/core/streams/readable_stream_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class ReadableStreamController : public ScriptWrappable {

// https://streams.spec.whatwg.org/#abstract-opdef-readablestreamcontroller-pullsteps
virtual StreamPromiseResolver* PullSteps(ScriptState*) = 0;

// https://streams.spec.whatwg.org/#abstract-opdef-readablestreamcontroller-releasesteps
virtual void ReleaseSteps() = 0;
};

} // namespace blink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ StreamPromiseResolver* ReadableStreamDefaultController::PullSteps(
return pendingPromise;
}

void ReadableStreamDefaultController::ReleaseSteps() {
// https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultcontroller-releasesteps
// 1. Return.
return;
}

//
// Readable Stream Default Controller Abstract Operations
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class ReadableStreamDefaultController : public ReadableStreamController {
// https://streams.spec.whatwg.org/#rs-default-controller-private-pull
StreamPromiseResolver* PullSteps(ScriptState*) override;

// https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaultcontroller-releasesteps
void ReleaseSteps() override;

private:
friend class ReadableStream;
friend class ReadableStreamDefaultReader;
Expand Down
Loading

0 comments on commit deb688d

Please sign in to comment.