Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

Commit

Permalink
src: keep track of open requests
Browse files Browse the repository at this point in the history
Workers cannot shut down while requests are open, so keep a counter
that is increased whenever libuv requests are made and decreased
whenever their callback is called.
  • Loading branch information
addaleax committed Sep 28, 2017
1 parent 55356eb commit a42c922
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 12 deletions.
10 changes: 9 additions & 1 deletion src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ inline Environment::Environment(IsolateData* isolate_data,
#if HAVE_INSPECTOR
inspector_agent_(this),
#endif
handle_cleanup_waiting_(0),
http_parser_buffer_(nullptr),
fs_stats_field_array_(nullptr),
context_(context->GetIsolate(), context) {
Expand Down Expand Up @@ -397,6 +396,15 @@ inline void Environment::CloseHandle(T* handle, OnCloseCallback callback) {
});
}

void Environment::IncreaseWaitingRequestCounter() {
request_waiting_++;
}

void Environment::DecreaseWaitingRequestCounter() {
request_waiting_--;
CHECK_GE(request_waiting_, 0);
}

inline uv_loop_t* Environment::event_loop() const {
return isolate_data()->event_loop();
}
Expand Down
5 changes: 4 additions & 1 deletion src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ void Environment::CleanupHandles() {
delete hc;
}

while (handle_cleanup_waiting_ != 0 || !handle_wrap_queue_.IsEmpty())
while (handle_cleanup_waiting_ != 0 ||
request_waiting_ != 0 ||
!handle_wrap_queue_.IsEmpty()) {
uv_run(event_loop(), UV_RUN_ONCE);
}
}

void Environment::StartProfilerIdleNotifier() {
Expand Down
6 changes: 5 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ class Environment {
template <typename T, typename OnCloseCallback>
inline void CloseHandle(T* handle, OnCloseCallback callback);

inline void IncreaseWaitingRequestCounter();
inline void DecreaseWaitingRequestCounter();

inline AsyncHooks* async_hooks();
inline DomainFlag* domain_flag();
inline TickInfo* tick_info();
Expand Down Expand Up @@ -724,7 +727,8 @@ class Environment {
ReqWrapQueue req_wrap_queue_;
ListHead<HandleCleanup,
&HandleCleanup::handle_cleanup_queue_> handle_cleanup_queue_;
int handle_cleanup_waiting_;
int handle_cleanup_waiting_ = 0;
int request_waiting_ = 0;

double* heap_statistics_buffer_ = nullptr;
double* heap_space_statistics_buffer_ = nullptr;
Expand Down
10 changes: 6 additions & 4 deletions src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3382,6 +3382,9 @@ class Work : public node::AsyncResource {
// Establish a handle scope here so that every callback doesn't have to.
// Also it is needed for the exception-handling below.
v8::HandleScope scope(env->isolate);
auto env_ = node::Environment::GetCurrent(env->isolate);
env_->DecreaseWaitingRequestCounter();

CallbackScope callback_scope(work);

work->_complete(env, ConvertUVErrorCode(status), work->_data);
Expand Down Expand Up @@ -3470,13 +3473,12 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
CHECK_ARG(env, work);

// Consider: Encapsulate the uv_loop_t into an opaque pointer parameter.
// Currently the environment event loop is the same as the UV default loop.
// Someday (if node ever supports multiple isolates), it may be better to get
// the loop from node::Environment::GetCurrent(env->isolate)->event_loop();
uv_loop_t* event_loop = uv_default_loop();
auto env_ = node::Environment::GetCurrent(env->isolate);
uv_loop_t* event_loop = env_->event_loop();

uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);

env_->IncreaseWaitingRequestCounter();
CALL_UV(env, uv_queue_work(event_loop,
w->Request(),
uvimpl::Work::ExecuteCallback,
Expand Down
17 changes: 15 additions & 2 deletions src/node_crypto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5272,8 +5272,13 @@ void PBKDF2Request::After() {


void PBKDF2Request::After(uv_work_t* work_req, int status) {
CHECK_EQ(status, 0);
PBKDF2Request* req = ContainerOf(&PBKDF2Request::work_req_, work_req);
req->env()->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED) {
delete req;
return;
}
CHECK_EQ(status, 0);
req->After();
delete req;
}
Expand Down Expand Up @@ -5380,6 +5385,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
.FromJust();
}

env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(),
req->work_req(),
PBKDF2Request::Work,
Expand Down Expand Up @@ -5532,10 +5538,15 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) {


void RandomBytesAfter(uv_work_t* work_req, int status) {
CHECK_EQ(status, 0);
RandomBytesRequest* req =
ContainerOf(&RandomBytesRequest::work_req_, work_req);
Environment* env = req->env();
env->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED) {
delete req;
return;
}
CHECK_EQ(status, 0);
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> argv[2];
Expand Down Expand Up @@ -5589,6 +5600,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
.FromJust();
}

env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(),
req->work_req(),
RandomBytesWork,
Expand Down Expand Up @@ -5635,6 +5647,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
.FromJust();
}

env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(),
req->work_req(),
RandomBytesWork,
Expand Down
13 changes: 10 additions & 3 deletions src/node_zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class ZCtx : public AsyncWrap {
}

// async version
env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After);
}

Expand Down Expand Up @@ -366,10 +367,17 @@ class ZCtx : public AsyncWrap {

// v8 land!
static void After(uv_work_t* work_req, int status) {
CHECK_EQ(status, 0);

ZCtx* ctx = ContainerOf(&ZCtx::work_req_, work_req);
Environment* env = ctx->env();
ctx->write_in_progress_ = false;

env->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED) {
ctx->Close();
return;
}

CHECK_EQ(status, 0);

HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Expand All @@ -379,7 +387,6 @@ class ZCtx : public AsyncWrap {

ctx->write_result_[0] = ctx->strm_.avail_out;
ctx->write_result_[1] = ctx->strm_.avail_in;
ctx->write_in_progress_ = false;

// call the write() cb
Local<Function> cb = PersistentToLocal(env->isolate(),
Expand Down
2 changes: 2 additions & 0 deletions src/req-wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ struct MakeLibuvRequestCallback<ReqT, void(*)(ReqT*, Args...)> {

static void Wrapper(ReqT* req, Args... args) {
ReqWrap<ReqT>* req_wrap = ContainerOf(&ReqWrap<ReqT>::req_, req);
req_wrap->env()->DecreaseWaitingRequestCounter();
T original_callback = reinterpret_cast<T>(req_wrap->original_callback_);
original_callback(req, args...);
}
Expand All @@ -126,6 +127,7 @@ template <typename T>
template <typename LibuvFunction, typename... Args>
int ReqWrap<T>::Dispatch(LibuvFunction fn, Args... args) {
Dispatched();
env()->IncreaseWaitingRequestCounter();

// This expands as:
//
Expand Down

0 comments on commit a42c922

Please sign in to comment.