Skip to content

Commit

Permalink
End grpc server span in onComplete instead of close (#11170)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Apr 30, 2024
1 parent 5cc3362 commit c92955f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ final class TracingServerCall<REQUEST, RESPONSE>
extends ForwardingServerCall.SimpleForwardingServerCall<REQUEST, RESPONSE> {
private final Context context;
private final GrpcRequest request;
private Status status;

// Used by MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
Expand Down Expand Up @@ -101,13 +102,13 @@ public void sendMessage(RESPONSE message) {

@Override
public void close(Status status, Metadata trailers) {
this.status = status;
try {
delegate().close(status, trailers);
} catch (Throwable e) {
instrumenter.end(context, request, status, e);
throw e;
}
instrumenter.end(context, request, status, status.getCause());
}

final class TracingServerCallListener
Expand Down Expand Up @@ -165,6 +166,10 @@ public void onComplete() {
instrumenter.end(context, request, Status.UNKNOWN, e);
throw e;
}
if (status == null) {
status = Status.UNKNOWN;
}
instrumenter.end(context, request, status, status.getCause());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.grpc.ServerBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.util.ThrowingRunnable;
import io.opentelemetry.sdk.testing.assertj.EventDataAssert;
Expand All @@ -33,11 +35,13 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.cartesian.CartesianTest;

public abstract class AbstractGrpcStreamingTest {
Expand Down Expand Up @@ -264,6 +268,92 @@ public void onCompleted() {
(long) Status.Code.OK.value()))))));
}

@Test
void grpcServerSpanEndsAfterChildSpan() throws Exception {
Tracer tracer = testing().getOpenTelemetry().getTracer("test");
AtomicBoolean serverSpanRecording = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(2);

BindableService greeter =
new GreeterGrpc.GreeterImplBase() {
@Override
public StreamObserver<Helloworld.Response> conversation(
StreamObserver<Helloworld.Response> observer) {
return new StreamObserver<Helloworld.Response>() {
Span span;

@Override
public void onNext(Helloworld.Response value) {
span = tracer.spanBuilder("child").startSpan();
observer.onNext(value);
}

@Override
public void onError(Throwable t) {
observer.onError(t);
span.end();
}

@Override
public void onCompleted() {
observer.onCompleted();
serverSpanRecording.set(Span.current().isRecording());
span.end();
latch.countDown();
}
};
}
};

Server server = configureServer(ServerBuilder.forPort(0).addService(greeter)).build().start();
ManagedChannel channel = createChannel(server);
closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
closer.add(() -> server.shutdownNow().awaitTermination());

GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel).withWaitForReady();

StreamObserver<Helloworld.Response> observer2 =
client.conversation(
new StreamObserver<Helloworld.Response>() {
@Override
public void onNext(Helloworld.Response value) {}

@Override
public void onError(Throwable t) {}

@Override
public void onCompleted() {
latch.countDown();
}
});

Helloworld.Response message = Helloworld.Response.newBuilder().setMessage("message").build();
observer2.onNext(message);
observer2.onCompleted();

latch.await(10, TimeUnit.SECONDS);

// server span should end after child span
assertThat(serverSpanRecording).isTrue();

testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("example.Greeter/Conversation")
.hasKind(SpanKind.CLIENT)
.hasNoParent(),
span ->
span.hasName("example.Greeter/Conversation")
.hasKind(SpanKind.SERVER)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("child")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));
}

private ManagedChannel createChannel(Server server) throws Exception {
ManagedChannelBuilder<?> channelBuilder =
configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));
Expand Down

0 comments on commit c92955f

Please sign in to comment.