From d36392003b381411e8f46259efc93266c45a5d39 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 2 Aug 2021 13:08:32 +0200 Subject: [PATCH 1/6] stream: add isDisturbed helper Adds a helper util used to determine whether a stream has been disturbed (read or cancelled). Refs: https://github.com/nodejs/node/issues/39627 --- lib/internal/streams/utils.js | 13 +++++++++++++ lib/internal/webstreams/readablestream.js | 9 +++++++++ lib/stream.js | 1 + 3 files changed, 23 insertions(+) diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 01396b5113340f..0b7f80ea1b93b6 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -7,6 +7,7 @@ const { } = primordials; const kDestroyed = Symbol('kDestroyed'); +const kIsDisturbed = Symbol('kIsDisturbed'); function isReadableNodeStream(obj) { return !!( @@ -195,8 +196,20 @@ function willEmitClose(stream) { ); } +function isDisturbed(stream) { + const state = stream && stream._readableState; + return stream && ( + stream.readableDidRead || + isDestroyed(stream) || + stream[kIsDisturbed] || + (state && state.endEmitted) + ); +} + module.exports = { kDestroyed, + isDisturbed, + kIsDisturbed, isClosed, isDestroyed, isFinished, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 62da68ff28086f..efe890b7ebedaf 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -80,6 +80,10 @@ const { queueMicrotask, } = require('internal/process/task_queues'); +const { + kIsDisturbed, +} = require('internal/stream/utils'); + const { ArrayBufferViewGetBuffer, ArrayBufferViewGetByteLength, @@ -200,6 +204,7 @@ class ReadableStream { promise: undefined, } }; + // The spec requires handling of the strategy first // here. Specifically, if getting the size and // highWaterMark from the strategy fail, that has @@ -232,6 +237,10 @@ class ReadableStream { return makeTransferable(this); } + get [kIsDisturbed]() { + return this[kState].disturbed; + } + /** * @readonly * @type {boolean} diff --git a/lib/stream.js b/lib/stream.js index b84efb0fd8862d..43f59788f62bc8 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -38,6 +38,7 @@ const internalBuffer = require('internal/buffer'); const promises = require('stream/promises'); const Stream = module.exports = require('internal/streams/legacy').Stream; +Stream.isDisturbed = require('internal/streams/utils').isDisturbed; Stream.Readable = require('internal/streams/readable'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); From ac0a1df2de4cc7c495ecd88932441ea9a8cf33c7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 2 Aug 2021 13:41:17 +0200 Subject: [PATCH 2/6] fixup --- doc/api/stream.md | 12 +++++ lib/internal/streams/readable.js | 2 +- lib/internal/streams/utils.js | 4 +- lib/internal/webstreams/readablestream.js | 2 +- test/parallel/test-stream-readable-didRead.js | 4 +- test/parallel/test-whatwg-readablestream.js | 50 +++++++++++++++++++ 6 files changed, 69 insertions(+), 5 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 312f200e61c4d5..3e5ee464f32682 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2046,6 +2046,18 @@ added: REPLACEME * `signal` {AbortSignal} * Returns: {stream.Readable} +### `stream.Readable.isDisturbed(stream)` + + +> Stability: 1 - Experimental + +* `stream` {stream.Readable|ReadableStream} +* Returns: `boolean` + +Returns whether the stream has been read from. + ### `stream.Readable.toWeb(streamReadable)` + +> Stability: 1 - Experimental + +* {boolean} + +Returns whether the stream was destroyed before `'end'` has been emitted. + ##### `readable.readableDidRead` +> Stability: 1 - Experimental + * {boolean} -Allows determining if the stream has been or is about to be read. -Returns true if `'data'`, `'end'`, `'error'` or `'close'` has been -emitted. +Returns whether `'data'` has been emitted. ##### `readable.readableEncoding`