Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws-sdk-2.2: Support non-X-Ray propagation for SQS #8405

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor {
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-span-attributes", false);

private static final boolean USE_MESSAGING_PROPAGATOR =
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved

private final ExecutionInterceptor delegate =
AwsSdkTelemetry.builder(GlobalOpenTelemetry.get())
.setCaptureExperimentalSpanAttributes(CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES)
.setUseConfiguredPropagatorForMessaging(USE_MESSAGING_PROPAGATOR)
.build()
.newExecutionInterceptor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,65 @@
final class AwsSdkInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.aws-sdk-2.2";

static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> rpcAttributesExtractor =
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();

static final AwsSdkHttpAttributesGetter httpAttributesGetter = new AwsSdkHttpAttributesGetter();
private static final AwsSdkNetAttributesGetter netAttributesGetter =
new AwsSdkNetAttributesGetter();
static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> httpAttributesExtractor =
HttpClientAttributesExtractor.create(httpAttributesGetter, netAttributesGetter);
static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> rpcAttributesExtractor =
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();

private static final AwsSdkSpanKindExtractor spanKindExtractor = new AwsSdkSpanKindExtractor();

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
defaultAttributesExtractors = Arrays.asList(httpAttributesExtractor, rpcAttributesExtractor);
defaultAttributesExtractors = Arrays.asList(rpcAttributesExtractor);

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
extendedAttributesExtractors =
Arrays.asList(rpcAttributesExtractor, experimentalAttributesExtractor);

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
defaultConsumerAttributesExtractors =
Arrays.asList(rpcAttributesExtractor, httpAttributesExtractor);

private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
extendedConsumerAttributesExtractors =
Arrays.asList(
httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor);
rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor);

static Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {

return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors,
AwsSdkInstrumenterFactory.spanKindExtractor);
}

static Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {

return createInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, SpanKindExtractor.alwaysConsumer());
openTelemetry,
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors,
SpanKindExtractor.alwaysConsumer());
}

private static Instrumenter<ExecutionAttributes, SdkHttpResponse> createInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>> extractors,
SpanKindExtractor<ExecutionAttributes> spanKindExtractor) {

return Instrumenter.<ExecutionAttributes, SdkHttpResponse>builder(
openTelemetry, INSTRUMENTATION_NAME, AwsSdkInstrumenterFactory::spanName)
.addAttributesExtractors(
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors)
.addAttributesExtractors(extractors)
.buildInstrumenter(spanKindExtractor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.awssdk.v2_2;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
Expand Down Expand Up @@ -42,15 +43,24 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter;
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter;
private final boolean captureExperimentalSpanAttributes;
private final TextMapPropagator messagingPropagator;
Oberon00 marked this conversation as resolved.
Show resolved Hide resolved
private final boolean useXrayPropagator;

AwsSdkTelemetry(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
AwsSdkTelemetry(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean useMessagingPropagator,
boolean useXrayPropagator) {
this.useXrayPropagator = useXrayPropagator;
this.requestInstrumenter =
AwsSdkInstrumenterFactory.requestInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
this.consumerInstrumenter =
AwsSdkInstrumenterFactory.consumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingPropagator =
useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null;
}

/**
Expand All @@ -59,6 +69,10 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {
*/
public ExecutionInterceptor newExecutionInterceptor() {
return new TracingExecutionInterceptor(
requestInstrumenter, consumerInstrumenter, captureExperimentalSpanAttributes);
requestInstrumenter,
consumerInstrumenter,
captureExperimentalSpanAttributes,
messagingPropagator,
useXrayPropagator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ public final class AwsSdkTelemetryBuilder {

private boolean captureExperimentalSpanAttributes;

private boolean useMessagingPropagator;

private boolean useXrayPropagator = true;

AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
Expand All @@ -31,10 +35,50 @@ public AwsSdkTelemetryBuilder setCaptureExperimentalSpanAttributes(
return this;
}

/**
* Sets whether the {@link io.opentelemetry.context.propagation.TextMapPropagator} configured in
* the provided {@link OpenTelemetry} should be used to inject into supported messaging attributes
* (currently only SQS; SNS may follow).
*
* <p>In addition, the X-Ray propagator is always used.
*
* <p>Using the messaging propagator is needed if your tracing vendor requires special tracestate
* entries or legacy propagation information that cannot be transported via X-Ray headers. It may
* also be useful if you need to directly connect spans over messaging in your tracing backend,
* bypassing any intermediate spans/X-Ray segments that AWS may create in the delivery process.
*
* <p>This option is off by default. If enabled, on extraction the configured propagator will be
* preferred over X-Ray if it can extract anything.
*/
@CanIgnoreReturnValue
public AwsSdkTelemetryBuilder setUseConfiguredPropagatorForMessaging(
boolean useMessagingPropagator) {
this.useMessagingPropagator = useMessagingPropagator;
return this;
}

/**
* This setter implemented package-private for testing the messaging propagator, it does not seem
* too useful in general. The option is on by default.
*
* <p>If this needs to be exposed for non-testing use cases, consider if you need to refine this
* feature so that it disable this only for requests supported by {@link
* #setUseConfiguredPropagatorForMessaging(boolean)}
*/
@CanIgnoreReturnValue
AwsSdkTelemetryBuilder setUseXrayPropagator(boolean useMessagingPropagator) {
this.useXrayPropagator = useMessagingPropagator;
return this;
}

/**
* Returns a new {@link AwsSdkTelemetry} with the settings of this {@link AwsSdkTelemetryBuilder}.
*/
public AwsSdkTelemetry build() {
return new AwsSdkTelemetry(openTelemetry, captureExperimentalSpanAttributes);
return new AwsSdkTelemetry(
openTelemetry,
captureExperimentalSpanAttributes,
useMessagingPropagator,
useXrayPropagator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkPojo;

/**
* Reflective access to aws-sdk-java-sqs class Message.
Expand All @@ -21,10 +22,18 @@
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*
* @see <a
* href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/model/Message.html">SDK
* Javadoc</a>
* @see <a
* href="https://github.com/aws/aws-sdk-java-v2/blob/2.2.0/services/sqs/src/main/resources/codegen-resources/service-2.json#L821-L856">Definition
* JSON</a>
*/
final class SqsMessageAccess {

@Nullable private static final MethodHandle GET_ATTRIBUTES;
@Nullable private static final MethodHandle GET_MESSAGE_ATTRIBUTES;

static {
Class<?> messageClass = null;
Expand All @@ -43,8 +52,18 @@ final class SqsMessageAccess {
// Ignore
}
GET_ATTRIBUTES = getAttributes;

MethodHandle getMessageAttributes = null;
try {
getMessageAttributes =
lookup.findVirtual(messageClass, "messageAttributes", methodType(Map.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
GET_MESSAGE_ATTRIBUTES = getMessageAttributes;
} else {
GET_ATTRIBUTES = null;
GET_MESSAGE_ATTRIBUTES = null;
}
}

Expand All @@ -61,4 +80,16 @@ static Map<String, String> getAttributes(Object message) {
}

private SqsMessageAccess() {}

@SuppressWarnings("unchecked")
public static Map<String, SdkPojo> getMessageAttributes(Object message) {
if (GET_MESSAGE_ATTRIBUTES == null) {
return Collections.emptyMap();
}
try {
return (Map<String, SdkPojo>) GET_MESSAGE_ATTRIBUTES.invoke(message);
} catch (Throwable t) {
return Collections.emptyMap();
}
}
}
Loading