diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index ca42174c86459a..c28e3c14421f04 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -38,6 +38,18 @@ const { willEmitClose: _willEmitClose, } = require('internal/streams/utils'); +const Readable = require('internal/streams/readable'); +const Writable = require('internal/streams/writable'); + +const { + isBrandCheck, +} = require('internal/webstreams/util'); + +const isReadableStream = + isBrandCheck('ReadableStream'); +const isWritableStream = + isBrandCheck('WritableStream'); + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -58,14 +70,17 @@ function eos(stream, options, callback) { callback = once(callback); - const readable = options.readable ?? isReadableNodeStream(stream); - const writable = options.writable ?? isWritableNodeStream(stream); - if (!isNodeStream(stream)) { - // TODO: Webstreams. - throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream); + if (isReadableStream(stream)) { + stream = Readable.fromWeb(stream); + } else if (isWritableStream(stream)) { + stream = Writable.fromWeb(stream); + } } + const readable = options.readable ?? isReadableNodeStream(stream); + const writable = options.writable ?? isWritableNodeStream(stream); + const wState = stream._writableState; const rState = stream._readableState; diff --git a/test/parallel/test-webstreams-finished.js b/test/parallel/test-webstreams-finished.js new file mode 100644 index 00000000000000..6954fec5cad3d4 --- /dev/null +++ b/test/parallel/test-webstreams-finished.js @@ -0,0 +1,17 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { ReadableStream, WritableStream } = require('stream/web'); +const { finished } = require('stream'); + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + }, + }); + finished(rs, common.mustCall(() => { + + })); +}