From 0b853a7576d4a72d2f364631ede9f0fe2032d0dc Mon Sep 17 00:00:00 2001 From: IlyasShabi Date: Wed, 20 Mar 2024 18:27:29 +0100 Subject: [PATCH] stream: support typed arrays PR-URL: https://github.com/nodejs/node/pull/51866 Reviewed-By: Matteo Collina Reviewed-By: Raz Luvaton Reviewed-By: Paolo Insogna Reviewed-By: Benjamin Gruenbaum --- benchmark/streams/readable-uint8array.js | 36 ++++++++ benchmark/streams/writable-uint8array.js | 51 +++++++++++ doc/api/stream.md | 75 ++++++++++------ lib/internal/streams/readable.js | 8 +- lib/internal/streams/writable.js | 4 +- lib/stream.js | 4 +- test/parallel/test-net-write-arguments.js | 2 +- test/parallel/test-stream-typedarray.js | 105 ++++++++++++++++++++++ 8 files changed, 250 insertions(+), 35 deletions(-) create mode 100644 benchmark/streams/readable-uint8array.js create mode 100644 benchmark/streams/writable-uint8array.js create mode 100644 test/parallel/test-stream-typedarray.js diff --git a/benchmark/streams/readable-uint8array.js b/benchmark/streams/readable-uint8array.js new file mode 100644 index 00000000000000..372b415f87be20 --- /dev/null +++ b/benchmark/streams/readable-uint8array.js @@ -0,0 +1,36 @@ +'use strict'; +const common = require('../common.js'); +const { Readable } = require('stream'); + +const bench = common.createBenchmark(main, { + n: [1e6], + kind: ['read', 'encoding'], +}); +const ABC = new Uint8Array([0x41, 0x42, 0x43]); + +function main({ n, kind }) { + switch (kind) { + case 'read': { + bench.start(); + const readable = new Readable({ + read() {}, + }); + for (let i = 0; i < n; ++i) readable.push(ABC); + bench.end(n); + break; + } + + case 'encoding': { + bench.start(); + const readable = new Readable({ + read() {}, + }); + readable.setEncoding('utf8'); + for (let i = 0; i < n; ++i) readable.push(ABC); + bench.end(n); + break; + } + default: + throw new Error('Invalid kind'); + } +} diff --git a/benchmark/streams/writable-uint8array.js b/benchmark/streams/writable-uint8array.js new file mode 100644 index 00000000000000..9e243384738711 --- /dev/null +++ b/benchmark/streams/writable-uint8array.js @@ -0,0 +1,51 @@ +'use strict'; +const common = require('../common.js'); +const { Writable } = require('stream'); + +const bench = common.createBenchmark(main, { + n: [50e6], + kind: ['write', 'object-mode', 'writev'], +}); +const ABC = new Uint8Array([0x41, 0x42, 0x43]); + +function main({ n, kind }) { + switch (kind) { + case 'write': { + bench.start(); + const wr = new Writable({ + write(chunk, encoding, cb) { + cb(); + }, + }); + for (let i = 0; i < n; ++i) wr.write(ABC); + bench.end(n); + break; + } + + case 'object-mode': { + bench.start(); + const wr = new Writable({ + objectMode: true, + write(chunk, encoding, cb) { + cb(); + }, + }); + for (let i = 0; i < n; ++i) wr.write(ABC); + bench.end(n); + break; + } + case 'writev': { + bench.start(); + const wr = new Writable({ + writev(chunks, cb) { + cb(); + }, + }); + for (let i = 0; i < n; ++i) wr.write(ABC); + bench.end(n); + break; + } + default: + throw new Error('Invalid kind'); + } +} diff --git a/doc/api/stream.md b/doc/api/stream.md index e770814c3d6315..dc1bff6b5ae4a7 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -282,11 +282,19 @@ The `finished` API also provides a [callback version][stream-finished]. ### Object mode -All streams created by Node.js APIs operate exclusively on strings and `Buffer` -(or `Uint8Array`) objects. It is possible, however, for stream implementations -to work with other types of JavaScript values (with the exception of `null`, -which serves a special purpose within streams). Such streams are considered to -operate in "object mode". +All streams created by Node.js APIs operate exclusively on strings, {Buffer}, +{TypedArray} and {DataView} objects: + +* `Strings` and `Buffers` are the most common types used with streams. +* `TypedArray` and `DataView` lets you handle binary data with types like + `Int32Array` or `Uint8Array`. When you write a TypedArray or DataView to a + stream, Node.js processes + the raw bytes. + +It is possible, however, for stream +implementations to work with other types of JavaScript values (with the +exception of `null`, which serves a special purpose within streams). +Such streams are considered to operate in "object mode". Stream instances are switched into object mode using the `objectMode` option when the stream is created. Attempting to switch an existing stream into @@ -712,6 +720,9 @@ console.log(myStream.destroyed); // true -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or - `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value - other than `null`. +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a {string}, {Buffer}, + {TypedArray} or {DataView}. For object mode streams, `chunk` may be any + JavaScript value other than `null`. * `encoding` {string} The encoding if `chunk` is a string * `callback` {Function} Callback for when the stream is finished. * Returns: {this} @@ -926,6 +937,9 @@ Getter for the property `objectMode` of a given `Writable` stream. -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or - `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value - other than `null`. +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a {string}, {Buffer}, + {TypedArray} or {DataView}. For object mode streams, `chunk` may be any + JavaScript value other than `null`. * `encoding` {string|null} The encoding, if `chunk` is a string. **Default:** `'utf8'` * `callback` {Function} Callback for when this chunk of data is flushed. * Returns: {boolean} `false` if the stream wishes for the calling code to @@ -1763,15 +1777,18 @@ setTimeout(() => { -* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to unshift onto the - read queue. For streams not operating in object mode, `chunk` must be a - string, `Buffer`, `Uint8Array`, or `null`. For object mode streams, `chunk` - may be any JavaScript value. +* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to unshift + onto the read queue. For streams not operating in object mode, `chunk` must + be a {string}, {Buffer}, {TypedArray}, {DataView} or `null`. + For object mode streams, `chunk` may be any JavaScript value. * `encoding` {string} Encoding of string chunks. Must be a valid `Buffer` encoding, such as `'utf8'` or `'ascii'`. @@ -3525,8 +3542,8 @@ changes: **Default:** `'utf8'`. * `objectMode` {boolean} Whether or not the [`stream.write(anyObj)`][stream-write] is a valid operation. When set, - it becomes possible to write JavaScript values other than string, - `Buffer` or `Uint8Array` if supported by the stream implementation. + it becomes possible to write JavaScript values other than string, {Buffer}, + {TypedArray} or {DataView} if supported by the stream implementation. **Default:** `false`. * `emitClose` {boolean} Whether or not the stream should emit `'close'` after it has been destroyed. **Default:** `true`. @@ -4075,22 +4092,25 @@ It can be overridden by child classes but it **must not** be called directly. -* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the - read queue. For streams not operating in object mode, `chunk` must be a - string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be - any JavaScript value. +* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to push + into the read queue. For streams not operating in object mode, `chunk` must + be a {string}, {Buffer}, {TypedArray} or {DataView}. For object mode streams, + `chunk` may be any JavaScript value. * `encoding` {string} Encoding of string chunks. Must be a valid `Buffer` encoding, such as `'utf8'` or `'ascii'`. * Returns: {boolean} `true` if additional chunks of data may continue to be pushed; `false` otherwise. -When `chunk` is a `Buffer`, `Uint8Array`, or `string`, the `chunk` of data will -be added to the internal queue for users of the stream to consume. +When `chunk` is a {Buffer}, {TypedArray}, {DataView} or {string}, the `chunk` +of data will be added to the internal queue for users of the stream to consume. Passing `chunk` as `null` signals the end of the stream (EOF), after which no more data can be written. @@ -4765,8 +4785,9 @@ situations within Node.js where this is done, particularly in the Use of `readable.push('')` is not recommended. -Pushing a zero-byte string, `Buffer`, or `Uint8Array` to a stream that is not in -object mode has an interesting side effect. Because it _is_ a call to +Pushing a zero-byte {string}, {Buffer}, {TypedArray} or {DataView} to a stream +that is not in object mode has an interesting side effect. +Because it _is_ a call to [`readable.push()`][stream-push], the call will end the reading process. However, because the argument is an empty string, no data is added to the readable buffer so there is nothing for a user to consume. diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 3800399c82ad62..ff07a2fd89afe0 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -420,11 +420,11 @@ function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) { chunk = Buffer.from(chunk, encoding); } } - } else if (Stream._isUint8Array(chunk)) { + } else if (Stream._isArrayBufferView(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); } else if (chunk !== undefined && !(chunk instanceof Buffer)) { errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + 'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk)); return false; } @@ -473,12 +473,12 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) { } } else if (chunk instanceof Buffer) { encoding = ''; - } else if (Stream._isUint8Array(chunk)) { + } else if (Stream._isArrayBufferView(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); encoding = ''; } else if (chunk !== undefined) { errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk)); + 'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk)); return false; } diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 4de88c48eebdb9..0a16aee369b955 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -467,12 +467,12 @@ function _write(stream, chunk, encoding, cb) { } } else if (chunk instanceof Buffer) { encoding = 'buffer'; - } else if (Stream._isUint8Array(chunk)) { + } else if (Stream._isArrayBufferView(chunk)) { chunk = Stream._uint8ArrayToBuffer(chunk); encoding = 'buffer'; } else { throw new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + 'chunk', ['string', 'Buffer', 'TypedArray', 'DataView'], chunk); } } diff --git a/lib/stream.js b/lib/stream.js index cdbc1fe0380694..a69354138a2248 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -50,6 +50,7 @@ const internalBuffer = require('internal/buffer'); const promises = require('stream/promises'); const utils = require('internal/streams/utils'); +const { isArrayBufferView, isUint8Array } = require('internal/util/types'); const Stream = module.exports = require('internal/streams/legacy').Stream; @@ -137,7 +138,8 @@ ObjectDefineProperty(eos, customPromisify, { // Backwards-compat with node 0.4.x Stream.Stream = Stream; -Stream._isUint8Array = require('internal/util/types').isUint8Array; +Stream._isArrayBufferView = isArrayBufferView; +Stream._isUint8Array = isUint8Array; Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { return new internalBuffer.FastBuffer(chunk.buffer, chunk.byteOffset, diff --git a/test/parallel/test-net-write-arguments.js b/test/parallel/test-net-write-arguments.js index 289c3c0f36bcf3..2e2aa55432053b 100644 --- a/test/parallel/test-net-write-arguments.js +++ b/test/parallel/test-net-write-arguments.js @@ -34,6 +34,6 @@ assert.throws(() => { code: 'ERR_INVALID_ARG_TYPE', name: 'TypeError', message: 'The "chunk" argument must be of type string or an instance of ' + - `Buffer or Uint8Array.${common.invalidArgTypeHelper(value)}` + `Buffer, TypedArray, or DataView.${common.invalidArgTypeHelper(value)}` }); }); diff --git a/test/parallel/test-stream-typedarray.js b/test/parallel/test-stream-typedarray.js new file mode 100644 index 00000000000000..a374989276cf64 --- /dev/null +++ b/test/parallel/test-stream-typedarray.js @@ -0,0 +1,105 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Readable, Writable } = require('stream'); + +const buffer = Buffer.from('ABCD'); +const views = common.getArrayBufferViews(buffer); + +{ + // Simple Writable test. + let n = 0; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + cb(); + }, views.length), + }); + + views.forEach((msg) => writable.write(msg)); + writable.end(); +} + +{ + // Writable test with object mode True. + let n = 0; + const writable = new Writable({ + objectMode: true, + write: common.mustCall((chunk, encoding, cb) => { + assert(!(chunk instanceof Buffer)); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + cb(); + }, views.length), + }); + + views.forEach((msg) => writable.write(msg)); + writable.end(); +} + + +{ + // Writable test, multiple writes carried out via writev. + let n = 0; + let callback; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert(ArrayBuffer.isView(chunk)); + assert.deepStrictEqual(common.getBufferSources(chunk)[n], views[n]); + n++; + callback = cb; + }), + + writev: common.mustCall((chunks, cb) => { + assert.strictEqual(chunks.length, views.length); + let res = ''; + for (const chunk of chunks) { + assert.strictEqual(chunk.encoding, 'buffer'); + res += chunk.chunk; + } + assert.strictEqual(res, 'ABCD'.repeat(9)); + }), + + }); + views.forEach((msg) => writable.write(msg)); + writable.end(views[0]); + callback(); +} + + +{ + // Simple Readable test. + const readable = new Readable({ + read() {} + }); + + readable.push(views[1]); + readable.push(views[2]); + readable.unshift(views[0]); + + const buf = readable.read(); + assert(buf instanceof Buffer); + assert.deepStrictEqual([...buf], [...views[0], ...views[1], ...views[2]]); +} + +{ + // Readable test, setEncoding. + const readable = new Readable({ + read() {} + }); + + readable.setEncoding('utf8'); + + readable.push(views[1]); + readable.push(views[2]); + readable.unshift(views[0]); + + const out = readable.read(); + assert.strictEqual(out, 'ABCD'.repeat(3)); +}