diff --git a/lib/internal/streams/pipelinify.js b/lib/internal/streams/pipelinify.js new file mode 100644 index 000000000000000..919b9da35227473 --- /dev/null +++ b/lib/internal/streams/pipelinify.js @@ -0,0 +1,106 @@ +'use strict'; + +const pipeline = require('internal/streams/pipeline'); +const Duplex = require('internal/streams/duplex'); +const { destroyer } = require('internal/streams/destroy'); + +module.exports = function pipe(...streams) { + let ondrain; + let onfinish; + let onreadable; + let onclose; + let ret; + + const r = pipeline(streams, function(err) { + if (onclose) { + const cb = onclose; + onclose = null; + cb(err); + } else { + ret.destroy(err); + } + }); + const w = streams[0]; + + const writable = w.writable; + const readable = r.readable; + const objectMode = w.readableObjectMode; + + ret = new Duplex({ + writable, + readable, + objectMode, + highWaterMark: 1 + }); + + if (writable) { + ret._write = function(chunk, encoding, callback) { + if (w.write(chunk, encoding)) { + callback(); + } else { + ondrain = callback; + } + }; + + ret._final = function(callback) { + w.end(); + onfinish = callback; + }; + + ret.on('drain', function() { + if (ondrain) { + const cb = ondrain; + ondrain = null; + cb(); + } + }); + + ret.on('finish', function() { + if (onfinish) { + const cb = onfinish; + onfinish = null; + cb(); + } + }); + + // Writable.write unfortunately checks buffering before + // writing, and not after due to https://github.com/nodejs/node/pull/35941. + // We need to override this in order to avoid buffering overhead. + ret.write = function write (chunk, encoding, cb) { + return Duplex.prototype.write.call(ret, chunk, encoding, cb) && !ondrain; + } + } + + if (readable) { + r.on('readable', function() { + if (onreadable) { + const cb = onreadable; + onreadable = null; + cb(); + } + }); + + ret._read = function() { + while (true) { + const buf = r.read(); + + if (buf === null) { + onreadable = ret._read; + return; + } + + if (!ret.push(buf)) { + return; + } + } + }; + } + + ret._destroy = function(err, callback) { + onclose = callback; + onreadable = null; + ondrain = null; + onfinish = null; + destroyer(r, err); + }; +}; diff --git a/lib/stream.js b/lib/stream.js index 85adda81b32f294..3b4a83868f84acd 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -30,6 +30,7 @@ const { } = require('internal/util'); const pipeline = require('internal/streams/pipeline'); +const pipelinify = require('internal/streams/pipelinify'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); @@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); Stream.PassThrough = require('internal/streams/passthrough'); Stream.pipeline = pipeline; +Stream.pipelinify = pipelinify; const { addAbortSignal } = require('internal/streams/add-abort-signal'); Stream.addAbortSignal = addAbortSignal; Stream.finished = eos;