Skip to content

Commit

Permalink
Reject pending reads when releasing a reader
Browse files Browse the repository at this point in the history
Currently, reader.releaseLock() throws an error if there are still pending read requests. However, in #1103 we realized that it would be more useful if we allowed releasing a reader while there are still pending reads, which would reject those reads instead. The user can then acquire a new reader to receive those chunks.

For readable byte streams, we also discard pull-into descriptors corresponding to (now rejected) read-into requests. However, we must retain the first pull-into descriptor, since the underlying byte source may still be using it through controller.byobRequest. Instead, we mark this descriptor as "detached", such that when we invalidate this BYOB request (as a result of controller.enqueue() or byobRequest.respond()), the bytes from this descriptor are pushed into the stream's queue and used to fulfill read requests from a future reader.

This also allows expressing pipeTo()'s behavior more accurately. Currently, it "cheats" by calling ReadableStreamReaderGenericRelease directly without checking if [[readRequests]] is empty. With the proposed changes, we can safely release the reader when the pipe finishes (even if there's a pending read), and be sure that any unread chunks can be read by a future reader or pipe.
  • Loading branch information
MattiasBuelens authored Jan 13, 2022
1 parent 53282dd commit d5f92d9
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 112 deletions.
235 changes: 177 additions & 58 deletions index.bs

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions reference-implementation/lib/ReadableByteStreamController-impl.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
const assert = require('assert');

const { CancelSteps, PullSteps } = require('./abstract-ops/internal-methods.js');
const { CancelSteps, PullSteps, ReleaseSteps } = require('./abstract-ops/internal-methods.js');
const { ResetQueue } = require('./abstract-ops/queue-with-sizes.js');
const aos = require('./abstract-ops/readable-streams.js');

Expand Down Expand Up @@ -67,15 +67,7 @@ exports.implementation = class ReadableByteStreamControllerImpl {

if (this._queueTotalSize > 0) {
assert(aos.ReadableStreamGetNumReadRequests(stream) === 0);

const entry = this._queue.shift();
this._queueTotalSize -= entry.byteLength;

aos.ReadableByteStreamControllerHandleQueueDrain(this);

const view = new Uint8Array(entry.buffer, entry.byteOffset, entry.byteLength);

readRequest.chunkSteps(view);
aos.ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest);
return;
}

Expand Down Expand Up @@ -106,4 +98,13 @@ exports.implementation = class ReadableByteStreamControllerImpl {
aos.ReadableStreamAddReadRequest(stream, readRequest);
aos.ReadableByteStreamControllerCallPullIfNeeded(this);
}

[ReleaseSteps]() {
if (this._pendingPullIntos.length > 0) {
const firstPullInto = this._pendingPullIntos[0];
firstPullInto.readerType = 'none';

this._pendingPullIntos = [firstPullInto];
}
}
};
8 changes: 4 additions & 4 deletions reference-implementation/lib/ReadableStream-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ exports.implementation = class ReadableStreamImpl {
const readRequest = {
chunkSteps: chunk => resolvePromise(promise, chunk),
closeSteps: () => {
aos.ReadableStreamReaderGenericRelease(reader);
aos.ReadableStreamDefaultReaderRelease(reader);
resolvePromise(promise, idlUtils.asyncIteratorEOI);
},
errorSteps: e => {
aos.ReadableStreamReaderGenericRelease(reader);
aos.ReadableStreamDefaultReaderRelease(reader);
rejectPromise(promise, e);
}
};
Expand All @@ -151,11 +151,11 @@ exports.implementation = class ReadableStreamImpl {

if (iterator._preventCancel === false) {
const result = aos.ReadableStreamReaderGenericCancel(reader, arg);
aos.ReadableStreamReaderGenericRelease(reader);
aos.ReadableStreamDefaultReaderRelease(reader);
return result;
}

aos.ReadableStreamReaderGenericRelease(reader);
aos.ReadableStreamDefaultReaderRelease(reader);
return promiseResolvedWith(undefined);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ class ReadableStreamBYOBReaderImpl {
return;
}

if (this._readIntoRequests.length > 0) {
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
}

aos.ReadableStreamReaderGenericRelease(this);
aos.ReadableStreamBYOBReaderRelease(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const { CancelSteps, PullSteps } = require('./abstract-ops/internal-methods.js');
const { CancelSteps, PullSteps, ReleaseSteps } = require('./abstract-ops/internal-methods.js');
const { DequeueValue, ResetQueue } = require('./abstract-ops/queue-with-sizes.js');
const aos = require('./abstract-ops/readable-streams.js');

Expand Down Expand Up @@ -55,4 +55,6 @@ exports.implementation = class ReadableStreamDefaultControllerImpl {
aos.ReadableStreamDefaultControllerCallPullIfNeeded(this);
}
}

[ReleaseSteps]() {}
};
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ class ReadableStreamDefaultReaderImpl {
return;
}

if (this._readRequests.length > 0) {
throw new TypeError('Tried to release a reader lock when that reader has pending read() calls un-settled');
}

aos.ReadableStreamReaderGenericRelease(this);
aos.ReadableStreamDefaultReaderRelease(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ exports.AbortSteps = Symbol('[[AbortSteps]]');
exports.ErrorSteps = Symbol('[[ErrorSteps]]');
exports.CancelSteps = Symbol('[[CancelSteps]]');
exports.PullSteps = Symbol('[[PullSteps]]');
exports.ReleaseSteps = Symbol('[[ReleaseSteps]]');
Loading

0 comments on commit d5f92d9

Please sign in to comment.