Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(profiling): endpoint profiling for stack v2 [backport-2.13] #10910

Open
wants to merge 1 commit into
base: 2.13
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ if(NOT echion_POPULATED)
endif()

# Specify the target C-extension that we want to build
add_library(${EXTENSION_NAME} SHARED
src/sampler.cpp
src/stack_renderer.cpp
src/stack_v2.cpp
)
add_library(${EXTENSION_NAME} SHARED src/sampler.cpp src/stack_renderer.cpp src/stack_v2.cpp src/thread_span_links.cpp)

# Add common config
add_ddup_config(${EXTENSION_NAME})
Expand Down
12 changes: 12 additions & 0 deletions ddtrace/internal/datadog/profiling/stack_v2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,21 @@


try:
import typing

from ddtrace._trace import context
from ddtrace._trace import span as ddspan

from ._stack_v2 import * # noqa: F403, F401

is_available = True

def link_span(span: typing.Optional[typing.Union[context.Context, ddspan.Span]]):
if isinstance(span, ddspan.Span):
span_id = span.span_id
local_root_span_id = span._local_root.span_id
local_root_span_type = span._local_root.span_type
_stack_v2.link_span(span_id, local_root_span_id, local_root_span_type) # type: ignore # noqa: F405

except Exception as e:
failure_msg = str(e)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#pragma once

#include <memory>
#include <mutex>
#include <stdint.h>
#include <string>
#include <unordered_map>

namespace Datadog {

struct Span
{
uint64_t span_id;
uint64_t local_root_span_id;
std::string span_type;

Span(uint64_t _span_id, uint64_t _local_root_span_id, std::string _span_type)
: span_id(_span_id)
, local_root_span_id(_local_root_span_id)
, span_type(_span_type)
{
}
};

class ThreadSpanLinks
{
public:
static ThreadSpanLinks& get_instance()
{
static ThreadSpanLinks instance;
return instance;
}

// Delete Copy constructor and assignment operator to prevent copies
ThreadSpanLinks(ThreadSpanLinks const&) = delete;
ThreadSpanLinks& operator=(ThreadSpanLinks const&) = delete;

void link_span(uint64_t thread_id, uint64_t span_id, uint64_t local_root_span_id, std::string span_type);

const Span* get_active_span_from_thread_id(uint64_t thread_id);

static void postfork_child();

private:
std::mutex mtx;
std::unordered_map<uint64_t, std::unique_ptr<Span>> thread_id_to_span;

// Private Constructor/Destructor
ThreadSpanLinks() = default;
~ThreadSpanLinks() = default;

void reset();
};

}
3 changes: 3 additions & 0 deletions ddtrace/internal/datadog/profiling/stack_v2/src/sampler.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "sampler.hpp"

#include "thread_span_links.hpp"

#include "echion/interp.h"
#include "echion/tasks.h"
#include "echion/threads.h"
Expand Down Expand Up @@ -64,6 +66,7 @@ _stack_v2_atfork_child()
// The only thing we need to do at fork is to propagate the PID to echion
// so we don't even reveal this function to the user
_set_pid(getpid());
ThreadSpanLinks::postfork_child();
}

__attribute__((constructor)) void
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "stack_renderer.hpp"

#include "thread_span_links.hpp"
#include "utf8_validate.hpp"

using namespace Datadog;
Expand Down Expand Up @@ -52,6 +54,13 @@ StackRenderer::render_thread_begin(PyThreadState* tstate,
// Finalize the thread information we have
ddup_push_threadinfo(sample, static_cast<int64_t>(thread_id), static_cast<int64_t>(native_id), name);
ddup_push_walltime(sample, thread_state.wall_time_ns, 1);

const Span* active_span = ThreadSpanLinks::get_instance().get_active_span_from_thread_id(thread_id);
if (active_span != nullptr) {
ddup_push_span_id(sample, active_span->span_id);
ddup_push_local_root_span_id(sample, active_span->local_root_span_id);
ddup_push_trace_type(sample, std::string_view(active_span->span_type));
}
}

void
Expand Down
36 changes: 36 additions & 0 deletions ddtrace/internal/datadog/profiling/stack_v2/src/stack_v2.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "cast_to_pyfunc.hpp"
#include "python_headers.hpp"
#include "sampler.hpp"
#include "thread_span_links.hpp"

#include <mutex>
#include <unordered_map>
Expand Down Expand Up @@ -83,12 +84,47 @@ stack_v2_thread_unregister(PyObject* self, PyObject* args)
Py_RETURN_NONE;
}

