From 77ddec8beea6eb7bbefe0d687ea592abb6fd3ad2 Mon Sep 17 00:00:00 2001 From: gary-huang Date: Thu, 19 Sep 2024 15:45:33 -0400 Subject: [PATCH 1/8] add APIs for llm obs --- dd-trace-api/build.gradle | 4 + .../java/datadog/trace/api/llmobs/LLMObs.java | 60 +++++++ .../datadog/trace/api/llmobs/LLMObsSpan.java | 147 ++++++++++++++++++ .../trace/api/llmobs/noop/NoOpLLMObsSpan.java | 61 ++++++++ .../llmobs/noop/NoOpLLMObsSpanFactory.java | 38 +++++ 5 files changed, 310 insertions(+) create mode 100644 dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java create mode 100644 dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java create mode 100644 dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java create mode 100644 dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpanFactory.java diff --git a/dd-trace-api/build.gradle b/dd-trace-api/build.gradle index 7c25ddeb164..0bb0c8f6bf7 100644 --- a/dd-trace-api/build.gradle +++ b/dd-trace-api/build.gradle @@ -31,6 +31,10 @@ excludedClassesCoverage += [ 'datadog.trace.api.profiling.ProfilingScope', 'datadog.trace.api.profiling.ProfilingContext', 'datadog.trace.api.profiling.ProfilingContextAttribute.NoOp', + 'datadog.trace.api.llmobs.LLMObs', + 'datadog.trace.api.llmobs.LLMObsSpan', + 'datadog.trace.api.llmobs.noop.NoOpLLMObsSpan', + 'datadog.trace.api.llmobs.noop.NoOpLLMObsSpanFactory', 'datadog.trace.api.experimental.DataStreamsCheckpointer', 'datadog.trace.api.experimental.DataStreamsCheckpointer.NoOp', 'datadog.trace.api.experimental.DataStreamsContextCarrier', diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java new file mode 100644 index 00000000000..02eb79a7d79 --- /dev/null +++ b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObs.java @@ -0,0 +1,60 @@ +package datadog.trace.api.llmobs; + +import datadog.trace.api.llmobs.noop.NoOpLLMObsSpanFactory; +import 
javax.annotation.Nullable; + +public class LLMObs { + private static LLMObsSpanFactory SPAN_FACTORY = NoOpLLMObsSpanFactory.INSTANCE; + + public static LLMObsSpan startLLMSpan( + String spanName, + String modelName, + String modelProvider, + @Nullable String mlApp, + @Nullable String sessionID) { + + return SPAN_FACTORY.startLLMSpan(spanName, modelName, modelProvider, mlApp, sessionID); + } + + public static LLMObsSpan startAgentSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + + return SPAN_FACTORY.startAgentSpan(spanName, mlApp, sessionID); + } + + public static LLMObsSpan startToolSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + + return SPAN_FACTORY.startToolSpan(spanName, mlApp, sessionID); + } + + public static LLMObsSpan startTaskSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + + return SPAN_FACTORY.startTaskSpan(spanName, mlApp, sessionID); + } + + public static LLMObsSpan startWorkflowSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + + return SPAN_FACTORY.startWorkflowSpan(spanName, mlApp, sessionID); + } + + public interface LLMObsSpanFactory { + LLMObsSpan startLLMSpan( + String spanName, + String modelName, + String modelProvider, + @Nullable String mlApp, + @Nullable String sessionID); + + LLMObsSpan startAgentSpan(String spanName, @Nullable String mlApp, @Nullable String sessionID); + + LLMObsSpan startToolSpan(String spanName, @Nullable String mlApp, @Nullable String sessionID); + + LLMObsSpan startTaskSpan(String spanName, @Nullable String mlApp, @Nullable String sessionID); + + LLMObsSpan startWorkflowSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID); + } +} diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java new file mode 100644 index 00000000000..af5eb204937 --- /dev/null +++ 
b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsSpan.java @@ -0,0 +1,147 @@ +package datadog.trace.api.llmobs; + +import java.util.List; +import java.util.Map; + +/** This interface represents an individual LLM Obs span. */ +public interface LLMObsSpan { + + /** + * Annotate the span with inputs and outputs + * + * @param inputData The input data of the span in the form of a list, for example a list of input + * messages + * @param outputData The output data of the span in the form of a list, for example a list of + * output messages + */ + void annotateIO(List> inputData, List> outputData); + + /** + * Annotate the span with inputs and outputs + * + * @param inputData The input data of the span in the form of a string + * @param outputData The output data of the span in the form of a string + */ + void annotateIO(String inputData, String outputData); + + /** + * Annotate the span with metadata + * + * @param metadata A map of JSON serializable key-value pairs that contains metadata information + * relevant to the input or output operation described by the span + */ + void setMetadata(Map metadata); + + /** + * Annotate the span with metrics + * + * @param metrics A map of JSON serializable keys and numeric values that users can add as metrics + * relevant to the operation described by the span (input_tokens, output_tokens, total_tokens, + * etc.). + */ + void setMetrics(Map metrics); + + /** + * Annotate the span with a single metric key value pair for the span’s context (number of tokens, + * document length, etc). + * + * @param key the name of the metric + * @param value the value of the metric + */ + void setMetric(CharSequence key, int value); + + /** + * Annotate the span with a single metric key value pair for the span’s context (number of tokens, + * document length, etc).
+ * + * @param key the name of the metric + * @param value the value of the metric + */ + void setMetric(CharSequence key, long value); + + /** + * Annotate the span with a single metric key value pair for the span’s context (number of tokens, + * document length, etc). + * + * @param key the name of the metric + * @param value the value of the metric + */ + void setMetric(CharSequence key, double value); + + /** + * Annotate the span with tags + * + * @param tags A map of JSON serializable key-value pairs that users can add as tags regarding + * the span’s context (session, environment, system, versioning, etc.). + */ + void setTags(Map tags); + + /** + * Annotate the span with a single tag key value pair as a tag regarding the span’s context + * (session, environment, system, versioning, etc.). + * + * @param key the key of the tag + * @param value the value of the tag + */ + void setTag(String key, String value); + + /** + * Annotate the span with a single tag key value pair as a tag regarding the span’s context + * (session, environment, system, versioning, etc.). + * + * @param key the key of the tag + * @param value the value of the tag + */ + void setTag(String key, boolean value); + + /** + * Annotate the span with a single tag key value pair as a tag regarding the span’s context + * (session, environment, system, versioning, etc.). + * + * @param key the key of the tag + * @param value the value of the tag + */ + void setTag(String key, int value); + + /** + * Annotate the span with a single tag key value pair as a tag regarding the span’s context + * (session, environment, system, versioning, etc.). + * + * @param key the key of the tag + * @param value the value of the tag + */ + void setTag(String key, long value); + + /** + * Annotate the span with a single tag key value pair as a tag regarding the span’s context + * (session, environment, system, versioning, etc.).
+ * + * @param key the key of the tag + * @param value the value of the tag + */ + void setTag(String key, double value); + + /** + * Annotate the span to indicate that an error occurred + * + * @param error whether an error occurred + */ + void setError(boolean error); + + /** + * Annotate the span with an error message + * + * @param errorMessage the message of the error + */ + void setErrorMessage(String errorMessage); + + /** + * Annotate the span with a throwable + * + * @param throwable the errored throwable + */ + void addThrowable(Throwable throwable); + + /** Finishes (closes) a span */ + void finish(); +} diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java new file mode 100644 index 00000000000..f6752dc92fa --- /dev/null +++ b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpan.java @@ -0,0 +1,61 @@ +package datadog.trace.api.llmobs.noop; + +import datadog.trace.api.llmobs.LLMObsSpan; +import java.util.List; +import java.util.Map; + +public class NoOpLLMObsSpan implements LLMObsSpan { + public static final LLMObsSpan INSTANCE = new NoOpLLMObsSpan(); + + @Override + public void annotateIO( + List> inputData, List> outputData) {} + + @Override + public void annotateIO(String inputData, String outputData) {} + + @Override + public void setMetadata(Map metadata) {} + + @Override + public void setMetrics(Map metrics) {} + + @Override + public void setMetric(CharSequence key, int value) {} + + @Override + public void setMetric(CharSequence key, long value) {} + + @Override + public void setMetric(CharSequence key, double value) {} + + @Override + public void setTags(Map tags) {} + + @Override + public void setTag(String key, String value) {} + + @Override + public void setTag(String key, boolean value) {} + + @Override + public void setTag(String key, int value) {} + + @Override + public void setTag(String key, long value) {} 
+ + @Override + public void setTag(String key, double value) {} + + @Override + public void setError(boolean error) {} + + @Override + public void setErrorMessage(String errorMessage) {} + + @Override + public void addThrowable(Throwable throwable) {} + + @Override + public void finish() {} +} diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpanFactory.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpanFactory.java new file mode 100644 index 00000000000..5f0071b1a3e --- /dev/null +++ b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/noop/NoOpLLMObsSpanFactory.java @@ -0,0 +1,38 @@ +package datadog.trace.api.llmobs.noop; + +import datadog.trace.api.llmobs.LLMObs; +import datadog.trace.api.llmobs.LLMObsSpan; +import javax.annotation.Nullable; + +public class NoOpLLMObsSpanFactory implements LLMObs.LLMObsSpanFactory { + public static final NoOpLLMObsSpanFactory INSTANCE = new NoOpLLMObsSpanFactory(); + + public LLMObsSpan startLLMSpan( + String spanName, + String modelName, + String modelProvider, + @Nullable String mlApp, + @Nullable String sessionID) { + return NoOpLLMObsSpan.INSTANCE; + } + + public LLMObsSpan startAgentSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + return NoOpLLMObsSpan.INSTANCE; + } + + public LLMObsSpan startToolSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + return NoOpLLMObsSpan.INSTANCE; + } + + public LLMObsSpan startTaskSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + return NoOpLLMObsSpan.INSTANCE; + } + + public LLMObsSpan startWorkflowSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + return NoOpLLMObsSpan.INSTANCE; + } +} From c98581939df4689db59f5d5277356f6c093104cb Mon Sep 17 00:00:00 2001 From: gary-huang Date: Thu, 19 Sep 2024 15:45:33 -0400 Subject: [PATCH 2/8] add APIs for llm obs --- dd-java-agent/agent-jmxfetch/integrations-core | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-java-agent/agent-jmxfetch/integrations-core b/dd-java-agent/agent-jmxfetch/integrations-core index 3189af0e0ae..5240f2a7cdc 160000 --- a/dd-java-agent/agent-jmxfetch/integrations-core +++ b/dd-java-agent/agent-jmxfetch/integrations-core @@ -1 +1 @@ -Subproject commit 3189af0e0ae840c9a4bab3131662c7fd6b0de7fb +Subproject commit 5240f2a7cdcabc6ae7787b9191b9189438671f3e From 9cb3e0aad8836c441b8f9c0159478066221134f7 Mon Sep 17 00:00:00 2001 From: gary-huang Date: Mon, 27 Jan 2025 14:21:50 -0500 Subject: [PATCH 3/8] impl llmobs agent and llmobs apis --- .../communication/BackendApiFactory.java | 1 + .../java/datadog/trace/bootstrap/Agent.java | 25 +- dd-java-agent/agent-llmobs/build.gradle | 60 ++++ .../datadog/trace/llmobs/LLMObsServices.java | 22 ++ .../datadog/trace/llmobs/LLMObsSystem.java | 100 ++++++ .../trace/llmobs/domain/DDLLMObsSpan.java | 293 ++++++++++++++++++ .../trace/llmobs/domain/LLMObsInternal.java | 10 + .../llmobs/domain/DDLLMObsSpanTest.groovy | 213 +++++++++++++ dd-java-agent/build.gradle | 1 + .../java/datadog/trace/api/DDSpanTypes.java | 3 + .../java/datadog/trace/api/llmobs/LLMObs.java | 2 +- .../datadog/trace/api/llmobs/LLMObsTags.java | 15 + .../main/java/datadog/trace/api/Config.java | 28 ++ .../bootstrap/instrumentation/api/Tags.java | 6 + settings.gradle | 3 + 15 files changed, 780 insertions(+), 2 deletions(-) create mode 100644 dd-java-agent/agent-llmobs/build.gradle create mode 100644 dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java create mode 100644 dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java create mode 100644 dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java create mode 100644 dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java create mode 100644 
dd-java-agent/agent-llmobs/src/test/groovy/datadog/trace/llmobs/domain/DDLLMObsSpanTest.groovy create mode 100644 dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsTags.java diff --git a/communication/src/main/java/datadog/communication/BackendApiFactory.java b/communication/src/main/java/datadog/communication/BackendApiFactory.java index bebb7b42828..f3382792baa 100644 --- a/communication/src/main/java/datadog/communication/BackendApiFactory.java +++ b/communication/src/main/java/datadog/communication/BackendApiFactory.java @@ -72,6 +72,7 @@ private HttpUrl getAgentlessUrl(Intake intake) { public enum Intake { API("api", "v2", Config::isCiVisibilityAgentlessEnabled, Config::getCiVisibilityAgentlessUrl), + LLMOBS_API("api", "v2", Config::isLlmObsAgentlessEnabled, Config::getLlMObsAgentlessUrl), LOGS( "http-intake.logs", "v2", diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java index 4d9e7b10338..7b4ae97135b 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java @@ -23,6 +23,7 @@ import datadog.trace.api.config.GeneralConfig; import datadog.trace.api.config.IastConfig; import datadog.trace.api.config.JmxFetchConfig; +import datadog.trace.api.config.LlmObsConfig; import datadog.trace.api.config.ProfilingConfig; import datadog.trace.api.config.RemoteConfigConfig; import datadog.trace.api.config.TraceInstrumentationConfig; @@ -109,7 +110,8 @@ private enum AgentFeature { false), DATA_JOBS(propertyNameToSystemPropertyName(GeneralConfig.DATA_JOBS_ENABLED), false), AGENTLESS_LOG_SUBMISSION( - propertyNameToSystemPropertyName(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED), false); + propertyNameToSystemPropertyName(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED), false), + 
LLMOBS(propertyNameToSystemPropertyName(LlmObsConfig.LLMOBS_ENABLED), false); private final String systemProp; private final boolean enabledByDefault; @@ -150,6 +152,7 @@ public boolean isEnabledByDefault() { private static boolean iastFullyDisabled; private static boolean cwsEnabled = false; private static boolean ciVisibilityEnabled = false; + private static boolean llmObsEnabled = false; private static boolean usmEnabled = false; private static boolean telemetryEnabled = true; private static boolean debuggerEnabled = false; @@ -268,6 +271,7 @@ public static void start( exceptionDebuggingEnabled = isFeatureEnabled(AgentFeature.EXCEPTION_DEBUGGING); spanOriginEnabled = isFeatureEnabled(AgentFeature.SPAN_ORIGIN); agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION); + llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS); if (profilingEnabled) { if (!isOracleJDK8()) { @@ -556,6 +560,7 @@ public void execute() { maybeStartAppSec(scoClass, sco); maybeStartCiVisibility(instrumentation, scoClass, sco); + maybeStartLLMObs(instrumentation, scoClass, sco); // start debugger before remote config to subscribe to it before starting to poll maybeStartDebugger(instrumentation, scoClass, sco); maybeStartRemoteConfig(scoClass, sco); @@ -903,6 +908,24 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class scoCla } } + private static void maybeStartLLMObs(Instrumentation inst, Class scoClass, Object sco) { + if (llmObsEnabled) { + StaticEventLogger.begin("LLM Observability"); + + try { + final Class llmObsSysClass = + AGENT_CLASSLOADER.loadClass("datadog.trace.llmobs.LLMObsSystem"); + final Method llmObsInstallerMethod = + llmObsSysClass.getMethod("start", Instrumentation.class, scoClass); + llmObsInstallerMethod.invoke(null, inst, sco); + } catch (final Throwable e) { + log.warn("Not starting LLM Observability subsystem", e); + } + + StaticEventLogger.end("LLM Observability"); + } + } + private static void 
maybeInstallLogsIntake(Class scoClass, Object sco) { if (agentlessLogSubmissionEnabled) { StaticEventLogger.begin("Logs Intake"); diff --git a/dd-java-agent/agent-llmobs/build.gradle b/dd-java-agent/agent-llmobs/build.gradle new file mode 100644 index 00000000000..92329277890 --- /dev/null +++ b/dd-java-agent/agent-llmobs/build.gradle @@ -0,0 +1,60 @@ +buildscript { + repositories { + mavenCentral() + } + + dependencies { + classpath group: 'org.jetbrains.kotlin', name: 'kotlin-gradle-plugin', version: libs.versions.kotlin.get() + } +} + +plugins { + id 'com.github.johnrengelman.shadow' + id 'java-test-fixtures' +} + +apply from: "$rootDir/gradle/java.gradle" +apply from: "$rootDir/gradle/version.gradle" +apply from: "$rootDir/gradle/test-with-kotlin.gradle" +apply from: "$rootDir/gradle/test-with-scala.gradle" + +minimumBranchCoverage = 0.0 +minimumInstructionCoverage = 0.0 + +dependencies { + api libs.slf4j + + implementation libs.bundles.asm + implementation group: 'org.jacoco', name: 'org.jacoco.core', version: '0.8.12' + implementation group: 'org.jacoco', name: 'org.jacoco.report', version: '0.8.12' + + implementation project(':communication') + implementation project(':components:json') + implementation project(':internal-api') + implementation project(':internal-api:internal-api-9') + + testImplementation project(":utils:test-utils") + testImplementation("com.google.jimfs:jimfs:1.1") // an in-memory file system for testing code that works with files + + testImplementation libs.scala + testImplementation libs.kotlin + + testFixturesApi project(':dd-java-agent:testing') + testFixturesApi project(':utils:test-utils') + + testFixturesApi group: 'org.skyscreamer', name: 'jsonassert', version: '1.5.1' + testFixturesApi group: 'org.freemarker', name: 'freemarker', version: '2.3.30' + testFixturesApi group: 'com.jayway.jsonpath', name: 'json-path', version: '2.8.0' + testFixturesApi group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.16.0' + 
testFixturesApi group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.9.6' + + testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.9.2") // Required to update dependency lock files +} + +shadowJar { + dependencies deps.excludeShared +} + +jar { + archiveClassifier = 'unbundled' +} diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java new file mode 100644 index 00000000000..600ea4d545a --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java @@ -0,0 +1,22 @@ +package datadog.trace.llmobs; + +import datadog.communication.BackendApi; +import datadog.communication.BackendApiFactory; +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.trace.api.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LLMObsServices { + + private static final Logger logger = LoggerFactory.getLogger(LLMObsServices.class); + + final Config config; + final BackendApi backendApi; + + LLMObsServices(Config config, SharedCommunicationObjects sco) { + this.config = config; + this.backendApi = + new BackendApiFactory(config, sco).createBackendApi(BackendApiFactory.Intake.LLMOBS_API); + } +} diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java new file mode 100644 index 00000000000..fd9d3f3aca4 --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java @@ -0,0 +1,100 @@ +package datadog.trace.llmobs; + +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.trace.api.Config; +import datadog.trace.api.llmobs.LLMObs; +import datadog.trace.api.llmobs.LLMObsSpan; +import datadog.trace.api.llmobs.LLMObsTags; +import datadog.trace.bootstrap.instrumentation.api.Tags; 
+import datadog.trace.llmobs.domain.DDLLMObsSpan; +import datadog.trace.llmobs.domain.LLMObsInternal; +import java.lang.instrument.Instrumentation; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LLMObsSystem { + + private static final Logger LOGGER = LoggerFactory.getLogger(LLMObsSystem.class); + + public static void start(Instrumentation inst, SharedCommunicationObjects sco) { + Config config = Config.get(); + if (!config.isLlmObsEnabled()) { + LOGGER.debug("LLM Observability is disabled"); + return; + } + + sco.createRemaining(config); + + LLMObsServices llmObsServices = new LLMObsServices(config, sco); + LLMObsInternal.setLLMObsSpanFactory( + new LLMObsManualSpanFactory( + config.getLlmObsMlApp(), config.getServiceName(), llmObsServices)); + } + + private static class LLMObsManualSpanFactory implements LLMObs.LLMObsSpanFactory { + + private final LLMObsServices llmObsServices; + private final String serviceName; + private final String defaultMLApp; + + public LLMObsManualSpanFactory( + String defaultMLApp, String serviceName, LLMObsServices llmObsServices) { + this.defaultMLApp = defaultMLApp; + this.llmObsServices = llmObsServices; + this.serviceName = serviceName; + } + + @Override + public LLMObsSpan startLLMSpan( + String spanName, + String modelName, + String modelProvider, + @Nullable String mlApp, + @Nullable String sessionID) { + + DDLLMObsSpan span = + new DDLLMObsSpan( + Tags.LLMOBS_LLM_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + + span.setTag(LLMObsTags.MODEL_NAME, modelName); + span.setTag(LLMObsTags.MODEL_PROVIDER, modelProvider); + return span; + } + + @Override + public LLMObsSpan startAgentSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + return new DDLLMObsSpan( + Tags.LLMOBS_AGENT_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + } + + @Override + public LLMObsSpan startToolSpan( + String spanName, @Nullable
String mlApp, @Nullable String sessionID) { + return new DDLLMObsSpan( + Tags.LLMOBS_TOOL_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + } + + @Override + public LLMObsSpan startTaskSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + return new DDLLMObsSpan( + Tags.LLMOBS_TASK_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + } + + @Override + public LLMObsSpan startWorkflowSpan( + String spanName, @Nullable String mlApp, @Nullable String sessionID) { + return new DDLLMObsSpan( + Tags.LLMOBS_WORKFLOW_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + } + + private String getMLApp(String mlApp) { + if (mlApp == null || mlApp.isEmpty()) { + return defaultMLApp; + } + return mlApp; + } + } +} diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java new file mode 100644 index 00000000000..35558e02987 --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java @@ -0,0 +1,293 @@ +package datadog.trace.llmobs.domain; + +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.llmobs.LLMObsSpan; +import datadog.trace.api.llmobs.LLMObsTags; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DDLLMObsSpan implements LLMObsSpan { + + private enum State { + VALID, + INVALID_IO_MESSAGE_KEY + } + + private static final String MESSAGE_KEY_ROLE = "role"; + private static final String MESSAGE_KEY_CONTENT =
"content"; + + private static final Set VALID_MESSAGE_KEYS = new HashSet<>(Arrays.asList(MESSAGE_KEY_ROLE, MESSAGE_KEY_CONTENT)); + + // Well known tags for LLM obs will be prefixed with _ml_obs_(tag|metric). + // Prefix for tags + private static final String LLMOBS_TAG_PREFIX = "_ml_obs_tag."; + // Prefix for metrics + private static final String LLMOBS_METRIC_PREFIX = "_ml_obs_metric."; + + // internal tags to be prefixed + private static final String INPUT = LLMOBS_TAG_PREFIX + "input"; + private static final String OUTPUT = LLMOBS_TAG_PREFIX + "output"; + private static final String SPAN_KIND = LLMOBS_TAG_PREFIX + Tags.SPAN_KIND; + private static final String METADATA = LLMOBS_TAG_PREFIX + LLMObsTags.METADATA; + + private static final String LLM_OBS_INSTRUMENTATION_NAME = "llmobs"; + + private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class); + + private final AgentSpan span; + private final String spanKind; + + private boolean finished = false; + + public DDLLMObsSpan( + @Nonnull String kind, + String spanName, + @Nonnull String mlApp, + String sessionID, + @Nonnull String serviceName) { + + if (null == spanName || spanName.isEmpty()) { + spanName = kind; + } + + AgentTracer.SpanBuilder spanBuilder = + AgentTracer.get() + .buildSpan(LLM_OBS_INSTRUMENTATION_NAME, spanName) + .withServiceName(serviceName) + .withSpanType(DDSpanTypes.LLMOBS); + + this.span = spanBuilder.start(); + this.span.setTag(SPAN_KIND, kind); + this.spanKind = kind; + this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.ML_APP, mlApp); + if (sessionID != null && !sessionID.isEmpty()) { + this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID, sessionID); + } + } + + @Override + public String toString() { + return super.toString() + + ", trace_id=" + + this.span.context().getTraceId() + + ", span_id=" + + this.span.context().getSpanId() + + ", ml_app=" + + this.span.getTag(LLMOBS_TAG_PREFIX + LLMObsTags.ML_APP) + + ", service=" + + this.span.getServiceName() + + ", span_kind=" + +
this.span.getTag(SPAN_KIND); + } + + private static State validateIOMessages(List> messages) { + for (Map message : messages) { + for (String key : message.keySet()) { + if (!VALID_MESSAGE_KEYS.contains(key)) { + return State.INVALID_IO_MESSAGE_KEY; + } + } + } + return State.VALID; + } + + @Override + public void annotateIO( + List> inputData, List> outputData) { + if (finished) { + return; + } + if (inputData != null && !inputData.isEmpty()) { + State inputState = validateIOMessages(inputData); + if (inputState != State.VALID) { + LOGGER.debug("malformed/unexpected input message, state={}", inputState); + } + this.span.setTag(INPUT, inputData); + } + if (outputData != null && !outputData.isEmpty()) { + State outputState = validateIOMessages(outputData); + if (outputState != State.VALID) { + LOGGER.debug("malformed/unexpected output message, state={}", outputState); + } + this.span.setTag(OUTPUT, outputData); + } + } + + @Override + public void annotateIO(String inputData, String outputData) { + if (finished) { + return; + } + if (inputData != null && !inputData.isEmpty()) { + if (Tags.LLMOBS_LLM_SPAN_KIND.equals(this.spanKind)) { + annotateIO(Collections.singletonList(Collections.singletonMap(MESSAGE_KEY_CONTENT, inputData)), null); + } else { + this.span.setTag(INPUT, inputData); + } + } + if (outputData != null && !outputData.isEmpty()) { + if (Tags.LLMOBS_LLM_SPAN_KIND.equals(this.spanKind)) { + annotateIO(null, Collections.singletonList(Collections.singletonMap(MESSAGE_KEY_CONTENT, outputData))); + } else { + this.span.setTag(OUTPUT, outputData); + } + } + } + + @Override + public void setMetadata(Map metadata) { + if (finished || metadata == null) { + return; + } + Object value = span.getTag(METADATA); + if (value == null) { + this.span.setTag(METADATA, new HashMap<>(metadata)); + return; + } + + if (value instanceof Map) { + ((Map) value).putAll(metadata); + } else { + LOGGER.debug("unexpected instance type for metadata {}, overwriting for
now", value.getClass().getName()); + this.span.setTag(METADATA, new HashMap<>(metadata)); + } + } + + @Override + public void setMetrics(Map metrics) { + if (finished || metrics == null) { + return; + } + for (Map.Entry entry : metrics.entrySet()) { + this.span.setMetric(LLMOBS_METRIC_PREFIX + entry.getKey(), entry.getValue().doubleValue()); + } + } + + @Override + public void setMetric(CharSequence key, int value) { + if (finished) { + return; + } + this.span.setMetric(LLMOBS_METRIC_PREFIX + key, value); + } + + @Override + public void setMetric(CharSequence key, long value) { + if (finished) { + return; + } + this.span.setMetric(LLMOBS_METRIC_PREFIX + key, value); + } + + @Override + public void setMetric(CharSequence key, double value) { + if (finished) { + return; + } + this.span.setMetric(LLMOBS_METRIC_PREFIX + key, value); + } + + @Override + public void setTags(Map tags) { + if (finished) { + return; + } + if (tags != null && !tags.isEmpty()) { + for (Map.Entry entry : tags.entrySet()) { + this.span.setTag(LLMOBS_TAG_PREFIX + entry.getKey(), entry.getValue()); + } + } + } + + @Override + public void setTag(String key, String value) { + if (finished) { + return; + } + this.span.setTag(LLMOBS_TAG_PREFIX + key, value); + } + + @Override + public void setTag(String key, boolean value) { + if (finished) { + return; + } + this.span.setTag(LLMOBS_TAG_PREFIX + key, value); + } + + @Override + public void setTag(String key, int value) { + if (finished) { + return; + } + this.span.setTag(LLMOBS_TAG_PREFIX + key, value); + } + + @Override + public void setTag(String key, long value) { + if (finished) { + return; + } + this.span.setTag(LLMOBS_TAG_PREFIX + key, value); + } + + @Override + public void setTag(String key, double value) { + if (finished) { + return; + } + this.span.setTag(LLMOBS_TAG_PREFIX + key, value); + } + + @Override + public void setError(boolean error) { + if (finished) { + return; + } + this.span.setError(error); + } + + @Override + public void setErrorMessage(String
errorMessage) { + if (finished) { + return; + } + if (errorMessage == null || errorMessage.isEmpty()) { + return; + } + this.span.setError(true); + this.span.setErrorMessage(errorMessage); + } + + @Override + public void addThrowable(Throwable throwable) { + if (finished) { + return; + } + if (throwable == null) { + return; + } + this.span.setError(true); + this.span.addThrowable(throwable); + } + + @Override + public void finish() { + if (finished) { + return; + } + this.span.finish(); + this.finished = true; + } +} diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java new file mode 100644 index 00000000000..42b0c097e48 --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/LLMObsInternal.java @@ -0,0 +1,10 @@ +package datadog.trace.llmobs.domain; + +import datadog.trace.api.llmobs.LLMObs; + +public class LLMObsInternal extends LLMObs { + + public static void setLLMObsSpanFactory(final LLMObsSpanFactory factory) { + LLMObs.SPAN_FACTORY = factory; + } +} diff --git a/dd-java-agent/agent-llmobs/src/test/groovy/datadog/trace/llmobs/domain/DDLLMObsSpanTest.groovy b/dd-java-agent/agent-llmobs/src/test/groovy/datadog/trace/llmobs/domain/DDLLMObsSpanTest.groovy new file mode 100644 index 00000000000..cc2a82e3c08 --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/test/groovy/datadog/trace/llmobs/domain/DDLLMObsSpanTest.groovy @@ -0,0 +1,213 @@ +package datadog.trace.llmobs.domain + +import datadog.trace.agent.tooling.TracerInstaller +import datadog.trace.api.DDTags +import datadog.trace.api.IdGenerationStrategy +import datadog.trace.api.llmobs.LLMObsSpan +import datadog.trace.api.llmobs.LLMObsTags +import datadog.trace.bootstrap.instrumentation.api.AgentSpan +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import datadog.trace.bootstrap.instrumentation.api.Tags +import 
/**
 * Spock specification for DDLLMObsSpan.
 *
 * Each feature creates a span through givenALLMObsSpan, calls the public annotation API and then
 * inspects the wrapped AgentSpan to check that values are stored under the "_ml_obs_tag." /
 * "_ml_obs_metric." prefixed keys and never under the bare key.
 */
class DDLLMObsSpanTest extends DDSpecification{
  // Tracer shared by all features; installed as the global tracer in setupSpec.
  @SuppressWarnings('PropertyName')
  @Shared
  AgentTracer.TracerAPI TEST_TRACER

  // Builds a spied CoreTracer (sequential id generation) and force-installs it globally.
  void setupSpec() {
    TEST_TRACER =
      Spy(
      CoreTracer.builder()
      .idGenerationStrategy(IdGenerationStrategy.fromName("SEQUENTIAL"))
      .build())
    TracerInstaller.forceInstallGlobalTracer(TEST_TRACER)

    // The stub only delegates to the real startSpan and returns its result unchanged.
    TEST_TRACER.startSpan(*_) >> {
      def agentSpan = callRealMethod()
      agentSpan
    }
  }

  // Closes the shared tracer if it was created.
  void cleanupSpec() {
    TEST_TRACER?.close()
  }

  // Fails fast when a span is still active before a feature starts, then flushes the tracer.
  void setup() {
    assert TEST_TRACER.activeSpan() == null: "Span is active before test has started: " + TEST_TRACER.activeSpan()
    TEST_TRACER.flush()
  }

  // Flushes the tracer after every feature.
  void cleanup() {
    TEST_TRACER.flush()
  }

  // Prefix for tags
  private static final String LLMOBS_TAG_PREFIX = "_ml_obs_tag.";
  // Prefix for metrics
  private static final String LLMOBS_METRIC_PREFIX = "_ml_obs_metric.";

  // internal tags to be prefixed
  private static final String INPUT = LLMOBS_TAG_PREFIX + "input";
  private static final String OUTPUT = LLMOBS_TAG_PREFIX + "output";
  private static final String SPAN_KIND = LLMOBS_TAG_PREFIX + Tags.SPAN_KIND;
  private static final String METADATA = LLMOBS_TAG_PREFIX + LLMObsTags.METADATA;


  // Workflow span: every value set once must be readable back under its prefixed key only.
  def "test span simple"() {
    setup:
    def test = givenALLMObsSpan(Tags.LLMOBS_WORKFLOW_SPAN_KIND, "test-span")

    when:
    def input = "test input"
    def output = "test output"
    // initial set
    test.annotateIO(input, output)
    test.setMetadata(Maps.of("sport", "baseball", "price_data", Maps.of("gpt4", 100)))
    test.setMetrics(Maps.of("rank", 1))
    test.setMetric("likelihood", 0.1)
    test.setTag("DOMAIN", "north-america")
    test.setTags(Maps.of("bulk1", 1, "bulk2", "2"))
    def errMsg = "mr brady"
    test.setErrorMessage(errMsg)

    then:
    // Reach into the wrapper to inspect the underlying AgentSpan directly.
    def innerSpan = (AgentSpan)test.span
    assert Tags.LLMOBS_WORKFLOW_SPAN_KIND.equals(innerSpan.getTag(LLMOBS_TAG_PREFIX + "span.kind"))

    assert null == innerSpan.getTag("input")
    assert input.equals(innerSpan.getTag(INPUT))
    assert null == innerSpan.getTag("output")
    assert output.equals(innerSpan.getTag(OUTPUT))

    assert null == innerSpan.getTag("metadata")
    def expectedMetadata = Maps.of("sport", "baseball", "price_data", Maps.of("gpt4", 100))
    assert expectedMetadata.equals(innerSpan.getTag(METADATA))

    assert null == innerSpan.getTag("rank")
    def rankMetric = innerSpan.getTag(LLMOBS_METRIC_PREFIX + "rank")
    assert rankMetric instanceof Number && 1 == (int)rankMetric

    assert null == innerSpan.getTag("likelihood")
    def likelihoodMetric = innerSpan.getTag(LLMOBS_METRIC_PREFIX + "likelihood")
    assert likelihoodMetric instanceof Number
    assert 0.1 == (double)likelihoodMetric

    assert null == innerSpan.getTag("DOMAIN")
    def domain = innerSpan.getTag(LLMOBS_TAG_PREFIX + "DOMAIN")
    assert domain instanceof String
    assert "north-america".equals((String)domain)

    assert null == innerSpan.getTag("bulk1")
    def tagBulk1 = innerSpan.getTag(LLMOBS_TAG_PREFIX + "bulk1")
    assert tagBulk1 instanceof Number
    assert 1 == ((int)tagBulk1)

    assert null == innerSpan.getTag("bulk2")
    def tagBulk2 = innerSpan.getTag(LLMOBS_TAG_PREFIX + "bulk2")
    assert tagBulk2 instanceof String
    assert "2".equals((String)tagBulk2)

    // setErrorMessage is expected to both flag the span and store the message.
    assert innerSpan.isError()
    assert innerSpan.getTag(DDTags.ERROR_MSG) instanceof String
    assert errMsg.equals(innerSpan.getTag(DDTags.ERROR_MSG))
  }

  // Agent span: repeated calls must merge or replace earlier values as described inline.
  def "test span with overwrites"() {
    setup:
    def test = givenALLMObsSpan(Tags.LLMOBS_AGENT_SPAN_KIND, "test-span")

    when:
    def input = "test input"
    // initial set
    test.annotateIO(input, "test output")
    // this should be a no-op
    test.annotateIO("", "")
    // this should replace the initial output
    def expectedOutput = Arrays.asList(Maps.of("role", "user", "content", "how much is gas"))
    test.annotateIO(null, expectedOutput)

    // initial set
    test.setMetadata(Maps.of("sport", "baseball", "price_data", Maps.of("gpt4", 100)))
    // this should replace baseball with hockey
    test.setMetadata(Maps.of("sport", "hockey"))
    // this should add a new key
    test.setMetadata(Maps.of("temperature", 30))

    // initial set
    test.setMetrics(Maps.of("rank", 1))
    // this should replace the metric
    test.setMetric("rank", 10)

    // initial set
    test.setTag("DOMAIN", "north-america")
    // add and replace
    test.setTags(Maps.of("bulk1", 1, "DOMAIN", "europe"))

    // addThrowable flags the span as errored; the explicit setError(false) afterwards clears it.
    def throwableMsg = "false positive"
    test.addThrowable(new Throwable(throwableMsg))
    test.setError(false)

    then:
    def innerSpan = (AgentSpan)test.span
    assert Tags.LLMOBS_AGENT_SPAN_KIND.equals(innerSpan.getTag(LLMOBS_TAG_PREFIX + "span.kind"))

    assert null == innerSpan.getTag("input")
    assert input.equals(innerSpan.getTag(INPUT))
    assert null == innerSpan.getTag("output")
    assert expectedOutput.equals(innerSpan.getTag(OUTPUT))

    assert null == innerSpan.getTag("metadata")
    def expectedMetadata = Maps.of("sport", "hockey", "price_data", Maps.of("gpt4", 100), "temperature", 30)
    assert expectedMetadata.equals(innerSpan.getTag(METADATA))

    assert null == innerSpan.getTag("rank")
    def rankMetric = innerSpan.getTag(LLMOBS_METRIC_PREFIX + "rank")
    assert rankMetric instanceof Number && 10 == (int)rankMetric

    assert null == innerSpan.getTag("DOMAIN")
    def domain = innerSpan.getTag(LLMOBS_TAG_PREFIX + "DOMAIN")
    assert domain instanceof String
    assert "europe".equals((String)domain)

    assert null == innerSpan.getTag("bulk1")
    def tagBulk1 = innerSpan.getTag(LLMOBS_TAG_PREFIX + "bulk1")
    assert tagBulk1 instanceof Number
    assert 1 == ((int)tagBulk1)

    // The error flag was cleared, but the message and stack recorded from the throwable remain.
    assert !innerSpan.isError()
    assert innerSpan.getTag(DDTags.ERROR_MSG) instanceof String
    assert throwableMsg.equals(innerSpan.getTag(DDTags.ERROR_MSG))
    assert innerSpan.getTag(DDTags.ERROR_STACK) instanceof String
    assert ((String)innerSpan.getTag(DDTags.ERROR_STACK)).contains(throwableMsg)
  }

  // LLM span: plain String input/output is expected back as a one-element list of {"content": ...} maps.
  def "test llm span string input formatted to messages"() {
    setup:
    def test = givenALLMObsSpan(Tags.LLMOBS_LLM_SPAN_KIND, "test-span")

    when:
    def input = "test input"
    def output = "test output"
    // initial set
    test.annotateIO(input, output)

    then:
    def innerSpan = (AgentSpan)test.span
    assert Tags.LLMOBS_LLM_SPAN_KIND.equals(innerSpan.getTag(LLMOBS_TAG_PREFIX + "span.kind"))

    assert null == innerSpan.getTag("input")
    def expectedInput = Arrays.asList(Maps.of("content", input))
    assert expectedInput.equals(innerSpan.getTag(INPUT))
    assert null == innerSpan.getTag("output")
    def expectedOutput = Arrays.asList(Maps.of("content", output))
    assert expectedOutput.equals(innerSpan.getTag(OUTPUT))
  }

  // Factory helper: builds a DDLLMObsSpan of the given kind and name with fixed
  // "test-ml-app" / null / "test-svc" trailing constructor arguments.
  private LLMObsSpan givenALLMObsSpan(String kind, name){
    new DDLLMObsSpan(kind, name, "test-ml-app", null, "test-svc")
  }
}
/**
 * Well-known tag names used by LLM Observability.
 *
 * <p>The values are the bare names; callers in this change prepend their own prefix before
 * storing them on a span.
 */
public class LLMObsTags {

  // ---- generic span attributes ----

  /** Tag name {@code ml_app}. */
  public static final String ML_APP = "ml_app";

  /** Tag name {@code session_id}. */
  public static final String SESSION_ID = "session_id";

  // ---- meta ----

  /** Tag name {@code metadata}. */
  public static final String METADATA = "metadata";

  // ---- LLM span related ----

  /** Tag name {@code model_name}. */
  public static final String MODEL_NAME = "model_name";

  /** Tag name {@code model_version}. */
  public static final String MODEL_VERSION = "model_version";

  /** Tag name {@code model_provider}. */
  public static final String MODEL_PROVIDER = "model_provider";
}
String llmObsAgentlessUrl; private final String llmObsMlApp; private final boolean ciVisibilityTraceSanitationEnabled; @@ -1377,6 +1378,22 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) configProvider.getBoolean(LLMOBS_AGENTLESS_ENABLED, DEFAULT_LLM_OBS_AGENTLESS_ENABLED); llmObsMlApp = configProvider.getString(LLMOBS_ML_APP); + final String llmObsAgentlessUrlStr = getFinalLLMObsUrl(); + URI parsedLLMObsUri = null; + if (llmObsAgentlessUrlStr != null && !llmObsAgentlessUrlStr.isEmpty()) { + try { + parsedLLMObsUri = new URL(llmObsAgentlessUrlStr).toURI(); + } catch (MalformedURLException | URISyntaxException ex) { + log.error( + "Cannot parse LLM Observability agentless URL '{}', skipping", llmObsAgentlessUrlStr); + } + } + if (parsedLLMObsUri != null) { + llmObsAgentlessUrl = llmObsAgentlessUrlStr; + } else { + llmObsAgentlessUrl = null; + } + ciVisibilityTraceSanitationEnabled = configProvider.getBoolean(CIVISIBILITY_TRACE_SANITATION_ENABLED, true); @@ -2743,6 +2760,10 @@ public boolean isLlmObsAgentlessEnabled() { return llmObsAgentlessEnabled; } + public String getLlMObsAgentlessUrl() { + return llmObsAgentlessUrl; + } + public String getLlmObsMlApp() { return llmObsMlApp; } @@ -3774,6 +3795,13 @@ public String getFinalProfilingUrl() { } } + public String getFinalLLMObsUrl() { + if (llmObsAgentlessEnabled) { + return "https://llmobs-intake." 
+ site + "/api/v2/llmobs"; + } + return null; + } + public String getFinalCrashTrackingTelemetryUrl() { if (crashTrackingAgentless) { // when agentless crashTracking is turned on we send directly to our intake diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java index 78c90b312a8..fd11b2bf565 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java @@ -152,4 +152,10 @@ public class Tags { public static final String PROPAGATED_TRACE_SOURCE = "_dd.p.ts"; public static final String PROPAGATED_DEBUG = "_dd.p.debug"; + + public static final String LLMOBS_LLM_SPAN_KIND = "llm"; + public static final String LLMOBS_WORKFLOW_SPAN_KIND = "workflow"; + public static final String LLMOBS_TASK_SPAN_KIND = "task"; + public static final String LLMOBS_AGENT_SPAN_KIND = "agent"; + public static final String LLMOBS_TOOL_SPAN_KIND = "tool"; } diff --git a/settings.gradle b/settings.gradle index c79530dd5ba..6e4083c0ec8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -77,6 +77,9 @@ include ':dd-java-agent:appsec' // ci-visibility include ':dd-java-agent:agent-ci-visibility' +// llm-observability +include ':dd-java-agent:agent-llmobs' + // iast include ':dd-java-agent:agent-iast' From 273e2ac0051aa9c16087d939f040a2c7c5efc9cb Mon Sep 17 00:00:00 2001 From: gary-huang Date: Mon, 27 Jan 2025 14:21:50 -0500 Subject: [PATCH 4/8] impl llmobs agent --- .../main/java/datadog/trace/api/llmobs/LLMObsConstants.java | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsConstants.java diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsConstants.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsConstants.java new file mode 100644 index 
/**
 * Constant holder for LLM Observability.
 *
 * <p>NOTE(review): a later commit in this same patch series deletes this file.
 */
public interface LLMObsConstants {
  /** The literal {@code "llmobs"}. */
  String LLM_OBS_INSTRUMENTATION_NAME = "llmobs";
}
7b4ae97135b..d1c5a4c6b6c 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java @@ -37,6 +37,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.AgentTracer.TracerAPI; import datadog.trace.bootstrap.instrumentation.api.ProfilingContextIntegration; +import datadog.trace.bootstrap.instrumentation.api.WriterConstants; import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling; import datadog.trace.util.AgentTaskScheduler; import datadog.trace.util.AgentThreadFactory.AgentThread; @@ -111,7 +112,8 @@ private enum AgentFeature { DATA_JOBS(propertyNameToSystemPropertyName(GeneralConfig.DATA_JOBS_ENABLED), false), AGENTLESS_LOG_SUBMISSION( propertyNameToSystemPropertyName(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED), false), - LLMOBS(propertyNameToSystemPropertyName(LlmObsConfig.LLMOBS_ENABLED), false); + LLMOBS(propertyNameToSystemPropertyName(LlmObsConfig.LLMOBS_ENABLED), false), + LLMOBS_AGENTLESS(propertyNameToSystemPropertyName(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED), false); private final String systemProp; private final boolean enabledByDefault; @@ -153,6 +155,7 @@ public boolean isEnabledByDefault() { private static boolean cwsEnabled = false; private static boolean ciVisibilityEnabled = false; private static boolean llmObsEnabled = false; + private static boolean llmObsAgentlessEnabled = false; private static boolean usmEnabled = false; private static boolean telemetryEnabled = true; private static boolean debuggerEnabled = false; @@ -273,6 +276,15 @@ public static void start( agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION); llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS); + if (llmObsEnabled) { + // for llm obs spans, use agent proxy by default, apm spans will use agent writer + 
setSystemPropertyDefault(propertyNameToSystemPropertyName(TracerConfig.WRITER_TYPE), WriterConstants.MULTI_WRITER_TYPE + ":" + WriterConstants.DD_INTAKE_WRITER_TYPE + "," + WriterConstants.DD_AGENT_WRITER_TYPE); + if (llmObsAgentlessEnabled) { + // use API writer only + setSystemPropertyDefault(propertyNameToSystemPropertyName(TracerConfig.WRITER_TYPE), WriterConstants.DD_INTAKE_WRITER_TYPE); + } + } + if (profilingEnabled) { if (!isOracleJDK8()) { // Profiling agent startup code is written in a way to allow `startProfilingAgent` be called diff --git a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsConstants.java b/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsConstants.java deleted file mode 100644 index 9cfcbd821cb..00000000000 --- a/dd-trace-api/src/main/java/datadog/trace/api/llmobs/LLMObsConstants.java +++ /dev/null @@ -1,5 +0,0 @@ -package datadog.trace.api.llmobs; - -public interface LLMObsConstants { - String LLM_OBS_INSTRUMENTATION_NAME = "llmobs"; -} diff --git a/dd-trace-core/build.gradle b/dd-trace-core/build.gradle index cfc50ded09b..bd118f28e18 100644 --- a/dd-trace-core/build.gradle +++ b/dd-trace-core/build.gradle @@ -68,6 +68,11 @@ dependencies { implementation project(':components:json') implementation project(':utils:container-utils') implementation project(':utils:socket-utils') + + implementation group: 'org.msgpack', name: 'msgpack-core', version: '0.8.10' + implementation group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.10' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0' + // for span exception debugging compileOnly project(':dd-java-agent:agent-debugger:debugger-bootstrap') diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 153bcca6565..5f4945b7abe 100644 --- 
a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -151,6 +151,7 @@ public DDAgentWriter build() { } final DDAgentMapperDiscovery mapperDiscovery = new DDAgentMapperDiscovery(featureDiscovery); + final PayloadDispatcher dispatcher = new PayloadDispatcherImpl(mapperDiscovery, agentApi, healthMetrics, monitoring); final TraceProcessingWorker traceProcessingWorker = diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java index b4af03b90e7..765add5440e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java @@ -82,7 +82,7 @@ public static Writer createWriter( // The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is // enabled, check if we can use the IntakeWriter instead. 
- if (DD_AGENT_WRITER_TYPE.equals(configuredType) && config.isCiVisibilityEnabled()) { + if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled())) { if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) { configuredType = DD_INTAKE_WRITER_TYPE; } else { @@ -116,6 +116,10 @@ public static Writer createWriter( builder.addTrack(TrackType.CITESTCOV, coverageApi); } + final RemoteApi llmobsApi = + createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.LLMOBS); + builder.addTrack(TrackType.LLMOBS, llmobsApi); + remoteWriter = builder.build(); } else { // configuredType == DDAgentWriter @@ -171,7 +175,11 @@ private static RemoteApi createDDIntakeRemoteApi( SharedCommunicationObjects commObjects, DDAgentFeaturesDiscovery featuresDiscovery, TrackType trackType) { - if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled()) { + boolean evpProxySupported = featuresDiscovery.supportsEvpProxy(); + boolean useProxyApi = (evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled()) + || (evpProxySupported && (TrackType.CITESTCOV == trackType || TrackType.CITESTCYCLE == trackType) && !config.isCiVisibilityAgentlessEnabled()); + + if (useProxyApi) { return DDEvpProxyApi.builder() .httpClient(commObjects.okHttpClient) .agentUrl(commObjects.agentUrl) @@ -179,18 +187,25 @@ private static RemoteApi createDDIntakeRemoteApi( .trackType(trackType) .compressionEnabled(featuresDiscovery.supportsContentEncodingHeadersWithEvpProxy()) .build(); - } else { HttpUrl hostUrl = null; + String llmObsAgentlessUrl = config.getLlMObsAgentlessUrl(); + if (config.getCiVisibilityAgentlessUrl() != null) { hostUrl = HttpUrl.get(config.getCiVisibilityAgentlessUrl()); log.info("Using host URL '{}' to report CI Visibility traces in Agentless mode.", hostUrl); + } else if (config.isLlmObsEnabled() + && config.isLlmObsAgentlessEnabled() + && llmObsAgentlessUrl != null + && 
!llmObsAgentlessUrl.isEmpty()) { + hostUrl = HttpUrl.get(llmObsAgentlessUrl); + log.info("Using host URL '{}' to report LLM Obs traces in Agentless mode.", hostUrl); } return DDIntakeApi.builder() - .hostUrl(hostUrl) - .apiKey(config.getApiKey()) - .trackType(trackType) - .build(); + .hostUrl(hostUrl) + .apiKey(config.getApiKey()) + .trackType(trackType) + .build(); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java index 645bbc4b9e9..ccdc6ccda09 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java @@ -90,6 +90,7 @@ public void addResponseListener(final RemoteResponseListener listener) { public Response sendSerializedTraces(final Payload payload) { final int sizeInBytes = payload.sizeInBytes(); String tracesEndpoint = featuresDiscovery.getTraceEndpoint(); + if (null == tracesEndpoint) { featuresDiscovery.discoverIfOutdated(); tracesEndpoint = featuresDiscovery.getTraceEndpoint(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java index 7abad42d2f1..954fc8e1cf7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeApi.java @@ -131,6 +131,7 @@ public Response sendSerializedTraces(Payload payload) { .post(payload.toRequest()) .tag(OkHttpUtils.CustomListener.class, telemetryListener) .build(); + totalTraces += payload.traceCount(); receivedTraces += payload.traceCount(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java index 
6cc38f8a3a6..5123e6fe06e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java @@ -6,6 +6,7 @@ import datadog.trace.civisibility.writer.ddintake.CiTestCycleMapperV1; import datadog.trace.common.writer.RemoteMapper; import datadog.trace.common.writer.RemoteMapperDiscovery; +import datadog.trace.llmobs.writer.ddintake.LLMObsSpanMapper; /** * Mapper discovery logic when a DDIntake is used. The mapper is discovered based on a backend @@ -40,6 +41,8 @@ public void discover() { mapper = new CiTestCycleMapperV1(wellKnownTags, compressionEnabled); } else if (TrackType.CITESTCOV.equals(trackType)) { mapper = new CiTestCovMapperV2(compressionEnabled); + } else if (TrackType.LLMOBS.equals(trackType)) { + mapper = new LLMObsSpanMapper(); } else { mapper = RemoteMapper.NO_OP; } diff --git a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java new file mode 100644 index 00000000000..7562a3ec05d --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java @@ -0,0 +1,323 @@ +package datadog.trace.llmobs.writer.ddintake; + +import static datadog.communication.http.OkHttpUtils.gzippedMsgpackRequestBodyOf; + +import datadog.communication.serialization.Writable; +import datadog.trace.api.DDTags; +import datadog.trace.api.intake.TrackType; +import datadog.trace.api.llmobs.LLMObsTags; +import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.common.writer.Payload; +import datadog.trace.common.writer.RemoteMapper; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.Metadata; +import datadog.trace.core.MetadataConsumer; +import java.io.IOException; +import 
java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import okhttp3.RequestBody; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LLMObsSpanMapper implements RemoteMapper { + + // Well known tags for LLM obs will be prefixed with _ml_obs_(tags|metrics). + // Prefix for tags + private static final String LLMOBS_TAG_PREFIX = "_ml_obs_tag."; + // Prefix for metrics + private static final String LLMOBS_METRIC_PREFIX = "_ml_obs_metric."; + + // internal tags to be prefixed + private static final String INPUT = "input"; + private static final String OUTPUT = "output"; + private static final String SPAN_KIND_TAG_KEY = LLMOBS_TAG_PREFIX + Tags.SPAN_KIND; + + private static final Logger LOGGER = LoggerFactory.getLogger(LLMObsSpanMapper.class); + + private static final byte[] STAGE = "_dd.stage".getBytes(StandardCharsets.UTF_8); + private static final byte[] EVENT_TYPE = "event_type".getBytes(StandardCharsets.UTF_8); + + private static final byte[] SPAN_ID = "span_id".getBytes(StandardCharsets.UTF_8); + private static final byte[] TRACE_ID = "trace_id".getBytes(StandardCharsets.UTF_8); + private static final byte[] PARENT_ID = "parent_id".getBytes(StandardCharsets.UTF_8); + private static final byte[] NAME = "name".getBytes(StandardCharsets.UTF_8); + private static final byte[] DURATION = "duration".getBytes(StandardCharsets.UTF_8); + private static final byte[] START_NS = "start_ns".getBytes(StandardCharsets.UTF_8); + private static final byte[] STATUS = "status".getBytes(StandardCharsets.UTF_8); + private static final byte[] ERROR = "error".getBytes(StandardCharsets.UTF_8); + + private static final byte[] META = "meta".getBytes(StandardCharsets.UTF_8); + private static final 
byte[] METADATA = "metadata".getBytes(StandardCharsets.UTF_8); + private static final byte[] SPAN_KIND = "span.kind".getBytes(StandardCharsets.UTF_8); + private static final byte[] SPANS = "spans".getBytes(StandardCharsets.UTF_8); + private static final byte[] METRICS = "metrics".getBytes(StandardCharsets.UTF_8); + private static final byte[] TAGS = "tags".getBytes(StandardCharsets.UTF_8); + + private final LLMObsSpanMapper.MetaWriter metaWriter = new MetaWriter(); + private final int size; + + public LLMObsSpanMapper() { + this(5 << 20); + } + + private LLMObsSpanMapper(int size) { + this.size = size; + } + + @Override + public void map(List> trace, Writable writable) { + List> llmobsSpans = + trace.stream().filter(LLMObsSpanMapper::isLLMObsSpan).collect(Collectors.toList()); + + writable.startMap(3); + + writable.writeUTF8(EVENT_TYPE); + writable.writeString("span", null); + + writable.writeUTF8(STAGE); + writable.writeString("raw", null); + + writable.writeUTF8(SPANS); + writable.startArray(llmobsSpans.size()); + for (CoreSpan span : llmobsSpans) { + writable.startMap(11); + // 1 + writable.writeUTF8(SPAN_ID); + writable.writeString(String.valueOf(span.getSpanId()), null); + + // 2 + writable.writeUTF8(TRACE_ID); + writable.writeString(span.getTraceId().toHexString(), null); + + // 3 + writable.writeUTF8(PARENT_ID); + // TODO fix after parent ID tracking is in place + writable.writeString("undefined", null); + + // 4 + writable.writeUTF8(NAME); + writable.writeString(span.getOperationName(), null); + + // 5 + writable.writeUTF8(START_NS); + writable.writeUnsignedLong(span.getStartTime()); + + // 6 + writable.writeUTF8(DURATION); + writable.writeFloat(span.getDurationNano()); + + // 7 + writable.writeUTF8(ERROR); + writable.writeInt(span.getError()); + + boolean errored = span.getError() == 1; + + // 8 + writable.writeUTF8(STATUS); + writable.writeString(errored ? 
"error" : "ok", null); + + /* 9 (metrics), 10 (tags), 11 meta */ + span.processTagsAndBaggage(metaWriter.withWritable(writable, getErrorsMap(span))); + } + } + + private static boolean isLLMObsSpan(CoreSpan span) { + CharSequence type = span.getType(); + return type != null && type.toString().contentEquals(InternalSpanTypes.LLMOBS); + } + + @Override + public Payload newPayload() { + return new PayloadV1(); + } + + @Override + public int messageBufferSize() { + return size; + } + + @Override + public void reset() {} + + @Override + public String endpoint() { + return TrackType.LLMOBS + "/v2"; + } + + private static Map getErrorsMap(CoreSpan span) { + Map errors = new HashMap<>(); + String errorMsg = span.getTag(DDTags.ERROR_MSG); + if (errorMsg != null && !errorMsg.isEmpty()) { + errors.put(DDTags.ERROR_MSG, errorMsg); + } + String errorType = span.getTag(DDTags.ERROR_TYPE); + if (errorType != null && !errorType.isEmpty()) { + errors.put(DDTags.ERROR_TYPE, errorType); + } + String errorStack = span.getTag(DDTags.ERROR_STACK); + if (errorStack != null && !errorStack.isEmpty()) { + errors.put(DDTags.ERROR_STACK, errorStack); + } + return errors; + } + + private static final class MetaWriter implements MetadataConsumer { + + private Writable writable; + private Map errorInfo; + + private static final Set TAGS_FOR_REMAPPING = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + LLMOBS_TAG_PREFIX + INPUT, + LLMOBS_TAG_PREFIX + OUTPUT, + LLMOBS_TAG_PREFIX + LLMObsTags.MODEL_NAME, + LLMOBS_TAG_PREFIX + LLMObsTags.MODEL_PROVIDER, + LLMOBS_TAG_PREFIX + LLMObsTags.MODEL_VERSION, + LLMOBS_TAG_PREFIX + LLMObsTags.METADATA))); + + LLMObsSpanMapper.MetaWriter withWritable(Writable writable, Map errorInfo) { + this.writable = writable; + this.errorInfo = errorInfo; + return this; + } + + @Override + public void accept(Metadata metadata) { + Map tagsToRemapToMeta = new HashMap<>(); + int metricsSize = 0, tagsSize = 0; + String spanKind = "unknown"; + for (Map.Entry 
tag : metadata.getTags().entrySet()) { + String key = tag.getKey(); + if (key.equals(SPAN_KIND_TAG_KEY)) { + spanKind = String.valueOf(tag.getValue()); + } else if (TAGS_FOR_REMAPPING.contains(key)) { + tagsToRemapToMeta.put(key, tag.getValue()); + } else if (key.startsWith(LLMOBS_METRIC_PREFIX) && tag.getValue() instanceof Number) { + ++metricsSize; + } else if (key.startsWith(LLMOBS_TAG_PREFIX)) { + if (TAGS_FOR_REMAPPING.contains(key.replaceFirst(LLMOBS_TAG_PREFIX, ""))) { + tagsToRemapToMeta.put(key, tag.getValue()); + } else { + ++tagsSize; + } + } + } + + if (!spanKind.equals("unknown")) { + metadata.getTags().remove(SPAN_KIND_TAG_KEY); + } else { + LOGGER.warn("missing span kind"); + } + + // write metrics (9) + writable.writeUTF8(METRICS); + writable.startMap(metricsSize); + for (Map.Entry tag : metadata.getTags().entrySet()) { + if (tag.getKey().startsWith(LLMOBS_METRIC_PREFIX) && tag.getValue() instanceof Number) { + writable.writeString(tag.getKey().replaceFirst(LLMOBS_METRIC_PREFIX, ""), null); + writable.writeDouble((double) tag.getValue()); + } + } + + // write tags (10) + writable.writeUTF8(TAGS); + writable.startArray(tagsSize + 1); + writable.writeString("language:jvm", null); + for (Map.Entry tag : metadata.getTags().entrySet()) { + Object value = tag.getValue(); + if (!tagsToRemapToMeta.containsKey(tag.getKey()) + && tag.getKey().startsWith(LLMOBS_TAG_PREFIX)) { + String key = tag.getKey().replaceFirst(LLMOBS_TAG_PREFIX, ""); + writable.writeObject(key + ":" + value, null); + } + } + + // write meta (11) + int metaSize = tagsToRemapToMeta.size() + 1 + (null != errorInfo ? 
errorInfo.size() : 0); + writable.writeUTF8(META); + writable.startMap(metaSize); + writable.writeUTF8(SPAN_KIND); + writable.writeString(spanKind, null); + + for (Map.Entry error : errorInfo.entrySet()) { + writable.writeUTF8(error.getKey().getBytes()); + writable.writeString(error.getValue(), null); + } + + for (Map.Entry tag : tagsToRemapToMeta.entrySet()) { + String key = tag.getKey().replaceFirst(LLMOBS_TAG_PREFIX, ""); + Object val = tag.getValue(); + if (key.equals(INPUT) || key.equals(OUTPUT)) { + if (!spanKind.equals(Tags.LLMOBS_LLM_SPAN_KIND)) { + key += ".value"; + } else { + key += ".messages"; + } + } else if (key.equals(LLMObsTags.METADATA) && val instanceof Map) { + Map metadataMap = (Map) val; + writable.writeUTF8(METADATA); + writable.startMap(metadataMap.size()); + for (Map.Entry entry : metadataMap.entrySet()) { + writable.writeString(entry.getKey(), null); + writable.writeObject(entry.getValue(), null); + } + continue; + } + writable.writeString(key, null); + writable.writeObject(val, null); + } + } + } + + private static class PayloadV1 extends Payload { + private PayloadV1() {} + + @Override + public int sizeInBytes() { + if (traceCount() == 0) { + return msgpackMapHeaderSize(0); + } + + return body.array().length; + } + + @Override + public void writeTo(WritableByteChannel channel) throws IOException { + // If traceCount is 0, we write a map with 0 elements in MsgPack format. 
+ if (traceCount() == 0) { + ByteBuffer emptyDict = msgpackMapHeader(0); + while (emptyDict.hasRemaining()) { + channel.write(emptyDict); + } + } else { + while (body.hasRemaining()) { + channel.write(body); + } + } + } + + @Override + public RequestBody toRequest() { + List buffers; + if (traceCount() == 0) { + buffers = Collections.singletonList(msgpackMapHeader(0)); + } else { + buffers = Collections.singletonList(body); + } + + return gzippedMsgpackRequestBodyOf(buffers); + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index f07fbb74cf4..f8b0feb08d9 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -2032,6 +2032,7 @@ public boolean isIntegrationSynapseLegacyOperationName() { } public String getWriterType() { + log.warn("WRITER TYPE {}", writerType); return writerType; } diff --git a/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java b/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java index 6c2d9b6b25c..529644493b2 100644 --- a/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java +++ b/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java @@ -5,7 +5,8 @@ /** The type of endpoint where a request is sent */ public enum Endpoint implements TagValue { TEST_CYCLE, - CODE_COVERAGE; + CODE_COVERAGE, + LLMOBS; // TODO this is probably not right, need to probably move this enum to a common package? 
private final String s; diff --git a/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java b/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java index 4d99f0655a8..e50f46b6f13 100644 --- a/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java +++ b/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java @@ -6,6 +6,7 @@ public enum TrackType { CITESTCYCLE(Endpoint.TEST_CYCLE), CITESTCOV(Endpoint.CODE_COVERAGE), + LLMOBS(Endpoint.LLMOBS), NOOP(null); @Nullable public final Endpoint endpoint; diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java index 6fc420b9e9a..21ae3052b7a 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java @@ -46,6 +46,8 @@ public class InternalSpanTypes { UTF8BytesString.create(DDSpanTypes.VULNERABILITY); public static final UTF8BytesString PROTOBUF = UTF8BytesString.create(DDSpanTypes.PROTOBUF); + public static final UTF8BytesString LLMOBS = UTF8BytesString.create(DDSpanTypes.LLMOBS); + public static final UTF8BytesString TIBCO_BW = UTF8BytesString.create("tibco_bw"); public static final UTF8BytesString MULE = UTF8BytesString.create(DDSpanTypes.MULE); public static final CharSequence VALKEY = UTF8BytesString.create(DDSpanTypes.VALKEY); From 717a9cbba8cb9d914a9934dee2e104015fa592ce Mon Sep 17 00:00:00 2001 From: gary-huang Date: Fri, 7 Feb 2025 16:37:57 -0500 Subject: [PATCH 6/8] working writer --- .../java/datadog/trace/llmobs/domain/DDLLMObsSpan.java | 3 +++ .../java/datadog/trace/common/writer/DDIntakeWriter.java | 9 +++++++++ .../trace/common/writer/ddintake/DDEvpProxyApi.java | 1 + .../src/main/java/datadog/trace/core/CoreTracer.java | 1 + 4 files changed, 14 insertions(+) diff --git 
a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java index 35558e02987..4c88e8171fc 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java @@ -50,6 +50,8 @@ private enum State { private boolean finished = false; + private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class); + public DDLLMObsSpan( @Nonnull String kind, String spanName, @@ -108,6 +110,7 @@ public void annotateIO( if (finished) { return; } + LOGGER.warn("ANNOTATE IN {} OUT {}", inputData, outputData); if (inputData != null && !inputData.isEmpty()) { State inputState = validateIOMessages(inputData); if (validateIOMessages(inputData) != State.VALID) { diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java index 614865a6b5e..7f0f1c4b083 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java @@ -12,6 +12,8 @@ import java.util.EnumMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DDIntakeWriter extends RemoteWriter { @@ -40,6 +42,8 @@ public static class DDIntakeWriterBuilder { private SingleSpanSampler singleSpanSampler; + private static final Logger log = LoggerFactory.getLogger(DDIntakeWriterBuilder.class); + public DDIntakeWriterBuilder addTrack(final TrackType trackType, final RemoteApi intakeApi) { tracks.put(trackType, intakeApi); return this; @@ -98,6 +102,7 @@ public DDIntakeWriterBuilder singleSpanSampler(SingleSpanSampler singleSpanSampl } public DDIntakeWriter build() { + log.debug("DDINTAKEWRITER TRACKS 
{}", tracks); if (tracks.isEmpty()) { throw new IllegalArgumentException("At least one track needs to be configured"); } @@ -111,7 +116,11 @@ public DDIntakeWriter build() { .map(this::createDispatcher) .toArray(PayloadDispatcher[]::new); dispatcher = new CompositePayloadDispatcher(dispatchers); + for (PayloadDispatcher dispatcher2 : dispatchers) { + log.debug("COMP DISPATCHER {}", dispatcher2); + } } + log.debug("DISPATCHER {}", dispatcher); final TraceProcessingWorker traceProcessingWorker = new TraceProcessingWorker( diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java index 8926d17e200..af189013ce0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java @@ -128,6 +128,7 @@ private DDEvpProxyApi( public Response sendSerializedTraces(Payload payload) { final int sizeInBytes = payload.sizeInBytes(); + log.debug("SENDING PL {} TO TRACK {}", payload, trackType); Request.Builder builder = new Request.Builder() .url(proxiedApiUrl) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index b9a3d5f07da..962ad182779 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -676,6 +676,7 @@ private CoreTracer( } else { this.writer = writer; } + log.debug("STARTED WRITER {}", this.writer); if (config.isCiVisibilityEnabled() && (config.isCiVisibilityAgentlessEnabled() From 92bc77490c8c07bd93d6adb2789b84cba08f323f Mon Sep 17 00:00:00 2001 From: gary-huang Date: Fri, 7 Feb 2025 16:37:57 -0500 Subject: [PATCH 7/8] working writer --- .../json/JSONWritableFormatter.java | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 
communication/src/main/java/datadog/communication/serialization/json/JSONWritableFormatter.java diff --git a/communication/src/main/java/datadog/communication/serialization/json/JSONWritableFormatter.java b/communication/src/main/java/datadog/communication/serialization/json/JSONWritableFormatter.java new file mode 100644 index 00000000000..0304fda34f7 --- /dev/null +++ b/communication/src/main/java/datadog/communication/serialization/json/JSONWritableFormatter.java @@ -0,0 +1,131 @@ +package datadog.communication.serialization.json; + +import datadog.communication.serialization.EncodingCache; +import datadog.communication.serialization.Mapper; +import datadog.communication.serialization.WritableFormatter; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.nio.ByteBuffer; +import java.util.Map; + +public class JSONWritableFormatter implements WritableFormatter { + + @Override + public boolean format(T message, Mapper mapper) { + return false; + } + + @Override + public void flush() { + + } + + @Override + public void writeNull() { + + } + + @Override + public void writeBoolean(boolean value) { + + } + + @Override + public void writeObject(Object value, EncodingCache encodingCache) { + + } + + @Override + public void writeObjectString(Object value, EncodingCache encodingCache) { + + } + + @Override + public void writeMap(Map map, EncodingCache encodingCache) { + + } + + @Override + public void writeString(CharSequence s, EncodingCache encodingCache) { + + } + + @Override + public void writeUTF8(byte[] string, int offset, int length) { + + } + + @Override + public void writeUTF8(byte[] string) { + + } + + @Override + public void writeUTF8(UTF8BytesString string) { + + } + + @Override + public void writeBinary(byte[] binary) { + + } + + @Override + public void writeBinary(byte[] binary, int offset, int length) { + + } + + @Override + public void startMap(int elementCount) { + + } + + @Override + public void startStruct(int elementCount) 
{ + + } + + @Override + public void startArray(int elementCount) { + + } + + @Override + public void writeBinary(ByteBuffer buffer) { + + } + + @Override + public void writeInt(int value) { + + } + + @Override + public void writeSignedInt(int value) { + + } + + @Override + public void writeLong(long value) { + + } + + @Override + public void writeUnsignedLong(long value) { + + } + + @Override + public void writeSignedLong(long value) { + + } + + @Override + public void writeFloat(float value) { + + } + + @Override + public void writeDouble(double value) { + + } +} From 8e16ad22d611ca0d695ede8af7f16cd6dddbeea4 Mon Sep 17 00:00:00 2001 From: gary-huang Date: Thu, 20 Feb 2025 01:38:11 -0500 Subject: [PATCH 8/8] add first cut llm obs parent children linking --- .../datadog/trace/llmobs/LLMObsServices.java | 37 +++++++++++++++++++ .../datadog/trace/llmobs/LLMObsSystem.java | 11 +++--- .../trace/llmobs/domain/DDLLMObsSpan.java | 22 ++++++++++- .../trace/llmobs/domain/SpanContextInfo.java | 32 ++++++++++++++++ .../writer/ddintake/LLMObsSpanMapper.java | 7 +++- 5 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/SpanContextInfo.java diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java index 600ea4d545a..9f56019125f 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsServices.java @@ -4,6 +4,9 @@ import datadog.communication.BackendApiFactory; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.llmobs.domain.SpanContextInfo; +import java.util.Stack; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,9 +17,43 @@ public class 
LLMObsServices { final Config config; final BackendApi backendApi; + ThreadLocal> activeSpanContext = new ThreadLocal<>(); + LLMObsServices(Config config, SharedCommunicationObjects sco) { this.config = config; this.backendApi = new BackendApiFactory(config, sco).createBackendApi(BackendApiFactory.Intake.LLMOBS_API); } + + public SpanContextInfo getParentContext() { + if (activeSpanContext.get() == null || activeSpanContext.get().isEmpty()) { + return null; + } + return activeSpanContext.get().peek(); + } + + @Nonnull + public SpanContextInfo getActiveSpanContext() { + if (activeSpanContext.get() == null || activeSpanContext.get().isEmpty()) { + return new SpanContextInfo(); + } + return activeSpanContext.get().peek(); + } + + public void setActiveSpanContext(SpanContextInfo spanContext) { + Stack contexts = activeSpanContext.get(); + if (contexts == null) { + contexts = new Stack<>(); + } + contexts.push(spanContext); + this.activeSpanContext.set(contexts); + } + + public void removeActiveSpanContext() { + Stack contexts = activeSpanContext.get(); + if (contexts == null) { + return; + } + contexts.pop(); + } } diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java index fd9d3f3aca4..20d22af8d19 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/LLMObsSystem.java @@ -45,6 +45,7 @@ public LLMObsManualSpanFactory( this.serviceName = serviceName; } + @Override public LLMObsSpan startLLMSpan( String spanName, @@ -55,7 +56,7 @@ public LLMObsSpan startLLMSpan( DDLLMObsSpan span = new DDLLMObsSpan( - Tags.LLMOBS_LLM_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + Tags.LLMOBS_LLM_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName, llmObsServices); span.setTag(LLMObsTags.MODEL_NAME, modelName); 
span.setTag(LLMObsTags.MODEL_PROVIDER, modelProvider); @@ -66,28 +67,28 @@ public LLMObsSpan startLLMSpan( public LLMObsSpan startAgentSpan( String spanName, @Nullable String mlApp, @Nullable String sessionID) { return new DDLLMObsSpan( - Tags.LLMOBS_AGENT_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + Tags.LLMOBS_AGENT_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName, llmObsServices); } @Override public LLMObsSpan startToolSpan( String spanName, @Nullable String mlApp, @Nullable String sessionID) { return new DDLLMObsSpan( - Tags.LLMOBS_TOOL_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + Tags.LLMOBS_TOOL_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName, llmObsServices); } @Override public LLMObsSpan startTaskSpan( String spanName, @Nullable String mlApp, @Nullable String sessionID) { return new DDLLMObsSpan( - Tags.LLMOBS_TASK_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + Tags.LLMOBS_TASK_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName, llmObsServices); } @Override public LLMObsSpan startWorkflowSpan( String spanName, @Nullable String mlApp, @Nullable String sessionID) { return new DDLLMObsSpan( - Tags.LLMOBS_WORKFLOW_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName); + Tags.LLMOBS_WORKFLOW_SPAN_KIND, spanName, getMLApp(mlApp), sessionID, serviceName, llmObsServices); } private String getMLApp(String mlApp) { diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java index 4c88e8171fc..2ce44ea611f 100644 --- a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java @@ -4,12 +4,14 @@ import datadog.trace.api.llmobs.LLMObsSpan; import datadog.trace.api.llmobs.LLMObsTags; import 
datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.Tags; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import datadog.trace.llmobs.LLMObsServices; import java.util.List; import java.util.Map; import java.util.Set; @@ -43,6 +45,8 @@ private enum State { private static final String LLM_OBS_INSTRUMENTATION_NAME = "llmobs"; + private static final String PARENT_ID_TAG_INTERNAL = "parent_id"; + private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class); private final AgentSpan span; @@ -50,32 +54,45 @@ private enum State { private boolean finished = false; - private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class); + private final LLMObsServices llmObsServices; public DDLLMObsSpan( @Nonnull String kind, String spanName, @Nonnull String mlApp, String sessionID, - @Nonnull String serviceName) { + @Nonnull String serviceName, + @Nonnull LLMObsServices llmObsServices) { if (null == spanName || spanName.isEmpty()) { spanName = kind; } + this.llmObsServices = llmObsServices; + + SpanContextInfo activeSpanCtxInfo = this.llmObsServices.getActiveSpanContext(); + AgentTracer.SpanBuilder spanBuilder = AgentTracer.get() .buildSpan(LLM_OBS_INSTRUMENTATION_NAME, spanName) .withServiceName(serviceName) .withSpanType(DDSpanTypes.LLMOBS); + AgentSpanContext activeCtx = activeSpanCtxInfo.getActiveContext(); + if (!activeSpanCtxInfo.isRoot() && null != activeCtx) { + spanBuilder.asChildOf(activeCtx); + } + this.span = spanBuilder.start(); this.span.setTag(SPAN_KIND, kind); this.spanKind = kind; this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.ML_APP, mlApp); + this.span.setTag(LLMOBS_TAG_PREFIX + PARENT_ID_TAG_INTERNAL, activeSpanCtxInfo.getParentSpanID()); if (sessionID != null && 
!sessionID.isEmpty()) { this.span.setTag(LLMOBS_TAG_PREFIX + LLMObsTags.SESSION_ID, sessionID); } + + this.llmObsServices.setActiveSpanContext(new SpanContextInfo(span.context(), String.valueOf(span.context().getSpanId()))); } @Override @@ -292,5 +309,6 @@ public void finish() { } this.span.finish(); this.finished = true; + this.llmObsServices.removeActiveSpanContext(); } } diff --git a/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/SpanContextInfo.java b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/SpanContextInfo.java new file mode 100644 index 00000000000..2edb61f38d5 --- /dev/null +++ b/dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/SpanContextInfo.java @@ -0,0 +1,32 @@ +package datadog.trace.llmobs.domain; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; + +public class SpanContextInfo { + private final AgentSpanContext activeContext; + private final String parentSpanID; + + public static final String ROOT_SPAN_ID = "undefined"; + + public SpanContextInfo() { + this.activeContext = null; + this.parentSpanID = ROOT_SPAN_ID; + } + + public SpanContextInfo(AgentSpanContext activeContext, String parentSpanID) { + this.activeContext = activeContext; + this.parentSpanID = parentSpanID; + } + + public boolean isRoot() { + return this.parentSpanID.equals(ROOT_SPAN_ID); + } + + public AgentSpanContext getActiveContext() { + return activeContext; + } + + public String getParentSpanID() { + return parentSpanID; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java index 7562a3ec05d..d80ba1c2242 100644 --- a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java +++ b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java @@ -63,6 +63,9 @@ public class LLMObsSpanMapper implements 
RemoteMapper { private static final byte[] METRICS = "metrics".getBytes(StandardCharsets.UTF_8); private static final byte[] TAGS = "tags".getBytes(StandardCharsets.UTF_8); + // TODO is there a better place for this? + private static final String PARENT_ID_TAG_INTERNAL_FULL = LLMOBS_TAG_PREFIX + "parent_id"; + private final LLMObsSpanMapper.MetaWriter metaWriter = new MetaWriter(); private final int size; @@ -101,8 +104,8 @@ public void map(List> trace, Writable writable) { // 3 writable.writeUTF8(PARENT_ID); - // TODO fix after parent ID tracking is in place - writable.writeString("undefined", null); + writable.writeString(span.getTag(PARENT_ID_TAG_INTERNAL_FULL), null); + span.removeTag(PARENT_ID_TAG_INTERNAL_FULL); // 4 writable.writeUTF8(NAME);