From d14cba401a425dc184f929b38442b1d996cdd5f6 Mon Sep 17 00:00:00 2001 From: Gireesh Punathil Date: Sun, 17 Feb 2019 05:24:15 -0500 Subject: [PATCH] worker: refactor thread life cycle management The current mechanism of uses two async handles, one owned by the creator of the worker thread to terminate a running worker, and another one employed by the worker to interrupt its creator on its natural termination. The force termination piggybacks on the message- passing mechanism to inform the worker to quiesce. Also there are few flags that represent the other thread's state / request state because certain code path is shared by multiple control flows, and there are certain code path where the async handles may not have come to life. Refactor into an AsyncRequest abstraction that exposes routines to install a handle as well as to save a state. PR-URL: https://github.com/nodejs/node/pull/26099 Refs: https://github.com/nodejs/node/pull/21283 Reviewed-By: Anna Henningsen --- src/node_messaging.cc | 16 ---- src/node_messaging.h | 4 - src/node_worker.cc | 137 +++++++++++++++------------- src/node_worker.h | 42 ++++++--- test/pummel/test-heapdump-worker.js | 4 +- 5 files changed, 104 insertions(+), 99 deletions(-) diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 34977557c5bfb8..7ca3ad14d03406 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -584,13 +584,6 @@ void MessagePort::OnMessage() { // Get the head of the message queue. Mutex::ScopedLock lock(data_->mutex_); - if (stop_event_loop_) { - Debug(this, "MessagePort stops loop as requested"); - CHECK(!data_->receiving_messages_); - uv_stop(env()->event_loop()); - break; - } - Debug(this, "MessagePort has message, receiving = %d", static_cast(data_->receiving_messages_)); @@ -740,15 +733,6 @@ void MessagePort::Stop() { data_->receiving_messages_ = false; } -void MessagePort::StopEventLoop() { - Mutex::ScopedLock lock(data_->mutex_); - data_->receiving_messages_ = false; - stop_event_loop_ = true; - - Debug(this, "Received StopEventLoop request"); - TriggerAsync(); -} - void MessagePort::Start(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); MessagePort* port; diff --git a/src/node_messaging.h b/src/node_messaging.h index cfda69ae7ff16b..9055a8bf961844 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -159,9 +159,6 @@ class MessagePort : public HandleWrap { void Start(); // Stop processing messages on this port as a receiving end. void Stop(); - // Stop processing messages on this port as a receiving end, - // and stop the event loop that this port is associated with. - void StopEventLoop(); static void New(const v8::FunctionCallbackInfo& args); static void PostMessage(const v8::FunctionCallbackInfo& args); @@ -206,7 +203,6 @@ class MessagePort : public HandleWrap { inline uv_async_t* async(); std::unique_ptr data_ = nullptr; - bool stop_event_loop_ = false; friend class MessagePortData; }; diff --git a/src/node_worker.cc b/src/node_worker.cc index 2c8222f7f5229c..0f1535074cca6c 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) { } // anonymous namespace +void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { + Mutex::ScopedLock lock(mutex_); + env_ = env; + async_ = new uv_async_t; + if (data != nullptr) async_->data = data; + CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); +} + +void AsyncRequest::Uninstall() { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) + env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); +} + +void AsyncRequest::Stop() { + Mutex::ScopedLock lock(mutex_); + stop_ = true; + if (async_ != nullptr) uv_async_send(async_); +} + +void AsyncRequest::SetStopped(bool flag) { + Mutex::ScopedLock lock(mutex_); + stop_ = flag; +} + +bool AsyncRequest::IsStopped() const { + Mutex::ScopedLock lock(mutex_); + return stop_; +} + +uv_async_t* AsyncRequest::GetHandle() { + Mutex::ScopedLock lock(mutex_); + return async_; +} + +void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) tracker->TrackField("async_request", *async_); +} + Worker::Worker(Environment* env, Local wrap, const std::string& url, @@ -98,8 +138,7 @@ Worker::Worker(Environment* env, } bool Worker::is_stopped() const { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - return stopped_; + return thread_stopper_.IsStopped(); } // This class contains data that is only relevant to the child thread itself, @@ -207,6 +246,8 @@ void Worker::Run() { Context::Scope context_scope(env_->context()); if (child_port != nullptr) child_port->Close(); + thread_stopper_.Uninstall(); + thread_stopper_.SetStopped(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -215,11 +256,6 @@ void Worker::Run() { WaitForWorkerInspectorToStop(env_.get()); #endif - { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - stopped_ = true; - } - // This call needs to be made while the `Environment` is still alive // because we assume that it is available for async tracking in the // NodePlatform implementation. @@ -227,11 +263,12 @@ void Worker::Run() { } }); + if (thread_stopper_.IsStopped()) return; { HandleScope handle_scope(isolate_); Local context = NewContext(isolate_); - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; CHECK(!context.IsEmpty()); Context::Scope context_scope(context); { @@ -253,6 +290,14 @@ void Worker::Run() { Debug(this, "Created Environment for worker with id %llu", thread_id_); if (is_stopped()) return; + thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) { + Environment* env_ = static_cast(handle->data); + uv_stop(env_->event_loop()); + }); + uv_unref(reinterpret_cast(thread_stopper_.GetHandle())); + + Debug(this, "Created Environment for worker with id %llu", thread_id_); + if (thread_stopper_.IsStopped()) return; { HandleScope handle_scope(isolate_); Mutex::ScopedLock lock(mutex_); @@ -268,7 +313,7 @@ void Worker::Run() { Debug(this, "Created message port for worker %llu", thread_id_); } - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; { #if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR StartWorkerInspector(env_.get(), @@ -289,22 +334,21 @@ void Worker::Run() { Debug(this, "Loaded environment for worker %llu", thread_id_); } - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; { SealHandleScope seal(isolate_); bool more; env_->performance_state()->Mark( node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START); do { - if (is_stopped()) break; + if (thread_stopper_.IsStopped()) break; uv_run(&data.loop_, UV_RUN_DEFAULT); - if (is_stopped()) break; + if (thread_stopper_.IsStopped()) break; platform_->DrainTasks(isolate_); more = uv_loop_alive(&data.loop_); - if (more && !is_stopped()) - continue; + if (more && !thread_stopper_.IsStopped()) continue; EmitBeforeExit(env_.get()); @@ -319,7 +363,7 @@ void Worker::Run() { { int exit_code; - bool stopped = is_stopped(); + bool stopped = thread_stopper_.IsStopped(); if (!stopped) exit_code = EmitExit(env_.get()); Mutex::ScopedLock lock(mutex_); @@ -341,34 +385,11 @@ void Worker::JoinThread() { thread_joined_ = true; env()->remove_sub_worker_context(this); - - if (thread_exit_async_) { - env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) { - delete async; - }); - - if (scheduled_on_thread_stopped_) - OnThreadStopped(); - } + OnThreadStopped(); + on_thread_finished_.Uninstall(); } void Worker::OnThreadStopped() { - { - Mutex::ScopedLock lock(mutex_); - scheduled_on_thread_stopped_ = false; - - Debug(this, "Worker %llu thread stopped", thread_id_); - - { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - CHECK(stopped_); - } - - parent_port_ = nullptr; - } - - JoinThread(); - { HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); @@ -391,7 +412,7 @@ Worker::~Worker() { Mutex::ScopedLock lock(mutex_); JoinThread(); - CHECK(stopped_); + CHECK(thread_stopper_.IsStopped()); CHECK(thread_joined_); // This has most likely already happened within the worker thread -- this @@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { Mutex::ScopedLock lock(w->mutex_); w->env()->add_sub_worker_context(w); - w->stopped_ = false; w->thread_joined_ = false; + w->thread_stopper_.SetStopped(false); - w->thread_exit_async_.reset(new uv_async_t); - w->thread_exit_async_->data = w; - CHECK_EQ(uv_async_init(w->env()->event_loop(), - w->thread_exit_async_.get(), - [](uv_async_t* handle) { - static_cast(handle->data)->OnThreadStopped(); - }), 0); + w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { + Worker* w_ = static_cast(handle->data); + CHECK(w_->thread_stopper_.IsStopped()); + w_->parent_port_ = nullptr; + w_->JoinThread(); + }); uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; @@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->Run(); Mutex::ScopedLock lock(w->mutex_); - CHECK(w->thread_exit_async_); - w->scheduled_on_thread_stopped_ = true; - uv_async_send(w->thread_exit_async_.get()); + w->on_thread_finished_.Stop(); }, static_cast(w)), 0); } @@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo& args) { void Worker::Ref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - if (w->thread_exit_async_) - uv_ref(reinterpret_cast(w->thread_exit_async_.get())); + uv_ref(reinterpret_cast(w->on_thread_finished_.GetHandle())); } void Worker::Unref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - if (w->thread_exit_async_) - uv_unref(reinterpret_cast(w->thread_exit_async_.get())); + uv_unref(reinterpret_cast(w->on_thread_finished_.GetHandle())); } void Worker::Exit(int code) { Mutex::ScopedLock lock(mutex_); - Mutex::ScopedLock stopped_lock(stopped_mutex_); Debug(this, "Worker %llu called Exit(%d)", thread_id_, code); - - if (!stopped_) { - stopped_ = true; + if (!thread_stopper_.IsStopped()) { exit_code_ = code; - if (child_port_ != nullptr) - child_port_->StopEventLoop(); + Debug(this, "Received StopEventLoop request"); + thread_stopper_.Stop(); if (isolate_ != nullptr) isolate_->TerminateExecution(); } diff --git a/src/node_worker.h b/src/node_worker.h index dad0713fd92df2..fb94bdc307e8b6 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -3,14 +3,35 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS -#include "node_messaging.h" #include +#include "node_messaging.h" +#include "uv.h" namespace node { namespace worker { class WorkerThreadData; +class AsyncRequest : public MemoryRetainer { + public: + AsyncRequest() {} + void Install(Environment* env, void* data, uv_async_cb target); + void Uninstall(); + void Stop(); + void SetStopped(bool flag); + bool IsStopped() const; + uv_async_t* GetHandle(); + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(AsyncRequest) + SET_SELF_SIZE(AsyncRequest) + + private: + Environment* env_; + uv_async_t* async_ = nullptr; + mutable Mutex mutex_; + bool stop_ = true; +}; + // A worker thread, as represented in its parent thread. class Worker : public AsyncWrap { public: @@ -31,11 +52,9 @@ class Worker : public AsyncWrap { void JoinThread(); void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackFieldWithSize( - "isolate_data", sizeof(IsolateData), "IsolateData"); - tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment"); - tracker->TrackField("thread_exit_async", *thread_exit_async_); tracker->TrackField("parent_port", parent_port_); + tracker->TrackInlineField(&thread_stopper_, "thread_stopper_"); + tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_"); } SET_MEMORY_INFO_NAME(Worker) @@ -67,16 +86,6 @@ class Worker : public AsyncWrap { // This mutex protects access to all variables listed below it. mutable Mutex mutex_; - // Currently only used for telling the parent thread that the child - // thread exited. - std::unique_ptr thread_exit_async_; - bool scheduled_on_thread_stopped_ = false; - - // This mutex only protects stopped_. If both locks are acquired, this needs - // to be the latter one. - mutable Mutex stopped_mutex_; - bool stopped_ = true; - bool thread_joined_ = true; int exit_code_ = 0; uint64_t thread_id_ = -1; @@ -96,6 +105,9 @@ class Worker : public AsyncWrap { // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; + AsyncRequest thread_stopper_; + AsyncRequest on_thread_finished_; + friend class WorkerThreadData; }; diff --git a/test/pummel/test-heapdump-worker.js b/test/pummel/test-heapdump-worker.js index 06679964a23a1c..2a3c93a7ad3e68 100644 --- a/test/pummel/test-heapdump-worker.js +++ b/test/pummel/test-heapdump-worker.js @@ -9,8 +9,8 @@ const worker = new Worker('setInterval(() => {}, 100);', { eval: true }); validateSnapshotNodes('Node / Worker', [ { children: [ - { node_name: 'Node / uv_async_t', edge_name: 'thread_exit_async' }, - { node_name: 'Node / Environment', edge_name: 'env' }, + { node_name: 'Node / AsyncRequest', edge_name: 'thread_stopper_' }, + { node_name: 'Node / AsyncRequest', edge_name: 'on_thread_finished_' }, { node_name: 'Node / MessagePort', edge_name: 'parent_port' }, { node_name: 'Worker', edge_name: 'wrapped' } ]