Skip to content

Commit

Permalink
streams: implement finished() for webstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Jan 13, 2023
1 parent 91ca2d4 commit d99602f
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 36 deletions.
23 changes: 20 additions & 3 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const {
validateBoolean
} = require('internal/validators');

const { Promise } = primordials;
const { Promise, PromisePrototypeThen } = primordials;

const {
isClosed,
Expand All @@ -38,6 +38,15 @@ const {
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');

const {
isBrandCheck,
} = require('internal/webstreams/util');

const isReadableStream =
isBrandCheck('ReadableStream');
const isWritableStream =
isBrandCheck('WritableStream');

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
Expand All @@ -62,8 +71,7 @@ function eos(stream, options, callback) {
const writable = options.writable ?? isWritableNodeStream(stream);

if (!isNodeStream(stream)) {
// TODO: Webstreams.
throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream);
return eosWeb(stream, options, callback);
}

const wState = stream._writableState;
Expand Down Expand Up @@ -255,6 +263,15 @@ function eos(stream, options, callback) {
return cleanup;
}

function eosWeb(stream, opts, callback) {
PromisePrototypeThen(
stream.streamClosed,
() => callback.call(stream),
(err) => callback.call(stream, err)
);
return nop;
}

function finished(stream, opts) {
let autoCleanup = false;
if (opts === null) {
Expand Down
10 changes: 5 additions & 5 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function isWritableEnded(stream) {

// Have emitted 'finish'.
function isWritableFinished(stream, strict) {
if (!isWritableNodeStream(stream)) return null;
if (!isWritableNodeStream(stream)) return stream?.state === 'closed' ? true : null;
if (stream.writableFinished === true) return true;
const wState = stream._writableState;
if (wState?.errored) return false;
Expand All @@ -106,7 +106,7 @@ function isReadableEnded(stream) {

// Have emitted 'end'.
function isReadableFinished(stream, strict) {
if (!isReadableNodeStream(stream)) return null;
if (!isReadableNodeStream(stream)) stream?.state === 'closed' ? true : null;
const rState = stream._readableState;
if (rState?.errored) return false;
if (typeof rState?.endEmitted !== 'boolean') return null;
Expand Down Expand Up @@ -155,7 +155,7 @@ function isFinished(stream, opts) {

function isWritableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
return stream?.state === 'errored' ? true : null;
}

if (stream.writableErrored) {
Expand All @@ -167,7 +167,7 @@ function isWritableErrored(stream) {

function isReadableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
return stream?.state === 'errored' ? true : null;
}

if (stream.readableErrored) {
Expand All @@ -179,7 +179,7 @@ function isReadableErrored(stream) {

function isClosed(stream) {
if (!isNodeStream(stream)) {
return null;
return stream?.state === 'closed' ? true : null;
}

if (typeof stream.closed === 'boolean') {
Expand Down
14 changes: 13 additions & 1 deletion lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ class ReadableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
streamClosed: createDeferredPromise(),
};

// The spec requires handling of the strategy first
Expand Down Expand Up @@ -288,6 +289,12 @@ class ReadableStream {
return isReadableStreamLocked(this);
}

get streamClosed() {
if (!isReadableStream(this))
throw new ERR_INVALID_THIS('ReadableStream');
return this[kState].streamClosed.promise;
}

/**
* @param {any} [reason]
* @returns { Promise<void> }
Expand Down Expand Up @@ -1869,6 +1876,7 @@ function readableStreamCancel(stream, reason) {
function readableStreamClose(stream) {
assert(stream[kState].state === 'readable');
stream[kState].state = 'closed';
stream[kState].streamClosed?.resolve?.();

const {
reader,
Expand Down Expand Up @@ -1900,6 +1908,10 @@ function readableStreamError(stream, error) {

reader[kState].close.reject(error);
setPromiseHandled(reader[kState].close.promise);
if (stream[kState].streamClosed?.promise !== undefined) {
stream[kState].streamClosed?.reject?.(error);
setPromiseHandled(stream[kState].streamClosed?.promise);
}

if (readableStreamHasDefaultReader(stream)) {
for (let n = 0; n < reader[kState].readRequests.length; n++)
Expand Down
14 changes: 13 additions & 1 deletion lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class WritableStream {
port1: undefined,
port2: undefined,
promise: undefined,
}
},
streamClosed: createDeferredPromise(),
};

const size = extractSizeAlgorithm(strategy?.size);
Expand All @@ -201,6 +202,12 @@ class WritableStream {
return isWritableStreamLocked(this);
}

get streamClosed() {
if (!isWritableStream(this))
throw new ERR_INVALID_THIS('WritableStream');
return this[kState].streamClosed.promise;
}

/**
* @param {any} reason
* @returns {Promise<void>}
Expand Down Expand Up @@ -733,6 +740,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
writer[kState].close.reject?.(stream[kState].storedError);
setPromiseHandled(writer[kState].close.promise);
}
if (stream[kState].streamClosed?.promise !== undefined) {
stream[kState].streamClosed.reject?.(stream[kState]?.storedError);
setPromiseHandled(stream[kState].streamClosed?.promise);
}
}

function writableStreamMarkFirstWriteRequestInFlight(stream) {
Expand Down Expand Up @@ -839,6 +850,7 @@ function writableStreamFinishInFlightClose(stream) {
stream[kState].state = 'closed';
if (stream[kState].writer !== undefined)
stream[kState].writer[kState].close.resolve?.();
stream[kState].streamClosed?.resolve?.();
assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
assert(stream[kState].storedError === undefined);
}
Expand Down
20 changes: 0 additions & 20 deletions test/parallel/test-stream-end-of-streams.js

This file was deleted.

7 changes: 1 addition & 6 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,7 @@ const http = require('http');
const streamLike = new EE();
streamLike.readableEnded = true;
streamLike.readable = true;
assert.throws(
() => {
finished(streamLike, () => {});
},
{ code: 'ERR_INVALID_ARG_TYPE' }
);
finished(streamLike, common.mustCall());
streamLike.emit('close');
}

Expand Down
40 changes: 40 additions & 0 deletions test/parallel/test-webstreams-finished.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { ReadableStream, WritableStream } = require('stream/web');
const { finished } = require('stream');

{
const rs = new ReadableStream({
start(controller) {
controller.enqueue('asd');
controller.close();
},
});
finished(rs, common.mustSucceed());
async function test() {
const values = [];
for await (const chunk of rs) {
values.push(chunk);
}
assert.deepStrictEqual(values, ['asd']);
}
test();
}

{
let str = '';
const ws = new WritableStream({
write(chunk) {
console.log(chunk);
str += chunk;
}
});
finished(ws, common.mustSucceed(() => {
assert.strictEqual(str, 'asd');
}));
const writer = ws.getWriter();
writer.write('asd');
writer.close();
}

0 comments on commit d99602f

Please sign in to comment.