Skip to content

Commit

Permalink
stream: implement fetch body mixin on Readable
Browse files Browse the repository at this point in the history
Make Readable exposew the fetch boxy mixin API.

Bypasses webstream glue when possible.

Refs: https://fetch.spec.whatwg.org/#body-mixin
  • Loading branch information
ronag committed Jul 25, 2021
1 parent ab03ab4 commit ad42494
Showing 1 changed file with 135 additions and 1 deletion.
136 changes: 135 additions & 1 deletion lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ const {
NumberIsInteger,
NumberIsNaN,
NumberParseInt,
JSONParse,
ObjectDefineProperties,
ObjectKeys,
ObjectSetPrototypeOf,
Promise,
SafeSet,
SymbolAsyncIterator,
Symbol
Symbol,
TypeError,
} = primordials;

module.exports = Readable;
Expand All @@ -42,6 +44,8 @@ const EE = require('events');
const { Stream, prependListener } = require('internal/streams/legacy');
const { Buffer } = require('buffer');

let Blob;

const {
addAbortSignal,
} = require('internal/streams/add-abort-signal');
Expand All @@ -58,6 +62,7 @@ const {
} = require('internal/streams/state');

const {
AbortError,
aggregateTwoErrors,
codes: {
ERR_INVALID_ARG_TYPE,
Expand All @@ -69,6 +74,7 @@ const {
const { validateObject } = require('internal/validators');

const kPaused = Symbol('kPaused');
const kConsume = Symbol('kConsume');

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
Expand Down Expand Up @@ -206,6 +212,8 @@ function Readable(options) {
addAbortSignal(options.signal, this);
}

this[kConsume] = undefined;

Stream.call(this, options);

destroyImpl.construct(this, () => {
Expand Down Expand Up @@ -1268,6 +1276,12 @@ ObjectDefineProperties(Readable.prototype, {

});

const kWebStreamType = 1;
const kTextType = 2;
const kBlobType = 3;
const kArrayBufferType = 4;
const kJSONType = 5;

ObjectDefineProperties(ReadableState.prototype, {
// Legacy getter for `pipesCount`.
pipesCount: {
Expand All @@ -1284,9 +1298,129 @@ ObjectDefineProperties(ReadableState.prototype, {
set(value) {
this[kPaused] = !!value;
}
},

// https://fetch.spec.whatwg.org/#dom-body-bodyused
bodyUsed: {
get() {
return isDisturbed(this);
}
},

body: {
get() {
if (this[kConsume]?.type === kWebStreamType) {
return this[kConsume].body;
}

return consume(this, kWebStreamType);
}
},

text: {
get() {
return consume(this, kTextType);
}
},

json: {
get() {
return consume(this, kJSONType);
}
},

blob: {
get() {
return consume(this, kBlobType);
}
},

arrayBuffer: {
get() {
return consume(this, kArrayBufferType);
}
}
});

function isLocked(self) {
return self[kConsume] &&
(self[kConsume].type !== kWebStreamType || self[kConsume].body.locked);
}

// https://streams.spec.whatwg.org/#readablestream-disturbed
function isDisturbed(self) {
return self.destroyed || self.readableDidRead;
}

// https://fetch.spec.whatwg.org/#body-unusable
function isUnusable(self) {
return isDisturbed(self) || isLocked(self);
}

function consume(self, type) {
if (isUnusable(self)) {
throw new TypeError('unusable');
}

if (type === kWebStreamType) {
self[kConsume] = {
type,
body: Readable.toWeb(self)
};

return self[kConsume].body;
}

return new Promise((resolve, reject) => {
self[kConsume] = {
type,
resolve,
reject,
body: type === kTextType || type === kJSONType ? '' : []
};
self
.on('error', reject)
.on('data', function(val) {
const { type } = this[kConsume];

if (type === kTextType || type === kJSONType) {
this[kConsume].body += val;
} else {
this[kConsume].body.push(val);
}
})
.on('end', function() {
const { type, resolve, body } = this[kConsume];

try {
if (type === kTextType) {
resolve(body);
} else if (type === kJSONType) {
resolve(JSONParse(body));
} else if (type === kArrayBufferType) {
resolve(Buffer.concat(body).buffer);
} else if (type === kBlobType) {
if (!Blob) {
Blob = require('buffer').Blob;
}
resolve(new Blob(body));
}

this[kConsume].body = null;
} catch (err) {
self.destroy(err);
}
})
.on('close', function() {
const { body, reject } = this[kConsume];

if (body !== null) {
reject(new AbortError());
}
});
});
}

// Exposed for testing purposes only.
Readable._fromList = fromList;

Expand Down

0 comments on commit ad42494

Please sign in to comment.