Skip to content

Commit

Permalink
src: use RAII for mutexes and condition variables
Browse files Browse the repository at this point in the history
We will be introducing many more critical sections in the upcoming
multi-isolate changes, so let's make manual synchronization a thing
of the past.

PR-URL: nodejs#7334
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
  • Loading branch information
bnoordhuis committed Jun 21, 2016
1 parent bb33c28 commit d7087df
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 65 deletions.
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
'src/node_http_parser.h',
'src/node_internals.h',
'src/node_javascript.h',
'src/node_mutex.h',
'src/node_root_certs.h',
'src/node_version.h',
'src/node_watchdog.h',
Expand Down
15 changes: 3 additions & 12 deletions src/debug-agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,14 @@ Agent::Agent(Environment* env) : state_(kNone),
parent_env_(env),
child_env_(nullptr),
dispatch_handler_(nullptr) {
int err;

err = uv_sem_init(&start_sem_, 0);
CHECK_EQ(err, 0);

err = uv_mutex_init(&message_mutex_);
CHECK_EQ(err, 0);
CHECK_EQ(0, uv_sem_init(&start_sem_, 0));
}


Agent::~Agent() {
Stop();

uv_sem_destroy(&start_sem_);
uv_mutex_destroy(&message_mutex_);

while (AgentMessage* msg = messages_.PopFront())
delete msg;
Expand Down Expand Up @@ -270,7 +263,7 @@ void Agent::ChildSignalCb(uv_async_t* signal) {
HandleScope scope(isolate);
Local<Object> api = PersistentToLocal(isolate, a->api_);

uv_mutex_lock(&a->message_mutex_);
Mutex::ScopedLock scoped_lock(a->message_mutex_);
while (AgentMessage* msg = a->messages_.PopFront()) {
// Time to close everything
if (msg->data() == nullptr) {
Expand Down Expand Up @@ -301,14 +294,12 @@ void Agent::ChildSignalCb(uv_async_t* signal) {
argv);
delete msg;
}
uv_mutex_unlock(&a->message_mutex_);
}


void Agent::EnqueueMessage(AgentMessage* message) {
uv_mutex_lock(&message_mutex_);
Mutex::ScopedLock scoped_lock(message_mutex_);
messages_.PushBack(message);
uv_mutex_unlock(&message_mutex_);
uv_async_send(&child_signal_);
}

Expand Down
3 changes: 2 additions & 1 deletion src/debug-agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "node_mutex.h"
#include "util.h"
#include "util-inl.h"
#include "uv.h"
Expand Down Expand Up @@ -117,7 +118,7 @@ class Agent {
bool wait_;

uv_sem_t start_sem_;
uv_mutex_t message_mutex_;
node::Mutex message_mutex_;
uv_async_t child_signal_;

uv_thread_t thread_;
Expand Down
40 changes: 15 additions & 25 deletions src/inspector_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "env.h"
#include "env-inl.h"
#include "node.h"
#include "node_mutex.h"
#include "node_version.h"
#include "v8-platform.h"
#include "util.h"
Expand Down Expand Up @@ -189,9 +190,9 @@ class AgentImpl {
void Write(const std::string& message);

uv_sem_t start_sem_;
uv_cond_t pause_cond_;
uv_mutex_t queue_lock_;
uv_mutex_t pause_lock_;
ConditionVariable pause_cond_;
Mutex pause_lock_;
Mutex queue_lock_;
uv_thread_t thread_;
uv_loop_t child_loop_;

Expand Down Expand Up @@ -290,9 +291,10 @@ class V8NodeInspector : public blink::V8Inspector {
terminated_ = false;
running_nested_loop_ = true;
do {
uv_mutex_lock(&agent_->pause_lock_);
uv_cond_wait(&agent_->pause_cond_, &agent_->pause_lock_);
uv_mutex_unlock(&agent_->pause_lock_);
{
Mutex::ScopedLock scoped_lock(agent_->pause_lock_);
agent_->pause_cond_.Wait(scoped_lock);
}
while (v8::platform::PumpMessageLoop(platform_, isolate_))
{}
} while (!terminated_);
Expand Down Expand Up @@ -321,19 +323,14 @@ AgentImpl::AgentImpl(Environment* env) : port_(0),
inspector_(nullptr),
platform_(nullptr),
dispatching_messages_(false) {
int err;
err = uv_sem_init(&start_sem_, 0);
CHECK_EQ(err, 0);
CHECK_EQ(0, uv_sem_init(&start_sem_, 0));
memset(&data_written_, 0, sizeof(data_written_));
memset(&io_thread_req_, 0, sizeof(io_thread_req_));
}

AgentImpl::~AgentImpl() {
if (!inspector_)
return;
uv_mutex_destroy(&queue_lock_);
uv_mutex_destroy(&pause_lock_);
uv_cond_destroy(&pause_cond_);
uv_close(reinterpret_cast<uv_handle_t*>(&data_written_), nullptr);
}

Expand All @@ -349,12 +346,6 @@ void AgentImpl::Start(v8::Platform* platform, int port, bool wait) {
CHECK_EQ(err, 0);
err = uv_async_init(env->event_loop(), &data_written_, nullptr);
CHECK_EQ(err, 0);
err = uv_mutex_init(&queue_lock_);
CHECK_EQ(err, 0);
err = uv_mutex_init(&pause_lock_);
CHECK_EQ(err, 0);
err = uv_cond_init(&pause_cond_);
CHECK_EQ(err, 0);

uv_unref(reinterpret_cast<uv_handle_t*>(&data_written_));

Expand Down Expand Up @@ -441,6 +432,7 @@ void AgentImpl::OnRemoteDataIO(uv_stream_t* stream,
const uv_buf_t* b) {
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
AgentImpl* agent = static_cast<AgentImpl*>(socket->data);
Mutex::ScopedLock scoped_lock(agent->pause_lock_);
if (read > 0) {
std::string str(b->base, read);
agent->PushPendingMessage(&agent->message_queue_, str);
Expand Down Expand Up @@ -470,21 +462,19 @@ void AgentImpl::OnRemoteDataIO(uv_stream_t* stream,
}
DisconnectAndDisposeIO(socket);
}
uv_cond_broadcast(&agent->pause_cond_);
agent->pause_cond_.Broadcast(scoped_lock);
}

void AgentImpl::PushPendingMessage(std::vector<std::string>* queue,
const std::string& message) {
uv_mutex_lock(&queue_lock_);
const std::string& message) {
Mutex::ScopedLock scoped_lock(queue_lock_);
queue->push_back(message);
uv_mutex_unlock(&queue_lock_);
}

void AgentImpl::SwapBehindLock(std::vector<std::string> AgentImpl::*queue,
std::vector<std::string>* output) {
uv_mutex_lock(&queue_lock_);
std::vector<std::string>* output) {
Mutex::ScopedLock scoped_lock(queue_lock_);
(this->*queue).swap(*output);
uv_mutex_unlock(&queue_lock_);
}

// static
Expand Down
30 changes: 14 additions & 16 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ static double prog_start_time;
static bool debugger_running;
static uv_async_t dispatch_debug_messages_async;

static uv_mutex_t node_isolate_mutex;
static Mutex node_isolate_mutex;
static v8::Isolate* node_isolate;
static v8::Platform* default_platform;

Expand Down Expand Up @@ -3698,18 +3698,17 @@ static void EnableDebug(Environment* env) {

// Called from an arbitrary thread.
static void TryStartDebugger() {
uv_mutex_lock(&node_isolate_mutex);
Mutex::ScopedLock scoped_lock(node_isolate_mutex);
if (auto isolate = node_isolate) {
v8::Debug::DebugBreak(isolate);
uv_async_send(&dispatch_debug_messages_async);
}
uv_mutex_unlock(&node_isolate_mutex);
}


// Called from the main thread.
static void DispatchDebugMessagesAsyncCallback(uv_async_t* handle) {
uv_mutex_lock(&node_isolate_mutex);
Mutex::ScopedLock scoped_lock(node_isolate_mutex);
if (auto isolate = node_isolate) {
if (debugger_running == false) {
fprintf(stderr, "Starting debugger agent.\n");
Expand All @@ -3725,7 +3724,6 @@ static void DispatchDebugMessagesAsyncCallback(uv_async_t* handle) {
Isolate::Scope isolate_scope(isolate);
v8::Debug::ProcessDebugMessages(isolate);
}
uv_mutex_unlock(&node_isolate_mutex);
}


Expand Down Expand Up @@ -4059,8 +4057,6 @@ void Init(int* argc,
// Make inherited handles noninheritable.
uv_disable_stdio_inheritance();

CHECK_EQ(0, uv_mutex_init(&node_isolate_mutex));

// init async debug messages dispatching
// Main thread uses uv_default_loop
uv_async_init(uv_default_loop(),
Expand Down Expand Up @@ -4254,12 +4250,13 @@ static void StartNodeInstance(void* arg) {
#endif
Isolate* isolate = Isolate::New(params);

uv_mutex_lock(&node_isolate_mutex);
if (instance_data->is_main()) {
CHECK_EQ(node_isolate, nullptr);
node_isolate = isolate;
{
Mutex::ScopedLock scoped_lock(node_isolate_mutex);
if (instance_data->is_main()) {
CHECK_EQ(node_isolate, nullptr);
node_isolate = isolate;
}
}
uv_mutex_unlock(&node_isolate_mutex);

if (track_heap_objects) {
isolate->GetHeapProfiler()->StartTrackingHeapObjects(true);
Expand Down Expand Up @@ -4331,10 +4328,11 @@ static void StartNodeInstance(void* arg) {
#endif
}

uv_mutex_lock(&node_isolate_mutex);
if (node_isolate == isolate)
node_isolate = nullptr;
uv_mutex_unlock(&node_isolate_mutex);
{
Mutex::ScopedLock scoped_lock(node_isolate_mutex);
if (node_isolate == isolate)
node_isolate = nullptr;
}

CHECK_NE(isolate, nullptr);
isolate->Dispose();
Expand Down
16 changes: 5 additions & 11 deletions src/node_crypto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static X509_NAME *cnnic_ev_name =
d2i_X509_NAME(nullptr, &cnnic_ev_p,
sizeof(CNNIC_EV_ROOT_CA_SUBJECT_DATA)-1);

static uv_mutex_t* locks;
static Mutex* mutexes;

const char* const root_certs[] = {
#include "node_root_certs.h" // NOLINT(build/include_order)
Expand Down Expand Up @@ -182,25 +182,19 @@ static void crypto_threadid_cb(CRYPTO_THREADID* tid) {


static void crypto_lock_init(void) {
int i, n;

n = CRYPTO_num_locks();
locks = new uv_mutex_t[n];

for (i = 0; i < n; i++)
if (uv_mutex_init(locks + i))
ABORT();
mutexes = new Mutex[CRYPTO_num_locks()];
}


static void crypto_lock_cb(int mode, int n, const char* file, int line) {
CHECK(!(mode & CRYPTO_LOCK) ^ !(mode & CRYPTO_UNLOCK));
CHECK(!(mode & CRYPTO_READ) ^ !(mode & CRYPTO_WRITE));

auto mutex = &mutexes[n];
if (mode & CRYPTO_LOCK)
uv_mutex_lock(locks + n);
mutex->Lock();
else
uv_mutex_unlock(locks + n);
mutex->Unlock();
}


Expand Down
Loading

0 comments on commit d7087df

Please sign in to comment.