From befc23002dcd21cf7797bc60815be469d206bb9f Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 30 Oct 2021 23:56:24 +0200 Subject: [PATCH 1/8] Implement TransformStream using the public API of ReadableStream and WritableStream --- src/lib/transform-stream.ts | 400 ++++++++++++++++++++++++++++++++---- 1 file changed, 355 insertions(+), 45 deletions(-) diff --git a/src/lib/transform-stream.ts b/src/lib/transform-stream.ts index 59adc2c..815461a 100644 --- a/src/lib/transform-stream.ts +++ b/src/lib/transform-stream.ts @@ -1,18 +1,10 @@ import assert from '../stub/assert'; import { newPromise, promiseRejectedWith, promiseResolvedWith, transformPromiseWith } from './helpers/webidl'; -import type { ReadableStream, ReadableStreamDefaultController } from './readable-stream'; -import { CreateReadableStream } from './readable-stream'; -import { - ReadableStreamDefaultControllerCanCloseOrEnqueue, - ReadableStreamDefaultControllerClose, - ReadableStreamDefaultControllerEnqueue, - ReadableStreamDefaultControllerError, - ReadableStreamDefaultControllerGetDesiredSize, - ReadableStreamDefaultControllerHasBackpressure -} from './readable-stream/default-controller'; +import type { ReadableStreamDefaultController, ReadableStreamState } from './readable-stream'; +import { IsReadableStream, ReadableStream } from './readable-stream'; import type { QueuingStrategy, QueuingStrategySizeCallback } from './queuing-strategy'; -import type { WritableStream } from './writable-stream'; -import { CreateWritableStream, WritableStreamDefaultControllerErrorIfNeeded } from './writable-stream'; +import type { WritableStreamDefaultController, WritableStreamState } from './writable-stream'; +import { IsWritableStream, WritableStream } from './writable-stream'; import { typeIsObject } from './helpers/miscellaneous'; import { IsNonNegativeNumber } from './abstract-ops/miscellaneous'; import { convertQueuingStrategy } from './validators/queuing-strategy'; @@ -25,6 +17,7 @@ import { TransformerTransformCallback } from './transform-stream/transformer'; import { convertTransformer } from './validators/transformer'; +import type { ReadableStreamLike, WritableStreamLike } from './helpers/stream-like'; // Class TransformStream @@ -37,10 +30,41 @@ import { convertTransformer } from './validators/transformer'; * @public */ export class TransformStream { + /** + * The writable side of the transform stream. + * + * We use `WritableStreamLike` instead of `WritableStream` so we can only use the public API, + * and we don't accidentally depend on internal state or abstract operations. + * This allows for interoperability with native streams. + * @internal + */ + _writable!: WritableStreamLike; + /** @internal */ + _writableController!: WritableStreamDefaultController; + /** @internal */ + _writableState!: WritableStreamState; + /** @internal */ + _writableStoredError!: any; + /** @internal */ + _writableHasInFlightOperation!: boolean; + /** @internal */ + _writableStarted!: boolean; + /** + * The readable side of the transform stream. + * + * We use `ReadableStreamLike` instead of `ReadableStream` so we can only use the public API + * and allow for interoperability with native streams. + * @internal + */ + _readable!: ReadableStreamLike; + /** @internal */ + _readableController!: ReadableStreamDefaultController; /** @internal */ - _writable!: WritableStream; + _readableState!: ReadableStreamState; /** @internal */ - _readable!: ReadableStream; + _readableStoredError!: any; + /** @internal */ + _readableCloseRequested!: boolean; /** @internal */ _backpressure!: boolean; /** @internal */ @@ -103,7 +127,7 @@ export class TransformStream { throw streamBrandCheckException('readable'); } - return this._readable; + return this._readable as ReadableStream; } /** @@ -114,7 +138,7 @@ export class TransformStream { throw streamBrandCheckException('writable'); } - return this._writable; + return this._writable as WritableStream; } } @@ -189,8 +213,19 @@ function InitializeTransformStream(stream: TransformStream, return TransformStreamDefaultSinkCloseAlgorithm(stream); } - stream._writable = CreateWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, - writableHighWaterMark, writableSizeAlgorithm); + stream._writableState = 'writable'; + stream._writableStoredError = undefined; + stream._writableHasInFlightOperation = false; + stream._writableStarted = false; + stream._writable = CreateWritableStream( + stream, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + writableHighWaterMark, + writableSizeAlgorithm + ); function pullAlgorithm(): Promise { return TransformStreamDefaultSourcePullAlgorithm(stream); @@ -201,8 +236,17 @@ function InitializeTransformStream(stream: TransformStream, return promiseResolvedWith(undefined); } - stream._readable = CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, - readableSizeAlgorithm); + stream._readableState = 'readable'; + stream._readableStoredError = undefined; + stream._readableCloseRequested = false; + stream._readable = CreateReadableStream( + stream, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + readableHighWaterMark, + readableSizeAlgorithm + ); // The [[backpressure]] slot is set to undefined so that it can be initialised by TransformStreamSetBackpressure. stream._backpressure = undefined!; @@ -227,16 +271,13 @@ function IsTransformStream(x: unknown): x is TransformStream { // This is a no-op if both sides are already errored. function TransformStreamError(stream: TransformStream, e: any) { - ReadableStreamDefaultControllerError( - stream._readable._readableStreamController as ReadableStreamDefaultController, - e - ); + ReadableStreamDefaultControllerError(stream, e); TransformStreamErrorWritableAndUnblockWrite(stream, e); } function TransformStreamErrorWritableAndUnblockWrite(stream: TransformStream, e: any) { TransformStreamDefaultControllerClearAlgorithms(stream._transformStreamController); - WritableStreamDefaultControllerErrorIfNeeded(stream._writable._writableStreamController, e); + WritableStreamDefaultControllerErrorIfNeeded(stream, e); if (stream._backpressure) { // Pretend that pull() was called to permit any pending write() calls to complete. TransformStreamSetBackpressure() // cannot be called from enqueue() or pull() once the ReadableStream is errored, so this will will be the final time @@ -287,8 +328,8 @@ export class TransformStreamDefaultController { throw defaultControllerBrandCheckException('desiredSize'); } - const readableController = this._controlledTransformStream._readable._readableStreamController; - return ReadableStreamDefaultControllerGetDesiredSize(readableController as ReadableStreamDefaultController); + const stream = this._controlledTransformStream; + return ReadableStreamDefaultControllerGetDesiredSize(stream); } /** @@ -405,8 +446,7 @@ function TransformStreamDefaultControllerClearAlgorithms(controller: TransformSt function TransformStreamDefaultControllerEnqueue(controller: TransformStreamDefaultController, chunk: O) { const stream = controller._controlledTransformStream; - const readableController = stream._readable._readableStreamController as ReadableStreamDefaultController; - if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)) { + if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(stream)) { throw new TypeError('Readable side is not in a state that permits enqueue'); } @@ -414,15 +454,15 @@ function TransformStreamDefaultControllerEnqueue(controller: TransformStreamD // accept TransformStreamDefaultControllerEnqueue() calls. try { - ReadableStreamDefaultControllerEnqueue(readableController, chunk); + ReadableStreamDefaultControllerEnqueue(stream, chunk); } catch (e) { // This happens when readableStrategy.size() throws. TransformStreamErrorWritableAndUnblockWrite(stream, e); - throw stream._readable._storedError; + throw stream._readableStoredError; } - const backpressure = ReadableStreamDefaultControllerHasBackpressure(readableController); + const backpressure = ReadableStreamDefaultControllerHasBackpressure(stream); if (backpressure !== stream._backpressure) { assert(backpressure); TransformStreamSetBackpressure(stream, true); @@ -444,9 +484,10 @@ function TransformStreamDefaultControllerPerformTransform(controller: Tran function TransformStreamDefaultControllerTerminate(controller: TransformStreamDefaultController) { const stream = controller._controlledTransformStream; - const readableController = stream._readable._readableStreamController as ReadableStreamDefaultController; - ReadableStreamDefaultControllerClose(readableController); + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream)) { + ReadableStreamDefaultControllerClose(stream); + } const error = new TypeError('TransformStream terminated'); TransformStreamErrorWritableAndUnblockWrite(stream, error); @@ -455,7 +496,7 @@ function TransformStreamDefaultControllerTerminate(controller: TransformStrea // TransformStreamDefaultSink Algorithms function TransformStreamDefaultSinkWriteAlgorithm(stream: TransformStream, chunk: I): Promise { - assert(stream._writable._state === 'writable'); + assert(stream._writableState === 'writable'); const controller = stream._transformStreamController; @@ -463,10 +504,9 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream: TransformStream< const backpressureChangePromise = stream._backpressureChangePromise; assert(backpressureChangePromise !== undefined); return transformPromiseWith(backpressureChangePromise, () => { - const writable = stream._writable; - const state = writable._state; + const state = stream._writableState; if (state === 'erroring') { - throw writable._storedError; + throw stream._writableStoredError; } assert(state === 'writable'); return TransformStreamDefaultControllerPerformTransform(controller, chunk); @@ -484,22 +524,21 @@ function TransformStreamDefaultSinkAbortAlgorithm(stream: TransformStream, reaso } function TransformStreamDefaultSinkCloseAlgorithm(stream: TransformStream): Promise { - // stream._readable cannot change after construction, so caching it across a call to user code is safe. - const readable = stream._readable; - const controller = stream._transformStreamController; const flushPromise = controller._flushAlgorithm(); TransformStreamDefaultControllerClearAlgorithms(controller); // Return a promise that is fulfilled with undefined on success. return transformPromiseWith(flushPromise, () => { - if (readable._state === 'errored') { - throw readable._storedError; + if (stream._readableState === 'errored') { + throw stream._readableStoredError; + } + if (ReadableStreamDefaultControllerCanCloseOrEnqueue(stream)) { + ReadableStreamDefaultControllerClose(stream); } - ReadableStreamDefaultControllerClose(readable._readableStreamController as ReadableStreamDefaultController); }, r => { TransformStreamError(stream, r); - throw readable._storedError; + throw stream._readableStoredError; }); } @@ -530,3 +569,274 @@ function streamBrandCheckException(name: string): TypeError { return new TypeError( `TransformStream.prototype.${name} can only be used on a TransformStream`); } +// Stubs for abstract operations used from ReadableStream and WritableStream. + +function CreateReadableStream(stream: TransformStream, + startAlgorithm: () => Promise, + pullAlgorithm: () => Promise, + cancelAlgorithm: (reason: any) => Promise, + highWaterMark: number, + sizeAlgorithm: QueuingStrategySizeCallback): ReadableStream { + return new ReadableStream({ + start(controller) { + stream._readableController = controller; + return startAlgorithm().catch(e => { + ReadableStreamDefaultControllerError(stream, e); + }); + }, + pull() { + return pullAlgorithm().catch(e => { + ReadableStreamDefaultControllerError(stream, e); + }); + }, + cancel(reason) { + assert(stream._readableState === 'readable'); + stream._readableState = 'closed'; + ReadableStreamAssertState(stream); + return cancelAlgorithm(reason); + } + }, { highWaterMark, size: sizeAlgorithm }); +} + +function ReadableStreamDefaultControllerCanCloseOrEnqueue(stream: TransformStream): boolean { + const state = stream._readableState; + + if (!stream._readableCloseRequested && state === 'readable') { + return true; + } + + return false; +} + +function ReadableStreamDefaultControllerClose(stream: TransformStream): void { + assert(stream._readableState === 'readable'); + assert(!stream._readableCloseRequested); + + // This is incorrect: if there are still queued chunks, the stream remains 'readable' until they have been read. + // Luckily, this does not matter for ReadableStreamDefaultControllerCanCloseOrEnqueue. + stream._readableState = 'closed'; + stream._readableCloseRequested = true; + + stream._readableController.close(); + ReadableStreamAssertState(stream); +} + +function ReadableStreamDefaultControllerEnqueue(stream: TransformStream, chunk: R): void { + try { + stream._readableController.enqueue(chunk); + ReadableStreamAssertState(stream); + } catch (e) { + ReadableStreamDefaultControllerError(stream, e); + ReadableStreamAssertState(stream); + throw e; + } +} + +function ReadableStreamDefaultControllerError(stream: TransformStream, e: any) { + if (stream._readableState === 'readable') { + stream._readableState = 'errored'; + stream._readableStoredError = e; + } + + stream._readableController.error(e); + ReadableStreamAssertState(stream); +} + +function ReadableStreamDefaultControllerGetDesiredSize(stream: TransformStream): number | null { + return stream._readableController.desiredSize; +} + +function ReadableStreamDefaultControllerHasBackpressure(stream: TransformStream): boolean { + return ReadableStreamDefaultControllerGetDesiredSize(stream)! <= 0; +} + +function ReadableStreamAssertState(stream: TransformStream): void { + if (DEBUG && IsReadableStream(stream._readable)) { + const readable = stream._readable; + const controller = stream._readableController; + // If closeRequested = true, we cannot know if the readable stream's state is 'readable' or 'closed'. + // This also means we cannot know whether readable.cancel() can still change the state to 'errored' or not. + // Luckily, this does not matter for ReadableStreamDefaultControllerCanCloseOrEnqueue. + if (!(controller._closeRequested && stream._readableCloseRequested)) { + assert(readable._state === stream._readableState); + assert(readable._storedError === stream._readableStoredError); + } + assert(controller._closeRequested === stream._readableCloseRequested); + } +} + +function CreateWritableStream(stream: TransformStream, + startAlgorithm: () => Promise, + writeAlgorithm: (chunk: W) => Promise, + closeAlgorithm: () => Promise, + abortAlgorithm: (reason: any) => Promise, + highWaterMark: number, + sizeAlgorithm: QueuingStrategySizeCallback): WritableStream { + return new WritableStream({ + start(controller) { + stream._writableController = controller; + return startAlgorithm().then( + () => { + assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); + stream._writableStarted = true; + WritableStreamDefaultControllerAdvanceQueueIfNeeded(stream); + }, + r => { + assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); + stream._writableStarted = true; + WritableStreamDealWithRejection(stream, r); + throw r; + } + ); + }, + write(chunk) { + WritableStreamMarkFirstWriteRequestInFlight(stream); + return writeAlgorithm(chunk).then( + () => { + WritableStreamFinishInFlightWrite(stream); + assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); + WritableStreamDefaultControllerAdvanceQueueIfNeeded(stream); + }, + reason => { + WritableStreamFinishInFlightWriteWithError(stream, reason); + throw reason; + }); + }, + close() { + WritableStreamMarkCloseRequestInFlight(stream); + return closeAlgorithm().then(() => { + WritableStreamFinishInFlightClose(stream); + }, e => { + WritableStreamFinishInFlightCloseWithError(stream, e); + throw e; + }); + }, + abort(reason) { + stream._writableState = 'errored'; + stream._writableStoredError = reason; + WritableStreamAssertState(stream); + return abortAlgorithm(reason); + } + }, { highWaterMark, size: sizeAlgorithm }); +} + +function WritableStreamDealWithRejection(stream: TransformStream, error: any) { + const state = stream._writableState; + + if (state === 'writable') { + WritableStreamStartErroring(stream, error); + return; + } + + assert(state === 'erroring'); + WritableStreamFinishErroring(stream); +} + +function WritableStreamStartErroring(stream: TransformStream, reason: any) { + assert(stream._writableStoredError === undefined); + assert(stream._writableState === 'writable'); + + stream._writableState = 'erroring'; + stream._writableStoredError = reason; + + if (!WritableStreamHasOperationMarkedInFlight(stream) && stream._writableStarted) { + WritableStreamFinishErroring(stream); + } + + WritableStreamAssertState(stream); +} + +function WritableStreamFinishErroring(stream: TransformStream) { + assert(stream._writableState === 'erroring'); + assert(!WritableStreamHasOperationMarkedInFlight(stream)); + stream._writableState = 'errored'; + + WritableStreamAssertState(stream); +} + +function WritableStreamFinishInFlightWrite(stream: TransformStream) { + assert(stream._writableHasInFlightOperation); + stream._writableHasInFlightOperation = false; +} + +function WritableStreamFinishInFlightWriteWithError(stream: TransformStream, error: any) { + assert(stream._writableHasInFlightOperation); + stream._writableHasInFlightOperation = false; + + assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); + + WritableStreamDealWithRejection(stream, error); +} + +function WritableStreamFinishInFlightClose(stream: TransformStream) { + assert(stream._writableHasInFlightOperation); + stream._writableHasInFlightOperation = false; + + const state = stream._writableState; + assert(state === 'writable' || state === 'erroring'); + + if (state === 'erroring') { + // The error was too late to do anything, so it is ignored. + stream._writableStoredError = undefined; + } + + stream._writableState = 'closed'; + + assert(stream._writableStoredError === undefined); + WritableStreamAssertState(stream); +} + +function WritableStreamFinishInFlightCloseWithError(stream: TransformStream, error: any) { + assert(stream._writableHasInFlightOperation); + stream._writableHasInFlightOperation = false; + + const state = stream._writableState; + + assert(state === 'writable' || state === 'erroring'); + + WritableStreamDealWithRejection(stream, error); + WritableStreamAssertState(stream); +} + +function WritableStreamHasOperationMarkedInFlight(stream: TransformStream): boolean { + return stream._writableHasInFlightOperation; +} + +function WritableStreamMarkCloseRequestInFlight(stream: TransformStream) { + assert(!stream._writableHasInFlightOperation); + stream._writableHasInFlightOperation = true; +} + +function WritableStreamMarkFirstWriteRequestInFlight(stream: TransformStream) { + assert(!stream._writableHasInFlightOperation); + stream._writableHasInFlightOperation = true; +} + +function WritableStreamDefaultControllerAdvanceQueueIfNeeded(stream: TransformStream) { + const state = stream._writableState; + assert(state !== 'closed' && state !== 'errored'); + if (state === 'erroring') { + WritableStreamFinishErroring(stream); + } + WritableStreamAssertState(stream); +} + +function WritableStreamDefaultControllerErrorIfNeeded(stream: TransformStream, error: any) { + stream._writableController.error(error); + + const state = stream._writableState; + if (state === 'writable') { + WritableStreamStartErroring(stream, error); + } +} + +function WritableStreamAssertState(stream: TransformStream): void { + if (DEBUG && IsWritableStream(stream._writable)) { + // Check state asynchronously, because we update our state before we update the actual writable stream. + setTimeout(() => { + const writable = stream._writable as WritableStream; + assert(writable._state === stream._writableState); + assert(writable._storedError === stream._writableStoredError); + }, 0); + } +} From cd31fbb21d3a8598a7a2890a32304d00310d50c2 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sun, 31 Oct 2021 00:01:14 +0200 Subject: [PATCH 2/8] Detect abort() calls synchronously if possible --- src/lib/transform-stream.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/lib/transform-stream.ts b/src/lib/transform-stream.ts index 815461a..44a0bd2 100644 --- a/src/lib/transform-stream.ts +++ b/src/lib/transform-stream.ts @@ -675,6 +675,19 @@ function CreateWritableStream(stream: TransformStream, return new WritableStream({ start(controller) { stream._writableController = controller; + const abortSignal = controller.signal; + if (abortSignal !== undefined) { + // `controller.signal` must be supported in order to synchronously detect `writable.abort()` calls + // when there are pending writes. + abortSignal.addEventListener('abort', () => { + assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); + if (stream._writableState === 'writable') { + stream._writableState = 'erroring'; + stream._writableStoredError = controller.abortReason; + } + WritableStreamAssertState(stream); + }); + } return startAlgorithm().then( () => { assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); From 2687b308b8a5b13554a7bdc2a9396ccccf003a24 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sun, 31 Oct 2021 00:34:31 +0200 Subject: [PATCH 3/8] Simplify --- src/lib/readable-stream/default-controller.ts | 14 ++------------ src/lib/transform-stream.ts | 8 +------- 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/src/lib/readable-stream/default-controller.ts b/src/lib/readable-stream/default-controller.ts index 63d6372..d4bf14e 100644 --- a/src/lib/readable-stream/default-controller.ts +++ b/src/lib/readable-stream/default-controller.ts @@ -307,23 +307,13 @@ export function ReadableStreamDefaultControllerGetDesiredSize( export function ReadableStreamDefaultControllerHasBackpressure( controller: ReadableStreamDefaultController ): boolean { - if (ReadableStreamDefaultControllerShouldCallPull(controller)) { - return false; - } - - return true; + return !ReadableStreamDefaultControllerShouldCallPull(controller); } export function ReadableStreamDefaultControllerCanCloseOrEnqueue( controller: ReadableStreamDefaultController ): boolean { - const state = controller._controlledReadableStream._state; - - if (!controller._closeRequested && state === 'readable') { - return true; - } - - return false; + return !controller._closeRequested && controller._controlledReadableStream._state === 'readable'; } export function SetUpReadableStreamDefaultController(stream: ReadableStream, diff --git a/src/lib/transform-stream.ts b/src/lib/transform-stream.ts index 44a0bd2..13cc154 100644 --- a/src/lib/transform-stream.ts +++ b/src/lib/transform-stream.ts @@ -599,13 +599,7 @@ function CreateReadableStream(stream: TransformStream, } function ReadableStreamDefaultControllerCanCloseOrEnqueue(stream: TransformStream): boolean { - const state = stream._readableState; - - if (!stream._readableCloseRequested && state === 'readable') { - return true; - } - - return false; + return !stream._readableCloseRequested && stream._readableState === 'readable'; } function ReadableStreamDefaultControllerClose(stream: TransformStream): void { From a8e851e0edb57ced33383de5228fcceb2002f300 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 6 Nov 2021 16:33:41 +0100 Subject: [PATCH 4/8] Improve HasBackpressure check --- src/lib/transform-stream.ts | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/lib/transform-stream.ts b/src/lib/transform-stream.ts index 13cc154..f29a34d 100644 --- a/src/lib/transform-stream.ts +++ b/src/lib/transform-stream.ts @@ -66,6 +66,8 @@ export class TransformStream { /** @internal */ _readableCloseRequested!: boolean; /** @internal */ + _readablePulling!: boolean; + /** @internal */ _backpressure!: boolean; /** @internal */ _backpressureChangePromise!: Promise; @@ -239,6 +241,7 @@ function InitializeTransformStream(stream: TransformStream, stream._readableState = 'readable'; stream._readableStoredError = undefined; stream._readableCloseRequested = false; + stream._readablePulling = false; stream._readable = CreateReadableStream( stream, startAlgorithm, @@ -585,6 +588,8 @@ function CreateReadableStream(stream: TransformStream, }); }, pull() { + assert(!stream._readablePulling); + stream._readablePulling = true; return pullAlgorithm().catch(e => { ReadableStreamDefaultControllerError(stream, e); }); @@ -616,6 +621,10 @@ function ReadableStreamDefaultControllerClose(stream: TransformStream): void { } function ReadableStreamDefaultControllerEnqueue(stream: TransformStream, chunk: R): void { + // If there is backpressure, enqueue() will not call pull(), and stream._readablePulling will remain false. + // If there is no backpressure, enqueue() will call pull() and change stream._readablePulling back to true. + stream._readablePulling = false; + try { stream._readableController.enqueue(chunk); ReadableStreamAssertState(stream); @@ -641,7 +650,27 @@ function ReadableStreamDefaultControllerGetDesiredSize(stream: TransformStream): } function ReadableStreamDefaultControllerHasBackpressure(stream: TransformStream): boolean { - return ReadableStreamDefaultControllerGetDesiredSize(stream)! <= 0; + return !ReadableStreamDefaultControllerShouldCallPull(stream); +} + +function ReadableStreamDefaultControllerShouldCallPull(stream: TransformStream): boolean { + if (!ReadableStreamDefaultControllerCanCloseOrEnqueue(stream)) { + return false; + } + + // Instead of checking whether there are any pending read() requests, we check if pull() was called. + // if (IsReadableStreamLocked(stream._readable) && ReadableStreamGetNumReadRequests(stream._readable) > 0) { + if (stream._readablePulling) { + return true; + } + + const desiredSize = ReadableStreamDefaultControllerGetDesiredSize(stream); + assert(desiredSize !== null); + if (desiredSize! > 0) { + return true; + } + + return false; } function ReadableStreamAssertState(stream: TransformStream): void { From 13335cfd221a38e3ad25d571f1de05e2faea2126 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 6 Nov 2021 16:43:25 +0100 Subject: [PATCH 5/8] Ignore errors from accessing controller.signal --- src/lib/transform-stream.ts | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/lib/transform-stream.ts b/src/lib/transform-stream.ts index f29a34d..f23f649 100644 --- a/src/lib/transform-stream.ts +++ b/src/lib/transform-stream.ts @@ -698,18 +698,22 @@ function CreateWritableStream(stream: TransformStream, return new WritableStream({ start(controller) { stream._writableController = controller; - const abortSignal = controller.signal; - if (abortSignal !== undefined) { + try { // `controller.signal` must be supported in order to synchronously detect `writable.abort()` calls // when there are pending writes. - abortSignal.addEventListener('abort', () => { - assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); - if (stream._writableState === 'writable') { - stream._writableState = 'erroring'; - stream._writableStoredError = controller.abortReason; - } - WritableStreamAssertState(stream); - }); + const abortSignal = controller.signal; + if (abortSignal !== undefined) { + abortSignal.addEventListener('abort', () => { + assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); + if (stream._writableState === 'writable') { + stream._writableState = 'erroring'; + stream._writableStoredError = controller.abortReason; + } + WritableStreamAssertState(stream); + }); + } + } catch { + // WritableStreamDefaultController.prototype.signal throws if its brand check fails } return startAlgorithm().then( () => { From 877e9d3201eb35f13ab8ca6c7bd11e01f31ab60a Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 6 Nov 2021 16:48:39 +0100 Subject: [PATCH 6/8] Use real AbortController polyfill in Node unit tests --- package-lock.json | 15 +++++++++++++++ package.json | 1 + test/unit/readable-stream/basic.spec.js | 12 +++++++----- test/unit/readable-stream/regression.spec.js | 1 + test/unit/util/fake-abort-signal.js | 17 ----------------- 5 files changed, 24 insertions(+), 22 deletions(-) delete mode 100644 test/unit/util/fake-abort-signal.js diff --git a/package-lock.json b/package-lock.json index 94123d4..3e6258c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -422,6 +422,15 @@ "integrity": "sha512-9IK9EadsbHo6jLWIpxpR6pL0sazTXV6+SQv25ZB+F7Bj9mJNaOc4nCRabwd5M/JwmUa8idz6Eci6eKfJryPs6Q==", "dev": true }, + "abort-controller": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/abort-controller/-/abort-controller-3.0.0.tgz", + "integrity": "sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==", + "dev": true, + "requires": { + "event-target-shim": "^5.0.0" + } + }, "acorn": { "version": "7.4.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.4.1.tgz", @@ -1119,6 +1128,12 @@ "integrity": "sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==", "dev": true }, + "event-target-shim": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", + "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==", + "dev": true + }, "extract-zip": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/extract-zip/-/extract-zip-2.0.1.tgz", diff --git a/package.json b/package.json index 1d22d94..7d89ba3 100644 --- a/package.json +++ b/package.json @@ -90,6 +90,7 @@ "@typescript-eslint/eslint-plugin": "^4.29.3", "@typescript-eslint/parser": "^4.29.3", "@ungap/promise-all-settled": "^1.1.2", + "abort-controller": "^3.0.0", "eslint": "^7.32.0", "jasmine": "^3.9.0", "micromatch": "^4.0.4", diff --git a/test/unit/readable-stream/basic.spec.js b/test/unit/readable-stream/basic.spec.js index 507db34..465644a 100644 --- a/test/unit/readable-stream/basic.spec.js +++ b/test/unit/readable-stream/basic.spec.js @@ -1,5 +1,6 @@ +require('abort-controller/polyfill'); const { ReadableStream, WritableStream } = require('web-streams-polyfill'); -const { FakeAbortSignal } = require('../util/fake-abort-signal'); +const { AbortController } = require('abort-controller'); describe('ReadableStream', () => { describe('constructor', () => { @@ -34,8 +35,8 @@ describe('ReadableStream', () => { } }); const ws = new WritableStream(); - const signal = new FakeAbortSignal(false); - await rs.pipeTo(ws, { signal }); + const controller = new AbortController(); + await rs.pipeTo(ws, { signal: controller.signal }); }); it('rejects with an AbortError when aborted', async () => { const rs = new ReadableStream({ @@ -45,9 +46,10 @@ describe('ReadableStream', () => { } }); const ws = new WritableStream(); - const signal = new FakeAbortSignal(true); + const controller = new AbortController(); + controller.abort(); try { - await rs.pipeTo(ws, { signal }); + await rs.pipeTo(ws, { signal: controller.signal }); fail('should have rejected'); } catch (e) { expect(e.name).toBe('AbortError'); diff --git a/test/unit/readable-stream/regression.spec.js b/test/unit/readable-stream/regression.spec.js index ef2aa15..f96229e 100644 --- a/test/unit/readable-stream/regression.spec.js +++ b/test/unit/readable-stream/regression.spec.js @@ -1,3 +1,4 @@ +require('abort-controller/polyfill'); const { ReadableStream, WritableStream, TransformStream } = require('web-streams-polyfill'); describe('ReadableStream regressions', () => { diff --git a/test/unit/util/fake-abort-signal.js b/test/unit/util/fake-abort-signal.js deleted file mode 100644 index b94fab3..0000000 --- a/test/unit/util/fake-abort-signal.js +++ /dev/null @@ -1,17 +0,0 @@ -class FakeAbortSignal { - constructor(aborted) { - this.aborted = aborted; - } - - addEventListener(type, listener) { - return; - } - - removeEventListener(type, listener) { - return; - } -} - -module.exports = { - FakeAbortSignal -}; From fd103e6894b3f22f2793ff88018d570a17379bf3 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Wed, 17 Nov 2021 16:06:23 +0100 Subject: [PATCH 7/8] Signal abort with reason --- etc/web-streams-polyfill.api.md | 1 + src/lib/abort-signal.ts | 7 ++++++- src/lib/transform-stream.ts | 2 +- src/lib/writable-stream.ts | 2 +- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/etc/web-streams-polyfill.api.md b/etc/web-streams-polyfill.api.md index 8240134..1241479 100644 --- a/etc/web-streams-polyfill.api.md +++ b/etc/web-streams-polyfill.api.md @@ -10,6 +10,7 @@ export interface AbortSignal { readonly aborted: boolean; addEventListener(type: 'abort', listener: () => void): void; + readonly reason?: any; removeEventListener(type: 'abort', listener: () => void): void; } diff --git a/src/lib/abort-signal.ts b/src/lib/abort-signal.ts index 44bbeaf..8ff94d9 100644 --- a/src/lib/abort-signal.ts +++ b/src/lib/abort-signal.ts @@ -15,6 +15,11 @@ export interface AbortSignal { */ readonly aborted: boolean; + /** + * The abort reason. + */ + readonly reason?: any; + /** * Add an event listener to be triggered when this signal becomes aborted. */ @@ -51,7 +56,7 @@ export function isAbortSignal(value: unknown): value is AbortSignal { export interface AbortController { readonly signal: AbortSignal; - abort(): void; + abort(reason?: any): void; } interface AbortControllerConstructor { diff --git a/src/lib/transform-stream.ts b/src/lib/transform-stream.ts index f23f649..a96e44d 100644 --- a/src/lib/transform-stream.ts +++ b/src/lib/transform-stream.ts @@ -707,7 +707,7 @@ function CreateWritableStream(stream: TransformStream, assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); if (stream._writableState === 'writable') { stream._writableState = 'erroring'; - stream._writableStoredError = controller.abortReason; + stream._writableStoredError = abortSignal.reason ?? controller.abortReason; } WritableStreamAssertState(stream); }); diff --git a/src/lib/writable-stream.ts b/src/lib/writable-stream.ts index f014dff..2333399 100644 --- a/src/lib/writable-stream.ts +++ b/src/lib/writable-stream.ts @@ -292,7 +292,7 @@ function WritableStreamAbort(stream: WritableStream, reason: any): Promise Date: Wed, 17 Nov 2021 16:13:23 +0100 Subject: [PATCH 8/8] Use special case for our own WritableStream when detecting 'erroring' --- src/lib/transform-stream.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/lib/transform-stream.ts b/src/lib/transform-stream.ts index a96e44d..cb8dd08 100644 --- a/src/lib/transform-stream.ts +++ b/src/lib/transform-stream.ts @@ -507,9 +507,11 @@ function TransformStreamDefaultSinkWriteAlgorithm(stream: TransformStream< const backpressureChangePromise = stream._backpressureChangePromise; assert(backpressureChangePromise !== undefined); return transformPromiseWith(backpressureChangePromise, () => { - const state = stream._writableState; + // Since we can only detect abort() calls through controller.signal, and AbortSignal.reason may not be + // supported on this platform, we use a special case for our own WritableStream to access the state and error. + const state = IsWritableStream(stream._writable) ? stream._writable._state : stream._writableState; if (state === 'erroring') { - throw stream._writableStoredError; + throw IsWritableStream(stream._writable) ? stream._writable._storedError : stream._writableStoredError; } assert(state === 'writable'); return TransformStreamDefaultControllerPerformTransform(controller, chunk); @@ -707,7 +709,9 @@ function CreateWritableStream(stream: TransformStream, assert(stream._writableState === 'writable' || stream._writableState === 'erroring'); if (stream._writableState === 'writable') { stream._writableState = 'erroring'; - stream._writableStoredError = abortSignal.reason ?? controller.abortReason; + if (abortSignal.reason) { + stream._writableStoredError = abortSignal.reason; + } } WritableStreamAssertState(stream); });