diff --git a/lib/internal/streams/pipelinify.js b/lib/internal/streams/pipelinify.js new file mode 100644 index 00000000000000..98620032fe12b7 --- /dev/null +++ b/lib/internal/streams/pipelinify.js @@ -0,0 +1,89 @@ +'use strict'; + +const pipeline = require('internal/streams/pipeline'); +const Duplex = require('internal/streams/duplex'); + +module.exports = function pipe(...streams) { + let ondrain; + let onfinish; + let onclose; + let onreadable; + let ret; + + const r = pipeline(streams, function(err) { + if (onclose) { + const cb = onclose; + onclose = null; + cb(err); + } else { + ret.destroy(err); + } + }); + const w = streams[0]; + + const writable = w.writable; + const readable = r.readable; + const objectMode = w.readableObjectMode; + + ret = new Duplex({ + writable, + readable, + objectMode, + highWaterMark: 1 + }); + + if (writable) { + ret._write = function(chunk, encoding, callback) { + if (w.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + ret._final = function(chunk, encoding, callback) { + w.end(chunk, encoding); + onfinish = callback; + }; + + ret.on('drain', function () { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + + ret.on('finish', function () { + if (onfinish) { + const cb = onfinish + onfinish = null; + cb(); + } + }); + } + + if (readable) { + onreadable = function () { + while (true) { + const buf = r.read(); + + if (buf === null) { + r.once('readable', onreadable); + return; + } + + if (!ret.push(buf)) { + return; + } + } + } + + ret._read = onreadable; + } + + ret._destroy = function(err, callback) { + onclose = callback; + streams[0].destroy(err); + }; +} diff --git a/lib/stream.js b/lib/stream.js index 85adda81b32f29..3b4a83868f84ac 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -30,6 +30,7 @@ const { } = require('internal/util'); const pipeline = require('internal/streams/pipeline'); +const pipelinify = require('internal/streams/pipelinify'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); @@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); Stream.PassThrough = require('internal/streams/passthrough'); Stream.pipeline = pipeline; +Stream.pipelinify = pipelinify; const { addAbortSignal } = require('internal/streams/add-abort-signal'); Stream.addAbortSignal = addAbortSignal; Stream.finished = eos;