Skip to content

Commit

Permalink
Add unsafe api calls checker to track down issues such as #4195
Browse files Browse the repository at this point in the history
This checker is used to detect accidental thread scheduling switching
points happening during profiling sampling.

See the bigger comment in unsafe_api_calls_check.h .

I was able to check that this checker correctly triggers for the bug
in #4195, and also the bug I'm going to fix next, which is the
use of `rb_hash_lookup` in the otel context reading code.
  • Loading branch information
ivoanjo authored and quinna-h committed Jan 8, 2025
1 parent 2e10ee4 commit 1f4afdc
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 18 deletions.
78 changes: 70 additions & 8 deletions ext/datadog_profiling_native_extension/collectors_thread_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "private_vm_api_access.h"
#include "stack_recorder.h"
#include "time_helpers.h"
#include "unsafe_api_calls_check.h"

// Used to trigger sampling of threads, based on external "events", such as:
// * periodic timer for cpu-time and wall-time
Expand Down Expand Up @@ -203,10 +204,10 @@ static int hash_map_per_thread_context_mark(st_data_t key_thread, st_data_t _val
static int hash_map_per_thread_context_free_values(st_data_t _thread, st_data_t value_per_thread_context, st_data_t _argument);
static VALUE _native_new(VALUE klass);
static VALUE _native_initialize(int argc, VALUE *argv, DDTRACE_UNUSED VALUE _self);
static VALUE _native_sample(VALUE self, VALUE collector_instance, VALUE profiler_overhead_stack_thread);
static VALUE _native_sample(VALUE self, VALUE collector_instance, VALUE profiler_overhead_stack_thread, VALUE allow_exception);
static VALUE _native_on_gc_start(VALUE self, VALUE collector_instance);
static VALUE _native_on_gc_finish(VALUE self, VALUE collector_instance);
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state);
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state, VALUE allow_exception);
static void update_metrics_and_sample(
struct thread_context_collector_state *state,
VALUE thread_being_sampled,
Expand Down Expand Up @@ -310,11 +311,11 @@ void collectors_thread_context_init(VALUE profiling_module) {
rb_define_singleton_method(collectors_thread_context_class, "_native_initialize", _native_initialize, -1);
rb_define_singleton_method(collectors_thread_context_class, "_native_inspect", _native_inspect, 1);
rb_define_singleton_method(collectors_thread_context_class, "_native_reset_after_fork", _native_reset_after_fork, 1);
rb_define_singleton_method(testing_module, "_native_sample", _native_sample, 2);
rb_define_singleton_method(testing_module, "_native_sample", _native_sample, 3);
rb_define_singleton_method(testing_module, "_native_sample_allocation", _native_sample_allocation, 3);
rb_define_singleton_method(testing_module, "_native_on_gc_start", _native_on_gc_start, 1);
rb_define_singleton_method(testing_module, "_native_on_gc_finish", _native_on_gc_finish, 1);
rb_define_singleton_method(testing_module, "_native_sample_after_gc", _native_sample_after_gc, 2);
rb_define_singleton_method(testing_module, "_native_sample_after_gc", _native_sample_after_gc, 3);
rb_define_singleton_method(testing_module, "_native_thread_list", _native_thread_list, 0);
rb_define_singleton_method(testing_module, "_native_per_thread_context", _native_per_thread_context, 1);
rb_define_singleton_method(testing_module, "_native_stats", _native_stats, 1);
Expand Down Expand Up @@ -504,31 +505,49 @@ static VALUE _native_initialize(int argc, VALUE *argv, DDTRACE_UNUSED VALUE _sel

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample(DDTRACE_UNUSED VALUE _self, VALUE collector_instance, VALUE profiler_overhead_stack_thread) {
static VALUE _native_sample(DDTRACE_UNUSED VALUE _self, VALUE collector_instance, VALUE profiler_overhead_stack_thread, VALUE allow_exception) {
ENFORCE_BOOLEAN(allow_exception);

if (!is_thread_alive(profiler_overhead_stack_thread)) rb_raise(rb_eArgError, "Unexpected: profiler_overhead_stack_thread is not alive");

if (allow_exception == Qfalse) debug_enter_unsafe_context();

thread_context_collector_sample(collector_instance, monotonic_wall_time_now_ns(RAISE_ON_FAILURE), profiler_overhead_stack_thread);

if (allow_exception == Qfalse) debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_on_gc_start(DDTRACE_UNUSED VALUE self, VALUE collector_instance) {
debug_enter_unsafe_context();

thread_context_collector_on_gc_start(collector_instance);

debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_on_gc_finish(DDTRACE_UNUSED VALUE self, VALUE collector_instance) {
debug_enter_unsafe_context();

(void) !thread_context_collector_on_gc_finish(collector_instance);

debug_leave_unsafe_context();

return Qtrue;
}

// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state) {
static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE reset_monotonic_to_system_state, VALUE allow_exception) {
ENFORCE_BOOLEAN(reset_monotonic_to_system_state);
ENFORCE_BOOLEAN(allow_exception);

struct thread_context_collector_state *state;
TypedData_Get_Struct(collector_instance, struct thread_context_collector_state, &thread_context_collector_typed_data, state);
Expand All @@ -537,7 +556,12 @@ static VALUE _native_sample_after_gc(DDTRACE_UNUSED VALUE self, VALUE collector_
state->time_converter_state = (monotonic_to_system_epoch_state) MONOTONIC_TO_SYSTEM_EPOCH_INITIALIZER;
}

if (allow_exception == Qfalse) debug_enter_unsafe_context();

thread_context_collector_sample_after_gc(collector_instance);

if (allow_exception == Qfalse) debug_leave_unsafe_context();

return Qtrue;
}

Expand Down Expand Up @@ -982,7 +1006,13 @@ static void trigger_sample_for_thread(
// It SHOULD NOT be used for other purposes.
static VALUE _native_thread_list(DDTRACE_UNUSED VALUE _self) {
VALUE result = rb_ary_new();

debug_enter_unsafe_context();

ddtrace_thread_list(result);

debug_leave_unsafe_context();

return result;
}

Expand Down Expand Up @@ -1501,7 +1531,12 @@ void thread_context_collector_sample_allocation(VALUE self_instance, unsigned in
// This method exists only to enable testing Datadog::Profiling::Collectors::ThreadContext behavior using RSpec.
// It SHOULD NOT be used for other purposes.
static VALUE _native_sample_allocation(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE sample_weight, VALUE new_object) {
debug_enter_unsafe_context();

thread_context_collector_sample_allocation(collector_instance, NUM2UINT(sample_weight), new_object);

debug_leave_unsafe_context();

return Qtrue;
}

Expand Down Expand Up @@ -1640,7 +1675,12 @@ void thread_context_collector_sample_skipped_allocation_samples(VALUE self_insta
}

static VALUE _native_sample_skipped_allocation_samples(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE skipped_samples) {
debug_enter_unsafe_context();

thread_context_collector_sample_skipped_allocation_samples(collector_instance, NUM2UINT(skipped_samples));

debug_leave_unsafe_context();

return Qtrue;
}

Expand Down Expand Up @@ -1979,31 +2019,53 @@ static uint64_t otel_span_id_to_uint(VALUE otel_span_id) {
static VALUE _native_on_gvl_waiting(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

debug_enter_unsafe_context();

thread_context_collector_on_gvl_waiting(thread_from_thread_object(thread));

debug_leave_unsafe_context();

return Qnil;
}

static VALUE _native_gvl_waiting_at_for(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

debug_enter_unsafe_context();

intptr_t gvl_waiting_at = gvl_profiling_state_thread_object_get(thread);

debug_leave_unsafe_context();

return LONG2NUM(gvl_waiting_at);
}

static VALUE _native_on_gvl_running(DDTRACE_UNUSED VALUE self, VALUE thread) {
ENFORCE_THREAD(thread);

return thread_context_collector_on_gvl_running(thread_from_thread_object(thread)) == ON_GVL_RUNNING_SAMPLE ? Qtrue : Qfalse;
debug_enter_unsafe_context();

VALUE result = thread_context_collector_on_gvl_running(thread_from_thread_object(thread)) == ON_GVL_RUNNING_SAMPLE ? Qtrue : Qfalse;

debug_leave_unsafe_context();

return result;
}

static VALUE _native_sample_after_gvl_running(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE thread) {
ENFORCE_THREAD(thread);

return thread_context_collector_sample_after_gvl_running(
debug_enter_unsafe_context();

VALUE result = thread_context_collector_sample_after_gvl_running(
collector_instance,
thread,
monotonic_wall_time_now_ns(RAISE_ON_FAILURE)
);

debug_leave_unsafe_context();

return result;
}

static VALUE _native_apply_delta_to_cpu_time_at_previous_sample_ns(DDTRACE_UNUSED VALUE self, VALUE collector_instance, VALUE thread, VALUE delta_ns) {
Expand Down
2 changes: 2 additions & 0 deletions ext/datadog_profiling_native_extension/profiling.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "ruby_helpers.h"
#include "setup_signal_handler.h"
#include "time_helpers.h"
#include "unsafe_api_calls_check.h"

// Each class/module here is implemented in their separate file
void collectors_cpu_and_wall_time_worker_init(VALUE profiling_module);
Expand Down Expand Up @@ -56,6 +57,7 @@ void DDTRACE_EXPORT Init_datadog_profiling_native_extension(void) {
collectors_thread_context_init(profiling_module);
http_transport_init(profiling_module);
stack_recorder_init(profiling_module);
unsafe_api_calls_check_init();

// Hosts methods used for testing the native code using RSpec
VALUE testing_module = rb_define_module_under(native_extension_module, "Testing");
Expand Down
47 changes: 47 additions & 0 deletions ext/datadog_profiling_native_extension/unsafe_api_calls_check.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include <ruby.h>
#include <ruby/debug.h>
#include <stdbool.h>

#include "datadog_ruby_common.h"
#include "unsafe_api_calls_check.h"
#include "extconf.h"

static bool inside_unsafe_context = false;

#ifndef NO_POSTPONED_TRIGGER
static rb_postponed_job_handle_t check_for_unsafe_api_calls_handle;
#endif

static void check_for_unsafe_api_calls(DDTRACE_UNUSED void *_unused);

void unsafe_api_calls_check_init(void) {
#ifndef NO_POSTPONED_TRIGGER
int unused_flags = 0;

check_for_unsafe_api_calls_handle = rb_postponed_job_preregister(unused_flags, check_for_unsafe_api_calls, NULL);

if (check_for_unsafe_api_calls_handle == POSTPONED_JOB_HANDLE_INVALID) {
rb_raise(rb_eRuntimeError, "Failed to register check_for_unsafe_api_calls_handle postponed job (got POSTPONED_JOB_HANDLE_INVALID)");
}
#endif
}

void debug_enter_unsafe_context(void) {
inside_unsafe_context = true;

#ifndef NO_POSTPONED_TRIGGER
rb_postponed_job_trigger(check_for_unsafe_api_calls_handle);
#else
rb_postponed_job_register(0, check_for_unsafe_api_calls, NULL);
#endif
}

void debug_leave_unsafe_context(void) {
inside_unsafe_context = false;
}

static void check_for_unsafe_api_calls(DDTRACE_UNUSED void *_unused) {
if (inside_unsafe_context) rb_bug(
"Datadog Ruby profiler detected callback nested inside sample. Please report this at https://github.com/datadog/dd-trace-rb/blob/master/CONTRIBUTING.md#found-a-bug"
);
}
25 changes: 25 additions & 0 deletions ext/datadog_profiling_native_extension/unsafe_api_calls_check.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

// This checker is used to detect accidental thread scheduling switching points happening during profiling sampling.
//
// Specifically, when the profiler is sampling, we're never supposed to call into Ruby code (e.g. methods
// implemented using Ruby code) or allocate Ruby objects.
// That's because those events introduce thread switch points, and really we don't the VM switching between threads
// in the middle of the profiler sampling.
// This includes raising exceptions, unless we're trying to stop the profiler, and even then we must be careful.
//
// The above is especially true in situations such as GC profiling or allocation/heap profiling, as in those situations
// we can even crash the Ruby VM if we switch away at the wrong time.
//
// The below APIs can be used to detect these situations. They work by relying on the following observation:
// in most (all?) thread switch points, Ruby will check for interrupts and run the postponed jobs.
//
// Thus, if we set a flag while we're sampling (inside_unsafe_context), trigger the postponed job, and then only unset
// the flag after sampling, he correct thing to happen is that the postponed job should never see the flag.
//
// If, however, we have a bug and there's a thread switch point, our postponed job will see the flag and immediately
// stop the Ruby VM before further damage happens (and hopefully giving us a stack trace clearly pointing to the culprit).

void unsafe_api_calls_check_init(void);
void debug_enter_unsafe_context(void);
void debug_leave_unsafe_context(void);
2 changes: 1 addition & 1 deletion lib/datadog/core/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def safely_synchronize
rescue ThreadError => e
logger_without_components.error(
'Detected deadlock during datadog initialization. ' \
'Please report this at https://github.com/DataDog/dd-trace-rb/blob/master/CONTRIBUTING.md#found-a-bug' \
'Please report this at https://github.com/datadog/dd-trace-rb/blob/master/CONTRIBUTING.md#found-a-bug' \
"\n\tSource:\n\t#{Array(e.backtrace).join("\n\t")}"
)
nil
Expand Down
23 changes: 14 additions & 9 deletions spec/datadog/profiling/collectors/thread_context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
end
end

def sample(profiler_overhead_stack_thread: Thread.current)
described_class::Testing._native_sample(cpu_and_wall_time_collector, profiler_overhead_stack_thread)
def sample(profiler_overhead_stack_thread: Thread.current, allow_exception: false)
described_class::Testing._native_sample(cpu_and_wall_time_collector, profiler_overhead_stack_thread, allow_exception)
end

def on_gc_start
Expand All @@ -78,8 +78,12 @@ def on_gc_finish
described_class::Testing._native_on_gc_finish(cpu_and_wall_time_collector)
end

def sample_after_gc(reset_monotonic_to_system_state: false)
described_class::Testing._native_sample_after_gc(cpu_and_wall_time_collector, reset_monotonic_to_system_state)
def sample_after_gc(reset_monotonic_to_system_state: false, allow_exception: false)
described_class::Testing._native_sample_after_gc(
cpu_and_wall_time_collector,
reset_monotonic_to_system_state,
allow_exception,
)
end

def sample_allocation(weight:, new_object: Object.new)
Expand Down Expand Up @@ -782,18 +786,18 @@ def otel_span_id_to_i(span_id)
)
end

context 'raises an exception' do
context 'when an exception is raised' do
before { setup_failure }
after { expect(ran_log).to eq [:ran_code] }

it 'does not leave the exception pending' do
sample
sample(allow_exception: true)

expect($!).to be nil
end

it 'omits the "local root span id" and "span id" labels in the sample' do
sample
sample(allow_exception: true)

expect(t1_sample.labels.keys).to_not include(:"local root span id", :"span id")
end
Expand Down Expand Up @@ -1433,7 +1437,7 @@ def sample_and_check(expected_state:)

context "when called before on_gc_start/on_gc_finish" do
it do
expect { sample_after_gc }.to raise_error(RuntimeError, /Unexpected call to sample_after_gc/)
expect { sample_after_gc(allow_exception: true) }.to raise_error(RuntimeError, /Unexpected call to sample_after_gc/)
end
end

Expand All @@ -1451,7 +1455,8 @@ def sample_and_check(expected_state:)
it do
sample_after_gc

expect { sample_after_gc }.to raise_error(RuntimeError, /Unexpected call to sample_after_gc/)
expect { sample_after_gc(allow_exception: true) }
.to raise_error(RuntimeError, /Unexpected call to sample_after_gc/)
end
end

Expand Down

0 comments on commit 1f4afdc

Please sign in to comment.