diff --git a/lib/net.js b/lib/net.js index 7e3f4abcd7d841..22c16f0572bb65 100644 --- a/lib/net.js +++ b/lib/net.js @@ -103,20 +103,18 @@ function getFlags(ipv6Only) { return ipv6Only === true ? TCPConstants.UV_TCP_IPV6ONLY : 0; } -function createHandle(fd, is_server, buf) { +function createHandle(fd, is_server) { validateInt32(fd, 'fd', 0); const type = TTYWrap.guessHandleType(fd); if (type === 'PIPE') { return new Pipe( - is_server ? PipeConstants.SERVER : PipeConstants.SOCKET, - buf + is_server ? PipeConstants.SERVER : PipeConstants.SOCKET ); } if (type === 'TCP') { return new TCP( - is_server ? TCPConstants.SERVER : TCPConstants.SOCKET, - buf + is_server ? TCPConstants.SERVER : TCPConstants.SOCKET ); } @@ -223,6 +221,10 @@ function initSocketHandle(self) { self._handle[owner_symbol] = self; self._handle.onread = onStreamRead; self[async_id_symbol] = getNewAsyncId(self._handle); + + if (self[kBuffer]) { + self._handle.useUserBuffer(self[kBuffer]); + } } } @@ -904,8 +906,8 @@ Socket.prototype.connect = function(...args) { if (!this._handle) { this._handle = pipe ? - new Pipe(PipeConstants.SOCKET, this[kBuffer]) : - new TCP(TCPConstants.SOCKET, this[kBuffer]); + new Pipe(PipeConstants.SOCKET) : + new TCP(TCPConstants.SOCKET); initSocketHandle(this); } diff --git a/src/connection_wrap.cc b/src/connection_wrap.cc index ef626a65d68ce2..db239f9becdb53 100644 --- a/src/connection_wrap.cc +++ b/src/connection_wrap.cc @@ -29,17 +29,6 @@ ConnectionWrap::ConnectionWrap(Environment* env, reinterpret_cast(&handle_), provider) {} -template -ConnectionWrap::ConnectionWrap(Environment* env, - Local object, - ProviderType provider, - uv_buf_t buf) - : LibuvStreamWrap(env, - object, - reinterpret_cast(&handle_), - provider, - buf) {} - template void ConnectionWrap::OnConnection(uv_stream_t* handle, @@ -127,23 +116,11 @@ template ConnectionWrap::ConnectionWrap( Local object, ProviderType provider); -template ConnectionWrap::ConnectionWrap( - Environment* env, - Local object, - ProviderType provider, - uv_buf_t buf); - template ConnectionWrap::ConnectionWrap( Environment* env, Local object, ProviderType provider); -template ConnectionWrap::ConnectionWrap( - Environment* env, - Local object, - ProviderType provider, - uv_buf_t buf); - template void ConnectionWrap::OnConnection( uv_stream_t* handle, int status); diff --git a/src/connection_wrap.h b/src/connection_wrap.h index 399b7b45721652..5b114088760dad 100644 --- a/src/connection_wrap.h +++ b/src/connection_wrap.h @@ -19,10 +19,6 @@ class ConnectionWrap : public LibuvStreamWrap { ConnectionWrap(Environment* env, v8::Local object, ProviderType provider); - ConnectionWrap(Environment* env, - v8::Local object, - ProviderType provider, - uv_buf_t buf); UVType handle_; }; diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 30c37aff1a79d5..6259cbdd1918dd 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -149,14 +149,7 @@ void PipeWrap::New(const FunctionCallbackInfo& args) { UNREACHABLE(); } - if (args.Length() > 1 && Buffer::HasInstance(args[1])) { - uv_buf_t buf; - buf.base = Buffer::Data(args[1]); - buf.len = Buffer::Length(args[1]); - new PipeWrap(env, args.This(), provider, ipc, buf); - } else { - new PipeWrap(env, args.This(), provider, ipc); - } + new PipeWrap(env, args.This(), provider, ipc); } @@ -170,17 +163,6 @@ PipeWrap::PipeWrap(Environment* env, // Suggestion: uv_pipe_init() returns void. } -PipeWrap::PipeWrap(Environment* env, - Local object, - ProviderType provider, - bool ipc, - uv_buf_t buf) - : ConnectionWrap(env, object, provider, buf) { - int r = uv_pipe_init(env->event_loop(), &handle_, ipc); - CHECK_EQ(r, 0); // How do we proxy this error up to javascript? - // Suggestion: uv_pipe_init() returns void. -} - void PipeWrap::Bind(const FunctionCallbackInfo& args) { PipeWrap* wrap; diff --git a/src/pipe_wrap.h b/src/pipe_wrap.h index 139dbdc3bac42c..b98d850439f0f4 100644 --- a/src/pipe_wrap.h +++ b/src/pipe_wrap.h @@ -55,11 +55,6 @@ class PipeWrap : public ConnectionWrap { v8::Local object, ProviderType provider, bool ipc); - PipeWrap(Environment* env, - v8::Local object, - ProviderType provider, - bool ipc, - uv_buf_t buf); static void New(const v8::FunctionCallbackInfo& args); static void Bind(const v8::FunctionCallbackInfo& args); diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 4abd28af9d17ad..af5027320d0502 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -150,14 +150,6 @@ inline void StreamResource::EmitWantsWrite(size_t suggested_size) { } inline StreamBase::StreamBase(Environment* env) : env_(env) { - buf_.base = nullptr; - buf_.len = 0; - PushStreamListener(&default_listener_); -} - -inline StreamBase::StreamBase(Environment* env, uv_buf_t buf) - : env_(env), - buf_(buf) { PushStreamListener(&default_listener_); } @@ -165,10 +157,6 @@ inline Environment* StreamBase::stream_env() const { return env_; } -inline uv_buf_t StreamBase::stream_buf() const { - return buf_; -} - inline int StreamBase::Shutdown(v8::Local req_wrap_obj) { Environment* env = stream_env(); @@ -338,6 +326,9 @@ void StreamBase::AddMethods(Environment* env, Local t) { env->SetProtoMethod(t, "readStart", JSMethod); env->SetProtoMethod(t, "readStop", JSMethod); env->SetProtoMethod(t, "shutdown", JSMethod); + env->SetProtoMethod(t, + "useUserBuffer", + JSMethod); env->SetProtoMethod(t, "writev", JSMethod); env->SetProtoMethod(t, "writeBuffer", diff --git a/src/stream_base.cc b/src/stream_base.cc index cbf362f6870e5f..d081ac1479be88 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -46,6 +46,13 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo& args) { return ReadStop(); } +int StreamBase::UseUserBuffer(const FunctionCallbackInfo& args) { + CHECK(Buffer::HasInstance(args[0])); + + uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0])); + PushStreamListener(new CustomBufferJSListener(buf)); + return 0; +} int StreamBase::Shutdown(const FunctionCallbackInfo& args) { CHECK(args[0]->IsObject()); @@ -295,7 +302,7 @@ void StreamBase::CallJSOnreadMethod(ssize_t nread, DCHECK_EQ(static_cast(nread), nread); DCHECK_LE(offset, INT32_MAX); - if (ab.IsEmpty() && buf_.base == nullptr) { + if (ab.IsEmpty()) { DCHECK_EQ(offset, 0); DCHECK_LE(nread, 0); } else { @@ -347,12 +354,7 @@ void StreamResource::ClearError() { uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) { - StreamBase* stream = static_cast(stream_); - const uv_buf_t stream_buf = stream->stream_buf(); - if (stream_buf.base != nullptr) - return stream_buf; - else - return uv_buf_init(Malloc(suggested_size), suggested_size); + return uv_buf_init(Malloc(suggested_size), suggested_size); } @@ -362,32 +364,44 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { Environment* env = stream->stream_env(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); - const uv_buf_t stream_buf = stream->stream_buf(); - Local obj; if (nread <= 0) { - if (stream_buf.base == nullptr) - free(buf.base); - if (nread == 0) - return; - } else if (stream_buf.base != nullptr) { - CHECK_LE(static_cast(nread), stream_buf.len); - } else { - CHECK_LE(static_cast(nread), buf.len); + free(buf.base); + if (nread < 0) + stream->CallJSOnreadMethod(nread, Local()); + return; + } - char* base = Realloc(buf.base, nread); + CHECK_LE(static_cast(nread), buf.len); + char* base = Realloc(buf.base, nread); - obj = ArrayBuffer::New( - env->isolate(), - base, - nread, - // Transfer ownership to V8. - v8::ArrayBufferCreationMode::kInternalized); - } + Local obj = ArrayBuffer::New( + env->isolate(), + base, + nread, + v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8. stream->CallJSOnreadMethod(nread, obj); } +uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) { + return buffer_; +} + + +void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + CHECK_NOT_NULL(stream_); + CHECK_EQ(buf.base, buffer_.base); + + StreamBase* stream = static_cast(stream_); + Environment* env = stream->stream_env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + stream->CallJSOnreadMethod(nread, Local()); +} + + void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamReq* req_wrap, int status) { StreamBase* stream = static_cast(stream_); diff --git a/src/stream_base.h b/src/stream_base.h index a96751381beae0..07c70cf51c89a3 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -185,6 +185,21 @@ class EmitToJSStreamListener : public ReportWritesToJSStreamListener { }; +// An alternative listener that uses a custom, user-provided buffer +// for reading data. +class CustomBufferJSListener : public ReportWritesToJSStreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamDestroy() override { delete this; } + + explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {} + + private: + uv_buf_t buffer_; +}; + + // A generic stream, comparable to JS land’s `Duplex` streams. // A stream is always controlled through one `StreamListener` instance. class StreamResource { @@ -271,7 +286,6 @@ class StreamBase : public StreamResource { // This is named `stream_env` to avoid name clashes, because a lot of // subclasses are also `BaseObject`s. Environment* stream_env() const; - uv_buf_t stream_buf() const; // Shut down the current stream. This request can use an existing // ShutdownWrap object (that was created in JS), or a new one will be created. @@ -302,7 +316,6 @@ class StreamBase : public StreamResource { protected: explicit StreamBase(Environment* env); - explicit StreamBase(Environment* env, uv_buf_t buf); // JS Methods int ReadStartJS(const v8::FunctionCallbackInfo& args); @@ -312,6 +325,7 @@ class StreamBase : public StreamResource { int WriteBuffer(const v8::FunctionCallbackInfo& args); template int WriteString(const v8::FunctionCallbackInfo& args); + int UseUserBuffer(const v8::FunctionCallbackInfo& args); template static void GetFD(const v8::FunctionCallbackInfo& args); @@ -341,7 +355,6 @@ class StreamBase : public StreamResource { private: Environment* env_; - uv_buf_t buf_; EmitToJSStreamListener default_listener_; void SetWriteResult(const StreamWriteResult& res); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index ca66815a300480..ae54b019fea034 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -106,19 +106,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, stream_(stream) { } -LibuvStreamWrap::LibuvStreamWrap(Environment* env, - Local object, - uv_stream_t* stream, - AsyncWrap::ProviderType provider, - uv_buf_t buf) - : HandleWrap(env, - object, - reinterpret_cast(stream), - provider), - StreamBase(env, buf), - stream_(stream) { -} - Local LibuvStreamWrap::GetConstructorTemplate( Environment* env) { diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 638072639d263f..19366ff4fba2c4 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -84,11 +84,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { v8::Local object, uv_stream_t* stream, AsyncWrap::ProviderType provider); - LibuvStreamWrap(Environment* env, - v8::Local object, - uv_stream_t* stream, - AsyncWrap::ProviderType provider, - uv_buf_t buf); AsyncWrap* GetAsyncWrap() override; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 8712ac2ef5ce17..dac621ec879e5c 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -158,14 +158,7 @@ void TCPWrap::New(const FunctionCallbackInfo& args) { UNREACHABLE(); } - if (args.Length() > 1 && Buffer::HasInstance(args[1])) { - uv_buf_t buf; - buf.base = Buffer::Data(args[1]); - buf.len = Buffer::Length(args[1]); - new TCPWrap(env, args.This(), provider, buf); - } else { - new TCPWrap(env, args.This(), provider); - } + new TCPWrap(env, args.This(), provider); } @@ -176,14 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local object, ProviderType provider) // Suggestion: uv_tcp_init() returns void. } -TCPWrap::TCPWrap(Environment* env, Local object, ProviderType provider, - uv_buf_t buf) - : ConnectionWrap(env, object, provider, buf) { - int r = uv_tcp_init(env->event_loop(), &handle_); - CHECK_EQ(r, 0); // How do we proxy this error up to javascript? - // Suggestion: uv_tcp_init() returns void. -} - void TCPWrap::SetNoDelay(const FunctionCallbackInfo& args) { TCPWrap* wrap; diff --git a/src/tcp_wrap.h b/src/tcp_wrap.h index 3fc626d49857a7..db269f65281639 100644 --- a/src/tcp_wrap.h +++ b/src/tcp_wrap.h @@ -67,8 +67,6 @@ class TCPWrap : public ConnectionWrap { TCPWrap(Environment* env, v8::Local object, ProviderType provider); - TCPWrap(Environment* env, v8::Local object, - ProviderType provider, uv_buf_t buf); static void New(const v8::FunctionCallbackInfo& args); static void SetNoDelay(const v8::FunctionCallbackInfo& args);