From 0a20d5022c4f915dc7af3b75c43f167d9df60db9 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 10 Nov 2021 10:45:10 +0200 Subject: [PATCH] stream: remove thenable support Remove support for returning thenables in stream implementation methods. This is causing more confusion and issues than it's worth. Refs: https://github.com/nodejs/node/issues/39535 --- lib/internal/streams/destroy.js | 30 +- lib/internal/streams/readable.js | 13 +- lib/internal/streams/transform.js | 67 +---- lib/internal/streams/writable.js | 15 +- .../test-stream-construct-async-error.js | 261 ------------------ 5 files changed, 6 insertions(+), 380 deletions(-) delete mode 100644 test/parallel/test-stream-construct-async-error.js diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 7d3657443e6ab5..10f5471e21d3eb 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -106,20 +106,7 @@ function _destroy(self, err, cb) { } } try { - const result = self._destroy(err || null, onDestroy); - if (result != null) { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - function() { - process.nextTick(onDestroy, null); - }, - function(err) { - process.nextTick(onDestroy, err); - }); - } - } + self._destroy(err || null, onDestroy); } catch (err) { onDestroy(err); } @@ -285,20 +272,7 @@ function constructNT(stream) { } try { - const result = stream._construct(onConstruct); - if (result != null) { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - function() { - process.nextTick(onConstruct, null); - }, - function(err) { - process.nextTick(onConstruct, err); - }); - } - } + stream._construct(onConstruct); } catch (err) { onConstruct(err); } diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 3d37f277a37779..fd182739dbf6c6 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -493,18 +493,7 @@ Readable.prototype.read = function(n) { // Call internal read method try { - const result = this._read(state.highWaterMark); - if (result != null) { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - nop, - function(err) { - errorOrDestroy(this, err); - }); - } - } + this._read(state.highWaterMark); } catch (err) { errorOrDestroy(this, err); } diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index 26e0b07c2956c8..57326d5f3fbd93 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -107,10 +107,8 @@ function Transform(options) { } function final(cb) { - let called = false; if (typeof this._flush === 'function' && !this.destroyed) { - const result = this._flush((er, data) => { - called = true; + this._flush((er, data) => { if (er) { if (cb) { cb(er); @@ -128,33 +126,6 @@ function final(cb) { cb(); } }); - if (result !== undefined && result !== null) { - try { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - (data) => { - if (called) - return; - if (data != null) - this.push(data); - this.push(null); - if (cb) - process.nextTick(cb); - }, - (err) => { - if (cb) { - process.nextTick(cb, err); - } else { - process.nextTick(() => this.destroy(err)); - } - }); - } - } catch (err) { - process.nextTick(() => this.destroy(err)); - } - } } else { this.push(null); if (cb) { @@ -180,9 +151,7 @@ Transform.prototype._write = function(chunk, encoding, callback) { const wState = this._writableState; const length = rState.length; - let called = false; - const result = this._transform(chunk, encoding, (err, val) => { - called = true; + this._transform(chunk, encoding, (err, val) => { if (err) { callback(err); return; @@ -203,38 +172,6 @@ Transform.prototype._write = function(chunk, encoding, callback) { this[kCallback] = callback; } }); - if (result !== undefined && result != null) { - try { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - (val) => { - if (called) - return; - - if (val != null) { - this.push(val); - } - - if ( - wState.ended || - length === rState.length || - rState.length < rState.highWaterMark || - rState.length === 0) { - process.nextTick(callback); - } else { - this[kCallback] = callback; - } - }, - (err) => { - process.nextTick(callback, err); - }); - } - } catch (err) { - process.nextTick(callback, err); - } - } }; Transform.prototype._read = function() { diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 727e7ede71f8aa..1acc81fdf0b07f 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -692,20 +692,7 @@ function callFinal(stream, state) { state.pendingcb++; try { - const result = stream._final(onFinish); - if (result != null) { - const then = result.then; - if (typeof then === 'function') { - then.call( - result, - function() { - process.nextTick(onFinish, null); - }, - function(err) { - process.nextTick(onFinish, err); - }); - } - } + stream._final(onFinish); } catch (err) { onFinish(stream, state, err); } diff --git a/test/parallel/test-stream-construct-async-error.js b/test/parallel/test-stream-construct-async-error.js deleted file mode 100644 index 1c647235e29429..00000000000000 --- a/test/parallel/test-stream-construct-async-error.js +++ /dev/null @@ -1,261 +0,0 @@ -'use strict'; - -const common = require('../common'); -const { - Duplex, - Writable, - Transform, -} = require('stream'); -const { setTimeout } = require('timers/promises'); -const assert = require('assert'); - -{ - class Foo extends Duplex { - async _construct(cb) { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - cb(); - throw new Error('boom'); - } - } - - const foo = new Foo(); - foo.on('error', common.expectsError({ - message: 'boom' - })); - foo.on('close', common.mustCall(() => { - assert(foo._writableState.constructed); - assert(foo._readableState.constructed); - })); -} - -{ - class Foo extends Duplex { - async _destroy(err, cb) { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - throw new Error('boom'); - } - } - - const foo = new Foo(); - foo.destroy(); - foo.on('error', common.expectsError({ - message: 'boom' - })); - foo.on('close', common.mustCall(() => { - assert(foo.destroyed); - })); -} - -{ - class Foo extends Duplex { - async _destroy(err, cb) { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - } - } - - const foo = new Foo(); - foo.destroy(); - foo.on('close', common.mustCall(() => { - assert(foo.destroyed); - })); -} - -{ - class Foo extends Duplex { - async _construct() { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - } - - _write = common.mustCall((chunk, encoding, cb) => { - cb(); - }); - - _read() {} - } - - const foo = new Foo(); - foo.write('test', common.mustCall()); -} - -{ - class Foo extends Duplex { - async _construct(callback) { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - callback(); - } - - _write = common.mustCall((chunk, encoding, cb) => { - cb(); - }); - - _read() {} - } - - const foo = new Foo(); - foo.write('test', common.mustCall()); - foo.on('error', common.expectsError({ - code: 'ERR_MULTIPLE_CALLBACK' - })); -} - -{ - class Foo extends Writable { - _write = common.mustCall((chunk, encoding, cb) => { - cb(); - }); - - async _final() { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('finish', common.mustCall()); -} - -{ - class Foo extends Writable { - _write = common.mustCall((chunk, encoding, cb) => { - cb(); - }); - - async _final(callback) { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - callback(); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('finish', common.mustCall()); -} - -{ - class Foo extends Writable { - _write = common.mustCall((chunk, encoding, cb) => { - cb(); - }); - - async _final() { - // eslint-disable-next-line no-restricted-syntax - await setTimeout(common.platformTimeout(1)); - throw new Error('boom'); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('error', common.expectsError({ - message: 'boom' - })); - foo.on('close', common.mustCall()); -} - -{ - const expected = ['hello', 'world']; - class Foo extends Transform { - async _flush() { - return 'world'; - } - - _transform(chunk, encoding, callback) { - callback(null, chunk); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('data', common.mustCall((chunk) => { - assert.strictEqual(chunk.toString(), expected.shift()); - }, 2)); -} - -{ - const expected = ['hello', 'world']; - class Foo extends Transform { - async _flush(callback) { - callback(null, 'world'); - } - - _transform(chunk, encoding, callback) { - callback(null, chunk); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('data', common.mustCall((chunk) => { - assert.strictEqual(chunk.toString(), expected.shift()); - }, 2)); -} - -{ - class Foo extends Transform { - async _flush(callback) { - throw new Error('boom'); - } - - _transform(chunk, encoding, callback) { - callback(null, chunk); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('data', common.mustCall()); - foo.on('error', common.expectsError({ - message: 'boom' - })); - foo.on('close', common.mustCall()); -} - -{ - class Foo extends Transform { - async _transform(chunk) { - return chunk.toString().toUpperCase(); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('data', common.mustCall((chunk) => { - assert.strictEqual(chunk.toString(), 'HELLO'); - })); -} - -{ - class Foo extends Transform { - async _transform(chunk, _, callback) { - callback(null, chunk.toString().toUpperCase()); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('data', common.mustCall((chunk) => { - assert.strictEqual(chunk.toString(), 'HELLO'); - })); -} - -{ - class Foo extends Transform { - async _transform() { - throw new Error('boom'); - } - } - - const foo = new Foo(); - foo.end('hello'); - foo.on('error', common.expectsError({ - message: 'boom' - })); - foo.on('close', common.mustCall()); -}