From 465e55023ade107aa48c507562d6b06dd8b51cff Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 8 Jan 2018 01:14:06 +0100 Subject: [PATCH 1/2] src: refactor stream callbacks and ownership Instead of setting individual callbacks on streams and tracking stream ownership through a boolean `consume_` flag, always have one specific listener object in charge of a stream, and call methods on that object rather than generic C-style callbacks. Benchmark results show no significant changes: $ ./node benchmark/compare.js --runs 5 --new ./node --old ./node-master net | Rscript benchmark/compare.R [00:43:05|% 100| 8/8 files | 10/10 runs | 6/6 configs]: Done improvement confidence p.value net/net-c2s-cork.js dur=5 type="buf" len=1024 -0.80 % 0.720985414 net/net-c2s-cork.js dur=5 type="buf" len=128 -3.50 % 0.278786279 net/net-c2s-cork.js dur=5 type="buf" len=16 -4.44 % * 0.010458284 net/net-c2s-cork.js dur=5 type="buf" len=32 -0.51 % 0.445313528 net/net-c2s-cork.js dur=5 type="buf" len=4 -1.57 % 0.074816557 net/net-c2s-cork.js dur=5 type="buf" len=512 -0.25 % 0.926451422 net/net-c2s-cork.js dur=5 type="buf" len=64 1.66 % * 0.020469582 net/net-c2s-cork.js dur=5 type="buf" len=8 -0.18 % 0.739524856 net/net-c2s.js dur=5 type="asc" len=102400 -0.22 % 0.904819514 net/net-c2s.js dur=5 type="asc" len=16777216 0.34 % 0.862222556 net/net-c2s.js dur=5 type="buf" len=102400 -0.45 % 0.755593966 net/net-c2s.js dur=5 type="buf" len=16777216 1.87 % 0.477896886 net/net-c2s.js dur=5 type="utf" len=102400 -0.30 % 0.572739665 net/net-c2s.js dur=5 type="utf" len=16777216 1.18 % 0.369268245 net/net-pipe.js dur=5 type="asc" len=102400 1.18 % 0.368102481 net/net-pipe.js dur=5 type="asc" len=16777216 0.41 % 0.659646192 net/net-pipe.js dur=5 type="buf" len=102400 1.65 % 0.148484290 net/net-pipe.js dur=5 type="buf" len=16777216 0.05 % 0.949649889 net/net-pipe.js dur=5 type="utf" len=102400 0.65 % 0.463140117 net/net-pipe.js dur=5 type="utf" len=16777216 0.57 % 0.531757174 net/net-s2c.js dur=5 type="asc" len=102400 0.01 % 0.994663657 net/net-s2c.js dur=5 type="asc" len=16777216 0.55 % 0.690648594 net/net-s2c.js dur=5 type="buf" len=102400 1.06 % 0.162661878 net/net-s2c.js dur=5 type="buf" len=16777216 2.21 % 0.458328732 net/net-s2c.js dur=5 type="utf" len=102400 0.47 % 0.346382821 net/net-s2c.js dur=5 type="utf" len=16777216 -1.19 % 0.075676276 net/net-wrap-js-stream-passthrough.js dur=5 type="asc" len=102400 -5.01 % 0.566507367 net/net-wrap-js-stream-passthrough.js dur=5 type="asc" len=16777216 1.81 % 0.382296906 net/net-wrap-js-stream-passthrough.js dur=5 type="buf" len=102400 -4.32 % 0.543143575 net/net-wrap-js-stream-passthrough.js dur=5 type="buf" len=16777216 0.12 % 0.774690856 net/net-wrap-js-stream-passthrough.js dur=5 type="utf" len=102400 2.33 % 0.152586683 net/net-wrap-js-stream-passthrough.js dur=5 type="utf" len=16777216 0.50 % 0.687525683 net/tcp-raw-c2s.js dur=5 type="asc" len=102400 0.05 % 0.917082371 net/tcp-raw-c2s.js dur=5 type="asc" len=16777216 4.17 % ** 0.005564067 net/tcp-raw-c2s.js dur=5 type="buf" len=102400 0.56 % * 0.037673166 net/tcp-raw-c2s.js dur=5 type="buf" len=16777216 0.77 % ** 0.006890503 net/tcp-raw-c2s.js dur=5 type="utf" len=102400 -0.50 % 0.397862701 net/tcp-raw-c2s.js dur=5 type="utf" len=16777216 1.00 % 0.300638263 net/tcp-raw-pipe.js dur=5 type="asc" len=102400 0.82 % 0.722353484 net/tcp-raw-pipe.js dur=5 type="asc" len=16777216 15.00 % 0.070918075 net/tcp-raw-pipe.js dur=5 type="buf" len=102400 -1.03 % 0.819639125 net/tcp-raw-pipe.js dur=5 type="buf" len=16777216 18.35 % 0.329069149 net/tcp-raw-pipe.js dur=5 type="utf" len=102400 -0.27 % 0.984576346 net/tcp-raw-pipe.js dur=5 type="utf" len=16777216 2.78 % 0.362840470 net/tcp-raw-s2c.js dur=5 type="asc" len=102400 -0.15 % 0.820491736 net/tcp-raw-s2c.js dur=5 type="asc" len=16777216 -0.42 % 0.813160796 net/tcp-raw-s2c.js dur=5 type="buf" len=102400 0.26 % 0.615102013 net/tcp-raw-s2c.js dur=5 type="buf" len=16777216 -2.16 % 0.464289164 net/tcp-raw-s2c.js dur=5 type="utf" len=102400 -0.33 % 0.383964275 net/tcp-raw-s2c.js dur=5 type="utf" len=16777216 1.08 % 0.224603980 PR-URL: https://github.com/nodejs/node/pull/18334 Reviewed-By: James M Snell Reviewed-By: Anatoli Papirovski Reviewed-By: Matteo Collina --- lib/_http_server.js | 2 +- src/connection_wrap.cc | 1 + src/js_stream.cc | 55 +----- src/node_http2.cc | 208 ++++++++++++----------- src/node_http2.h | 33 ++-- src/node_http_parser.cc | 75 +++----- src/pipe_wrap.cc | 1 + src/process_wrap.cc | 1 + src/stream_base-inl.h | 85 ++++++++- src/stream_base.cc | 49 +++++- src/stream_base.h | 201 ++++++++++++---------- src/stream_wrap.cc | 62 +++---- src/stream_wrap.h | 22 ++- src/tcp_wrap.cc | 1 + src/tls_wrap.cc | 115 +++---------- src/tls_wrap.h | 24 +-- src/tty_wrap.cc | 1 + test/parallel/test-tls-socket-destroy.js | 1 + 18 files changed, 463 insertions(+), 474 deletions(-) diff --git a/lib/_http_server.js b/lib/_http_server.js index c60119822a98d5..496ebf285c814e 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -666,7 +666,7 @@ function onSocketPause() { function unconsume(parser, socket) { if (socket._handle) { if (parser._consumed) - parser.unconsume(socket._handle._externalStream); + parser.unconsume(); parser._consumed = false; socket.removeListener('pause', onSocketPause); socket.removeListener('resume', onSocketResume); diff --git a/src/connection_wrap.cc b/src/connection_wrap.cc index 8de77f361dcde4..a6cf67ceee2477 100644 --- a/src/connection_wrap.cc +++ b/src/connection_wrap.cc @@ -3,6 +3,7 @@ #include "connect_wrap.h" #include "env-inl.h" #include "pipe_wrap.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "tcp_wrap.h" #include "util-inl.h" diff --git a/src/js_stream.cc b/src/js_stream.cc index 7d1115f12ac3e2..9e67a2094ded89 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -25,9 +25,6 @@ JSStream::JSStream(Environment* env, Local obj) StreamBase(env) { node::Wrap(obj, this); MakeWeak(this); - - set_alloc_cb({ OnAllocImpl, this }); - set_read_cb({ OnReadImpl, this }); } @@ -35,45 +32,6 @@ JSStream::~JSStream() { } -void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { - buf->base = Malloc(size); - buf->len = size; -} - - -void JSStream::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - JSStream* wrap = static_cast(ctx); - CHECK_NE(wrap, nullptr); - Environment* env = wrap->env(); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - if (nread < 0) { - if (buf != nullptr && buf->base != nullptr) - free(buf->base); - wrap->EmitData(nread, Local(), Local()); - return; - } - - if (nread == 0) { - if (buf->base != nullptr) - free(buf->base); - return; - } - - CHECK_LE(static_cast(nread), buf->len); - char* base = node::Realloc(buf->base, nread); - - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - - Local obj = Buffer::New(env, base, nread).ToLocalChecked(); - wrap->EmitData(nread, obj, Local()); -} - - AsyncWrap* JSStream::GetAsyncWrap() { return static_cast(this); } @@ -212,18 +170,19 @@ void JSStream::ReadBuffer(const FunctionCallbackInfo& args) { char* data = Buffer::Data(args[0]); int len = Buffer::Length(args[0]); - do { - uv_buf_t buf; + // Repeatedly ask the stream's owner for memory, copy the data that we + // just read from JS into those buffers and emit them as reads. + while (len != 0) { + uv_buf_t buf = wrap->EmitAlloc(len); ssize_t avail = len; - wrap->EmitAlloc(len, &buf); if (static_cast(buf.len) < avail) avail = buf.len; memcpy(buf.base, data, avail); data += avail; len -= avail; - wrap->EmitRead(avail, &buf); - } while (len != 0); + wrap->EmitRead(avail, buf); + } } @@ -231,7 +190,7 @@ void JSStream::EmitEOF(const FunctionCallbackInfo& args) { JSStream* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); - wrap->EmitRead(UV_EOF, nullptr); + wrap->EmitRead(UV_EOF); } diff --git a/src/node_http2.cc b/src/node_http2.cc index bd7eeee8655e52..bd2e93a13c208b 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -531,24 +531,12 @@ Http2Session::Http2Session(Environment* env, outgoing_buffers_.reserve(32); } -void Http2Session::Unconsume() { - if (stream_ != nullptr) { - DEBUG_HTTP2SESSION(this, "unconsuming the i/o stream"); - stream_->set_destruct_cb({ nullptr, nullptr }); - stream_->set_alloc_cb({ nullptr, nullptr }); - stream_->set_read_cb({ nullptr, nullptr }); - stream_->Unconsume(); - stream_ = nullptr; - } -} - Http2Session::~Http2Session() { CHECK_EQ(flags_ & SESSION_STATE_HAS_SCOPE, 0); if (!object().IsEmpty()) ClearWrap(object()); persistent().Reset(); CHECK(persistent().IsEmpty()); - Unconsume(); DEBUG_HTTP2SESSION(this, "freeing nghttp2 session"); nghttp2_session_del(session_); } @@ -646,7 +634,8 @@ void Http2Session::Close(uint32_t code, bool socket_closed) { DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code); CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0); } else { - Unconsume(); + if (stream_ != nullptr) + stream_->RemoveStreamListener(this); } // If there are outstanding pings, those will need to be canceled, do @@ -1044,22 +1033,38 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle, stream->statistics_.received_bytes += len; - // There is a single large array buffer for the entire data read from the - // network; create a slice of that array buffer and emit it as the - // received data buffer. - CHECK(!session->stream_buf_ab_.IsEmpty()); - size_t offset = reinterpret_cast(data) - session->stream_buf_; - // Verify that the data offset is inside the current read buffer. - CHECK_LE(offset, session->stream_buf_size_); - - Local buf = - Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked(); - - stream->EmitData(len, buf, Local()); - if (!stream->IsReading()) - stream->inbound_consumed_data_while_paused_ += len; - else - nghttp2_session_consume_stream(handle, id, len); + // Repeatedly ask the stream's owner for memory, and copy the read data + // into those buffers. + // The typical case is actually the exception here; Http2StreamListeners + // know about the HTTP2 session associated with this stream, so they know + // about the larger from-socket read buffer, so they do not require copying. + do { + uv_buf_t buf = stream->EmitAlloc(len); + ssize_t avail = len; + if (static_cast(buf.len) < avail) + avail = buf.len; + + // `buf.base == nullptr` is the default Http2StreamListener's way + // of saying that it wants a pointer to the raw original. + // Since it has access to the original socket buffer from which the data + // was read in the first place, it can use that to minizime ArrayBuffer + // allocations. + if (LIKELY(buf.base == nullptr)) + buf.base = reinterpret_cast(const_cast(data)); + else + memcpy(buf.base, data, avail); + data += avail; + len -= avail; + stream->EmitRead(avail, buf); + + // If the stream owner (e.g. the JS Http2Stream) wants more data, just + // tell nghttp2 that all data has been consumed. Otherwise, defer until + // more data is being requested. + if (stream->IsReading()) + nghttp2_session_consume_stream(handle, id, avail); + else + stream->inbound_consumed_data_while_paused_ += avail; + } while (len != 0); } return 0; } @@ -1129,6 +1134,38 @@ inline void Http2Session::GetTrailers(Http2Stream* stream, uint32_t* flags) { } } +uv_buf_t Http2StreamListener::OnStreamAlloc(size_t size) { + // See the comments in Http2Session::OnDataChunkReceived + // (which is the only possible call site for this method). + return uv_buf_init(nullptr, size); +} + +void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + Http2Stream* stream = static_cast(stream_); + Http2Session* session = stream->session(); + Environment* env = stream->env(); + + if (nread < 0) { + PassReadErrorToPreviousListener(nread); + return; + } + + CHECK(!session->stream_buf_ab_.IsEmpty()); + + // There is a single large array buffer for the entire data read from the + // network; create a slice of that array buffer and emit it as the + // received data buffer. + size_t offset = buf.base - session->stream_buf_.base; + + // Verify that the data offset is inside the current read buffer. + CHECK_LE(offset, session->stream_buf_.len); + CHECK_LE(offset + buf.len, session->stream_buf_.len); + + Local buffer = + Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked(); + + stream->CallJSOnreadMethod(nread, buffer); +} Http2Stream::SubmitTrailers::SubmitTrailers( Http2Session* session, @@ -1257,7 +1294,7 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) { return; if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - stream->EmitData(UV_EOF, Local(), Local()); + stream->EmitRead(UV_EOF); } } @@ -1378,16 +1415,15 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { } // Callback used when data has been written to the stream. -void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) { - Http2Session* session = static_cast(ctx); - DEBUG_HTTP2SESSION2(session, "write finished with status %d", status); +void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { + DEBUG_HTTP2SESSION2(this, "write finished with status %d", status); // Inform all pending writes about their completion. - session->ClearOutgoing(status); + ClearOutgoing(status); - if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) { + if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) { // Schedule a new write if nghttp2 wants to send data. - session->MaybeScheduleWrite(); + MaybeScheduleWrite(); } } @@ -1625,97 +1661,76 @@ WriteWrap* Http2Session::AllocateSend() { Local obj = env()->write_wrap_constructor_function() ->NewInstance(env()->context()).ToLocalChecked(); - return WriteWrap::New(env(), obj, stream_); -} - -// Allocates the data buffer used to receive inbound data from the i/o stream -void Http2Session::OnStreamAllocImpl(size_t suggested_size, - uv_buf_t* buf, - void* ctx) { - Http2Session* session = static_cast(ctx); - CHECK_EQ(session->stream_buf_, nullptr); - CHECK_EQ(session->stream_buf_size_, 0); - buf->base = session->stream_buf_ = Malloc(suggested_size); - buf->len = session->stream_buf_size_ = suggested_size; - session->IncrementCurrentSessionMemory(suggested_size); + return WriteWrap::New(env(), obj, static_cast(stream_)); } // Callback used to receive inbound data from the i/o stream -void Http2Session::OnStreamReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - Http2Session* session = static_cast(ctx); - Http2Scope h2scope(session); - CHECK_NE(session->stream_, nullptr); - DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread); +void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + Http2Scope h2scope(this); + CHECK_NE(stream_, nullptr); + DEBUG_HTTP2SESSION2(this, "receiving %d bytes", nread); + IncrementCurrentSessionMemory(buf.len); + CHECK(stream_buf_ab_.IsEmpty()); + if (nread <= 0) { - free(session->stream_buf_); + free(buf.base); if (nread < 0) { - uv_buf_t tmp_buf = uv_buf_init(nullptr, 0); - session->prev_read_cb_.fn(nread, - &tmp_buf, - pending, - session->prev_read_cb_.ctx); + PassReadErrorToPreviousListener(nread); } } else { // Only pass data on if nread > 0 + // Makre sure that there was no read previously active. + CHECK_EQ(stream_buf_.base, nullptr); + CHECK_EQ(stream_buf_.len, 0); + + // Remember the current buffer, so that OnDataChunkReceived knows the + // offset of a DATA frame's data into the socket read buffer. + stream_buf_ = uv_buf_init(buf.base, nread); + // Verify that currently: There is memory allocated into which // the data has been read, and that memory buffer is at least as large // as the amount of data we have read, but we have not yet made an // ArrayBuffer out of it. - CHECK_NE(session->stream_buf_, nullptr); - CHECK_EQ(session->stream_buf_, buf->base); - CHECK_EQ(session->stream_buf_size_, buf->len); - CHECK_GE(session->stream_buf_size_, static_cast(nread)); - CHECK(session->stream_buf_ab_.IsEmpty()); + CHECK_LE(static_cast(nread), stream_buf_.len); - Environment* env = session->env(); - Isolate* isolate = env->isolate(); + Isolate* isolate = env()->isolate(); HandleScope scope(isolate); - Local context = env->context(); - Context::Scope context_scope(context); + Context::Scope context_scope(env()->context()); // Create an array buffer for the read data. DATA frames will be emitted // as slices of this array buffer to avoid having to copy memory. - session->stream_buf_ab_ = + stream_buf_ab_ = ArrayBuffer::New(isolate, - session->stream_buf_, - session->stream_buf_size_, + buf.base, + nread, v8::ArrayBufferCreationMode::kInternalized); - uv_buf_t buf_ = uv_buf_init(buf->base, nread); - session->statistics_.data_received += nread; - ssize_t ret = session->Write(&buf_, 1); + statistics_.data_received += nread; + ssize_t ret = Write(&stream_buf_, 1); // Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef // ssize_t to int. Cast here so that the < 0 check actually works on // Windows. if (static_cast(ret) < 0) { - DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret); + DEBUG_HTTP2SESSION2(this, "fatal error receiving data: %d", ret); - Local argv[1] = { + Local argv[] = { Integer::New(isolate, ret), }; - session->MakeCallback(env->error_string(), arraysize(argv), argv); + MakeCallback(env()->error_string(), arraysize(argv), argv); } else { - DEBUG_HTTP2SESSION2(session, "processed %d bytes. wants more? %d", ret, - nghttp2_session_want_read(**session)); + DEBUG_HTTP2SESSION2(this, "processed %d bytes. wants more? %d", ret, + nghttp2_session_want_read(session_)); } } // Since we are finished handling this write, reset the stream buffer. // The memory has either been free()d or was handed over to V8. - session->DecrementCurrentSessionMemory(session->stream_buf_size_); - session->stream_buf_ = nullptr; - session->stream_buf_size_ = 0; - session->stream_buf_ab_ = Local(); -} + DecrementCurrentSessionMemory(buf.len); -void Http2Session::OnStreamDestructImpl(void* ctx) { - Http2Session* session = static_cast(ctx); - session->stream_ = nullptr; + stream_buf_ab_ = Local(); + stream_buf_ = uv_buf_init(nullptr, 0); } // Every Http2Session session is tightly bound to a single i/o StreamBase @@ -1724,14 +1739,7 @@ void Http2Session::OnStreamDestructImpl(void* ctx) { // C++ layer via the StreamBase API. void Http2Session::Consume(Local external) { StreamBase* stream = static_cast(external->Value()); - stream->Consume(); - stream_ = stream; - prev_alloc_cb_ = stream->alloc_cb(); - prev_read_cb_ = stream->read_cb(); - stream->set_alloc_cb({ Http2Session::OnStreamAllocImpl, this }); - stream->set_read_cb({ Http2Session::OnStreamReadImpl, this }); - stream->set_after_write_cb({ Http2Session::OnStreamAfterWriteImpl, this }); - stream->set_destruct_cb({ Http2Session::OnStreamDestructImpl, this }); + stream->PushStreamListener(this); DEBUG_HTTP2SESSION(this, "i/o stream consumed"); } @@ -1769,6 +1777,8 @@ Http2Stream::Http2Stream( if (options & STREAM_OPTION_GET_TRAILERS) flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS; + PushStreamListener(&stream_listener_); + if (options & STREAM_OPTION_EMPTY_PAYLOAD) Shutdown(); session->AddStream(this); diff --git a/src/node_http2.h b/src/node_http2.h index 9027ed7feb7dad..bf41d74ed49ad9 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -535,6 +535,12 @@ class Http2Priority { nghttp2_priority_spec spec; }; +class Http2StreamListener : public StreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; +}; + class Http2Stream : public AsyncWrap, public StreamBase { public: @@ -747,6 +753,8 @@ class Http2Stream : public AsyncWrap, int64_t fd_offset_ = 0; int64_t fd_length_ = -1; + Http2StreamListener stream_listener_; + friend class Http2Session; }; @@ -798,7 +806,7 @@ class Http2Stream::Provider::Stream : public Http2Stream::Provider { }; -class Http2Session : public AsyncWrap { +class Http2Session : public AsyncWrap, public StreamListener { public: Http2Session(Environment* env, Local wrap, @@ -872,21 +880,11 @@ class Http2Session : public AsyncWrap { size_t self_size() const override { return sizeof(*this); } - char* stream_alloc() { - return stream_buf_; - } - inline void GetTrailers(Http2Stream* stream, uint32_t* flags); - static void OnStreamAllocImpl(size_t suggested_size, - uv_buf_t* buf, - void* ctx); - static void OnStreamReadImpl(ssize_t nread, - const uv_buf_t* bufs, - uv_handle_type pending, - void* ctx); - static void OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx); - static void OnStreamDestructImpl(void* ctx); + // Handle reads/writes from the underlying network transport. + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamAfterWrite(WriteWrap* w, int status) override; // The JavaScript API static void New(const FunctionCallbackInfo& args); @@ -1074,16 +1072,12 @@ class Http2Session : public AsyncWrap { int flags_ = SESSION_STATE_NONE; // The StreamBase instance being used for i/o - StreamBase* stream_; - StreamResource::Callback prev_alloc_cb_; - StreamResource::Callback prev_read_cb_; padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE; // use this to allow timeout tracking during long-lasting writes uint32_t chunks_sent_since_last_write_ = 0; - char* stream_buf_ = nullptr; - size_t stream_buf_size_ = 0; + uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0); v8::Local stream_buf_ab_; size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; @@ -1099,6 +1093,7 @@ class Http2Session : public AsyncWrap { void ClearOutgoing(int status); friend class Http2Scope; + friend class Http2StreamListener; }; class Http2SessionPerformanceEntry : public PerformanceEntry { diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 9debb8a205ef1c..d4044f8bbeea7b 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -144,7 +144,7 @@ struct StringPtr { }; -class Parser : public AsyncWrap { +class Parser : public AsyncWrap, public StreamListener { public: Parser(Environment* env, Local wrap, enum http_parser_type type) : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_HTTPPARSER), @@ -494,14 +494,7 @@ class Parser : public AsyncWrap { Local stream_obj = args[0].As(); StreamBase* stream = static_cast(stream_obj->Value()); CHECK_NE(stream, nullptr); - - stream->Consume(); - - parser->prev_alloc_cb_ = stream->alloc_cb(); - parser->prev_read_cb_ = stream->read_cb(); - - stream->set_alloc_cb({ OnAllocImpl, parser }); - stream->set_read_cb({ OnReadImpl, parser }); + stream->PushStreamListener(parser); } @@ -510,22 +503,10 @@ class Parser : public AsyncWrap { ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder()); // Already unconsumed - if (parser->prev_alloc_cb_.is_empty()) + if (parser->stream_ == nullptr) return; - // Restore stream's callbacks - if (args.Length() == 1 && args[0]->IsExternal()) { - Local stream_obj = args[0].As(); - StreamBase* stream = static_cast(stream_obj->Value()); - CHECK_NE(stream, nullptr); - - stream->set_alloc_cb(parser->prev_alloc_cb_); - stream->set_read_cb(parser->prev_read_cb_); - stream->Unconsume(); - } - - parser->prev_alloc_cb_.clear(); - parser->prev_read_cb_.clear(); + parser->stream_->RemoveStreamListener(parser); } @@ -544,33 +525,19 @@ class Parser : public AsyncWrap { protected: static const size_t kAllocBufferSize = 64 * 1024; - static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { - Parser* parser = static_cast(ctx); - Environment* env = parser->env(); + uv_buf_t OnStreamAlloc(size_t suggested_size) override { + if (env()->http_parser_buffer() == nullptr) + env()->set_http_parser_buffer(new char[kAllocBufferSize]); - if (env->http_parser_buffer() == nullptr) - env->set_http_parser_buffer(new char[kAllocBufferSize]); - - buf->base = env->http_parser_buffer(); - buf->len = kAllocBufferSize; + return uv_buf_init(env()->http_parser_buffer(), kAllocBufferSize); } - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - Parser* parser = static_cast(ctx); - HandleScope scope(parser->env()->isolate()); + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { + HandleScope scope(env()->isolate()); if (nread < 0) { - uv_buf_t tmp_buf; - tmp_buf.base = nullptr; - tmp_buf.len = 0; - parser->prev_read_cb_.fn(nread, - &tmp_buf, - pending, - parser->prev_read_cb_.ctx); + PassReadErrorToPreviousListener(nread); return; } @@ -578,27 +545,27 @@ class Parser : public AsyncWrap { if (nread == 0) return; - parser->current_buffer_.Clear(); - Local ret = parser->Execute(buf->base, nread); + current_buffer_.Clear(); + Local ret = Execute(buf.base, nread); // Exception if (ret.IsEmpty()) return; - Local obj = parser->object(); - Local cb = obj->Get(kOnExecute); + Local cb = + object()->Get(env()->context(), kOnExecute).ToLocalChecked(); if (!cb->IsFunction()) return; // Hooks for GetCurrentBuffer - parser->current_buffer_len_ = nread; - parser->current_buffer_data_ = buf->base; + current_buffer_len_ = nread; + current_buffer_data_ = buf.base; - parser->MakeCallback(cb.As(), 1, &ret); + MakeCallback(cb.As(), 1, &ret); - parser->current_buffer_len_ = 0; - parser->current_buffer_data_ = nullptr; + current_buffer_len_ = 0; + current_buffer_data_ = nullptr; } @@ -713,8 +680,6 @@ class Parser : public AsyncWrap { Local current_buffer_; size_t current_buffer_len_; char* current_buffer_data_; - StreamResource::Callback prev_alloc_cb_; - StreamResource::Callback prev_read_cb_; // These are helper functions for filling `http_parser_settings`, which turn // a member function of Parser into a C-style HTTP parser callback. diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index c5958a2271a83e..016ce480b6a809 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -29,6 +29,7 @@ #include "node_buffer.h" #include "node_wrap.h" #include "connect_wrap.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "util-inl.h" diff --git a/src/process_wrap.cc b/src/process_wrap.cc index b01ef56270767e..314131e1dd319f 100644 --- a/src/process_wrap.cc +++ b/src/process_wrap.cc @@ -22,6 +22,7 @@ #include "env-inl.h" #include "handle_wrap.h" #include "node_wrap.h" +#include "stream_base-inl.h" #include "util-inl.h" #include diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index cdcff67cc55e66..287978a87034eb 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -25,6 +25,87 @@ using v8::Value; using AsyncHooks = Environment::AsyncHooks; + +inline StreamListener::~StreamListener() { + if (stream_ != nullptr) + stream_->RemoveStreamListener(this); +} + +inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamRead(nread, + uv_buf_init(nullptr, 0), + UV_UNKNOWN_HANDLE); +} + + +inline StreamResource::~StreamResource() { + while (listener_ != nullptr) { + listener_->OnStreamDestroy(); + RemoveStreamListener(listener_); + } +} + +inline void StreamResource::PushStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + CHECK_EQ(listener->stream_, nullptr); + + listener->previous_listener_ = listener_; + listener->stream_ = this; + + listener_ = listener; +} + +inline void StreamResource::RemoveStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + + StreamListener* previous; + StreamListener* current; + + // Remove from the linked list. + for (current = listener_, previous = nullptr; + /* No loop condition because we want a crash if listener is not found */ + ; previous = current, current = current->previous_listener_) { + CHECK_NE(current, nullptr); + if (current == listener) { + if (previous != nullptr) + previous->previous_listener_ = current->previous_listener_; + else + listener_ = listener->previous_listener_; + break; + } + } + + listener->stream_ = nullptr; + listener->previous_listener_ = nullptr; +} + + +inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { + return listener_->OnStreamAlloc(suggested_size); +} + +inline void StreamResource::EmitRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + if (nread > 0) + bytes_read_ += static_cast(nread); + listener_->OnStreamRead(nread, buf, pending); +} + +inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { + listener_->OnStreamAfterWrite(w, status); +} + + +inline StreamBase::StreamBase(Environment* env) : env_(env) { + PushStreamListener(&default_listener_); +} + +inline Environment* StreamBase::stream_env() const { + return env_; +} + template void StreamBase::AddMethods(Environment* env, Local t, @@ -70,8 +151,8 @@ void StreamBase::AddMethods(Environment* env, Local(), attributes); - env->SetProtoMethod(t, "readStart", JSMethod); - env->SetProtoMethod(t, "readStop", JSMethod); + env->SetProtoMethod(t, "readStart", JSMethod); + env->SetProtoMethod(t, "readStop", JSMethod); if ((flags & kFlagNoShutdown) == 0) env->SetProtoMethod(t, "shutdown", JSMethod); if ((flags & kFlagHasWritev) != 0) diff --git a/src/stream_base.cc b/src/stream_base.cc index 0fb801ddd57445..9acf2273abd78b 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -34,12 +34,12 @@ template int StreamBase::WriteString( const FunctionCallbackInfo& args); -int StreamBase::ReadStart(const FunctionCallbackInfo& args) { +int StreamBase::ReadStartJS(const FunctionCallbackInfo& args) { return ReadStart(); } -int StreamBase::ReadStop(const FunctionCallbackInfo& args) { +int StreamBase::ReadStopJS(const FunctionCallbackInfo& args) { return ReadStop(); } @@ -437,9 +437,9 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { } -void StreamBase::EmitData(ssize_t nread, - Local buf, - Local handle) { +void StreamBase::CallJSOnreadMethod(ssize_t nread, + Local buf, + Local handle) { Environment* env = env_; Local argv[] = { @@ -490,4 +490,43 @@ void StreamResource::ClearError() { // No-op } + +uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) { + return uv_buf_init(Malloc(suggested_size), suggested_size); +} + +void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + // This cannot be virtual because it is just as valid to override the other + // OnStreamRead() callback. + CHECK(0 && "OnStreamRead() needs to be implemented"); +} + +void StreamListener::OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + CHECK_EQ(pending, UV_UNKNOWN_HANDLE); + OnStreamRead(nread, buf); +} + + +void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + CHECK_NE(stream_, nullptr); + StreamBase* stream = static_cast(stream_); + Environment* env = stream->stream_env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + if (nread <= 0) { + free(buf.base); + if (nread < 0) + stream->CallJSOnreadMethod(nread, Local()); + return; + } + + CHECK_LE(static_cast(nread), buf.len); + + Local obj = Buffer::New(env, buf.base, nread).ToLocalChecked(); + stream->CallJSOnreadMethod(nread, obj); +} + } // namespace node diff --git a/src/stream_base.h b/src/stream_base.h index d063176b04a4db..0b176d11819fca 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -15,6 +15,7 @@ namespace node { // Forward declarations class StreamBase; +class StreamResource; template class StreamReq { @@ -123,38 +124,78 @@ class WriteWrap : public ReqWrap, const size_t storage_size_; }; -class StreamResource { + +// This is the generic interface for objects that control Node.js' C++ streams. +// For example, the default `EmitToJSStreamListener` emits a stream's data +// as Buffers in JS, or `TLSWrap` reads and decrypts data from a stream. +class StreamListener { public: - template - struct Callback { - Callback() : fn(nullptr), ctx(nullptr) {} - Callback(T fn, void* ctx) : fn(fn), ctx(ctx) {} - Callback(const Callback&) = default; - - inline bool is_empty() { return fn == nullptr; } - inline void clear() { - fn = nullptr; - ctx = nullptr; - } + virtual ~StreamListener(); + + // This is called when a stream wants to allocate memory immediately before + // reading data into the freshly allocated buffer (i.e. it is always followed + // by a `OnStreamRead()` call). + // This memory may be statically or dynamically allocated; for example, + // a protocol parser may want to read data into a static buffer if it knows + // that all data is going to be fully handled during the next + // `OnStreamRead()` call. + // The returned buffer does not need to contain `suggested_size` bytes. + // The default implementation of this method returns a buffer that has exactly + // the suggested size and is allocated using malloc(). + virtual uv_buf_t OnStreamAlloc(size_t suggested_size); + + // `OnStreamRead()` is called when data is available on the socket and has + // been read into the buffer provided by `OnStreamAlloc()`. + // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer + // with base nullpptr in case of an error. + // `nread` is the number of read bytes (which is at most the buffer length), + // or, if negative, a libuv error code. + // The variant with a `uv_handle_type` argument is used by libuv-backed + // streams for handle transfers (e.g. passing net.Socket instances between + // cluster workers). For all other streams, overriding the simple variant + // should be sufficient. + // By default, the second variant crashes if `pending` is set and otherwise + // calls the simple variant. + virtual void OnStreamRead(ssize_t nread, + const uv_buf_t& buf) = 0; + virtual void OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending); + + // This is called once a Write has finished. `status` may be 0 or, + // if negative, a libuv error code. + virtual void OnStreamAfterWrite(WriteWrap* w, int status) {} + + // This is called immediately before the stream is destroyed. + virtual void OnStreamDestroy() {} - T fn; - void* ctx; - }; + protected: + // Pass along a read error to the `StreamListener` instance that was active + // before this one. For example, a protocol parser does not care about read + // errors and may instead want to let the original handler + // (e.g. the JS handler) take care of the situation. + void PassReadErrorToPreviousListener(ssize_t nread); - typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx); - typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); - typedef void (*ReadCb)(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - typedef void (*DestructCb)(void* ctx); + StreamResource* stream_ = nullptr; + StreamListener* previous_listener_ = nullptr; - StreamResource() : bytes_read_(0) { - } - virtual ~StreamResource() { - if (!destruct_cb_.is_empty()) - destruct_cb_.fn(destruct_cb_.ctx); - } + friend class StreamResource; +}; + + +// A default emitter that just pushes data chunks as Buffer instances to +// JS land via the handle’s .ondata method. +class EmitToJSStreamListener : public StreamListener { + public: + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; +}; + + +// A generic stream, comparable to JS land’s `Duplex` streams. +// A stream is always controlled through one `StreamListener` instance. +class StreamResource { + public: + virtual ~StreamResource(); virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); @@ -162,50 +203,45 @@ class StreamResource { uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) = 0; - virtual const char* Error() const; - virtual void ClearError(); - - // Events - inline void EmitAfterWrite(WriteWrap* w, int status) { - if (!after_write_cb_.is_empty()) - after_write_cb_.fn(w, status, after_write_cb_.ctx); - } - inline void EmitAlloc(size_t size, uv_buf_t* buf) { - if (!alloc_cb_.is_empty()) - alloc_cb_.fn(size, buf, alloc_cb_.ctx); - } - - inline void EmitRead(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending = UV_UNKNOWN_HANDLE) { - if (nread > 0) - bytes_read_ += static_cast(nread); - if (!read_cb_.is_empty()) - read_cb_.fn(nread, buf, pending, read_cb_.ctx); - } - - inline void set_after_write_cb(Callback c) { - after_write_cb_ = c; - } + // Start reading from the underlying resource. This is called by the consumer + // when more data is desired. + virtual int ReadStart() = 0; + // Stop reading from the underlying resource. This is called by the + // consumer when its buffers are full and no more data can be handled. + virtual int ReadStop() = 0; - inline void set_alloc_cb(Callback c) { alloc_cb_ = c; } - inline void set_read_cb(Callback c) { read_cb_ = c; } - inline void set_destruct_cb(Callback c) { destruct_cb_ = c; } + // Optionally, this may provide an error message to be used for + // failing writes. + virtual const char* Error() const; + // Clear the current error (i.e. that would be returned by Error()). + virtual void ClearError(); - inline Callback after_write_cb() { return after_write_cb_; } - inline Callback alloc_cb() { return alloc_cb_; } - inline Callback read_cb() { return read_cb_; } - inline Callback destruct_cb() { return destruct_cb_; } + // Transfer ownership of this tream to `listener`. The previous listener + // will not receive any more callbacks while the new listener was active. + void PushStreamListener(StreamListener* listener); + // Remove a listener, and, if this was the currently active one, + // transfer ownership back to the previous listener. + void RemoveStreamListener(StreamListener* listener); protected: - Callback after_write_cb_; - Callback alloc_cb_; - Callback read_cb_; - Callback destruct_cb_; - uint64_t bytes_read_; + // Call the current listener's OnStreamAlloc() method. + uv_buf_t EmitAlloc(size_t suggested_size); + // Call the current listener's OnStreamRead() method and update the + // stream's read byte counter. + void EmitRead(ssize_t nread, + const uv_buf_t& buf = uv_buf_init(nullptr, 0), + uv_handle_type pending = UV_UNKNOWN_HANDLE); + // Call the current listener's OnStreamAfterWrite() method. + void EmitAfterWrite(WriteWrap* w, int status); + + StreamListener* listener_ = nullptr; + uint64_t bytes_read_ = 0; + + friend class StreamListener; }; + class StreamBase : public StreamResource { public: enum Flags { @@ -224,40 +260,29 @@ class StreamBase : public StreamResource { virtual bool IsIPCPipe(); virtual int GetFD(); - virtual int ReadStart() = 0; - virtual int ReadStop() = 0; - - inline void Consume() { - CHECK_EQ(consumed_, false); - consumed_ = true; - } - - inline void Unconsume() { - CHECK_EQ(consumed_, true); - consumed_ = false; - } - - void EmitData(ssize_t nread, - v8::Local buf, - v8::Local handle); + void CallJSOnreadMethod( + ssize_t nread, + v8::Local buf, + v8::Local handle = v8::Local()); // These are called by the respective {Write,Shutdown}Wrap class. virtual void AfterShutdown(ShutdownWrap* req, int status); virtual void AfterWrite(WriteWrap* req, int status); - protected: - explicit StreamBase(Environment* env) : env_(env), consumed_(false) { - } + // This is named `stream_env` to avoid name clashes, because a lot of + // subclasses are also `BaseObject`s. + Environment* stream_env() const; - virtual ~StreamBase() = default; + protected: + explicit StreamBase(Environment* env); // One of these must be implemented virtual AsyncWrap* GetAsyncWrap() = 0; virtual v8::Local GetObject(); // JS Methods - int ReadStart(const v8::FunctionCallbackInfo& args); - int ReadStop(const v8::FunctionCallbackInfo& args); + int ReadStartJS(const v8::FunctionCallbackInfo& args); + int ReadStopJS(const v8::FunctionCallbackInfo& args); int Shutdown(const v8::FunctionCallbackInfo& args); int Writev(const v8::FunctionCallbackInfo& args); int WriteBuffer(const v8::FunctionCallbackInfo& args); @@ -280,7 +305,7 @@ class StreamBase : public StreamResource { private: Environment* env_; - bool consumed_; + EmitToJSStreamListener default_listener_; }; } // namespace node diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index b639d945004cfa..0be73f9114adb1 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -93,8 +93,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, provider), StreamBase(env), stream_(stream) { - set_alloc_cb({ OnAllocImpl, this }); - set_read_cb({ OnReadImpl, this }); + PushStreamListener(this); } @@ -157,23 +156,18 @@ int LibuvStreamWrap::ReadStop() { void LibuvStreamWrap::OnAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { + size_t suggested_size, + uv_buf_t* buf) { LibuvStreamWrap* wrap = static_cast(handle->data); HandleScope scope(wrap->env()->isolate()); Context::Scope context_scope(wrap->env()->context()); CHECK_EQ(wrap->stream(), reinterpret_cast(handle)); - return wrap->EmitAlloc(suggested_size, buf); + *buf = wrap->EmitAlloc(suggested_size); } -void LibuvStreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) { - buf->base = node::Malloc(size); - buf->len = size; -} - template static Local AcceptHandle(Environment* env, LibuvStreamWrap* parent) { @@ -196,51 +190,41 @@ static Local AcceptHandle(Environment* env, LibuvStreamWrap* parent) { } -void LibuvStreamWrap::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - LibuvStreamWrap* wrap = static_cast(ctx); - Environment* env = wrap->env(); - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local pending_obj; +void LibuvStreamWrap::OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); - if (nread < 0) { - if (buf->base != nullptr) - free(buf->base); - wrap->EmitData(nread, Local(), pending_obj); + if (nread <= 0) { + free(buf.base); + if (nread < 0) + CallJSOnreadMethod(nread, Local()); return; } - if (nread == 0) { - if (buf->base != nullptr) - free(buf->base); - return; - } + CHECK_LE(static_cast(nread), buf.len); - CHECK_LE(static_cast(nread), buf->len); - char* base = node::Realloc(buf->base, nread); + Local pending_obj; if (pending == UV_TCP) { - pending_obj = AcceptHandle(env, wrap); + pending_obj = AcceptHandle(env(), this); } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle(env, wrap); + pending_obj = AcceptHandle(env(), this); } else if (pending == UV_UDP) { - pending_obj = AcceptHandle(env, wrap); + pending_obj = AcceptHandle(env(), this); } else { CHECK_EQ(pending, UV_UNKNOWN_HANDLE); } - Local obj = Buffer::New(env, base, nread).ToLocalChecked(); - wrap->EmitData(nread, obj, pending_obj); + Local obj = Buffer::New(env(), buf.base, nread).ToLocalChecked(); + CallJSOnreadMethod(nread, obj, pending_obj); } void LibuvStreamWrap::OnRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf) { + ssize_t nread, + const uv_buf_t* buf) { LibuvStreamWrap* wrap = static_cast(handle->data); HandleScope scope(wrap->env()->isolate()); Context::Scope context_scope(wrap->env()->context()); @@ -263,7 +247,7 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle, } } - wrap->EmitRead(nread, buf, type); + wrap->EmitRead(nread, *buf, type); } diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 0146d41c6e8c7b..129006b1600c6c 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -33,7 +33,9 @@ namespace node { -class LibuvStreamWrap : public HandleWrap, public StreamBase { +class LibuvStreamWrap : public HandleWrap, + public StreamListener, + public StreamBase { public: static void Initialize(v8::Local target, v8::Local unused, @@ -79,9 +81,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { uv_stream_t* stream, AsyncWrap::ProviderType provider); - ~LibuvStreamWrap() { - } - AsyncWrap* GetAsyncWrap() override; static void AddMethods(Environment* env, @@ -105,11 +104,16 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { static void AfterUvShutdown(uv_shutdown_t* req, int status); // Resource interface implementation - static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); + void OnStreamRead(ssize_t nread, + const uv_buf_t& buf) override { + CHECK(0 && "must not be called"); + } + void OnStreamRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) override; + void OnStreamAfterWrite(WriteWrap* w, int status) override { + previous_listener_->OnStreamAfterWrite(w, status); + } void AfterWrite(WriteWrap* req_wrap, int status) override; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 3a0a3f295e2c72..a0a58fb1b5cc8d 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -27,6 +27,7 @@ #include "node_buffer.h" #include "node_wrap.h" #include "connect_wrap.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "util-inl.h" diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 18b3cf01f406f0..971dbb857f77f5 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -59,7 +59,6 @@ TLSWrap::TLSWrap(Environment* env, SSLWrap(env, sc, kind), StreamBase(env), sc_(sc), - stream_(stream), enc_in_(nullptr), enc_out_(nullptr), write_size_(0), @@ -78,14 +77,7 @@ TLSWrap::TLSWrap(Environment* env, SSL_CTX_sess_set_get_cb(sc_->ctx_, SSLWrap::GetSessionCallback); SSL_CTX_sess_set_new_cb(sc_->ctx_, SSLWrap::NewSessionCallback); - stream_->Consume(); - stream_->set_after_write_cb({ OnAfterWriteImpl, this }); - stream_->set_alloc_cb({ OnAllocImpl, this }); - stream_->set_read_cb({ OnReadImpl, this }); - stream_->set_destruct_cb({ OnDestructImpl, this }); - - set_alloc_cb({ OnAllocSelf, this }); - set_read_cb({ OnReadSelf, this }); + stream->PushStreamListener(this); InitSSL(); } @@ -100,19 +92,6 @@ TLSWrap::~TLSWrap() { #ifdef SSL_CTRL_SET_TLSEXT_SERVERNAME_CB sni_context_.Reset(); #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB - - // See test/parallel/test-tls-transport-destroy-after-own-gc.js: - // If this TLSWrap is garbage collected, we cannot allow callbacks to be - // called on this stream. - - if (stream_ == nullptr) - return; - stream_->set_destruct_cb({ nullptr, nullptr }); - stream_->set_after_write_cb({ nullptr, nullptr }); - stream_->set_alloc_cb({ nullptr, nullptr }); - stream_->set_read_cb({ nullptr, nullptr }); - stream_->set_destruct_cb({ nullptr, nullptr }); - stream_->Unconsume(); } @@ -208,15 +187,13 @@ void TLSWrap::Receive(const FunctionCallbackInfo& args) { char* data = Buffer::Data(args[0]); size_t len = Buffer::Length(args[0]); - uv_buf_t buf; - // Copy given buffer entirely or partiall if handle becomes closed while (len > 0 && wrap->IsAlive() && !wrap->IsClosing()) { - wrap->stream_->EmitAlloc(len, &buf); + uv_buf_t buf = wrap->OnStreamAlloc(len); size_t copy = buf.len > len ? len : buf.len; memcpy(buf.base, data, copy); buf.len = copy; - wrap->stream_->EmitRead(buf.len, &buf); + wrap->OnStreamRead(copy, buf); data += copy; len -= copy; @@ -307,7 +284,7 @@ void TLSWrap::EncOut() { ->NewInstance(env()->context()).ToLocalChecked(); WriteWrap* write_req = WriteWrap::New(env(), req_wrap_obj, - stream_); + static_cast(stream_)); uv_buf_t buf[arraysize(data)]; for (size_t i = 0; i < count; i++) @@ -324,7 +301,7 @@ void TLSWrap::EncOut() { } -void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) { +void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) { // We should not be getting here after `DestroySSL`, because all queued writes // must be invoked with UV_ECANCELED CHECK_NE(ssl_, nullptr); @@ -421,12 +398,11 @@ void TLSWrap::ClearOut() { while (read > 0) { int avail = read; - uv_buf_t buf; - EmitAlloc(avail, &buf); + uv_buf_t buf = EmitAlloc(avail); if (static_cast(buf.len) < avail) avail = buf.len; memcpy(buf.base, current, avail); - EmitRead(avail, &buf); + EmitRead(avail, buf); // Caveat emptor: OnRead() calls into JS land which can result in // the SSL context object being destroyed. We have to carefully @@ -442,7 +418,7 @@ void TLSWrap::ClearOut() { int flags = SSL_get_shutdown(ssl_); if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) { eof_ = true; - EmitRead(UV_EOF, nullptr); + EmitRead(UV_EOF); } // We need to check whether an error occurred or the connection was @@ -524,22 +500,24 @@ AsyncWrap* TLSWrap::GetAsyncWrap() { bool TLSWrap::IsIPCPipe() { - return stream_->IsIPCPipe(); + return static_cast(stream_)->IsIPCPipe(); } int TLSWrap::GetFD() { - return stream_->GetFD(); + return static_cast(stream_)->GetFD(); } bool TLSWrap::IsAlive() { - return ssl_ != nullptr && stream_ != nullptr && stream_->IsAlive(); + return ssl_ != nullptr && + stream_ != nullptr && + static_cast(stream_)->IsAlive(); } bool TLSWrap::IsClosing() { - return stream_->IsClosing(); + return static_cast(stream_)->IsClosing(); } @@ -638,62 +616,16 @@ int TLSWrap::DoWrite(WriteWrap* w, } -void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) { - TLSWrap* wrap = static_cast(ctx); - wrap->EncOutAfterWrite(w, status); -} - - -void TLSWrap::OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { - TLSWrap* wrap = static_cast(ctx); - - if (wrap->ssl_ == nullptr) { - *buf = uv_buf_init(nullptr, 0); - return; - } - - size_t size = 0; - buf->base = crypto::NodeBIO::FromBIO(wrap->enc_in_)->PeekWritable(&size); - buf->len = size; -} - - -void TLSWrap::OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - TLSWrap* wrap = static_cast(ctx); - wrap->DoRead(nread, buf, pending); -} - - -void TLSWrap::OnDestructImpl(void* ctx) { - TLSWrap* wrap = static_cast(ctx); - wrap->clear_stream(); -} - - -void TLSWrap::OnAllocSelf(size_t suggested_size, uv_buf_t* buf, void* ctx) { - buf->base = node::Malloc(suggested_size); - buf->len = suggested_size; -} - +uv_buf_t TLSWrap::OnStreamAlloc(size_t suggested_size) { + CHECK_NE(ssl_, nullptr); -void TLSWrap::OnReadSelf(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx) { - TLSWrap* wrap = static_cast(ctx); - Local buf_obj; - if (buf != nullptr) - buf_obj = Buffer::New(wrap->env(), buf->base, buf->len).ToLocalChecked(); - wrap->EmitData(nread, buf_obj, Local()); + size_t size = suggested_size; + char* base = crypto::NodeBIO::FromBIO(enc_in_)->PeekWritable(&size); + return uv_buf_init(base, size); } -void TLSWrap::DoRead(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { +void TLSWrap::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { if (nread < 0) { // Error should be emitted only after all data was read ClearOut(); @@ -705,13 +637,13 @@ void TLSWrap::DoRead(ssize_t nread, eof_ = true; } - EmitRead(nread, nullptr); + EmitRead(nread); return; } // Only client connections can receive data if (ssl_ == nullptr) { - EmitRead(UV_EPROTO, nullptr); + EmitRead(UV_EPROTO); return; } @@ -800,6 +732,9 @@ void TLSWrap::DestroySSL(const FunctionCallbackInfo& args) { // Destroy the SSL structure and friends wrap->SSLWrap::DestroySSL(); + + if (wrap->stream_ != nullptr) + wrap->stream_->RemoveStreamListener(wrap); } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index ae83c82c3226fd..a1f0b99e86beec 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -48,7 +48,8 @@ class NodeBIO; class TLSWrap : public AsyncWrap, public crypto::SSLWrap, - public StreamBase { + public StreamBase, + public StreamListener { public: ~TLSWrap() override; @@ -76,8 +77,6 @@ class TLSWrap : public AsyncWrap, size_t self_size() const override { return sizeof(*this); } - void clear_stream() { stream_ = nullptr; } - protected: static const int kClearOutChunkSize = 16384; @@ -98,7 +97,6 @@ class TLSWrap : public AsyncWrap, static void SSLInfoCallback(const SSL* ssl_, int where, int ret); void InitSSL(); void EncOut(); - void EncOutAfterWrite(WriteWrap* req_wrap, int status); bool ClearIn(); void ClearOut(); bool InvokeQueued(int status, const char* error_str = nullptr); @@ -119,20 +117,9 @@ class TLSWrap : public AsyncWrap, bool IsIPCPipe() override; // Resource implementation - static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx); - static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadImpl(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - static void OnAllocSelf(size_t size, uv_buf_t* buf, void* ctx); - static void OnReadSelf(ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending, - void* ctx); - static void OnDestructImpl(void* ctx); - - void DoRead(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending); + void OnStreamAfterWrite(WriteWrap* w, int status) override; + uv_buf_t OnStreamAlloc(size_t size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; v8::Local GetSSLError(int status, int* err, std::string* msg); @@ -154,7 +141,6 @@ class TLSWrap : public AsyncWrap, #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB crypto::SecureContext* sc_; - StreamBase* stream_; BIO* enc_in_; BIO* enc_out_; std::vector pending_cleartext_input_; diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index fae39158ef2bcf..9977738afcbfd5 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -26,6 +26,7 @@ #include "node_buffer.h" #include "node_wrap.h" #include "req_wrap-inl.h" +#include "stream_base-inl.h" #include "stream_wrap.h" #include "util-inl.h" diff --git a/test/parallel/test-tls-socket-destroy.js b/test/parallel/test-tls-socket-destroy.js index f62b6f905296db..6f1d4b4186b74f 100644 --- a/test/parallel/test-tls-socket-destroy.js +++ b/test/parallel/test-tls-socket-destroy.js @@ -19,6 +19,7 @@ const server = net.createServer(common.mustCall((conn) => { const socket = new tls.TLSSocket(conn, options); socket.once('data', common.mustCall(() => { socket._destroySSL(); // Should not crash. + socket.destroy(); server.close(); })); })); From 69f38319a28ba651c9ec1ecba7bdd90fb43bbf52 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 25 Jan 2018 00:43:06 +0100 Subject: [PATCH 2/2] src: simplify transferring handles for libuv streams Instead of passing along the handle object, just set it as a property on the stream handle object and let the read handler grab it from there. PR-URL: https://github.com/nodejs/node/pull/18334 Reviewed-By: James M Snell Reviewed-By: Anatoli Papirovski Reviewed-By: Matteo Collina --- lib/internal/child_process.js | 5 +- src/env.h | 1 + src/stream_base-inl.h | 10 ++-- src/stream_base.cc | 23 +------- src/stream_base.h | 18 +------ src/stream_wrap.cc | 98 ++++++++++++++--------------------- src/stream_wrap.h | 26 ++-------- 7 files changed, 53 insertions(+), 128 deletions(-) diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index eb17ec21d406ba..2bade01f95f209 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -465,7 +465,10 @@ function setupChannel(target, channel) { var jsonBuffer = ''; var pendingHandle = null; channel.buffering = false; - channel.onread = function(nread, pool, recvHandle) { + channel.pendingHandle = null; + channel.onread = function(nread, pool) { + const recvHandle = channel.pendingHandle; + channel.pendingHandle = null; // TODO(bnoordhuis) Check that nread > 0. if (pool) { if (recvHandle) diff --git a/src/env.h b/src/env.h index d73be8156ecebf..734261359565c4 100644 --- a/src/env.h +++ b/src/env.h @@ -210,6 +210,7 @@ class ModuleWrap; V(owner_string, "owner") \ V(parse_error_string, "Parse Error") \ V(path_string, "path") \ + V(pending_handle_string, "pendingHandle") \ V(pbkdf2_error_string, "PBKDF2 Error") \ V(pid_string, "pid") \ V(pipe_string, "pipe") \ diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 287978a87034eb..76922c1d8af77d 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -33,9 +33,7 @@ inline StreamListener::~StreamListener() { inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { CHECK_NE(previous_listener_, nullptr); - previous_listener_->OnStreamRead(nread, - uv_buf_init(nullptr, 0), - UV_UNKNOWN_HANDLE); + previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); } @@ -85,12 +83,10 @@ inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { return listener_->OnStreamAlloc(suggested_size); } -inline void StreamResource::EmitRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) { +inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { if (nread > 0) bytes_read_ += static_cast(nread); - listener_->OnStreamRead(nread, buf, pending); + listener_->OnStreamRead(nread, buf); } inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { diff --git a/src/stream_base.cc b/src/stream_base.cc index 9acf2273abd78b..8bdcebe88ab19f 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -437,23 +437,17 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { } -void StreamBase::CallJSOnreadMethod(ssize_t nread, - Local buf, - Local handle) { +void StreamBase::CallJSOnreadMethod(ssize_t nread, Local buf) { Environment* env = env_; Local argv[] = { Integer::New(env->isolate(), nread), - buf, - handle + buf }; if (argv[1].IsEmpty()) argv[1] = Undefined(env->isolate()); - if (argv[2].IsEmpty()) - argv[2] = Undefined(env->isolate()); - AsyncWrap* wrap = GetAsyncWrap(); CHECK_NE(wrap, nullptr); wrap->MakeCallback(env->onread_string(), arraysize(argv), argv); @@ -495,19 +489,6 @@ uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) { return uv_buf_init(Malloc(suggested_size), suggested_size); } -void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { - // This cannot be virtual because it is just as valid to override the other - // OnStreamRead() callback. - CHECK(0 && "OnStreamRead() needs to be implemented"); -} - -void StreamListener::OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) { - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - OnStreamRead(nread, buf); -} - void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_NE(stream_, nullptr); diff --git a/src/stream_base.h b/src/stream_base.h index 0b176d11819fca..f18b6bda0a08a4 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -150,17 +150,8 @@ class StreamListener { // with base nullpptr in case of an error. // `nread` is the number of read bytes (which is at most the buffer length), // or, if negative, a libuv error code. - // The variant with a `uv_handle_type` argument is used by libuv-backed - // streams for handle transfers (e.g. passing net.Socket instances between - // cluster workers). For all other streams, overriding the simple variant - // should be sufficient. - // By default, the second variant crashes if `pending` is set and otherwise - // calls the simple variant. virtual void OnStreamRead(ssize_t nread, const uv_buf_t& buf) = 0; - virtual void OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending); // This is called once a Write has finished. `status` may be 0 or, // if negative, a libuv error code. @@ -229,9 +220,7 @@ class StreamResource { uv_buf_t EmitAlloc(size_t suggested_size); // Call the current listener's OnStreamRead() method and update the // stream's read byte counter. - void EmitRead(ssize_t nread, - const uv_buf_t& buf = uv_buf_init(nullptr, 0), - uv_handle_type pending = UV_UNKNOWN_HANDLE); + void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); // Call the current listener's OnStreamAfterWrite() method. void EmitAfterWrite(WriteWrap* w, int status); @@ -260,10 +249,7 @@ class StreamBase : public StreamResource { virtual bool IsIPCPipe(); virtual int GetFD(); - void CallJSOnreadMethod( - ssize_t nread, - v8::Local buf, - v8::Local handle = v8::Local()); + void CallJSOnreadMethod(ssize_t nread, v8::Local buf); // These are called by the respective {Write,Shutdown}Wrap class. virtual void AfterShutdown(ShutdownWrap* req, int status); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 0be73f9114adb1..bc10cf80e828f1 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -93,7 +93,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, provider), StreamBase(env), stream_(stream) { - PushStreamListener(this); } @@ -146,7 +145,13 @@ bool LibuvStreamWrap::IsIPCPipe() { int LibuvStreamWrap::ReadStart() { - return uv_read_start(stream(), OnAlloc, OnRead); + return uv_read_start(stream(), [](uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { + static_cast(handle->data)->OnUvAlloc(suggested_size, buf); + }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { + static_cast(stream->data)->OnUvRead(nread, buf); + }); } @@ -155,16 +160,11 @@ int LibuvStreamWrap::ReadStop() { } -void LibuvStreamWrap::OnAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) { - LibuvStreamWrap* wrap = static_cast(handle->data); - HandleScope scope(wrap->env()->isolate()); - Context::Scope context_scope(wrap->env()->context()); - - CHECK_EQ(wrap->stream(), reinterpret_cast(handle)); +void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) { + HandleScope scope(env()->isolate()); + Context::Scope context_scope(env()->context()); - *buf = wrap->EmitAlloc(suggested_size); + *buf = EmitAlloc(suggested_size); } @@ -190,64 +190,47 @@ static Local AcceptHandle(Environment* env, LibuvStreamWrap* parent) { } -void LibuvStreamWrap::OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) { - HandleScope handle_scope(env()->isolate()); +void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) { + HandleScope scope(env()->isolate()); Context::Scope context_scope(env()->context()); - - if (nread <= 0) { - free(buf.base); - if (nread < 0) - CallJSOnreadMethod(nread, Local()); - return; - } - - CHECK_LE(static_cast(nread), buf.len); - - Local pending_obj; - - if (pending == UV_TCP) { - pending_obj = AcceptHandle(env(), this); - } else if (pending == UV_NAMED_PIPE) { - pending_obj = AcceptHandle(env(), this); - } else if (pending == UV_UDP) { - pending_obj = AcceptHandle(env(), this); - } else { - CHECK_EQ(pending, UV_UNKNOWN_HANDLE); - } - - Local obj = Buffer::New(env(), buf.base, nread).ToLocalChecked(); - CallJSOnreadMethod(nread, obj, pending_obj); -} - - -void LibuvStreamWrap::OnRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf) { - LibuvStreamWrap* wrap = static_cast(handle->data); - HandleScope scope(wrap->env()->isolate()); - Context::Scope context_scope(wrap->env()->context()); uv_handle_type type = UV_UNKNOWN_HANDLE; - if (wrap->is_named_pipe_ipc() && - uv_pipe_pending_count(reinterpret_cast(handle)) > 0) { - type = uv_pipe_pending_type(reinterpret_cast(handle)); + if (is_named_pipe_ipc() && + uv_pipe_pending_count(reinterpret_cast(stream())) > 0) { + type = uv_pipe_pending_type(reinterpret_cast(stream())); } // We should not be getting this callback if someone as already called // uv_close() on the handle. - CHECK_EQ(wrap->persistent().IsEmpty(), false); + CHECK_EQ(persistent().IsEmpty(), false); if (nread > 0) { - if (wrap->is_tcp()) { + if (is_tcp()) { NODE_COUNT_NET_BYTES_RECV(nread); - } else if (wrap->is_named_pipe()) { + } else if (is_named_pipe()) { NODE_COUNT_PIPE_BYTES_RECV(nread); } + + Local pending_obj; + + if (type == UV_TCP) { + pending_obj = AcceptHandle(env(), this); + } else if (type == UV_NAMED_PIPE) { + pending_obj = AcceptHandle(env(), this); + } else if (type == UV_UDP) { + pending_obj = AcceptHandle(env(), this); + } else { + CHECK_EQ(type, UV_UNKNOWN_HANDLE); + } + + if (!pending_obj.IsEmpty()) { + object()->Set(env()->context(), + env()->pending_handle_string(), + pending_obj).FromJust(); + } } - wrap->EmitRead(nread, *buf, type); + EmitRead(nread, *buf); } @@ -373,11 +356,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { req_wrap->Done(status); } - -void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) { - StreamBase::AfterWrite(w, status); -} - } // namespace node NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap, diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 129006b1600c6c..e5ad25b91e6fea 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -33,9 +33,7 @@ namespace node { -class LibuvStreamWrap : public HandleWrap, - public StreamListener, - public StreamBase { +class LibuvStreamWrap : public HandleWrap, public StreamBase { public: static void Initialize(v8::Local target, v8::Local unused, @@ -93,30 +91,12 @@ class LibuvStreamWrap : public HandleWrap, static void SetBlocking(const v8::FunctionCallbackInfo& args); // Callbacks for libuv - static void OnAlloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf); + void OnUvAlloc(size_t suggested_size, uv_buf_t* buf); + void OnUvRead(ssize_t nread, const uv_buf_t* buf); - static void OnRead(uv_stream_t* handle, - ssize_t nread, - const uv_buf_t* buf); static void AfterUvWrite(uv_write_t* req, int status); static void AfterUvShutdown(uv_shutdown_t* req, int status); - // Resource interface implementation - void OnStreamRead(ssize_t nread, - const uv_buf_t& buf) override { - CHECK(0 && "must not be called"); - } - void OnStreamRead(ssize_t nread, - const uv_buf_t& buf, - uv_handle_type pending) override; - void OnStreamAfterWrite(WriteWrap* w, int status) override { - previous_listener_->OnStreamAfterWrite(w, status); - } - - void AfterWrite(WriteWrap* req_wrap, int status) override; - uv_stream_t* const stream_; };