Skip to content

Commit

Permalink
Propagate trace when upgrading connections
Browse files Browse the repository at this point in the history
Fix #4630
  • Loading branch information
Sgitario authored and vietj committed Feb 28, 2023
1 parent ae7b256 commit 9561e0d
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ Object metric() {
return metric;
}

Object trace() {
return trace;
}

abstract void handleContinue();
abstract void handleEarlyHints(MultiMap headers);
abstract void handleHead(HttpResponseHead response);
Expand Down Expand Up @@ -523,6 +527,11 @@ public Object metric() {
return super.metric();
}

@Override
public Object trace() {
return super.trace();
}

@Override
public HttpVersion version() {
return conn.version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,14 @@ public HttpClientMetrics metrics() {
return client.metrics();
}

void upgradeStream(Object metric, ContextInternal context, Handler<AsyncResult<HttpClientStream>> completionHandler) {
void upgradeStream(Object metric, Object trace, ContextInternal context, Handler<AsyncResult<HttpClientStream>> completionHandler) {
Future<HttpClientStream> fut;
synchronized (this) {
try {
StreamImpl stream = createStream(context);
stream.init(handler.connection().stream(1));
((Stream)stream).metric = metric;
stream.metric = metric;
stream.trace = trace;
fut = Future.succeededFuture(stream);
} catch (Exception e) {
fut = Future.failedFuture(e);
Expand Down Expand Up @@ -271,6 +272,10 @@ public Object metric() {
return metric;
}

public Object trace() {
return trace;
}

@Override
void doWriteData(ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
super.doWriteData(chunk, end, handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ public Object metric() {
return delegate.metric();
}

@Override
public Object trace() {
return delegate.trace();
}

@Override
public HttpVersion version() {
return delegate.version();
Expand Down Expand Up @@ -352,7 +357,7 @@ public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeRespons
return;
}
Http2ClientConnection conn = (Http2ClientConnection) future.getNow();
conn.upgradeStream(upgradingStream.metric(), upgradingStream.getContext(), ar -> {
conn.upgradeStream(upgradingStream.metric(), upgradingStream.trace(), upgradingStream.getContext(), ar -> {
upgradingConnection.closeHandler(null);
upgradingConnection.exceptionHandler(null);
upgradingConnection.evictionHandler(null);
Expand Down Expand Up @@ -524,6 +529,11 @@ public Object metric() {
return upgradingStream.metric();
}

@Override
public Object trace() {
return upgradingStream.trace();
}

@Override
public HttpVersion version() {
HttpClientStream s = upgradedStream;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface HttpClientStream extends WriteStream<Buffer> {

Object metric();

Object trace();

/**
* @return the stream version or null if it's not yet determined
*/
Expand Down
49 changes: 49 additions & 0 deletions src/test/java/io/vertx/core/spi/tracing/Http2TracerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,25 @@
*/
package io.vertx.core.spi.tracing;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Ignore;
import org.junit.Test;

import io.vertx.core.http.Http2TestBase;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.test.faketracer.FakeTracer;

public class Http2TracerTest extends HttpTracerTestBase {

private static final String SPAN_KIND_SERVER = "server";
private static final String SPAN_KIND_CLIENT = "client";
private static final String SPAN_KIND_KEY = "span_kind";

@Override
protected HttpServerOptions createBaseServerOptions() {
return Http2TestBase.createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST);
Expand All @@ -25,4 +38,40 @@ protected HttpServerOptions createBaseServerOptions() {
protected HttpClientOptions createBaseClientOptions() {
return Http2TestBase.createHttp2ClientOptions();
}

@Test
public void testTracingWorksAfterUpgrading() throws Exception {
client.close();
FakeTracer fakeTracer = new FakeTracer();
setTracer(fakeTracer);

client = vertx.createHttpClient(new HttpClientOptions()
// setting policy always, if not the span with kind client is not sent
.setTracingPolicy(TracingPolicy.ALWAYS)
.setProtocolVersion(HttpVersion.HTTP_2));
// Server always return "Ok"
server = vertx.createHttpServer();
server.requestHandler(req -> {
req.response().end("Ok");
});
startServer(testAddress);
CountDownLatch finished = new CountDownLatch(1);
waitFor(1);
client.request(requestOptions).onSuccess(request -> {
request.connection().closeHandler((v) -> {
finished.countDown();
});
request.send().onSuccess(response -> {
complete();
});
});
await();
finished.await(5, TimeUnit.SECONDS);
// There should be 2 spans: 1 of kind server and 1 of kind client.
assertEquals(2, fakeTracer.getFinishedSpans().size());
assertTrue("Span with kind server was not found!",
fakeTracer.getFinishedSpans().stream().anyMatch(s -> SPAN_KIND_SERVER.equals(s.getTags().get(SPAN_KIND_KEY))));
assertTrue("Span with kind client was not found!",
fakeTracer.getFinishedSpans().stream().anyMatch(s -> SPAN_KIND_CLIENT.equals(s.getTags().get(SPAN_KIND_KEY))));
}
}
44 changes: 18 additions & 26 deletions src/test/java/io/vertx/core/spi/tracing/HttpTracerTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,15 @@ public abstract class HttpTracerTestBase extends HttpTestBase {

@Override
protected VertxTracer getTracer() {
return tracer = new VertxTracer() {
@Override
public Object receiveRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, Iterable headers, TagExtractor tagExtractor) {
return tracer.receiveRequest(context, kind, policy, request, operation, headers, tagExtractor);
}
@Override
public void sendResponse(Context context, Object response, Object payload, Throwable failure, TagExtractor tagExtractor) {
tracer.sendResponse(context, response, payload, failure, tagExtractor);
}
@Override
public Object sendRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, BiConsumer headers, TagExtractor tagExtractor) {
return tracer.sendRequest(context, kind, policy, request, operation, headers, tagExtractor);
}
@Override
public void receiveResponse(Context context, Object response, Object payload, Throwable failure, TagExtractor tagExtractor) {
tracer.receiveResponse(context, response, payload, failure, tagExtractor);
}
};
return tracer;
}

@Test
public void testHttpServer() throws Exception {
String key = TestUtils.randomAlphaString(10);
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
tracer = new VertxTracer() {
setTracer(new VertxTracer() {
@Override
public Object receiveRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, Iterable headers, TagExtractor tagExtractor) {
assertNull(context.getLocal(key));
Expand All @@ -77,7 +60,7 @@ public void sendResponse(Context context, Object response, Object payload, Throw
assertSame(val, context.getLocal(key));
assertTrue(context.removeLocal(key));
}
};
});
CountDownLatch latch = new CountDownLatch(1);
server.requestHandler(req -> {
assertEquals(1, seq.get());
Expand Down Expand Up @@ -106,7 +89,7 @@ public void testHttpServerError() throws Exception {
String key = TestUtils.randomAlphaString(10);
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
tracer = new VertxTracer() {
setTracer(new VertxTracer() {
@Override
public Object receiveRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, Iterable headers, TagExtractor tagExtractor) {
assertNull(context.getLocal(key));
Expand All @@ -121,7 +104,7 @@ public void sendResponse(Context context, Object response, Object payload, Throw
assertNotNull(failure);
assertTrue(context.removeLocal(key));
}
};
});
CountDownLatch latch = new CountDownLatch(1);
server.requestHandler(req -> {
assertEquals(1, seq.get());
Expand Down Expand Up @@ -167,7 +150,7 @@ private void testHttpClientRequest(RequestOptions request, String expectedOperat
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
String traceId = UUID.randomUUID().toString();
tracer = new VertxTracer() {
setTracer(new VertxTracer() {
@Override
public Object sendRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, BiConsumer headers, TagExtractor tagExtractor) {
assertSame(val, context.getLocal(key));
Expand All @@ -187,7 +170,7 @@ public void receiveResponse(Context context, Object response, Object payload, Th
assertNull(failure);
assertTrue(seq.compareAndSet(1, 2));
}
};
});
CountDownLatch latch = new CountDownLatch(1);
server.requestHandler(req -> {
assertEquals(traceId, req.getHeader("X-B3-TraceId"));
Expand Down Expand Up @@ -222,7 +205,7 @@ public void testHttpClientError() throws Exception {
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
String traceId = UUID.randomUUID().toString();
tracer = new VertxTracer() {
setTracer(new VertxTracer() {
@Override
public Object sendRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, BiConsumer headers, TagExtractor tagExtractor) {
assertSame(val, context.getLocal(key));
Expand All @@ -238,7 +221,7 @@ public void receiveResponse(Context context, Object response, Object payload, Th
assertNotNull(failure);
assertTrue(seq.compareAndSet(1, 2));
}
};
});
CountDownLatch latch = new CountDownLatch(1);
server.requestHandler(req -> {
assertEquals(traceId, req.getHeader("X-B3-TraceId"));
Expand All @@ -262,4 +245,13 @@ public void receiveResponse(Context context, Object response, Object payload, Th
});
await();
}

protected void setTracer(VertxTracer tracer) {
this.server.close();
this.tracer = tracer;
// So, the vertx options is reset with the new tracer.
this.vertx = vertx(getOptions());
this.server = this.vertx.createHttpServer(createBaseServerOptions());
this.client = this.vertx.createHttpClient(createBaseClientOptions());
}
}

0 comments on commit 9561e0d

Please sign in to comment.