Skip to content

Commit

Permalink
stream: commit pull-into descriptors after filling from queue
Browse files Browse the repository at this point in the history
Fixes: #56044
PR-URL: #56072
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Matthew Aitken <maitken033380023@gmail.com>
  • Loading branch information
MattiasBuelens authored and aduh95 committed Dec 10, 2024
1 parent f1c2d2f commit a4a8361
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 19 deletions.
55 changes: 36 additions & 19 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const {
ArrayBufferViewGetByteLength,
ArrayBufferViewGetByteOffset,
AsyncIterator,
canCopyArrayBuffer,
cloneAsUint8Array,
copyArrayBuffer,
createPromiseCallback,
Expand Down Expand Up @@ -2552,6 +2553,15 @@ function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
}
}

function readableByteStreamControllerCommitPullIntoDescriptors(stream, descriptors) {
for (let i = 0; i < descriptors.length; ++i) {
readableByteStreamControllerCommitPullIntoDescriptor(
stream,
descriptors[i],
);
}
}

function readableByteStreamControllerInvalidateBYOBRequest(controller) {
if (controller[kState].byobRequest === null)
return;
Expand Down Expand Up @@ -2758,11 +2768,11 @@ function readableByteStreamControllerRespondInClosedState(controller, desc) {
stream,
} = controller[kState];
if (readableStreamHasBYOBReader(stream)) {
while (readableStreamGetNumReadIntoRequests(stream) > 0) {
readableByteStreamControllerCommitPullIntoDescriptor(
stream,
readableByteStreamControllerShiftPendingPullInto(controller));
const filledPullIntos = [];
for (let i = 0; i < readableStreamGetNumReadIntoRequests(stream); ++i) {
ArrayPrototypePush(filledPullIntos, readableByteStreamControllerShiftPendingPullInto(controller));
}
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
}
}

Expand Down Expand Up @@ -2843,8 +2853,9 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
transferredBuffer,
byteOffset,
byteLength);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller);
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
} else {
assert(!isReadableStreamLocked(stream));
readableByteStreamControllerEnqueueChunkToQueue(
Expand Down Expand Up @@ -2937,6 +2948,7 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
assert(!ArrayBufferPrototypeGetDetached(buffer));
assert(bytesFilled < minimumFill);
if (maxAlignedBytes >= minimumFill) {
totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
Expand All @@ -2952,12 +2964,12 @@ function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
totalBytesToCopyRemaining,
headOfQueue.byteLength);
const destStart = byteOffset + desc.bytesFilled;
const arrayBufferByteLength = ArrayBufferPrototypeGetByteLength(buffer);
if (arrayBufferByteLength - destStart < bytesToCopy) {
throw new ERR_INVALID_STATE.RangeError(
'view ArrayBuffer size is invalid');
}
assert(arrayBufferByteLength - destStart >= bytesToCopy);
assert(canCopyArrayBuffer(
buffer,
destStart,
headOfQueue.buffer,
headOfQueue.byteOffset,
bytesToCopy));
copyArrayBuffer(
buffer,
destStart,
Expand Down Expand Up @@ -2991,26 +3003,30 @@ function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
const {
closeRequested,
pendingPullIntos,
stream,
} = controller[kState];
assert(!closeRequested);
const filledPullIntos = [];
while (pendingPullIntos.length) {
if (!controller[kState].queueTotalSize)
return;
break;
const desc = pendingPullIntos[0];
if (readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
desc)) {
readableByteStreamControllerShiftPendingPullInto(controller);
readableByteStreamControllerCommitPullIntoDescriptor(stream, desc);
ArrayPrototypePush(filledPullIntos, desc);
}
}
return filledPullIntos;
}

function readableByteStreamControllerRespondInReadableState(
controller,
bytesWritten,
desc) {
const {
stream,
} = controller[kState];
const {
buffer,
bytesFilled,
Expand All @@ -3031,9 +3047,10 @@ function readableByteStreamControllerRespondInReadableState(
controller,
desc,
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller,
);
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
return;
}

Expand All @@ -3059,10 +3076,10 @@ function readableByteStreamControllerRespondInReadableState(
ArrayBufferPrototypeGetByteLength(remainder));
}
desc.bytesFilled -= remainderSize;
readableByteStreamControllerCommitPullIntoDescriptor(
controller[kState].stream,
desc);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
const filledPullIntos = readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);

readableByteStreamControllerCommitPullIntoDescriptor(stream, desc);
readableByteStreamControllerCommitPullIntoDescriptors(stream, filledPullIntos);
}

function readableByteStreamControllerRespondWithNewView(controller, view) {
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';

const {
ArrayBufferPrototypeGetByteLength,
ArrayBufferPrototypeGetDetached,
ArrayBufferPrototypeSlice,
ArrayPrototypePush,
ArrayPrototypeShift,
Expand Down Expand Up @@ -107,6 +109,14 @@ function cloneAsUint8Array(view) {
);
}

function canCopyArrayBuffer(toBuffer, toIndex, fromBuffer, fromIndex, count) {
return toBuffer !== fromBuffer &&
!ArrayBufferPrototypeGetDetached(toBuffer) &&
!ArrayBufferPrototypeGetDetached(fromBuffer) &&
toIndex + count <= ArrayBufferPrototypeGetByteLength(toBuffer) &&
fromIndex + count <= ArrayBufferPrototypeGetByteLength(fromBuffer);
}

function isBrandCheck(brand) {
return (value) => {
return value != null &&
Expand Down Expand Up @@ -261,6 +271,7 @@ module.exports = {
ArrayBufferViewGetByteLength,
ArrayBufferViewGetByteOffset,
AsyncIterator,
canCopyArrayBuffer,
createPromiseCallback,
cloneAsUint8Array,
copyArrayBuffer,
Expand Down

0 comments on commit a4a8361

Please sign in to comment.