static PyObject*
_stack_v2_link_span(PyObject* self, PyObject* args, PyObject* kwargs)
{
(void)self;
uint64_t thread_id;
uint64_t span_id;
uint64_t local_root_span_id;
const char* span_type;

PyThreadState* state = PyThreadState_Get();

if (!state) {
return NULL;
}

thread_id = state->thread_id;

static const char* const_kwlist[] = { "span_id", "local_root_span_id", "span_type", NULL };
static char** kwlist = const_cast<char**>(const_kwlist);

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "KKs", kwlist, &span_id, &local_root_span_id, &span_type)) {
return NULL;
}

ThreadSpanLinks::get_instance().link_span(thread_id, span_id, local_root_span_id, std::string(span_type));

Py_RETURN_NONE;
}

PyCFunction stack_v2_link_span = cast_to_pycfunction(_stack_v2_link_span);

static PyMethodDef _stack_v2_methods[] = {
{ "start", reinterpret_cast<PyCFunction>(stack_v2_start), METH_VARARGS | METH_KEYWORDS, "Start the sampler" },
{ "stop", stack_v2_stop, METH_VARARGS, "Stop the sampler" },
{ "register_thread", stack_v2_thread_register, METH_VARARGS, "Register a thread" },
{ "unregister_thread", stack_v2_thread_unregister, METH_VARARGS, "Unregister a thread" },
{ "set_interval", stack_v2_set_interval, METH_VARARGS, "Set the sampling interval" },
{ "link_span",
reinterpret_cast<PyCFunction>(stack_v2_link_span),
METH_VARARGS | METH_KEYWORDS,
"Link a span to a thread" },
{ NULL, NULL, 0, NULL }
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "thread_span_links.hpp"

#include <iostream>
#include <mutex>
#include <stdint.h>
#include <string>

namespace Datadog {
void
ThreadSpanLinks::link_span(uint64_t thread_id, uint64_t span_id, uint64_t local_root_span_id, std::string span_type)
{
std::lock_guard<std::mutex> lock(mtx);

if (thread_id_to_span.find(thread_id) == thread_id_to_span.end()) {
thread_id_to_span[thread_id] = std::make_unique<Span>(span_id, local_root_span_id, span_type);
}
thread_id_to_span[thread_id]->span_id = span_id;
thread_id_to_span[thread_id]->local_root_span_id = local_root_span_id;
thread_id_to_span[thread_id]->span_type = span_type;
}

const Span*
ThreadSpanLinks::get_active_span_from_thread_id(uint64_t thread_id)
{
std::lock_guard<std::mutex> lock(mtx);

if (thread_id_to_span.find(thread_id) == thread_id_to_span.end()) {
return nullptr;
}
return thread_id_to_span[thread_id].get();
}

void
ThreadSpanLinks::reset()
{
std::lock_guard<std::mutex> lock(mtx);
thread_id_to_span.clear();
}

void
ThreadSpanLinks::postfork_child()
{
// Explicitly destroy and reconstruct the mutex to avoid undefined behavior
get_instance().mtx.~mutex();
new (&get_instance().mtx) std::mutex();

get_instance().reset();
}

} // namespace Datadog
6 changes: 4 additions & 2 deletions ddtrace/profiling/collector/stack.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,8 @@ class StackCollector(collector.PeriodicCollector):
self._last_wall_time = compat.monotonic_ns()
if self.tracer is not None:
self._thread_span_links = _ThreadSpanLinks()
self.tracer.context_provider._on_activate(self._thread_span_links.link_span)
link_span = stack_v2.link_span if self._stack_collector_v2_enabled else self._thread_span_links.link_span
self.tracer.context_provider._on_activate(link_span)

# If libdd is enabled, propagate the configuration
if config.export.libdd_enabled:
Expand All @@ -550,7 +551,8 @@ class StackCollector(collector.PeriodicCollector):
LOG.debug("Profiling StackCollector stopping")
super(StackCollector, self)._stop_service()
if self.tracer is not None:
self.tracer.context_provider._deregister_on_activate(self._thread_span_links.link_span)
link_span = stack_v2.link_span if self._stack_collector_v2_enabled else self._thread_span_links.link_span
self.tracer.context_provider._deregister_on_activate(link_span)
LOG.debug("Profiling StackCollector stopped")

# Also tell the native thread running the v2 sampler to stop, if needed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
profiling: enables endpoint profiling for stack v2, ``DD_PROFILING_STACK_V2_ENABLED``
is set.

Loading