From fb7bee46cd5d56143396dca6e7f2c592203096ba Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 18 Jun 2021 08:08:50 +0200 Subject: [PATCH] stream: add stream.compose MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refs: #32020 PR-URL: https://github.com/nodejs/node/pull/39029 Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca Reviewed-By: Michaƫl Zasso Backport-PR-URL: --- doc/api/stream.md | 42 +++ lib/internal/streams/compose.js | 300 +++++++++++++++++ lib/internal/streams/pipeline.js | 3 - lib/internal/streams/utils.js | 62 +++- lib/stream.js | 22 ++ test/parallel/test-bootstrap-modules.js | 1 + test/parallel/test-stream-compose.js | 425 ++++++++++++++++++++++++ 7 files changed, 851 insertions(+), 4 deletions(-) create mode 100644 lib/internal/streams/compose.js create mode 100644 test/parallel/test-stream-compose.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 8ff36423cb246ee..8ff78db1becd3eb 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1857,6 +1857,48 @@ run().catch(console.error); after the `callback` has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors. +### `stream.compose(...streams)` + + +* `streams` {Stream[]} +* Returns: {stream.Duplex} + +Combines two or more streams into a `Duplex` stream that writes to the +first stream and reads from the last. Each provided stream is piped into +the next, using `stream.pipeline`. If any of the streams error then all +are destroyed, including the outer `Duplex` stream. + +Because `stream.compose` returns a new stream that in turn can (and +should) be piped into other streams, it enables composition. In contrast, +when passing streams to `stream.pipeline`, typically the first stream is +a readable stream and the last a writable stream, forming a closed +circuit. + +```mjs +import { compose, Transform } from 'stream'; + +const removeSpaces = new Transform({ + transform(chunk, encoding, callback) { + callback(null, String(chunk).replace(' ', '')); + } +}); + +const toUpper = new Transform({ + transform(chunk, encoding, callback) { + callback(null, String(chunk).toUpperCase()); + } +}); + +let res = ''; +for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { + res += buf; +} + +console.log(res); // prints 'HELLOWORLD' +``` + ### `stream.Readable.from(iterable, [options])`