Skip to content

Commit

Permalink
stream: do not swallow errors with async iterators and pipeline
Browse files Browse the repository at this point in the history
Before this patch, pipeline() could swallow errors by pre-emptively
producing a ERR_STREAM_PREMATURE_CLOSE that was not really helpful
to the user.
  • Loading branch information
mcollina committed Mar 3, 2020
1 parent 7cafd5f commit 89b2368
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
18 changes: 15 additions & 3 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
const {
ArrayIsArray,
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
Symbol
} = primordials;

let eos;
Expand All @@ -21,6 +22,8 @@ const {
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

const kSkipPrematureClose = Symbol('kSkipPrematureClose');

let EE;
let PassThrough;
let createReadableStreamAsyncIterator;
Expand Down Expand Up @@ -159,9 +162,17 @@ function pipeline(...streams) {
}

function wrap(stream, reading, writing, final) {
destroys.push(destroyer(stream, reading, writing, (err) => {
const fn = destroyer(stream, reading, writing, (err) => {

// We need to skip the premature close error here to avoid swallowing
// any errors in the downstream async iterator.
if (err && err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
stream[kSkipPrematureClose]) {
err = undefined;
}
finish(err, final);
}));
});
destroys.push(fn);
}

let ret;
Expand Down Expand Up @@ -190,6 +201,7 @@ function pipeline(...streams) {
}
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
ret[kSkipPrematureClose] = true;
ret = stream(ret);

if (reading) {
Expand Down
27 changes: 27 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -938,3 +938,30 @@ const { promisify } = require('util');
r.push(null);
r.emit('close');
}

{
let res = '';
const rs = new Readable({
read() {
setImmediate(() => {
rs.push('hello');
});
}
});
const ws = new Writable({
write: common.mustNotCall()
});
pipeline(rs, async function*(stream) {
/* eslint no-unused-vars: off */
for await (const chunk of stream) {
throw new Error('kaboom');
}
}, async function *(source) {
for await (const chunk of source) {
res += chunk;
}
}, ws, common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
assert.strictEqual(res, '');
}));
}

0 comments on commit 89b2368

Please sign in to comment.