streams: add stream.pipelinify
ronag committed Jun 30, 2021
1 parent f179eb0 commit bea9efc
Showing 4 changed files with 355 additions and 0 deletions.
33 changes: 33 additions & 0 deletions doc/api/stream.md
@@ -1859,6 +1859,39 @@ run().catch(console.error);
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.

### `stream.pipelinify(...streams)`
<!-- YAML
added: REPLACEME
-->

* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
* Returns: {stream.Duplex}

Combines multiple streams into a single `Duplex` stream. Internally,
`stream.pipeline` is called on all the passed streams, and the returned
`Duplex` writes to the first passed stream and reads from the last
(the stream returned by `stream.pipeline`).

```js
const { pipelinify, Transform } = require('stream');
const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  }
});
const toUpper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).toUpperCase());
  }
});
const removeSpacesAndToUpper = pipelinify(removeSpaces, toUpper);
removeSpacesAndToUpper
  .end('hello world')
  .on('data', (buf) => {
    console.log(String(buf)); // prints 'HELLOWORLD'
  });
```
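
Since the composed parts are run through `stream.pipeline`, async generator
functions can be combined in the same way. A minimal sketch; the generator
body here is illustrative:

```js
const { pipelinify, Transform } = require('stream');

const composed = pipelinify(
  async function* () {
    yield 'hello';
    yield ' world';
  },
  new Transform({
    transform(chunk, encoding, callback) {
      callback(null, String(chunk).toUpperCase());
    }
  })
);

let res = '';
composed
  .on('data', (chunk) => {
    res += chunk;
  })
  .on('end', () => {
    console.log(res); // prints 'HELLO WORLD'
  });
```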

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
107 changes: 107 additions & 0 deletions lib/internal/streams/pipelinify.js
@@ -0,0 +1,107 @@
'use strict';

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { destroyer } = require('internal/streams/destroy');

module.exports = function pipe(...streams) {
  let ondrain;
  let onfinish;
  let onreadable;
  let onclose;
  let d;

  // `r` is the tail of the pipeline; the composed duplex reads from it.
  const r = pipeline(streams, function(err) {
    if (onclose) {
      onclose(err);
    } else if (err) {
      d.destroy(err);
    }
    // `null` marks the pipeline as settled for `_destroy` below.
    onclose = null;
  });
  // `w` is the head of the pipeline; the composed duplex writes to it.
  const w = streams[0];

  // TODO (ronag): Avoid double buffering.
  // Implement Writable/Readable/Duplex traits.
  // See, https://github.com/nodejs/node/pull/33515.
  d = new Duplex({
    writable: w.writable,
    readable: r.readable,
    // Mirror the head's writable and the tail's readable object modes
    // rather than deriving both sides from a single flag.
    writableObjectMode: !!w.writableObjectMode,
    readableObjectMode: !!r.readableObjectMode
  });

  if (w.writable) {
    d._write = function(chunk, encoding, callback) {
      if (w.write(chunk, encoding)) {
        callback();
      } else {
        ondrain = callback;
      }
    };

    d._final = function(callback) {
      w.end();
      onfinish = callback;
    };

    w.on('drain', function() {
      if (ondrain) {
        const cb = ondrain;
        ondrain = null;
        cb();
      }
    });

    r.on('finish', function() {
      if (onfinish) {
        const cb = onfinish;
        onfinish = null;
        cb();
      }
    });
  }

  if (r.readable) {
    r.on('readable', function() {
      if (onreadable) {
        const cb = onreadable;
        onreadable = null;
        cb();
      }
    });

    r.on('end', function() {
      d.push(null);
    });

    d._read = function() {
      while (true) {
        const buf = r.read();

        if (buf === null) {
          onreadable = d._read;
          return;
        }

        if (!d.push(buf)) {
          return;
        }
      }
    };
  }

  d._destroy = function(err, callback) {
    onreadable = null;
    ondrain = null;
    onfinish = null;

    if (onclose === null) {
      // The pipeline has already settled; nothing left to tear down.
      callback(err);
    } else {
      // Defer the destroy callback until the pipeline callback fires.
      onclose = callback;
      destroyer(r, err);
    }
  };

  return d;
};
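
The `_destroy` wiring above means that destroying the composed duplex also
tears down the underlying pipeline, and a pipeline error surfaces on the
duplex. A minimal sketch of the observable behavior, assuming the public
`stream.pipelinify` export registered below; the stream names are
illustrative:

```js
const { pipelinify, PassThrough } = require('stream');

const a = new PassThrough();
const b = new PassThrough();
const composed = pipelinify(a, b);

composed.on('error', (err) => {
  console.log(err.message); // prints 'boom'
});
b.on('close', () => console.log('b closed'));

// Destroying the duplex destroys every stream in the pipeline.
composed.destroy(new Error('boom'));
```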
2 changes: 2 additions & 0 deletions lib/stream.js
@@ -30,6 +30,7 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const pipelinify = require('internal/streams/pipelinify');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
@@ -43,6 +44,7 @@ Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.pipeline = pipeline;
Stream.pipelinify = pipelinify;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
213 changes: 213 additions & 0 deletions test/parallel/test-stream-pipelinify.js
@@ -0,0 +1,213 @@
'use strict';

const common = require('../common');
const {
  Readable,
  Transform,
  Writable,
  pipelinify
} = require('stream');
const assert = require('assert');

{
  let res = '';
  pipelinify(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk + chunk);
      })
    }),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .end('asd')
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASDASD');
    }));
}

{
  let res = '';
  pipelinify(
    Readable.from(['asd']),
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  let res = '';
  pipelinify(
    async function* () {
      yield 'asd';
    },
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    })
  )
    .on('data', common.mustCall((buf) => {
      res += buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  let res = '';
  pipelinify(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Writable({
      write: common.mustCall((chunk, encoding, callback) => {
        res += chunk;
        callback(null);
      })
    })
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  let res = '';
  pipelinify(
    new Transform({
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk.toString().toUpperCase());
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    async function(source) {
      for await (const chunk of source) {
        res += chunk;
      }
    }
  )
    .end('asd')
    .on('finish', common.mustCall(() => {
      assert.strictEqual(res, 'ASD');
    }));
}

{
  let res;
  pipelinify(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustCall((buf) => {
      res = buf;
    }))
    .on('end', common.mustCall(() => {
      assert.strictEqual(res.chunk.chunk, true);
    }));
}

{
  const _err = new Error('asd');
  pipelinify(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(_err);
      })
    }),
    async function*(source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err, _err);
    });
}

{
  const _err = new Error('asd');
  pipelinify(
    new Transform({
      objectMode: true,
      transform: common.mustCall((chunk, encoding, callback) => {
        callback(null, chunk);
      })
    }),
    async function*(source) {
      let tmp = '';
      for await (const chunk of source) {
        tmp += chunk;
        throw _err;
      }
      return tmp;
    },
    new Transform({
      objectMode: true,
      transform: common.mustNotCall((chunk, encoding, callback) => {
        callback(null, { chunk });
      })
    })
  )
    .end(true)
    .on('data', common.mustNotCall())
    .on('end', common.mustNotCall())
    .on('error', (err) => {
      assert.strictEqual(err, _err);
    });
}
