Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timers: refactor setImmediate error handling #17879

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions lib/timers.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants;
const async_id_symbol = timerInternals.async_id_symbol;
const trigger_async_id_symbol = timerInternals.trigger_async_id_symbol;

const [activateImmediateCheck, scheduledImmediateCountArray] =
// *Must* match Environment::ImmediateInfo::Fields in src/env.h.
const kCount = 0;
const kHasOutstanding = 1;

const [activateImmediateCheck, immediateInfo] =
setImmediateCallback(processImmediate);

// The Timeout class
Expand Down Expand Up @@ -627,16 +631,23 @@ ImmediateList.prototype.remove = function(item) {
};

// Create a single linked list instance only once at startup
var immediateQueue = new ImmediateList();
const immediateQueue = new ImmediateList();

// If an uncaught exception was thrown during execution of immediateQueue,
// this queue will store all remaining Immediates that need to run upon
// resolution of all error handling (if process is still alive).
const outstandingQueue = new ImmediateList();


function processImmediate() {
var immediate = immediateQueue.head;
var tail = immediateQueue.tail;
const queue = outstandingQueue.head !== null ?
outstandingQueue : immediateQueue;
var immediate = queue.head;
var tail = queue.tail;

// Clear the linked list early in case new `setImmediate()` calls occur while
// immediate callbacks are executed
immediateQueue.head = immediateQueue.tail = null;
queue.head = queue.tail = null;

while (immediate !== null) {
if (!immediate._onImmediate) {
Expand All @@ -645,9 +656,14 @@ function processImmediate() {
}

// Save next in case `clearImmediate(immediate)` is called from callback
var next = immediate._idleNext;
const next = immediate._idleNext;

const asyncId = immediate[async_id_symbol];
emitBefore(asyncId, immediate[trigger_async_id_symbol]);

tryOnImmediate(immediate, tail);
tryOnImmediate(immediate, next, tail);

emitAfter(asyncId);

// If `clearImmediate(immediate)` wasn't called from the callback, use the
// `immediate`'s next item
Expand All @@ -656,45 +672,36 @@ function processImmediate() {
else
immediate = next;
}

immediateInfo[kHasOutstanding] = 0;
}

// An optimization so that the try/finally only de-optimizes (since at least v8
// 4.7) what is in this smaller function.
function tryOnImmediate(immediate, oldTail) {
function tryOnImmediate(immediate, next, oldTail) {
var threw = true;
emitBefore(immediate[async_id_symbol], immediate[trigger_async_id_symbol]);
try {
// make the actual call outside the try/finally to allow it to be optimized
runCallback(immediate);
threw = false;
} finally {
immediate._onImmediate = null;
if (!threw)
emitAfter(immediate[async_id_symbol]);

if (!immediate._destroyed) {
immediate._destroyed = true;
scheduledImmediateCountArray[0]--;
immediateInfo[kCount]--;

if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}
}

if (threw && immediate._idleNext !== null) {
// Handle any remaining on next tick, assuming we're still alive to do so.
const curHead = immediateQueue.head;
const next = immediate._idleNext;
if (curHead !== null) {
curHead._idlePrev = oldTail;
oldTail._idleNext = curHead;
next._idlePrev = null;
immediateQueue.head = next;
} else {
immediateQueue.head = next;
immediateQueue.tail = oldTail;
}
process.nextTick(processImmediate);
if (threw && (immediate._idleNext !== null || next !== null)) {
// Handle any remaining Immediates after error handling has resolved,
// assuming we're still alive to do so.
outstandingQueue.head = immediate._idleNext || next;
outstandingQueue.tail = oldTail;
immediateInfo[kHasOutstanding] = 1;
}
}
}
Expand Down Expand Up @@ -728,9 +735,9 @@ function Immediate(callback, args) {
this);
}

if (scheduledImmediateCountArray[0] === 0)
if (immediateInfo[kCount] === 0)
activateImmediateCheck();
scheduledImmediateCountArray[0]++;
immediateInfo[kCount]++;

immediateQueue.append(this);
}
Expand Down Expand Up @@ -776,7 +783,7 @@ exports.clearImmediate = function(immediate) {
if (!immediate) return;

if (!immediate._destroyed) {
scheduledImmediateCountArray[0]--;
immediateInfo[kCount]--;
immediate._destroyed = true;

if (async_hook_fields[kDestroy] > 0) {
Expand Down
39 changes: 31 additions & 8 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,30 @@ inline bool Environment::AsyncCallbackScope::in_makecallback() const {
return env_->makecallback_cntr_ > 1;
}

inline Environment::ImmediateInfo::ImmediateInfo(v8::Isolate* isolate)
: fields_(isolate, kFieldsCount) {}

inline AliasedBuffer<uint32_t, v8::Uint32Array>&
Environment::ImmediateInfo::fields() {
return fields_;
}

inline uint32_t Environment::ImmediateInfo::count() const {
return fields_[kCount];
}

inline bool Environment::ImmediateInfo::has_outstanding() const {
return fields_[kHasOutstanding] == 1;
}

inline void Environment::ImmediateInfo::count_inc(uint32_t increment) {
fields_[kCount] = fields_[kCount] + increment;
}

inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) {
fields_[kCount] = fields_[kCount] - decrement;
}

inline Environment::TickInfo::TickInfo(v8::Isolate* isolate)
: fields_(isolate, kFieldsCount) {}

Expand Down Expand Up @@ -263,6 +287,7 @@ inline Environment::Environment(IsolateData* isolate_data,
v8::Local<v8::Context> context)
: isolate_(context->GetIsolate()),
isolate_data_(isolate_data),
immediate_info_(context->GetIsolate()),
tick_info_(context->GetIsolate()),
timer_base_(uv_now(isolate_data->event_loop())),
using_domains_(false),
Expand All @@ -271,7 +296,6 @@ inline Environment::Environment(IsolateData* isolate_data,
abort_on_uncaught_exception_(false),
emit_napi_warning_(true),
makecallback_cntr_(0),
scheduled_immediate_count_(isolate_, 1),
should_abort_on_uncaught_toggle_(isolate_, 1),
#if HAVE_INSPECTOR
inspector_agent_(new inspector::Agent(this)),
Expand Down Expand Up @@ -371,6 +395,10 @@ inline Environment::AsyncHooks* Environment::async_hooks() {
return &async_hooks_;
}

inline Environment::ImmediateInfo* Environment::immediate_info() {
return &immediate_info_;
}

inline Environment::TickInfo* Environment::tick_info() {
return &tick_info_;
}
Expand Down Expand Up @@ -508,11 +536,6 @@ inline void Environment::set_fs_stats_field_array(double* fields) {
fs_stats_field_array_ = fields;
}

inline AliasedBuffer<uint32_t, v8::Uint32Array>&
Environment::scheduled_immediate_count() {
return scheduled_immediate_count_;
}

void Environment::SetImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
Expand All @@ -522,9 +545,9 @@ void Environment::SetImmediate(native_immediate_callback cb,
std::unique_ptr<v8::Persistent<v8::Object>>(
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
});
if (scheduled_immediate_count_[0] == 0)
if (immediate_info()->count() == 0)
ActivateImmediateCheck();
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1;
immediate_info()->count_inc(1);
}

inline performance::performance_state* Environment::performance_state() {
Expand Down
20 changes: 11 additions & 9 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,14 @@ void Environment::RunAndClearNativeImmediates() {
}

#ifdef DEBUG
CHECK_GE(scheduled_immediate_count_[0], count);
CHECK_GE(immediate_info()->count(), count);
#endif
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] - count;
immediate_info()->count_dec(count);
}
}

static bool MaybeStopImmediate(Environment* env) {
if (env->scheduled_immediate_count()[0] == 0) {
if (env->immediate_info()->count() == 0) {
uv_check_stop(env->immediate_check_handle());
uv_idle_stop(env->immediate_idle_handle());
return true;
Expand All @@ -309,12 +309,14 @@ void Environment::CheckImmediate(uv_check_t* handle) {

env->RunAndClearNativeImmediates();

MakeCallback(env->isolate(),
env->process_object(),
env->immediate_callback_function(),
0,
nullptr,
{0, 0}).ToLocalChecked();
do {
MakeCallback(env->isolate(),
env->process_object(),
env->immediate_callback_function(),
0,
nullptr,
{0, 0}).ToLocalChecked();
} while (env->immediate_info()->has_outstanding());

MaybeStopImmediate(env);
}
Expand Down
29 changes: 26 additions & 3 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,30 @@ class Environment {
DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope);
};

class ImmediateInfo {
public:
inline AliasedBuffer<uint32_t, v8::Uint32Array>& fields();
inline uint32_t count() const;
inline bool has_outstanding() const;

inline void count_inc(uint32_t increment);
inline void count_dec(uint32_t decrement);

private:
friend class Environment; // So we can call the constructor.
inline explicit ImmediateInfo(v8::Isolate* isolate);

enum Fields {
kCount,
kHasOutstanding,
kFieldsCount
};

AliasedBuffer<uint32_t, v8::Uint32Array> fields_;

DISALLOW_COPY_AND_ASSIGN(ImmediateInfo);
};

class TickInfo {
public:
inline AliasedBuffer<uint8_t, v8::Uint8Array>& fields();
Expand Down Expand Up @@ -532,6 +556,7 @@ class Environment {
inline void FinishHandleCleanup(uv_handle_t* handle);

inline AsyncHooks* async_hooks();
inline ImmediateInfo* immediate_info();
inline TickInfo* tick_info();
inline uint64_t timer_base() const;

Expand Down Expand Up @@ -582,8 +607,6 @@ class Environment {
inline double* fs_stats_field_array() const;
inline void set_fs_stats_field_array(double* fields);

inline AliasedBuffer<uint32_t, v8::Uint32Array>& scheduled_immediate_count();

inline performance::performance_state* performance_state();
inline std::map<std::string, uint64_t>* performance_marks();

Expand Down Expand Up @@ -704,6 +727,7 @@ class Environment {
uv_check_t idle_check_handle_;

AsyncHooks async_hooks_;
ImmediateInfo immediate_info_;
TickInfo tick_info_;
const uint64_t timer_base_;
bool using_domains_;
Expand All @@ -714,7 +738,6 @@ class Environment {
size_t makecallback_cntr_;
std::vector<double> destroy_async_id_list_;

AliasedBuffer<uint32_t, v8::Uint32Array> scheduled_immediate_count_;
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;

int should_not_abort_scope_counter_ = 0;
Expand Down
5 changes: 3 additions & 2 deletions src/timer_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ class TimerWrap : public HandleWrap {
env->NewFunctionTemplate(activate_cb)->GetFunction(env->context())
.ToLocalChecked();
auto result = Array::New(env->isolate(), 2);
result->Set(0, activate_function);
result->Set(1, env->scheduled_immediate_count().GetJSArray());
result->Set(env->context(), 0, activate_function).FromJust();
result->Set(env->context(), 1,
env->immediate_info()->fields().GetJSArray()).FromJust();
args.GetReturnValue().Set(result);
}

Expand Down
53 changes: 53 additions & 0 deletions test/parallel/test-timers-immediate-queue-throw.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const domain = require('domain');

// setImmediate should run clear its queued cbs once per event loop turn
// but immediates queued while processing the current queue should happen
// on the next turn of the event loop.

// In addition, if any setImmediate throws, the rest of the queue should
// be processed after all error handling is resolved, but that queue
// should not include any setImmediate calls scheduled after the
// processing of the queue started.

let threw = false;
let stage = -1;

const QUEUE = 10;

const errObj = {
type: Error,
message: 'setImmediate Err'
};

process.once('uncaughtException', common.expectsError(errObj));
process.once('uncaughtException', () => assert.strictEqual(stage, 0));

const d1 = domain.create();
d1.once('error', common.expectsError(errObj));
d1.once('error', () => assert.strictEqual(stage, 0));

const run = common.mustCall((callStage) => {
assert(callStage >= stage);
stage = callStage;
if (threw)
return;

setImmediate(run, 2);
}, QUEUE * 3);

for (let i = 0; i < QUEUE; i++)
setImmediate(run, 0);
setImmediate(() => {
threw = true;
process.nextTick(() => assert.strictEqual(stage, 1));
throw new Error('setImmediate Err');
});
d1.run(() => setImmediate(() => {
throw new Error('setImmediate Err');
}));
for (let i = 0; i < QUEUE; i++)
setImmediate(run, 1);