From 9d040477df657a9f9ec865f3c4e2ececd2894fb1 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 15 Jul 2019 22:17:45 +0200 Subject: [PATCH] src: allow generic C++ callables in SetImmediate() Modify the native `SetImmediate()` functions to take generic C++ callables as arguments. This makes passing arguments to the callback easier, and in particular, it allows passing `std::unique_ptr`s directly, which in turn makes sure that the data they point to is deleted if the `Environment` is torn down before the callback can run. --- src/async_wrap.cc | 4 +-- src/async_wrap.h | 2 +- src/cares_wrap.cc | 6 ++--- src/env-inl.h | 67 ++++++++++++++++++++++++++++++++++------------ src/env.cc | 66 +++++++++++++++++++++------------------------ src/env.h | 59 ++++++++++++++++++++++++++++------------ src/node_api.cc | 42 ++++++++++++++++------------- src/node_file.cc | 18 ++++++------- src/node_http2.cc | 56 +++++++++++++++----------------------- src/node_perf.cc | 20 +++++++------- src/stream_pipe.cc | 10 +++---- src/tls_wrap.cc | 19 +++++++------ 12 files changed, 206 insertions(+), 163 deletions(-) diff --git a/src/async_wrap.cc b/src/async_wrap.cc index 83b661a12db0a8..8410dd2e0d8c65 100644 --- a/src/async_wrap.cc +++ b/src/async_wrap.cc @@ -87,7 +87,7 @@ struct AsyncWrapObject : public AsyncWrap { SET_SELF_SIZE(AsyncWrapObject) }; -void AsyncWrap::DestroyAsyncIdsCallback(Environment* env, void* data) { +void AsyncWrap::DestroyAsyncIdsCallback(Environment* env) { Local fn = env->async_hooks_destroy_function(); TryCatchScope try_catch(env, TryCatchScope::CatchMode::kFatal); @@ -642,7 +642,7 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) { } if (env->destroy_async_id_list()->empty()) { - env->SetUnrefImmediate(DestroyAsyncIdsCallback, nullptr); + env->SetUnrefImmediate(&DestroyAsyncIdsCallback); } env->destroy_async_id_list()->push_back(async_id); diff --git a/src/async_wrap.h b/src/async_wrap.h index 546f5130e01fc6..0a29264189aa98 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -154,7 +154,7 @@ class AsyncWrap : public BaseObject { static void EmitTraceEventAfter(ProviderType type, double async_id); void EmitTraceEventDestroy(); - static void DestroyAsyncIdsCallback(Environment* env, void* data); + static void DestroyAsyncIdsCallback(Environment* env); inline ProviderType provider_type() const; inline ProviderType set_provider_type(ProviderType provider); diff --git a/src/cares_wrap.cc b/src/cares_wrap.cc index 85d0fdbde64f20..96062cb48199e3 100644 --- a/src/cares_wrap.cc +++ b/src/cares_wrap.cc @@ -690,9 +690,9 @@ class QueryWrap : public AsyncWrap { } void QueueResponseCallback(int status) { - env()->SetImmediate([](Environment*, void* data) { - static_cast(data)->AfterResponse(); - }, this, object()); + env()->SetImmediate([this](Environment*) { + AfterResponse(); + }, object()); channel_->set_query_last_ok(status != ARES_ECONNREFUSED); channel_->ModifyActivityQueryCount(-1); diff --git a/src/env-inl.h b/src/env-inl.h index 4451080940628a..1917e70549c371 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -743,33 +743,66 @@ inline void IsolateData::set_options( options_ = std::move(options); } -void Environment::CreateImmediate(native_immediate_callback cb, - void* data, - v8::Local obj, +template +void Environment::CreateImmediate(Fn&& cb, + v8::Local keep_alive, bool ref) { - native_immediate_callbacks_.push_back({ - cb, - data, - v8::Global(isolate_, obj), - ref - }); + auto callback = std::make_unique>( + std::move(cb), + v8::Global(isolate(), keep_alive), + ref); + NativeImmediateCallback* prev_tail = native_immediate_callbacks_tail_; + + native_immediate_callbacks_tail_ = callback.get(); + if (prev_tail != nullptr) + prev_tail->set_next(std::move(callback)); + else + native_immediate_callbacks_head_ = std::move(callback); + immediate_info()->count_inc(1); } -void Environment::SetImmediate(native_immediate_callback cb, - void* data, - v8::Local obj) { - CreateImmediate(cb, data, obj, true); +template +void Environment::SetImmediate(Fn&& cb, v8::Local keep_alive) { + CreateImmediate(std::move(cb), keep_alive, true); if (immediate_info()->ref_count() == 0) ToggleImmediateRef(true); immediate_info()->ref_count_inc(1); } -void Environment::SetUnrefImmediate(native_immediate_callback cb, - void* data, - v8::Local obj) { - CreateImmediate(cb, data, obj, false); +template +void Environment::SetUnrefImmediate(Fn&& cb, v8::Local keep_alive) { + CreateImmediate(std::move(cb), keep_alive, false); +} + +Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) + : refed_(refed) {} + +bool Environment::NativeImmediateCallback::is_refed() const { + return refed_; +} + +std::unique_ptr +Environment::NativeImmediateCallback::get_next() { + return std::move(next_); +} + +void Environment::NativeImmediateCallback::set_next( + std::unique_ptr next) { + next_ = std::move(next); +} + +template +Environment::NativeImmediateCallbackImpl::NativeImmediateCallbackImpl( + Fn&& callback, v8::Global&& keep_alive, bool refed) + : NativeImmediateCallback(refed), + callback_(std::move(callback)), + keep_alive_(std::move(keep_alive)) {} + +template +void Environment::NativeImmediateCallbackImpl::Call(Environment* env) { + callback_(env); } inline bool Environment::can_call_into_js() const { diff --git a/src/env.cc b/src/env.cc index 936b9a1854cf93..f1d027deda82d4 100644 --- a/src/env.cc +++ b/src/env.cc @@ -339,7 +339,7 @@ Environment::Environment(IsolateData* isolate_data, [](void* arg) { Environment* env = static_cast(arg); if (!env->destroy_async_id_list()->empty()) - AsyncWrap::DestroyAsyncIdsCallback(env, nullptr); + AsyncWrap::DestroyAsyncIdsCallback(env); }, this); @@ -641,42 +641,38 @@ void Environment::AtExit(void (*cb)(void* arg), void* arg) { void Environment::RunAndClearNativeImmediates() { TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunAndClearNativeImmediates", this); - size_t count = native_immediate_callbacks_.size(); - if (count > 0) { - size_t ref_count = 0; - std::vector list; - native_immediate_callbacks_.swap(list); - auto drain_list = [&]() { - TryCatchScope try_catch(this); - for (auto it = list.begin(); it != list.end(); ++it) { - DebugSealHandleScope seal_handle_scope(isolate()); - it->cb_(this, it->data_); - if (it->refed_) - ref_count++; - if (UNLIKELY(try_catch.HasCaught())) { - if (!try_catch.HasTerminated()) - errors::TriggerUncaughtException(isolate(), try_catch); - - // We are done with the current callback. Increase the counter so that - // the steps below make everything *after* the current item part of - // the new list. - it++; - - // Bail out, remove the already executed callbacks from list - // and set up a new TryCatch for the other pending callbacks. - std::move_backward(it, list.end(), list.begin() + (list.end() - it)); - list.resize(list.end() - it); - return true; - } + size_t ref_count = 0; + size_t count = 0; + std::unique_ptr head; + head.swap(native_immediate_callbacks_head_); + native_immediate_callbacks_tail_ = nullptr; + + auto drain_list = [&]() { + TryCatchScope try_catch(this); + for (; head; head = head->get_next()) { + DebugSealHandleScope seal_handle_scope(isolate()); + count++; + if (head->is_refed()) + ref_count++; + + head->Call(this); + if (UNLIKELY(try_catch.HasCaught())) { + if (!try_catch.HasTerminated()) + errors::TriggerUncaughtException(isolate(), try_catch); + + // We are done with the current callback. Move one iteration along, + // as if we had completed successfully. + head = head->get_next(); + return true; } - return false; - }; - while (drain_list()) {} + } + return false; + }; + while (head && drain_list()) {} - DCHECK_GE(immediate_info()->count(), count); - immediate_info()->count_dec(count); - immediate_info()->ref_count_dec(ref_count); - } + DCHECK_GE(immediate_info()->count(), count); + immediate_info()->count_dec(count); + immediate_info()->ref_count_dec(ref_count); } diff --git a/src/env.h b/src/env.h index 4eb6575b8ff60b..c2dcd6fb337245 100644 --- a/src/env.h +++ b/src/env.h @@ -1172,15 +1172,15 @@ class Environment : public MemoryRetainer { return current_value; } - typedef void (*native_immediate_callback)(Environment* env, void* data); - // cb will be called as cb(env, data) on the next event loop iteration. - // obj will be kept alive between now and after the callback has run. - inline void SetImmediate(native_immediate_callback cb, - void* data, - v8::Local obj = v8::Local()); - inline void SetUnrefImmediate(native_immediate_callback cb, - void* data, - v8::Local obj = + // cb will be called as cb(env) on the next event loop iteration. + // keep_alive will be kept alive between now and after the callback has run. + template + inline void SetImmediate(Fn&& cb, + v8::Local keep_alive = + v8::Local()); + template + inline void SetUnrefImmediate(Fn&& cb, + v8::Local keep_alive = v8::Local()); // This needs to be available for the JS-land setImmediate(). void ToggleImmediateRef(bool ref); @@ -1245,9 +1245,9 @@ class Environment : public MemoryRetainer { #endif // HAVE_INSPECTOR private: - inline void CreateImmediate(native_immediate_callback cb, - void* data, - v8::Local obj, + template + inline void CreateImmediate(Fn&& cb, + v8::Local keep_alive, bool ref); inline void ThrowError(v8::Local (*fun)(v8::Local), @@ -1370,13 +1370,38 @@ class Environment : public MemoryRetainer { std::list at_exit_functions_; - struct NativeImmediateCallback { - native_immediate_callback cb_; - void* data_; - v8::Global keep_alive_; + class NativeImmediateCallback { + public: + explicit inline NativeImmediateCallback(bool refed); + + virtual ~NativeImmediateCallback() = default; + virtual void Call(Environment* env) = 0; + + inline bool is_refed() const; + inline std::unique_ptr get_next(); + inline void set_next(std::unique_ptr next); + + private: bool refed_; + std::unique_ptr next_; + }; + + template + class NativeImmediateCallbackImpl final : public NativeImmediateCallback { + public: + NativeImmediateCallbackImpl(Fn&& callback, + v8::Global&& keep_alive, + bool refed); + void Call(Environment* env) override; + + private: + Fn callback_; + v8::Global keep_alive_; }; - std::vector native_immediate_callbacks_; + + std::unique_ptr native_immediate_callbacks_head_; + NativeImmediateCallback* native_immediate_callbacks_tail_ = nullptr; + void RunAndClearNativeImmediates(); static void CheckImmediate(uv_check_t* handle); diff --git a/src/node_api.cc b/src/node_api.cc index 49472930634938..95664e9c7ace27 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -36,27 +36,33 @@ class BufferFinalizer : private Finalizer { public: // node::Buffer::FreeCallback static void FinalizeBufferCallback(char* data, void* hint) { - BufferFinalizer* finalizer = static_cast(hint); + std::unique_ptr finalizer{ + static_cast(hint)}; finalizer->_finalize_data = data; - static_cast(finalizer->_env)->node_env() - ->SetImmediate([](node::Environment* env, void* hint) { - BufferFinalizer* finalizer = static_cast(hint); - - if (finalizer->_finalize_callback != nullptr) { - v8::HandleScope handle_scope(finalizer->_env->isolate); - v8::Context::Scope context_scope(finalizer->_env->context()); - - finalizer->_env->CallIntoModuleThrow([&](napi_env env) { - finalizer->_finalize_callback( - env, - finalizer->_finalize_data, - finalizer->_finalize_hint); - }); - } - Delete(finalizer); - }, hint); + node::Environment* node_env = + static_cast(finalizer->_env)->node_env(); + node_env->SetImmediate( + [finalizer = std::move(finalizer)](node::Environment* env) { + if (finalizer->_finalize_callback == nullptr) return; + + v8::HandleScope handle_scope(finalizer->_env->isolate); + v8::Context::Scope context_scope(finalizer->_env->context()); + + finalizer->_env->CallIntoModuleThrow([&](napi_env env) { + finalizer->_finalize_callback( + env, + finalizer->_finalize_data, + finalizer->_finalize_hint); + }); + }); } + + struct Deleter { + void operator()(BufferFinalizer* finalizer) { + Finalizer::Delete(finalizer); + } + }; }; static inline napi_env NewEnv(v8::Local context) { diff --git a/src/node_file.cc b/src/node_file.cc index e11aa9054640a9..138848c49da45e 100644 --- a/src/node_file.cc +++ b/src/node_file.cc @@ -170,35 +170,33 @@ inline void FileHandle::Close() { struct err_detail { int ret; int fd; }; - err_detail* detail = new err_detail { ret, fd_ }; + err_detail detail { ret, fd_ }; if (ret < 0) { // Do not unref this - env()->SetImmediate([](Environment* env, void* data) { + env()->SetImmediate([detail](Environment* env) { char msg[70]; - std::unique_ptr detail(static_cast(data)); snprintf(msg, arraysize(msg), "Closing file descriptor %d on garbage collection failed", - detail->fd); + detail.fd); // This exception will end up being fatal for the process because // it is being thrown from within the SetImmediate handler and // there is no JS stack to bubble it to. In other words, tearing // down the process is the only reasonable thing we can do here. HandleScope handle_scope(env->isolate()); - env->ThrowUVException(detail->ret, "close", msg); - }, detail); + env->ThrowUVException(detail.ret, "close", msg); + }); return; } // If the close was successful, we still want to emit a process warning // to notify that the file descriptor was gc'd. We want to be noisy about // this because not explicitly closing the FileHandle is a bug. - env()->SetUnrefImmediate([](Environment* env, void* data) { - std::unique_ptr detail(static_cast(data)); + env()->SetUnrefImmediate([detail](Environment* env) { ProcessEmitWarning(env, "Closing file descriptor %d on garbage collection", - detail->fd); - }, detail); + detail.fd); + }); } void FileHandle::CloseReq::Resolve() { diff --git a/src/node_http2.cc b/src/node_http2.cc index ab1c73777941a5..9d6c37373ff487 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -663,12 +663,9 @@ inline bool HasHttp2Observer(Environment* env) { void Http2Stream::EmitStatistics() { if (!HasHttp2Observer(env())) return; - Http2StreamPerformanceEntry* entry = - new Http2StreamPerformanceEntry(env(), id_, statistics_); - env()->SetImmediate([](Environment* env, void* data) { - // This takes ownership, the entry is destroyed at the end of this scope. - std::unique_ptr entry { - static_cast(data) }; + auto entry = + std::make_unique(env(), id_, statistics_); + env()->SetImmediate([entry = move(entry)](Environment* env) { if (!HasHttp2Observer(env)) return; HandleScope handle_scope(env->isolate()); @@ -696,18 +693,15 @@ void Http2Stream::EmitStatistics() { buffer[IDX_STREAM_STATS_RECEIVEDBYTES] = entry->received_bytes(); Local obj; if (entry->ToObject().ToLocal(&obj)) entry->Notify(obj); - }, static_cast(entry)); + }); } void Http2Session::EmitStatistics() { if (!HasHttp2Observer(env())) return; - Http2SessionPerformanceEntry* entry = - new Http2SessionPerformanceEntry(env(), statistics_, session_type_); - env()->SetImmediate([](Environment* env, void* data) { - // This takes ownership, the entr is destroyed at the end of this scope. - std::unique_ptr entry { - static_cast(data) }; + auto entry = std::make_unique( + env(), statistics_, session_type_); + env()->SetImmediate([entry = std::move(entry)](Environment* env) { if (!HasHttp2Observer(env)) return; HandleScope handle_scope(env->isolate()); @@ -725,7 +719,7 @@ void Http2Session::EmitStatistics() { entry->max_concurrent_streams(); Local obj; if (entry->ToObject().ToLocal(&obj)) entry->Notify(obj); - }, static_cast(entry)); + }); } // Closes the session and frees the associated resources @@ -760,11 +754,9 @@ void Http2Session::Close(uint32_t code, bool socket_closed) { while (std::unique_ptr ping = PopPing()) { ping->DetachFromSession(); env()->SetImmediate( - [](Environment* env, void* data) { - std::unique_ptr ping{static_cast(data)}; + [ping = std::move(ping)](Environment* env) { ping->Done(false); - }, - static_cast(ping.release())); + }); } statistics_.end_time = uv_hrtime(); @@ -1532,10 +1524,8 @@ void Http2Session::MaybeScheduleWrite() { HandleScope handle_scope(env()->isolate()); Debug(this, "scheduling write"); flags_ |= SESSION_STATE_WRITE_SCHEDULED; - env()->SetImmediate([](Environment* env, void* data) { - Http2Session* session = static_cast(data); - if (session->session_ == nullptr || - !(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) { + env()->SetImmediate([this](Environment* env) { + if (session_ == nullptr || !(flags_ & SESSION_STATE_WRITE_SCHEDULED)) { // This can happen e.g. when a stream was reset before this turn // of the event loop, in which case SendPendingData() is called early, // or the session was destroyed in the meantime. @@ -1545,9 +1535,9 @@ void Http2Session::MaybeScheduleWrite() { // Sending data may call arbitrary JS code, so keep track of // async context. HandleScope handle_scope(env->isolate()); - InternalCallbackScope callback_scope(session); - session->SendPendingData(); - }, static_cast(this), object()); + InternalCallbackScope callback_scope(this); + SendPendingData(); + }, object()); } } @@ -1975,25 +1965,23 @@ void Http2Stream::Destroy() { // Wait until the start of the next loop to delete because there // may still be some pending operations queued for this stream. - env()->SetImmediate([](Environment* env, void* data) { - Http2Stream* stream = static_cast(data); + env()->SetImmediate([this](Environment* env) { // Free any remaining outgoing data chunks here. This should be done // here because it's possible for destroy to have been called while // we still have queued outbound writes. - while (!stream->queue_.empty()) { - nghttp2_stream_write& head = stream->queue_.front(); + while (!queue_.empty()) { + nghttp2_stream_write& head = queue_.front(); if (head.req_wrap != nullptr) head.req_wrap->Done(UV_ECANCELED); - stream->queue_.pop(); + queue_.pop(); } // We can destroy the stream now if there are no writes for it // already on the socket. Otherwise, we'll wait for the garbage collector // to take care of cleaning up. - if (stream->session() == nullptr || - !stream->session()->HasWritesOnSocketForStream(stream)) - delete stream; - }, this, this->object()); + if (session() == nullptr || !session()->HasWritesOnSocketForStream(this)) + delete this; + }, object()); statistics_.end_time = uv_hrtime(); session_->statistics_.stream_average_duration = diff --git a/src/node_perf.cc b/src/node_perf.cc index 45adcf332a6f57..3efaca60658310 100644 --- a/src/node_perf.cc +++ b/src/node_perf.cc @@ -229,9 +229,8 @@ void SetupPerformanceObservers(const FunctionCallbackInfo& args) { } // Creates a GC Performance Entry and passes it to observers -void PerformanceGCCallback(Environment* env, void* ptr) { - std::unique_ptr entry{ - static_cast(ptr)}; +void PerformanceGCCallback(Environment* env, + std::unique_ptr entry) { HandleScope scope(env->isolate()); Local context = env->context(); @@ -268,13 +267,14 @@ void MarkGarbageCollectionEnd(Isolate* isolate, // If no one is listening to gc performance entries, do not create them. if (!state->observers[NODE_PERFORMANCE_ENTRY_TYPE_GC]) return; - GCPerformanceEntry* entry = - new GCPerformanceEntry(env, - static_cast(type), - state->performance_last_gc_start_mark, - PERFORMANCE_NOW()); - env->SetUnrefImmediate(PerformanceGCCallback, - entry); + auto entry = std::make_unique( + env, + static_cast(type), + state->performance_last_gc_start_mark, + PERFORMANCE_NOW()); + env->SetUnrefImmediate([entry = std::move(entry)](Environment* env) mutable { + PerformanceGCCallback(env, std::move(entry)); + }); } static void SetupGarbageCollectionTracking( diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc index a6b975ab26a3dd..be7fc882ea5c8a 100644 --- a/src/stream_pipe.cc +++ b/src/stream_pipe.cc @@ -71,18 +71,16 @@ void StreamPipe::Unpipe() { // Delay the JS-facing part with SetImmediate, because this might be from // inside the garbage collector, so we can’t run JS here. HandleScope handle_scope(env()->isolate()); - env()->SetImmediate([](Environment* env, void* data) { - StreamPipe* pipe = static_cast(data); - + env()->SetImmediate([this](Environment* env) { HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); - Local object = pipe->object(); + Local object = this->object(); Local onunpipe; if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe)) return; if (onunpipe->IsFunction() && - pipe->MakeCallback(onunpipe.As(), 0, nullptr).IsEmpty()) { + MakeCallback(onunpipe.As(), 0, nullptr).IsEmpty()) { return; } @@ -107,7 +105,7 @@ void StreamPipe::Unpipe() { .IsNothing()) { return; } - }, static_cast(this), object()); + }, object()); } uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 8c7d56e31b8a00..2d36c1a2654aa6 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -316,9 +316,9 @@ void TLSWrap::EncOut() { // its not clear if it is always correct. Not calling Done() could block // data flow, so for now continue to call Done(), just do it in the next // tick. - env()->SetImmediate([](Environment* env, void* data) { - static_cast(data)->InvokeQueued(0); - }, this, object()); + env()->SetImmediate([this](Environment* env) { + InvokeQueued(0); + }, object()); } } return; @@ -349,9 +349,9 @@ void TLSWrap::EncOut() { HandleScope handle_scope(env()->isolate()); // Simulate asynchronous finishing, TLS cannot handle this at the moment. - env()->SetImmediate([](Environment* env, void* data) { - static_cast(data)->OnStreamAfterWrite(nullptr, 0); - }, this, object()); + env()->SetImmediate([this](Environment* env) { + OnStreamAfterWrite(nullptr, 0); + }, object()); } } @@ -718,10 +718,9 @@ int TLSWrap::DoWrite(WriteWrap* w, StreamWriteResult res = underlying_stream()->Write(bufs, count, send_handle); if (!res.async) { - env()->SetImmediate([](Environment* env, void* data) { - TLSWrap* self = static_cast(data); - self->OnStreamAfterWrite(self->current_empty_write_, 0); - }, this, object()); + env()->SetImmediate([this](Environment* env) { + OnStreamAfterWrite(current_empty_write_, 0); + }, object()); } return 0; }