From e56eedd390c9d0e0c31a396750714913390d2f08 Mon Sep 17 00:00:00 2001 From: Sarat Addepalli Date: Tue, 21 Aug 2018 11:46:04 +0530 Subject: [PATCH 1/9] stream: allow typed arrays --- lib/_stream_readable.js | 11 ++++++++--- lib/_stream_writable.js | 4 ++-- lib/stream.js | 31 ++++++++++++++++++++++--------- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index f0829966438077..4bc536677b3b27 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -240,7 +240,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) { - chunk = Stream._uint8ArrayToBuffer(chunk); + chunk = Stream._typedArrayToBuffer(chunk); } if (addToFront) { @@ -297,12 +297,17 @@ function addChunk(stream, state, chunk, addToFront) { function chunkInvalid(state, chunk) { var er; - if (!Stream._isUint8Array(chunk) && + if (!Stream._isArrayBufferView(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) { er = new ERR_INVALID_ARG_TYPE( - 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); + 'chunk', + ['string', + 'Buffer', + 'TypedArray', + 'DataView'], + chunk); } return er; } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 3bad957912b323..e3396ff8fb34d3 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -268,10 +268,10 @@ function validChunk(stream, state, chunk, cb) { Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; - var isBuf = !state.objectMode && Stream._isUint8Array(chunk); + var isBuf = !state.objectMode && Stream._isArrayBufferView(chunk); if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) { - chunk = Stream._uint8ArrayToBuffer(chunk); + chunk = Stream._typedArrayToBuffer(chunk); } if (typeof encoding === 'function') { diff --git a/lib/stream.js b/lib/stream.js index 7c235108c07256..15acf08327e4cc 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -45,28 +45,41 @@ Stream.Stream = Stream; try { const types = require('util').types; if (types && typeof types.isUint8Array === 'function') { - Stream._isUint8Array = types.isUint8Array; + // Stream._isUint8Array = types.isUint8Array; + Stream._isArrayBufferView = types.isArrayBufferView; } else { // This throws for Node < 4.2.0 because there's no util binding and // returns undefined for Node < 7.4.0. - Stream._isUint8Array = process.binding('util').isUint8Array; + // Stream._isUint8Array = process.binding('util').isUint8Array; + Stream._isArrayBufferView = process.binding('util').isArrayBufferView; } } catch (e) { } -if (!Stream._isUint8Array) { - Stream._isUint8Array = function _isUint8Array(obj) { - return Object.prototype.toString.call(obj) === '[object Uint8Array]'; +if (!Stream._isArrayBufferView) { + Stream._isArrayBufferView = function _isArrayBufferView(obj) { + return [ + '[object Int8Array]', + '[object Uint8Array]', + '[object Uint8ClampedArray]', + '[object Int16Array]', + '[object Uint16Array]', + '[object Int32Array]', + '[object Uint32Array]', + '[object Float32Array]', + '[object Float64Array]', + '[object DataView]' + ].includes(Object.prototype.toString.call(obj)); }; } const version = process.version.substr(1).split('.'); if (version[0] === 0 && version[1] < 12) { - Stream._uint8ArrayToBuffer = Buffer; + Stream._typedArrayToBuffer = Buffer; } else { try { const internalBuffer = require('internal/buffer'); - Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { + Stream._typedArrayToBuffer = function _typedArrayToBuffer(chunk) { return new internalBuffer.FastBuffer(chunk.buffer, chunk.byteOffset, chunk.byteLength); @@ -74,8 +87,8 @@ if (version[0] === 0 && version[1] < 12) { } catch (e) { } - if (!Stream._uint8ArrayToBuffer) { - Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) { + if (!Stream._typedArrayToBuffer) { + Stream._typedArrayToBuffer = function _typedArrayToBuffer(chunk) { return Buffer.prototype.slice.call(chunk); }; } From 8b89f563a2eff0f9b3e9a286d65a20cf1379a2ca Mon Sep 17 00:00:00 2001 From: Sarat Addepalli Date: Tue, 21 Aug 2018 13:44:43 +0530 Subject: [PATCH 2/9] stream: add tests and docs for typedarrays --- doc/api/stream.md | 36 ++++++-- test/parallel/test-stream-typedarrays.js | 112 +++++++++++++++++++++++ 2 files changed, 138 insertions(+), 10 deletions(-) create mode 100644 test/parallel/test-stream-typedarrays.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 840e85b71606c3..1eed460eada9d1 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -376,6 +376,10 @@ but instead implement [`writable._destroy()`][writable-_destroy]. -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value other than `null`. * `encoding` {string} The encoding, if `chunk` is a string @@ -486,6 +490,10 @@ the status of the `highWaterMark`. -* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams - not operating in object mode, `chunk` must be a string, `Buffer` or +* `chunk` {string|Buffer|TypedArray|DataView|any} Optional data to write. For + streams not operating in object mode, `chunk` must be a string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value other than `null`. * `encoding` {string} The encoding, if `chunk` is a string @@ -1129,13 +1137,17 @@ setTimeout(() => { -* `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the - read queue. For streams not operating in object mode, `chunk` must be a +* `chunk` {Buffer|TypedArray|DataView|string|any} Chunk of data to unshift onto + the read queue. For streams not operating in object mode, `chunk` must be a string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value other than `null`. @@ -1860,15 +1872,19 @@ It can be overridden by child classes but it **must not** be called directly. #### readable.push(chunk[, encoding]) -* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the - read queue. For streams not operating in object mode, `chunk` must be a - string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be - any JavaScript value. +* `chunk` {Buffer|TypedArray|DataView|string|null|any} Chunk of data to push + into the read queue. For streams not operating in object mode, `chunk` must + be a string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may + be any JavaScript value. * `encoding` {string} Encoding of string chunks. Must be a valid `Buffer` encoding, such as `'utf8'` or `'ascii'`. * Returns: {boolean} `true` if additional chunks of data may continued to be diff --git a/test/parallel/test-stream-typedarrays.js b/test/parallel/test-stream-typedarrays.js new file mode 100644 index 00000000000000..ca52fb6599ba46 --- /dev/null +++ b/test/parallel/test-stream-typedarrays.js @@ -0,0 +1,112 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { Readable, Writable } = require('stream'); + +const messageBuffer = Buffer.from('ABCDEFGH'); // Needs to be a multiple of 8 +const toBeWritten = [...common.getArrayBufferViews(messageBuffer)]; +const nthWrittenBuffer = (n) => Buffer.from( + toBeWritten[n].buffer, + toBeWritten[n].byteOffset, + toBeWritten[n].byteLength +); + +{ + // Simple Writable test. + + let n = 0; + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert.deepStrictEqual(chunk, nthWrittenBuffer(n++)); + cb(); + }, toBeWritten.length) + }); + + toBeWritten.forEach((msg) => writable.write(msg)); + writable.end(); +} + +{ + // Writable test, pass in TypedArray in object mode. + + let n = 0; + const writable = new Writable({ + objectMode: true, + write: common.mustCall((chunk, encoding, cb) => { + assert(!(chunk instanceof Buffer)); + assert(ArrayBuffer.isView(chunk)); + assert.strictEqual(chunk, toBeWritten[n]); + assert.deepStrictEqual(chunk, toBeWritten[n]); + assert.strictEqual(encoding, 'utf8'); + n++; + cb(); + }, toBeWritten.length) + }); + + toBeWritten.forEach((msg) => writable.write(msg)); + writable.end(); +} + +{ + // Writable test, multiple writes carried out via writev. + let callback; + + const writable = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert(chunk instanceof Buffer); + assert.strictEqual(encoding, 'buffer'); + assert.deepStrictEqual(chunk, nthWrittenBuffer(0)); + callback = cb; + }), + writev: common.mustCall((chunks, cb) => { + const expectedWritevLength = toBeWritten.length - 1; + assert.strictEqual(chunks.length, expectedWritevLength); + for (let n = 0; n < expectedWritevLength; n++) { + assert.deepStrictEqual(chunks[n].chunk, nthWrittenBuffer(n + 1)); + } + }) + }); + + toBeWritten.forEach((msg, index) => { + if (index !== toBeWritten.length - 1) { + writable.write(msg); + } else { + writable.end(msg); + } + }); + callback(); +} + +{ + // Simple Readable test. + const readable = new Readable({ + read() {} + }); + + toBeWritten.forEach((wbuf) => readable.push(wbuf)); + readable.unshift(toBeWritten[0]); + + const buf = readable.read(); + assert(buf instanceof Buffer); + const expectedWrittenBufferEntries = toBeWritten.map( + (wbuf, index) => nthWrittenBuffer(index) + ).reduce((acc, wbuf) => acc.concat(...wbuf), []); + assert.deepStrictEqual([...buf], expectedWrittenBufferEntries); +} + +{ + // Readable test, setEncoding. + const readable = new Readable({ + read() {} + }); + + readable.setEncoding('utf8'); + + toBeWritten.forEach((wbuf) => readable.push(wbuf)); + readable.unshift(toBeWritten[0]); + + const out = readable.read(); + assert.strictEqual(out, 'ABCDEFGH'.repeat(10)); +} From b630539b710378195906d6dfa3629475f4852950 Mon Sep 17 00:00:00 2001 From: Sarat Addepalli Date: Tue, 21 Aug 2018 13:54:31 +0530 Subject: [PATCH 3/9] doc: update pr-urls for stream.md --- doc/api/stream.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 1eed460eada9d1..86b9cf62ef3bd2 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -377,7 +377,7 @@ but instead implement [`writable._destroy()`][writable-_destroy]. added: v0.9.4 changes: - version: REPLACEME - pr-url: REPLACEME + pr-url: https://github.com/nodejs/node/pull/22427 description: The `chunk` argument can now be any `TypedArray` or a `DataView`. - version: v10.0.0 @@ -491,7 +491,7 @@ the status of the `highWaterMark`. added: v0.9.4 changes: - version: REPLACEME - pr-url: REPLACEME + pr-url: https://github.com/nodejs/node/pull/22427 description: The `chunk` argument can now be any `TypedArray` or a `DataView`. - version: v8.0.0 @@ -1138,7 +1138,7 @@ setTimeout(() => { added: v0.9.11 changes: - version: REPLACEME - pr-url: REPLACEME + pr-url: https://github.com/nodejs/node/pull/22427 description: The `chunk` argument can now be any `TypedArray` or a `DataView`. - version: v8.0.0 @@ -1873,7 +1873,7 @@ It can be overridden by child classes but it **must not** be called directly. -* `data` {string|Buffer|Uint8Array} +* `data` {string|Buffer|TypedArray|DataView} * `encoding` {string} Only used when data is `string`. **Default:** `'utf8'`. * Returns: {net.Socket} The socket itself. @@ -850,9 +855,14 @@ active socket in the event system. If the socket is already `unref`ed calling ### socket.write(data[, encoding][, callback]) -* `data` {string|Buffer|Uint8Array} +* `data` {string|Buffer|TypedArray|DataView} * `encoding` {string} Only used when data is `string`. **Default:** `utf8`. * `callback` {Function} * Returns: {boolean}