From ad42494a8d880296749f11e437c662a18b0df196 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 21:00:33 +0200 Subject: [PATCH] stream: implement fetch body mixin on Readable Make Readable exposew the fetch boxy mixin API. Bypasses webstream glue when possible. Refs: https://fetch.spec.whatwg.org/#body-mixin --- lib/internal/streams/readable.js | 136 ++++++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index e7dd60a6d78c136..b449c154c80ec04 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -26,13 +26,15 @@ const { NumberIsInteger, NumberIsNaN, NumberParseInt, + JSONParse, ObjectDefineProperties, ObjectKeys, ObjectSetPrototypeOf, Promise, SafeSet, SymbolAsyncIterator, - Symbol + Symbol, + TypeError, } = primordials; module.exports = Readable; @@ -42,6 +44,8 @@ const EE = require('events'); const { Stream, prependListener } = require('internal/streams/legacy'); const { Buffer } = require('buffer'); +let Blob; + const { addAbortSignal, } = require('internal/streams/add-abort-signal'); @@ -58,6 +62,7 @@ const { } = require('internal/streams/state'); const { + AbortError, aggregateTwoErrors, codes: { ERR_INVALID_ARG_TYPE, @@ -69,6 +74,7 @@ const { const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); +const kConsume = Symbol('kConsume'); const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); @@ -206,6 +212,8 @@ function Readable(options) { addAbortSignal(options.signal, this); } + this[kConsume] = undefined; + Stream.call(this, options); destroyImpl.construct(this, () => { @@ -1268,6 +1276,12 @@ ObjectDefineProperties(Readable.prototype, { }); +const kWebStreamType = 1; +const kTextType = 2; +const kBlobType = 3; +const kArrayBufferType = 4; +const kJSONType = 5; + ObjectDefineProperties(ReadableState.prototype, { // Legacy getter for `pipesCount`. pipesCount: { @@ -1284,9 +1298,129 @@ ObjectDefineProperties(ReadableState.prototype, { set(value) { this[kPaused] = !!value; } + }, + + // https://fetch.spec.whatwg.org/#dom-body-bodyused + bodyUsed: { + get() { + return isDisturbed(this); + } + }, + + body: { + get() { + if (this[kConsume]?.type === kWebStreamType) { + return this[kConsume].body; + } + + return consume(this, kWebStreamType); + } + }, + + text: { + get() { + return consume(this, kTextType); + } + }, + + json: { + get() { + return consume(this, kJSONType); + } + }, + + blob: { + get() { + return consume(this, kBlobType); + } + }, + + arrayBuffer: { + get() { + return consume(this, kArrayBufferType); + } } }); +function isLocked(self) { + return self[kConsume] && + (self[kConsume].type !== kWebStreamType || self[kConsume].body.locked); +} + +// https://streams.spec.whatwg.org/#readablestream-disturbed +function isDisturbed(self) { + return self.destroyed || self.readableDidRead; +} + +// https://fetch.spec.whatwg.org/#body-unusable +function isUnusable(self) { + return isDisturbed(self) || isLocked(self); +} + +function consume(self, type) { + if (isUnusable(self)) { + throw new TypeError('unusable'); + } + + if (type === kWebStreamType) { + self[kConsume] = { + type, + body: Readable.toWeb(self) + }; + + return self[kConsume].body; + } + + return new Promise((resolve, reject) => { + self[kConsume] = { + type, + resolve, + reject, + body: type === kTextType || type === kJSONType ? '' : [] + }; + self + .on('error', reject) + .on('data', function(val) { + const { type } = this[kConsume]; + + if (type === kTextType || type === kJSONType) { + this[kConsume].body += val; + } else { + this[kConsume].body.push(val); + } + }) + .on('end', function() { + const { type, resolve, body } = this[kConsume]; + + try { + if (type === kTextType) { + resolve(body); + } else if (type === kJSONType) { + resolve(JSONParse(body)); + } else if (type === kArrayBufferType) { + resolve(Buffer.concat(body).buffer); + } else if (type === kBlobType) { + if (!Blob) { + Blob = require('buffer').Blob; + } + resolve(new Blob(body)); + } + + this[kConsume].body = null; + } catch (err) { + self.destroy(err); + } + }) + .on('close', function() { + const { body, reject } = this[kConsume]; + + if (body !== null) { + reject(new AbortError()); + } + }); + }); +} + // Exposed for testing purposes only. Readable._fromList = fromList;