From d5f92d9f17306d31ba6b27424d23d58e89bf64a5 Mon Sep 17 00:00:00 2001 From: Mattias Buelens <649348+MattiasBuelens@users.noreply.github.com> Date: Thu, 13 Jan 2022 21:12:23 +0100 Subject: [PATCH] Reject pending reads when releasing a reader Currently, reader.releaseLock() throws an error if there are still pending read requests. However, in #1103 we realized that it would be more useful if we allowed releasing a reader while there are still pending reads, which would reject those reads instead. The user can then acquire a new reader to receive those chunks. For readable byte streams, we also discard pull-into descriptors corresponding to (now rejected) read-into requests. However, we must retain the first pull-into descriptor, since the underlying byte source may still be using it through controller.byobRequest. Instead, we mark this descriptor as "detached", such that when we invalidate this BYOB request (as a result of controller.enqueue() or byobRequest.respond()), the bytes from this descriptor are pushed into the stream's queue and used to fulfill read requests from a future reader. This also allows expressing pipeTo()'s behavior more accurately. Currently, it "cheats" by calling ReadableStreamReaderGenericRelease directly without checking if [[readRequests]] is empty. With the proposed changes, we can safely release the reader when the pipe finishes (even if there's a pending read), and be sure that any unread chunks can be read by a future reader or pipe. --- index.bs | 235 +++++++++++++----- .../lib/ReadableByteStreamController-impl.js | 21 +- .../lib/ReadableStream-impl.js | 8 +- .../lib/ReadableStreamBYOBReader-impl.js | 6 +- .../ReadableStreamDefaultController-impl.js | 4 +- .../lib/ReadableStreamDefaultReader-impl.js | 6 +- .../lib/abstract-ops/internal-methods.js | 1 + .../lib/abstract-ops/readable-streams.js | 149 ++++++++--- reference-implementation/web-platform-tests | 2 +- 9 files changed, 320 insertions(+), 112 deletions(-) diff --git a/index.bs b/index.bs index e1aa4b2aa..692c85fcc 100644 --- a/index.bs +++ b/index.bs @@ -1023,11 +1023,11 @@ default-reader-asynciterator-prototype-internal-slots">Asynchronous iterationAsynchronous iteration @@ -1243,9 +1243,10 @@ to filling the [=readable stream=]'s [=internal queue=] or changing its state. I when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed. -
A reader's lock cannot be released while it still has a pending read request, i.e., if a - promise returned by the reader's {{ReadableStreamDefaultReader/read()}} method has not yet been - settled. Attempting to do so will throw a {{TypeError}} and leave the reader locked to the stream. +
If the reader's lock is released while it still has pending read requests, then the + promises returned by the reader's {{ReadableStreamDefaultReader/read()}} method are immediately + rejected with a {{TypeError}}. Any unread chunks remain in the stream's [=internal queue=] and can + be read later by acquiring a new reader.
A reader's lock cannot be released while it still has a pending read request, i.e., if a - promise returned by the reader's {{ReadableStreamBYOBReader/read()}} method has not yet been - settled. Attempting to do so will throw a {{TypeError}} and leave the reader locked to the stream. +
If the reader's lock is released while it still has pending read requests, then the + promises returned by the reader's {{ReadableStreamBYOBReader/read()}} method are immediately + rejected with a {{TypeError}}. Any unread chunks remain in the stream's [=internal queue=] and can + be read later by acquiring a new reader.
To release a {{ReadableStreamDefaultReader}} |reader|, perform ! -[$ReadableStreamReaderGenericRelease$](|reader|). +[$ReadableStreamDefaultReaderRelease$](|reader|).
To cancel a {{ReadableStreamDefaultReader}} |reader| with |reason|, perform ! diff --git a/reference-implementation/lib/ReadableByteStreamController-impl.js b/reference-implementation/lib/ReadableByteStreamController-impl.js index 215d72452..66cbe2278 100644 --- a/reference-implementation/lib/ReadableByteStreamController-impl.js +++ b/reference-implementation/lib/ReadableByteStreamController-impl.js @@ -1,7 +1,7 @@ 'use strict'; const assert = require('assert'); -const { CancelSteps, PullSteps } = require('./abstract-ops/internal-methods.js'); +const { CancelSteps, PullSteps, ReleaseSteps } = require('./abstract-ops/internal-methods.js'); const { ResetQueue } = require('./abstract-ops/queue-with-sizes.js'); const aos = require('./abstract-ops/readable-streams.js'); @@ -67,15 +67,7 @@ exports.implementation = class ReadableByteStreamControllerImpl { if (this._queueTotalSize > 0) { assert(aos.ReadableStreamGetNumReadRequests(stream) === 0); - - const entry = this._queue.shift(); - this._queueTotalSize -= entry.byteLength; - - aos.ReadableByteStreamControllerHandleQueueDrain(this); - - const view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength); - - readRequest.chunkSteps(view); + aos.ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest); return; } @@ -106,4 +98,13 @@ exports.implementation = class ReadableByteStreamControllerImpl { aos.ReadableStreamAddReadRequest(stream, readRequest); aos.ReadableByteStreamControllerCallPullIfNeeded(this); } + + [ReleaseSteps]() { + if (this._pendingPullIntos.length > 0) { + const firstPullInto = this._pendingPullIntos[0]; + firstPullInto.readerType = 'none'; + + this._pendingPullIntos = [firstPullInto]; + } + } }; diff --git a/reference-implementation/lib/ReadableStream-impl.js b/reference-implementation/lib/ReadableStream-impl.js index 5bff0d165..430261850 100644 --- a/reference-implementation/lib/ReadableStream-impl.js +++ b/reference-implementation/lib/ReadableStream-impl.js @@ -129,11 +129,11 @@ exports.implementation = class ReadableStreamImpl { const readRequest = { chunkSteps: chunk => resolvePromise(promise, chunk), closeSteps: () => { - aos.ReadableStreamReaderGenericRelease(reader); + aos.ReadableStreamDefaultReaderRelease(reader); resolvePromise(promise, idlUtils.asyncIteratorEOI); }, errorSteps: e => { - aos.ReadableStreamReaderGenericRelease(reader); + aos.ReadableStreamDefaultReaderRelease(reader); rejectPromise(promise, e); } }; @@ -151,11 +151,11 @@ exports.implementation = class ReadableStreamImpl { if (iterator._preventCancel === false) { const result = aos.ReadableStreamReaderGenericCancel(reader, arg); - aos.ReadableStreamReaderGenericRelease(reader); + aos.ReadableStreamDefaultReaderRelease(reader); return result; } - aos.ReadableStreamReaderGenericRelease(reader); + aos.ReadableStreamDefaultReaderRelease(reader); return promiseResolvedWith(undefined); } }; diff --git a/reference-implementation/lib/ReadableStreamBYOBReader-impl.js b/reference-implementation/lib/ReadableStreamBYOBReader-impl.js index fe605ea14..446846a45 100644 --- a/reference-implementation/lib/ReadableStreamBYOBReader-impl.js +++ b/reference-implementation/lib/ReadableStreamBYOBReader-impl.js @@ -41,11 +41,7 @@ class ReadableStreamBYOBReaderImpl { return; } - if (this._readIntoRequests.length > 0) { - throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled'); - } - - aos.ReadableStreamReaderGenericRelease(this); + aos.ReadableStreamBYOBReaderRelease(this); } } diff --git a/reference-implementation/lib/ReadableStreamDefaultController-impl.js b/reference-implementation/lib/ReadableStreamDefaultController-impl.js index f91f2d43e..5c7ec7033 100644 --- a/reference-implementation/lib/ReadableStreamDefaultController-impl.js +++ b/reference-implementation/lib/ReadableStreamDefaultController-impl.js @@ -1,6 +1,6 @@ 'use strict'; -const { CancelSteps, PullSteps } = require('./abstract-ops/internal-methods.js'); +const { CancelSteps, PullSteps, ReleaseSteps } = require('./abstract-ops/internal-methods.js'); const { DequeueValue, ResetQueue } = require('./abstract-ops/queue-with-sizes.js'); const aos = require('./abstract-ops/readable-streams.js'); @@ -55,4 +55,6 @@ exports.implementation = class ReadableStreamDefaultControllerImpl { aos.ReadableStreamDefaultControllerCallPullIfNeeded(this); } } + + [ReleaseSteps]() {} }; diff --git a/reference-implementation/lib/ReadableStreamDefaultReader-impl.js b/reference-implementation/lib/ReadableStreamDefaultReader-impl.js index f54fddccf..519cdafdd 100644 --- a/reference-implementation/lib/ReadableStreamDefaultReader-impl.js +++ b/reference-implementation/lib/ReadableStreamDefaultReader-impl.js @@ -31,11 +31,7 @@ class ReadableStreamDefaultReaderImpl { return; } - if (this._readRequests.length > 0) { - throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled'); - } - - aos.ReadableStreamReaderGenericRelease(this); + aos.ReadableStreamDefaultReaderRelease(this); } } diff --git a/reference-implementation/lib/abstract-ops/internal-methods.js b/reference-implementation/lib/abstract-ops/internal-methods.js index 1debfa6a7..a858e602e 100644 --- a/reference-implementation/lib/abstract-ops/internal-methods.js +++ b/reference-implementation/lib/abstract-ops/internal-methods.js @@ -4,3 +4,4 @@ exports.AbortSteps = Symbol('[[AbortSteps]]'); exports.ErrorSteps = Symbol('[[ErrorSteps]]'); exports.CancelSteps = Symbol('[[CancelSteps]]'); exports.PullSteps = Symbol('[[PullSteps]]'); +exports.ReleaseSteps = Symbol('[[ReleaseSteps]]'); diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 4186d85d4..db1da4c73 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -11,7 +11,7 @@ const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = require('./writable-streams.js'); -const { CancelSteps, PullSteps } = require('./internal-methods.js'); +const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); const ReadableByteStreamController = require('../../generated/ReadableByteStreamController.js'); const ReadableStreamBYOBReader = require('../../generated/ReadableStreamBYOBReader.js'); @@ -34,6 +34,7 @@ Object.assign(exports, { ReadableByteStreamControllerClose, ReadableByteStreamControllerEnqueue, ReadableByteStreamControllerError, + ReadableByteStreamControllerFillReadRequestFromQueue, ReadableByteStreamControllerGetBYOBRequest, ReadableByteStreamControllerGetDesiredSize, ReadableByteStreamControllerHandleQueueDrain, @@ -41,6 +42,7 @@ Object.assign(exports, { ReadableByteStreamControllerRespondWithNewView, ReadableStreamAddReadRequest, ReadableStreamBYOBReaderRead, + ReadableStreamBYOBReaderRelease, ReadableStreamCancel, ReadableStreamClose, ReadableStreamDefaultControllerCallPullIfNeeded, @@ -52,11 +54,11 @@ Object.assign(exports, { ReadableStreamDefaultControllerGetDesiredSize, ReadableStreamDefaultControllerHasBackpressure, ReadableStreamDefaultReaderRead, + ReadableStreamDefaultReaderRelease, ReadableStreamGetNumReadRequests, ReadableStreamHasDefaultReader, ReadableStreamPipeTo, ReadableStreamReaderGenericCancel, - ReadableStreamReaderGenericRelease, ReadableStreamTee, SetUpReadableByteStreamControllerFromUnderlyingSource, SetUpReadableStreamBYOBReader, @@ -318,7 +320,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC function finalize(isError, error) { WritableStreamDefaultWriterRelease(writer); - ReadableStreamReaderGenericRelease(reader); + ReadableStreamDefaultReaderRelease(reader); if (signal !== undefined) { signal.removeEventListener('abort', abortAlgorithm); @@ -494,7 +496,7 @@ function ReadableByteStreamTee(stream) { function pullWithDefaultReader() { if (ReadableStreamBYOBReader.isImpl(reader)) { assert(reader._readIntoRequests.length === 0); - ReadableStreamReaderGenericRelease(reader); + ReadableStreamBYOBReaderRelease(reader); reader = AcquireReadableStreamDefaultReader(stream); forwardReaderError(reader); @@ -565,7 +567,7 @@ function ReadableByteStreamTee(stream) { function pullWithBYOBReader(view, forBranch2) { if (ReadableStreamDefaultReader.isImpl(reader)) { assert(reader._readRequests.length === 0); - ReadableStreamReaderGenericRelease(reader); + ReadableStreamDefaultReaderRelease(reader); reader = AcquireReadableStreamBYOBReader(stream); forwardReaderError(reader); @@ -745,10 +747,11 @@ function ReadableStreamCancel(stream, reason) { const reader = stream._reader; if (reader !== undefined && ReadableStreamBYOBReader.isImpl(reader)) { - for (const readIntoRequest of reader._readIntoRequests) { + const readIntoRequests = reader._readIntoRequests; + reader._readIntoRequests = []; + for (const readIntoRequest of readIntoRequests) { readIntoRequest.closeSteps(undefined); } - reader._readIntoRequests = []; } const sourceCancelPromise = stream._controller[CancelSteps](reason); @@ -769,10 +772,11 @@ function ReadableStreamClose(stream) { resolvePromise(reader._closedPromise, undefined); if (ReadableStreamDefaultReader.isImpl(reader)) { - for (const readRequest of reader._readRequests) { + const readRequests = reader._readRequests; + reader._readRequests = []; + for (const readRequest of readRequests) { readRequest.closeSteps(); } - reader._readRequests = []; } } @@ -792,19 +796,10 @@ function ReadableStreamError(stream, e) { setPromiseIsHandledToTrue(reader._closedPromise); if (ReadableStreamDefaultReader.isImpl(reader)) { - for (const readRequest of reader._readRequests) { - readRequest.errorSteps(e); - } - - reader._readRequests = []; + ReadableStreamDefaultReaderErrorReadRequests(reader, e); } else { assert(ReadableStreamBYOBReader.isImpl(reader)); - - for (const readIntoRequest of reader._readIntoRequests) { - readIntoRequest.errorSteps(e); - } - - reader._readIntoRequests = []; + ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e); } } @@ -895,10 +890,11 @@ function ReadableStreamReaderGenericInitialize(reader, stream) { } function ReadableStreamReaderGenericRelease(reader) { - assert(reader._stream !== undefined); - assert(reader._stream._reader === reader); + const stream = reader._stream; + assert(stream !== undefined); + assert(stream._reader === reader); - if (reader._stream._state === 'readable') { + if (stream._state === 'readable') { rejectPromise( reader._closedPromise, new TypeError('Reader was released and can no longer be used to monitor the stream\'s closedness') @@ -910,7 +906,9 @@ function ReadableStreamReaderGenericRelease(reader) { } setPromiseIsHandledToTrue(reader._closedPromise); - reader._stream._reader = undefined; + stream._controller[ReleaseSteps](); + + stream._reader = undefined; reader._stream = undefined; } @@ -928,6 +926,20 @@ function ReadableStreamBYOBReaderRead(reader, view, readIntoRequest) { } } +function ReadableStreamBYOBReaderRelease(reader) { + ReadableStreamReaderGenericRelease(reader); + const e = new TypeError('Reader was released'); + ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e); +} + +function ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e) { + const readIntoRequests = reader._readIntoRequests; + reader._readIntoRequests = []; + for (const readIntoRequest of readIntoRequests) { + readIntoRequest.errorSteps(e); + } +} + function ReadableStreamDefaultReaderRead(reader, readRequest) { const stream = reader._stream; @@ -945,6 +957,20 @@ function ReadableStreamDefaultReaderRead(reader, readRequest) { } } +function ReadableStreamDefaultReaderRelease(reader) { + ReadableStreamReaderGenericRelease(reader); + const e = new TypeError('Reader was released'); + ReadableStreamDefaultReaderErrorReadRequests(reader, e); +} + +function ReadableStreamDefaultReaderErrorReadRequests(reader, e) { + const readRequests = reader._readRequests; + reader._readRequests = []; + for (const readRequest of readRequests) { + readRequest.errorSteps(e); + } +} + function SetUpReadableStreamBYOBReader(reader, stream) { if (IsReadableStreamLocked(stream) === true) { throw new TypeError('This stream has already been locked for exclusive reading by another reader'); @@ -1259,6 +1285,7 @@ function ReadableByteStreamControllerClose(controller) { function ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor) { assert(stream._state !== 'errored'); + assert(pullIntoDescriptor.readerType !== 'none'); let done = false; if (stream._state === 'closed') { @@ -1308,12 +1335,15 @@ function ReadableByteStreamControllerEnqueue(controller, chunk) { 'The BYOB request\'s buffer has been detached and so cannot be filled with an enqueued chunk' ); } + ReadableByteStreamControllerInvalidateBYOBRequest(controller); firstPendingPullInto.buffer = TransferArrayBuffer(firstPendingPullInto.buffer); + if (firstPendingPullInto.readerType === 'none') { + ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto); + } } - ReadableByteStreamControllerInvalidateBYOBRequest(controller); - if (ReadableStreamHasDefaultReader(stream) === true) { + ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller); if (ReadableStreamGetNumReadRequests(stream) === 0) { assert(controller._pendingPullIntos.length === 0); ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength); @@ -1343,6 +1373,30 @@ function ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byt controller._queueTotalSize += byteLength; } +function ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, buffer, byteOffset, byteLength) { + let clonedChunk; + try { + clonedChunk = buffer.slice(byteOffset, byteOffset + byteLength); + } catch (cloneE) { + ReadableByteStreamControllerError(controller, cloneE); + throw cloneE; + } + ReadableByteStreamControllerEnqueueChunkToQueue(controller, clonedChunk, 0, byteLength); +} + +function ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstDescriptor) { + assert(firstDescriptor.readerType === 'none'); + if (firstDescriptor.bytesFilled > 0) { + ReadableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + firstDescriptor.buffer, + firstDescriptor.byteOffset, + firstDescriptor.bytesFilled + ); + } + ReadableByteStreamControllerShiftPendingPullInto(controller); +} + function ReadableByteStreamControllerError(controller, e) { const stream = controller._stream; @@ -1412,6 +1466,18 @@ function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, return ready; } +function ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) { + assert(controller._queueTotalSize > 0); + + const entry = controller._queue.shift(); + controller._queueTotalSize -= entry.byteLength; + + ReadableByteStreamControllerHandleQueueDrain(controller); + + const view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength); + readRequest.chunkSteps(view); +} + function ReadableByteStreamControllerGetBYOBRequest(controller) { if (controller._byobRequest === null && controller._pendingPullIntos.length > 0) { const firstDescriptor = controller._pendingPullIntos[0]; @@ -1471,6 +1537,7 @@ function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(contro } const pullIntoDescriptor = controller._pendingPullIntos[0]; + assert(pullIntoDescriptor.readerType !== 'none'); if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) === true) { ReadableByteStreamControllerShiftPendingPullInto(controller); @@ -1483,6 +1550,18 @@ function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(contro } } +function ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller) { + const reader = controller._stream._reader; + assert(ReadableStreamDefaultReader.isImpl(reader)); + while (reader._readRequests.length > 0) { + if (controller._queueTotalSize === 0) { + return; + } + const readRequest = reader._readRequests.shift(); + ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest); + } +} + function ReadableByteStreamControllerPullInto(controller, view, readIntoRequest) { const stream = controller._stream; @@ -1582,6 +1661,10 @@ function ReadableByteStreamControllerRespond(controller, bytesWritten) { function ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor) { assert(firstDescriptor.bytesFilled === 0); + if (firstDescriptor.readerType === 'none') { + ReadableByteStreamControllerShiftPendingPullInto(controller); + } + const stream = controller._stream; if (ReadableStreamHasBYOBReader(stream) === true) { while (ReadableStreamGetNumReadIntoRequests(stream) > 0) { @@ -1596,6 +1679,12 @@ function ReadableByteStreamControllerRespondInReadableState(controller, bytesWri ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor); + if (pullIntoDescriptor.readerType === 'none') { + ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor); + ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller); + return; + } + if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) { return; } @@ -1605,8 +1694,12 @@ function ReadableByteStreamControllerRespondInReadableState(controller, bytesWri const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize; if (remainderSize > 0) { const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; - const remainder = pullIntoDescriptor.buffer.slice(end - remainderSize, end); - ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength); + ReadableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + pullIntoDescriptor.buffer, + end - remainderSize, + remainderSize + ); } pullIntoDescriptor.bytesFilled -= remainderSize; diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 71c864f8b..99d74f952 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 71c864f8bd7f855f12c81bfc7a1fc3ebf18657f3 +Subproject commit 99d74f9529e16ec0722ef11136ab29b9e80fff26