Skip to content

Commit

Permalink
src: distinguish refed/unrefed threadsafe Immediates
Browse files Browse the repository at this point in the history
In some situations, it can be useful to use threadsafe callbacks
on an `Environment` to perform cleanup operations that should run
even when the process would otherwise be ending.

PR-URL: nodejs#33320
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax authored and codebytere committed Jun 9, 2020
1 parent b4d9034 commit b924910
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 16 deletions.
4 changes: 2 additions & 2 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,9 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
}

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
void Environment::SetImmediateThreadsafe(Fn&& cb, bool refed) {
auto callback =
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
native_immediates_threadsafe_.CreateCallback(std::move(cb), refed);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
Expand Down
29 changes: 17 additions & 12 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -704,19 +704,10 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
// exceptions, so we do not need to handle that.
RunAndClearInterrupts();

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe and this function being called.
// For the common case, it's worth checking the size first before establishing
// a mutex lock.
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
}

auto drain_list = [&]() {
auto drain_list = [&](NativeImmediateQueue* queue) {
TryCatchScope try_catch(this);
DebugSealHandleScope seal_handle_scope(isolate());
while (auto head = native_immediates_.Shift()) {
while (auto head = queue->Shift()) {
if (head->is_refed())
ref_count++;

Expand All @@ -734,12 +725,26 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
}
return false;
};
while (drain_list()) {}
while (drain_list(&native_immediates_)) {}

immediate_info()->ref_count_dec(ref_count);

if (immediate_info()->ref_count() == 0)
ToggleImmediateRef(false);

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe immediate list and this function being
// called. For the common case, it's worth checking the size first before
// establishing a mutex lock.
// This is intentionally placed after the `ref_count` handling, because when
// refed threadsafe immediates are created, they are not counted towards the
// count in immediate_info() either.
NativeImmediateQueue threadsafe_immediates;
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_));
}
while (drain_list(&threadsafe_immediates)) {}
}

void Environment::RequestInterruptFromV8() {
Expand Down
2 changes: 1 addition & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,7 @@ class Environment : public MemoryRetainer {
inline void SetUnrefImmediate(Fn&& cb);
template <typename Fn>
// This behaves like SetImmediate() but can be called from any thread.
inline void SetImmediateThreadsafe(Fn&& cb);
inline void SetImmediateThreadsafe(Fn&& cb, bool refed = true);
// This behaves like V8's Isolate::RequestInterrupt(), but also accounts for
// the event loop (i.e. combines the V8 function with SetImmediate()).
// The passed callback may not throw exceptions.
Expand Down
2 changes: 1 addition & 1 deletion src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
env, std::move(snapshot));
Local<Value> args[] = { stream->object() };
taker->MakeCallback(env->ondone_string(), arraysize(args), args);
});
}, /* refed */ false);
});
args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
}
Expand Down

0 comments on commit b924910

Please sign in to comment.