Skip to content

Commit

Permalink
src: refactor thread stopping mechanism
Browse files Browse the repository at this point in the history
- Follow style guide for naming, e.g. use lower_snake_case
  for simple setters/getters.
- For performance, use atomics instead of a mutex, and inline
  the corresponding getter/setter pair.

PR-URL: nodejs#26757
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
  • Loading branch information
addaleax authored and targos committed Mar 27, 2019
1 parent 8dd5f6e commit ae0acbf
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 33 deletions.
10 changes: 9 additions & 1 deletion src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
}

inline bool Environment::is_stopping() const {
return thread_stopper_.IsStopped();
return thread_stopper_.is_stopped();
}

inline performance::performance_state* Environment::performance_state() {
Expand Down Expand Up @@ -979,6 +979,14 @@ void Environment::ForEachBaseObject(T&& iterator) {
}
}

bool AsyncRequest::is_stopped() const {
return stopped_.load();
}

void AsyncRequest::set_stopped(bool flag) {
stopped_.store(flag);
}

#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)
Expand Down
29 changes: 7 additions & 22 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,13 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));

GetAsyncRequest()->Install(
thread_stopper()->Install(
this, static_cast<void*>(this), [](uv_async_t* handle) {
Environment* env = static_cast<Environment*>(handle->data);
uv_stop(env->event_loop());
});
GetAsyncRequest()->SetStopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(GetAsyncRequest()->GetHandle()));
thread_stopper()->set_stopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper()->GetHandle()));

// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
Expand All @@ -375,7 +375,7 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {

void Environment::ExitEnv() {
set_can_call_into_js(false);
GetAsyncRequest()->Stop();
thread_stopper()->Stop();
isolate_->TerminateExecution();
}

Expand Down Expand Up @@ -543,7 +543,7 @@ void Environment::RunCleanup() {
started_cleanup_ = true;
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunCleanup", this);
GetAsyncRequest()->Uninstall();
thread_stopper()->Uninstall();
CleanupHandles();

while (!cleanup_hooks_.empty()) {
Expand Down Expand Up @@ -977,49 +977,34 @@ char* Environment::Reallocate(char* data, size_t old_size, size_t size) {
}

void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
Mutex::ScopedLock lock(mutex_);
CHECK_NULL(async_);
env_ = env;
async_ = new uv_async_t;
async_->data = data;
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
}

void AsyncRequest::Uninstall() {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) {
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
async_ = nullptr;
}
}

void AsyncRequest::Stop() {
Mutex::ScopedLock lock(mutex_);
stop_ = true;
set_stopped(true);
if (async_ != nullptr) uv_async_send(async_);
}

void AsyncRequest::SetStopped(bool flag) {
Mutex::ScopedLock lock(mutex_);
stop_ = flag;
}

bool AsyncRequest::IsStopped() const {
Mutex::ScopedLock lock(mutex_);
return stop_;
}

uv_async_t* AsyncRequest::GetHandle() {
Mutex::ScopedLock lock(mutex_);
return async_;
}

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

AsyncRequest::~AsyncRequest() {
Mutex::ScopedLock lock(mutex_);
CHECK_NULL(async_);
}

Expand Down
12 changes: 7 additions & 5 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "uv.h"
#include "v8.h"

#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
Expand Down Expand Up @@ -515,18 +516,19 @@ class AsyncRequest : public MemoryRetainer {
void Install(Environment* env, void* data, uv_async_cb target);
void Uninstall();
void Stop();
void SetStopped(bool flag);
bool IsStopped() const;
inline void set_stopped(bool flag);
inline bool is_stopped() const;
uv_async_t* GetHandle();
void MemoryInfo(MemoryTracker* tracker) const override;


SET_MEMORY_INFO_NAME(AsyncRequest)
SET_SELF_SIZE(AsyncRequest)

private:
Environment* env_;
uv_async_t* async_ = nullptr;
mutable Mutex mutex_;
bool stop_ = true;
std::atomic_bool stopped_ {true};
};

class Environment {
Expand Down Expand Up @@ -1051,7 +1053,7 @@ class Environment {
inline ExecutionMode execution_mode() { return execution_mode_; }

inline void set_execution_mode(ExecutionMode mode) { execution_mode_ = mode; }
inline AsyncRequest* GetAsyncRequest() { return &thread_stopper_; }
inline AsyncRequest* thread_stopper() { return &thread_stopper_; }

private:
inline void CreateImmediate(native_immediate_callback cb,
Expand Down
4 changes: 2 additions & 2 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -824,14 +824,14 @@ inline int StartNodeWithIsolate(Isolate* isolate,
per_process::v8_platform.DrainVMTasks(isolate);

more = uv_loop_alive(env.event_loop());
if (more && !env.GetAsyncRequest()->IsStopped()) continue;
if (more && !env.is_stopping()) continue;

RunBeforeExit(&env);

// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(env.event_loop());
} while (more == true && !env.GetAsyncRequest()->IsStopped());
} while (more == true && !env.is_stopping());
env.performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
Expand Down
7 changes: 4 additions & 3 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Worker::Worker(Environment* env,
bool Worker::is_stopped() const {
Mutex::ScopedLock lock(mutex_);
if (env_ != nullptr)
return env_->GetAsyncRequest()->IsStopped();
return env_->is_stopping();
return stopped_;
}

Expand Down Expand Up @@ -217,7 +217,7 @@ void Worker::Run() {
stopped_ = true;
this->env_ = nullptr;
}
env_->GetAsyncRequest()->SetStopped(true);
env_->thread_stopper()->set_stopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
Expand Down Expand Up @@ -376,7 +376,8 @@ void Worker::OnThreadStopped() {
Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);

CHECK(stopped_ || env_ == nullptr || env_->GetAsyncRequest()->IsStopped());
CHECK(stopped_);
CHECK_NULL(env_);
CHECK(thread_joined_);

Debug(this, "Worker %llu destroyed", thread_id_);
Expand Down

0 comments on commit ae0acbf

Please sign in to comment.