diff --git a/doc/api/errors.md b/doc/api/errors.md
index 7ac741bba68737..4f0e0dad5ae7df 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -1428,6 +1428,11 @@ is set for the `Http2Stream`.
`http2.connect()` was passed a URL that uses any protocol other than `http:` or
`https:`.
+
+### `ERR_ILLEGAL_CONSTRUCTOR`
+
+An attempt was made to construct an object using a non-public constructor.
+
### `ERR_INCOMPATIBLE_OPTION_PAIR`
diff --git a/doc/api/index.md b/doc/api/index.md
index 71c415afaa673a..957c5d55b3008b 100644
--- a/doc/api/index.md
+++ b/doc/api/index.md
@@ -64,6 +64,7 @@
* [VM](vm.md)
* [WASI](wasi.md)
* [Web Crypto API](webcrypto.md)
+* [WHATWG Streams](whatwg_streams.md)
* [Worker threads](worker_threads.md)
* [Zlib](zlib.md)
diff --git a/doc/api/whatwg_streams.md b/doc/api/whatwg_streams.md
new file mode 100644
index 00000000000000..56129f77dd2ea0
--- /dev/null
+++ b/doc/api/whatwg_streams.md
@@ -0,0 +1,656 @@
+# WHATWG Streams
+
+> Stability: 1 - Experimental
+
+An implementation of the [WHATWG Streams Standard][].
+
+```mjs
+import {
+ ReadableStream,
+ WritableStream,
+ TransformStream,
+} from 'node:stream/web';
+```
+
+```cjs
+const {
+ ReadableStream,
+ WritableStream,
+ TransformStream,
+} = require('stream/web');
+```
+
+## Overview
+
+TBD
+
+## API
+
+### Class: `ReadableStream`
+
+
+#### `new ReadableStream([underlyingSource [, strategy]])`
+
+
+
+* `underlyingSource` {Object}
+ * `start` {Function}
+ * `controller` {ReadableStreamDefaultController|ReadableByteStreamController}
+ * Returns: `undefined` or a promise fulfilled with `undefined`.
+ * `pull` {Function}
+ * `controller` {ReadableStreamDefaultController|ReadableByteStreamController}
+ * Returns: A promise fulfilled with `undefined`.
+ * `cancel` {Function}
+ * `reason` {any}
+ * Returns: A promise fulfilled with `undefined`.
+ * `type` {string} Must be `'bytes'` or `undefined`.
+ * `autoAllocateChunkSize` {number}
+* `strategy` {Object}
+ * `highWaterMark` {number}
+ * `size` {Function}
+ * `chunk` {any}
+ * Returns: {number}
+
+
+#### `readableStream.locked`
+
+
+* Type: {boolean}
+
+#### `readableStream.cancel([reason])`
+
+
+* `reason` {any}
+* Returns: A promise fulfilled with `undefined`.
+
+#### `readableStream.getReader([options])`
+
+
+* `options` {Object}
+ * `mode` {string} `'byob'` or `undefined`
+* Returns: {ReadableStreamDefaultReader|ReadableStreamBYOBReader}
+
+#### `readableStream.pipeThrough(transform[, options])`
+
+
+* `transform` {Object}
+ * `readable` {ReadableStream}
+ * `writable` {WritableStream}
+* `options` {Object}
+ * `preventAbort` {boolean}
+ * `preventCancel` {boolean}
+ * `preventClose` {boolean}
+ * `signal` {AbortSignal}
+* Returns: {ReadableStream}
+
+#### `readableStream.pipeTo(destination, options)`
+
+
+* `destination` {WritableStream}
+* `options` {Object}
+ * `preventAbort` {boolean}
+ * `preventCancel` {boolean}
+ * `preventClose` {boolean}
+ * `signal` {AbortSignal}
+* Returns: A promise fulfilled with `undefined`
+
+#### `readableStream.tee()`
+
+
+* Returns: {ReadableStream[]}
+
+#### `readableStream.values([options])`
+
+
+* `options` {Object}
+ * `preventCancel` {boolean} When `true`, prevents the {ReadableStream}
+ from being closed when the async iterator abruptly terminates.
+ **Defaults**: `false`
+
+```mjs
+const stream = new ReadableStream(getSomeSource());
+
+for await (const chunk of stream.values({ preventCancel: true }))
+ console.log(Buffer.from(chunk).toString());
+```
+
+#### Async Iteration
+
+The {ReadableStream} object supports the async iterator protocol using
+`for await` syntax.
+
+```mjs
+const stream = new ReadableStream(getSomeSource());
+
+for await (const chunk of stream)
+ console.log(Buffer.from(chunk).toString());
+```
+
+The async iterator will consume the {ReadableStream} until it terminates.
+
+By default, if the async iterator exits early (via either a `break`,
+`return`, or a `throw`), the {ReadableStream} will be closed. To prevent
+automatic closing of the {ReadableStream}, use the `readableStream.values()`
+method to acquire the async iterator and set the `preventCancel` option to
+`true`.
+
+The {ReadableStream} must not be locked (that is, it must not have an existing
+active reader). During the async iteration, the {ReadableStream} will be locked.
+
+#### Transfering with `postMessage()`
+
+A {ReadableStream} instance can be transferred using a {MessagePort}.
+
+```js
+const stream = new ReadableStream(getReadableSourceSomehow());
+
+const { port1, port2 } = new MessageChannel();
+
+port1.onmessage = ({ data }) => {
+ data.getReader().read().then((chunk) => {
+ console.log(chunk);
+ });
+};
+
+port2.postMessage(stream);
+```
+
+### Class: `ReadableStreamDefaultReader`
+
+
+#### `new ReadableStreamDefaultReader(stream)`
+
+
+* `stream` {ReadableStream}
+
+#### `readableStreamDefaultReader.cancel([reason])`
+
+
+* `reason` {any}
+
+#### `readableStreamDefaultReader.closed`
+
+
+* Type: {Promise} Fulfilled with `undefined` when the reader is closed.
+
+#### `readableStreamDefaultReader.read()`
+
+
+* Returns: A promise fulfilled with an object:
+ * `value` {ArrayBuffer}
+ * `done` {boolean}
+
+#### `readableStreamDefaultReader.releaseLock()`
+
+
+### Class: `ReadableStreamBYOBReader`
+
+
+#### `new ReadableStreamBYOBReader(stream)`
+
+
+* `stream` {ReadableStream}
+
+#### `readableStreamBYOBReader.cancel([reason])`
+
+
+* `reason` {any}
+
+#### `readableStreamBYOBReader.closed`
+
+
+* Type: {Promise} Fulfilled with `undefined` when the reader is closed.
+
+#### `readableStreamBYOBReader.read(view)`
+
+
+* `view` {Buffer|TypedArray|DataView}
+* Returns: A promise fulfilled with an object:
+ * `value` {ArrayBuffer}
+ * `done` {boolean}
+
+#### `readableStreamBYOBReader.releaseLock()`
+
+
+### Class: `ReadableStreamDefaultController`
+
+
+#### `readableStreamDefaultController.close()`
+
+
+#### `readableStreamDefaultController.desiredSize`
+
+
+* Type: {number}
+
+#### `readableStreamDefaultController.enqueue(chunk)`
+
+
+* `chunk` {any}
+
+#### `readableStreamDefaultController.error(error)`
+
+
+* `error` {any}
+
+### Class: `ReadableByteStreamController`
+
+
+#### `readableByteStreamController.byobRequest`
+
+
+* Type: {ReadableStreamBYOBRequest}
+
+#### `readableByteStreamController.close()`
+
+
+#### `readableByteStreamController.desiredSize`
+
+
+* Type: {number}
+
+#### `readableByteStreamController.enqueue(chunk)`
+
+
+* `chunk`: {Buffer|TypedArray|DataView}
+
+#### `readableByteStreamController.error(error)`
+
+
+* `error` {any}
+
+### Class: `ReadableStreamBYOBRequest`
+
+
+#### `readableStreamBYOBRequest.respond(bytesWritten)`
+
+
+* `bytesWritten` {number}
+
+#### `readableStreamBYOBRequest.respondWithNewView(view)`
+
+
+* `view` {Buffer|TypedArray|DataView}
+
+#### `readableStreamBYOBRequest.view`
+
+
+* Type: {Buffer|TypedArray|DataView}
+
+### Class: `WritableStream`
+
+
+#### `new WritableStream([underlyingSink[, strategy]])`
+
+
+* `underlyingSink` {Object}
+ * `start` {Function}
+ * `controller` {WritableStreamDefaultController}
+ * Returns: `undefined` or a promise fulfilled with `undefined`.
+ * `write` {Function}
+ * `chunk` {any}
+ * `controller` {WritableStreamDefaultController}
+ * Returns: A promise fulfilled with `undefined`.
+ * `close` {Function}
+ * Returns: A promise fulfilled with `undefined`.
+ * `abort` {Function}
+ * `reason` {any}
+ * Returns: A promise fulfilled with `undefined`.
+ * `type` {any}
+* `strategy` {Object}
+ * `highWaterMark` {number}
+ * `size` {Function}
+ * `chunk` {any}
+ * Returns: {number}
+
+#### `writableStream.abort([reason])`
+
+
+* `reason` {any}
+* Returns: A promise fulfilled with `undefined`.
+
+#### `writableStream.close()`
+
+
+* Returns: A promise fulfilled with `undefined`.
+
+#### `writableStream.getWriter()`
+
+
+* Returns: {WritableStreamDefaultWriter}
+
+#### `writableStream.locked`
+
+
+* Type: {boolean}
+
+#### Transfering with postMessage()
+
+A {WritableStream} instance can be transferred using a {MessagePort}.
+
+```js
+const stream = new WritableStream(getWritableSinkSomehow());
+
+const { port1, port2 } = new MessageChannel();
+
+port1.onmessage = ({ data }) => {
+ data.getWriter().write('hello');
+};
+
+port2.postMessage(stream);
+```
+
+### Class: `WritableStreamDefaultWriter`
+
+
+#### `new WritableStreamDefaultWriter(stream)`
+
+
+* `stream` {WritableStream}
+
+#### `writableStreamDefaultWriter.abort([reason])`
+
+
+* `reason` {any}
+* Returns: A promise fulfilled with `undefined`.
+
+#### `writableStreamDefaultWriter.close()`
+
+
+* Returns: A promise fulfilled with `undefined`.
+
+#### `writableStreamDefaultWriter.closed`
+
+
+* Type: A promise that is fulfilled with `undefined` when the
+ writer is closed.
+
+#### `writableStreamDefaultWriter.desiredSize`
+
+
+* Type: {number}
+
+#### `writableStreamDefaultWriter.ready`
+
+
+* type: A promise that is fulfilled with `undefined` when the
+ writer is ready to be used.
+
+#### `writableStreamDefaultWriter.releaseLock()`
+
+
+#### `writableStreamDefaultWriter.write([chunk])`
+
+
+* `chunk`: {any}
+* Returns: A promise fulfilled with `undefined`.
+
+### Class: `WritableStreamDefaultController`
+
+
+#### `writableStreamDefaultController.error(error)`
+
+
+* `error` {any}
+
+### Class: `TransformStream`
+
+
+#### `new TransformStream([transformer[, writableStrategy[, readableStrategy]]])`
+
+
+* `transformer` {Object}
+ * `start` {Function}
+ * `controller` {TransformStreamDefaultController}
+ * Returns: `undefined` or a promise fulfilled with `undefined`
+ * `transform` {Function}
+ * `chunk` {any}
+ * `controller` {TransformStreamDefaultController}
+ * Returns: A promise fulfilled with `undefined`.
+ * `flush` {Function}
+ * `controller` {TransformStreamDefaultController}
+ * Returns: A promise fulfilled with `undefined`.
+ * `readableType` {any}
+ * `writableType` {any}
+* `writableStrategy` {Object}
+ * `highWaterMark` {number}
+ * `size` {Function}
+ * `chunk` {any}
+ * Returns: {number}
+* `readableStrategy` {Object}
+ * `highWaterMark` {number}
+ * `size` {Function}
+ * `chunk` {any}
+ * Returns: {number}
+
+#### `transformStream.readable`
+
+
+* Type: {ReadableStream}
+
+#### `transformStream.writable`
+
+
+* Type: {WritableStream}
+
+#### Transfering with postMessage()
+
+A {TransformStream} instance can be transferred using a {MessagePort}.
+
+```js
+const stream = new TransformStream();
+
+const { port1, port2 } = new MessageChannel();
+
+port1.onmessage = ({ data }) => {
+ const { writable, readable } = data;
+ // ...
+};
+
+port2.postMessage(stream);
+```
+
+### Class: `TransformStreamDefaultController`
+
+
+#### `transformStreamDefaultController.desiredSize`
+
+
+* Type: {number}
+
+#### `transformStreamDefaultController.enqueue([chunk])`
+
+
+* `chunk` {any}
+
+#### `transformStreamDefaultController.error([reason])`
+
+
+* `reason` {any}
+
+#### `transformStreamDefaultController.terminate()`
+
+
+### Class: `ByteLengthQueuingStrategy`
+
+
+#### `new ByteLengthQueuingStrategy(options)`
+
+
+* `options` {Object}
+ * `highWaterMark` {number}
+
+#### `byteLengthQueuingStrategy.highWaterMark`
+
+
+* Type: {number}
+
+#### `byteLengthQueuingStrategy.size`
+
+
+* Type: {Function}
+ * `chunk` {any}
+ * Returns: {number}
+
+### Class: `CountQueuingStrategy`
+
+
+#### `new CountQueuingStrategy(options)`
+
+
+* `options` {Object}
+ * `highWaterMark` {number}
+
+#### `countQueuingStrategy.highWaterMark`
+
+
+* Type: {number}
+
+#### `countQueuingStrategy.size`
+
+
+* Type: {Function}
+ * `chunk` {any}
+ * Returns: {number}
+
+[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 13b56311d370b8..320cb07662967c 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -1033,6 +1033,7 @@ E('ERR_HTTP_SOCKET_ENCODING',
'Changing the socket encoding is not allowed per RFC7230 Section 3.', Error);
E('ERR_HTTP_TRAILER_INVALID',
'Trailers are invalid with this transfer encoding', Error);
+E('ERR_ILLEGAL_CONSTRUCTOR', 'Illegal constructor', TypeError);
E('ERR_INCOMPATIBLE_OPTION_PAIR',
'Option "%s" cannot be used in combination with option "%s"', TypeError);
E('ERR_INPUT_TYPE_NOT_ALLOWED', '--input-type can only be used with string ' +
@@ -1256,8 +1257,8 @@ E('ERR_INVALID_RETURN_VALUE', (input, name, value) => {
}
return `Expected ${input} to be returned from the "${name}"` +
` function but got ${type}.`;
-}, TypeError);
-E('ERR_INVALID_STATE', 'Invalid state: %s', Error);
+}, TypeError, RangeError);
+E('ERR_INVALID_STATE', 'Invalid state: %s', Error, TypeError);
E('ERR_INVALID_SYNC_FORK_INPUT',
'Asynchronous forks do not support ' +
'Buffer, TypedArray, DataView or string input: %s',
diff --git a/lib/internal/streams/whatwg.js b/lib/internal/streams/whatwg.js
new file mode 100644
index 00000000000000..7843bf2131d215
--- /dev/null
+++ b/lib/internal/streams/whatwg.js
@@ -0,0 +1,4233 @@
+'use strict';
+
+/* eslint-disable no-use-before-define */
+
+const {
+ ArrayBuffer,
+ ArrayPrototypePush,
+ ArrayPrototypeShift,
+ DataViewCtor,
+ FunctionPrototypeBind,
+ FunctionPrototypeCall,
+ MathMax,
+ MathMin,
+ NumberIsInteger,
+ NumberIsNaN,
+ ObjectGetPrototypeOf,
+ ObjectSetPrototypeOf,
+ PromisePrototypeCatch,
+ PromisePrototypeThen,
+ PromiseResolve,
+ PromiseReject,
+ PromiseAll,
+ Symbol,
+ SymbolAsyncIterator,
+ SymbolToStringTag,
+ Uint8Array,
+} = primordials;
+
+const {
+ AbortError,
+ codes: {
+ ERR_ILLEGAL_CONSTRUCTOR,
+ ERR_INVALID_ARG_VALUE,
+ ERR_INVALID_ARG_TYPE,
+ ERR_INVALID_STATE,
+ },
+} = require('internal/errors');
+
+const {
+ DOMException,
+} = internalBinding('messaging');
+
+const {
+ copyArrayBuffer,
+ detachArrayBuffer
+} = internalBinding('buffer');
+
+const {
+ isArrayBufferView,
+ isDataView,
+} = require('util/types');
+
+const {
+ createDeferredPromise,
+ emitExperimentalWarning,
+ customInspectSymbol: kInspect,
+} = require('internal/util');
+
+const {
+ inspect,
+} = require('util');
+
+const {
+ serialize,
+ deserialize,
+} = require('v8');
+
+const {
+ validateAbortSignal,
+ validateBoolean,
+ validateObject,
+} = require('internal/validators');
+
+const {
+ MessageChannel,
+} = require('internal/worker/io');
+
+const {
+ kDeserialize,
+ kTransfer,
+ kTransferList,
+ JSTransferable,
+} = require('internal/worker/js_transferable');
+
+const {
+ getPromiseDetails,
+ kPending,
+} = internalBinding('util');
+
+const {
+ queueMicrotask,
+} = require('internal/process/task_queues');
+
+const assert = require('internal/assert');
+
+emitExperimentalWarning('stream/web');
+
+const kAbort = Symbol('kAbort');
+const kCancel = Symbol('kCancel');
+const kClose = Symbol('kClose');
+const kCloseSentinel = Symbol('kCloseSentinel');
+const kChunk = Symbol('kChunk');
+const kError = Symbol('kError');
+const kPull = Symbol('kPull');
+const kState = Symbol('kState');
+
+const AsyncIteratorPrototype = ObjectGetPrototypeOf(
+ ObjectGetPrototypeOf(async function* () {}).prototype);
+
+function extractHighWaterMark(value, defaultHWM) {
+ if (value === undefined) return defaultHWM;
+ if (typeof value !== 'number' ||
+ NumberIsNaN(value) ||
+ value < 0)
+ throw new ERR_INVALID_ARG_VALUE.RangeError('strategy.highWaterMark', value);
+ return value;
+}
+
+function extractSizeAlgorithm(size) {
+ if (size === undefined) return () => 1;
+ if (typeof size !== 'function')
+ throw new ERR_INVALID_ARG_VALUE('strategy.size', size);
+ return size;
+}
+
+function customInspect(depth, options, name, data) {
+ if (depth < 0)
+ return this;
+
+ const opts = {
+ ...options,
+ depth: options.depth == null ? null : options.depth - 1
+ };
+
+ return `${name} ${inspect(data, opts)}`;
+}
+
+/**
+ * @typedef {import('../abort_controller').AbortSignal} AbortSignal
+ *
+ * @typedef {
+ * ReadableStreamDefaultController | ReadableByteStreamController
+ * } ReadableStreamController
+ *
+ * @typedef {
+ * ReadableStreamDefaultReader | ReadableStreamBYOBReader
+ * } ReadableStreamReader
+ *
+ * @callback UnderlyingSourceStartCallback
+ * @param {ReadableStreamController} controller
+ * @returns { any | Promise }
+ *
+ * @callback UnderlyingSourcePullCallback
+ * @param {ReadableStreamController} controller
+ * @returns { Promise }
+ *
+ * @callback UnderlyingSourceCancelCallback
+ * @param {any} reason
+ * @returns { Promise }
+ *
+ * @callback QueuingStrategySize
+ * @param {any} chunk
+ * @returns {number}
+ *
+ * @callback UnderlyingSinkStartCallback
+ * @param {WritableStreamDefaultController} controller
+ *
+ * @callback UnderlyingSinkWriteCallback
+ * @param {any} chunk
+ * @param {WritableStreamDefaultController} controller
+ * @returns {Promise}
+ *
+ * @callback UnderlyingSinkCloseCallback
+ * @returns {Promise}
+ *
+ * @callback UnderlyingSinkAbortCallback
+ * @param {any} reason
+ * @returns {Promise}
+ *
+ * @callback TransformerStartCallback
+ * @param {TransformStreamDefaultController} controller;
+ *
+ * @callback TransformerFlushCallback
+ * @param {TransformStreamDefaultController} controller;
+ * @returns {Promise}
+ *
+ * @callback TransformerTransformCallback
+ * @param {any} chunk
+ * @param {TransformStreamDefaultController} controller
+ * @returns {Promise}
+ *
+ * @typedef {{
+ * readable: ReadableStream,
+ * writable: WritableStream,
+ * }} ReadableWritablePair
+ *
+ * @typedef {{
+ * preventClose? : boolean,
+ * preventAbort? : boolean,
+ * preventCancel? : boolean,
+ * signal? : AbortSignal,
+ * }} StreamPipeOptions
+ *
+ * @typedef {{
+ * start? : UnderlyingSourceStartCallback,
+ * pull? : UnderlyingSourcePullCallback,
+ * cancel? : UnderlyingSourceCancelCallback,
+ * type? : "bytes",
+ * autoAllocateChunkSize? : number
+ * }} UnderlyingSource
+ *
+ * @typedef {{
+ * start? : UnderlyingSinkStartCallback,
+ * write? : UnderlyingSinkWriteCallback,
+ * close? : UnderlyingSinkCloseCallback,
+ * abort? : UnderlyingSinkAbortCallback,
+ * type? : any,
+ * }} UnderlyingSink
+ *
+ * @typedef {{
+ * start? : TransformerStartCallback,
+ * transform? : TransformerTransformCallback,
+ * flush? : TransformerFlushCallback,
+ * readableType? : any,
+ * writableType? : any,
+ * }} Transformer
+ *
+ * @typedef {{
+ * highWaterMark : number,
+ * size? : QueuingStrategySize,
+ * }} QueuingStrategy
+ */
+
+class ReadableStream extends JSTransferable {
+ /**
+ * @param {UnderlyingSource} [source]
+ * @param {QueuingStrategy} [strategy]
+ */
+ constructor(source = null, strategy = {}) {
+ super();
+ this[kState] = {
+ disturbed: false,
+ state: 'readable',
+ storedError: undefined,
+ stream: undefined,
+ transfer: {
+ writable: undefined,
+ port1: undefined,
+ port2: undefined,
+ promise: undefined,
+ }
+ };
+ // The spec requires handling of the strategy first
+ // here. Specifically, if getting the size and
+ // highWaterMark from the strategy fail, that has
+ // to trigger a throw before getting the details
+ // from the source. So be sure to keep these in
+ // this order.
+ const size = strategy?.size;
+ const highWaterMark = strategy?.highWaterMark;
+ const type = source?.type;
+
+ if (type === 'bytes') {
+ if (size !== undefined)
+ throw new ERR_INVALID_ARG_VALUE('strategy.size', size);
+ setupReadableByteStreamControllerFromSource(
+ this,
+ source,
+ extractHighWaterMark(highWaterMark, 0));
+ return;
+ }
+
+ if (type !== undefined)
+ throw new ERR_INVALID_ARG_VALUE('source.type', type);
+ setupReadableStreamDefaultControllerFromSource(
+ this,
+ source,
+ extractHighWaterMark(highWaterMark, 1),
+ extractSizeAlgorithm(size));
+ }
+
+ /**
+ * @readonly
+ * @type {boolean}
+ */
+ get locked() {
+ return isReadableStreamLocked(this);
+ }
+
+ /**
+ * @param {any} [reason]
+ * @returns { Promise }
+ */
+ cancel(reason) {
+ if (isReadableStreamLocked(this)) {
+ return PromiseReject(
+ new ERR_INVALID_STATE.TypeError('ReadableStream is locked'));
+ }
+ return readableStreamCancel(this, reason);
+ }
+
+ /**
+ * @param {{
+ * mode? : "byob"
+ * }} [options]
+ * @returns {ReadableStreamReader}
+ */
+ getReader(options = {}) {
+ validateObject(options, 'options');
+ const {
+ mode,
+ } = options;
+ if (mode === undefined) {
+ return new ReadableStreamDefaultReader(this);
+ }
+ if (mode !== 'byob')
+ throw new ERR_INVALID_ARG_VALUE('options.mode', mode);
+ return new ReadableStreamBYOBReader(this);
+ }
+
+ /**
+ * @param {ReadableWritablePair} transform
+ * @param {StreamPipeOptions} [options]
+ * @returns {ReadableStream}
+ */
+ pipeThrough(transform, options = {}) {
+ const readable = transform?.readable;
+ const writable = transform?.writable;
+ if (!isReadableStream(readable)) {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'transform.readable',
+ 'ReadableStream',
+ readable);
+ }
+ if (!isWritableStream(writable)) {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'transform.writable',
+ 'WritableStream',
+ writable);
+ }
+ if (isReadableStreamLocked(this))
+ throw new ERR_INVALID_STATE('The ReadableStream is locked');
+ if (isWritableStreamLocked(writable))
+ throw new ERR_INVALID_STATE('The WritableStream is locked');
+ const {
+ signal,
+ preventClose = false,
+ preventAbort = false,
+ preventCancel = false,
+ } = options;
+ validateBoolean(preventAbort, 'options.preventAbort');
+ validateBoolean(preventClose, 'options.preventClose');
+ validateBoolean(preventCancel, 'options.preventCancel');
+ if (signal !== undefined)
+ validateAbortSignal(signal, 'options.signal');
+
+ const promise = readableStreamPipeTo(
+ this,
+ writable,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal);
+ setPromiseHandled(promise);
+
+ return readable;
+ }
+
+ /**
+ * @param {WritableStream} destination
+ * @param {StreamPipeOptions} [options]
+ * @returns {Promise}
+ */
+ pipeTo(destination, options = {}) {
+ if (!isWritableStream(destination)) {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'transform.writable',
+ 'WritableStream',
+ destination);
+ }
+ if (isReadableStreamLocked(this))
+ throw new ERR_INVALID_STATE('The ReadableStream is locked');
+ if (isWritableStreamLocked(destination))
+ throw new ERR_INVALID_STATE('The WritableStream is locked');
+ const {
+ signal,
+ preventClose = false,
+ preventAbort = false,
+ preventCancel = false,
+ } = options;
+ validateBoolean(preventAbort, 'options.preventAbort');
+ validateBoolean(preventClose, 'options.preventClose');
+ validateBoolean(preventCancel, 'options.preventCancel');
+ if (signal !== undefined)
+ validateAbortSignal(signal, 'options.signal');
+
+ return readableStreamPipeTo(
+ this,
+ destination,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal);
+ }
+
+ /**
+ * @returns {ReadableStream[]}
+ */
+ tee() {
+ return readableStreamTee(this, false);
+ }
+
+ // The values() method is not defined in the streams standard
+ // spec but it is called out in the section on async iteration.
+ // So... that's weird.
+
+ /**
+ * @param {{
+ * preventCancel? : boolean,
+ * }} [options]
+ * @returns {AsyncIterable}
+ */
+ values(options = {}) {
+ validateObject(options, 'options');
+ const {
+ preventCancel = false,
+ } = options;
+ validateBoolean(preventCancel, 'options.preventCancel');
+
+ const reader = new ReadableStreamDefaultReader(this);
+
+ return ObjectSetPrototypeOf({
+ next() {
+ if (reader[kState].stream === undefined) {
+ return PromiseReject(new ERR_INVALID_STATE(
+ 'The reader is not bound to a ReadableStream'));
+ }
+ const promise = createDeferredPromise();
+ readableStreamDefaultReaderRead(reader, {
+ [kChunk](chunk) {
+ promise.resolve({ value: chunk, done: false });
+ },
+ [kClose]() {
+ readableStreamReaderGenericRelease(reader);
+ promise.resolve({ done: true });
+ },
+ [kError](error) {
+ readableStreamReaderGenericRelease(reader);
+ promise.reject(error);
+ }
+ });
+ return promise.promise;
+ },
+
+ return(error) {
+ if (reader[kState].stream === undefined) {
+ return PromiseResolve();
+ }
+ assert(!reader[kState].readRequests.length);
+ if (!preventCancel) {
+ const result = readableStreamReaderGenericCancel(reader, error);
+ readableStreamReaderGenericRelease(reader);
+ return PromisePrototypeThen(result, () => {
+ return { done: true };
+ });
+ }
+ readableStreamReaderGenericRelease(reader);
+ return PromiseResolve({ done: true });
+ },
+
+ [SymbolAsyncIterator]() { return this; }
+ }, AsyncIteratorPrototype);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'ReadableStream', {
+ locked: this.locked,
+ state: this[kState].state,
+ });
+ }
+
+ /**
+ * @returns {AsyncIterable}
+ */
+ [SymbolAsyncIterator]() {
+ return this.values({ preventCancel: false });
+ }
+
+ get [SymbolToStringTag]() { return 'ReadableStream'; }
+
+ [kTransfer]() {
+ if (this.locked) {
+ this[kState].transfer.port1.close();
+ this[kState].transfer.port1 = undefined;
+ this[kState].transfer.port2 = undefined;
+ throw new DOMException(
+ 'Cannot transfer a locked ReadableStream',
+ 'DataCloneError');
+ }
+
+ this[kState].transfer.writable =
+ new WritableStream(
+ new CrossRealmTransformWritableSink(this[kState].transfer.port1));
+
+ this[kState].transfer.promise = readableStreamPipeTo(
+ this,
+ this[kState].transfer.writable,
+ false,
+ false,
+ false);
+ setPromiseHandled(this[kState].transfer.promise);
+
+ return {
+ data: { port: this[kState].transfer.port2 },
+ deserializeInfo: 'internal/streams/whatwg:TransferedReadableStream'
+ };
+ }
+
+ [kTransferList]() {
+ const { port1, port2 } = new MessageChannel();
+ this[kState].transfer.port1 = port1;
+ this[kState].transfer.port2 = port2;
+ return [ port2 ];
+ }
+
+ [kDeserialize]({ port }) {
+ setupReadableStreamDefaultControllerFromSource(
+ this,
+ new CrossRealmTransformReadableSource(port),
+ 0, () => 1);
+ }
+}
+
+class TransferedReadableStream extends JSTransferable {
+ constructor() {
+ super();
+ this[kState] = {
+ disturbed: false,
+ state: 'readable',
+ storedError: undefined,
+ stream: undefined,
+ transfer: {
+ writable: undefined,
+ port: undefined,
+ promise: undefined,
+ }
+ };
+ }
+}
+
+internalExtend(TransferedReadableStream, ReadableStream);
+
+class ReadableStreamBYOBRequest {
+ constructor() {
+ throw new ERR_ILLEGAL_CONSTRUCTOR();
+ }
+
+ /**
+ * @readonly
+ * @type {ArrayBufferView}
+ */
+ get view() { return this[kState].view; }
+
+ /**
+ * @param {number} bytesWritten
+ */
+ respond(bytesWritten) {
+ const {
+ view,
+ controller,
+ } = this[kState];
+ if (controller === undefined)
+ throw new ERR_INVALID_STATE('This request is not bound to a controller');
+ // Supposed to assert here that the view's buffer is not
+ // detached, but there's no API available to use to check that.
+ assert(view.byteLength || view.buffer.byteLength);
+ readableByteStreamControllerRespond(controller, bytesWritten);
+ }
+
+ /**
+ * @param {ArrayBufferView} view
+ */
+ respondWithNewView(view) {
+ const {
+ controller,
+ } = this[kState];
+ // Supposed to assert here that the view's buffer is not
+ // detached, but there's no API available to use to check that.
+ readableByteStreamControllerRespondWithNewView(controller, view);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'ReadableStreamBYOBRequest', {
+ view: this.view,
+ controller: this[kState].controller,
+ });
+ }
+}
+
+class InternalReadableStreamBYOBRequest {
+ constructor(controller, view) {
+ this[kState] = {
+ controller,
+ view,
+ };
+ }
+}
+
+class DefaultReadRequest {
+ constructor() {
+ this[kState] = createDeferredPromise();
+ }
+
+ [kChunk](value) {
+ this[kState].resolve?.({ value, done: false });
+ }
+
+ [kClose]() {
+ this[kState].resolve?.({ value: undefined, done: true });
+ }
+
+ [kError](error) {
+ this[kState].reject?.(error);
+ }
+
+ get promise() { return this[kState].promise; }
+}
+
+class ReadIntoRequest {
+ constructor() {
+ this[kState] = createDeferredPromise();
+ }
+
+ [kChunk](value) {
+ this[kState].resolve?.({ value, done: false });
+ }
+
+ [kClose](value) {
+ this[kState].resolve?.({ value, done: true });
+ }
+
+ [kError](error) {
+ this[kState].reject?.(error);
+ }
+
+ get promise() { return this[kState].promise; }
+}
+
+class ReadableStreamDefaultReader {
+ /**
+ * @param {ReadableStream} stream
+ */
+ constructor(stream) {
+ if (!isReadableStream(stream))
+ throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream);
+ this[kState] = {
+ readRequests: [],
+ stream: undefined,
+ close: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ };
+ setupReadableStreamDefaultReader(this, stream);
+ }
+
+ /**
+ * @returns {Promise<{
+ * value : any,
+ * done : boolean
+ * }>}
+ */
+ read() {
+ if (this[kState].stream === undefined) {
+ return PromiseReject(
+ new ERR_INVALID_STATE.TypeError(
+ 'The reader is not attached to a stream'));
+ }
+ const readRequest = new DefaultReadRequest();
+ readableStreamDefaultReaderRead(this, readRequest);
+ return readRequest.promise;
+ }
+
+ releaseLock() {
+ if (this[kState].stream === undefined)
+ return;
+ if (this[kState].readRequests.length) {
+ throw new ERR_INVALID_STATE.TypeError(
+ 'Cannot release with pending read requests');
+ }
+ readableStreamReaderGenericRelease(this);
+ }
+
+ /**
+ * @readonly
+ * @type {Promise}
+ */
+ get closed() { return this[kState].close.promise; }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise}
+ */
+ cancel(reason) {
+ if (this[kState].stream === undefined) {
+ throw new ERR_INVALID_STATE.TypeError(
+ 'The reader is not attached to a stream');
+ }
+ return readableStreamReaderGenericCancel(this, reason);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'ReadableStreamDefaultReader', {
+ stream: this[kState].stream,
+ readRequests: this[kState].readRequests.length,
+ close: this[kState].close.promise,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'ReadableStreamDefaultReader'; }
+}
+
+class ReadableStreamBYOBReader {
+ /**
+ * @param {ReadableStream} stream
+ */
+ constructor(stream) {
+ if (!isReadableStream(stream))
+ throw new ERR_INVALID_ARG_TYPE('stream', 'ReadableStream', stream);
+ this[kState] = {
+ stream: undefined,
+ requestIntoRequests: [],
+ close: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ };
+ setupReadableStreamBYOBReader(this, stream);
+ }
+
+ /**
+ * @param {ArrayBufferView} view
+ * @returns {Promise<{
+ * view : ArrayBufferView,
+ * done : boolean,
+ * }>}
+ */
+ read(view) {
+ if (!isArrayBufferView(view)) {
+ return PromiseReject(
+ new ERR_INVALID_ARG_TYPE(
+ 'view',
+ [
+ 'Buffer',
+ 'TypedArray',
+ 'DataView',
+ ],
+ view));
+ }
+ if (view.byteLength === 0 || view.buffer.byteLength === 0) {
+ return PromiseReject(
+ new ERR_INVALID_ARG_VALUE('View cannot be zero length'));
+ }
+ // Supposed to assert here that the view's buffer is not
+ // detached, but there's no API available to use to check that.
+ if (this[kState].stream === undefined) {
+ return PromiseReject(
+ new ERR_INVALID_STATE('The reader is not attached to a stream'));
+ }
+ const readIntoRequest = new ReadIntoRequest();
+ readableStreamBYOBReaderRead(this, view, readIntoRequest);
+ return readIntoRequest.promise;
+ }
+
+ releaseLock() {
+ if (this[kState].stream === undefined)
+ return;
+ if (this[kState].readIntoRequests.length) {
+ throw new ERR_INVALID_STATE.TypeError(
+ 'Cannot release with pending read requests');
+ }
+ readableStreamReaderGenericRelease(this);
+ }
+
+ /**
+ * @readonly
+ * @type {Promise}
+ */
+ get closed() { return this[kState].close.promise; }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise}
+ */
+ cancel(reason) {
+ if (this[kState].stream === undefined) {
+ throw new ERR_INVALID_STATE.TypeError(
+ 'The reader is not attached to a stream');
+ }
+ return readableStreamReaderGenericCancel(this, reason);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'ReadableStreamBYOBReader', {
+ stream: this[kState].stream,
+ requestIntoRequests: this[kState].requestIntoRequests.length,
+ close: this[kState].close.promise,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'ReadableStreamBYOBReader'; }
+}
+
+class ReadableStreamDefaultController {
+ constructor() {
+ throw new ERR_ILLEGAL_CONSTRUCTOR();
+ }
+
+ /**
+ * @readonly
+ * @type {number}
+ */
+ get desiredSize() {
+ return readableStreamDefaultControllerGetDesiredSize(this);
+ }
+
+ close() {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(this))
+ throw new ERR_INVALID_STATE('Controller is already closed');
+ readableStreamDefaultControllerClose(this);
+ }
+
+ /**
+ * @param {any} chunk
+ */
+ enqueue(chunk) {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(this))
+ throw new ERR_INVALID_STATE('Controller is already closed');
+ readableStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ /**
+ * @param {any} error
+ */
+ error(error) {
+ readableStreamDefaultControllerError(this, error);
+ }
+
+ [kCancel](reason) {
+ return readableStreamDefaultControllerCancelSteps(this, reason);
+ }
+
+ [kPull](readRequest) {
+ readableStreamDefaultControllerPullSteps(this, readRequest);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(
+ depth,
+ options,
+ 'ReadableStreamDefaultController', { });
+ }
+
+ get [SymbolToStringTag]() { return 'ReadableStreamDefaultController'; }
+}
+
+class InternalReadableStreamDefaultController {
+ constructor() {
+ this[kState] = {};
+ }
+}
+
+class ReadableByteStreamController {
+ constructor() {
+ throw new ERR_ILLEGAL_CONSTRUCTOR();
+ }
+
+ /**
+ * @readonly
+ * @type {ReadableStreamBYOBRequest}
+ */
+ get byobRequest() {
+ if (this[kState].byobRequest === null &&
+ this[kState].pendingPullIntos.length) {
+ const {
+ buffer,
+ byteOffset,
+ bytesFilled,
+ byteLength,
+ } = this[kState].pendingPullIntos[0];
+ const view =
+ new Uint8Array(
+ buffer,
+ byteOffset + bytesFilled,
+ byteLength - bytesFilled);
+ this[kState].byobRequest =
+ new InternalReadableStreamBYOBRequest(this, view);
+ }
+ return this[kState].byobRequest;
+ }
+
+ /**
+ * @readonly
+ * @type {number}
+ */
+ get desiredSize() {
+ return readableByteStreamControllerGetDesiredSize(this);
+ }
+
+ close() {
+ if (this[kState].closeRequested)
+ throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
+ if (this[kState].stream[kState].state !== 'readable')
+ throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
+ readableByteStreamControllerClose(this);
+ }
+
+ /**
+ * @param {ArrayBufferView} chunk
+ */
+ enqueue(chunk) {
+ if (!isArrayBufferView(chunk)) {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'chunk',
+ [
+ 'Buffer',
+ 'TypedArray',
+ 'DataView',
+ ],
+ chunk);
+ }
+ if (chunk.byteLength === 0 || chunk.buffer.byteLength === 0) {
+ throw new ERR_INVALID_ARG_VALUE(
+ 'chunk',
+ chunk,
+ 'cannot be zero length');
+ }
+ if (this[kState].closeRequested)
+ throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
+ if (this[kState].stream[kState].state !== 'readable')
+ throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
+ readableByteStreamControllerEnqueue(this, chunk);
+ }
+
+ /**
+ * @param {any} error
+ */
+ error(error) {
+ readableByteStreamControllerError(this, error);
+ }
+
+ [kCancel](reason) {
+ return readableByteStreamControllerCancelSteps(this, reason);
+ }
+
+ [kPull](readRequest) {
+ readableByteStreamControllerPullSteps(this, readRequest);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(
+ depth,
+ options,
+ 'ReadableByteStreamController', { });
+ }
+
+ get [SymbolToStringTag]() { return 'ReadableByteStreamController'; }
+}
+
+class InternalReadableByteStreamController {
+ constructor() {
+ this[kState] = {};
+ }
+}
+
+class WritableStream extends JSTransferable {
+ /**
+ * @param {UnderlyingSink} [sink]
+ * @param {QueuingStrategy} [strategy]
+ */
+ constructor(sink = null, strategy = {}) {
+ super();
+ const type = sink?.type;
+ if (type !== undefined)
+ throw new ERR_INVALID_ARG_VALUE.RangeError('type', type);
+
+ this[kState] = {
+ close: createDeferredPromise(),
+ closeRequest: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ inFlightWriteRequest: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ inFlightCloseRequest: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ pendingAbortRequest: {
+ abort: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ reason: undefined,
+ wasAlreadyErroring: false,
+ },
+ backpressure: false,
+ controller: undefined,
+ state: 'writable',
+ storedError: undefined,
+ writeRequests: [],
+ writer: undefined,
+ transfer: {
+ promise: undefined,
+ port1: undefined,
+ port2: undefined,
+ readable: undefined,
+ },
+ };
+
+ const highWaterMark = strategy?.highWaterMark;
+ const size = strategy?.size;
+
+ setupWritableStreamDefaultControllerFromSink(
+ this,
+ sink,
+ extractHighWaterMark(highWaterMark, 1),
+ extractSizeAlgorithm(size));
+ }
+
+ /**
+ * @readonly
+ * @type {boolean}
+ */
+ get locked() { return isWritableStreamLocked(this); }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise}
+ */
+ abort(reason) {
+ if (isWritableStreamLocked(this))
+ return PromiseReject(new ERR_INVALID_STATE('WritableStream is locked'));
+ return writableStreamAbort(this, reason);
+ }
+
+ /**
+ * @returns {Promise}
+ */
+ close() {
+ if (isWritableStreamLocked(this))
+ return PromiseReject(new ERR_INVALID_STATE('WritableStream is locked'));
+ if (writableStreamCloseQueuedOrInFlight(this)) {
+ return PromiseReject(
+ new ERR_INVALID_STATE('Failure closing WritableStream'));
+ }
+ return writableStreamClose(this);
+ }
+
+ /**
+ * @returns {WritableStreamDefaultWriter}
+ */
+ getWriter() {
+ return new WritableStreamDefaultWriter(this);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'WritableStream', {
+ locked: this.locked,
+ state: this[kState].state,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'WritableStream'; }
+
+ [kTransfer]() {
+ if (this.locked) {
+ this[kState].transfer.port1.close();
+ this[kState].transfer.port1 = undefined;
+ this[kState].transfer.port2 = undefined;
+ throw new DOMException(
+ 'Cannot transfer a locked WritableStream',
+ 'DataCloneError');
+ }
+
+ this[kState].transfer.readable =
+ new ReadableStream(
+ new CrossRealmTransformReadableSource(this[kState].transfer.port1));
+
+ this[kState].transfer.promise = readableStreamPipeTo(
+ this[kState].transfer.readable,
+ this,
+ false,
+ false,
+ false);
+ setPromiseHandled(this[kState].transfer.promise);
+
+ return {
+ data: { port: this[kState].transfer.port2 },
+ deserializeInfo: 'internal/streams/whatwg:TransferedWritableStream'
+ };
+ }
+
+ [kTransferList]() {
+ const { port1, port2 } = new MessageChannel();
+ this[kState].transfer.port1 = port1;
+ this[kState].transfer.port2 = port2;
+ return [ port2 ];
+ }
+
+ [kDeserialize]({ port }) {
+ setupWritableStreamDefaultControllerFromSink(
+ this,
+ new CrossRealmTransformWritableSink(port),
+ 1,
+ () => 1);
+ }
+}
+
+class TransferedWritableStream extends JSTransferable {
+ constructor() {
+ super();
+ this[kState] = {
+ close: createDeferredPromise(),
+ closeRequest: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ inFlightWriteRequest: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ inFlightCloseRequest: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ pendingAbortRequest: {
+ abort: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ reason: undefined,
+ wasAlreadyErroring: false,
+ },
+ backpressure: false,
+ controller: undefined,
+ state: 'writable',
+ storedError: undefined,
+ writeRequests: [],
+ writer: undefined,
+ transfer: {
+ promise: undefined,
+ port1: undefined,
+ port2: undefined,
+ readable: undefined,
+ },
+ };
+ }
+}
+
+internalExtend(TransferedWritableStream, WritableStream);
+
+class WritableStreamDefaultWriter {
+ /**
+ * @param {WritableStream} stream
+ */
+ constructor(stream) {
+ if (!isWritableStream(stream))
+ throw new ERR_INVALID_ARG_TYPE('stream', 'WritableStream', stream);
+ this[kState] = {
+ stream: undefined,
+ close: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ ready: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ }
+ };
+ setupWritableStreamDefaultWriter(this, stream);
+ }
+
+ /**
+ * @readonly
+ * @type {Promise}
+ */
+ get closed() { return this[kState].close.promise; }
+
+ /**
+ * @readonly
+ * @type {number}
+ */
+ get desiredSize() {
+ if (this[kState].stream === undefined)
+ throw new ERR_INVALID_STATE('Writer is not bound to a WritableStream');
+ return writableStreamDefaultWriterGetDesiredSize(this);
+ }
+
+ /**
+ * @readonly
+ * @type {Promise}
+ */
+ get ready() { return this[kState].ready.promise; }
+
+ /**
+ * @param {any} reason
+ * @returns {Promise}
+ */
+ abort(reason) {
+ if (this[kState].stream === undefined) {
+ return PromiseReject(
+ new ERR_INVALID_STATE('Writer is not bound to a WritableStream'));
+ }
+ return writableStreamDefaultWriterAbort(this, reason);
+ }
+
+ /**
+ * @returns {Promise}
+ */
+ close() {
+ const {
+ stream,
+ } = this[kState];
+ if (stream === undefined) {
+ return PromiseReject(
+ new ERR_INVALID_STATE('Writer is not bound to a WritableStream'));
+ }
+ if (writableStreamCloseQueuedOrInFlight(stream)) {
+ return PromiseReject(
+ new ERR_INVALID_STATE('Failure to close WritableStream'));
+ }
+ return writableStreamDefaultWriterClose(this);
+ }
+
+ releaseLock() {
+ const {
+ stream,
+ } = this[kState];
+ if (stream === undefined)
+ return;
+ assert(stream[kState].writer !== undefined);
+ writableStreamDefaultWriterRelease(this);
+ }
+
+ /**
+ * @param {any} chunk
+ * @returns {Promise}
+ */
+ write(chunk) {
+ if (this[kState].stream === undefined) {
+ return PromiseReject(
+ new ERR_INVALID_STATE('Writer is not bound to a WritableStream'));
+ }
+ return writableStreamDefaultWriterWrite(this, chunk);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'WritableStreamDefaultWriter', {
+ stream: this[kState].stream,
+ close: this[kState].close.promise,
+ ready: this[kState].ready.promise,
+ desiredSize: this.desiredSize,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'WritableStreamDefaultWriter'; }
+}
+
+class WritableStreamDefaultController {
+ constructor() {
+ throw new ERR_ILLEGAL_CONSTRUCTOR();
+ }
+
+ [kAbort](reason) {
+ const result = this[kState].abortAlgorithm(reason);
+ writableStreamDefaultControllerClearAlgorithms(this);
+ return result;
+ }
+
+ [kError]() {
+ resetQueue(this);
+ }
+
+ /**
+ * @param {any} error
+ */
+ error(error) {
+ if (this[kState].stream[kState].state !== 'writable')
+ return;
+ writableStreamDefaultControllerError(this, error);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'WritableStreamDefaultController', {
+ stream: this[kState].stream,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'WritableStreamDefaultController'; }
+}
+
+class InternalWritableStreamDefaultController {}
+
+class TransformStream extends JSTransferable {
+ /**
+ * @param {Transformer} [transformer]
+ * @param {QueuingStrategy} [writableStrategy]
+ * @param {QueuingStrategy} [readableStrategy]
+ */
+ constructor(
+ transformer = null,
+ writableStrategy = {},
+ readableStrategy = {}) {
+ super();
+ const readableType = transformer?.readableType;
+ const writableType = transformer?.writableType;
+ const start = transformer?.start;
+
+ if (readableType !== undefined)
+ throw new ERR_INVALID_ARG_VALUE('transformer.readableType', readableType);
+ if (writableType !== undefined)
+ throw new ERR_INVALID_ARG_VALUE('transformer.writableType', writableType);
+
+ const readableHighWaterMark = readableStrategy?.highWaterMark;
+ const readableSize = readableStrategy?.size;
+
+ const writableHighWaterMark = writableStrategy?.highWaterMark;
+ const writableSize = writableStrategy?.size;
+
+ const actualReadableHighWaterMark =
+ extractHighWaterMark(readableHighWaterMark, 0);
+ const actualReadableSize = extractSizeAlgorithm(readableSize);
+
+ const actualWritableHighWaterMark =
+ extractHighWaterMark(writableHighWaterMark, 1);
+ const actualWritableSize = extractSizeAlgorithm(writableSize);
+
+ const startPromise = createDeferredPromise();
+
+ initializeTransformStream(
+ this,
+ startPromise,
+ actualWritableHighWaterMark,
+ actualWritableSize,
+ actualReadableHighWaterMark,
+ actualReadableSize);
+
+ setupTransformStreamDefaultControllerFromTransformer(this, transformer);
+
+ if (start !== undefined) {
+ startPromise.resolve(
+ FunctionPrototypeCall(
+ start,
+ transformer,
+ this[kState].controller));
+ } else {
+ startPromise.resolve();
+ }
+ }
+
+ /**
+ * @readonly
+ * @type {ReadableStream}
+ */
+ get readable() { return this[kState].readable; }
+
+ /**
+ * @readonly
+ * @type {WritableStream}
+ */
+ get writable() { return this[kState].writable; }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'TransformStream', {
+ readable: this.readable,
+ writable: this.writable,
+ backpressure: this[kState].backpressure,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'TransformStream'; }
+
+ [kTransfer]() {
+ const {
+ readable,
+ writable,
+ } = this[kState];
+ if (readable.locked) {
+ throw new DOMException(
+ 'Cannot transfer a locked ReadableStream',
+ 'DataCloneError');
+ }
+ if (writable.locked) {
+ throw new DOMException(
+ 'Cannot transfer a locked WritableStream',
+ 'DataCloneError');
+ }
+ return {
+ data: {
+ readable,
+ writable,
+ },
+ deserializeInfo: 'internal/streams/whatwg:TransferedTransformStream'
+ };
+ }
+
+ [kTransferList]() {
+ return [ this[kState].readable, this[kState].writable ];
+ }
+
+ [kDeserialize]({ readable, writable }) {
+ this[kState].readable = readable;
+ this[kState].writable = writable;
+ }
+}
+
+class TransferedTransformStream extends JSTransferable {
+ constructor() {
+ super();
+ this[kState] = {
+ readable: undefined,
+ writable: undefined,
+ backpressure: undefined,
+ backpressureChange: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ controller: undefined,
+ };
+ }
+}
+
+internalExtend(TransferedTransformStream, TransformStream);
+
+class TransformStreamDefaultController {
+ constructor() {
+ throw new ERR_ILLEGAL_CONSTRUCTOR();
+ }
+
+ /**
+ * @readonly
+ * @type {number}
+ */
+ get desiredSize() {
+ const {
+ stream,
+ } = this[kState];
+ const {
+ readable,
+ } = stream[kState];
+ const {
+ controller: readableController,
+ } = readable[kState];
+ return readableStreamDefaultControllerGetDesiredSize(readableController);
+ }
+
+ /**
+ * @param {any} chunk
+ */
+ enqueue(chunk) {
+ transformStreamDefaultControllerEnqueue(this, chunk);
+ }
+
+ /**
+ * @param {any} reason
+ */
+ error(reason) {
+ transformStreamDefaultControllerError(this, reason);
+ }
+
+ terminate() {
+ transformStreamDefaultControllerTerminate(this);
+ }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'TransformStreamDefaultController', {
+ stream: this[kState].stream,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'TransformStreamDefaultController'; }
+}
+
+class InternalTransformStreamDefaultController {}
+
+class ByteLengthQueuingStrategy {
+ /**
+ * @param {{
+ * highWaterMark : number
+ * }} init
+ */
+ constructor(init = {}) {
+ validateObject(init, 'init');
+ const {
+ highWaterMark
+ } = init;
+ if (typeof highWaterMark !== 'number') {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'init.highWaterMark',
+ 'number',
+ highWaterMark);
+ }
+ this[kState] = {
+ highWaterMark,
+ };
+ this.size =
+ FunctionPrototypeBind(
+ ByteLengthQueuingStrategy.prototype.size,
+ this);
+ }
+
+ /**
+ * @readonly
+ * @type {number}
+ */
+ get highWaterMark() {
+ return this[kState].highWaterMark;
+ }
+
+ /**
+ * @type {QueuingStrategySize}
+ */
+ size(chunk) { return chunk?.byteLength | 0; }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'ByteLengthQueuingStrategy', {
+ highWaterMark: this.highWaterMark,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'ByteLengthQueuingStrategy'; }
+}
+
+class CountQueuingStrategy {
+ /**
+ * @param {{
+ * highWaterMark : number
+ * }} init
+ */
+ constructor(init = {}) {
+ validateObject(init, 'init');
+ const {
+ highWaterMark
+ } = init;
+ if (typeof highWaterMark !== 'number') {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'init.highWaterMark',
+ 'number',
+ highWaterMark);
+ }
+ this[kState] = {
+ highWaterMark,
+ };
+ this.size =
+ FunctionPrototypeBind(
+ CountQueuingStrategy.prototype.size,
+ this);
+ }
+
+ /**
+ * @readonly
+ * @type {number}
+ */
+ get highWaterMark() {
+ return this[kState].highWaterMark;
+ }
+
+ /**
+ * @type {QueuingStrategySize}
+ */
+ size() { return 1; }
+
+ [kInspect](depth, options) {
+ return customInspect(depth, options, 'CountQueuingStrategy', {
+ highWaterMark: this.highWaterMark,
+ });
+ }
+
+ get [SymbolToStringTag]() { return 'CountQueuingStrategy'; }
+}
+
+class CrossRealmTransformReadableSource {
+ constructor(port) {
+ this[kState] = {
+ port,
+ controller: undefined,
+ };
+
+ port.onmessage = ({ data }) => {
+ const {
+ controller,
+ } = this[kState];
+ const {
+ type,
+ value,
+ } = data;
+ switch (type) {
+ case 'chunk':
+ readableStreamDefaultControllerEnqueue(
+ controller,
+ value);
+ break;
+ case 'close':
+ readableStreamDefaultControllerClose(controller);
+ port.close();
+ break;
+ case 'error':
+ readableStreamDefaultControllerError(controller, value);
+ port.close();
+ break;
+ }
+ };
+
+ port.onmessageerror = () => {
+ const error = new DOMException(
+ 'Internal transfered ReadableStream error',
+ 'DataCloneError');
+ port.postMesssage({ type: 'error', value: error });
+ readableStreamDefaultControllerError(
+ this[kState].controller,
+ error);
+ port.close();
+ };
+ }
+
+ start(controller) {
+ this[kState].controller = controller;
+ }
+
+ async pull() {
+ this[kState].port.postMessage({ type: 'pull' });
+ }
+
+ async cancel(reason) {
+ try {
+ this[kState].port.postMessage({ type: 'error', value: reason });
+ } catch (error) {
+ this[kState].port.postMessage({ type: 'error', value: error });
+ throw error;
+ } finally {
+ this[kState].port.close();
+ }
+ }
+}
+
+class CrossRealmTransformWritableSink {
+ constructor(port) {
+ this[kState] = {
+ port,
+ controller: undefined,
+ backpressurePromise: createDeferredPromise(),
+ };
+
+ port.onmessage = ({ data }) => {
+ assert(typeof data === 'object');
+ const {
+ type,
+ value
+ } = { ...data };
+ assert(typeof type === 'string');
+ switch (type) {
+ case 'pull':
+ if (this[kState].backpressurePromise !== undefined)
+ this[kState].backpressurePromise.resolve?.();
+ this[kState].backpressurePromise = undefined;
+ break;
+ case 'error':
+ writableStreamDefaultControllerErrorIfNeeded(
+ this[kState].controller,
+ value);
+ if (this[kState].backpressurePromise !== undefined)
+ this[kState].backpressurePromise.resolve?.();
+ this[kState].backpressurePromise = undefined;
+ break;
+ }
+ };
+ port.onmessageerror = () => {
+ const error = new DOMException(
+ 'Internal transfered ReadableStream error',
+ 'DataCloneError');
+ port.postMesssage({ type: 'error', value: error });
+ writableStreamDefaultControllerErrorIfNeeded(
+ this[kState].controller,
+ error);
+ port.close();
+ };
+
+ }
+
+ start(controller) {
+ this[kState].controller = controller;
+ }
+
+ async write(chunk) {
+ if (this[kState].backpressurePromise === undefined) {
+ this[kState].backpressurePromise = {
+ promise: PromiseResolve(),
+ resolve: undefined,
+ reject: undefined,
+ };
+ }
+ await this[kState].backpressurePromise.promise;
+ this[kState].backpressurePromise = createDeferredPromise();
+ try {
+ this[kState].port.postMessage({ type: 'chunk', value: chunk });
+ } catch (error) {
+ this[kState].port.postMessage({ type: 'error', value: error });
+ this[kState].port.close();
+ throw error;
+ }
+ }
+
+ async close() {
+ this[kState].port.postMessage({ type: 'close' });
+ this[kState].port.close();
+ }
+
+ async abort(reason) {
+ try {
+ this[kState].port.postMessage({ type: 'error', value: reason });
+ } catch (error) {
+ this[kState].port.postMessage({ type: 'error', value: error });
+ throw error;
+ } finally {
+ this[kState].port.close();
+ }
+ }
+}
+
+internalExtend(
+ InternalReadableByteStreamController,
+ ReadableByteStreamController);
+
+internalExtend(
+ InternalReadableStreamDefaultController,
+ ReadableStreamDefaultController);
+
+internalExtend(
+ InternalReadableStreamBYOBRequest,
+ ReadableStreamBYOBRequest);
+
+internalExtend(
+ InternalWritableStreamDefaultController,
+ WritableStreamDefaultController);
+
+internalExtend(
+ InternalTransformStreamDefaultController,
+ TransformStreamDefaultController);
+
+function isReadableStream(value) {
+ return value[kState] !== undefined &&
+ value[SymbolToStringTag] === 'ReadableStream';
+}
+
+function isWritableStream(value) {
+ return value[kState] !== undefined &&
+ value[SymbolToStringTag] === 'WritableStream';
+}
+
+function isTransformStream(value) {
+ return value[kState] !== undefined &&
+ value[SymbolToStringTag] === 'TransformStream';
+}
+
+function isReadableByteStreamController(value) {
+ return value[kState] !== undefined &&
+ value[SymbolToStringTag] === 'ReadableByteStreamController';
+}
+
+function internalExtend(ctor, actual) {
+ ctor.prototype.constructor = actual;
+ ObjectSetPrototypeOf(ctor.prototype, actual.prototype);
+}
+
+function transferArrayBuffer(buffer) {
+ const res = detachArrayBuffer(buffer);
+ if (res === undefined)
+ throw new ERR_INVALID_ARG_VALUE('buffer', 'ArrayBuffer', buffer);
+ return res;
+}
+
+function setPromiseHandled(promise) {
+ // Alternatively, we could use the native API
+ // MarkAsHandled, but this avoids the extra boundary cross
+ // and is hopefully faster at the cost of an extra Promise
+ // allocation.
+ PromisePrototypeThen(promise, () => {}, () => {});
+}
+
+function dequeueValue(controller) {
+ assert(controller[kState].queue !== undefined);
+ assert(controller[kState].queueTotalSize !== undefined);
+ assert(controller[kState].queue.length);
+ const {
+ value,
+ size,
+ } = ArrayPrototypeShift(controller[kState].queue);
+ controller[kState].queueTotalSize =
+ MathMax(0, controller[kState].queueTotalSize - size);
+ return value;
+}
+
+function resetQueue(controller) {
+ assert(controller[kState].queue !== undefined);
+ assert(controller[kState].queueTotalSize !== undefined);
+ controller[kState].queue = [];
+ controller[kState].queueTotalSize = 0;
+}
+
+function peekQueueValue(controller) {
+ assert(controller[kState].queue !== undefined);
+ assert(controller[kState].queueTotalSize !== undefined);
+ assert(controller[kState].queue.length);
+ return controller[kState].queue[0].value;
+}
+
+function enqueueValueWithSize(controller, value, size) {
+ assert(controller[kState].queue !== undefined);
+ assert(controller[kState].queueTotalSize !== undefined);
+ if (typeof size !== 'number' ||
+ size < 0 ||
+ size === Infinity) {
+ throw new ERR_INVALID_ARG_VALUE.RangeError('size', size);
+ }
+ ArrayPrototypePush(controller[kState].queue, { value, size });
+ controller[kState].queueTotalSize += size;
+}
+
+// ---- ReadableStream Implementation
+
+function readableStreamPipeTo(
+ source,
+ dest,
+ preventClose,
+ preventAbort,
+ preventCancel,
+ signal) {
+
+ const reader = new ReadableStreamDefaultReader(source);
+ const writer = new WritableStreamDefaultWriter(dest);
+
+ source[kState].disturbed = true;
+
+ let shuttingDown = false;
+
+ const promise = createDeferredPromise();
+
+ function finalize(error) {
+ writableStreamDefaultWriterRelease(writer);
+ readableStreamReaderGenericRelease(reader);
+ if (signal !== undefined)
+ signal.removeEventListener('abort', abortAlgorithm);
+ if (error !== undefined)
+ promise.reject(error);
+ else
+ promise.resolve();
+ }
+
+ function shutdownWithAnAction(action, originalError) {
+ if (shuttingDown) return;
+ shuttingDown = true;
+ if (dest[kState].state === 'writable' &&
+ !writableStreamCloseQueuedOrInFlight(dest)) {
+ // TODO: If any chunks have been read but but not yet written, write them
+ // to dest. Wait until every chunk that has been read has been written
+ }
+ PromisePrototypeThen(action, finalize, finalize);
+ }
+
+ function shutdown(error) {
+ if (shuttingDown) return;
+ shuttingDown = true;
+ if (dest[kState].state === 'writable' &&
+ !writableStreamCloseQueuedOrInFlight(dest)) {
+ // TODO: if any chunks have been read but not yet written write them to
+ // dest. Wait until every chunk that has been read has been written
+ }
+ finalize(error);
+ }
+
+ function abortAlgorithm() {
+ const error = new AbortError();
+ const actions = [];
+ if (!preventAbort) {
+ if (dest[kState].state === 'writable')
+ ArrayPrototypePush(actions, writableStreamAbort(dest, error));
+ else
+ ArrayPrototypePush(actions, PromiseResolve());
+ }
+ if (!preventCancel) {
+ if (source[kState].state === 'readable')
+ ArrayPrototypePush(actions, readableStreamCancel(source, error));
+ else
+ ArrayPrototypePush(actions, PromiseResolve());
+ }
+ shutdownWithAnAction(PromiseAll(actions), error);
+ }
+
+ // The streams spec requires that the public API must not
+ // be used when interacting with the reader and writer,
+ // so we bypass those here by duplicating in a separate
+ // function that we can be sure is not modified.
+ function read() {
+ const readRequest = new DefaultReadRequest();
+ readableStreamDefaultReaderRead(reader, readRequest);
+ return readRequest.promise;
+ }
+
+ function write(chunk) {
+ return writableStreamDefaultWriterWrite(writer, chunk);
+ }
+
+ function watchErrored(stream, promise, action) {
+ if (stream[kState].state === 'errored')
+ action(stream[kState].storedError);
+ else
+ PromisePrototypeCatch(promise, action);
+ }
+
+ function watchClosed(stream, promise, action) {
+ if (stream[kState].state === 'closed')
+ action(stream[kState].storedError);
+ else
+ PromisePrototypeThen(promise, action);
+ }
+
+ async function step() {
+ if (shuttingDown)
+ return true;
+ await writer[kState].ready.promise;
+ const { value, done } = await read();
+ await write(value);
+ return done;
+ }
+
+ async function run() {
+ // Run until step resolves as true
+ while (!await step()) {}
+ }
+
+ if (signal !== undefined) {
+ if (signal.aborted) {
+ abortAlgorithm();
+ return promise.promise;
+ }
+ signal.addEventListener('abort', abortAlgorithm, { once: true });
+ }
+
+ setPromiseHandled(run());
+
+ watchErrored(source, reader[kState].close.promise, (error) => {
+ if (!preventAbort)
+ shutdownWithAnAction(writableStreamAbort(dest, error), error);
+ else
+ shutdown(error);
+ });
+
+ watchErrored(dest, writer[kState].close.promise, (error) => {
+ if (!preventCancel)
+ shutdownWithAnAction(readableStreamCancel(dest, error), error);
+ else
+ shutdown(error);
+ });
+
+ watchClosed(source, reader[kState].close.promise, () => {
+ if (!preventClose) {
+ shutdownWithAnAction(
+ writableStreamDefaultWriterCloseWithErrorPropagation(writer));
+ } else {
+ shutdown();
+ }
+ });
+
+ if (writableStreamCloseQueuedOrInFlight(dest) ||
+ dest[kState].state === 'closed') {
+ const error = new ERR_INVALID_STATE.TypeError(
+ 'Destination WritableStream is closed');
+ if (!preventCancel)
+ shutdownWithAnAction(readableStreamCancel(source, error), error);
+ else
+ shutdown(error);
+ }
+
+ return promise.promise;
+}
+
+function readableStreamTee(stream, cloneForBranch2) {
+ const reader = new ReadableStreamDefaultReader(stream);
+ let reading = false;
+ let canceled1 = false;
+ let canceled2 = false;
+ let reason1;
+ let reason2;
+ let branch1;
+ let branch2;
+ const cancelPromise = createDeferredPromise();
+
+ async function pullAlgorithm() {
+ if (reading) return;
+ reading = true;
+ const readRequest = {
+ [kChunk](value) {
+ queueMicrotask(() => {
+ reading = false;
+ const value1 = value;
+ let value2 = value;
+ if (!canceled2 && cloneForBranch2) {
+ // Structured Clone
+ value2 = deserialize(serialize(value2));
+ }
+ if (!canceled1) {
+ readableStreamDefaultControllerEnqueue(
+ branch1[kState].controller,
+ value1);
+ }
+ if (!canceled2) {
+ readableStreamDefaultControllerEnqueue(
+ branch2[kState].controller,
+ value2);
+ }
+ });
+ },
+ [kClose]() {
+ reading = false;
+ if (!canceled1)
+ readableStreamDefaultControllerClose(branch1[kState].controller);
+ if (!canceled2)
+ readableStreamDefaultControllerClose(branch2[kState].controller);
+ if (!canceled1 || !canceled2)
+ cancelPromise.resolve();
+ },
+ [kError]() {
+ reading = false;
+ },
+ };
+ readableStreamDefaultReaderRead(reader, readRequest);
+ }
+
+ function cancel1Algorithm(reason) {
+ canceled1 = true;
+ reason1 = reason;
+ if (canceled2) {
+ const compositeReason = [reason1, reason2];
+ cancelPromise.resolve(readableStreamCancel(stream, compositeReason));
+ }
+ return cancelPromise.promise;
+ }
+
+ function cancel2Algorithm(reason) {
+ canceled2 = true;
+ reason2 = reason;
+ if (canceled1) {
+ const compositeReason = [reason1, reason2];
+ cancelPromise.resolve(readableStreamCancel(stream, compositeReason));
+ }
+ return cancelPromise.promise;
+ }
+
+ branch1 = new ReadableStream({
+ start: nonOpStart,
+ pull: pullAlgorithm,
+ cancel: cancel1Algorithm,
+ });
+
+ branch2 = new ReadableStream({
+ start: nonOpStart,
+ pull: pullAlgorithm,
+ cancel: cancel2Algorithm,
+ });
+
+ PromisePrototypeCatch(
+ reader[kState].close.promise,
+ (error) => {
+ readableStreamDefaultControllerError(branch1[kState].controller, error);
+ readableStreamDefaultControllerError(branch2[kState].controller, error);
+ });
+
+ return [branch1, branch2];
+}
+
+function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
+ const {
+ buffer,
+ bytesFilled,
+ byteLength,
+ byteOffset,
+ ctor,
+ elementSize,
+ } = desc;
+ assert(bytesFilled <= byteLength);
+ assert(bytesFilled % elementSize === 0);
+ const transferedBuffer = transferArrayBuffer(buffer);
+ return new ctor(transferedBuffer, byteOffset, bytesFilled / elementSize);
+}
+
+function isReadableStreamLocked(stream) {
+ return stream[kState].reader !== undefined;
+}
+
+function readableStreamCancel(stream, reason) {
+ stream[kState].disturbed = true;
+ switch (stream[kState].state) {
+ case 'closed':
+ return PromiseResolve();
+ case 'errored':
+ return PromiseReject(stream[kState].storedError);
+ }
+ readableStreamClose(stream);
+ const {
+ reader,
+ } = stream[kState];
+ if (reader !== undefined && readableStreamHasBYOBReader(stream)) {
+ for (let n = 0; n < reader[kState].readIntoRequests.length; n++)
+ reader[kState].readIntoRequests[n][kClose]();
+ reader[kState].readIntoRequests = [];
+ }
+
+ try {
+ return PromisePrototypeThen(
+ PromiseResolve(stream[kState].controller[kCancel](reason)));
+ } catch (error) {
+ return PromiseReject(error);
+ }
+}
+
+function readableStreamClose(stream) {
+ assert(stream[kState].state === 'readable');
+ stream[kState].state = 'closed';
+
+ const {
+ reader,
+ } = stream[kState];
+
+ if (reader === undefined)
+ return;
+
+ reader[kState].close.resolve();
+
+ if (readableStreamHasDefaultReader(stream)) {
+ for (let n = 0; n < reader[kState].readRequests.length; n++)
+ reader[kState].readRequests[n][kClose]();
+ reader[kState].readRequests = [];
+ }
+}
+
+function readableStreamError(stream, error) {
+ assert(stream[kState].state === 'readable');
+ stream[kState].state = 'errored';
+ stream[kState].storedError = error;
+
+ const {
+ reader
+ } = stream[kState];
+
+ if (reader === undefined)
+ return;
+
+ reader[kState].close.reject(error);
+ setPromiseHandled(reader[kState].close.promise);
+
+ if (readableStreamHasDefaultReader(stream)) {
+ for (let n = 0; n < reader[kState].readRequests.length; n++)
+ reader[kState].readRequests[n][kError](error);
+ reader[kState].readRequests = [];
+ } else {
+ assert(readableStreamHasBYOBReader(stream));
+ for (let n = 0; n < reader[kState].readIntoRequests.length; n++)
+ reader[kState].readIntoRequests[n][kError](error);
+ reader[kState].readIntoRequests = [];
+ }
+}
+
+function readableStreamHasDefaultReader(stream) {
+ const {
+ reader,
+ } = stream[kState];
+
+ if (reader === undefined)
+ return false;
+
+ return reader[kState] !== undefined &&
+ reader[SymbolToStringTag] === 'ReadableStreamDefaultReader';
+}
+
+function readableStreamGetNumReadRequests(stream) {
+ assert(readableStreamHasDefaultReader(stream));
+ return stream[kState].reader[kState].readRequests.length;
+}
+
+function readableStreamHasBYOBReader(stream) {
+ const {
+ reader,
+ } = stream[kState];
+
+ if (reader === undefined)
+ return false;
+
+ return reader[kState] !== undefined &&
+ reader[SymbolToStringTag] === 'ReadableStreamBYOBReader';
+}
+
+function readableStreamGetNumReadIntoRequests(stream) {
+ assert(readableStreamHasBYOBReader(stream));
+ return stream[kState].reader[kState].readIntoRequests.length;
+}
+
+function readableStreamFulfillReadRequest(stream, chunk, done) {
+ assert(readableStreamHasDefaultReader(stream));
+ const {
+ reader,
+ } = stream[kState];
+ assert(reader[kState].readRequests.length);
+ const readRequest = ArrayPrototypeShift(reader[kState].readRequests);
+ if (done)
+ readRequest[kClose]();
+ else
+ readRequest[kChunk](chunk);
+}
+
+function readableStreamFulfillReadIntoRequest(stream, chunk, done) {
+ assert(readableStreamHasBYOBReader(stream));
+ const {
+ reader,
+ } = stream[kState];
+ assert(reader[kState].readIntoRequests.length);
+ const readIntoRequest = ArrayPrototypeShift(reader[kState].readIntoRequests);
+ if (done)
+ readIntoRequest[kClose](chunk);
+ else
+ readIntoRequest[kChunk](chunk);
+}
+
+function readableStreamAddReadRequest(stream, readRequest) {
+ assert(readableStreamHasDefaultReader(stream));
+ assert(stream[kState].state === 'readable');
+ ArrayPrototypePush(stream[kState].reader[kState].readRequests, readRequest);
+}
+
+function readableStreamAddReadIntoRequest(stream, readIntoRequest) {
+ assert(readableStreamHasBYOBReader(stream));
+ assert(stream[kState].state !== 'errored');
+ ArrayPrototypePush(
+ stream[kState].reader[kState].readIntoRequests,
+ readIntoRequest);
+}
+
+function readableStreamReaderGenericCancel(reader, reason) {
+ const {
+ stream,
+ } = reader[kState];
+ assert(stream !== undefined);
+ return readableStreamCancel(stream, reason);
+}
+
+function readableStreamReaderGenericInitialize(reader, stream) {
+ reader[kState].stream = stream;
+ stream[kState].reader = reader;
+ switch (stream[kState].state) {
+ case 'readable':
+ reader[kState].close = createDeferredPromise();
+ break;
+ case 'closed':
+ reader[kState].close = {
+ promise: PromiseResolve(),
+ resolve: undefined,
+ reject: undefined,
+ };
+ break;
+ case 'errored':
+ reader[kState].close = {
+ promise: PromiseReject(stream[kState].storedError),
+ resolve: undefined,
+ reject: undefined,
+ };
+ setPromiseHandled(reader[kState].close.promise);
+ break;
+ }
+}
+
+function readableStreamReaderGenericRelease(reader) {
+ const {
+ stream,
+ } = reader[kState];
+ assert(stream !== undefined);
+ assert(stream[kState].reader === reader);
+
+ if (stream[kState].state === 'readable') {
+ reader[kState].close.reject?.(
+ new ERR_INVALID_STATE.TypeError('Reader released'));
+ } else {
+ reader[kState].close = {
+ promise: PromiseReject(
+ new ERR_INVALID_STATE.TypeError('Reader released')),
+ resolve: undefined,
+ reject: undefined,
+ };
+ }
+ setPromiseHandled(reader[kState].close.promise);
+ stream[kState].reader = undefined;
+ reader[kState].stream = undefined;
+}
+
+function readableStreamBYOBReaderRead(reader, view, readIntoRequest) {
+ const {
+ stream,
+ } = reader[kState];
+ assert(stream !== undefined);
+ stream[kState].disturbed = true;
+ if (stream[kState].state === 'errored') {
+ readIntoRequest[kError](stream[kState].storedError);
+ return;
+ }
+ readableByteStreamControllerPullInto(
+ stream[kState].controller,
+ view,
+ readIntoRequest);
+}
+
+function readableStreamDefaultReaderRead(reader, readRequest) {
+ const {
+ stream,
+ } = reader[kState];
+ assert(stream !== undefined);
+ stream[kState].disturbed = true;
+ switch (stream[kState].state) {
+ case 'closed':
+ readRequest[kClose]();
+ break;
+ case 'errored':
+ readRequest[kError](stream[kState].storedError);
+ break;
+ case 'readable':
+ stream[kState].controller[kPull](readRequest);
+ }
+}
+
+function setupReadableStreamBYOBReader(reader, stream) {
+ if (isReadableStreamLocked(stream))
+ throw new ERR_INVALID_STATE('ReadableStream is locked');
+ const {
+ controller,
+ } = stream[kState];
+ if (!isReadableByteStreamController(controller))
+ throw new ERR_INVALID_ARG_VALUE('reader', reader, 'must be a byte stream');
+ readableStreamReaderGenericInitialize(reader, stream);
+ reader[kState].readIntoRequests = [];
+}
+
+function setupReadableStreamDefaultReader(reader, stream) {
+ if (isReadableStreamLocked(stream))
+ throw new ERR_INVALID_STATE('ReadableStream is locked');
+ readableStreamReaderGenericInitialize(reader, stream);
+ reader[kState].readRequests = [];
+}
+
+function readableStreamDefaultControllerClose(controller) {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller))
+ return;
+ controller[kState].closeRequested = true;
+ if (!controller[kState].queue.length) {
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamClose(controller[kState].stream);
+ }
+}
+
+function readableStreamDefaultControllerEnqueue(controller, chunk) {
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller))
+ return;
+
+ const {
+ stream,
+ } = controller[kState];
+
+ if (isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream)) {
+ readableStreamFulfillReadRequest(stream, chunk, false);
+ } else {
+ try {
+ const chunkSize = controller[kState].sizeAlgorithm(chunk);
+ enqueueValueWithSize(controller, chunk, chunkSize);
+ } catch (error) {
+ readableStreamDefaultControllerError(controller, error);
+ return;
+ }
+ }
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+function readableStreamDefaultControllerHasBackpressure(controller) {
+ return !readableStreamDefaultControllerShouldCallPull(controller);
+}
+
+function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
+ const {
+ stream,
+ } = controller[kState];
+ return !controller[kState].closeRequested &&
+ stream[kState].state === 'readable';
+}
+
+function readableStreamDefaultControllerGetDesiredSize(controller) {
+ const {
+ stream,
+ highWaterMark,
+ queueTotalSize,
+ } = controller[kState];
+ switch (stream[kState].state) {
+ case 'errored': return null;
+ case 'closed': return 0;
+ default:
+ return highWaterMark - queueTotalSize;
+ }
+}
+
+function readableStreamDefaultControllerShouldCallPull(controller) {
+ const {
+ stream,
+ } = controller[kState];
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller) ||
+ !controller[kState].started)
+ return false;
+
+ if (isReadableStreamLocked(stream) &&
+ readableStreamGetNumReadRequests(stream)) {
+ return true;
+ }
+
+ const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
+ assert(desiredSize !== null);
+
+ return desiredSize > 0;
+}
+
+function readableStreamDefaultControllerCallPullIfNeeded(controller) {
+ if (!readableStreamDefaultControllerShouldCallPull(controller))
+ return;
+ if (controller[kState].pulling) {
+ controller[kState].pullAgain = true;
+ return;
+ }
+ assert(!controller[kState].pullAgain);
+ controller[kState].pulling = true;
+ PromisePrototypeThen(
+ (async () => controller[kState].pullAlgorithm())(),
+ () => {
+ controller[kState].pulling = false;
+ if (controller[kState].pullAgain) {
+ controller[kState].pullAgain = false;
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ },
+ (error) => readableStreamDefaultControllerError(controller, error));
+}
+
+function readableStreamDefaultControllerClearAlgorithms(controller) {
+ controller[kState].pullAlgorithm = undefined;
+ controller[kState].cancelAlgorithm = undefined;
+ controller[kState].sizeAlgorithm = undefined;
+}
+
+function readableStreamDefaultControllerError(controller, error) {
+ const {
+ stream,
+ } = controller[kState];
+ if (stream[kState].state === 'readable') {
+ resetQueue(controller);
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamError(stream, error);
+ }
+}
+
+function readableStreamDefaultControllerCancelSteps(controller, reason) {
+ resetQueue(controller);
+ const result = controller[kState].cancelAlgorithm(reason);
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ return result;
+}
+
+function readableStreamDefaultControllerPullSteps(controller, readRequest) {
+ const {
+ stream,
+ queue,
+ } = controller[kState];
+ if (queue.length) {
+ const chunk = dequeueValue(controller);
+ if (controller[kState].closeRequested && !queue.length) {
+ readableStreamDefaultControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+ } else {
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ }
+ readRequest[kChunk](chunk);
+ return;
+ }
+ readableStreamAddReadRequest(stream, readRequest);
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+}
+
+function setupReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm) {
+ assert(stream[kState].controller === undefined);
+ controller[kState] = {
+ cancelAlgorithm,
+ closeRequested: false,
+ highWaterMark,
+ pullAgain: false,
+ pullAlgorithm,
+ pulling: false,
+ queue: [],
+ queueTotalSize: 0,
+ started: false,
+ sizeAlgorithm,
+ stream,
+ };
+ stream[kState].controller = controller;
+
+ PromisePrototypeThen(
+ (async () => startAlgorithm())(),
+ () => {
+ controller[kState].started = true;
+ assert(!controller[kState].pulling);
+ assert(!controller[kState].pullAgain);
+ readableStreamDefaultControllerCallPullIfNeeded(controller);
+ },
+ (error) => readableStreamDefaultControllerError(controller, error));
+}
+
+function setupReadableStreamDefaultControllerFromSource(
+ stream,
+ source,
+ highWaterMark,
+ sizeAlgorithm) {
+ const controller = new InternalReadableStreamDefaultController();
+ const start = source?.start;
+ const pull = source?.pull;
+ const cancel = source?.cancel;
+ const startAlgorithm = start ?
+ FunctionPrototypeBind(start, source, controller) :
+ nonOpStart;
+ const pullAlgorithm = pull ?
+ FunctionPrototypeBind(pull, source, controller) :
+ nonOpPull;
+ const cancelAlgorithm = cancel ?
+ FunctionPrototypeBind(cancel, source) :
+ nonOpCancel;
+ setupReadableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ sizeAlgorithm);
+}
+
+function readableByteStreamControllerClose(controller) {
+ const {
+ closeRequested,
+ pendingPullIntos,
+ queueTotalSize,
+ stream,
+ } = controller[kState];
+
+ if (closeRequested || stream[kState].state !== 'readable')
+ return;
+
+ if (queueTotalSize) {
+ controller[kState].closeRequested = true;
+ return;
+ }
+
+ if (pendingPullIntos.length) {
+ const firstPendingPullInto = pendingPullIntos[0];
+ if (firstPendingPullInto.bytesFilled > 0) {
+ const error = new ERR_INVALID_STATE.TypeError('Partial read');
+ readableByteStreamControllerError(controller, error);
+ throw error;
+ }
+ }
+
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+}
+
+function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
+ assert(stream[kState].state !== 'errored');
+ let done = false;
+ if (stream[kState].state === 'closed') {
+ desc.bytesFilled = 0;
+ done = true;
+ }
+
+ const filledView =
+ readableByteStreamControllerConvertPullIntoDescriptor(desc);
+
+ if (desc.type === 'default')
+ readableStreamFulfillReadRequest(stream, filledView, done);
+ else {
+ assert(desc.type === 'byob');
+ readableStreamFulfillReadIntoRequest(stream, filledView, done);
+ }
+}
+
+function readableByteStreamControllerInvalidateBYOBRequest(controller) {
+ if (controller[kState].byobRequest === null)
+ return;
+ controller[kState].byobRequest[kState].controller = undefined;
+ controller[kState].byobRequest[kState].view = null;
+ controller[kState].byobRequest = null;
+}
+
+function readableByteStreamControllerClearAlgorithms(controller) {
+ controller[kState].pullAlgorithm = undefined;
+ controller[kState].cancelAlgorithm = undefined;
+}
+
+function readableByteStreamControllerClearPendingPullIntos(controller) {
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+ controller[kState].pendingPullIntos = [];
+}
+
+function readableByteStreamControllerGetDesiredSize(controller) {
+ const {
+ stream,
+ highWaterMark,
+ queueTotalSize,
+ } = controller[kState];
+ switch (stream[kState].state) {
+ case 'errored': return null;
+ case 'closed': return 0;
+ default: return highWaterMark - queueTotalSize;
+ }
+}
+
+function readableByteStreamControllerShouldCallPull(controller) {
+ const {
+ stream,
+ } = controller[kState];
+ if (stream[kState].state !== 'readable' ||
+ controller[kState].closeRequested ||
+ !controller[kState].started) {
+ return false;
+ }
+ if (readableStreamHasDefaultReader(stream) &&
+ readableStreamGetNumReadRequests(stream) > 0) {
+ return true;
+ }
+
+ if (readableStreamHasBYOBReader(stream) &&
+ readableStreamGetNumReadIntoRequests(stream) > 0) {
+ return true;
+ }
+
+ const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
+ assert(desiredSize !== null);
+
+ return desiredSize > 0;
+}
+
+function readableByteStreamControllerHandleQueueDrain(controller) {
+ const {
+ closeRequested,
+ queueTotalSize,
+ stream,
+ } = controller[kState];
+ assert(stream[kState].state === 'readable');
+ if (!queueTotalSize && closeRequested) {
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamClose(stream);
+ return;
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerPullInto(
+ controller,
+ view,
+ readIntoRequest) {
+ const {
+ closeRequested,
+ stream,
+ pendingPullIntos,
+ } = controller[kState];
+ let elementSize = 1;
+ let ctor = DataViewCtor;
+ if (isArrayBufferView(view) && !isDataView(view)) {
+ elementSize = view.constructor.BYTES_PER_ELEMENT;
+ ctor = view.constructor;
+ }
+ const {
+ buffer,
+ byteOffset,
+ byteLength,
+ } = view;
+ let transferedBuffer;
+ try {
+ transferedBuffer = transferArrayBuffer(buffer);
+ } catch (error) {
+ readIntoRequest[kError](error);
+ return;
+ }
+ const desc = {
+ buffer: transferedBuffer,
+ byteOffset,
+ byteLength,
+ bytesFilled: 0,
+ elementSize,
+ ctor,
+ type: 'byob',
+ };
+ if (pendingPullIntos.length) {
+ ArrayPrototypePush(pendingPullIntos, desc);
+ readableStreamAddReadIntoRequest(stream, readIntoRequest);
+ return;
+ }
+ if (stream[kState].state === 'closed') {
+ const emptyView = new ctor(desc.buffer, byteOffset, 0);
+ readIntoRequest[kClose](emptyView);
+ return;
+ }
+ if (controller[kState].queueTotalSize) {
+ if (readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ desc)) {
+ const filledView =
+ readableByteStreamControllerConvertPullIntoDescriptor(desc);
+ readableByteStreamControllerHandleQueueDrain(controller);
+ readIntoRequest[kChunk](filledView);
+ return;
+ }
+ if (closeRequested) {
+ const error = new ERR_INVALID_STATE('ReadableStream closed');
+ readableByteStreamControllerError(controller, error);
+ readIntoRequest[kError](error);
+ return;
+ }
+ }
+ ArrayPrototypePush(pendingPullIntos, desc);
+ readableStreamAddReadIntoRequest(stream, readIntoRequest);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerRespondInternal(controller, bytesWritten) {
+ const {
+ stream,
+ pendingPullIntos,
+ } = controller[kState];
+ const desc = pendingPullIntos[0];
+ // TODO(@jasnell): Assert can transfer array buffer(desc.buffer)
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+
+ if (stream[kState].state === 'closed') {
+ assert(bytesWritten === 0);
+ readableByteStreamControllerRespondInClosedState(controller, desc);
+ } else {
+ assert(stream[kState].state === 'readable');
+ assert(bytesWritten > 0);
+ readableByteStreamControllerRespondInReadableState(
+ controller,
+ bytesWritten,
+ desc);
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerRespond(controller, bytesWritten) {
+ const {
+ pendingPullIntos,
+ stream,
+ } = controller[kState];
+ assert(pendingPullIntos.length);
+ const desc = pendingPullIntos[0];
+
+ if (stream[kState].state === 'closed') {
+ if (bytesWritten !== 0)
+ throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten);
+ } else {
+ assert(stream[kState].state === 'readable');
+
+ if (bytesWritten === 0)
+ throw new ERR_INVALID_ARG_VALUE('bytesWritten', bytesWritten);
+
+ if (desc.bytesFilled + bytesWritten > desc.byteLength)
+ throw new ERR_INVALID_ARG_VALUE.RangeError('bytesWritten', bytesWritten);
+ }
+
+ desc.buffer = transferArrayBuffer(desc.buffer);
+
+ readableByteStreamControllerRespondInternal(controller, bytesWritten);
+}
+
+function readableByteStreamControllerRespondInClosedState(controller, desc) {
+ assert(desc.bytesFilled === 0);
+ const {
+ stream,
+ } = controller[kState];
+ if (readableStreamHasBYOBReader(stream)) {
+ while (readableStreamGetNumReadIntoRequests(stream) > 0) {
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ stream,
+ readableByteStreamControllerShiftPendingPullInto(controller));
+ }
+ }
+}
+
+function readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ size,
+ desc) {
+ const {
+ pendingPullIntos,
+ byobRequest,
+ } = controller[kState];
+ assert(!pendingPullIntos.length || pendingPullIntos[0] === desc);
+ assert(byobRequest === null);
+ desc.bytesFilled += size;
+}
+
+function readableByteStreamControllerEnqueue(controller, chunk) {
+ const {
+ closeRequested,
+ pendingPullIntos,
+ queue,
+ stream,
+ } = controller[kState];
+
+ if (closeRequested || stream[kState].state !== 'readable')
+ return;
+
+ const {
+ buffer,
+ byteOffset,
+ byteLength,
+ } = chunk;
+
+ // Supposed to assert here that the buffer is not
+ // detached, but there's no API available to use to check that.
+
+ const transferedBuffer = transferArrayBuffer(buffer);
+
+ if (pendingPullIntos.length) {
+ const firstPendingPullInto = pendingPullIntos[0];
+ // Supposed to assert here that the firstPendingPullInto's buffer is not
+ // detached, but there's no API available to use to check that.
+ firstPendingPullInto.buffer =
+ transferArrayBuffer(firstPendingPullInto.buffer);
+ }
+
+ readableByteStreamControllerInvalidateBYOBRequest(controller);
+
+ if (readableStreamHasDefaultReader(stream)) {
+ if (readableStreamGetNumReadRequests(stream)) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferedBuffer,
+ byteOffset,
+ byteLength);
+ } else {
+ assert(!queue.length);
+ const transferedView =
+ new Uint8Array(transferedBuffer, byteOffset, byteLength);
+ readableStreamFulfillReadRequest(stream, transferedView, false);
+ }
+ } else if (readableStreamHasBYOBReader(stream)) {
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferedBuffer,
+ byteOffset,
+ byteLength);
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller);
+ } else {
+ assert(!isReadableStreamLocked(stream));
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ transferedBuffer,
+ byteOffset,
+ byteLength);
+ }
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ buffer,
+ byteOffset,
+ byteLength) {
+ ArrayPrototypePush(
+ controller[kState].queue,
+ {
+ buffer,
+ byteOffset,
+ byteLength,
+ });
+ controller[kState].queueTotalSize += byteLength;
+}
+
+function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ desc) {
+ const {
+ buffer,
+ byteLength,
+ byteOffset,
+ bytesFilled,
+ elementSize,
+ } = desc;
+ const currentAlignedBytes = bytesFilled - (bytesFilled % elementSize);
+ const maxBytesToCopy = MathMin(
+ controller[kState].queueTotalSize,
+ byteLength - bytesFilled);
+ const maxBytesFilled = bytesFilled + maxBytesToCopy;
+ const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
+ let totalBytesToCopyRemaining = maxBytesToCopy;
+ let ready = false;
+ if (maxAlignedBytes > currentAlignedBytes) {
+ totalBytesToCopyRemaining = maxAlignedBytes - bytesFilled;
+ ready = true;
+ }
+ const {
+ queue,
+ } = controller[kState];
+
+ while (totalBytesToCopyRemaining) {
+ const headOfQueue = queue[0];
+ const bytesToCopy = MathMin(
+ totalBytesToCopyRemaining,
+ headOfQueue.byteLength);
+ const destStart = byteOffset + desc.bytesFilled;
+ copyArrayBuffer(
+ buffer,
+ destStart,
+ headOfQueue.buffer,
+ headOfQueue.byteOffset,
+ bytesToCopy);
+ if (headOfQueue.byteLength === bytesToCopy) {
+ ArrayPrototypeShift(queue);
+ } else {
+ headOfQueue.byteOffset += bytesToCopy;
+ headOfQueue.byteLength -= bytesToCopy;
+ }
+ controller[kState].queueTotalSize -= bytesToCopy;
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesToCopy,
+ desc);
+ totalBytesToCopyRemaining -= bytesToCopy;
+ }
+
+ if (!ready) {
+ assert(controller[kState].queueTotalSize === 0);
+ assert(desc.bytesFilled > 0);
+ assert(desc.bytesFilled < elementSize);
+ }
+ return ready;
+}
+
+function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
+ controller) {
+ const {
+ closeRequested,
+ pendingPullIntos,
+ stream,
+ } = controller[kState];
+ assert(!closeRequested);
+ while (pendingPullIntos.length) {
+ if (controller[kState].queueTotalSize === 0)
+ return;
+ const desc = pendingPullIntos[0];
+ if (readableByteStreamControllerFillPullIntoDescriptorFromQueue(
+ controller,
+ desc)) {
+ readableByteStreamControllerShiftPendingPullInto(controller);
+ readableByteStreamControllerCommitPullIntoDescriptor(stream, desc);
+ }
+ }
+}
+
+function readableByteStreamControllerRespondInReadableState(
+ controller,
+ bytesWritten,
+ desc) {
+ const {
+ buffer,
+ bytesFilled,
+ byteLength,
+ } = desc;
+ assert(bytesFilled + bytesWritten <= byteLength);
+ readableByteStreamControllerFillHeadPullIntoDescriptor(
+ controller,
+ bytesWritten,
+ desc);
+
+ if (desc.bytesFilled < desc.elementSize)
+ return;
+
+ readableByteStreamControllerShiftPendingPullInto(controller);
+
+ const remainderSize = desc.bytesFilled % desc.elementSize;
+
+ if (remainderSize) {
+ const end = desc.byteOffset + desc.bytesFilled;
+ const remainder = buffer.slice(end - remainderSize, remainderSize);
+ readableByteStreamControllerEnqueueChunkToQueue(
+ controller,
+ remainder,
+ 0,
+ remainder.byteLength);
+ }
+ desc.bytesFilled -= remainderSize;
+ readableByteStreamControllerCommitPullIntoDescriptor(
+ controller[kState].stream,
+ desc);
+ readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
+}
+
+function readableByteStreamControllerRespondWithNewView(controller, view) {
+ const {
+ stream,
+ pendingPullIntos,
+ } = controller[kState];
+ assert(pendingPullIntos.length);
+ // Supposed to assert here that the view's buffer is not
+ // detached, but there's no API available to use to check that.
+ const desc = pendingPullIntos[0];
+ assert(stream[kState].state !== 'errored');
+
+ if (!isArrayBufferView(view)) {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'view',
+ [
+ 'Buffer',
+ 'TypedArray',
+ 'DataView',
+ ],
+ view);
+ }
+ if (view.byteLength === 0 || view.buffer.byteLength === 0) {
+ throw new ERR_INVALID_ARG_VALUE(
+ 'view',
+ view,
+ 'cannot be zero length');
+ }
+
+ const {
+ buffer,
+ byteOffset,
+ byteLength,
+ bytesFilled,
+ } = desc;
+
+ if (byteOffset + bytesFilled !== view.byteOffset)
+ throw new ERR_INVALID_ARG_VALUE.RangeError('view', view);
+
+ if (buffer.byteLength !== view.buffer.byteLength)
+ throw new ERR_INVALID_ARG_VALUE.RangeError('view', view);
+
+ if (bytesFilled + view.byteLength > byteLength)
+ throw new ERR_INVALID_ARG_VALUE.RangeError('view', view);
+
+ desc.buffer = transferArrayBuffer(view.buffer);
+
+ readableByteStreamControllerRespondInternal(controller, view.byteLength);
+}
+
+function readableByteStreamControllerShiftPendingPullInto(controller) {
+ assert(controller[kState].byobRequest === null);
+ return ArrayPrototypeShift(controller[kState].pendingPullIntos);
+}
+
+function readableByteStreamControllerCallPullIfNeeded(controller) {
+ if (!readableByteStreamControllerShouldCallPull(controller))
+ return;
+ if (controller[kState].pulling) {
+ controller[kState].pullAgain = true;
+ return;
+ }
+ assert(!controller[kState].pullAgain);
+ controller[kState].pulling = true;
+ PromisePrototypeThen(
+ (async () => controller[kState].pullAlgorithm())(),
+ () => {
+ controller[kState].pulling = false;
+ if (controller[kState].pullAgain) {
+ controller[kState].pullAgain = false;
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ }
+ },
+ (error) => readableByteStreamControllerError(controller, error));
+}
+
+function readableByteStreamControllerError(controller, error) {
+ const {
+ stream,
+ } = controller[kState];
+ if (stream[kState].state !== 'readable')
+ return;
+ readableByteStreamControllerClearPendingPullIntos(controller);
+ resetQueue(controller);
+ readableByteStreamControllerClearAlgorithms(controller);
+ readableStreamError(stream, error);
+}
+
+function readableByteStreamControllerCancelSteps(controller, reason) {
+ readableByteStreamControllerClearPendingPullIntos(controller);
+ resetQueue(controller);
+ const result = controller[kState].cancelAlgorithm(reason);
+ readableByteStreamControllerClearAlgorithms(controller);
+ return result;
+}
+
+function readableByteStreamControllerPullSteps(controller, readRequest) {
+ const {
+ pendingPullIntos,
+ queue,
+ queueTotalSize,
+ stream,
+ } = controller[kState];
+ assert(readableStreamHasDefaultReader(stream));
+ if (queueTotalSize) {
+ assert(!readableStreamGetNumReadRequests(stream));
+ const {
+ buffer,
+ byteOffset,
+ byteLength,
+ } = ArrayPrototypeShift(queue);
+ controller[kState].queueTotalSize -= byteLength;
+ readableByteStreamControllerHandleQueueDrain(controller);
+ const view = new Uint8Array(buffer, byteOffset, byteLength);
+ readRequest[kChunk](view);
+ return;
+ }
+ const {
+ autoAllocateChunkSize,
+ } = controller[kState];
+ if (autoAllocateChunkSize !== undefined) {
+ try {
+ const buffer = new ArrayBuffer(autoAllocateChunkSize);
+ ArrayPrototypePush(
+ pendingPullIntos,
+ {
+ buffer,
+ byteOffset: 0,
+ byteLength: autoAllocateChunkSize,
+ bytesFilled: 0,
+ elementSize: 1,
+ ctor: Uint8Array,
+ type: 'default',
+ });
+ } catch (error) {
+ readRequest[kError](error);
+ return;
+ }
+ }
+ readableStreamAddReadRequest(stream, readRequest);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+}
+
+function setupReadableByteStreamController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ autoAllocateChunkSize) {
+ assert(stream[kState].controller === undefined);
+ if (autoAllocateChunkSize !== undefined) {
+ assert(NumberIsInteger(autoAllocateChunkSize));
+ assert(autoAllocateChunkSize > 0);
+ }
+ controller[kState] = {
+ byobRequest: null,
+ closeRequested: false,
+ pullAgain: false,
+ pulling: false,
+ started: false,
+ stream,
+ queue: [],
+ queueTotalSize: 0,
+ highWaterMark,
+ pullAlgorithm,
+ cancelAlgorithm,
+ autoAllocateChunkSize,
+ pendingPullIntos: [],
+ };
+ stream[kState].controller = controller;
+
+ PromisePrototypeThen(
+ (async () => startAlgorithm())(),
+ () => {
+ controller[kState].started = true;
+ assert(!controller[kState].pulling);
+ assert(!controller[kState].pullAgain);
+ readableByteStreamControllerCallPullIfNeeded(controller);
+ },
+ (error) => readableByteStreamControllerError(controller, error));
+}
+
+function setupReadableByteStreamControllerFromSource(
+ stream,
+ source,
+ highWaterMark) {
+ const controller = new InternalReadableByteStreamController();
+ const start = source?.start;
+ const pull = source?.pull;
+ const cancel = source?.cancel;
+ const autoAllocateChunkSize = source?.autoAllocateChunkSize;
+ const startAlgorithm = start ?
+ FunctionPrototypeBind(start, source, controller) :
+ nonOpStart;
+ const pullAlgorithm = pull ?
+ FunctionPrototypeBind(pull, source, controller) :
+ nonOpPull;
+ const cancelAlgorithm = cancel ?
+ FunctionPrototypeBind(cancel, source) :
+ nonOpCancel;
+ if (autoAllocateChunkSize === 0) {
+ throw new ERR_INVALID_ARG_VALUE(
+ 'source.autoAllocateChunkSize',
+ autoAllocateChunkSize);
+ }
+ setupReadableByteStreamController(
+ stream,
+ controller,
+ startAlgorithm,
+ pullAlgorithm,
+ cancelAlgorithm,
+ highWaterMark,
+ autoAllocateChunkSize);
+}
+
+function nonOpStart() {}
+
+async function nonOpPull() {}
+
+async function nonOpCancel() {}
+
+async function nonOpWrite() {}
+
+// ---- WritableStream Implementation
+
+function isPromisePending(promise) {
+ if (promise === undefined) return false;
+ const { 0: state } = getPromiseDetails(promise);
+ return state === kPending;
+}
+
+function isWritableStreamLocked(stream) {
+ return stream[kState].writer !== undefined;
+}
+
+function setupWritableStreamDefaultWriter(writer, stream) {
+ if (isWritableStreamLocked(stream))
+ throw new ERR_INVALID_STATE.TypeError('WritableStream is locked');
+ writer[kState].stream = stream;
+ stream[kState].writer = writer;
+ switch (stream[kState].state) {
+ case 'writable':
+ if (!writableStreamCloseQueuedOrInFlight(stream) &&
+ stream[kState].backpressure) {
+ writer[kState].ready = createDeferredPromise();
+ } else {
+ writer[kState].ready = {
+ promise: PromiseResolve(),
+ resolve: undefined,
+ reject: undefined,
+ };
+ }
+ setClosedPromiseToNewPromise();
+ break;
+ case 'erroring':
+ writer[kState].ready = {
+ promise: PromiseReject(stream[kState].storedError),
+ resolve: undefined,
+ reject: undefined,
+ };
+ setPromiseHandled(writer[kState].ready.promise);
+ setClosedPromiseToNewPromise();
+ break;
+ case 'closed':
+ writer[kState].ready = {
+ promise: PromiseResolve(),
+ resolve: undefined,
+ reject: undefined,
+ };
+ writer[kState].close = {
+ promise: PromiseResolve(),
+ resolve: undefined,
+ reject: undefined,
+ };
+ break;
+ default:
+ writer[kState].ready = {
+ promise: PromiseReject(stream[kState].storedError),
+ resolve: undefined,
+ reject: undefined,
+ };
+ writer[kState].close = {
+ promise: PromiseReject(stream[kState].storedError),
+ resolve: undefined,
+ reject: undefined,
+ };
+ setPromiseHandled(writer[kState].ready.promise);
+ setPromiseHandled(writer[kState].close.promise);
+ }
+
+ function setClosedPromiseToNewPromise() {
+ writer[kState].close = createDeferredPromise();
+ }
+}
+
+function writableStreamAbort(stream, reason) {
+ const {
+ state,
+ } = stream[kState];
+ if (state === 'closed' || state === 'errored')
+ return PromiseResolve();
+
+ if (stream[kState].pendingAbortRequest.abort.promise !== undefined)
+ return stream[kState].pendingAbortRequest.abort.promise;
+
+ assert(state === 'writable' || state === 'erroring');
+
+ let wasAlreadyErroring = false;
+ if (state === 'erroring') {
+ wasAlreadyErroring = true;
+ reason = undefined;
+ }
+
+ stream[kState].pendingAbortRequest = {
+ abort: createDeferredPromise(),
+ reason,
+ wasAlreadyErroring,
+ };
+
+ if (!wasAlreadyErroring)
+ writableStreamStartErroring(stream, reason);
+
+ return stream[kState].pendingAbortRequest.abort.promise;
+}
+
+function writableStreamClose(stream) {
+ const {
+ state,
+ writer,
+ backpressure,
+ controller,
+ } = stream[kState];
+ if (state === 'closed' || state === 'errored')
+ return PromiseReject(new ERR_INVALID_STATE('WritableStream is closed'));
+ assert(state === 'writable' || state === 'erroring');
+ assert(!writableStreamCloseQueuedOrInFlight(stream));
+ stream[kState].closeRequest = createDeferredPromise();
+ const { promise } = stream[kState].closeRequest;
+ if (writer !== undefined && backpressure && state === 'writable')
+ writer[kState].ready.resolve?.();
+ writableStreamDefaultControllerClose(controller);
+ return promise;
+}
+
+function writableStreamUpdateBackpressure(stream, backpressure) {
+ assert(stream[kState].state === 'writable');
+ assert(!writableStreamCloseQueuedOrInFlight(stream));
+ const {
+ writer,
+ } = stream[kState];
+ if (writer !== undefined && stream[kState].backpressure !== backpressure) {
+ if (backpressure) {
+ writer[kState].ready = createDeferredPromise();
+ } else {
+ writer[kState].ready.resolve?.();
+ }
+ }
+ stream[kState].backpressure = backpressure;
+}
+
+function writableStreamStartErroring(stream, reason) {
+ assert(stream[kState].storedError === undefined);
+ assert(stream[kState].state === 'writable');
+ const {
+ controller,
+ writer,
+ } = stream[kState];
+ assert(controller !== undefined);
+ stream[kState].state = 'erroring';
+ stream[kState].storedError = reason;
+ if (writer !== undefined) {
+ writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
+ }
+ if (!writableStreamHasOperationMarkedInFlight(stream) &&
+ controller[kState].started) {
+ writableStreamFinishErroring(stream);
+ }
+}
+
+function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
+ assert(stream[kState].state === 'errored');
+ if (stream[kState].closeRequest.promise !== undefined) {
+ assert(stream[kState].inFlightCloseRequest.promise === undefined);
+ stream[kState].closeRequest.reject?.(stream[kState].storedError);
+ stream[kState].closeRequest = {
+ promise: undefined,
+ reject: undefined,
+ resolve: undefined,
+ };
+ }
+ const {
+ writer,
+ } = stream[kState];
+ if (writer !== undefined) {
+ writer[kState].close.reject?.(stream[kState].storedError);
+ setPromiseHandled(writer[kState].close.promise);
+ }
+}
+
+function writableStreamMarkFirstWriteRequestInFlight(stream) {
+ assert(stream[kState].inFlightWriteRequest.promise === undefined);
+ assert(stream[kState].writeRequests.length);
+ const writeRequest = ArrayPrototypeShift(stream[kState].writeRequests);
+ stream[kState].inFlightWriteRequest = writeRequest;
+}
+
+function writableStreamMarkCloseRequestInFlight(stream) {
+ assert(stream[kState].inFlightWriteRequest.promise === undefined);
+ assert(stream[kState].closeRequest.promise !== undefined);
+ stream[kState].inFlightCloseRequest = stream[kState].closeRequest;
+ stream[kState].closeRequest = {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ };
+}
+
+function writableStreamHasOperationMarkedInFlight(stream) {
+ const {
+ inFlightWriteRequest,
+ inFlightCloseRequest,
+ } = stream[kState];
+ if (inFlightWriteRequest.promise === undefined &&
+ inFlightCloseRequest.promise === undefined) {
+ return false;
+ }
+ return true;
+}
+
+function writableStreamFinishInFlightWriteWithError(stream, error) {
+ assert(stream[kState].inFlightWriteRequest.promise !== undefined);
+ stream[kState].inFlightWriteRequest.reject?.(error);
+ stream[kState].inFlightWriteRequest = {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ };
+ assert(stream[kState].state === 'writable' ||
+ stream[kState].state === 'erroring');
+ writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamFinishInFlightWrite(stream) {
+ assert(stream[kState].inFlightWriteRequest.promise !== undefined);
+ stream[kState].inFlightWriteRequest.resolve?.();
+ stream[kState].inFlightWriteRequest = {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ };
+}
+
+function writableStreamFinishInFlightCloseWithError(stream, error) {
+ assert(stream[kState].inFlightCloseRequest.promise !== undefined);
+ stream[kState].inFlightCloseRequest.reject?.(error);
+ stream[kState].inFlightCloseRequest = {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ };
+ assert(stream[kState].state === 'writable' ||
+ stream[kState].state === 'erroring');
+ if (stream[kState].pendingAbortRequest.abort.promise !== undefined) {
+ stream[kState].pendingAbortRequest.abort.reject?.(error);
+ stream[kState].pendingAbortRequest = {
+ abort: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ reason: undefined,
+ wasAlreadyErroring: false,
+ };
+ }
+ writableStreamDealWithRejection(stream, error);
+}
+
+function writableStreamFinishInFlightClose(stream) {
+ assert(stream[kState].inFlightCloseRequest.promise !== undefined);
+ stream[kState].inFlightCloseRequest.resolve?.();
+ stream[kState].inFlightCloseRequest = {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ };
+ if (stream[kState].state === 'erroring') {
+ stream[kState].storedError = undefined;
+ if (stream[kState].pendingAbortRequest.abort.promise !== undefined) {
+ stream[kState].pendingAbortRequest.abort.resolve?.();
+ stream[kState].pendingAbortRequest = {
+ abort: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ reason: undefined,
+ wasAlreadyErroring: false,
+ };
+ }
+ }
+ stream[kState].state = 'closed';
+ if (stream[kState].writer !== undefined)
+ stream[kState].writer[kState].close.resolve?.();
+ assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
+ assert(stream[kState].storedError === undefined);
+}
+
+function writableStreamFinishErroring(stream) {
+ assert(stream[kState].state === 'erroring');
+ assert(!writableStreamHasOperationMarkedInFlight(stream));
+ stream[kState].state = 'errored';
+ stream[kState].controller[kError]();
+ const storedError = stream[kState].storedError;
+ for (let n = 0; n < stream[kState].writeRequests.length; n++)
+ stream[kState].writeRequests[n].reject?.(storedError);
+ stream[kState].writeRequests = [];
+
+ if (stream[kState].pendingAbortRequest.abort.promise === undefined) {
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+
+ const abortRequest = stream[kState].pendingAbortRequest;
+ stream[kState].pendingAbortRequest = {
+ abort: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ },
+ reason: undefined,
+ wasAlreadyErroring: false,
+ };
+ if (abortRequest.wasAlreadyErroring) {
+ abortRequest.abort.reject?.(storedError);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ return;
+ }
+ PromisePrototypeThen(
+ PromiseResolve(
+ (async () => stream[kState].controller[kAbort](abortRequest.reason))()),
+ () => {
+ abortRequest.abort.resolve?.();
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ },
+ (error) => {
+ abortRequest.abort.reject?.(error);
+ writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
+ });
+}
+
+function writableStreamDealWithRejection(stream, error) {
+ const {
+ state,
+ } = stream[kState];
+ if (state === 'writable') {
+ writableStreamStartErroring(stream, error);
+ return;
+ }
+
+ assert(state === 'erroring');
+ writableStreamFinishErroring(stream);
+}
+
+function writableStreamCloseQueuedOrInFlight(stream) {
+ if (stream[kState].closeRequest.promise === undefined &&
+ stream[kState].inFlightCloseRequest.promise === undefined) {
+ return false;
+ }
+ return true;
+}
+
+function writableStreamAddWriteRequest(stream) {
+ assert(isWritableStreamLocked(stream));
+ assert(stream[kState].state === 'writable');
+ const {
+ promise,
+ resolve,
+ reject,
+ } = createDeferredPromise();
+ ArrayPrototypePush(
+ stream[kState].writeRequests,
+ {
+ promise,
+ resolve,
+ reject,
+ });
+ return promise;
+}
+
+function writableStreamDefaultWriterWrite(writer, chunk) {
+ const {
+ stream,
+ } = writer[kState];
+ assert(stream !== undefined);
+ const {
+ controller,
+ } = stream[kState];
+ const chunkSize = writableStreamDefaultControllerGetChunkSize(
+ controller,
+ chunk);
+ if (stream !== writer[kState].stream) {
+ return PromiseReject(
+ new ERR_INVALID_STATE.TypeError('Mismatched WritableStreams'));
+ }
+ const {
+ state,
+ } = stream[kState];
+
+ if (state === 'errored')
+ return PromiseReject(stream[kState].storedError);
+
+ if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed') {
+ return PromiseReject(
+ new ERR_INVALID_STATE.TypeError('WritableStream is closed'));
+ }
+
+ if (state === 'erroring')
+ return PromiseReject(stream[kState].storedError);
+
+ assert(state === 'writable');
+
+ const promise = writableStreamAddWriteRequest(stream);
+ writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
+ return promise;
+}
+
+function writableStreamDefaultWriterRelease(writer) {
+ const {
+ stream,
+ } = writer[kState];
+ assert(stream !== undefined);
+ assert(stream[kState].writer === writer);
+ const releasedError =
+ new ERR_INVALID_STATE.TypeError('Writer has been released');
+ writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError);
+ writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError);
+ stream[kState].writer = undefined;
+ writer[kState].stream = undefined;
+}
+
+function writableStreamDefaultWriterGetDesiredSize(writer) {
+ const {
+ stream,
+ } = writer[kState];
+ switch (stream[kState].state) {
+ case 'erroror':
+ // Fall through
+ case 'erroring':
+ return null;
+ case 'closed':
+ return 0;
+ }
+ return writableStreamDefaultControllerGetDesiredSize(
+ stream[kState].controller);
+}
+
+function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) {
+ if (isPromisePending(writer[kState].ready.promise)) {
+ writer[kState].ready.reject?.(error);
+ } else {
+ writer[kState].ready = {
+ promise: PromiseReject(error),
+ resolve: undefined,
+ reject: undefined,
+ };
+ }
+ setPromiseHandled(writer[kState].ready.promise);
+}
+
+function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) {
+ if (isPromisePending(writer[kState].close.promise)) {
+ writer[kState].close.reject?.(error);
+ } else {
+ writer[kState].close = {
+ promise: PromiseReject(error),
+ resolve: undefined,
+ reject: undefined,
+ };
+ }
+ setPromiseHandled(writer[kState].close.promise);
+}
+
+function writableStreamDefaultWriterCloseWithErrorPropagation(writer) {
+ const {
+ stream,
+ } = writer[kState];
+ assert(stream !== undefined);
+ const {
+ state,
+ } = stream[kState];
+ if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed')
+ return PromiseResolve();
+
+ if (state === 'errored')
+ return PromiseReject(stream[kState].storedError);
+
+ assert(state === 'writable' || state === 'erroring');
+
+ return writableStreamDefaultWriterClose(writer);
+}
+
+function writableStreamDefaultWriterClose(writer) {
+ const {
+ stream,
+ } = writer[kState];
+ assert(stream !== undefined);
+ return writableStreamClose(stream);
+}
+
+function writableStreamDefaultWriterAbort(writer, reason) {
+ const {
+ stream,
+ } = writer[kState];
+ assert(stream !== undefined);
+ return writableStreamAbort(stream, reason);
+}
+
+function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
+ try {
+ enqueueValueWithSize(controller, chunk, chunkSize);
+ } catch (error) {
+ writableStreamDefaultControllerErrorIfNeeded(controller, error);
+ return;
+ }
+ const {
+ stream,
+ } = controller[kState];
+ if (!writableStreamCloseQueuedOrInFlight(stream) &&
+ stream[kState].state === 'writable') {
+ writableStreamUpdateBackpressure(
+ stream,
+ writableStreamDefaultControllerGetBackpressure(controller));
+ }
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+function writableStreamDefaultControllerProcessWrite(controller, chunk) {
+ const {
+ stream,
+ writeAlgorithm,
+ } = controller[kState];
+ writableStreamMarkFirstWriteRequestInFlight(stream);
+
+ PromisePrototypeThen(
+ (async () => writeAlgorithm(chunk, controller))(),
+ () => {
+ writableStreamFinishInFlightWrite(stream);
+ const {
+ state,
+ } = stream[kState];
+ assert(state === 'writable' || state === 'erroring');
+ dequeueValue(controller);
+ if (!writableStreamCloseQueuedOrInFlight(stream) &&
+ state === 'writable') {
+ writableStreamUpdateBackpressure(
+ stream,
+ writableStreamDefaultControllerGetBackpressure(controller));
+ }
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ (error) => {
+ if (stream[kState].state === 'writable')
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ writableStreamFinishInFlightWriteWithError(stream, error);
+ });
+
+}
+
+function writableStreamDefaultControllerProcessClose(controller) {
+ const {
+ closeAlgorithm,
+ queue,
+ stream,
+ } = controller[kState];
+ writableStreamMarkCloseRequestInFlight(stream);
+ dequeueValue(controller);
+ assert(!queue.length);
+ const sinkClosePromise = (async () => closeAlgorithm())();
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ PromisePrototypeThen(
+ PromiseResolve(sinkClosePromise),
+ () => writableStreamFinishInFlightClose(stream),
+ (error) => writableStreamFinishInFlightCloseWithError(stream, error));
+}
+
+function writableStreamDefaultControllerGetDesiredSize(controller) {
+ const {
+ highWaterMark,
+ queueTotalSize,
+ } = controller[kState];
+ return highWaterMark - queueTotalSize;
+}
+
+function writableStreamDefaultControllerGetChunkSize(controller, chunk) {
+ try {
+ return controller[kState].sizeAlgorithm(chunk);
+ } catch (error) {
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller, error);
+ return 1;
+ }
+}
+
+function writableStreamDefaultControllerErrorIfNeeded(controller, error) {
+ const {
+ stream,
+ } = controller[kState];
+ if (stream[kState].state === 'writable')
+ writableStreamDefaultControllerError(controller, error);
+}
+
+function writableStreamDefaultControllerError(controller, error) {
+ const {
+ stream,
+ } = controller[kState];
+ assert(stream[kState].state === 'writable');
+ writableStreamDefaultControllerClearAlgorithms(controller);
+ writableStreamStartErroring(stream, error);
+}
+
+function writableStreamDefaultControllerClose(controller) {
+ enqueueValueWithSize(controller, kCloseSentinel, 0);
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+}
+
+function writableStreamDefaultControllerClearAlgorithms(controller) {
+ controller[kState].writeAlgorithm = undefined;
+ controller[kState].closeAlgorithm = undefined;
+ controller[kState].abortAlgorithm = undefined;
+ controller[kState].sizeAlgorithm = undefined;
+}
+
+function writableStreamDefaultControllerGetBackpressure(controller) {
+ return writableStreamDefaultControllerGetDesiredSize(controller) <= 0;
+}
+
+function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
+ const {
+ queue,
+ started,
+ stream,
+ } = controller[kState];
+ if (!started || stream[kState].inFlightWriteRequest.promise !== undefined)
+ return;
+
+ if (stream[kState].state === 'erroring') {
+ writableStreamFinishErroring(stream);
+ return;
+ }
+
+ if (!queue.length)
+ return;
+
+ const value = peekQueueValue(controller);
+ if (value === kCloseSentinel)
+ writableStreamDefaultControllerProcessClose(controller);
+ else
+ writableStreamDefaultControllerProcessWrite(controller, value);
+}
+
+function setupWritableStreamDefaultControllerFromSink(
+ stream,
+ sink,
+ highWaterMark,
+ sizeAlgorithm) {
+ const controller = new InternalWritableStreamDefaultController();
+ const start = sink?.start;
+ const write = sink?.write;
+ const close = sink?.close;
+ const abort = sink?.abort;
+ const startAlgorithm = start ?
+ FunctionPrototypeBind(start, sink, controller) :
+ nonOpStart;
+ const writeAlgorithm = write ?
+ FunctionPrototypeBind(write, sink) :
+ nonOpWrite;
+ const closeAlgorithm = close ?
+ FunctionPrototypeBind(close, sink) : nonOpCancel;
+ const abortAlgorithm = abort ?
+ FunctionPrototypeBind(abort, sink) : nonOpCancel;
+ setupWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm);
+}
+
+function setupWritableStreamDefaultController(
+ stream,
+ controller,
+ startAlgorithm,
+ writeAlgorithm,
+ closeAlgorithm,
+ abortAlgorithm,
+ highWaterMark,
+ sizeAlgorithm) {
+ assert(isWritableStream(stream));
+ assert(stream[kState].controller === undefined);
+ controller[kState] = {
+ abortAlgorithm,
+ closeAlgorithm,
+ highWaterMark,
+ queue: [],
+ queueTotalSize: 0,
+ sizeAlgorithm,
+ started: false,
+ stream,
+ writeAlgorithm,
+ };
+ stream[kState].controller = controller;
+
+ writableStreamUpdateBackpressure(
+ stream,
+ writableStreamDefaultControllerGetBackpressure(controller));
+
+ PromisePrototypeThen(
+ (async () => startAlgorithm())(),
+ () => {
+ assert(stream[kState].state === 'writable' ||
+ stream[kState].state === 'erroring');
+ controller[kState].started = true;
+ writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
+ },
+ (error) => {
+ assert(stream[kState].state === 'writable' ||
+ stream[kState].state === 'erroring');
+ controller[kState].started = true;
+ writableStreamDealWithRejection(stream, error);
+ });
+}
+
+// ---- TransformStream Implementation
+
+async function defaultTransformAlgorithm(chunk, controller) {
+ transformStreamDefaultControllerEnqueue(controller, chunk);
+}
+
+async function nonOpFlush() {}
+
+function initializeTransformStream(
+ stream,
+ startPromise,
+ writableHighWaterMark,
+ writableSizeAlgorithm,
+ readableHighWaterMark,
+ readableSizeAlgorithm) {
+
+ const writable = new WritableStream({
+ start() { return startPromise.promise; },
+ write(chunk) {
+ return transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
+ },
+ abort(reason) {
+ return transformStreamDefaultSinkAbortAlgorithm(stream, reason);
+ },
+ close() {
+ return transformStreamDefaultSinkCloseAlgorithm(stream);
+ },
+ }, {
+ highWaterMark: writableHighWaterMark,
+ size: writableSizeAlgorithm,
+ });
+
+ const readable = new ReadableStream({
+ pull() {
+ return transformStreamDefaultSourcePullAlgorithm(stream);
+ },
+ cancel(reason) {
+ transformStreamErrorWritableAndUnblockWrite(stream, reason);
+ return PromiseResolve();
+ },
+ }, {
+ highWaterMark: readableHighWaterMark,
+ size: readableSizeAlgorithm,
+ });
+
+ stream[kState] = {
+ readable,
+ writable,
+ controller: undefined,
+ backpressure: undefined,
+ backpressureChange: {
+ promise: undefined,
+ resolve: undefined,
+ reject: undefined,
+ }
+ };
+
+ transformStreamSetBackpressure(stream, true);
+}
+
+function transformStreamError(stream, error) {
+ const {
+ readable,
+ } = stream[kState];
+ const {
+ controller: readableController,
+ } = readable[kState];
+ readableStreamDefaultControllerError(readableController, error);
+ transformStreamErrorWritableAndUnblockWrite(stream, error);
+}
+
+function transformStreamErrorWritableAndUnblockWrite(stream, error) {
+ const {
+ controller,
+ } = stream[kState];
+ transformStreamDefaultControllerClearAlgorithms(controller);
+ writableStreamDefaultControllerErrorIfNeeded(controller, error);
+ if (stream[kState].backpressure)
+ transformStreamSetBackpressure(stream, false);
+}
+
+function transformStreamSetBackpressure(stream, backpressure) {
+ assert(stream[kState].backpressure !== backpressure);
+ if (stream[kState].backpressureChange.promise !== undefined)
+ stream[kState].backpressureChange.resolve?.();
+ stream[kState].backpressureChange = createDeferredPromise();
+ stream[kState].backpressure = backpressure;
+}
+
+function setupTransformStreamDefaultController(
+ stream,
+ controller,
+ transformAlgorithm,
+ flushAlgorithm) {
+ assert(isTransformStream(stream));
+ assert(stream[kState].controller === undefined);
+ controller[kState] = {
+ stream,
+ transformAlgorithm,
+ flushAlgorithm,
+ };
+ stream[kState].controller = controller;
+}
+
+function setupTransformStreamDefaultControllerFromTransformer(
+ stream,
+ transformer) {
+ const controller = new InternalTransformStreamDefaultController();
+ const transform = transformer?.transform || defaultTransformAlgorithm;
+ const flush = transformer?.flush || nonOpFlush;
+ const transformAlgorithm =
+ FunctionPrototypeBind(transform, transformer);
+ const flushAlgorithm =
+ FunctionPrototypeBind(flush, transformer);
+
+ setupTransformStreamDefaultController(
+ stream,
+ controller,
+ transformAlgorithm,
+ flushAlgorithm);
+}
+
+function transformStreamDefaultControllerClearAlgorithms(controller) {
+ controller[kState].transformAlgorithm = undefined;
+ controller[kState].flushAlgorithm = undefined;
+}
+
+function transformStreamDefaultControllerEnqueue(controller, chunk) {
+ const {
+ stream,
+ } = controller[kState];
+ const {
+ readable,
+ } = stream[kState];
+ const {
+ controller: readableController,
+ } = readable[kState];
+ if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
+ throw new ERR_INVALID_STATE.TypeError('Unable to enqueue');
+ try {
+ readableStreamDefaultControllerEnqueue(readableController, chunk);
+ } catch (error) {
+ transformStreamErrorWritableAndUnblockWrite(stream, error);
+ throw readable[kState].storedError;
+ }
+ const backpressure =
+ readableStreamDefaultControllerHasBackpressure(readableController);
+ if (backpressure !== stream[kState].backpressure) {
+ assert(backpressure);
+ transformStreamSetBackpressure(stream, true);
+ }
+}
+
+function transformStreamDefaultControllerError(controller, error) {
+ transformStreamError(controller[kState].stream, error);
+}
+
+function transformStreamDefaultControllerPerformTransform(controller, chunk) {
+ const transformPromise =
+ PromiseResolve(
+ (async () => controller[kState].transformAlgorithm(chunk, controller))());
+ return PromisePrototypeCatch(transformPromise, (error) => {
+ transformStreamError(controller[kState].stream, error);
+ throw error;
+ });
+}
+
+function transformStreamDefaultControllerTerminate(controller) {
+ const {
+ stream,
+ readable,
+ } = controller[kState];
+ assert(readable !== undefined);
+ const {
+ controller: readableController,
+ } = readable[kState];
+ readableStreamDefaultControllerClose(readableController);
+ transformStreamErrorWritableAndUnblockWrite(
+ stream,
+ new ERR_INVALID_STATE('TransformStream has been terminated'));
+}
+
+function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
+ const {
+ writable,
+ controller,
+ } = stream[kState];
+ assert(writable[kState].state === 'writable');
+ if (stream[kState].backpressure) {
+ const backpressureChange = stream[kState].backpressureChange.promise;
+ return PromisePrototypeThen(backpressureChange, () => {
+ const {
+ writable,
+ } = stream[kState].writable;
+ if (writable[kState].state === 'erroring')
+ throw writable[kState].storedError;
+ assert(writable[kState].state === 'writable');
+ return transformStreamDefaultControllerPerformTransform(
+ controller,
+ chunk);
+ });
+ }
+ transformStreamDefaultControllerPerformTransform(controller, chunk);
+}
+
+function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
+ transformStreamError(stream, reason);
+ return PromiseResolve();
+}
+
+function transformStreamDefaultSinkCloseAlgorithm(stream) {
+ const {
+ readable,
+ controller,
+ } = stream[kState];
+
+ const flushPromise =
+ (async () => controller[kState].flushAlgorithm(controller))();
+ transformStreamDefaultControllerClearAlgorithms(controller);
+ return PromisePrototypeThen(
+ flushPromise,
+ () => {
+ if (readable[kState].state === 'errored')
+ throw readable[kState].storedError;
+ readableStreamDefaultControllerClose(readable[kState].controller);
+ },
+ (error) => {
+ transformStreamError(stream, error);
+ throw readable[kState].storedError;
+ });
+}
+
+function transformStreamDefaultSourcePullAlgorithm(stream) {
+ assert(stream[kState].backpressure);
+ assert(stream[kState].backpressureChange.promise !== undefined);
+ transformStreamSetBackpressure(stream, false);
+ return stream[kState].backpressureChange.promise;
+}
+
+module.exports = {
+ kState, // Exported for testing purposes only
+ ReadableStream,
+ ReadableStreamDefaultReader,
+ ReadableStreamBYOBReader,
+ ReadableStreamBYOBRequest,
+ ReadableByteStreamController,
+ ReadableStreamDefaultController,
+ TransformStream,
+ TransformStreamDefaultController,
+ WritableStream,
+ WritableStreamDefaultWriter,
+ WritableStreamDefaultController,
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy,
+ TransferedReadableStream,
+ TransferedWritableStream,
+ TransferedTransformStream,
+};
+
+/* eslint-enable no-use-before-define */
diff --git a/lib/stream/web.js b/lib/stream/web.js
new file mode 100644
index 00000000000000..9efbbfe9f03b6e
--- /dev/null
+++ b/lib/stream/web.js
@@ -0,0 +1,33 @@
+'use strict';
+
+const {
+ ReadableStream,
+ ReadableStreamDefaultReader,
+ ReadableStreamBYOBReader,
+ ReadableStreamBYOBRequest,
+ ReadableByteStreamController,
+ ReadableStreamDefaultController,
+ TransformStream,
+ TransformStreamDefaultController,
+ WritableStream,
+ WritableStreamDefaultWriter,
+ WritableStreamDefaultController,
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy,
+} = require('internal/streams/whatwg');
+
+module.exports = {
+ ReadableStream,
+ ReadableStreamDefaultReader,
+ ReadableStreamBYOBReader,
+ ReadableStreamBYOBRequest,
+ ReadableByteStreamController,
+ ReadableStreamDefaultController,
+ TransformStream,
+ TransformStreamDefaultController,
+ WritableStream,
+ WritableStreamDefaultWriter,
+ WritableStreamDefaultController,
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy,
+};
diff --git a/node.gyp b/node.gyp
index 3f674c978d2788..82caf5b8dc90a2 100644
--- a/node.gyp
+++ b/node.gyp
@@ -80,6 +80,7 @@
'lib/repl.js',
'lib/stream.js',
'lib/stream/promises.js',
+ 'lib/stream/web.js',
'lib/_stream_readable.js',
'lib/_stream_writable.js',
'lib/_stream_duplex.js',
@@ -272,6 +273,7 @@
'lib/internal/streams/pipeline.js',
'lib/internal/streams/end-of-stream.js',
'lib/internal/streams/utils.js',
+ 'lib/internal/streams/whatwg.js',
'deps/v8/tools/splaytree.mjs',
'deps/v8/tools/codemap.mjs',
'deps/v8/tools/consarray.mjs',
diff --git a/src/node_buffer.cc b/src/node_buffer.cc
index e816ba131644ad..29c43ee2a81b12 100644
--- a/src/node_buffer.cc
+++ b/src/node_buffer.cc
@@ -67,6 +67,7 @@ using v8::MaybeLocal;
using v8::Nothing;
using v8::Number;
using v8::Object;
+using v8::SharedArrayBuffer;
using v8::String;
using v8::Uint32;
using v8::Uint32Array;
@@ -1158,6 +1159,60 @@ void GetZeroFillToggle(const FunctionCallbackInfo& args) {
args.GetReturnValue().Set(Uint32Array::New(ab, 0, 1));
}
+void DetachArrayBuffer(const FunctionCallbackInfo& args) {
+ Environment* env = Environment::GetCurrent(args);
+ if (args[0]->IsArrayBuffer()) {
+ Local buf = args[0].As();
+ if (buf->IsDetachable()) {
+ std::shared_ptr store = buf->GetBackingStore();
+ buf->Detach();
+ args.GetReturnValue().Set(ArrayBuffer::New(env->isolate(), store));
+ }
+ }
+}
+
+void CopyArrayBuffer(const FunctionCallbackInfo& args) {
+ // args[0] == Destination ArrayBuffer
+ // args[1] == Destination ArrayBuffer Offset
+ // args[2] == Source ArrayBuffer
+ // args[3] == Source ArrayBuffer Offset
+ // args[4] == bytesToCopy
+
+ CHECK(args[0]->IsArrayBuffer() || args[0]->IsSharedArrayBuffer());
+ CHECK(args[1]->IsUint32());
+ CHECK(args[2]->IsArrayBuffer() || args[2]->IsSharedArrayBuffer());
+ CHECK(args[3]->IsUint32());
+ CHECK(args[4]->IsUint32());
+
+ std::shared_ptr destination;
+ std::shared_ptr source;
+
+ if (args[0]->IsArrayBuffer()) {
+ destination = args[0].As()->GetBackingStore();
+ } else if (args[0]->IsSharedArrayBuffer()) {
+ destination = args[0].As()->GetBackingStore();
+ }
+
+ if (args[2]->IsArrayBuffer()) {
+ source = args[2].As()->GetBackingStore();
+ } else if (args[0]->IsSharedArrayBuffer()) {
+ source = args[2].As()->GetBackingStore();
+ }
+
+ uint32_t destination_offset = args[1].As()->Value();
+ uint32_t source_offset = args[3].As()->Value();
+ size_t bytes_to_copy = args[4].As()->Value();
+
+ CHECK(destination->ByteLength() - destination_offset >= bytes_to_copy);
+ CHECK(source->ByteLength() - source_offset >= bytes_to_copy);
+
+ uint8_t* dest =
+ static_cast(destination->Data()) + destination_offset;
+ uint8_t* src =
+ static_cast(source->Data()) + source_offset;
+ memcpy(dest, src, bytes_to_copy);
+}
+
void Initialize(Local