Skip to content

Commit

Permalink
stream: duplexify
Browse files Browse the repository at this point in the history
PR-URL: #39519
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
ronag committed Aug 3, 2021
1 parent 51cd4a8 commit 533cafc
Show file tree
Hide file tree
Showing 11 changed files with 560 additions and 111 deletions.
28 changes: 28 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,34 @@ added: REPLACEME
* `streamWritable` {stream.Writable}
* Returns: {WritableStream}

### `stream.Duplex.from(src)`
<!-- YAML
added: REPLACEME
-->

* `src` {Stream|Blob|ArrayBuffer|string|Iterable|AsyncIterable|
AsyncGeneratorFunction|AsyncFunction|Promise|Object}

A utility method for creating duplex streams.

* `Stream` converts writable stream into writable `Duplex` and readable stream
to `Duplex`.
* `Blob` converts into readable `Duplex`.
* `string` converts into readable `Duplex`.
* `ArrayBuffer` converts into readable `Duplex`.
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
`null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform
`Duplex`. Must take a source `AsyncIterable` as first parameter. Cannot yield
`null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
either `null` or `undefined`
* `Object ({ writable, readable })` converts `readable` and
`writable` into `Stream` and then combines them into `Duplex` where the
`Duplex` will write to the `writable` and read from the `readable`.
* `Promise` converts into readable `Duplex`. Value `null` is ignored.
* Returns: {stream.Duplex}

### `stream.Duplex.fromWeb(pair[, options])`
<!-- YAML
added: REPLACEME
Expand Down
96 changes: 4 additions & 92 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,19 @@

const pipeline = require('internal/streams/pipeline');
const Duplex = require('internal/streams/duplex');
const { createDeferredPromise } = require('internal/util');
const { destroyer } = require('internal/streams/destroy');
const from = require('internal/streams/from');
const {
isNodeStream,
isIterable,
isReadable,
isWritable,
} = require('internal/streams/utils');
const {
PromiseResolve,
} = primordials;
const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_RETURN_VALUE,
ERR_MISSING_ARGS,
},
} = require('internal/errors');
const assert = require('internal/assert');

// This is needed for pre node 17.
class ComposeDuplex extends Duplex {
Expand Down Expand Up @@ -53,18 +44,18 @@ module.exports = function compose(...streams) {
}

if (streams.length === 1) {
return makeDuplex(streams[0], 'streams[0]');
return Duplex.from(streams[0]);
}

const orgStreams = [...streams];

if (typeof streams[0] === 'function') {
streams[0] = makeDuplex(streams[0], 'streams[0]');
streams[0] = Duplex.from(streams[0]);
}

if (typeof streams[streams.length - 1] === 'function') {
const idx = streams.length - 1;
streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`);
streams[idx] = Duplex.from(streams[idx]);
}

for (let n = 0; n < streams.length; ++n) {
Expand Down Expand Up @@ -117,7 +108,7 @@ module.exports = function compose(...streams) {
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.
d = new ComposeDuplex({
highWaterMark: 1,
// TODO (ronag): highWaterMark?
writableObjectMode: !!head?.writableObjectMode,
readableObjectMode: !!tail?.writableObjectMode,
writable,
Expand Down Expand Up @@ -203,82 +194,3 @@ module.exports = function compose(...streams) {

return d;
};

function makeDuplex(stream, name) {
let ret;
if (typeof stream === 'function') {
assert(stream.length > 0);

const { value, write, final } = fromAsyncGen(stream);

if (isIterable(value)) {
ret = from(ComposeDuplex, value, {
objectMode: true,
highWaterMark: 1,
write,
final
});
} else if (typeof value?.then === 'function') {
const promise = PromiseResolve(value)
.then((val) => {
if (val != null) {
throw new ERR_INVALID_RETURN_VALUE('nully', name, val);
}
})
.catch((err) => {
destroyer(ret, err);
});

ret = new ComposeDuplex({
objectMode: true,
highWaterMark: 1,
readable: false,
write,
final(cb) {
final(() => promise.then(cb, cb));
}
});
} else {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or AsyncFunction', name, value);
}
} else if (isNodeStream(stream)) {
ret = stream;
} else if (isIterable(stream)) {
ret = from(ComposeDuplex, stream, {
objectMode: true,
highWaterMark: 1,
writable: false
});
} else {
throw new ERR_INVALID_ARG_TYPE(
name,
['Stream', 'Iterable', 'AsyncIterable', 'Function'],
stream)
;
}
return ret;
}

function fromAsyncGen(fn) {
let { promise, resolve } = createDeferredPromise();
const value = fn(async function*() {
while (true) {
const { chunk, done, cb } = await promise;
process.nextTick(cb);
if (done) return;
yield chunk;
({ promise, resolve } = createDeferredPromise());
}
}());

return {
value,
write(chunk, encoding, cb) {
resolve({ chunk, done: false, cb });
},
final(cb) {
resolve({ done: true, cb });
}
};
}
2 changes: 1 addition & 1 deletion lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ function emitErrorCloseLegacy(stream, err) {

// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isDestroyed(stream)) {
if (!stream || isDestroyed(stream)) {
return;
}

Expand Down
9 changes: 9 additions & 0 deletions lib/internal/streams/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,12 @@ Duplex.fromWeb = function(pair, options) {
Duplex.toWeb = function(duplex) {
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
};

let duplexify;

Duplex.from = function(body) {
if (!duplexify) {
duplexify = require('internal/streams/duplexify');
}
return duplexify(body, 'body');
};
Loading

0 comments on commit 533cafc

Please sign in to comment.