From d7d719dc4923d60574b2a2d0ec2ae0f874996734 Mon Sep 17 00:00:00 2001 From: ehmicky Date: Sun, 10 Mar 2024 12:06:09 +0000 Subject: [PATCH] Allow multiple readers at once --- package.json | 4 + source/contents.js | 10 +- source/stream.js | 62 ++++++++ test/fixtures/index.js | 2 + test/stream.js | 313 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 385 insertions(+), 6 deletions(-) create mode 100644 source/stream.js create mode 100644 test/stream.js diff --git a/package.json b/package.json index 59d1b4d..a2de604 100644 --- a/package.json +++ b/package.json @@ -42,9 +42,13 @@ "object", "concat" ], + "dependencies": { + "is-stream": "^4.0.1" + }, "devDependencies": { "@types/node": "^20.8.9", "ava": "^5.3.1", + "onetime": "^7.0.0", "precise-now": "^3.0.0", "stream-json": "^1.8.0", "tsd": "^0.29.0", diff --git a/source/contents.js b/source/contents.js index 20cb92e..e41bdb5 100644 --- a/source/contents.js +++ b/source/contents.js @@ -1,13 +1,13 @@ +import {getAsyncIterable} from './stream.js'; + export const getStreamContents = async (stream, {init, convertChunk, getSize, truncateChunk, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => { - if (!isAsyncIterable(stream)) { - throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.'); - } + const asyncIterable = getAsyncIterable(stream); const state = init(); state.length = 0; try { - for await (const chunk of stream) { + for await (const chunk of asyncIterable) { const chunkType = getChunkType(chunk); const convertedChunk = convertChunk[chunkType](chunk, state); appendChunk({convertedChunk, state, getSize, truncateChunk, addChunk, maxBuffer}); @@ -52,8 +52,6 @@ const addNewChunk = (convertedChunk, state, addChunk, newLength) => { state.length = newLength; }; -const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function'; - const getChunkType = chunk => { const typeOfChunk = typeof chunk; diff --git a/source/stream.js b/source/stream.js new file mode 100644 index 0000000..d069ca2 --- /dev/null +++ b/source/stream.js @@ -0,0 +1,62 @@ +import {isReadableStream} from 'is-stream'; + +export const getAsyncIterable = stream => { + if (isReadableStream(stream, {checkOpen: false})) { + return getStreamIterable(stream); + } + + if (typeof stream?.[Symbol.asyncIterator] !== 'function') { + throw new TypeError('The first argument must be a Readable, a ReadableStream, or an async iterable.'); + } + + return stream; +}; + +// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it +const getStreamIterable = async function * (stream) { + if (nodeImports === undefined) { + await loadNodeImports(); + } + + const controller = new AbortController(); + handleStreamEnd(stream, controller); + + try { + for await (const [chunk] of nodeImports.events.on(stream, 'data', { + signal: controller.signal, + highWatermark: stream.readableHighWaterMark, + })) { + yield chunk; + } + } catch (error) { + if (!controller.signal.aborted || error.cause === undefined) { + throw error; + } else if (error.cause !== abortError) { + throw error.cause; + } + } finally { + stream.destroy(); + } +}; + +const handleStreamEnd = async (stream, controller) => { + try { + await nodeImports.streamPromises.finished(stream, {cleanup: true, readable: true, writable: false, error: false}); + controller.abort(abortError); + } catch (error) { + controller.abort(error); + } +}; + +const abortError = new Error('Internal error'); + +// Use dynamic imports to support browsers +const loadNodeImports = async () => { + const [events, streamPromises] = await Promise.all([ + import('node:events'), + import('node:stream/promises'), + ]); + nodeImports = {events, streamPromises}; +}; + +let nodeImports; diff --git a/test/fixtures/index.js b/test/fixtures/index.js index 754ff1e..a9c635f 100644 --- a/test/fixtures/index.js +++ b/test/fixtures/index.js @@ -31,3 +31,5 @@ export const fixtureMultibyteString = '\u1000'; export const longMultibyteString = `${fixtureMultibyteString}\u1000`; export const bigArray = Array.from({length: 1e5}, () => Math.floor(Math.random() * (2 ** 8))); + +export const prematureClose = {code: 'ERR_STREAM_PREMATURE_CLOSE'}; diff --git a/test/stream.js b/test/stream.js new file mode 100644 index 0000000..721ac81 --- /dev/null +++ b/test/stream.js @@ -0,0 +1,313 @@ +import {once} from 'node:events'; +import {version} from 'node:process'; +import {Readable, Duplex} from 'node:stream'; +import {finished} from 'node:stream/promises'; +import {scheduler, setTimeout as pSetTimeout} from 'node:timers/promises'; +import test from 'ava'; +import onetime from 'onetime'; +import getStream, {getStreamAsArray, MaxBufferError} from '../source/index.js'; +import {fixtureString, fixtureMultiString, prematureClose} from './fixtures/index.js'; + +const onFinishedStream = stream => finished(stream, {cleanup: true}); +const noopMethods = {read() {}, write() {}}; + +// eslint-disable-next-line max-params +const assertStream = ({readableEnded = false, writableEnded = false}, t, stream, StreamClass, error = null) => { + t.is(stream.errored, error); + t.true(stream.destroyed); + t.false(stream.readable); + t.is(stream.readableEnded, readableEnded); + + if (StreamClass === Duplex) { + t.false(stream.writable); + t.is(stream.writableEnded, writableEnded); + } +}; + +const assertSuccess = assertStream.bind(undefined, {readableEnded: true, writableEnded: true}); +const assertReadFail = assertStream.bind(undefined, {writableEnded: true}); +const assertWriteFail = assertStream.bind(undefined, {readableEnded: true}); +const assertBothFail = assertStream.bind(undefined, {}); + +const testSuccess = async (t, StreamClass) => { + const stream = StreamClass.from(fixtureMultiString); + t.true(stream instanceof StreamClass); + + t.deepEqual(await getStreamAsArray(stream), fixtureMultiString); + assertSuccess(t, stream, StreamClass); +}; + +test('Can use Readable stream', testSuccess, Readable); +test('Can use Duplex stream', testSuccess, Duplex); + +const testAlreadyEnded = async (t, StreamClass) => { + const stream = StreamClass.from(fixtureMultiString); + await stream.toArray(); + assertSuccess(t, stream, StreamClass); + + t.deepEqual(await getStreamAsArray(stream), []); +}; + +test('Can use already ended Readable', testAlreadyEnded, Readable); +test('Can use already ended Duplex', testAlreadyEnded, Duplex); + +const testAlreadyAborted = async (t, StreamClass) => { + const stream = StreamClass.from(fixtureMultiString); + stream.destroy(); + await t.throwsAsync(onFinishedStream(stream), prematureClose); + assertReadFail(t, stream, StreamClass); + + const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose); + t.deepEqual(error.bufferedData, []); +}; + +test('Throw if already aborted Readable', testAlreadyAborted, Readable); +test('Throw if already aborted Duplex', testAlreadyAborted, Duplex); + +const testAlreadyErrored = async (t, StreamClass) => { + const stream = StreamClass.from(fixtureMultiString); + const error = new Error('test'); + stream.destroy(error); + t.is(await t.throwsAsync(onFinishedStream(stream)), error); + assertReadFail(t, stream, StreamClass, error); + + t.is(await t.throwsAsync(getStreamAsArray(stream)), error); + t.deepEqual(error.bufferedData, []); +}; + +test('Throw if already errored Readable', testAlreadyErrored, Readable); +test('Throw if already errored Duplex', testAlreadyErrored, Duplex); + +const testAbort = async (t, StreamClass) => { + const stream = new StreamClass(noopMethods); + setTimeout(() => { + stream.destroy(); + }, 0); + const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose); + t.deepEqual(error.bufferedData, []); + assertBothFail(t, stream, StreamClass); +}; + +test('Throw when aborting Readable', testAbort, Readable); +test('Throw when aborting Duplex', testAbort, Duplex); + +const testError = async (t, StreamClass) => { + const stream = new StreamClass(noopMethods); + const error = new Error('test'); + setTimeout(() => { + stream.destroy(error); + }, 0); + t.is(await t.throwsAsync(getStreamAsArray(stream)), error); + t.deepEqual(error.bufferedData, []); + assertBothFail(t, stream, StreamClass, error); +}; + +test('Throw when erroring Readable', testError, Readable); +test('Throw when erroring Duplex', testError, Duplex); + +const testErrorEvent = async (t, StreamClass) => { + const stream = new StreamClass(noopMethods); + const error = new Error('test'); + setTimeout(() => { + stream.emit('error', error); + }, 0); + t.is(await t.throwsAsync(getStreamAsArray(stream)), error); + t.deepEqual(error.bufferedData, []); + assertBothFail(t, stream, StreamClass); +}; + +test('Throw when emitting "error" event with Readable', testErrorEvent, Readable); +test('Throw when emitting "error" event with Duplex', testErrorEvent, Duplex); + +const testThrowRead = async (t, StreamClass) => { + const error = new Error('test'); + const stream = new StreamClass({ + read() { + throw error; + }, + }); + t.is(await t.throwsAsync(getStreamAsArray(stream)), error); + t.deepEqual(error.bufferedData, []); + assertBothFail(t, stream, StreamClass, error); +}; + +test('Throw when throwing error in Readable read()', testThrowRead, Readable); +test('Throw when throwing error in Duplex read()', testThrowRead, Duplex); + +test('Throw when throwing error in Readable destroy()', async t => { + const error = new Error('test'); + const stream = new Readable({ + read: onetime(function () { + this.push(fixtureString); + this.push(null); + }), + destroy(_, done) { + done(error); + }, + }); + + t.is(await t.throwsAsync(getStream(stream)), error); + t.deepEqual(error.bufferedData, fixtureString); + assertSuccess(t, stream, Readable, error); +}); + +test('Throw when throwing error in Duplex final()', async t => { + const error = new Error('test'); + const stream = new Duplex({ + read: onetime(function () { + this.push(null); + }), + final(done) { + done(error); + }, + }); + stream.end(); + + t.is(await t.throwsAsync(getStream(stream)), error); + t.is(await t.throwsAsync(onFinishedStream(stream)), error); + assertReadFail(t, stream, Duplex, error); +}); + +test('Does not wait for Duplex writable side', async t => { + const error = new Error('test'); + const stream = new Duplex({ + read: onetime(function () { + this.push(null); + }), + destroy(_, done) { + done(error); + }, + }); + + t.is(await getStream(stream), ''); + t.is(await t.throwsAsync(onFinishedStream(stream)), error); + assertWriteFail(t, stream, Duplex, error); +}); + +test('Handle non-error instances', async t => { + const stream = Readable.from(fixtureMultiString); + const errorMessage = `< ${fixtureString} >`; + stream.destroy(errorMessage); + const [{reason}] = await Promise.allSettled([onFinishedStream(stream)]); + t.is(reason, errorMessage); + assertReadFail(t, stream, Readable, errorMessage); + + await t.throwsAsync(getStreamAsArray(stream), {message: errorMessage}); +}); + +test('Handles objectMode errors', async t => { + const stream = new Readable({ + read: onetime(function () { + this.push(fixtureString); + this.push({}); + }), + objectMode: true, + }); + + const error = await t.throwsAsync(getStream(stream), {message: /in object mode/}); + t.is(error.bufferedData, fixtureString); + assertReadFail(t, stream, Readable); +}); + +test('Handles maxBuffer errors', async t => { + const stream = new Readable({ + read: onetime(function () { + this.push(fixtureString); + this.push(fixtureString); + }), + }); + + const error = await t.throwsAsync( + getStream(stream, {maxBuffer: fixtureString.length}), + {instanceOf: MaxBufferError}, + ); + t.is(error.bufferedData, fixtureString); + assertReadFail(t, stream, Readable); +}); + +test('Works if Duplex readable side ends before its writable side', async t => { + const stream = new Duplex(noopMethods); + stream.push(null); + + t.deepEqual(await getStreamAsArray(stream), []); + assertWriteFail(t, stream, Duplex); +}); + +test('Cleans up event listeners', async t => { + const stream = Readable.from([]); + t.is(stream.listenerCount('error'), 0); + + t.deepEqual(await getStreamAsArray(stream), []); + + t.is(stream.listenerCount('error'), 0); +}); + +const testMultipleReads = async (t, wait) => { + const size = 10; + const stream = new Readable({ + read: onetime(async function () { + for (let index = 0; index < size; index += 1) { + for (let index = 0; index < size; index += 1) { + this.push(fixtureString); + } + + // eslint-disable-next-line no-await-in-loop + await wait(); + } + + this.push(null); + }), + }); + + t.is(await getStream(stream), fixtureString.repeat(size * size)); + assertSuccess(t, stream, Readable); +}; + +test('Handles multiple successive fast reads', testMultipleReads, () => scheduler.yield()); +test('Handles multiple successive slow reads', testMultipleReads, () => pSetTimeout(100)); + +// The `highWaterMark` option was added to `once()` by Node 20. +// See https://github.com/nodejs/node/pull/41276 +const nodeMajor = version.split('.')[0].slice(1); +if (nodeMajor >= 20) { + test('Pause stream when too much data at once', async t => { + const stream = new Readable({ + read: onetime(function () { + this.push('.'); + this.push('.'); + this.push('.'); + this.push('.'); + this.push(null); + }), + highWaterMark: 2, + }); + const [result] = await Promise.all([ + getStream(stream), + once(stream, 'pause'), + ]); + t.is(result, '....'); + assertSuccess(t, stream, Readable); + }); +} + +test('Can call twice at the same time', async t => { + const stream = Readable.from(fixtureMultiString); + const [result, secondResult] = await Promise.all([ + getStream(stream), + getStream(stream), + ]); + t.deepEqual(result, fixtureString); + t.deepEqual(secondResult, fixtureString); + assertSuccess(t, stream, Readable); +}); + +test('Can call and listen to "data" event at the same time', async t => { + const stream = Readable.from([fixtureString]); + const [result, secondResult] = await Promise.all([ + getStream(stream), + once(stream, 'data'), + ]); + t.deepEqual(result, fixtureString); + t.deepEqual(secondResult.toString(), fixtureString); + assertSuccess(t, stream, Readable); +});