From 1959291c43a57365b965af8decb3f8cf493ec85f Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Tue, 28 Apr 2020 11:08:14 +0800 Subject: [PATCH] Makes it possible to read commonly defined gRPC data --- .../java/brave/grpc/GrpcClientRequest.java | 27 +-- .../java/brave/grpc/GrpcClientResponse.java | 38 ++-- .../src/main/java/brave/grpc/GrpcRequest.java | 54 +++++ .../main/java/brave/grpc/GrpcResponse.java | 55 +++++ .../java/brave/grpc/GrpcServerRequest.java | 32 ++- .../java/brave/grpc/GrpcServerResponse.java | 28 ++- .../brave/grpc/TracingClientInterceptor.java | 5 +- .../brave/grpc/TracingServerInterceptor.java | 5 +- .../grpc/BaseITTracingClientInterceptor.java | 189 +++++++++--------- .../grpc/BaseITTracingServerInterceptor.java | 116 ++++++----- .../brave/grpc/GrpcClientResponseTest.java | 16 +- .../brave/grpc/GrpcServerResponseTest.java | 15 +- .../src/test/java/brave/grpc/TestServer.java | 51 +++-- .../java/brave/rpc/RpcResponseParser.java | 21 +- 14 files changed, 396 insertions(+), 256 deletions(-) create mode 100644 instrumentation/grpc/src/main/java/brave/grpc/GrpcRequest.java create mode 100644 instrumentation/grpc/src/main/java/brave/grpc/GrpcResponse.java diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientRequest.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientRequest.java index 78c9f9b81b..01dcf99a46 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientRequest.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientRequest.java @@ -14,7 +14,6 @@ package brave.grpc; import brave.rpc.RpcClientRequest; -import brave.rpc.RpcTracing; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -27,27 +26,11 @@ /** * Allows access gRPC specific aspects of a client request during sampling and parsing. * - *

Here's an example that adds default tags, and if gRPC, the {@linkplain - * MethodDescriptor#getType() method type}: - *

{@code
- * Tag methodType = new Tag("grpc.method_type") {
- *   protected String parseValue(GrpcClientRequest input, TraceContext context) {
- *     return input.methodDescriptor().getType().name();
- *   }
- * };
- * rpcTracing = rpcTracingBuilder.clientResponseParser((res, context, span) -> {
- *   RpcResponseParser.DEFAULT.parse(res, context, span);
- *     if (res instanceof GrpcClientRequest) {
- *       methodType.tag((GrpcClientRequest) res, span);
- *     }
- *   }).build();
- * }
- * * @see GrpcClientResponse - * @see RpcTracing#clientRequestParser() + * @see GrpcRequest for a parsing example * @since 5.12 */ -public final class GrpcClientRequest extends RpcClientRequest { +public final class GrpcClientRequest extends RpcClientRequest implements GrpcRequest { final Map> nameToKey; final MethodDescriptor methodDescriptor; final CallOptions callOptions; @@ -55,7 +38,7 @@ public final class GrpcClientRequest extends RpcClientRequest { final Metadata headers; GrpcClientRequest(Map> nameToKey, MethodDescriptor methodDescriptor, - CallOptions callOptions, ClientCall call, Metadata headers) { + CallOptions callOptions, ClientCall call, Metadata headers) { if (nameToKey == null) throw new NullPointerException("nameToKey == null"); if (methodDescriptor == null) throw new NullPointerException("methodDescriptor == null"); if (callOptions == null) throw new NullPointerException("callOptions == null"); @@ -88,7 +71,7 @@ public final class GrpcClientRequest extends RpcClientRequest { * * @since 5.12 */ - public MethodDescriptor methodDescriptor() { + @Override public MethodDescriptor methodDescriptor() { return methodDescriptor; } @@ -118,7 +101,7 @@ public CallOptions callOptions() { * * @since 5.12 */ - public Metadata headers() { + @Override public Metadata headers() { return headers; } diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientResponse.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientResponse.java index 818448e067..6f3f344e5d 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientResponse.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcClientResponse.java @@ -15,7 +15,6 @@ import brave.internal.Nullable; import brave.rpc.RpcClientResponse; -import brave.rpc.RpcTracing; import io.grpc.ClientCall; import io.grpc.Metadata; import io.grpc.Status; @@ -23,33 +22,23 @@ /** * Allows access gRPC specific aspects of a client response for parsing. * - *

Here's an example that adds default tags, and if gRPC, the Java result: - *

{@code
- * rpcTracing = rpcTracingBuilder
- *   .clientResponseParser((res, context, span) -> {
- *      RpcResponseParser.DEFAULT.parse(res, context, span);
- *      if (res instanceof DubboResponse) {
- *        DubboResponse dubboResponse = (DubboResponse) res;
- *        if (res.result() != null) {
- *          tagJavaResult(res.result().value());
- *        }
- *      }
- *   }).build();
- * }
- * * @see GrpcClientRequest - * @see RpcTracing#clientResponseParser() + * @see GrpcResponse for a parsing example * @since 5.12 */ -public final class GrpcClientResponse extends RpcClientResponse { +public final class GrpcClientResponse extends RpcClientResponse implements GrpcResponse { final GrpcClientRequest request; + final Metadata headers; final Status status; final Metadata trailers; - GrpcClientResponse(GrpcClientRequest request, Status status, Metadata trailers) { + GrpcClientResponse(GrpcClientRequest request, Metadata headers, Status status, + Metadata trailers) { if (request == null) throw new NullPointerException("request == null"); + if (headers == null) throw new NullPointerException("headers == null"); if (status == null) throw new NullPointerException("status == null"); if (trailers == null) throw new NullPointerException("trailers == null"); + this.headers = headers; this.request = request; this.status = status; this.trailers = trailers; @@ -78,12 +67,21 @@ public final class GrpcClientResponse extends RpcClientResponse { return status.getCode().name(); } + /** + * Returns a copy of headers passed to {@link ClientCall.Listener#onHeaders(Metadata)}. + * + * @since 5.12 + */ + @Override public Metadata headers() { + return headers; + } + /** * Returns the status passed to {@link ClientCall.Listener#onClose(Status, Metadata)}. * * @since 5.12 */ - public Status status() { + @Override public Status status() { return status; } @@ -92,7 +90,7 @@ public Status status() { * * @since 5.12 */ - public Metadata trailers() { + @Override public Metadata trailers() { return trailers; } } diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcRequest.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcRequest.java new file mode 100644 index 0000000000..7cca8cc69a --- /dev/null +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcRequest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2013-2020 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.grpc; + +import brave.rpc.RpcTracing; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; + +/** + * Allows access gRPC specific aspects of a client or server request during sampling and parsing. + * + *

Here's an example that adds default tags, and if gRPC, the {@linkplain + * MethodDescriptor#getType() method type}: + *

{@code
+ * Tag methodType = new Tag("grpc.method_type") {
+ *   @Override protected String parseValue(GrpcRequest input, TraceContext context) {
+ *     return input.methodDescriptor().getType().name();
+ *   }
+ * };
+ *
+ * RpcRequestParser addMethodType = (req, context, span) -> {
+ *   RpcRequestParser.DEFAULT.parse(req, context, span);
+ *   if (req instanceof GrpcRequest) methodType.tag((GrpcRequest) req, span);
+ * };
+ *
+ * grpcTracing = GrpcTracing.create(RpcTracing.newBuilder(tracing)
+ *     .clientRequestParser(addMethodType)
+ *     .serverRequestParser(addMethodType).build());
+ * }
+ * + * @see GrpcResponse + * @see GrpcClientRequest + * @see GrpcServerRequest + * @see RpcTracing#clientRequestParser() + * @see RpcTracing#serverRequestParser() + * @since 5.12 + */ +// NOTE: gRPC is Java 1.7+, so we cannot add methods to this later +public interface GrpcRequest { + MethodDescriptor methodDescriptor(); + + Metadata headers(); +} diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcResponse.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcResponse.java new file mode 100644 index 0000000000..974a0b00d0 --- /dev/null +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcResponse.java @@ -0,0 +1,55 @@ +/* + * Copyright 2013-2020 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.grpc; + +import brave.rpc.RpcTracing; +import io.grpc.Metadata; +import io.grpc.Status; + +/** + * Allows access gRPC specific aspects of a client or server response for parsing. + * + *

Here's an example that adds default tags, and if gRPC, the response encoding: + *

{@code
+ * Tag responseEncoding = new Tag("grpc.response_encoding") {
+ *   @Override protected String parseValue(GrpcResponse input, TraceContext context) {
+ *     return input.headers().get(GrpcUtil.MESSAGE_ENCODING_KEY);
+ *   }
+ * };
+ *
+ * RpcResponseParser addResponseEncoding = (res, context, span) -> {
+ *   RpcResponseParser.DEFAULT.parse(res, context, span);
+ *   if (res instanceof GrpcResponse) responseEncoding.tag((GrpcResponse) res, span);
+ * };
+ *
+ * grpcTracing = GrpcTracing.create(RpcTracing.newBuilder(tracing)
+ *     .clientResponseParser(addResponseEncoding);
+ *     .serverResponseParser(addResponseEncoding).build());
+ * }
+ * + * @see GrpcRequest + * @see GrpcClientResponse + * @see GrpcServerResponse + * @see RpcTracing#clientResponseParser() + * @see RpcTracing#serverResponseParser() + * @since 5.12 + */ +// NOTE: gRPC is Java 1.7+, so we cannot add methods to this later +public interface GrpcResponse { + Metadata headers(); + + Status status(); + + Metadata trailers(); +} diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java index 5f06290a0d..8aef475064 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java @@ -14,7 +14,6 @@ package brave.grpc; import brave.rpc.RpcServerRequest; -import brave.rpc.RpcTracing; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.MethodDescriptor; @@ -25,27 +24,11 @@ /** * Allows access gRPC specific aspects of a server request during sampling and parsing. * - *

Here's an example that adds default tags, and if gRPC, the {@linkplain - * MethodDescriptor#getType() method type}: - *

{@code
- * Tag methodType = new Tag("grpc.method_type") {
- *   protected String parseValue(GrpcServerRequest input, TraceContext context) {
- *     return input.call().getMethodDescriptor().getType().name();
- *   }
- * };
- * rpcTracing = rpcTracingBuilder.serverResponseParser((res, context, span) -> {
- *   RpcResponseParser.DEFAULT.parse(res, context, span);
- *     if (res instanceof GrpcServerRequest) {
- *       methodType.tag((GrpcServerRequest) res, span);
- *     }
- *   }).build();
- * }
- * * @see GrpcServerResponse - * @see RpcTracing#serverRequestParser() + * @see GrpcRequest for a parsing example * @since 5.12 */ -public class GrpcServerRequest extends RpcServerRequest { +public class GrpcServerRequest extends RpcServerRequest implements GrpcRequest { final Map> nameToKey; final ServerCall call; final Metadata headers; @@ -83,12 +66,21 @@ public class GrpcServerRequest extends RpcServerRequest { return call; } + /** + * Returns {@linkplain ServerCall#getMethodDescriptor()}} from the {@link #call()}. + * + * @since 5.12 + */ + @Override public MethodDescriptor methodDescriptor() { + return call.getMethodDescriptor(); + } + /** * Returns the {@linkplain Metadata headers} passed to {@link ServerInterceptor#interceptCall}. * * @since 5.12 */ - public Metadata headers() { + @Override public Metadata headers() { return headers; } diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerResponse.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerResponse.java index bbf5e4c2b0..9d83e8fce0 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerResponse.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerResponse.java @@ -15,27 +15,30 @@ import brave.internal.Nullable; import brave.rpc.RpcServerResponse; -import brave.rpc.RpcTracing; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.Status; /** - * Allows access gRPC specific aspects of a client response for parsing. + * Allows access gRPC specific aspects of a server response for parsing. * * @see GrpcServerRequest - * @see RpcTracing#serverResponseParser() + * @see GrpcResponse for a parsing example * @since 5.12 */ -public final class GrpcServerResponse extends RpcServerResponse { +public final class GrpcServerResponse extends RpcServerResponse implements GrpcResponse { final GrpcServerRequest request; + final Metadata headers; final Status status; final Metadata trailers; - GrpcServerResponse(GrpcServerRequest request, Status status, Metadata trailers) { + GrpcServerResponse(GrpcServerRequest request, Metadata headers, Status status, + Metadata trailers) { if (request == null) throw new NullPointerException("request == null"); + if (headers == null) throw new NullPointerException("headers == null"); if (status == null) throw new NullPointerException("status == null"); if (trailers == null) throw new NullPointerException("trailers == null"); + this.headers = headers; this.request = request; this.status = status; this.trailers = trailers; @@ -65,11 +68,20 @@ public final class GrpcServerResponse extends RpcServerResponse { } /** - * Returns the status passed to{@link ServerCall#close(Status, Metadata)}. + * Returns a copy of headers passed to {@link ServerCall#sendHeaders(Metadata)}. * * @since 5.12 */ - public Status status() { + @Override public Metadata headers() { + return headers; + } + + /** + * Returns the status passed to {@link ServerCall#close(Status, Metadata)}. + * + * @since 5.12 + */ + @Override public Status status() { return status; } @@ -78,7 +90,7 @@ public Status status() { * * @since 5.12 */ - public Metadata trailers() { + @Override public Metadata trailers() { return trailers; } } diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java b/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java index 2445e8858f..c90bac1d6f 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java @@ -154,6 +154,7 @@ final class TracingClientCallListener extends SimpleForwardingClientCallL @Nullable final TraceContext invocationContext; final AtomicReference spanRef; final GrpcClientRequest request; + final Metadata headers = new Metadata(); TracingClientCallListener( Listener delegate, @@ -175,6 +176,8 @@ final class TracingClientCallListener extends SimpleForwardingClientCallL // See instrumentation/RATIONALE.md for why the below response callbacks are invocation context @Override public void onHeaders(Metadata headers) { + // onHeaders() JavaDoc mentions headers are not thread-safe, so we make a safe copy here. + this.headers.merge(headers); try (Scope scope = currentTraceContext.maybeScope(invocationContext)) { delegate().onHeaders(headers); } @@ -191,7 +194,7 @@ final class TracingClientCallListener extends SimpleForwardingClientCallL @Override public void onClose(Status status, Metadata trailers) { // See /instrumentation/grpc/RATIONALE.md for why we don't catch exceptions from the delegate - GrpcClientResponse response = new GrpcClientResponse(request, status, trailers); + GrpcClientResponse response = new GrpcClientResponse(request, headers, status, trailers); Span span = spanRef.getAndSet(null); if (span != null) handler.handleReceive(response, span); diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java b/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java index ed812827aa..db1fa4dcb1 100644 --- a/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java +++ b/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java @@ -77,6 +77,7 @@ final class TracingServerCall extends SimpleForwardingServerCall spanRef; final GrpcServerRequest request; + final Metadata headers = new Metadata(); TracingServerCall(ServerCall delegate, Span span, AtomicReference spanRef, GrpcServerRequest request) { @@ -96,6 +97,8 @@ final class TracingServerCall extends SimpleForwardingServerCall extends SimpleForwardingServerCall GraterGrpc.newBlockingStub(client).seyHallo(HELLO_REQUEST)) - .isInstanceOf(StatusRuntimeException.class); + .isInstanceOf(StatusRuntimeException.class); // The error format of the exception message can differ from the span's "error" tag in CI Span span = reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, ".*Connection refused.*"); @@ -217,7 +218,7 @@ ManagedChannel newClient(ClientInterceptor... clientInterceptors) { @Test public void addsErrorTag_onUnimplemented() { assertThatThrownBy(() -> GraterGrpc.newBlockingStub(client).seyHallo(HELLO_REQUEST)) - .isInstanceOf(StatusRuntimeException.class); + .isInstanceOf(StatusRuntimeException.class); Span span = reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, "UNIMPLEMENTED"); assertThat(span.tags().get("grpc.status_code")).isEqualTo("UNIMPLEMENTED"); @@ -242,31 +243,31 @@ ManagedChannel newClient(ClientInterceptor... clientInterceptors) { closeClient(client); client = newClient( - new ClientInterceptor() { - @Override public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - tracing.tracer().currentSpanCustomizer().annotate("start"); - super.start(responseListener, headers); - } - - @Override public void sendMessage(ReqT message) { - tracing.tracer().currentSpanCustomizer().annotate("sendMessage"); - super.sendMessage(message); - } - }; - } - }, - grpcTracing.newClientInterceptor() + new ClientInterceptor() { + @Override public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + tracing.tracer().currentSpanCustomizer().annotate("start"); + super.start(responseListener, headers); + } + + @Override public void sendMessage(ReqT message) { + tracing.tracer().currentSpanCustomizer().annotate("sendMessage"); + super.sendMessage(message); + } + }; + } + }, + grpcTracing.newClientInterceptor() ); GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).annotations()) - .extracting(Annotation::value) - .containsOnly("start", "sendMessage"); + .extracting(Annotation::value) + .containsOnly("start", "sendMessage"); } @Test public void clientParserTest() { @@ -303,8 +304,8 @@ protected String spanName(MethodDescriptor methodDesc Span span = reporter.takeRemoteSpan(Span.Kind.CLIENT); assertThat(span.name()).isEqualTo("unary"); assertThat(span.tags()).containsKeys( - "grpc.message_received", "grpc.message_sent", - "grpc.message_received.visible", "grpc.message_sent.visible" + "grpc.message_received", "grpc.message_sent", + "grpc.message_received.visible", "grpc.message_sent.visible" ); reporter.takeLocalSpan(); } @@ -321,7 +322,7 @@ protected String spanName(MethodDescriptor methodDesc client = newClient(); Iterator replies = GreeterGrpc.newBlockingStub(client) - .sayHelloWithManyReplies(HelloRequest.newBuilder().setName("this is dog").build()); + .sayHelloWithManyReplies(HelloRequest.newBuilder().setName("this is dog").build()); assertThat(replies).toIterable().hasSize(10); // all response messages are tagged to the same span @@ -334,7 +335,8 @@ protected String spanName(MethodDescriptor methodDesc closeClient(client); client = newClient(new ClientInterceptor() { @Override public ClientCall interceptCall( - MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + MethodDescriptor methodDescriptor, CallOptions callOptions, + Channel channel) { ClientCall call = channel.newCall(methodDescriptor, callOptions); return new SimpleForwardingClientCall(call) { @Override public void start(Listener responseListener, Metadata headers) { @@ -345,7 +347,7 @@ protected String spanName(MethodDescriptor methodDesc }, grpcTracing.newClientInterceptor()); assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST)) - .isInstanceOf(IllegalStateException.class); + .isInstanceOf(IllegalStateException.class); reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, "I'm a bad interceptor."); } @@ -353,7 +355,8 @@ protected String spanName(MethodDescriptor methodDesc closeClient(client); client = newClient(new ClientInterceptor() { @Override public ClientCall interceptCall( - MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + MethodDescriptor methodDescriptor, CallOptions callOptions, + Channel channel) { ClientCall call = channel.newCall(methodDescriptor, callOptions); return new SimpleForwardingClientCall(call) { @Override public void halfClose() { @@ -364,7 +367,7 @@ protected String spanName(MethodDescriptor methodDesc }, grpcTracing.newClientInterceptor()); assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST)) - .isInstanceOf(IllegalStateException.class); + .isInstanceOf(IllegalStateException.class); reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, "I'm a bad interceptor."); } @@ -383,11 +386,11 @@ protected String spanName(MethodDescriptor methodDesc } assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).tags()) - .containsKey("grpc.message_send.1"); + .containsKey("grpc.message_send.1"); // Response processing happens on the invocation (parent) trace context assertThat(reporter.takeLocalSpan().tags()) - .containsKey("grpc.message_recv.1"); + .containsKey("grpc.message_recv.1"); } @Test public void messageTagging_streaming() { @@ -396,28 +399,28 @@ protected String spanName(MethodDescriptor methodDesc ScopedSpan span = tracing.tracer().startScopedSpan("parent"); try { Iterator replies = GreeterGrpc.newBlockingStub(client) - .sayHelloWithManyReplies(HELLO_REQUEST); + .sayHelloWithManyReplies(HELLO_REQUEST); assertThat(replies).toIterable().hasSize(10); } finally { span.finish(); } assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).tags()) - .containsKey("grpc.message_send.1"); + .containsKey("grpc.message_send.1"); // Response processing happens on the invocation (parent) trace context // Intentionally verbose here to show 10 replies assertThat(reporter.takeLocalSpan().tags()).containsKeys( - "grpc.message_recv.1", - "grpc.message_recv.2", - "grpc.message_recv.3", - "grpc.message_recv.4", - "grpc.message_recv.5", - "grpc.message_recv.6", - "grpc.message_recv.7", - "grpc.message_recv.8", - "grpc.message_recv.9", - "grpc.message_recv.10" + "grpc.message_recv.1", + "grpc.message_recv.2", + "grpc.message_recv.3", + "grpc.message_recv.4", + "grpc.message_recv.5", + "grpc.message_recv.6", + "grpc.message_recv.7", + "grpc.message_recv.8", + "grpc.message_recv.9", + "grpc.message_recv.10" ); } @@ -428,27 +431,27 @@ void initMessageTaggingClient() { closeClient(client); client = newClient( - new ClientInterceptor() { - @Override public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { - @Override public void start(Listener responseListener, Metadata headers) { - super.start(new SimpleForwardingClientCallListener(responseListener) { - @Override public void onMessage(RespT message) { - customizer.tag("grpc.message_recv." + recvs.getAndIncrement(), - message.toString()); - delegate().onMessage(message); - } - }, headers); - } - - @Override public void sendMessage(ReqT message) { - customizer.tag("grpc.message_send." + sends.getAndIncrement(), message.toString()); - delegate().sendMessage(message); - } - }; - } - }, grpcTracing.newClientInterceptor()); + new ClientInterceptor() { + @Override public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override public void start(Listener responseListener, Metadata headers) { + super.start(new SimpleForwardingClientCallListener(responseListener) { + @Override public void onMessage(RespT message) { + customizer.tag("grpc.message_recv." + recvs.getAndIncrement(), + message.toString()); + delegate().onMessage(message); + } + }, headers); + } + + @Override public void sendMessage(ReqT message) { + customizer.tag("grpc.message_send." + sends.getAndIncrement(), message.toString()); + delegate().sendMessage(message); + } + }; + } + }, grpcTracing.newClientInterceptor()); } /** @@ -495,9 +498,9 @@ void initMessageTaggingClient() { closeClient(client); RpcTracing rpcTracing = RpcTracing.newBuilder(tracing).clientSampler(RpcRuleSampler.newBuilder() - .putRule(methodEquals("SayHelloWithManyReplies"), NEVER_SAMPLE) - .putRule(serviceEquals("helloworld.greeter"), ALWAYS_SAMPLE) - .build()).build(); + .putRule(methodEquals("SayHelloWithManyReplies"), NEVER_SAMPLE) + .putRule(serviceEquals("helloworld.greeter"), ALWAYS_SAMPLE) + .build()).build(); grpcTracing = GrpcTracing.create(rpcTracing); client = newClient(); @@ -510,36 +513,42 @@ void initMessageTaggingClient() { GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).name()) - .isEqualTo("helloworld.greeter/sayhello"); + .isEqualTo("helloworld.greeter/sayhello"); // @After will also check that sayHelloWithManyReplies was not sampled } @Test public void customParser() { closeClient(client); - Tag methodType = new Tag("grpc.method_type") { - @Override protected String parseValue(GrpcClientRequest input, TraceContext context) { + Tag methodType = new Tag("grpc.method_type") { + @Override protected String parseValue(GrpcRequest input, TraceContext context) { return input.methodDescriptor().getType().name(); } }; - RpcRequestParser customRequestParser = new RpcRequestParser() { - @Override public void parse(RpcRequest res, TraceContext context, SpanCustomizer span) { - RpcRequestParser.DEFAULT.parse(res, context, span); - if (res instanceof GrpcClientRequest) { - methodType.tag((GrpcClientRequest) res, span); - } + + Tag responseEncoding = new Tag("grpc.response_encoding") { + @Override protected String parseValue(GrpcResponse input, TraceContext context) { + return input.headers().get(GrpcUtil.MESSAGE_ENCODING_KEY); } }; grpcTracing = GrpcTracing.create(RpcTracing.newBuilder(tracing) - .clientRequestParser(customRequestParser) - .build()); + .clientRequestParser((req, context, span) -> { + RpcRequestParser.DEFAULT.parse(req, context, span); + if (req instanceof GrpcRequest) methodType.tag((GrpcRequest) req, span); + }) + .clientResponseParser((res, context, span) -> { + RpcResponseParser.DEFAULT.parse(res, context, span); + if (res instanceof GrpcResponse) responseEncoding.tag((GrpcResponse) res, span); + }).build()); + client = newClient(); GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).tags()) - .containsEntry("grpc.method_type", "UNARY"); + .containsEntry("grpc.method_type", "UNARY") + .containsEntry("grpc.response_encoding", "identity"); } static final class StreamObserverAdapter implements StreamObserver { diff --git a/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java b/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java index 6cfb06030f..1145831932 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java @@ -20,8 +20,8 @@ import brave.propagation.B3SingleFormat; import brave.propagation.SamplingFlags; import brave.propagation.TraceContext; -import brave.rpc.RpcRequest; import brave.rpc.RpcRequestParser; +import brave.rpc.RpcResponseParser; import brave.rpc.RpcRuleSampler; import brave.rpc.RpcTracing; import brave.test.ITRemote; @@ -49,6 +49,7 @@ import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.examples.helloworld.HelloReply; import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.internal.GrpcUtil; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.TimeUnit; @@ -87,15 +88,15 @@ void init(@Nullable ServerInterceptor userInterceptor) throws IOException { // tracing interceptor needs to go last ServerInterceptor tracingInterceptor = grpcTracing.newServerInterceptor(); ServerInterceptor[] interceptors = userInterceptor != null - ? new ServerInterceptor[] {userInterceptor, tracingInterceptor} - : new ServerInterceptor[] {tracingInterceptor}; + ? new ServerInterceptor[] {userInterceptor, tracingInterceptor} + : new ServerInterceptor[] {tracingInterceptor}; server = ServerBuilder.forPort(PickUnusedPort.get()) - .addService(ServerInterceptors.intercept(new GreeterImpl(grpcTracing), interceptors)) - .build().start(); + .addService(ServerInterceptors.intercept(new GreeterImpl(grpcTracing), interceptors)) + .build().start(); client = usePlainText(ManagedChannelBuilder.forAddress("localhost", server.getPort())) - .build(); + .build(); } /** Extracted as {@link ManagedChannelBuilder#usePlaintext()} is a version-specific signature */ @@ -157,7 +158,7 @@ void init(@Nullable ServerInterceptor userInterceptor) throws IOException { init(new ServerInterceptor() { @Override public ServerCall.Listener interceptCall(ServerCall call, - Metadata headers, ServerCallHandler next) { + Metadata headers, ServerCallHandler next) { fromUserInterceptor.set(tracing.currentTraceContext().get()); return next.startCall(call, headers); } @@ -166,7 +167,7 @@ public ServerCall.Listener interceptCall(ServerCall ServerCall.Listener interceptCall(ServerCall GreeterGrpc.newBlockingStub(client) - .sayHello(HelloRequest.newBuilder().setName("bad").build())); + .sayHello(HelloRequest.newBuilder().setName("bad").build())); Span span = reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "IllegalArgumentException"); assertThat(span.tags()).containsEntry("grpc.status_code", "UNKNOWN"); @@ -195,8 +196,8 @@ public ServerCall.Listener interceptCall(ServerCall GreeterGrpc.newBlockingStub(client) - .sayHello(HelloRequest.newBuilder().setName("testerror").build())) - .isInstanceOf(StatusRuntimeException.class); + .sayHello(HelloRequest.newBuilder().setName("testerror").build())) + .isInstanceOf(StatusRuntimeException.class); Span span = reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "testerror"); assertThat(span.tags().get("grpc.status_code")).isNull(); @@ -231,8 +232,8 @@ protected String spanName(MethodDescriptor methodDesc Span span = reporter.takeRemoteSpan(Span.Kind.SERVER); assertThat(span.name()).isEqualTo("unary"); assertThat(span.tags().keySet()).containsExactlyInAnyOrder( - "grpc.message_received", "grpc.message_sent", - "grpc.message_received.visible", "grpc.message_sent.visible" + "grpc.message_received", "grpc.message_sent", + "grpc.message_received.visible", "grpc.message_sent.visible" ); } @@ -247,7 +248,7 @@ protected String spanName(MethodDescriptor methodDesc init(); Iterator replies = GreeterGrpc.newBlockingStub(client) - .sayHelloWithManyReplies(HELLO_REQUEST); + .sayHelloWithManyReplies(HELLO_REQUEST); assertThat(replies).toIterable().hasSize(10); // all response messages are tagged to the same span assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()).hasSize(10); @@ -258,20 +259,20 @@ protected String spanName(MethodDescriptor methodDesc @Test public void userInterceptor_throwsOnStartCall() throws IOException { init(new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( - ServerCall call, Metadata metadata, ServerCallHandler next) { + ServerCall call, Metadata metadata, ServerCallHandler next) { throw new IllegalStateException("I'm a bad interceptor."); } }); assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST)) - .isInstanceOf(StatusRuntimeException.class); + .isInstanceOf(StatusRuntimeException.class); reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor."); } @Test public void userInterceptor_throwsOnSendMessage() throws IOException { init(new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( - ServerCall call, Metadata metadata, ServerCallHandler next) { + ServerCall call, Metadata metadata, ServerCallHandler next) { return next.startCall(new SimpleForwardingServerCall(call) { @Override public void sendMessage(RespT message) { throw new IllegalStateException("I'm a bad interceptor."); @@ -281,14 +282,14 @@ protected String spanName(MethodDescriptor methodDesc }); assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST)) - .isInstanceOf(StatusRuntimeException.class); + .isInstanceOf(StatusRuntimeException.class); reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor."); } @Test public void userInterceptor_throwsOnClose() throws IOException { init(new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( - ServerCall call, Metadata metadata, ServerCallHandler next) { + ServerCall call, Metadata metadata, ServerCallHandler next) { return next.startCall(new SimpleForwardingServerCall(call) { @Override public void close(Status status, Metadata trailers) { throw new IllegalStateException("I'm a bad interceptor."); @@ -298,14 +299,14 @@ protected String spanName(MethodDescriptor methodDesc }); assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST)) - .isInstanceOf(StatusRuntimeException.class); + .isInstanceOf(StatusRuntimeException.class); reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor."); } @Test public void userInterceptor_throwsOnOnHalfClose() throws IOException { init(new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( - ServerCall call, Metadata metadata, ServerCallHandler next) { + ServerCall call, Metadata metadata, ServerCallHandler next) { return new SimpleForwardingServerCallListener(next.startCall(call, metadata)) { @Override public void onHalfClose() { throw new IllegalStateException("I'm a bad interceptor."); @@ -315,7 +316,7 @@ protected String spanName(MethodDescriptor methodDesc }); assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST)) - .isInstanceOf(StatusRuntimeException.class); + .isInstanceOf(StatusRuntimeException.class); reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor."); } @@ -331,7 +332,7 @@ protected String spanName(MethodDescriptor methodDesc init(new ServerInterceptor() { @Override public ServerCall.Listener interceptCall( - ServerCall call, Metadata headers, ServerCallHandler next) { + ServerCall call, Metadata headers, ServerCallHandler next) { call = new SimpleForwardingServerCall(call) { @Override public void sendMessage(RespT message) { delegate().sendMessage(message); @@ -350,26 +351,26 @@ public ServerCall.Listener interceptCall( GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()).containsKeys( - "grpc.message_recv.0", "grpc.message_send.0" + "grpc.message_recv.0", "grpc.message_send.0" ); Iterator replies = GreeterGrpc.newBlockingStub(client) - .sayHelloWithManyReplies(HELLO_REQUEST); + .sayHelloWithManyReplies(HELLO_REQUEST); assertThat(replies).toIterable().hasSize(10); // Intentionally verbose here to show that only one recv and 10 replies assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()).containsKeys( - "grpc.message_recv.1", - "grpc.message_send.1", - "grpc.message_send.2", - "grpc.message_send.3", - "grpc.message_send.4", - "grpc.message_send.5", - "grpc.message_send.6", - "grpc.message_send.7", - "grpc.message_send.8", - "grpc.message_send.9", - "grpc.message_send.10" + "grpc.message_recv.1", + "grpc.message_send.1", + "grpc.message_send.2", + "grpc.message_send.3", + "grpc.message_send.4", + "grpc.message_send.5", + "grpc.message_send.6", + "grpc.message_send.7", + "grpc.message_send.8", + "grpc.message_send.9", + "grpc.message_send.10" ); } @@ -377,9 +378,9 @@ public ServerCall.Listener interceptCall( @Test public void customSampler() throws IOException { RpcTracing rpcTracing = RpcTracing.newBuilder(tracing).serverSampler(RpcRuleSampler.newBuilder() - .putRule(methodEquals("SayHelloWithManyReplies"), NEVER_SAMPLE) - .putRule(serviceEquals("helloworld.greeter"), ALWAYS_SAMPLE) - .build()).build(); + .putRule(methodEquals("SayHelloWithManyReplies"), NEVER_SAMPLE) + .putRule(serviceEquals("helloworld.greeter"), ALWAYS_SAMPLE) + .build()).build(); grpcTracing = GrpcTracing.create(rpcTracing); init(); @@ -391,45 +392,50 @@ public ServerCall.Listener interceptCall( GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).name()) - .isEqualTo("helloworld.greeter/sayhello"); + .isEqualTo("helloworld.greeter/sayhello"); // @After will also check that sayHelloWithManyReplies was not sampled } @Test public void customParser() throws IOException { - Tag methodType = new Tag("grpc.method_type") { - @Override protected String parseValue(GrpcServerRequest input, TraceContext context) { - return input.call().getMethodDescriptor().getType().name(); + Tag methodType = new Tag("grpc.method_type") { + @Override protected String parseValue(GrpcRequest input, TraceContext context) { + return input.methodDescriptor().getType().name(); } }; - RpcRequestParser customRequestParser = new RpcRequestParser() { - @Override public void parse(RpcRequest res, TraceContext context, SpanCustomizer span) { - RpcRequestParser.DEFAULT.parse(res, context, span); - if (res instanceof GrpcServerRequest) { - methodType.tag((GrpcServerRequest) res, span); - } + + Tag responseEncoding = new Tag("grpc.response_encoding") { + @Override protected String parseValue(GrpcResponse input, TraceContext context) { + return input.headers().get(GrpcUtil.MESSAGE_ENCODING_KEY); } }; grpcTracing = GrpcTracing.create(RpcTracing.newBuilder(tracing) - .serverRequestParser(customRequestParser) - .build()); + .serverRequestParser((req, context, span) -> { + RpcRequestParser.DEFAULT.parse(req, context, span); + if (req instanceof GrpcRequest) methodType.tag((GrpcRequest) req, span); + }) + .serverResponseParser((res, context, span) -> { + RpcResponseParser.DEFAULT.parse(res, context, span); + if (res instanceof GrpcResponse) responseEncoding.tag((GrpcResponse) res, span); + }).build()); init(); GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST); assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()) - .containsEntry("grpc.method_type", "UNARY"); + .containsEntry("grpc.method_type", "UNARY") + .containsEntry("grpc.response_encoding", "identity"); } Channel clientWithB3SingleHeader(TraceContext parent) { return ClientInterceptors.intercept(client, new ClientInterceptor() { @Override public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { + MethodDescriptor method, CallOptions callOptions, Channel next) { return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { headers.put(Key.of("b3", ASCII_STRING_MARSHALLER), - B3SingleFormat.writeB3SingleFormat(parent)); + B3SingleFormat.writeB3SingleFormat(parent)); super.start(responseListener, headers); } }; diff --git a/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientResponseTest.java b/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientResponseTest.java index b9cd0daea4..f32e89c125 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientResponseTest.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientResponseTest.java @@ -30,17 +30,21 @@ public class GrpcClientResponseTest { MethodDescriptor methodDescriptor = TestObjects.METHOD_DESCRIPTOR; CallOptions callOptions = CallOptions.DEFAULT; ClientCall call = mock(ClientCall.class); - Metadata headers = new Metadata(); + Metadata headers = new Metadata(), trailers = new Metadata(); GrpcClientRequest request = - new GrpcClientRequest(singletonMap("b3", b3Key), methodDescriptor, callOptions, call, headers); + new GrpcClientRequest(singletonMap("b3", b3Key), methodDescriptor, callOptions, call, + headers); Status status = Status.CANCELLED; - Metadata trailers = new Metadata(); - GrpcClientResponse response = new GrpcClientResponse(request, status, trailers); + GrpcClientResponse response = new GrpcClientResponse(request, headers, status, trailers); @Test public void request() { assertThat(response.request()).isSameAs(request); } + @Test public void headers() { + assertThat(response.headers()).isSameAs(headers); + } + @Test public void status() { assertThat(response.status()).isSameAs(status); } @@ -60,7 +64,7 @@ public class GrpcClientResponseTest { @Test public void error_fromStatus() { RuntimeException error = new RuntimeException("noodles"); status = Status.fromThrowable(error); - GrpcClientResponse response = new GrpcClientResponse(request, status, trailers); + GrpcClientResponse response = new GrpcClientResponse(request, headers, status, trailers); assertThat(response.error()).isSameAs(error); assertThat(response.errorCode()).isEqualTo("UNKNOWN"); @@ -68,7 +72,7 @@ public class GrpcClientResponseTest { @Test public void errorCode_nullWhenOk() { status = Status.OK; - GrpcClientResponse response = new GrpcClientResponse(request, status, trailers); + GrpcClientResponse response = new GrpcClientResponse(request, headers, status, trailers); assertThat(response.errorCode()).isNull(); } diff --git a/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerResponseTest.java b/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerResponseTest.java index 6afcad261f..2631684ba5 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerResponseTest.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerResponseTest.java @@ -26,17 +26,20 @@ public class GrpcServerResponseTest { Key b3Key = Key.of("b3", Metadata.ASCII_STRING_MARSHALLER); ServerCall call = mock(ServerCall.class); - Metadata headers = new Metadata(); + Metadata headers = new Metadata(), trailers = new Metadata(); GrpcServerRequest request = - new GrpcServerRequest(singletonMap("b3", b3Key), call, headers); + new GrpcServerRequest(singletonMap("b3", b3Key), call, headers); Status status = Status.CANCELLED; - Metadata trailers = new Metadata(); - GrpcServerResponse response = new GrpcServerResponse(request, status, trailers); + GrpcServerResponse response = new GrpcServerResponse(request, headers, status, trailers); @Test public void request() { assertThat(response.request()).isSameAs(request); } + @Test public void headers() { + assertThat(response.headers()).isSameAs(headers); + } + @Test public void status() { assertThat(response.status()).isSameAs(status); } @@ -56,7 +59,7 @@ public class GrpcServerResponseTest { @Test public void error_fromStatus() { RuntimeException error = new RuntimeException("noodles"); status = Status.fromThrowable(error); - GrpcServerResponse response = new GrpcServerResponse(request, status, trailers); + GrpcServerResponse response = new GrpcServerResponse(request, headers, status, trailers); assertThat(response.error()).isSameAs(error); assertThat(response.errorCode()).isEqualTo("UNKNOWN"); @@ -64,7 +67,7 @@ public class GrpcServerResponseTest { @Test public void errorCode_nullWhenOk() { status = Status.OK; - GrpcServerResponse response = new GrpcServerResponse(request, status, trailers); + GrpcServerResponse response = new GrpcServerResponse(request, headers, status, trailers); assertThat(response.errorCode()).isNull(); } diff --git a/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java b/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java index 4751fe0a5b..664ddce827 100644 --- a/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java +++ b/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java @@ -16,11 +16,13 @@ import brave.propagation.Propagation; import brave.propagation.TraceContext.Extractor; import brave.propagation.TraceContextOrSamplingFlags; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; @@ -31,33 +33,40 @@ import java.util.concurrent.TimeUnit; class TestServer { + static final Key CUSTOM_KEY = Key.of("custom", Metadata.ASCII_STRING_MARSHALLER); final BlockingQueue delayQueue = new LinkedBlockingQueue<>(); - final BlockingQueue requestQueue = new LinkedBlockingQueue<>(); + final BlockingQueue requests = new LinkedBlockingQueue<>(); final Extractor extractor; final Server server; TestServer(Map> nameToKey, Propagation propagation) { extractor = propagation.extractor(GrpcServerRequest::propagationField); server = ServerBuilder.forPort(PickUnusedPort.get()) - .addService(ServerInterceptors.intercept(new GreeterImpl(null), new ServerInterceptor() { - - @Override - public ServerCall.Listener interceptCall(ServerCall call, - Metadata headers, ServerCallHandler next) { - Long delay = delayQueue.poll(); - if (delay != null) { - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new AssertionError("interrupted sleeping " + delay); - } - } - requestQueue.add(extractor.extract(new GrpcServerRequest(nameToKey, call, headers))); - return next.startCall(call, headers); - } - })) - .build(); + .addService(ServerInterceptors.intercept( + new GreeterImpl(null), + new ServerInterceptor() { + @Override + public Listener interceptCall(ServerCall call, + Metadata headers, ServerCallHandler next) { + Long delay = delayQueue.poll(); + if (delay != null) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("interrupted sleeping " + delay); + } + } + requests.add(extractor.extract(new GrpcServerRequest(nameToKey, call, headers))); + return next.startCall(new SimpleForwardingServerCall(call) { + @Override public void sendHeaders(Metadata headers) { + headers.put(CUSTOM_KEY, "brave"); + super.sendHeaders(headers); + } + }, headers); + } + })) + .build(); } void start() throws IOException { @@ -80,7 +89,7 @@ int port() { TraceContextOrSamplingFlags takeRequest() { try { - return requestQueue.poll(3, TimeUnit.SECONDS); + return requests.poll(3, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new AssertionError(e); diff --git a/instrumentation/rpc/src/main/java/brave/rpc/RpcResponseParser.java b/instrumentation/rpc/src/main/java/brave/rpc/RpcResponseParser.java index 95b996c1cf..e5a567f158 100644 --- a/instrumentation/rpc/src/main/java/brave/rpc/RpcResponseParser.java +++ b/instrumentation/rpc/src/main/java/brave/rpc/RpcResponseParser.java @@ -24,13 +24,22 @@ * Use this to control the response data recorded for an {@link TraceContext#sampledLocal() sampled * RPC client or server span}. * - *

Here's an example that adds the "rpc.error_code" even though "error" contains it. + *

Here's an example that adds default tags, and if gRPC, the response encoding: *

{@code
- * rpcTracing = rpcTracingBuilder
- *   .clientResponseParser((response, context, span) -> {
- *     RpcResponseParser.DEFAULT.parse(response, context, span);
- *     RpcTags.ERROR_CODE.tag(response, context, span);
- *   }).build();
+ * Tag responseEncoding = new Tag("grpc.response_encoding") {
+ *   @Override protected String parseValue(GrpcResponse input, TraceContext context) {
+ *     return input.headers().get(GrpcUtil.MESSAGE_ENCODING_KEY);
+ *   }
+ * };
+ *
+ * RpcResponseParser addResponseEncoding = (res, context, span) -> {
+ *   RpcResponseParser.DEFAULT.parse(res, context, span);
+ *   if (res instanceof GrpcResponse) responseEncoding.tag((GrpcResponse) res, span);
+ * };
+ *
+ * grpcTracing = GrpcTracing.create(RpcTracing.newBuilder(tracing)
+ *     .clientResponseParser(addResponseEncoding);
+ *     .serverResponseParser(addResponseEncoding).build());
  * }
* * @see RpcRequestParser