Skip to content

Commit

Permalink
stream: add null push transform in async_iterator
Browse files Browse the repository at this point in the history
when the readable side of a transform ends any for await
loop on that transform stream should also complete. This
fix prevents for await loop on a transform stream
from hanging indefinitely.

PR-URL: nodejs#28566
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
David Mark Clements authored and Trott committed Jul 21, 2019
1 parent f02dfdb commit 3499741
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
2 changes: 1 addition & 1 deletion lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ const createReadableStreamAsyncIterator = (stream) => {
});
iterator[kLastPromise] = null;

finished(stream, (err) => {
finished(stream, { writable: false }, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
const reject = iterator[kLastReject];
// Reject if we are waiting for data in the Promise returned by next() and
Expand Down
26 changes: 25 additions & 1 deletion test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

const common = require('../common');
const { Readable, PassThrough, pipeline } = require('stream');
const { Readable, Transform, PassThrough, pipeline } = require('stream');
const assert = require('assert');

async function tests() {
Expand Down Expand Up @@ -396,6 +396,30 @@ async function tests() {
}
}

{
console.log('readable side of a transform stream pushes null');
const transform = new Transform({
objectMode: true,
transform: (chunk, enc, cb) => { cb(null, chunk); }
});
transform.push(0);
transform.push(1);
process.nextTick(() => {
transform.push(null);
});

const mustReach = [ common.mustCall(), common.mustCall() ];

const iter = transform[Symbol.asyncIterator]();
assert.strictEqual((await iter.next()).value, 0);

for await (const d of iter) {
assert.strictEqual(d, 1);
mustReach[0]();
}
mustReach[1]();
}

{
console.log('all next promises must be resolved on end');
const r = new Readable({
Expand Down

0 comments on commit 3499741

Please sign in to comment.