From 95a95f2f26b59b6ea9cdc0884c73a7a266ab8ad1 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Mon, 17 Oct 2022 22:29:02 +0200 Subject: [PATCH] End reactor-netty HTTP client span properly on `Mono#timeout()` (#6891) Calling `Mono#timeout()` with a timeout value smaller than the HTTP client timeout caused the on request/response end callbacks to be simply discarded; and the HTTP span was never finished. --- .../HttpResponseReceiverInstrumenter.java | 63 +++++++++++++------ .../AbstractReactorNettyHttpClientTest.java | 52 +++++++++++++++ 2 files changed, 95 insertions(+), 20 deletions(-) diff --git a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java index 899541dd082e..fda2e86f27fe 100644 --- a/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java +++ b/instrumentation/reactor/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java @@ -13,6 +13,7 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.netty.v4_1.NettyClientTelemetry; import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiConsumer; import java.util.function.Function; import javax.annotation.Nullable; @@ -58,8 +59,20 @@ public static HttpClient.ResponseReceiver instrument(HttpClient.ResponseRecei } static final class ContextHolder { + + private static final AtomicReferenceFieldUpdater contextUpdater = + AtomicReferenceFieldUpdater.newUpdater(ContextHolder.class, Context.class, "context"); + volatile Context parentContext; volatile Context context; + + void setContext(Context context) { + contextUpdater.set(this, context); + } + + Context getAndRemoveContext() { + return contextUpdater.getAndSet(this, null); + } } static final class StartOperation @@ -76,23 +89,33 @@ static final class StartOperation @Override public Mono apply(Mono mono) { return Mono.defer( - () -> { - Context parentContext = Context.current(); - contextHolder.parentContext = parentContext; - if (!instrumenter().shouldStart(parentContext, config)) { - // make context accessible via the reactor ContextView - the doOn* callbacks - // instrumentation uses this to set the proper context for callbacks - return mono.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)); - } - - Context context = instrumenter().start(parentContext, config); - contextHolder.context = context; - return ContextPropagationOperator.runWithContext(mono, context) - // make contexts accessible via the reactor ContextView - the doOn* callbacks - // instrumentation uses the parent context to set the proper context for callbacks - .contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)) - .contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context)); - }); + () -> { + Context parentContext = Context.current(); + contextHolder.parentContext = parentContext; + if (!instrumenter().shouldStart(parentContext, config)) { + // make context accessible via the reactor ContextView - the doOn* callbacks + // instrumentation uses this to set the proper context for callbacks + return mono.contextWrite( + ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)); + } + + Context context = instrumenter().start(parentContext, config); + contextHolder.setContext(context); + return ContextPropagationOperator.runWithContext(mono, context) + // make contexts accessible via the reactor ContextView - the doOn* callbacks + // instrumentation uses the parent context to set the proper context for + // callbacks + .contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)) + .contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context)); + }) + .doOnCancel( + () -> { + Context context = contextHolder.getAndRemoveContext(); + if (context == null) { + return; + } + instrumenter().end(context, config, null, null); + }); } } @@ -134,7 +157,7 @@ static final class EndOperationWithRequestError @Override public void accept(HttpClientRequest httpClientRequest, Throwable error) { - Context context = contextHolder.context; + Context context = contextHolder.getAndRemoveContext(); if (context == null) { return; } @@ -155,7 +178,7 @@ static final class EndOperationWithResponseError @Override public void accept(HttpClientResponse response, Throwable error) { - Context context = contextHolder.context; + Context context = contextHolder.getAndRemoveContext(); if (context == null) { return; } @@ -175,7 +198,7 @@ static final class EndOperationWithSuccess implements BiConsumer + testing.runWithSpan( + "parent", + () -> + httpClient + .get() + .uri(uri) + .responseSingle( + (resp, content) -> { + // Make sure to consume content since that's when we close the + // span. + return content.map(unused -> resp); + }) + // apply Mono timeout that is way shorter than HTTP request timeout + .timeout(Duration.ofSeconds(1)) + .block())); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasKind(SpanKind.INTERNAL) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(thrown), + span -> + span.hasName("HTTP GET") + .hasKind(CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.HTTP_METHOD, "GET"), + equalTo(SemanticAttributes.HTTP_URL, uri.toString()), + equalTo(SemanticAttributes.HTTP_USER_AGENT, USER_AGENT), + equalTo(SemanticAttributes.NET_PEER_NAME, "localhost"), + equalTo(SemanticAttributes.NET_PEER_PORT, uri.getPort())), + span -> + span.hasName("test-http-server") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)))); + } + private static void assertSameSpan(SpanData expected, AtomicReference actual) { SpanContext expectedSpanContext = expected.getSpanContext(); SpanContext actualSpanContext = actual.get().getSpanContext();