From 24105a7f4419b9906930e35f43bf4fd7d895216e Mon Sep 17 00:00:00 2001 From: WilliamConnatser <43946230+WilliamConnatser@users.noreply.github.com> Date: Sun, 21 Jun 2020 12:37:18 -0500 Subject: [PATCH] doc: piping from async generators using pipeline() PR-URL: https://github.com/nodejs/node/pull/33992 Reviewed-By: Anna Henningsen Reviewed-By: Luigi Pinca Reviewed-By: Robert Nagy Reviewed-By: Matteo Collina Reviewed-By: Trivikram Kamat Reviewed-By: James M Snell --- doc/api/stream.md | 87 +++++++++++------------------------------------ 1 file changed, 20 insertions(+), 67 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index b5b4e2db8a25cc..e935351aeb6b79 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2731,80 +2731,33 @@ readable.on('data', (chunk) => { #### Piping to Writable Streams from Async Iterators -In the scenario of writing to a writable stream from an async iterator, ensure -the correct handling of backpressure and errors. +When writing to a writable stream from an async iterator, ensure correct +handling of backpressure and errors. [`stream.pipeline()`][] abstracts away +the handling of backpressure and backpressure-related errors: ```js -const { once } = require('events'); -const finished = util.promisify(stream.finished); +const { pipeline } = require('stream'); +const util = require('util'); +const fs = require('fs'); const writable = fs.createWriteStream('./file'); -function drain(writable) { - if (writable.destroyed) { - return Promise.reject(new Error('premature close')); - } - return Promise.race([ - once(writable, 'drain'), - once(writable, 'close') - .then(() => Promise.reject(new Error('premature close'))) - ]); -} - -async function pump(iterable, writable) { - for await (const chunk of iterable) { - // Handle backpressure on write(). - if (!writable.write(chunk)) { - await drain(writable); - } +// Callback Pattern +pipeline(iterator, writable, (err, value) => { + if (err) { + console.error(err); + } else { + console.log(value, 'value returned'); } - writable.end(); -} - -(async function() { - // Ensure completion without errors. - await Promise.all([ - pump(iterable, writable), - finished(writable) - ]); -})(); -``` - -In the above, errors on `write()` would be caught and thrown by the -`once()` listener for the `'drain'` event, since `once()` will also handle the -`'error'` event. To ensure completion of the write stream without errors, -it is safer to use the `finished()` method as above, instead of using the -`once()` listener for the `'finish'` event. Under certain cases, an `'error'` -event could be emitted by the writable stream after `'finish'` and as `once()` -will release the `'error'` handler on handling the `'finish'` event, it could -result in an unhandled error. - -Alternatively, the readable stream could be wrapped with `Readable.from()` and -then piped via `.pipe()`: - -```js -const finished = util.promisify(stream.finished); - -const writable = fs.createWriteStream('./file'); - -(async function() { - const readable = Readable.from(iterable); - readable.pipe(writable); - // Ensure completion without errors. - await finished(writable); -})(); -``` - -Or, using `stream.pipeline()` to pipe streams: - -```js -const pipeline = util.promisify(stream.pipeline); - -const writable = fs.createWriteStream('./file'); +}); -(async function() { - await pipeline(iterable, writable); -})(); +// Promise Pattern +const pipelinePromise = util.promisify(pipeline); +pipelinePromise(iterator, writable) + .then((value) => { + console.log(value, 'value returned'); + }) + .catch(console.error); ```