From cb44781371d27093f9d8dabcf1eae7b5638dc728 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 18 Jun 2021 08:08:50 +0200 Subject: [PATCH] stream: add stream.compose MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refs: #32020 PR-URL: https://github.com/nodejs/node/pull/39029 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca Reviewed-By: Michaƫl Zasso Backport-PR-URL: https://github.com/nodejs/node/pull/39563 --- doc/api/stream.md | 89 +++++ lib/internal/streams/compose.js | 196 +++++++++++ lib/internal/streams/duplexify.js | 19 +- lib/internal/streams/end-of-stream.js | 2 + lib/internal/streams/pipeline.js | 17 +- lib/internal/streams/utils.js | 116 ++++++- lib/stream.js | 4 + lib/stream/promises.js | 4 +- test/parallel/test-bootstrap-modules.js | 1 + test/parallel/test-stream-compose.js | 425 ++++++++++++++++++++++++ 10 files changed, 829 insertions(+), 44 deletions(-) create mode 100644 lib/internal/streams/compose.js create mode 100644 test/parallel/test-stream-compose.js diff --git a/doc/api/stream.md b/doc/api/stream.md index c4c31e116e1aee..de2e7db7ea411a 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1933,6 +1933,95 @@ run().catch(console.error); after the `callback` has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors. +### `stream.compose(...streams)` + + +> Stability: 1 - `stream.compose` is experimental. + +* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]} +* Returns: {stream.Duplex} + +Combines two or more streams into a `Duplex` stream that writes to the +first stream and reads from the last. Each provided stream is piped into +the next, using `stream.pipeline`. If any of the streams error then all +are destroyed, including the outer `Duplex` stream. + +Because `stream.compose` returns a new stream that in turn can (and +should) be piped into other streams, it enables composition. In contrast, +when passing streams to `stream.pipeline`, typically the first stream is +a readable stream and the last a writable stream, forming a closed +circuit. + +If passed a `Function` it must be a factory method taking a `source` +`Iterable`. + +```mjs +import { compose, Transform } from 'stream'; + +const removeSpaces = new Transform({ + transform(chunk, encoding, callback) { + callback(null, String(chunk).replace(' ', '')); + } +}); + +async function* toUpper(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +} + +let res = ''; +for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { + res += buf; +} + +console.log(res); // prints 'HELLOWORLD' +``` + +`stream.compose` can be used to convert async iterables, generators and +functions into streams. + +* `AsyncIterable` converts into a readable `Duplex`. Cannot yield + `null`. +* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`. + Must take a source `AsyncIterable` as first parameter. Cannot yield + `null`. +* `AsyncFunction` converts into a writable `Duplex`. Must return + either `null` or `undefined`. + +```mjs +import { compose } from 'stream'; +import { finished } from 'stream/promises'; + +// Convert AsyncIterable into readable Duplex. +const s1 = compose(async function*() { + yield 'Hello'; + yield 'World'; +}()); + +// Convert AsyncGenerator into transform Duplex. +const s2 = compose(async function*(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +}); + +let res = ''; + +// Convert AsyncFunction into writable Duplex. +const s3 = compose(async function(source) { + for await (const chunk of source) { + res += chunk; + } +}); + +await finished(compose(s1, s2, s3)); + +console.log(res); // prints 'HELLOWORLD' +``` + ### `stream.Readable.from(iterable, [options])`