diff --git a/node.gyp b/node.gyp index 5ef250d5dd7023..08b792ef309c2a 100644 --- a/node.gyp +++ b/node.gyp @@ -190,6 +190,7 @@ 'src/node_http_parser.cc', 'src/node_main.cc', 'src/node_os.cc', + 'src/node_platform.cc', 'src/node_revert.cc', 'src/node_serdes.cc', 'src/node_url.cc', @@ -238,6 +239,7 @@ 'src/node_internals.h', 'src/node_javascript.h', 'src/node_mutex.h', + 'src/node_platform.h', 'src/node_root_certs.h', 'src/node_version.h', 'src/node_watchdog.h', @@ -656,6 +658,8 @@ 'defines': [ 'NODE_WANT_INTERNALS=1' ], 'sources': [ + 'src/node_platform.cc', + 'src/node_platform.h', 'test/cctest/test_base64.cc', 'test/cctest/test_environment.cc', 'test/cctest/test_util.cc', diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 2520fbdd533bdc..828006ecf2fbb4 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -502,11 +502,9 @@ class InspectorTimerHandle { class NodeInspectorClient : public V8InspectorClient { public: - NodeInspectorClient(node::Environment* env, - v8::Platform* platform) : env_(env), - platform_(platform), - terminated_(false), - running_nested_loop_(false) { + NodeInspectorClient(node::Environment* env, node::NodePlatform* platform) + : env_(env), platform_(platform), terminated_(false), + running_nested_loop_(false) { client_ = V8Inspector::create(env->isolate(), this); contextCreated(env->context(), "Node.js Main Context"); } @@ -518,8 +516,7 @@ class NodeInspectorClient : public V8InspectorClient { terminated_ = false; running_nested_loop_ = true; while (!terminated_ && channel_->waitForFrontendMessage()) { - while (v8::platform::PumpMessageLoop(platform_, env_->isolate())) - {} + platform_->FlushForegroundTasksInternal(); } terminated_ = false; running_nested_loop_ = false; @@ -647,7 +644,7 @@ class NodeInspectorClient : public V8InspectorClient { private: node::Environment* env_; - v8::Platform* platform_; + node::NodePlatform* platform_; bool terminated_; bool running_nested_loop_; std::unique_ptr client_; @@ -666,7 +663,7 @@ Agent::Agent(Environment* env) : parent_env_(env), Agent::~Agent() { } -bool Agent::Start(v8::Platform* platform, const char* path, +bool Agent::Start(node::NodePlatform* platform, const char* path, const DebugOptions& options) { path_ = path == nullptr ? "" : path; debug_options_ = options; diff --git a/src/inspector_agent.h b/src/inspector_agent.h index 6ec1bc28dc2e22..8195e001c2eb3c 100644 --- a/src/inspector_agent.h +++ b/src/inspector_agent.h @@ -14,6 +14,7 @@ // Forward declaration to break recursive dependency chain with src/env.h. namespace node { class Environment; +class NodePlatform; } // namespace node #include "v8.h" @@ -42,7 +43,7 @@ class Agent { ~Agent(); // Create client_, may create io_ if option enabled - bool Start(v8::Platform* platform, const char* path, + bool Start(node::NodePlatform* platform, const char* path, const DebugOptions& options); // Stop and destroy io_ void Stop(); diff --git a/src/node.cc b/src/node.cc index a37753e385d629..1ef5adce3bb7d1 100644 --- a/src/node.cc +++ b/src/node.cc @@ -23,6 +23,7 @@ #include "node_buffer.h" #include "node_constants.h" #include "node_javascript.h" +#include "node_platform.h" #include "node_version.h" #include "node_internals.h" #include "node_revert.h" @@ -250,25 +251,26 @@ node::DebugOptions debug_options; static struct { #if NODE_USE_V8_PLATFORM - void Initialize(int thread_pool_size) { + void Initialize(int thread_pool_size, uv_loop_t* loop) { tracing_agent_ = - trace_enabled ? new tracing::Agent() : nullptr; - platform_ = v8::platform::CreateDefaultPlatform( - thread_pool_size, v8::platform::IdleTaskSupport::kDisabled, - v8::platform::InProcessStackDumping::kDisabled, - trace_enabled ? tracing_agent_->GetTracingController() : nullptr); + trace_enabled ? new tracing::Agent() : nullptr; + platform_ = new NodePlatform(thread_pool_size, loop, + trace_enabled ? tracing_agent_->GetTracingController() : nullptr); V8::InitializePlatform(platform_); tracing::TraceEventHelper::SetTracingController( - trace_enabled ? tracing_agent_->GetTracingController() : nullptr); - } - - void PumpMessageLoop(Isolate* isolate) { - v8::platform::PumpMessageLoop(platform_, isolate); + trace_enabled ? tracing_agent_->GetTracingController() : nullptr); } void Dispose() { + platform_->Shutdown(); delete platform_; platform_ = nullptr; + delete tracing_agent_; + tracing_agent_ = nullptr; + } + + void DrainVMTasks() { + platform_->DrainBackgroundTasks(); } #if HAVE_INSPECTOR @@ -293,12 +295,12 @@ static struct { tracing_agent_->Stop(); } - v8::Platform* platform_; tracing::Agent* tracing_agent_; + NodePlatform* platform_; #else // !NODE_USE_V8_PLATFORM - void Initialize(int thread_pool_size) {} - void PumpMessageLoop(Isolate* isolate) {} + void Initialize(int thread_pool_size, uv_loop_t* loop) {} void Dispose() {} + void DrainVMTasks() {} bool StartInspector(Environment *env, const char* script_path, const node::DebugOptions& options) { env->ThrowError("Node compiled with NODE_USE_V8_PLATFORM=0"); @@ -4555,19 +4557,14 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data, SealHandleScope seal(isolate); bool more; do { - v8_platform.PumpMessageLoop(isolate); - more = uv_run(env.event_loop(), UV_RUN_ONCE); - - if (more == false) { - v8_platform.PumpMessageLoop(isolate); - EmitBeforeExit(&env); - - // Emit `beforeExit` if the loop became alive either after emitting - // event, or after running some callbacks. - more = uv_loop_alive(env.event_loop()); - if (uv_run(env.event_loop(), UV_RUN_NOWAIT) != 0) - more = true; - } + uv_run(env.event_loop(), UV_RUN_DEFAULT); + + EmitBeforeExit(&env); + + v8_platform.DrainVMTasks(); + // Emit `beforeExit` if the loop became alive either after emitting + // event, or after running some callbacks. + more = uv_loop_alive(env.event_loop()); } while (more == true); } @@ -4577,6 +4574,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data, RunAtExit(&env); uv_key_delete(&thread_local_env); + v8_platform.DrainVMTasks(); WaitForInspectorDisconnect(&env); #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); @@ -4665,7 +4663,7 @@ int Start(int argc, char** argv) { V8::SetEntropySource(crypto::EntropySource); #endif // HAVE_OPENSSL - v8_platform.Initialize(v8_thread_pool_size); + v8_platform.Initialize(v8_thread_pool_size, uv_default_loop()); // Enable tracing when argv has --trace-events-enabled. if (trace_enabled) { fprintf(stderr, "Warning: Trace event is an experimental feature " @@ -4682,6 +4680,12 @@ int Start(int argc, char** argv) { v8_initialized = false; V8::Dispose(); + // uv_run cannot be called from the time before the beforeExit callback + // runs until the program exits unless the event loop has any referenced + // handles after beforeExit terminates. This prevents unrefed timers + // that happen to terminate during shutdown from being run unsafely. + // Since uv_run cannot be called, uv_async handles held by the platform + // will never be fully cleaned up. v8_platform.Dispose(); delete[] exec_argv; diff --git a/src/node_platform.cc b/src/node_platform.cc new file mode 100644 index 00000000000000..3d023114ad2691 --- /dev/null +++ b/src/node_platform.cc @@ -0,0 +1,189 @@ +#include "node_platform.h" + +#include "util.h" + +namespace node { + +using v8::Isolate; +using v8::Platform; +using v8::Task; +using v8::TracingController; + +static void FlushTasks(uv_async_t* handle) { + NodePlatform* platform = static_cast(handle->data); + platform->FlushForegroundTasksInternal(); +} + +static void BackgroundRunner(void* data) { + TaskQueue* background_tasks = static_cast*>(data); + while (Task* task = background_tasks->BlockingPop()) { + task->Run(); + delete task; + background_tasks->NotifyOfCompletion(); + } +} + +NodePlatform::NodePlatform(int thread_pool_size, uv_loop_t* loop, + TracingController* tracing_controller) + : loop_(loop) { + CHECK_EQ(0, uv_async_init(loop, &flush_tasks_, FlushTasks)); + flush_tasks_.data = static_cast(this); + uv_unref(reinterpret_cast(&flush_tasks_)); + if (tracing_controller) { + tracing_controller_.reset(tracing_controller); + } else { + TracingController* controller = new TracingController(); + tracing_controller_.reset(controller); + } + for (int i = 0; i < thread_pool_size; i++) { + uv_thread_t* t = new uv_thread_t(); + if (uv_thread_create(t, BackgroundRunner, &background_tasks_) != 0) { + delete t; + break; + } + threads_.push_back(std::unique_ptr(t)); + } +} + +void NodePlatform::Shutdown() { + background_tasks_.Stop(); + for (size_t i = 0; i < threads_.size(); i++) { + CHECK_EQ(0, uv_thread_join(threads_[i].get())); + } + // uv_run cannot be called from the time before the beforeExit callback + // runs until the program exits unless the event loop has any referenced + // handles after beforeExit terminates. This prevents unrefed timers + // that happen to terminate during shutdown from being run unsafely. + // Since uv_run cannot be called, this handle will never be fully cleaned + // up. + uv_close(reinterpret_cast(&flush_tasks_), nullptr); +} + +size_t NodePlatform::NumberOfAvailableBackgroundThreads() { + return threads_.size(); +} + +static void RunForegroundTask(uv_timer_t* handle) { + Task* task = static_cast(handle->data); + task->Run(); + delete task; + uv_close(reinterpret_cast(handle), [](uv_handle_t* handle) { + delete reinterpret_cast(handle); + }); +} + +void NodePlatform::DrainBackgroundTasks() { + FlushForegroundTasksInternal(); + background_tasks_.BlockingDrain(); +} + +void NodePlatform::FlushForegroundTasksInternal() { + while (auto delayed = foreground_delayed_tasks_.Pop()) { + uint64_t delay_millis = + static_cast(delayed->second + 0.5) * 1000; + uv_timer_t* handle = new uv_timer_t(); + handle->data = static_cast(delayed->first); + uv_timer_init(loop_, handle); + // Timers may not guarantee queue ordering of events with the same delay if + // the delay is non-zero. This should not be a problem in practice. + uv_timer_start(handle, RunForegroundTask, delay_millis, 0); + uv_unref(reinterpret_cast(handle)); + delete delayed; + } + while (Task* task = foreground_tasks_.Pop()) { + task->Run(); + delete task; + } +} + +void NodePlatform::CallOnBackgroundThread(Task* task, + ExpectedRuntime expected_runtime) { + background_tasks_.Push(task); +} + +void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) { + foreground_tasks_.Push(task); + uv_async_send(&flush_tasks_); +} + +void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate, + Task* task, + double delay_in_seconds) { + auto pair = new std::pair(task, delay_in_seconds); + foreground_delayed_tasks_.Push(pair); + uv_async_send(&flush_tasks_); +} + +bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; } + +double NodePlatform::MonotonicallyIncreasingTime() { + // Convert nanos to seconds. + return uv_hrtime() / 1e9; +} + +TracingController* NodePlatform::GetTracingController() { + return tracing_controller_.get(); +} + +template +TaskQueue::TaskQueue() + : lock_(), tasks_available_(), tasks_drained_(), + outstanding_tasks_(0), stopped_(false), task_queue_() { } + +template +void TaskQueue::Push(T* task) { + Mutex::ScopedLock scoped_lock(lock_); + outstanding_tasks_++; + task_queue_.push(task); + tasks_available_.Signal(scoped_lock); +} + +template +T* TaskQueue::Pop() { + Mutex::ScopedLock scoped_lock(lock_); + T* result = nullptr; + if (!task_queue_.empty()) { + result = task_queue_.front(); + task_queue_.pop(); + } + return result; +} + +template +T* TaskQueue::BlockingPop() { + Mutex::ScopedLock scoped_lock(lock_); + while (task_queue_.empty() && !stopped_) { + tasks_available_.Wait(scoped_lock); + } + if (stopped_) { + return nullptr; + } + T* result = task_queue_.front(); + task_queue_.pop(); + return result; +} + +template +void TaskQueue::NotifyOfCompletion() { + Mutex::ScopedLock scoped_lock(lock_); + if (--outstanding_tasks_ == 0) { + tasks_drained_.Broadcast(scoped_lock); + } +} + +template +void TaskQueue::BlockingDrain() { + Mutex::ScopedLock scoped_lock(lock_); + while (outstanding_tasks_ > 0) { + tasks_drained_.Wait(scoped_lock); + } +} + +template +void TaskQueue::Stop() { + Mutex::ScopedLock scoped_lock(lock_); + stopped_ = true; + tasks_available_.Broadcast(scoped_lock); +} + +} // namespace node diff --git a/src/node_platform.h b/src/node_platform.h new file mode 100644 index 00000000000000..668fcf28e40233 --- /dev/null +++ b/src/node_platform.h @@ -0,0 +1,69 @@ +#ifndef SRC_NODE_PLATFORM_H_ +#define SRC_NODE_PLATFORM_H_ + +#include +#include + +#include "libplatform/libplatform.h" +#include "node_mutex.h" +#include "uv.h" + +namespace node { + +template +class TaskQueue { + public: + TaskQueue(); + ~TaskQueue() {} + + void Push(T* task); + T* Pop(); + T* BlockingPop(); + void NotifyOfCompletion(); + void BlockingDrain(); + void Stop(); + + private: + Mutex lock_; + ConditionVariable tasks_available_; + ConditionVariable tasks_drained_; + int outstanding_tasks_; + bool stopped_; + std::queue task_queue_; +}; + +class NodePlatform : public v8::Platform { + public: + NodePlatform(int thread_pool_size, uv_loop_t* loop, + v8::TracingController* tracing_controller); + virtual ~NodePlatform() {} + + void DrainBackgroundTasks(); + void FlushForegroundTasksInternal(); + void Shutdown(); + + // v8::Platform implementation. + size_t NumberOfAvailableBackgroundThreads() override; + void CallOnBackgroundThread(v8::Task* task, + ExpectedRuntime expected_runtime) override; + void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override; + void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task, + double delay_in_seconds) override; + bool IdleTasksEnabled(v8::Isolate* isolate) override; + double MonotonicallyIncreasingTime() override; + v8::TracingController* GetTracingController() override; + + private: + uv_loop_t* const loop_; + uv_async_t flush_tasks_; + TaskQueue foreground_tasks_; + TaskQueue> foreground_delayed_tasks_; + TaskQueue background_tasks_; + std::vector> threads_; + + std::unique_ptr tracing_controller_; +}; + +} // namespace node + +#endif // SRC_NODE_PLATFORM_H_ diff --git a/src/tracing/agent.cc b/src/tracing/agent.cc index 38e651ebb2a40e..1ac99bbb34bbbf 100644 --- a/src/tracing/agent.cc +++ b/src/tracing/agent.cc @@ -4,7 +4,6 @@ #include #include "env-inl.h" -#include "libplatform/libplatform.h" namespace node { namespace tracing { diff --git a/src/tracing/agent.h b/src/tracing/agent.h index cc00c53144b1fa..e781281712bbf6 100644 --- a/src/tracing/agent.h +++ b/src/tracing/agent.h @@ -1,6 +1,7 @@ #ifndef SRC_TRACING_AGENT_H_ #define SRC_TRACING_AGENT_H_ +#include "node_platform.h" #include "tracing/node_trace_buffer.h" #include "tracing/node_trace_writer.h" #include "uv.h" diff --git a/src/tracing/trace_event.cc b/src/tracing/trace_event.cc index 856b344e9d2294..f661dd5c69a07a 100644 --- a/src/tracing/trace_event.cc +++ b/src/tracing/trace_event.cc @@ -3,14 +3,14 @@ namespace node { namespace tracing { -v8::TracingController* controller_ = nullptr; +v8::TracingController* g_controller = nullptr; void TraceEventHelper::SetTracingController(v8::TracingController* controller) { - controller_ = controller; + g_controller = controller; } v8::TracingController* TraceEventHelper::GetTracingController() { - return controller_; + return g_controller; } } // namespace tracing diff --git a/src/tracing/trace_event.h b/src/tracing/trace_event.h index 24806d375fad58..61808eb94f75a1 100644 --- a/src/tracing/trace_event.h +++ b/src/tracing/trace_event.h @@ -7,6 +7,7 @@ #include +#include "node_platform.h" #include "v8-platform.h" #include "trace_event_common.h" diff --git a/test/cctest/node_test_fixture.h b/test/cctest/node_test_fixture.h index e52b1b5dfd47ca..f30823a8fdb46a 100644 --- a/test/cctest/node_test_fixture.h +++ b/test/cctest/node_test_fixture.h @@ -66,7 +66,12 @@ struct Argv { int nr_args_; }; +uv_loop_t current_loop; + class NodeTestFixture : public ::testing::Test { + public: + static uv_loop_t* CurrentLoop() { return ¤t_loop; } + protected: v8::Isolate::CreateParams params_; ArrayBufferAllocator allocator_; @@ -77,7 +82,8 @@ class NodeTestFixture : public ::testing::Test { } virtual void SetUp() { - platform_ = v8::platform::CreateDefaultPlatform(); + CHECK_EQ(0, uv_loop_init(¤t_loop)); + platform_ = new node::NodePlatform(8, ¤t_loop, nullptr); v8::V8::InitializePlatform(platform_); v8::V8::Initialize(); params_.array_buffer_allocator = &allocator_; @@ -86,13 +92,18 @@ class NodeTestFixture : public ::testing::Test { virtual void TearDown() { if (platform_ == nullptr) return; + platform_->Shutdown(); + while (uv_loop_alive(¤t_loop)) { + uv_run(¤t_loop, UV_RUN_ONCE); + } v8::V8::ShutdownPlatform(); delete platform_; platform_ = nullptr; + CHECK_EQ(0, uv_loop_close(¤t_loop)); } private: - v8::Platform* platform_ = nullptr; + node::NodePlatform* platform_ = nullptr; }; #endif // TEST_CCTEST_NODE_TEST_FIXTURE_H_ diff --git a/test/cctest/test_environment.cc b/test/cctest/test_environment.cc index aee8e795ecb6ab..4651e865a99e7d 100644 --- a/test/cctest/test_environment.cc +++ b/test/cctest/test_environment.cc @@ -31,7 +31,8 @@ class EnvironmentTest : public NodeTestFixture { const Argv& argv) { context_ = v8::Context::New(isolate); CHECK(!context_.IsEmpty()); - isolate_data_ = CreateIsolateData(isolate, uv_default_loop()); + isolate_data_ = CreateIsolateData(isolate, + NodeTestFixture::CurrentLoop()); CHECK_NE(nullptr, isolate_data_); environment_ = CreateEnvironment(isolate_data_, context_,