diff --git a/benchmark/net/tcp-raw-c2s.js b/benchmark/net/tcp-raw-c2s.js index 1f10ae7c839d87..116cf57a234393 100644 --- a/benchmark/net/tcp-raw-c2s.js +++ b/benchmark/net/tcp-raw-c2s.js @@ -46,15 +46,15 @@ function main({ dur, len, type }) { process.exit(0); }, dur * 1000); - clientHandle.onread = function(nread, buffer) { + clientHandle.onread = function(buffer) { // we're not expecting to ever get an EOF from the client. // just lots of data forever. - if (nread < 0) - fail(nread, 'read'); + if (!buffer) + fail('read'); // don't slice the buffer. the point of this is to isolate, not // simulate real traffic. - bytes += buffer.length; + bytes += buffer.byteLength; }; clientHandle.readStart(); diff --git a/benchmark/net/tcp-raw-pipe.js b/benchmark/net/tcp-raw-pipe.js index 16dc6955c46240..7144c237af21b4 100644 --- a/benchmark/net/tcp-raw-pipe.js +++ b/benchmark/net/tcp-raw-pipe.js @@ -43,15 +43,15 @@ function main({ dur, len, type }) { if (err) fail(err, 'connect'); - clientHandle.onread = function(nread, buffer) { + clientHandle.onread = function(buffer) { // we're not expecting to ever get an EOF from the client. // just lots of data forever. - if (nread < 0) - fail(nread, 'read'); + if (!buffer) + fail('read'); const writeReq = new WriteWrap(); writeReq.async = false; - err = clientHandle.writeBuffer(writeReq, buffer); + err = clientHandle.writeBuffer(writeReq, Buffer.from(buffer)); if (err) fail(err, 'write'); @@ -89,11 +89,11 @@ function main({ dur, len, type }) { if (err) fail(err, 'connect'); - clientHandle.onread = function(nread, buffer) { - if (nread < 0) - fail(nread, 'read'); + clientHandle.onread = function(buffer) { + if (!buffer) + fail('read'); - bytes += buffer.length; + bytes += buffer.byteLength; }; connectReq.oncomplete = function(err) { diff --git a/benchmark/net/tcp-raw-s2c.js b/benchmark/net/tcp-raw-s2c.js index 1700d23890a3b5..fbb7d2520cfe3b 100644 --- a/benchmark/net/tcp-raw-s2c.js +++ b/benchmark/net/tcp-raw-s2c.js @@ -109,15 +109,15 @@ function main({ dur, len, type }) { connectReq.oncomplete = function() { var bytes = 0; - clientHandle.onread = function(nread, buffer) { + clientHandle.onread = function(buffer) { // we're not expecting to ever get an EOF from the client. // just lots of data forever. - if (nread < 0) - fail(nread, 'read'); + if (!buffer) + fail('read'); // don't slice the buffer. the point of this is to isolate, not // simulate real traffic. - bytes += buffer.length; + bytes += buffer.byteLength; }; clientHandle.readStart(); diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index e6cdde56c1f7bc..74d69de0dcdeee 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -22,7 +22,12 @@ const util = require('util'); const assert = require('assert'); const { Process } = internalBinding('process_wrap'); -const { WriteWrap } = internalBinding('stream_wrap'); +const { + WriteWrap, + kReadBytesOrError, + kArrayBufferOffset, + streamBaseState +} = internalBinding('stream_wrap'); const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); const { TCP } = internalBinding('tcp_wrap'); const { TTY } = internalBinding('tty_wrap'); @@ -486,11 +491,13 @@ function setupChannel(target, channel) { var pendingHandle = null; channel.buffering = false; channel.pendingHandle = null; - channel.onread = function(nread, pool) { + channel.onread = function(arrayBuffer) { const recvHandle = channel.pendingHandle; channel.pendingHandle = null; - // TODO(bnoordhuis) Check that nread > 0. - if (pool) { + if (arrayBuffer) { + const nread = streamBaseState[kReadBytesOrError]; + const offset = streamBaseState[kArrayBufferOffset]; + const pool = new Uint8Array(arrayBuffer, offset, nread); if (recvHandle) pendingHandle = recvHandle; diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 4e9ab05af512b5..ded26644c590d5 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -120,7 +120,11 @@ const { isArrayBufferView } = require('internal/util/types'); const { FileHandle } = process.binding('fs'); const binding = internalBinding('http2'); -const { ShutdownWrap } = internalBinding('stream_wrap'); +const { + ShutdownWrap, + kReadBytesOrError, + streamBaseState +} = internalBinding('stream_wrap'); const { UV_EOF } = internalBinding('uv'); const { StreamPipe } = internalBinding('stream_pipe'); @@ -2043,7 +2047,8 @@ function onFileUnpipe() { // This is only called once the pipe has returned back control, so // it only has to handle errors and End-of-File. -function onPipedFileHandleRead(err) { +function onPipedFileHandleRead() { + const err = streamBaseState[kReadBytesOrError]; if (err < 0 && err !== UV_EOF) { this.stream.close(NGHTTP2_INTERNAL_ERROR); } diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 8da15983f18997..870b5b3e3b01a2 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -1,7 +1,13 @@ 'use strict'; const { Buffer } = require('buffer'); -const { WriteWrap } = internalBinding('stream_wrap'); +const { FastBuffer } = require('internal/buffer'); +const { + WriteWrap, + kReadBytesOrError, + kArrayBufferOffset, + streamBaseState +} = internalBinding('stream_wrap'); const { UV_EOF } = internalBinding('uv'); const { errnoException } = require('internal/errors'); const { owner_symbol } = require('internal/async_hooks').symbols; @@ -84,13 +90,17 @@ function afterWriteDispatched(self, req, err, cb) { } } -function onStreamRead(nread, buf) { +function onStreamRead(arrayBuffer) { + const nread = streamBaseState[kReadBytesOrError]; + const handle = this; const stream = this[owner_symbol]; stream[kUpdateTimer](); if (nread > 0 && !stream.destroyed) { + const offset = streamBaseState[kArrayBufferOffset]; + const buf = new FastBuffer(arrayBuffer, offset, nread); if (!stream.push(buf)) { handle.reading = false; if (!stream.destroyed) { diff --git a/src/env-inl.h b/src/env-inl.h index 6ace0bf82533d5..9d369d492c1cd1 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -446,6 +446,11 @@ Environment::trace_category_state() { return trace_category_state_; } +inline AliasedBuffer& +Environment::stream_base_state() { + return stream_base_state_; +} + inline uint32_t Environment::get_next_module_id() { return module_id_counter_++; } diff --git a/src/env.cc b/src/env.cc index f77bcbf1699912..9d8fd967a406a6 100644 --- a/src/env.cc +++ b/src/env.cc @@ -158,6 +158,7 @@ Environment::Environment(IsolateData* isolate_data, makecallback_cntr_(0), should_abort_on_uncaught_toggle_(isolate_, 1), trace_category_state_(isolate_, kTraceCategoryCount), + stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields), http_parser_buffer_(nullptr), fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2), fs_stats_field_bigint_array_(isolate_, kFsStatsFieldsLength * 2), diff --git a/src/env.h b/src/env.h index 3daa48f9cbb70e..a85058f895a4d8 100644 --- a/src/env.h +++ b/src/env.h @@ -668,6 +668,7 @@ class Environment { should_abort_on_uncaught_toggle(); inline AliasedBuffer& trace_category_state(); + inline AliasedBuffer& stream_base_state(); // The necessary API for async_hooks. inline double new_async_id(); @@ -951,6 +952,8 @@ class Environment { AliasedBuffer trace_category_state_; std::unique_ptr trace_state_observer_; + AliasedBuffer stream_base_state_; + std::unique_ptr performance_state_; std::unordered_map performance_marks_; diff --git a/src/node_http2.cc b/src/node_http2.cc index 633d2389c7cf35..ce5523a9d22aa8 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1256,10 +1256,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_LE(offset, session->stream_buf_.len); CHECK_LE(offset + buf.len, session->stream_buf_.len); - Local buffer = - Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked(); - - stream->CallJSOnreadMethod(nread, buffer); + stream->CallJSOnreadMethod(nread, session->stream_buf_ab_, offset); } diff --git a/src/stream_base.cc b/src/stream_base.cc index c6cce9c2d09ba9..57713d5eaf30d3 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -17,6 +17,7 @@ namespace node { using v8::Array; +using v8::ArrayBuffer; using v8::Boolean; using v8::Context; using v8::FunctionCallbackInfo; @@ -303,17 +304,29 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { } -void StreamBase::CallJSOnreadMethod(ssize_t nread, Local buf) { +void StreamBase::CallJSOnreadMethod(ssize_t nread, + Local ab, + size_t offset) { Environment* env = env_; +#ifdef DEBUG + CHECK_EQ(static_cast(nread), nread); + CHECK_EQ(static_cast(offset), offset); + + if (ab.IsEmpty()) { + CHECK_EQ(offset, 0); + CHECK_LE(nread, 0); + } else { + CHECK_GE(nread, 0); + } +#endif + env->stream_base_state()[kReadBytesOrError] = nread; + env->stream_base_state()[kArrayBufferOffset] = offset; + Local argv[] = { - Integer::New(env->isolate(), nread), - buf + ab.IsEmpty() ? Undefined(env->isolate()).As() : ab.As() }; - if (argv[1].IsEmpty()) - argv[1] = Undefined(env->isolate()); - AsyncWrap* wrap = GetAsyncWrap(); CHECK_NOT_NULL(wrap); wrap->MakeCallback(env->onread_string(), arraysize(argv), argv); @@ -366,14 +379,18 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { if (nread <= 0) { free(buf.base); if (nread < 0) - stream->CallJSOnreadMethod(nread, Local()); + stream->CallJSOnreadMethod(nread, Local()); return; } CHECK_LE(static_cast(nread), buf.len); char* base = Realloc(buf.base, nread); - Local obj = Buffer::New(env, base, nread).ToLocalChecked(); + Local obj = ArrayBuffer::New( + env->isolate(), + base, + nread, + v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8. stream->CallJSOnreadMethod(nread, obj); } diff --git a/src/stream_base.h b/src/stream_base.h index d8e6df960f4f54..039009e07257b6 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -264,7 +264,9 @@ class StreamBase : public StreamResource { virtual bool IsIPCPipe(); virtual int GetFD(); - void CallJSOnreadMethod(ssize_t nread, v8::Local buf); + void CallJSOnreadMethod(ssize_t nread, + v8::Local ab, + size_t offset = 0); // This is named `stream_env` to avoid name clashes, because a lot of // subclasses are also `BaseObject`s. @@ -326,12 +328,20 @@ class StreamBase : public StreamResource { const v8::FunctionCallbackInfo& args)> static void JSMethod(const v8::FunctionCallbackInfo& args); + // Internal, used only in StreamBase methods + env.cc. + enum StreamBaseStateFields { + kReadBytesOrError, + kArrayBufferOffset, + kNumStreamBaseStateFields + }; + private: Environment* env_; EmitToJSStreamListener default_listener_; friend class WriteWrap; friend class ShutdownWrap; + friend class Environment; // For kNumStreamBaseStateFields. }; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 9ccace435c6796..a3c45b940a53eb 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -80,6 +80,11 @@ void LibuvStreamWrap::Initialize(Local target, target->Set(writeWrapString, ww->GetFunction(env->context()).ToLocalChecked()); env->set_write_wrap_template(ww->InstanceTemplate()); + + NODE_DEFINE_CONSTANT(target, kReadBytesOrError); + NODE_DEFINE_CONSTANT(target, kArrayBufferOffset); + target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"), + env->stream_base_state().GetJSArray()).FromJust(); } diff --git a/test/parallel/test-net-end-close.js b/test/parallel/test-net-end-close.js index c0705da9d089e4..b488f16510f7cf 100644 --- a/test/parallel/test-net-end-close.js +++ b/test/parallel/test-net-end-close.js @@ -6,11 +6,15 @@ const net = require('net'); const { internalBinding } = require('internal/test/binding'); const { UV_EOF } = internalBinding('uv'); +const { streamBaseState, kReadBytesOrError } = internalBinding('stream_wrap'); const s = new net.Socket({ handle: { readStart: function() { - setImmediate(() => this.onread(UV_EOF, null)); + setImmediate(() => { + streamBaseState[kReadBytesOrError] = UV_EOF; + this.onread(); + }); }, close: (cb) => setImmediate(cb) }, diff --git a/test/parallel/test-process-wrap.js b/test/parallel/test-process-wrap.js index eccdeb5d075575..ef9075e9158229 100644 --- a/test/parallel/test-process-wrap.js +++ b/test/parallel/test-process-wrap.js @@ -44,11 +44,10 @@ p.onexit = function(exitCode, signal) { processExited = true; }; -pipe.onread = function(err, b, off, len) { +pipe.onread = function(arrayBuffer) { assert.ok(processExited); - if (b) { + if (arrayBuffer) { gotPipeData = true; - console.log('read %d', len); } else { gotPipeEOF = true; pipe.close(); diff --git a/test/parallel/test-tcp-wrap-listen.js b/test/parallel/test-tcp-wrap-listen.js index 9ecdf60f8c554c..72981b683ccea3 100644 --- a/test/parallel/test-tcp-wrap-listen.js +++ b/test/parallel/test-tcp-wrap-listen.js @@ -5,7 +5,12 @@ const assert = require('assert'); const { internalBinding } = require('internal/test/binding'); const { TCP, constants: TCPConstants } = internalBinding('tcp_wrap'); -const { WriteWrap } = internalBinding('stream_wrap'); +const { + WriteWrap, + kReadBytesOrError, + kArrayBufferOffset, + streamBaseState +} = internalBinding('stream_wrap'); const server = new TCP(TCPConstants.SOCKET); @@ -30,8 +35,11 @@ server.onconnection = (err, client) => { client.readStart(); client.pendingWrites = []; - client.onread = common.mustCall((err, buffer) => { - if (buffer) { + client.onread = common.mustCall((arrayBuffer) => { + if (arrayBuffer) { + const offset = streamBaseState[kArrayBufferOffset]; + const nread = streamBaseState[kReadBytesOrError]; + const buffer = Buffer.from(arrayBuffer, offset, nread); assert.ok(buffer.length > 0); assert.strictEqual(client.writeQueueSize, 0);