Skip to content

Commit

Permalink
Allow multiple readers at once
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Mar 10, 2024
1 parent 7ccab70 commit e486c9c
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 6 deletions.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
"object",
"concat"
],
"dependencies": {
"is-stream": "^4.0.1"
},
"devDependencies": {
"@types/node": "^20.8.9",
"ava": "^5.3.1",
Expand Down
10 changes: 4 additions & 6 deletions source/contents.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {getAsyncIterable} from './stream.js';

export const getStreamContents = async (stream, {init, convertChunk, getSize, truncateChunk, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
if (!isAsyncIterable(stream)) {
throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
}
const asyncIterable = getAsyncIterable(stream);

const state = init();
state.length = 0;

try {
for await (const chunk of stream) {
for await (const chunk of asyncIterable) {
const chunkType = getChunkType(chunk);
const convertedChunk = convertChunk[chunkType](chunk, state);
appendChunk({convertedChunk, state, getSize, truncateChunk, addChunk, maxBuffer});
Expand Down Expand Up @@ -51,8 +51,6 @@ const addNewChunk = (convertedChunk, state, addChunk, newLength) => {
state.length = newLength;
};

const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function';

const getChunkType = chunk => {
const typeOfChunk = typeof chunk;

Expand Down
46 changes: 46 additions & 0 deletions source/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {isReadableStream} from 'is-stream';

export const getAsyncIterable = stream => {
if (isReadableStream(stream, {checkOpen: false})) {
return getStreamIterable(stream);
}

if (stream?.[Symbol.asyncIterator] === undefined) {
throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
}

return stream;
};

// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
const getStreamIterable = async function * (stream) {
const {on, finished} = await getNodeImports();
const onStreamData = on(stream, 'data', {highWatermark: stream.readableHighWaterMark});
handleStreamEnd(stream, onStreamData, finished);
try {
for await (const [chunk] of onStreamData) {
yield chunk;
}
} finally {
stream.destroy();
}
};

const handleStreamEnd = async (stream, onStreamData, finished) => {
try {
await finished(stream, {cleanup: true, readable: true, writable: false});
await onStreamData.return();
} catch (error) {
const normalizedError = error instanceof Error ? error : new Error(String(error));
await onStreamData.throw(normalizedError);
}
};

// Use dynamic imports to support browsers
const getNodeImports = async () => {
const [{on}, {finished}] = await Promise.all([
import('node:events'),
import('node:stream/promises'),
]);
return {on, finished};
};
2 changes: 2 additions & 0 deletions test/fixtures/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ export const fixtureMultibyteString = '\u1000';
export const longMultibyteString = `${fixtureMultibyteString}\u1000`;

export const bigArray = Array.from({length: 1e6}, () => Math.floor(Math.random() * (2 ** 8)));

export const prematureClose = {code: 'ERR_STREAM_PREMATURE_CLOSE'};
262 changes: 262 additions & 0 deletions test/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
import {once} from 'node:events';
import {Readable, Duplex} from 'node:stream';
import {finished} from 'node:stream/promises';
import {scheduler, setTimeout as pSetTimeout} from 'node:timers/promises';
import test from 'ava';
import getStream, {getStreamAsArray, MaxBufferError} from '../source/index.js';
import {fixtureString, fixtureMultiString, prematureClose} from './fixtures/index.js';

const onFinishedStream = stream => finished(stream, {cleanup: true});
const noopMethods = {read() {}, write() {}};

// eslint-disable-next-line max-params
const assertStream = ({readableEnded = false, writableEnded = false}, t, stream, StreamClass, error = null) => {
t.is(stream.errored, error);
t.true(stream.destroyed);
t.false(stream.readable);
t.is(stream.readableEnded, readableEnded);

if (StreamClass === Duplex) {
t.false(stream.writable);
t.is(stream.writableEnded, writableEnded);
}
};

const assertSuccess = assertStream.bind(undefined, {readableEnded: true, writableEnded: true});
const assertReadFail = assertStream.bind(undefined, {writableEnded: true});
const assertWriteFail = assertStream.bind(undefined, {readableEnded: true});
const assertBothFail = assertStream.bind(undefined, {});

const testSuccess = async (t, StreamClass) => {
const stream = StreamClass.from(fixtureMultiString);
t.true(stream instanceof StreamClass);

t.deepEqual(await getStreamAsArray(stream), fixtureMultiString);
assertSuccess(t, stream, StreamClass);
};

test('Can use Readable stream', testSuccess, Readable);
test('Can use Duplex stream', testSuccess, Duplex);

const testAlreadyEnded = async (t, StreamClass) => {
const stream = StreamClass.from(fixtureMultiString);
await stream.toArray();
assertSuccess(t, stream, StreamClass);

t.deepEqual(await getStreamAsArray(stream), []);
};

test('Can use already ended Readable', testAlreadyEnded, Readable);
test('Can use already ended Duplex', testAlreadyEnded, Duplex);

const testAlreadyAborted = async (t, StreamClass) => {
const stream = StreamClass.from(fixtureMultiString);
stream.destroy();
await t.throwsAsync(onFinishedStream(stream), prematureClose);
assertReadFail(t, stream, StreamClass);

const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose);
t.deepEqual(error.bufferedData, []);
};

test('Throw if already aborted Readable', testAlreadyAborted, Readable);
test('Throw if already aborted Duplex', testAlreadyAborted, Duplex);

const testAlreadyErrored = async (t, StreamClass) => {
const stream = StreamClass.from(fixtureMultiString);
const error = new Error('test');
stream.destroy(error);
t.is(await t.throwsAsync(onFinishedStream(stream)), error);
assertReadFail(t, stream, StreamClass, error);

t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
t.deepEqual(error.bufferedData, []);
};

test('Throw if already errored Readable', testAlreadyErrored, Readable);
test('Throw if already errored Duplex', testAlreadyErrored, Duplex);

const testAbort = async (t, StreamClass) => {
const stream = new StreamClass(noopMethods);
setTimeout(() => {
stream.destroy();
}, 0);
const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose);
t.deepEqual(error.bufferedData, []);
assertBothFail(t, stream, StreamClass);
};

test('Throw when aborting Readable', testAbort, Readable);
test('Throw when aborting Duplex', testAbort, Duplex);

const testError = async (t, StreamClass) => {
const stream = new StreamClass(noopMethods);
const error = new Error('test');
setTimeout(() => {
stream.destroy(error);
}, 0);
t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
t.deepEqual(error.bufferedData, []);
assertBothFail(t, stream, StreamClass, error);
};

test('Throw when erroring Readable', testError, Readable);
test('Throw when erroring Duplex', testError, Duplex);

const testErrorEvent = async (t, StreamClass) => {
const stream = new StreamClass(noopMethods);
const error = new Error('test');
setTimeout(() => {
stream.emit('error', error);
}, 0);
t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
t.deepEqual(error.bufferedData, []);
assertBothFail(t, stream, StreamClass);
};

test('Throw when emitting "error" event with Readable', testErrorEvent, Readable);
test('Throw when emitting "error" event with Duplex', testErrorEvent, Duplex);

const testThrowRead = async (t, StreamClass) => {
const error = new Error('test');
const stream = new StreamClass({
read() {
throw error;
},
});
t.is(await t.throwsAsync(getStreamAsArray(stream)), error);
t.deepEqual(error.bufferedData, []);
assertBothFail(t, stream, StreamClass, error);
};

test('Throw when throwing error in Readable read()', testThrowRead, Readable);
test('Throw when throwing error in Duplex read()', testThrowRead, Duplex);

test('Handle non-error instances', async t => {
const stream = Readable.from(fixtureMultiString);
const errorMessage = `< ${fixtureString} >`;
stream.destroy(errorMessage);
const [{reason}] = await Promise.allSettled([onFinishedStream(stream)]);
t.is(reason, errorMessage);
assertReadFail(t, stream, Readable, errorMessage);

await t.throwsAsync(getStreamAsArray(stream), {message: errorMessage});
});

test('Handles objectMode errors', async t => {
const stream = new Readable({
read() {
this.push(fixtureString);
this.push({});
},
objectMode: true,
});

const error = await t.throwsAsync(getStream(stream), {message: /in object mode/});
t.is(error.bufferedData, fixtureString);
assertReadFail(t, stream, Readable);
});

test('Handles maxBuffer errors', async t => {
const stream = new Readable({
read() {
this.push(fixtureString);
this.push(fixtureString);
},
});

const error = await t.throwsAsync(
getStream(stream, {maxBuffer: fixtureString.length}),
{instanceOf: MaxBufferError},
);
t.is(error.bufferedData, fixtureString);
assertReadFail(t, stream, Readable);
});

test('Works if Duplex readable side ends before its writable side', async t => {
const stream = new Duplex(noopMethods);
stream.push(null);

t.deepEqual(await getStreamAsArray(stream), []);
assertWriteFail(t, stream, Duplex);
});

test('Cleans up event listeners', async t => {
const stream = Readable.from([]);
t.is(stream.listenerCount('error'), 0);

t.deepEqual(await getStreamAsArray(stream), []);

t.is(stream.listenerCount('error'), 0);
});

const testMultipleReads = async (t, wait) => {
let once = false;
const size = 10;
const stream = new Readable({
async read() {
if (once) {
return;
}

once = true;
for (let index = 0; index < size; index += 1) {
for (let index = 0; index < size; index += 1) {
this.push(fixtureString);
}

// eslint-disable-next-line no-await-in-loop
await wait();
}

this.push(null);
},
});

t.is(await getStream(stream), fixtureString.repeat(size * size));
assertSuccess(t, stream, Readable);
};

test('Handles multiple successive fast reads', testMultipleReads, () => scheduler.yield());
test('Handles multiple successive slow reads', testMultipleReads, () => pSetTimeout(100));

test('Pause stream when too much data at once', async t => {
const stream = new Readable({
read() {
this.push('.');
this.push('.');
this.push('.');
this.push('.');
this.push(null);
},
highWaterMark: 2,
});
const [result] = await Promise.all([
getStream(stream),
once(stream, 'pause'),
]);
t.is(result, '....');
assertSuccess(t, stream, Readable);
});

test('Can call twice at the same time', async t => {
const stream = Readable.from(fixtureMultiString);
const [result, secondResult] = await Promise.all([
getStream(stream),
getStream(stream),
]);
t.deepEqual(result, fixtureString);
t.deepEqual(secondResult, fixtureString);
assertSuccess(t, stream, Readable);
});

test('Can call and listen to "data" event at the same time', async t => {
const stream = Readable.from([fixtureString]);
const [result, secondResult] = await Promise.all([
getStream(stream),
once(stream, 'data'),
]);
t.deepEqual(result, fixtureString);
t.deepEqual(secondResult.toString(), fixtureString);
assertSuccess(t, stream, Readable);
});

0 comments on commit e486c9c

Please sign in to comment.