Skip to content

Commit

Permalink
stream: add helpers to create internal state
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasBuelens committed Dec 15, 2023
1 parent 3846bfe commit 0d45e6f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 162 deletions.
72 changes: 20 additions & 52 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,7 @@ class ReadableStream {
markTransferMode(this, false, true);
if (source === null)
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
Expand Down Expand Up @@ -647,19 +635,7 @@ ObjectDefineProperties(ReadableStream, {
function InternalTransferredReadableStream() {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
}
Expand Down Expand Up @@ -1231,19 +1207,7 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
function InternalReadableStream(start, pull, cancel, highWaterMark, size) {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();
this[kIsClosedPromise] = createDeferredPromise();
const controller = new ReadableStreamDefaultController(kSkipThrow);
setupReadableStreamDefaultController(
Expand All @@ -1270,19 +1234,7 @@ function createReadableStream(start, pull, cancel, highWaterMark = 1, size = ()
function InternalReadableByteStream(start, pull, cancel) {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();
this[kIsClosedPromise] = createDeferredPromise();
const controller = new ReadableByteStreamController(kSkipThrow);
setupReadableByteStreamController(
Expand Down Expand Up @@ -1319,6 +1271,22 @@ const isReadableStreamBYOBReader =

// ---- ReadableStream Implementation

function createReadableStreamState() {
return {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
}

function readableStreamFromIterable(iterable) {
let stream;
const iteratorRecord = getIterator(iterable, 'async');
Expand Down
148 changes: 38 additions & 110 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,45 +160,7 @@ class WritableStream {
if (type !== undefined)
throw new ERR_INVALID_ARG_VALUE.RangeError('type', type);

this[kState] = {
close: createDeferredPromise(),
closeRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightWriteRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightCloseRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
pendingAbortRequest: {
abort: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
reason: undefined,
wasAlreadyErroring: false,
},
backpressure: false,
controller: undefined,
state: 'writable',
storedError: undefined,
writeRequests: [],
writer: undefined,
transfer: {
readable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createWritableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
Expand Down Expand Up @@ -330,45 +292,7 @@ ObjectDefineProperties(WritableStream.prototype, {
function InternalTransferredWritableStream() {
markTransferMode(this, false, true);
this[kType] = 'WritableStream';
this[kState] = {
close: createDeferredPromise(),
closeRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightWriteRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
inFlightCloseRequest: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
pendingAbortRequest: {
abort: {
promise: undefined,
resolve: undefined,
reject: undefined,
},
reason: undefined,
wasAlreadyErroring: false,
},
backpressure: false,
controller: undefined,
state: 'writable',
storedError: undefined,
writeRequests: [],
writer: undefined,
transfer: {
readable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createWritableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
}
Expand Down Expand Up @@ -584,7 +508,42 @@ ObjectDefineProperties(WritableStreamDefaultController.prototype, {
function InternalWritableStream(start, write, close, abort, highWaterMark, size) {
markTransferMode(this, false, true);
this[kType] = 'WritableStream';
this[kState] = {
this[kState] = createWritableStreamState();
this[kIsClosedPromise] = createDeferredPromise();

const controller = new WritableStreamDefaultController(kSkipThrow);
setupWritableStreamDefaultController(
this,
controller,
start,
write,
close,
abort,
highWaterMark,
size
)
}

ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype);
ObjectSetPrototypeOf(InternalWritableStream, WritableStream);

function createWritableStream(start, write, close, abort, highWaterMark = 1, size = () => 1) {
const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size);

// For spec compliance the InternalWritableStream must be a WritableStream
stream.constructor = WritableStream;
return stream;
}

const isWritableStream =
isBrandCheck('WritableStream');
const isWritableStreamDefaultWriter =
isBrandCheck('WritableStreamDefaultWriter');
const isWritableStreamDefaultController =
isBrandCheck('WritableStreamDefaultController');

function createWritableStreamState() {
return {
close: createDeferredPromise(),
closeRequest: {
promise: undefined,
Expand Down Expand Up @@ -623,39 +582,8 @@ function InternalWritableStream(start, write, close, abort, highWaterMark, size)
promise: undefined,
},
};
this[kIsClosedPromise] = createDeferredPromise();

const controller = new WritableStreamDefaultController(kSkipThrow);
setupWritableStreamDefaultController(
this,
controller,
start,
write,
close,
abort,
highWaterMark,
size
)
}

ObjectSetPrototypeOf(InternalWritableStream.prototype, WritableStream.prototype);
ObjectSetPrototypeOf(InternalWritableStream, WritableStream);

function createWritableStream(start, write, close, abort, highWaterMark, size) {
const stream = new InternalWritableStream(start, write, close, abort, highWaterMark, size);

// For spec compliance the InternalWritableStream must be a WritableStream
stream.constructor = WritableStream;
return stream;
}

const isWritableStream =
isBrandCheck('WritableStream');
const isWritableStreamDefaultWriter =
isBrandCheck('WritableStreamDefaultWriter');
const isWritableStreamDefaultController =
isBrandCheck('WritableStreamDefaultController');

function isWritableStreamLocked(stream) {
return stream[kState].writer !== undefined;
}
Expand Down

0 comments on commit 0d45e6f

Please sign in to comment.