Skip to content

Commit

Permalink
stream: preserve object mode in compose
Browse files Browse the repository at this point in the history
Fixes: #46829

PR-URL: #47413
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
rluvaton authored and targos committed May 2, 2023
1 parent 8d43689 commit 2b411f4
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ module.exports = function compose(...streams) {
d = new Duplex({
// TODO (ronag): highWaterMark?
writableObjectMode: !!head?.writableObjectMode,
readableObjectMode: !!tail?.writableObjectMode,
readableObjectMode: !!tail?.readableObjectMode,
writable,
readable,
});
Expand Down
74 changes: 74 additions & 0 deletions test/parallel/test-stream-compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,77 @@ const assert = require('assert');
assert.strictEqual(buf, 'HELLOWORLD');
}));
}

{
// In the new stream than should use the writeable of the first stream and readable of the last stream
// #46829
(async () => {
const newStream = compose(
new PassThrough({
// reading FROM you in object mode or not
readableObjectMode: false,

// writing TO you in object mode or not
writableObjectMode: false,
}),
new Transform({
// reading FROM you in object mode or not
readableObjectMode: true,

// writing TO you in object mode or not
writableObjectMode: false,
transform: (chunk, encoding, callback) => {
callback(null, {
value: chunk.toString()
});
}
})
);

assert.strictEqual(newStream.writableObjectMode, false);
assert.strictEqual(newStream.readableObjectMode, true);

newStream.write('Steve Rogers');
newStream.write('On your left');

newStream.end();

assert.deepStrictEqual(await newStream.toArray(), [{ value: 'Steve Rogers' }, { value: 'On your left' }]);
})().then(common.mustCall());
}

{
// In the new stream than should use the writeable of the first stream and readable of the last stream
// #46829
(async () => {
const newStream = compose(
new PassThrough({
// reading FROM you in object mode or not
readableObjectMode: true,

// writing TO you in object mode or not
writableObjectMode: true,
}),
new Transform({
// reading FROM you in object mode or not
readableObjectMode: false,

// writing TO you in object mode or not
writableObjectMode: true,
transform: (chunk, encoding, callback) => {
callback(null, chunk.value);
}
})
);

assert.strictEqual(newStream.writableObjectMode, true);
assert.strictEqual(newStream.readableObjectMode, false);

newStream.write({ value: 'Steve Rogers' });
newStream.write({ value: 'On your left' });

newStream.end();

assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]);
})().then(common.mustCall());
}

0 comments on commit 2b411f4

Please sign in to comment.