diff --git a/src/node_platform.cc b/src/node_platform.cc index 6a3ae2e5dc84ab..92e9b371c5be6f 100644 --- a/src/node_platform.cc +++ b/src/node_platform.cc @@ -2,6 +2,7 @@ #include "node_internals.h" #include "env-inl.h" +#include "debug_utils.h" #include "util.h" #include @@ -29,7 +30,127 @@ static void PlatformWorkerThread(void* data) { } // namespace +class WorkerThreadsTaskRunner::DelayedTaskScheduler { + public: + explicit DelayedTaskScheduler(TaskQueue* tasks) + : pending_worker_tasks_(tasks) {} + + std::unique_ptr Start() { + auto start_thread = [](void* data) { + static_cast(data)->Run(); + }; + std::unique_ptr t { new uv_thread_t() }; + uv_sem_init(&ready_, 0); + CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this)); + uv_sem_wait(&ready_); + uv_sem_destroy(&ready_); + return t; + } + + void PostDelayedTask(std::unique_ptr task, double delay_in_seconds) { + tasks_.Push(std::unique_ptr(new ScheduleTask(this, std::move(task), + delay_in_seconds))); + uv_async_send(&flush_tasks_); + } + + void Stop() { + tasks_.Push(std::unique_ptr(new StopTask(this))); + uv_async_send(&flush_tasks_); + } + + private: + void Run() { + TRACE_EVENT_METADATA1("__metadata", "thread_name", "name", + "WorkerThreadsTaskRunner::DelayedTaskScheduler"); + loop_.data = this; + CHECK_EQ(0, uv_loop_init(&loop_)); + flush_tasks_.data = this; + CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks)); + uv_sem_post(&ready_); + + uv_run(&loop_, UV_RUN_DEFAULT); + CheckedUvLoopClose(&loop_); + } + + static void FlushTasks(uv_async_t* flush_tasks) { + DelayedTaskScheduler* scheduler = + ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop); + while (std::unique_ptr task = scheduler->tasks_.Pop()) + task->Run(); + } + + class StopTask : public Task { + public: + explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {} + + void Run() override { + std::vector timers; + for (uv_timer_t* timer : scheduler_->timers_) + timers.push_back(timer); + for (uv_timer_t* timer : timers) + scheduler_->TakeTimerTask(timer); + uv_close(reinterpret_cast(&scheduler_->flush_tasks_), + [](uv_handle_t* handle) {}); + } + + private: + DelayedTaskScheduler* scheduler_; + }; + + class ScheduleTask : public Task { + public: + ScheduleTask(DelayedTaskScheduler* scheduler, + std::unique_ptr task, + double delay_in_seconds) + : scheduler_(scheduler), + task_(std::move(task)), + delay_in_seconds_(delay_in_seconds) {} + + void Run() override { + uint64_t delay_millis = + static_cast(delay_in_seconds_ + 0.5) * 1000; + std::unique_ptr timer(new uv_timer_t()); + CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get())); + timer->data = task_.release(); + CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0)); + scheduler_->timers_.insert(timer.release()); + } + + private: + DelayedTaskScheduler* scheduler_; + std::unique_ptr task_; + double delay_in_seconds_; + }; + + static void RunTask(uv_timer_t* timer) { + DelayedTaskScheduler* scheduler = + ContainerOf(&DelayedTaskScheduler::loop_, timer->loop); + scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer)); + } + + std::unique_ptr TakeTimerTask(uv_timer_t* timer) { + std::unique_ptr task(static_cast(timer->data)); + uv_timer_stop(timer); + uv_close(reinterpret_cast(timer), [](uv_handle_t* handle) { + delete reinterpret_cast(handle); + }); + timers_.erase(timer); + return task; + } + + uv_sem_t ready_; + TaskQueue* pending_worker_tasks_; + + TaskQueue tasks_; + uv_loop_t loop_; + uv_async_t flush_tasks_; + std::unordered_set timers_; +}; + WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) { + delayed_task_scheduler_.reset( + new DelayedTaskScheduler(&pending_worker_tasks_)); + threads_.push_back(delayed_task_scheduler_->Start()); for (int i = 0; i < thread_pool_size; i++) { std::unique_ptr t { new uv_thread_t() }; if (uv_thread_create(t.get(), PlatformWorkerThread, @@ -46,7 +167,7 @@ void WorkerThreadsTaskRunner::PostTask(std::unique_ptr task) { void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr task, double delay_in_seconds) { - UNREACHABLE(); + delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds); } void WorkerThreadsTaskRunner::BlockingDrain() { @@ -55,6 +176,7 @@ void WorkerThreadsTaskRunner::BlockingDrain() { void WorkerThreadsTaskRunner::Shutdown() { pending_worker_tasks_.Stop(); + delayed_task_scheduler_->Stop(); for (size_t i = 0; i < threads_.size(); i++) { CHECK_EQ(0, uv_thread_join(threads_[i].get())); } diff --git a/src/node_platform.h b/src/node_platform.h index 62301a302b22e0..69968c49f80bb3 100644 --- a/src/node_platform.h +++ b/src/node_platform.h @@ -109,6 +109,10 @@ class WorkerThreadsTaskRunner { private: TaskQueue pending_worker_tasks_; + + class DelayedTaskScheduler; + std::unique_ptr delayed_task_scheduler_; + std::vector> threads_; }; diff --git a/test/sequential/test-inspector-runtime-evaluate-with-timeout.js b/test/sequential/test-inspector-runtime-evaluate-with-timeout.js new file mode 100644 index 00000000000000..1def39a82fead4 --- /dev/null +++ b/test/sequential/test-inspector-runtime-evaluate-with-timeout.js @@ -0,0 +1,21 @@ +// Flags: --expose-internals +'use strict'; + +const common = require('../common'); +common.skipIfInspectorDisabled(); + +(async function test() { + const { strictEqual } = require('assert'); + const { Session } = require('inspector'); + const { promisify } = require('util'); + + const session = new Session(); + session.connect(); + session.post = promisify(session.post); + const result = await session.post('Runtime.evaluate', { + expression: 'for(;;);', + timeout: 0 + }).catch((e) => e); + strictEqual(result.message, 'Execution was terminated'); + session.disconnect(); +})();