From 9e847cdc27cead4aecba73590344982e556d7e14 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 23 Jul 2021 08:41:40 +0200 Subject: [PATCH 01/19] feat: return body as consumable body --- lib/api/api-request.js | 2 +- lib/api/readable.js | 265 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 lib/api/readable.js diff --git a/lib/api/api-request.js b/lib/api/api-request.js index e9a67430ea0..fa1543d55cd 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -1,6 +1,6 @@ 'use strict' -const { Readable } = require('stream') +const Readable = require('./readable') const { InvalidArgumentError, RequestAbortedError diff --git a/lib/api/readable.js b/lib/api/readable.js new file mode 100644 index 00000000000..1ff696a87cb --- /dev/null +++ b/lib/api/readable.js @@ -0,0 +1,265 @@ +'use strict' + +const { Readable } = require('stream') +const assert = require('assert') + +let Blob +let ReadableStream + +const kBody = Symbol('body') + +const kWebStreamType = 1 +const kTextType = 2 +const kBlobType = 3 +const kArrayBufferType = 4 +const kJSONType = 5 + +class AbortError extends Error { + constructor () { + super('The operation was aborted') + this.code = 'ABORT_ERR' + this.name = 'AbortError' + } +} + +module.exports = class BodyReadable extends Readable { + constructor (opts) { + super(opts) + this[kBody] = undefined + } + + destroy (err) { + // TODO (fix): This is not strictly correct. + if (!err && this[kBody] && !this[kBody].ended) { + err = new AbortError() + } + + return Readable.prototype.destroy.call(this, err) + } + + push (val) { + if (this[kBody]) { + try { + return this[kBody].push(val) + } catch (err) { + this.destroy(err) + return false + } + } + + return Readable.prototype.push.call(this, val) + } + + read (n) { + if (this[kBody] === undefined) { + consume(this) + } + return Readable.prototype.read.call(this, n) + } + + resume () { + if (this[kBody] === undefined) { + consume(this) + } + return Readable.prototype.resume.call(this) + } + + pipe (dest, pipeOpts) { + if (this[kBody] === undefined) { + consume(this) + } + return Readable.prototype.pipe.call(this, dest, pipeOpts) + } + + on (ev, fn) { + if (this[kBody] === undefined && (ev === 'data' || ev === 'readable')) { + consume(this) + } + return Readable.prototype.on.call(this, ev, fn) + } + + addListener (ev, fn) { + return this.on(ev, fn) + } + + get bodyUsed () { + if (this[kBody]) { + return this[kBody].used + } + + return this.readableDidRead !== undefined + ? this.readableDidRead + : this[kBody] === null + } + + get body () { + return consume(this, kWebStreamType) + } + + text () { + return consume(this, kTextType) + } + + json () { + return consume(this, kJSONType) + } + + blob () { + return consume(this, kBlobType) + } + + arrayBuffer () { + return consume(this, kArrayBufferType) + } +} + +function start (self) { + assert(self.listenerCount('data') === 0) + + const state = self._readableState + while (state.buffer.length) { + self[kBody].push(state.buffer.shift()) + } + if (state.ended) { + self[kBody].push(null) + } + + self._read() +} + +function consume (self, type) { + try { + if (self.bodyUsed) { + throw new TypeError('disturbed') + } + + if (self[kBody]) { + throw new TypeError('locked') + } + } catch (err) { + if (!type) { + self.destroy(err) + } else { + throw err + } + } + + if (!type) { + self[kBody] = null + return self + } + + if (type === kWebStreamType) { + if (!ReadableStream) { + ReadableStream = require('stream/web').ReadableStream + } + + return new ReadableStream({ + start (controller) { + if (self[kBody]) { + // TODO (fix): it's a little unclear what we need to do here. + this.controller.error(new Error('locked')) + } else { + self.on('error', err => { + this.controller.error(err) + }) + self[kBody] = { + type, + used: false, + buffer: self, + controller, + ended: false, + push (val) { + // TODO (fix): This is not strictly correct. + this.used = true + + if (!this.controller) { + this.buffer.push(val) + return false + } + + if (!val) { + this.controller.close() + this.ended = true + + // TODO (fix): This is not strictly correct. + queueMicrotask(() => { + Readable.prototype.push.call(self, null) + }) + } else { + this.controller.enqueue(new Uint8Array(val)) + } + + return this.controller.desiredSize > 0 + } + } + } + start(self) + }, + + pull () { + self._read() + }, + + cancel (reason) { + let err + + if (reason instanceof Error) { + err = reason + } else if (typeof reason === 'string') { + err = new Error(reason) + } else { + err = new AbortError() + } + + self.destroy(err) + } + }, { highWaterMark: 16 * 1024 }) + } + + return new Promise((resolve, reject) => { + self.on('error', reject) + self[kBody] = { + type, + used: false, + ended: false, + body: this.type === kTextType || this.type === kJSONType ? '' : [], + push (val) { + this.used = true + + if (this.type === kTextType || this.type === kJSONType) { + if (val !== null) { + this.body += val + } else if (this.type === kTextType) { + resolve(this.body) + } else if (this.type === kJSONType) { + resolve(JSON.parse(this.body)) + } + } else { + if (val !== null) { + this.body.push(val) + } else if (this.type === kArrayBufferType) { + resolve(Buffer.concat(this.body).buffer) + } else if (this.type === kBlobType) { + if (!Blob) { + Blob = require('buffer').Blob + } + resolve(new Blob(this.body)) + } + } + + if (val === null) { + this.ended = true + this.body = null + queueMicrotask(() => { + Readable.prototype.push.call(self, null) + }) + } + + return true + } + } + + start(self) + }) +} From b94bafb4d010349fa1669a9baa4353b930ed5e9f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 11:56:45 +0200 Subject: [PATCH 02/19] fixup --- lib/api/readable.js | 80 +++++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index 1ff696a87cb..5998dad84f4 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -39,12 +39,7 @@ module.exports = class BodyReadable extends Readable { push (val) { if (this[kBody]) { - try { - return this[kBody].push(val) - } catch (err) { - this.destroy(err) - return false - } + return this[kBody].push(val) } return Readable.prototype.push.call(this, val) @@ -128,20 +123,12 @@ function start (self) { } function consume (self, type) { - try { - if (self.bodyUsed) { - throw new TypeError('disturbed') - } + if (self.bodyUsed) { + throw new TypeError('disturbed') + } - if (self[kBody]) { - throw new TypeError('locked') - } - } catch (err) { - if (!type) { - self.destroy(err) - } else { - throw err - } + if (self[kBody]) { + throw new TypeError('locked') } if (!type) { @@ -170,6 +157,10 @@ function consume (self, type) { controller, ended: false, push (val) { + if (self.destroyed) { + return false + } + // TODO (fix): This is not strictly correct. this.used = true @@ -183,9 +174,7 @@ function consume (self, type) { this.ended = true // TODO (fix): This is not strictly correct. - queueMicrotask(() => { - Readable.prototype.push.call(self, null) - }) + Readable.prototype.push.call(self, null) } else { this.controller.enqueue(new Uint8Array(val)) } @@ -225,35 +214,42 @@ function consume (self, type) { ended: false, body: this.type === kTextType || this.type === kJSONType ? '' : [], push (val) { + if (self.destroyed) { + return false + } + this.used = true - if (this.type === kTextType || this.type === kJSONType) { - if (val !== null) { - this.body += val - } else if (this.type === kTextType) { - resolve(this.body) - } else if (this.type === kJSONType) { - resolve(JSON.parse(this.body)) - } - } else { - if (val !== null) { - this.body.push(val) - } else if (this.type === kArrayBufferType) { - resolve(Buffer.concat(this.body).buffer) - } else if (this.type === kBlobType) { - if (!Blob) { - Blob = require('buffer').Blob + try { + if (this.type === kTextType || this.type === kJSONType) { + if (val !== null) { + this.body += val + } else if (this.type === kTextType) { + resolve(this.body) + } else if (this.type === kJSONType) { + resolve(JSON.parse(this.body)) + } + } else { + if (val !== null) { + this.body.push(val) + } else if (this.type === kArrayBufferType) { + resolve(Buffer.concat(this.body).buffer) + } else if (this.type === kBlobType) { + if (!Blob) { + Blob = require('buffer').Blob + } + resolve(new Blob(this.body)) } - resolve(new Blob(this.body)) } + } catch (err) { + self.destroy(err) + return false } if (val === null) { this.ended = true this.body = null - queueMicrotask(() => { - Readable.prototype.push.call(self, null) - }) + Readable.prototype.push.call(self, null) } return true From 62aa3ad40f61db5de8ba6edfacac432554b6b1e5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 12:00:35 +0200 Subject: [PATCH 03/19] fixup --- lib/api/readable.js | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index 5998dad84f4..ad24b15bba2 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -29,8 +29,7 @@ module.exports = class BodyReadable extends Readable { } destroy (err) { - // TODO (fix): This is not strictly correct. - if (!err && this[kBody] && !this[kBody].ended) { + if (this[kBody] && !err && !this._readableState.ended) { err = new AbortError() } @@ -155,13 +154,14 @@ function consume (self, type) { used: false, buffer: self, controller, - ended: false, push (val) { if (self.destroyed) { return false } // TODO (fix): This is not strictly correct. + // Just because chunk was enqueue doesn't mean + // that it was read? this.used = true if (!this.controller) { @@ -171,10 +171,15 @@ function consume (self, type) { if (!val) { this.controller.close() - this.ended = true // TODO (fix): This is not strictly correct. - Readable.prototype.push.call(self, null) + // Just because chunk was enqueue doesn't mean + // that it was read? How do we wait for stream + // to be emptired? queueMicrotask is just a hack + // to hope that it's been drained. + queueMicrotask(() => { + Readable.prototype.push.call(self, null) + }) } else { this.controller.enqueue(new Uint8Array(val)) } @@ -211,7 +216,6 @@ function consume (self, type) { self[kBody] = { type, used: false, - ended: false, body: this.type === kTextType || this.type === kJSONType ? '' : [], push (val) { if (self.destroyed) { @@ -247,7 +251,6 @@ function consume (self, type) { } if (val === null) { - this.ended = true this.body = null Readable.prototype.push.call(self, null) } From fb927f8a2b9164888f1d2e79f7464f06c504c82d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 12:05:08 +0200 Subject: [PATCH 04/19] fixup --- lib/api/readable.js | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index ad24b15bba2..395a5ed3145 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -7,6 +7,7 @@ let Blob let ReadableStream const kBody = Symbol('body') +const kDestroyed = Symbol('destroyed') const kWebStreamType = 1 const kTextType = 2 @@ -26,13 +27,20 @@ module.exports = class BodyReadable extends Readable { constructor (opts) { super(opts) this[kBody] = undefined + this[kDestroyed] = false } destroy (err) { + if (this[kDestroyed]) { + return + } + if (this[kBody] && !err && !this._readableState.ended) { err = new AbortError() } + this[kDestroyed] = true + return Readable.prototype.destroy.call(this, err) } @@ -146,9 +154,14 @@ function consume (self, type) { // TODO (fix): it's a little unclear what we need to do here. this.controller.error(new Error('locked')) } else { - self.on('error', err => { - this.controller.error(err) - }) + self + .on('error', err => { + this.controller.error(err) + }) + .on('end', () => { + // autoDestroy might have been disabled. + Readable.prototype.destroy.call(self, null) + }) self[kBody] = { type, used: false, @@ -160,7 +173,7 @@ function consume (self, type) { } // TODO (fix): This is not strictly correct. - // Just because chunk was enqueue doesn't mean + // Just because chunk was enqueued doesn't mean // that it was read? this.used = true @@ -173,9 +186,9 @@ function consume (self, type) { this.controller.close() // TODO (fix): This is not strictly correct. - // Just because chunk was enqueue doesn't mean + // Just because chunk was enqueued doesn't mean // that it was read? How do we wait for stream - // to be emptired? queueMicrotask is just a hack + // to be drained? queueMicrotask is just a hack // to hope that it's been drained. queueMicrotask(() => { Readable.prototype.push.call(self, null) @@ -212,7 +225,12 @@ function consume (self, type) { } return new Promise((resolve, reject) => { - self.on('error', reject) + self + .on('error', reject) + .on('end', () => { + // autoDestroy might have been disabled. + Readable.prototype.destroy.call(self, null) + }) self[kBody] = { type, used: false, From a4838fa1049f5059d620e99ddec8378b3192b187 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 12:05:35 +0200 Subject: [PATCH 05/19] fixup --- lib/api/readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index 395a5ed3145..ccf174cd9da 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -35,7 +35,7 @@ module.exports = class BodyReadable extends Readable { return } - if (this[kBody] && !err && !this._readableState.ended) { + if (this[kBody] && !err && !this._readableState.endEmitted) { err = new AbortError() } From 61ffc29e7e4285a57a0840fdbff8a44b9be15959 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 12:08:12 +0200 Subject: [PATCH 06/19] fixup --- lib/api/readable.js | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index ccf174cd9da..eed01cbc103 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -160,7 +160,7 @@ function consume (self, type) { }) .on('end', () => { // autoDestroy might have been disabled. - Readable.prototype.destroy.call(self, null) + self.destroy() }) self[kBody] = { type, @@ -229,7 +229,7 @@ function consume (self, type) { .on('error', reject) .on('end', () => { // autoDestroy might have been disabled. - Readable.prototype.destroy.call(self, null) + self.destroy() }) self[kBody] = { type, @@ -242,18 +242,18 @@ function consume (self, type) { this.used = true - try { + if (val !== null) { if (this.type === kTextType || this.type === kJSONType) { - if (val !== null) { - this.body += val - } else if (this.type === kTextType) { + this.body += val + } else { + this.body.push(val) + } + } else { + try { + if (this.type === kTextType) { resolve(this.body) } else if (this.type === kJSONType) { resolve(JSON.parse(this.body)) - } - } else { - if (val !== null) { - this.body.push(val) } else if (this.type === kArrayBufferType) { resolve(Buffer.concat(this.body).buffer) } else if (this.type === kBlobType) { @@ -262,15 +262,12 @@ function consume (self, type) { } resolve(new Blob(this.body)) } - } - } catch (err) { - self.destroy(err) - return false - } - if (val === null) { - this.body = null - Readable.prototype.push.call(self, null) + this.body = null + Readable.prototype.push.call(self, null) + } catch (err) { + self.destroy(err) + } } return true From b2316f702a507b51d68bf4e3b649bb0cdc066304 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 12:10:58 +0200 Subject: [PATCH 07/19] fixup --- lib/api/readable.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index eed01cbc103..d747f3b7351 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -84,6 +84,10 @@ module.exports = class BodyReadable extends Readable { return this.on(ev, fn) } + get locked () { + return !!this[kBody] + } + get bodyUsed () { if (this[kBody]) { return this[kBody].used @@ -134,7 +138,7 @@ function consume (self, type) { throw new TypeError('disturbed') } - if (self[kBody]) { + if (self.locked) { throw new TypeError('locked') } From 470dcfd059a83ea5dfd1c9bac393e1b9af0fd177 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 12:40:56 +0200 Subject: [PATCH 08/19] fixup --- lib/api/readable.js | 52 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index d747f3b7351..0ef76297351 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -158,14 +158,6 @@ function consume (self, type) { // TODO (fix): it's a little unclear what we need to do here. this.controller.error(new Error('locked')) } else { - self - .on('error', err => { - this.controller.error(err) - }) - .on('end', () => { - // autoDestroy might have been disabled. - self.destroy() - }) self[kBody] = { type, used: false, @@ -205,7 +197,49 @@ function consume (self, type) { } } } - start(self) + + return new Promise((resolve, reject) => { + // TODO (fix): This should wait until stream + // has been "constructed". + + const orgEmit = self.emit + + function cleanup () { + self.emit = orgEmit + self.off('error', check) + } + + function check (err) { + if (err) { + cleanup() + reject(err) + } else if (this._readableState.constructed) { + self + .on('error', err => { + this.controller.error(err) + }) + .on('end', () => { + // autoDestroy might have been disabled. + self.destroy() + }) + + start(self) + + cleanup() + resolve() + } else { + setTimeout(check, 100) + } + } + + self.emit = function (ev, fn) { + check(null) + return orgEmit.call(this, ev, fn) + } + self.on('error', check) + + check() + }) }, pull () { From ec51cb79f944b17611c30acd43345cf86d8197aa Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:34:02 +0200 Subject: [PATCH 09/19] fixup --- lib/api/readable.js | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index 0ef76297351..acfcd89abab 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -199,9 +199,6 @@ function consume (self, type) { } return new Promise((resolve, reject) => { - // TODO (fix): This should wait until stream - // has been "constructed". - const orgEmit = self.emit function cleanup () { @@ -210,10 +207,13 @@ function consume (self, type) { } function check (err) { + if (!err && !this._readableState.constructed) { + return + } + if (err) { - cleanup() reject(err) - } else if (this._readableState.constructed) { + } else { self .on('error', err => { this.controller.error(err) @@ -225,11 +225,10 @@ function consume (self, type) { start(self) - cleanup() resolve() - } else { - setTimeout(check, 100) } + + cleanup() } self.emit = function (ev, fn) { From 3d371d05536210ba7aa0a81cbe5817c7783085d2 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:35:03 +0200 Subject: [PATCH 10/19] fixup --- lib/api/readable.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index acfcd89abab..4f1ad2dc029 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -84,10 +84,6 @@ module.exports = class BodyReadable extends Readable { return this.on(ev, fn) } - get locked () { - return !!this[kBody] - } - get bodyUsed () { if (this[kBody]) { return this[kBody].used From 16dbf3d976e78240e8c4837277c42eae6548d47c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:36:12 +0200 Subject: [PATCH 11/19] fixup --- lib/api/readable.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/api/readable.js b/lib/api/readable.js index 4f1ad2dc029..d86c176ba1c 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -218,6 +218,10 @@ function consume (self, type) { // autoDestroy might have been disabled. self.destroy() }) + .on('close', () => { + // TODO (fix): Do webstreams have something + // corresponding? + }) start(self) From 02a169d543c93e7542781dc3182c43805b26e321 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:37:07 +0200 Subject: [PATCH 12/19] fixup --- lib/api/readable.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/api/readable.js b/lib/api/readable.js index d86c176ba1c..ac67e887998 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -66,6 +66,13 @@ module.exports = class BodyReadable extends Readable { return Readable.prototype.resume.call(this) } + pause (dest, pipeOpts) { + if (this[kBody] === undefined) { + consume(this) + } + return Readable.prototype.pause.call(this) + } + pipe (dest, pipeOpts) { if (this[kBody] === undefined) { consume(this) From f2c6797bf0150210e848579d355ffe4720518469 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:38:40 +0200 Subject: [PATCH 13/19] fixup --- lib/api/readable.js | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/api/readable.js b/lib/api/readable.js index ac67e887998..f5962a6df33 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -30,6 +30,16 @@ module.exports = class BodyReadable extends Readable { this[kDestroyed] = false } + get readableLength () { + // TODO (fix): Read from web stream? + return super.readableLength + } + + get readableHighWaterMark () { + // TODO (fix): Read from web stream? + return super.readableHighWaterMark + } + destroy (err) { if (this[kDestroyed]) { return From d1e640a6758b98f467b1017e723a521e47b6d141 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:39:18 +0200 Subject: [PATCH 14/19] fixup --- lib/api/readable.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/api/readable.js b/lib/api/readable.js index f5962a6df33..f0e346aeb14 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -165,6 +165,8 @@ function consume (self, type) { ReadableStream = require('stream/web').ReadableStream } + // TODO (fix): emit pause/resume on self? + return new ReadableStream({ start (controller) { if (self[kBody]) { From 105686ada36177e18851348ea592bf77b7e7eb0e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:40:05 +0200 Subject: [PATCH 15/19] fixup --- lib/api/readable.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/api/readable.js b/lib/api/readable.js index f0e346aeb14..06ed1b7ab18 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -25,6 +25,8 @@ class AbortError extends Error { module.exports = class BodyReadable extends Readable { constructor (opts) { + // TODO (fix): Improve allocation by making readableState + // allocated as lazily as possible and/or introduce pooling. super(opts) this[kBody] = undefined this[kDestroyed] = false From d50386a5ae8982fc1b1720628537a40f94c3fd99 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:40:58 +0200 Subject: [PATCH 16/19] fixup --- lib/api/readable.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/api/readable.js b/lib/api/readable.js index 06ed1b7ab18..0e626f2ae8a 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -94,6 +94,8 @@ module.exports = class BodyReadable extends Readable { on (ev, fn) { if (this[kBody] === undefined && (ev === 'data' || ev === 'readable')) { + // TODO (fix): consume in next tick in case event handler is removed + // in same tick? consume(this) } return Readable.prototype.on.call(this, ev, fn) From 673720d7b3dcbf5b284c441b7da902383e832256 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:41:50 +0200 Subject: [PATCH 17/19] fixup --- lib/api/readable.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index 0e626f2ae8a..9b6ed72e262 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -60,7 +60,6 @@ module.exports = class BodyReadable extends Readable { if (this[kBody]) { return this[kBody].push(val) } - return Readable.prototype.push.call(this, val) } @@ -151,6 +150,11 @@ function start (self) { } function consume (self, type) { + if (self.destroyed) { + // TODO (fix): What error? + throw new TypeError('disturbed') + } + if (self.bodyUsed) { throw new TypeError('disturbed') } From cc1d7564380103fa243fb0e1f87ccb94a0b1e89a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 15:42:03 +0200 Subject: [PATCH 18/19] fixup --- lib/api/readable.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index 9b6ed72e262..ba6668c5a22 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -159,7 +159,7 @@ function consume (self, type) { throw new TypeError('disturbed') } - if (self.locked) { + if (self[kBody]) { throw new TypeError('locked') } From e0e0d842999262ceb00fa9bea5098d59a9a0a085 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 25 Jul 2021 19:26:28 +0200 Subject: [PATCH 19/19] fixup --- lib/api/readable.js | 329 ++++++++------------------------------------ 1 file changed, 61 insertions(+), 268 deletions(-) diff --git a/lib/api/readable.js b/lib/api/readable.js index ba6668c5a22..47bf8f297bc 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -1,13 +1,10 @@ 'use strict' const { Readable } = require('stream') -const assert = require('assert') let Blob -let ReadableStream const kBody = Symbol('body') -const kDestroyed = Symbol('destroyed') const kWebStreamType = 1 const kTextType = 2 @@ -25,96 +22,21 @@ class AbortError extends Error { module.exports = class BodyReadable extends Readable { constructor (opts) { - // TODO (fix): Improve allocation by making readableState - // allocated as lazily as possible and/or introduce pooling. super(opts) - this[kBody] = undefined - this[kDestroyed] = false - } - - get readableLength () { - // TODO (fix): Read from web stream? - return super.readableLength - } - - get readableHighWaterMark () { - // TODO (fix): Read from web stream? - return super.readableHighWaterMark - } - - destroy (err) { - if (this[kDestroyed]) { - return - } - - if (this[kBody] && !err && !this._readableState.endEmitted) { - err = new AbortError() - } - - this[kDestroyed] = true - - return Readable.prototype.destroy.call(this, err) - } - - push (val) { - if (this[kBody]) { - return this[kBody].push(val) - } - return Readable.prototype.push.call(this, val) - } - - read (n) { - if (this[kBody] === undefined) { - consume(this) - } - return Readable.prototype.read.call(this, n) - } - - resume () { - if (this[kBody] === undefined) { - consume(this) - } - return Readable.prototype.resume.call(this) - } - - pause (dest, pipeOpts) { - if (this[kBody] === undefined) { - consume(this) - } - return Readable.prototype.pause.call(this) - } - - pipe (dest, pipeOpts) { - if (this[kBody] === undefined) { - consume(this) - } - return Readable.prototype.pipe.call(this, dest, pipeOpts) - } - - on (ev, fn) { - if (this[kBody] === undefined && (ev === 'data' || ev === 'readable')) { - // TODO (fix): consume in next tick in case event handler is removed - // in same tick? - consume(this) - } - return Readable.prototype.on.call(this, ev, fn) - } - addListener (ev, fn) { - return this.on(ev, fn) + this[kBody] = undefined } + // https://fetch.spec.whatwg.org/#dom-body-bodyused get bodyUsed () { - if (this[kBody]) { - return this[kBody].used - } - - return this.readableDidRead !== undefined - ? this.readableDidRead - : this[kBody] === null + return isDisturbed(this) } get body () { + if (this[kBody]?.type === kWebStreamType) { + return this[kBody].body + } + return consume(this, kWebStreamType) } @@ -135,209 +57,80 @@ module.exports = class BodyReadable extends Readable { } } -function start (self) { - assert(self.listenerCount('data') === 0) +function isLocked (self) { + return self[kBody] && (self[kBody].type !== kWebStreamType || self[kBody].body.locked) +} - const state = self._readableState - while (state.buffer.length) { - self[kBody].push(state.buffer.shift()) - } - if (state.ended) { - self[kBody].push(null) - } +// https://streams.spec.whatwg.org/#readablestream-disturbed +function isDisturbed (self) { + return self.destroyed || self.readableDidRead +} - self._read() +// https://fetch.spec.whatwg.org/#body-unusable +function isUnusable (self) { + return isDisturbed(self) || isLocked(self) } function consume (self, type) { - if (self.destroyed) { - // TODO (fix): What error? - throw new TypeError('disturbed') - } - - if (self.bodyUsed) { - throw new TypeError('disturbed') - } - - if (self[kBody]) { - throw new TypeError('locked') - } - - if (!type) { - self[kBody] = null - return self + if (isUnusable(self)) { + throw new TypeError('unusable') } if (type === kWebStreamType) { - if (!ReadableStream) { - ReadableStream = require('stream/web').ReadableStream + self[kBody] = { + type, + body: Readable.toWeb(self) } - // TODO (fix): emit pause/resume on self? - - return new ReadableStream({ - start (controller) { - if (self[kBody]) { - // TODO (fix): it's a little unclear what we need to do here. - this.controller.error(new Error('locked')) - } else { - self[kBody] = { - type, - used: false, - buffer: self, - controller, - push (val) { - if (self.destroyed) { - return false - } - - // TODO (fix): This is not strictly correct. - // Just because chunk was enqueued doesn't mean - // that it was read? - this.used = true - - if (!this.controller) { - this.buffer.push(val) - return false - } - - if (!val) { - this.controller.close() - - // TODO (fix): This is not strictly correct. - // Just because chunk was enqueued doesn't mean - // that it was read? How do we wait for stream - // to be drained? queueMicrotask is just a hack - // to hope that it's been drained. - queueMicrotask(() => { - Readable.prototype.push.call(self, null) - }) - } else { - this.controller.enqueue(new Uint8Array(val)) - } - - return this.controller.desiredSize > 0 - } - } - } - - return new Promise((resolve, reject) => { - const orgEmit = self.emit - - function cleanup () { - self.emit = orgEmit - self.off('error', check) - } - - function check (err) { - if (!err && !this._readableState.constructed) { - return - } - - if (err) { - reject(err) - } else { - self - .on('error', err => { - this.controller.error(err) - }) - .on('end', () => { - // autoDestroy might have been disabled. - self.destroy() - }) - .on('close', () => { - // TODO (fix): Do webstreams have something - // corresponding? - }) - - start(self) - - resolve() - } - - cleanup() - } - - self.emit = function (ev, fn) { - check(null) - return orgEmit.call(this, ev, fn) - } - self.on('error', check) - - check() - }) - }, - - pull () { - self._read() - }, - - cancel (reason) { - let err - - if (reason instanceof Error) { - err = reason - } else if (typeof reason === 'string') { - err = new Error(reason) - } else { - err = new AbortError() - } - - self.destroy(err) - } - }, { highWaterMark: 16 * 1024 }) + return self[kBody].body } return new Promise((resolve, reject) => { - self - .on('error', reject) - .on('end', () => { - // autoDestroy might have been disabled. - self.destroy() - }) self[kBody] = { type, - used: false, - body: this.type === kTextType || this.type === kJSONType ? '' : [], - push (val) { - if (self.destroyed) { - return false - } - - this.used = true + resolve, + reject, + body: type === kTextType || type === kJSONType ? '' : [] + } + self + .on('error', reject) + .on('data', function (val) { + const { type } = this[kBody] - if (val !== null) { - if (this.type === kTextType || this.type === kJSONType) { - this.body += val - } else { - this.body.push(val) - } + if (type === kTextType || type === kJSONType) { + this[kBody].body += val } else { - try { - if (this.type === kTextType) { - resolve(this.body) - } else if (this.type === kJSONType) { - resolve(JSON.parse(this.body)) - } else if (this.type === kArrayBufferType) { - resolve(Buffer.concat(this.body).buffer) - } else if (this.type === kBlobType) { - if (!Blob) { - Blob = require('buffer').Blob - } - resolve(new Blob(this.body)) + this[kBody].body.push(val) + } + }) + .on('end', function () { + const { type, resolve, body } = this[kBody] + + try { + if (type === kTextType) { + resolve(body) + } else if (type === kJSONType) { + resolve(JSON.parse(body)) + } else if (type === kArrayBufferType) { + resolve(Buffer.concat(body).buffer) + } else if (type === kBlobType) { + if (!Blob) { + Blob = require('buffer').Blob } - - this.body = null - Readable.prototype.push.call(self, null) - } catch (err) { - self.destroy(err) + resolve(new Blob(body)) } - } - return true - } - } + this[kBody].body = null + } catch (err) { + self.destroy(err) + } + }) + .on('close', function () { + const { body, reject } = this[kBody] - start(self) + if (body !== null) { + reject(new AbortError()) + } + }) }) }