Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement capturing message headers for aws2 sqs spans #9842

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ plugins {
base.archivesName.set("${base.archivesName.get()}-autoconfigure")

dependencies {
compileOnly(project(":javaagent-extension-api"))

implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library"))

library("software.amazon.awssdk:aws-core:2.2.0")
Expand All @@ -29,5 +31,6 @@ tasks {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-record-individual-http-error", true)
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,75 @@

package io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure;

import static java.util.Collections.emptyList;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.List;

public final class AwsSdkSingletons {

private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-span-attributes", false);
private static final boolean HAS_INSTRUMENTATION_CONFIG = hasAgentConfiguration();
private static final AwsSdkTelemetry TELEMETRY =
AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
.setCapturedHeaders(getCapturedHeaders())
.setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes())
.setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled())
.setUseConfiguredPropagatorForMessaging(useMessagingPropagator())
.setRecordIndividualHttpError(recordIndividualHttpError())
.build();

private static final boolean USE_MESSAGING_PROPAGATOR =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
private static boolean hasAgentConfiguration() {
try {
Class.forName("io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig");
return true;
} catch (ClassNotFoundException e) {
return false;
}
}

private static final boolean RECORD_INDIVIDUAL_HTTP_ERROR =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
private static List<String> getCapturedHeaders() {
if (HAS_INSTRUMENTATION_CONFIG) {
return ExperimentalConfig.get().getMessagingHeaders();
} else {
return ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList());
}
}

private static boolean captureExperimentalSpanAttributes() {
return getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false);
}

private static final boolean RECEIVE_TELEMETRY_ENABLED =
ConfigPropertiesUtil.getBoolean(
private static boolean messagingReceiveInstrumentationEnabled() {
if (HAS_INSTRUMENTATION_CONFIG) {
return ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
} else {
return ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false);
}
}

private static final AwsSdkTelemetry TELEMETRY =
AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
.setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES)
.setMessagingReceiveInstrumentationEnabled(RECEIVE_TELEMETRY_ENABLED)
.setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR)
.setRecordIndividualHttpError(RECORD_INDIVIDUAL_HTTP_ERROR)
.build();
private static boolean useMessagingPropagator() {
return getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
}

private static boolean recordIndividualHttpError() {
return getBoolean(
"otel.instrumentation.aws-sdk.experimental-record-individual-http-error", false);
}

private static boolean getBoolean(String name, boolean defaultValue) {
if (HAS_INSTRUMENTATION_CONFIG) {
return InstrumentationConfig.get().getBoolean(name, defaultValue);
} else {
return ConfigPropertiesUtil.getBoolean(name, defaultValue);
}
}

public static AwsSdkTelemetry telemetry() {
return TELEMETRY;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awssdk.v2_2;

import software.amazon.awssdk.core.interceptor.ExecutionAttributes;

abstract class AbstractSqsRequest {

public abstract ExecutionAttributes getRequest();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.instrumentation.awssdk.v2_2;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
Expand All @@ -18,11 +21,13 @@
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
Expand Down Expand Up @@ -63,38 +68,73 @@ final class AwsSdkInstrumenterFactory {
Arrays.asList(
rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor);

static Instrumenter<ExecutionAttributes, Response> requestInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
private final OpenTelemetry openTelemetry;
@Nullable private final TextMapPropagator messagingPropagator;
private final List<String> capturedHeaders;
private final boolean captureExperimentalSpanAttributes;
private final boolean messagingReceiveInstrumentationEnabled;
private final boolean useXrayPropagator;

AwsSdkInstrumenterFactory(
OpenTelemetry openTelemetry,
@Nullable TextMapPropagator messagingPropagator,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled,
boolean useXrayPropagator) {
this.openTelemetry = openTelemetry;
this.messagingPropagator = messagingPropagator;
this.capturedHeaders = capturedHeaders;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
this.useXrayPropagator = useXrayPropagator;
}

Instrumenter<ExecutionAttributes, Response> requestInstrumenter() {
return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors,
AwsSdkInstrumenterFactory::spanName,
SpanKindExtractor.alwaysClient(),
attributesExtractors(),
emptyList(),
true);
}

static Instrumenter<ExecutionAttributes, Response> consumerReceiveInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
return sqsInstrumenter(
private List<AttributesExtractor<ExecutionAttributes, Response>> attributesExtractors() {
return captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors;
}

private List<AttributesExtractor<ExecutionAttributes, Response>> consumerAttributesExtractors() {
return captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors;
}

private <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> messagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}

Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter() {
MessageOperation operation = MessageOperation.RECEIVE;
SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsReceiveRequest, Response> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);

return createInstrumenter(
openTelemetry,
MessageOperation.RECEIVE,
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors,
MessagingSpanNameExtractor.create(getter, operation),
SpanKindExtractor.alwaysConsumer(),
toSqsRequestExtractors(consumerAttributesExtractors(), Function.identity()),
singletonList(messagingAttributeExtractor),
messagingReceiveInstrumentationEnabled);
}

static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
OpenTelemetry openTelemetry,
TextMapPropagator messagingPropagator,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled,
boolean shouldUseXrayPropagator) {
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;

Expand All @@ -104,96 +144,83 @@ static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors(
toProcessRequestExtractors(
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation).build());
toSqsRequestExtractors(consumerAttributesExtractors(), unused -> null))
.addAttributesExtractor(messagingAttributesExtractor(getter, operation));

