namespace Datadog {

// Identifiers of a tracer span, as recorded for one thread.
//
// Holds the span's own id, the id of its local root span and the local
// root's span type; these are the three values the stack renderer pushes
// onto a sample for the thread.
struct Span
{
    uint64_t span_id;
    uint64_t local_root_span_id;
    std::string span_type;

    Span(uint64_t _span_id, uint64_t _local_root_span_id, std::string _span_type)
      : span_id(_span_id)
      , local_root_span_id(_local_root_span_id)
      , span_type(_span_type)
    {
    }
};

// Process-wide registry mapping a thread id to the span most recently linked
// to that thread.
//
// Accessed through get_instance(); the type can be neither copied nor
// constructed/destroyed from outside. All accessors take `mtx` before
// touching `thread_id_to_span`.
class ThreadSpanLinks
{
  public:
    // Returns the single shared instance, created on first use.
    static ThreadSpanLinks& get_instance()
    {
        static ThreadSpanLinks instance;
        return instance;
    }

    // Delete Copy constructor and assignment operator to prevent copies
    ThreadSpanLinks(ThreadSpanLinks const&) = delete;
    ThreadSpanLinks& operator=(ThreadSpanLinks const&) = delete;

    // Records (or overwrites) the span linked to `thread_id`.
    void link_span(uint64_t thread_id, uint64_t span_id, uint64_t local_root_span_id, std::string span_type);

    // Returns the span linked to `thread_id`, or nullptr when none was linked.
    // The pointee stays owned by this registry.
    // NOTE(review): the pointer is handed out after `mtx` is released while
    // link_span() updates the same Span in place; if callers read it from a
    // different thread than the one linking spans this looks racy - confirm.
    const Span* get_active_span_from_thread_id(uint64_t thread_id);

    // To be called in the child process after fork(): re-creates the mutex
    // and drops every recorded link.
    static void postfork_child();

  private:
    std::mutex mtx;
    // thread id -> span currently linked to that thread
    std::unordered_map<uint64_t, std::unique_ptr<Span>> thread_id_to_span;

    // Private Constructor/Destructor
    ThreadSpanLinks() = default;
    ~ThreadSpanLinks() = default;

