Skip to content

Commit

Permalink
src: cancel pending delayed platform tasks on exit
Browse files Browse the repository at this point in the history
Worker threads need an event loop without active libuv handles in
order to shut down. One source of handles that was previously
not accounted for were delayed V8 tasks; these create timers
that would be standing in the way of clearing the event loop.

To solve this, keep track of the scheduled tasks in a list
and close their timer handles before the corresponding isolate/loop
is removed from the platform.

It is not clear from the V8 documentation what the expectation is
with respect to pending background tasks at the end of the
isolate lifetime; however, an alternative approach of executing
these scheduled tasks when flushing them led to an infinite loop
of tasks scheduling each other; so it seems safe to assume that
the behaviour implemented in this patch is at least acceptable.

Original-PR-URL: ayojs/ayo#120
Original-Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
PR-URL: #16700
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax authored and MylesBorins committed Dec 11, 2017
1 parent 37a60a8 commit c2431d5
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 16 deletions.
6 changes: 6 additions & 0 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ static struct {
platform_->DrainBackgroundTasks(isolate);
}

void CancelVMTasks(Isolate* isolate) {
platform_->CancelPendingDelayedTasks(isolate);
}

#if HAVE_INSPECTOR
bool StartInspector(Environment *env, const char* script_path,
const node::DebugOptions& options) {
Expand Down Expand Up @@ -316,6 +320,7 @@ static struct {
void Initialize(int thread_pool_size) {}
void Dispose() {}
void DrainVMTasks(Isolate* isolate) {}
void CancelVMTasks(Isolate* isolate) {}
bool StartInspector(Environment *env, const char* script_path,
const node::DebugOptions& options) {
env->ThrowError("Node compiled with NODE_USE_V8_PLATFORM=0");
Expand Down Expand Up @@ -4891,6 +4896,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
uv_key_delete(&thread_local_env);

v8_platform.DrainVMTasks(isolate);
v8_platform.CancelVMTasks(isolate);
WaitForInspectorDisconnect(&env);
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
Expand Down
1 change: 1 addition & 0 deletions src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class MultiIsolatePlatform : public v8::Platform {
public:
virtual ~MultiIsolatePlatform() { }
virtual void DrainBackgroundTasks(v8::Isolate* isolate) = 0;
virtual void CancelPendingDelayedTasks(v8::Isolate* isolate) = 0;

// These will be called by the `IsolateData` creation/destruction functions.
virtual void RegisterIsolate(IsolateData* isolate_data,
Expand Down
54 changes: 39 additions & 15 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "env.h"
#include "env-inl.h"
#include "util.h"
#include <algorithm>

namespace node {

Expand Down Expand Up @@ -45,13 +46,17 @@ void PerIsolatePlatformData::CallOnForegroundThread(Task* task) {

void PerIsolatePlatformData::CallDelayedOnForegroundThread(
Task* task, double delay_in_seconds) {
auto pair = new std::pair<Task*, double>(task, delay_in_seconds);
foreground_delayed_tasks_.Push(pair);
auto delayed = new DelayedTask();
delayed->task = task;
delayed->platform_data = this;
delayed->timeout = delay_in_seconds;
foreground_delayed_tasks_.Push(delayed);
uv_async_send(flush_tasks_);
}

PerIsolatePlatformData::~PerIsolatePlatformData() {
FlushForegroundTasksInternal();
CancelPendingDelayedTasks();

uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
[](uv_handle_t* handle) {
Expand Down Expand Up @@ -120,7 +125,7 @@ size_t NodePlatform::NumberOfAvailableBackgroundThreads() {
return threads_.size();
}

static void RunForegroundTask(Task* task) {
void PerIsolatePlatformData::RunForegroundTask(Task* task) {
Isolate* isolate = Isolate::GetCurrent();
HandleScope scope(isolate);
Environment* env = Environment::GetCurrent(isolate);
Expand All @@ -130,14 +135,29 @@ static void RunForegroundTask(Task* task) {
delete task;
}

static void RunForegroundTask(uv_timer_t* handle) {
Task* task = static_cast<Task*>(handle->data);
RunForegroundTask(task);
uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
delete reinterpret_cast<uv_timer_t*>(handle);
void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
DelayedTask* delayed = static_cast<DelayedTask*>(handle->data);
auto& tasklist = delayed->platform_data->scheduled_delayed_tasks_;
auto it = std::find(tasklist.begin(), tasklist.end(), delayed);
CHECK_NE(it, tasklist.end());
tasklist.erase(it);
RunForegroundTask(delayed->task);
uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
[](uv_handle_t* handle) {
delete static_cast<DelayedTask*>(handle->data);
});
}

void PerIsolatePlatformData::CancelPendingDelayedTasks() {
for (auto delayed : scheduled_delayed_tasks_) {
uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
[](uv_handle_t* handle) {
delete static_cast<DelayedTask*>(handle->data);
});
}
scheduled_delayed_tasks_.clear();
}

void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {
PerIsolatePlatformData* per_isolate = ForIsolate(isolate);

Expand All @@ -152,18 +172,18 @@ void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {

bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
bool did_work = false;

while (auto delayed = foreground_delayed_tasks_.Pop()) {
did_work = true;
uint64_t delay_millis =
static_cast<uint64_t>(delayed->second + 0.5) * 1000;
uv_timer_t* handle = new uv_timer_t();
handle->data = static_cast<void*>(delayed->first);
uv_timer_init(loop_, handle);
static_cast<uint64_t>(delayed->timeout + 0.5) * 1000;
delayed->timer.data = static_cast<void*>(delayed);
uv_timer_init(loop_, &delayed->timer);
// Timers may not guarantee queue ordering of events with the same delay if
// the delay is non-zero. This should not be a problem in practice.
uv_timer_start(handle, RunForegroundTask, delay_millis, 0);
uv_unref(reinterpret_cast<uv_handle_t*>(handle));
delete delayed;
uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
scheduled_delayed_tasks_.push_back(delayed);
}
while (Task* task = foreground_tasks_.Pop()) {
did_work = true;
Expand Down Expand Up @@ -199,6 +219,10 @@ void NodePlatform::FlushForegroundTasks(v8::Isolate* isolate) {
ForIsolate(isolate)->FlushForegroundTasksInternal();
}

void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {
ForIsolate(isolate)->CancelPendingDelayedTasks();
}

bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }

double NodePlatform::MonotonicallyIncreasingTime() {
Expand Down
16 changes: 15 additions & 1 deletion src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace node {

class NodePlatform;
class IsolateData;
class PerIsolatePlatformData;

template <class T>
class TaskQueue {
Expand All @@ -37,6 +38,13 @@ class TaskQueue {
std::queue<T*> task_queue_;
};

struct DelayedTask {
v8::Task* task;
uv_timer_t timer;
double timeout;
PerIsolatePlatformData* platform_data;
};

class PerIsolatePlatformData {
public:
PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop);
Expand All @@ -52,15 +60,20 @@ class PerIsolatePlatformData {

// Returns true iff work was dispatched or executed.
bool FlushForegroundTasksInternal();
void CancelPendingDelayedTasks();

private:
static void FlushTasks(uv_async_t* handle);
static void RunForegroundTask(v8::Task* task);
static void RunForegroundTask(uv_timer_t* timer);

int ref_count_ = 1;
v8::Isolate* isolate_;
uv_loop_t* const loop_;
uv_async_t* flush_tasks_ = nullptr;
TaskQueue<v8::Task> foreground_tasks_;
TaskQueue<std::pair<v8::Task*, double>> foreground_delayed_tasks_;
TaskQueue<DelayedTask> foreground_delayed_tasks_;
std::vector<DelayedTask*> scheduled_delayed_tasks_;
};

class NodePlatform : public MultiIsolatePlatform {
Expand All @@ -69,6 +82,7 @@ class NodePlatform : public MultiIsolatePlatform {
virtual ~NodePlatform() {}

void DrainBackgroundTasks(v8::Isolate* isolate) override;
void CancelPendingDelayedTasks(v8::Isolate* isolate) override;
void Shutdown();

// v8::Platform implementation.
Expand Down

0 comments on commit c2431d5

Please sign in to comment.