if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
(spanLinks, parentContext, request) -> {
Context extracted =
SqsParentContext.ofMessage(
request.getMessage(), messagingPropagator, shouldUseXrayPropagator);
request.getMessage(), messagingPropagator, useXrayPropagator);
spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
});
}
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors(
List<AttributesExtractor<ExecutionAttributes, Response>> extractors) {
List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>();
private static <RESPONSE>
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
Function<RESPONSE, Response> responseConverter) {
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
for (AttributesExtractor<ExecutionAttributes, Response> extractor : extractors) {
result.add(
new AttributesExtractor<SqsProcessRequest, Void>() {
new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
@Override
public void onStart(
AttributesBuilder attributes,
Context parentContext,
SqsProcessRequest sqsProcessRequest) {
extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest());
AbstractSqsRequest sqsRequest) {
extractor.onStart(attributes, parentContext, sqsRequest.getRequest());
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
SqsProcessRequest sqsProcessRequest,
@Nullable Void unused,
AbstractSqsRequest sqsRequest,
@Nullable RESPONSE response,
@Nullable Throwable error) {
extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error);
extractor.onEnd(
attributes,
context,
sqsRequest.getRequest(),
responseConverter.apply(response),
error);
}
});
}
return result;
}

static Instrumenter<ExecutionAttributes, Response> producerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry,
MessageOperation.PUBLISH,
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors,
true);
}

private static Instrumenter<ExecutionAttributes, Response> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
boolean enabled) {
Instrumenter<ExecutionAttributes, Response> producerInstrumenter() {
MessageOperation operation = MessageOperation.PUBLISH;
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<ExecutionAttributes, Response> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build();
List<AttributesExtractor<ExecutionAttributes, Response>> newExtractors =
new ArrayList<>(extractors);
newExtractors.add(messagingAttributeExtractor);
messagingAttributesExtractor(getter, operation);

return createInstrumenter(
openTelemetry,
newExtractors,
MessagingSpanNameExtractor.create(getter, operation),
operation == MessageOperation.PUBLISH
? SpanKindExtractor.alwaysProducer()
: SpanKindExtractor.alwaysConsumer(),
enabled);
SpanKindExtractor.alwaysProducer(),
attributesExtractors(),
singletonList(messagingAttributeExtractor),
true);
}

private static Instrumenter<ExecutionAttributes, Response> createInstrumenter(
private static <REQUEST, RESPONSE> Instrumenter<REQUEST, RESPONSE> createInstrumenter(
OpenTelemetry openTelemetry,
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
SpanNameExtractor<ExecutionAttributes> spanNameExtractor,
SpanKindExtractor<ExecutionAttributes> spanKindExtractor,
SpanNameExtractor<REQUEST> spanNameExtractor,
SpanKindExtractor<REQUEST> spanKindExtractor,
List<? extends AttributesExtractor<? super REQUEST, ? super RESPONSE>> attributeExtractors,
List<AttributesExtractor<REQUEST, RESPONSE>> additionalAttributeExtractors,
boolean enabled) {

return Instrumenter.<ExecutionAttributes, Response>builder(
return Instrumenter.<REQUEST, RESPONSE>builder(
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractors(extractors)
.addAttributesExtractors(attributeExtractors)
.addAttributesExtractors(additionalAttributeExtractors)
.setEnabled(enabled)
.buildInstrumenter(spanKindExtractor);
}
Expand All @@ -203,6 +230,4 @@ private static String spanName(ExecutionAttributes attributes) {
String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
return awsServiceName + "." + awsOperation;
}

private AwsSdkInstrumenterFactory() {}
}
Loading
Loading