-
-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
322 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import {isReadableStream} from 'is-stream'; | ||
|
||
export const getAsyncIterable = stream => { | ||
if (isReadableStream(stream, {checkOpen: false})) { | ||
return getStreamIterable(stream); | ||
} | ||
|
||
if (stream?.[Symbol.asyncIterator] === undefined) { | ||
throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.'); | ||
} | ||
|
||
return stream; | ||
}; | ||
|
||
// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it | ||
const getStreamIterable = async function * (stream) { | ||
const {on, finished} = await getNodeImports(); | ||
const onStreamData = on(stream, 'data', {highWatermark: stream.readableHighWaterMark}); | ||
handleStreamEnd(stream, onStreamData, finished); | ||
try { | ||
for await (const [chunk] of onStreamData) { | ||
yield chunk; | ||
} | ||
} finally { | ||
stream.destroy(); | ||
} | ||
}; | ||
|
||
const handleStreamEnd = async (stream, onStreamData, finished) => { | ||
try { | ||
await finished(stream, {cleanup: true, readable: true, writable: false}); | ||
await onStreamData.return(); | ||
} catch (error) { | ||
const normalizedError = error instanceof Error ? error : new Error(String(error)); | ||
await onStreamData.throw(normalizedError); | ||
} | ||
}; | ||
|
||
// Use dynamic imports to support browsers | ||
const getNodeImports = async () => { | ||
const [{on}, {finished}] = await Promise.all([ | ||
import('node:events'), | ||
import('node:stream/promises'), | ||
]); | ||
return {on, finished}; | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
import {once} from 'node:events'; | ||
import {Readable, Duplex} from 'node:stream'; | ||
import {finished} from 'node:stream/promises'; | ||
import {scheduler, setTimeout as pSetTimeout} from 'node:timers/promises'; | ||
import test from 'ava'; | ||
import getStream, {getStreamAsArray, MaxBufferError} from '../source/index.js'; | ||
|
||
const foobarString = 'foobar'; | ||
const foobarArray = ['foo', 'bar']; | ||
const prematureClose = {code: 'ERR_STREAM_PREMATURE_CLOSE'}; | ||
|
||
const onFinishedStream = stream => finished(stream, {cleanup: true}); | ||
const noopMethods = {read() {}, write() {}}; | ||
|
||
// eslint-disable-next-line max-params | ||
const assertStream = ({readableEnded = false, writableEnded = false}, t, stream, StreamClass, error = null) => { | ||
t.is(stream.errored, error); | ||
t.true(stream.destroyed); | ||
t.false(stream.readable); | ||
t.is(stream.readableEnded, readableEnded); | ||
|
||
if (StreamClass === Duplex) { | ||
t.false(stream.writable); | ||
t.is(stream.writableEnded, writableEnded); | ||
} | ||
}; | ||
|
||
const assertSuccess = assertStream.bind(undefined, {readableEnded: true, writableEnded: true}); | ||
const assertReadFail = assertStream.bind(undefined, {writableEnded: true}); | ||
const assertWriteFail = assertStream.bind(undefined, {readableEnded: true}); | ||
const assertBothFail = assertStream.bind(undefined, {}); | ||
|
||
const testSuccess = async (t, StreamClass) => { | ||
const stream = StreamClass.from(foobarArray); | ||
t.true(stream instanceof StreamClass); | ||
|
||
t.deepEqual(await getStreamAsArray(stream), foobarArray); | ||
assertSuccess(t, stream, StreamClass); | ||
}; | ||
|
||
test('Can use Readable stream', testSuccess, Readable); | ||
test('Can use Duplex stream', testSuccess, Duplex); | ||
|
||
const testAlreadyEnded = async (t, StreamClass) => { | ||
const stream = StreamClass.from(foobarArray); | ||
await stream.toArray(); | ||
assertSuccess(t, stream, StreamClass); | ||
|
||
t.deepEqual(await getStreamAsArray(stream), []); | ||
}; | ||
|
||
test('Can use already ended Readable', testAlreadyEnded, Readable); | ||
test('Can use already ended Duplex', testAlreadyEnded, Duplex); | ||
|
||
const testAlreadyAborted = async (t, StreamClass) => { | ||
const stream = StreamClass.from(foobarArray); | ||
stream.destroy(); | ||
await t.throwsAsync(onFinishedStream(stream), prematureClose); | ||
assertReadFail(t, stream, StreamClass); | ||
|
||
const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose); | ||
t.deepEqual(error.bufferedData, []); | ||
}; | ||
|
||
test('Throw if already aborted Readable', testAlreadyAborted, Readable); | ||
test('Throw if already aborted Duplex', testAlreadyAborted, Duplex); | ||
|
||
const testAlreadyErrored = async (t, StreamClass) => { | ||
const stream = StreamClass.from(foobarArray); | ||
const error = new Error('test'); | ||
stream.destroy(error); | ||
t.is(await t.throwsAsync(onFinishedStream(stream)), error); | ||
assertReadFail(t, stream, StreamClass, error); | ||
|
||
t.is(await t.throwsAsync(getStreamAsArray(stream)), error); | ||
t.deepEqual(error.bufferedData, []); | ||
}; | ||
|
||
test('Throw if already errored Readable', testAlreadyErrored, Readable); | ||
test('Throw if already errored Duplex', testAlreadyErrored, Duplex); | ||
|
||
const testAbort = async (t, StreamClass) => { | ||
const stream = new StreamClass(noopMethods); | ||
setTimeout(() => { | ||
stream.destroy(); | ||
}, 0); | ||
const error = await t.throwsAsync(getStreamAsArray(stream), prematureClose); | ||
t.deepEqual(error.bufferedData, []); | ||
assertBothFail(t, stream, StreamClass); | ||
}; | ||
|
||
test('Throw when aborting Readable', testAbort, Readable); | ||
test('Throw when aborting Duplex', testAbort, Duplex); | ||
|
||
const testError = async (t, StreamClass) => { | ||
const stream = new StreamClass(noopMethods); | ||
const error = new Error('test'); | ||
setTimeout(() => { | ||
stream.destroy(error); | ||
}, 0); | ||
t.is(await t.throwsAsync(getStreamAsArray(stream)), error); | ||
t.deepEqual(error.bufferedData, []); | ||
assertBothFail(t, stream, StreamClass, error); | ||
}; | ||
|
||
test('Throw when erroring Readable', testError, Readable); | ||
test('Throw when erroring Duplex', testError, Duplex); | ||
|
||
const testErrorEvent = async (t, StreamClass) => { | ||
const stream = new StreamClass(noopMethods); | ||
const error = new Error('test'); | ||
setTimeout(() => { | ||
stream.emit('error', error); | ||
}, 0); | ||
t.is(await t.throwsAsync(getStreamAsArray(stream)), error); | ||
t.deepEqual(error.bufferedData, []); | ||
assertBothFail(t, stream, StreamClass); | ||
}; | ||
|
||
test('Throw when emitting "error" event with Readable', testErrorEvent, Readable); | ||
test('Throw when emitting "error" event with Duplex', testErrorEvent, Duplex); | ||
|
||
const testThrowRead = async (t, StreamClass) => { | ||
const error = new Error('test'); | ||
const stream = new StreamClass({ | ||
read() { | ||
throw error; | ||
}, | ||
}); | ||
t.is(await t.throwsAsync(getStreamAsArray(stream)), error); | ||
t.deepEqual(error.bufferedData, []); | ||
assertBothFail(t, stream, StreamClass, error); | ||
}; | ||
|
||
test('Throw when throwing error in Readable read()', testThrowRead, Readable); | ||
test('Throw when throwing error in Duplex read()', testThrowRead, Duplex); | ||
|
||
test('Handle non-error instances', async t => { | ||
const stream = Readable.from(foobarArray); | ||
const errorMessage = `< ${foobarString} >`; | ||
stream.destroy(errorMessage); | ||
const [{reason}] = await Promise.allSettled([onFinishedStream(stream)]); | ||
t.is(reason, errorMessage); | ||
assertReadFail(t, stream, Readable, errorMessage); | ||
|
||
await t.throwsAsync(getStreamAsArray(stream), {message: errorMessage}); | ||
}); | ||
|
||
test('Handles objectMode errors', async t => { | ||
const stream = new Readable({ | ||
read() { | ||
this.push(foobarString); | ||
this.push({}); | ||
}, | ||
objectMode: true, | ||
}); | ||
|
||
const error = await t.throwsAsync(getStream(stream), {message: /in object mode/}); | ||
t.is(error.bufferedData, foobarString); | ||
assertReadFail(t, stream, Readable); | ||
}); | ||
|
||
test('Handles maxBuffer errors', async t => { | ||
const stream = new Readable({ | ||
read() { | ||
this.push(foobarString); | ||
this.push(foobarString); | ||
}, | ||
}); | ||
|
||
const error = await t.throwsAsync( | ||
getStream(stream, {maxBuffer: foobarString.length}), | ||
{instanceOf: MaxBufferError}, | ||
); | ||
t.is(error.bufferedData, foobarString); | ||
assertReadFail(t, stream, Readable); | ||
}); | ||
|
||
test('Works if Duplex readable side ends before its writable side', async t => { | ||
const stream = new Duplex(noopMethods); | ||
stream.push(null); | ||
|
||
t.deepEqual(await getStreamAsArray(stream), []); | ||
assertWriteFail(t, stream, Duplex); | ||
}); | ||
|
||
test('Cleans up event listeners', async t => { | ||
const stream = Readable.from([]); | ||
t.is(stream.listenerCount('error'), 0); | ||
|
||
t.deepEqual(await getStreamAsArray(stream), []); | ||
|
||
t.is(stream.listenerCount('error'), 0); | ||
}); | ||
|
||
const testMultipleReads = async (t, wait) => { | ||
let once = false; | ||
const size = 10; | ||
const stream = new Readable({ | ||
async read() { | ||
if (once) { | ||
return; | ||
} | ||
|
||
once = true; | ||
for (let index = 0; index < size; index += 1) { | ||
for (let index = 0; index < size; index += 1) { | ||
this.push(foobarString); | ||
} | ||
|
||
// eslint-disable-next-line no-await-in-loop | ||
await wait(); | ||
} | ||
|
||
this.push(null); | ||
}, | ||
}); | ||
|
||
t.is(await getStream(stream), foobarString.repeat(size * size)); | ||
assertSuccess(t, stream, Readable); | ||
}; | ||
|
||
test('Handles multiple successive fast reads', testMultipleReads, () => scheduler.yield()); | ||
test('Handles multiple successive slow reads', testMultipleReads, () => pSetTimeout(100)); | ||
|
||
test('Pause stream when too much data at once', async t => { | ||
const stream = new Readable({ | ||
read() { | ||
this.push('.'); | ||
this.push('.'); | ||
this.push('.'); | ||
this.push('.'); | ||
this.push(null); | ||
}, | ||
highWaterMark: 2, | ||
}); | ||
const [result] = await Promise.all([ | ||
getStream(stream), | ||
once(stream, 'pause'), | ||
]); | ||
t.is(result, '....'); | ||
assertSuccess(t, stream, Readable); | ||
}); | ||
|
||
test('Can call twice at the same time', async t => { | ||
const stream = Readable.from(foobarArray); | ||
const [result, secondResult] = await Promise.all([ | ||
getStream(stream), | ||
getStream(stream), | ||
]); | ||
t.deepEqual(result, foobarString); | ||
t.deepEqual(secondResult, foobarString); | ||
assertSuccess(t, stream, Readable); | ||
}); | ||
|
||
test('Can call and listen to "data" event at the same time', async t => { | ||
const stream = Readable.from([foobarString]); | ||
const [result, secondResult] = await Promise.all([ | ||
getStream(stream), | ||
once(stream, 'data'), | ||
]); | ||
t.deepEqual(result, foobarString); | ||
t.deepEqual(secondResult.toString(), foobarString); | ||
assertSuccess(t, stream, Readable); | ||
}); |