From e101b1d563e7faa537a41c4b5f632ce02a5f4c97 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 26 Jul 2021 08:38:13 +0200 Subject: [PATCH] fixup --- lib/internal/streams/destroy.js | 2 + lib/internal/streams/readable.js | 271 ++++++++++++++++++++++------ lib/internal/webstreams/adapters.js | 5 + 3 files changed, 223 insertions(+), 55 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 685ef90285048c..b5eb76c1c0a8fe 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -356,6 +356,8 @@ function destroyer(stream, err) { } module.exports = { + kConstruct, + kDestroy, construct, destroyer, destroy, diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 8a2290f439a536..ab5ad656c75bb1 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -35,6 +35,7 @@ const { SymbolAsyncIterator, Symbol, TypeError, + Uint8Array, } = primordials; module.exports = Readable; @@ -45,6 +46,8 @@ const { Stream, prependListener } = require('internal/streams/legacy'); const { Buffer } = require('buffer'); let Blob; +let ReadableStream; +let CountQueuingStrategy; const { addAbortSignal, @@ -75,9 +78,11 @@ const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); const kConsume = Symbol('kConsume'); +const kReading = Symbol('kReading'); const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); +const assert = require('internal/assert'); ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); @@ -213,6 +218,7 @@ function Readable(options) { } this[kConsume] = null; + this[kReading] = false; // Is stream being consumed through Readable API? 
Stream.call(this, options); @@ -238,6 +244,11 @@ Readable.prototype[EE.captureRejectionSymbol] = function(err) { // similar to how Writable.write() returns true if you should // write() some more. Readable.prototype.push = function(chunk, encoding) { + if (this[kConsume] && chunk !== null && !this[kReading]) { + encoding = encoding || this._readableState.defaultEncoding; + return this[kConsume].push(chunk, encoding); + } + return readableAddChunk(this, chunk, encoding, false); }; @@ -307,10 +318,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { maybeReadMore(stream, state); } + const consumed = stream[kConsume] ? stream[kConsume].push(chunk, encoding) : true; + // We can push more data if we are below the highWaterMark. // Also, if we have no data yet, we can stand some more bytes. // This is to work around cases where hwm=0, such as the repl. - return !state.ended && + return consumed && !state.ended && (state.length < state.highWaterMark || state.length === 0); } @@ -402,6 +415,27 @@ function howMuchToRead(n, state) { return state.ended ? state.length : 0; } + +function _read(self, n) { + // Call internal read method + try { + const result = self._read(n); + if (result != null) { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + nop, + function(err) { + errorOrDestroy(self, err); + }); + } + } + } catch (err) { + errorOrDestroy(self, err); + } +} + // You can override either this method, or the async _read(n) below. 
Readable.prototype.read = function(n) { debug('read', n); @@ -496,22 +530,7 @@ Readable.prototype.read = function(n) { state.needReadable = true; // Call internal read method - try { - const result = this._read(state.highWaterMark); - if (result != null) { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - nop, - function(err) { - errorOrDestroy(this, err); - }); - } - } - } catch (err) { - errorOrDestroy(this, err); - } + _read(this, state.highWaterMark); state.sync = false; // If _read pushed data synchronously, then `reading` will be false, @@ -906,6 +925,8 @@ Readable.prototype.on = function(ev, fn) { const state = this._readableState; if (ev === 'data') { + this[kReading] = true; + // Update readableListening so that resume() may be a no-op // a few lines down. This is needed to support once('readable'). state.readableListening = this.listenerCount('readable') > 0; @@ -914,6 +935,8 @@ Readable.prototype.on = function(ev, fn) { if (state.flowing !== false) this.resume(); } else if (ev === 'readable') { + this[kReading] = true; + if (!state.endEmitted && !state.readableListening) { state.readableListening = state.needReadable = true; state.flowing = false; @@ -1310,7 +1333,7 @@ ObjectDefineProperties(ReadableState.prototype, { body: { get() { if (this[kConsume]?.type === kWebStreamType) { - return this[kConsume].body; + return this[kConsume].stream; } return consume(this, kWebStreamType); @@ -1343,8 +1366,7 @@ ObjectDefineProperties(ReadableState.prototype, { }); function isLocked(self) { - return self[kConsume] && - (self[kConsume].type !== kWebStreamType || self[kConsume].body.locked); + return self[kConsume]?.stream?.locked === true; } // https://streams.spec.whatwg.org/#readablestream-disturbed @@ -1363,12 +1385,122 @@ function consume(self, type) { } if (type === kWebStreamType) { - self[kConsume] = { + if (!ReadableStream) { + ReadableStream = require('internal/webstreams/readablestream') + .ReadableStream; + } + + 
const objectMode = self.readableObjectMode; + const highWaterMark = self.readableHighWaterMark; + // When not running in objectMode explicitly, we just fall + // back to a minimal strategy that just specifies the highWaterMark + // and no size algorithm. Using a ByteLengthQueuingStrategy here + // is unnecessary. + let strategy; + if (objectMode) { + if (!CountQueuingStrategy) { + CountQueuingStrategy = require('internal/webstreams/queuingstrategies').CountQueuingStrategy; + } + strategy = new CountQueuingStrategy({ highWaterMark }); + } else { + strategy = { highWaterMark }; + } + + + self + .on('error', function(err) { + const { controller } = this[kConsume]; + if (controller) controller.error(err); + }) + .on('close', function() { + const { controller } = this[kConsume]; + if (controller) { + controller.error(new AbortError()); + } + }); + + const consume = self[kConsume] = { type, - body: Readable.toWeb(self) + objectMode, + controller: null, + push(chunk) { + const { objectMode, controller } = this; + + assert(controller); + + if (chunk === null) { + controller.close(); + this.controller = null; + } else { + if (!objectMode) { + if (typeof chunk === 'string') { + chunk = new Uint8Array(Buffer.from(chunk)); + } else if (Buffer.isBuffer(chunk)) { + // Copy the Buffer to detach it from the pool. + chunk = new Uint8Array(chunk); + } else if (Stream._isUint8Array(chunk)) { + // Do nothing... + } else if (chunk != null) { + throw new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + } + } + + // TODO: Does controller perform any type checks? 
+ controller.enqueue(chunk); + } + + return controller.desiredSize > 0; + }, + stream: new ReadableStream({ + async start(controller) { + consume.controller = controller; + + const { _readableState: state } = self; + + if (self[kReading]) { + while (controller.desiredSize > 0) { + const chunk = self.read(); + if (chunk === null) { + break; + } + controller.enqueue(chunk); + } + } else { + const buffer = state.buffer; + while (buffer.length) { + controller.enqueue(buffer.shift()); + } + state.length = 0; + } + + if (state.ended) { + controller.close(); + } + + if (!state.constructed) { + await EE.once(self, destroyImpl.kConstruct); + } + }, + pull() { + const { _readableState: state } = self; + + const n = consume.controller.desiredSize; + + if (self[kReading]) { + assert(state.length === 0); + self.read(n); + } else { + _read(self, n); + } + }, + cancel(reason) { + self.destroy(reason); + }, + }, strategy) }; - return self[kConsume].body; + return consume.stream; } return new Promise((resolve, reject) => { @@ -1376,43 +1508,71 @@ function consume(self, type) { type, resolve, reject, - body: type === kTextType || type === kJSONType ? '' : [] - }; - self - .on('error', reject) - .on('data', function(val) { - const { type } = this[kConsume]; + decoder: null, + body: type === kTextType || type === kJSONType ? '' : [], + push(chunk, encoding) { + const { type, body, resolve, decoder } = this[kConsume]; + + if (chunk === null) { + try { + if (type === kTextType) { + resolve(body + (decoder ? decoder.end() : '')); + } else if (type === kJSONType) { + resolve(JSONParse(body + (decoder ? decoder.end() : ''))); + } else if (type === kArrayBufferType) { + resolve(Buffer.concat(body).buffer); + } else if (type === kBlobType) { + if (!Blob) { + Blob = require('buffer').Blob; + } + resolve(new Blob(body)); + } - // TODO (fix): Do we need type check and/or conversion? 
+ this[kConsume].body = null; + } catch (err) { + self.destroy(err); + } + } else if (type === kTextType || type === kJSONType) { + if (typeof chunk === 'string') { + if (decoder) { + chunk = decoder.write(Buffer.from(chunk)); + } + // TODO: Encoding check/transform? + } else if (chunk instanceof Buffer) { + if (!decoder) { + this[kConsume].decoder = new StringDecoder('utf8'); + } + chunk = this[kConsume].decoder.write(chunk); + } else if (Stream._isUint8Array(chunk)) { + chunk = (this[kConsume].decoder || (this[kConsume].decoder = new StringDecoder('utf8'))).write(Stream._uint8ArrayToBuffer(chunk)); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } - if (type === kTextType || type === kJSONType) { - this[kConsume].body += val; + this[kConsume].body += chunk; } else { - this[kConsume].body.push(val); - } - }) - .on('end', function() { - const { type, resolve, body } = this[kConsume]; - - try { - if (type === kTextType) { - resolve(body); - } else if (type === kJSONType) { - resolve(JSONParse(body)); - } else if (type === kArrayBufferType) { - resolve(Buffer.concat(body).buffer); - } else if (type === kBlobType) { - if (!Blob) { - Blob = require('buffer').Blob; - } - resolve(new Blob(body)); + if (typeof chunk === 'string') { + chunk = Buffer.from(chunk); + // TODO: Encoding check/transform? + } else if (chunk instanceof Buffer) { + // Do nothing... 
+ } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + } else { + throw new ERR_INVALID_ARG_TYPE( + 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); } - this[kConsume].body = null; - } catch (err) { - self.destroy(err); + this[kConsume].body.push(chunk); } - }) + + return true; + } + }; + + self + .on('error', reject) .on('close', function() { const { body, reject } = this[kConsume]; @@ -1522,5 +1682,6 @@ Readable.fromWeb = function(readableStream, options) { }; Readable.toWeb = function(streamReadable) { - return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable); + return streamReadable[kConsume] !== undefined ? streamReadable[kConsume].stream : + lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable); }; diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 1c251a830726ba..4e59d1bc043cba 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -50,6 +50,7 @@ const { ERR_INVALID_STATE, ERR_STREAM_PREMATURE_CLOSE, }, + AbortError, } = require('internal/errors'); const { @@ -395,6 +396,10 @@ function newReadableStreamFromStreamReadable(streamReadable) { streamReadable.pause(); const cleanup = finished(streamReadable, (error) => { + if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') { + error = new AbortError(); + } + cleanup(); // This is a protection against non-standard, legacy streams // that happen to emit an error event again after finished is called.