    // Removes all links while holding `mtx`.
    void reset();
};

} // namespace Datadog
b/ddtrace/internal/datadog/profiling/stack_v2/src/stack_renderer.cpp @@ -1,4 +1,6 @@ #include "stack_renderer.hpp" + +#include "thread_span_links.hpp" #include "utf8_validate.hpp" using namespace Datadog; @@ -52,6 +54,13 @@ StackRenderer::render_thread_begin(PyThreadState* tstate, // Finalize the thread information we have ddup_push_threadinfo(sample, static_cast(thread_id), static_cast(native_id), name); ddup_push_walltime(sample, thread_state.wall_time_ns, 1); + + const Span* active_span = ThreadSpanLinks::get_instance().get_active_span_from_thread_id(thread_id); + if (active_span != nullptr) { + ddup_push_span_id(sample, active_span->span_id); + ddup_push_local_root_span_id(sample, active_span->local_root_span_id); + ddup_push_trace_type(sample, std::string_view(active_span->span_type)); + } } void diff --git a/ddtrace/internal/datadog/profiling/stack_v2/src/stack_v2.cpp b/ddtrace/internal/datadog/profiling/stack_v2/src/stack_v2.cpp index 161926374ba..62fa4b38b77 100644 --- a/ddtrace/internal/datadog/profiling/stack_v2/src/stack_v2.cpp +++ b/ddtrace/internal/datadog/profiling/stack_v2/src/stack_v2.cpp @@ -1,6 +1,7 @@ #include "cast_to_pyfunc.hpp" #include "python_headers.hpp" #include "sampler.hpp" +#include "thread_span_links.hpp" #include #include @@ -83,12 +84,47 @@ stack_v2_thread_unregister(PyObject* self, PyObject* args) Py_RETURN_NONE; } +static PyObject* +_stack_v2_link_span(PyObject* self, PyObject* args, PyObject* kwargs) +{ + (void)self; + uint64_t thread_id; + uint64_t span_id; + uint64_t local_root_span_id; + const char* span_type; + + PyThreadState* state = PyThreadState_Get(); + + if (!state) { + return NULL; + } + + thread_id = state->thread_id; + + static const char* const_kwlist[] = { "span_id", "local_root_span_id", "span_type", NULL }; + static char** kwlist = const_cast(const_kwlist); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "KKs", kwlist, &span_id, &local_root_span_id, &span_type)) { + return NULL; + } + + 
ThreadSpanLinks::get_instance().link_span(thread_id, span_id, local_root_span_id, std::string(span_type)); + + Py_RETURN_NONE; +} + +PyCFunction stack_v2_link_span = cast_to_pycfunction(_stack_v2_link_span); + static PyMethodDef _stack_v2_methods[] = { { "start", reinterpret_cast(stack_v2_start), METH_VARARGS | METH_KEYWORDS, "Start the sampler" }, { "stop", stack_v2_stop, METH_VARARGS, "Stop the sampler" }, { "register_thread", stack_v2_thread_register, METH_VARARGS, "Register a thread" }, { "unregister_thread", stack_v2_thread_unregister, METH_VARARGS, "Unregister a thread" }, { "set_interval", stack_v2_set_interval, METH_VARARGS, "Set the sampling interval" }, + { "link_span", + reinterpret_cast(stack_v2_link_span), + METH_VARARGS | METH_KEYWORDS, + "Link a span to a thread" }, { NULL, NULL, 0, NULL } }; diff --git a/ddtrace/internal/datadog/profiling/stack_v2/src/thread_span_links.cpp b/ddtrace/internal/datadog/profiling/stack_v2/src/thread_span_links.cpp new file mode 100644 index 00000000000..602aad9fdb3 --- /dev/null +++ b/ddtrace/internal/datadog/profiling/stack_v2/src/thread_span_links.cpp @@ -0,0 +1,50 @@ +#include "thread_span_links.hpp" + +#include +#include +#include +#include + +namespace Datadog { +void +ThreadSpanLinks::link_span(uint64_t thread_id, uint64_t span_id, uint64_t local_root_span_id, std::string span_type) +{ + std::lock_guard lock(mtx); + + if (thread_id_to_span.find(thread_id) == thread_id_to_span.end()) { + thread_id_to_span[thread_id] = std::make_unique(span_id, local_root_span_id, span_type); + } + thread_id_to_span[thread_id]->span_id = span_id; + thread_id_to_span[thread_id]->local_root_span_id = local_root_span_id; + thread_id_to_span[thread_id]->span_type = span_type; +} + +const Span* +ThreadSpanLinks::get_active_span_from_thread_id(uint64_t thread_id) +{ + std::lock_guard lock(mtx); + + if (thread_id_to_span.find(thread_id) == thread_id_to_span.end()) { + return nullptr; + } + return thread_id_to_span[thread_id].get(); +} + 
+void +ThreadSpanLinks::reset() +{ + std::lock_guard lock(mtx); + thread_id_to_span.clear(); +} + +void +ThreadSpanLinks::postfork_child() +{ + // Explicitly destroy and reconstruct the mutex to avoid undefined behavior + get_instance().mtx.~mutex(); + new (&get_instance().mtx) std::mutex(); + + get_instance().reset(); +} + +} // namespace Datadog diff --git a/ddtrace/profiling/collector/stack.pyx b/ddtrace/profiling/collector/stack.pyx index b936c709853..f3758d13989 100644 --- a/ddtrace/profiling/collector/stack.pyx +++ b/ddtrace/profiling/collector/stack.pyx @@ -523,7 +523,8 @@ class StackCollector(collector.PeriodicCollector): self._last_wall_time = compat.monotonic_ns() if self.tracer is not None: self._thread_span_links = _ThreadSpanLinks() - self.tracer.context_provider._on_activate(self._thread_span_links.link_span) + link_span = stack_v2.link_span if self._stack_collector_v2_enabled else self._thread_span_links.link_span + self.tracer.context_provider._on_activate(link_span) # If libdd is enabled, propagate the configuration if config.export.libdd_enabled: @@ -550,7 +551,8 @@ class StackCollector(collector.PeriodicCollector): LOG.debug("Profiling StackCollector stopping") super(StackCollector, self)._stop_service() if self.tracer is not None: - self.tracer.context_provider._deregister_on_activate(self._thread_span_links.link_span) + link_span = stack_v2.link_span if self._stack_collector_v2_enabled else self._thread_span_links.link_span + self.tracer.context_provider._deregister_on_activate(link_span) LOG.debug("Profiling StackCollector stopped") # Also tell the native thread running the v2 sampler to stop, if needed diff --git a/releasenotes/notes/profiling-fix-stack-v2-endpoint-82a1e26366166b8d.yaml b/releasenotes/notes/profiling-fix-stack-v2-endpoint-82a1e26366166b8d.yaml new file mode 100644 index 00000000000..0505c26e550 --- /dev/null +++ b/releasenotes/notes/profiling-fix-stack-v2-endpoint-82a1e26366166b8d.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + 
profiling: enables endpoint profiling for stack v2 when ``DD_PROFILING_STACK_V2_ENABLED`` is set.
import ddup + from ddtrace.profiling.collector import stack + from tests.profiling.collector import pprof_utils + + test_name = "test_push_span_v2" pprof_prefix = "/tmp/" + test_name output_filename = pprof_prefix + "." + str(os.getpid()) @@ -126,7 +181,7 @@ def test_push_non_web_span(): from ddtrace.profiling.collector import stack from tests.profiling.collector import pprof_utils - test_name = "test_collect_span_id" + test_name = "test_push_non_web_span" pprof_prefix = "/tmp/" + test_name output_filename = pprof_prefix + "." + str(os.getpid())