void onMessageReceived(M message, SpanCustomizer span) {
- }
-
/**
* Override to change what data from the status or trailers are parsed into the span modeling it.
- * By default, this tags "grpc.status_code" and "error" when it is not OK.
+ * By default, this tags "grpc.status_code" when it is not OK, and "error" if there was no {@link
+ * Status#getCause()}.
+ *
+ * Note: {@link Status#getCause()} will be set as {@link Span#error(Throwable)} by
+ * default. You don't need to parse it here.
*/
protected void onClose(Status status, Metadata trailers, SpanCustomizer span) {
- if (status != null && status.getCode() != Status.Code.OK) {
- String code = String.valueOf(status.getCode());
- span.tag("grpc.status_code", code);
- span.tag("error", code);
- }
+ if (status == null || status.isOk()) return;
+
+ String code = String.valueOf(status.getCode());
+ span.tag("grpc.status_code", code);
+ if (status.getCause() == null) span.tag("error", code);
}
static @Nullable String method(String fullMethodName) {
diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcPropagation.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcPropagation.java
index b7ecb41f54..4025fc987c 100644
--- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcPropagation.java
+++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcPropagation.java
@@ -102,7 +102,7 @@ static final class GrpcInjector implements Injector {
@Override public void inject(TraceContext context, R request) {
if (request instanceof GrpcClientRequest) {
byte[] serialized = TraceContextBinaryFormat.toBytes(context);
- Metadata metadata = ((GrpcClientRequest) request).metadata;
+ Metadata metadata = ((GrpcClientRequest) request).headers;
metadata.removeAll(GRPC_TRACE_BIN);
metadata.put(GRPC_TRACE_BIN, serialized);
TagsBin tags = context.findExtra(TagsBin.class);
@@ -127,7 +127,7 @@ static final class GrpcExtractor implements Extractor {
@Override public TraceContextOrSamplingFlags extract(R request) {
if (!(request instanceof GrpcServerRequest)) return delegate.extract(request);
- Metadata metadata = ((GrpcClientRequest) request).metadata;
+ Metadata metadata = ((GrpcClientRequest) request).headers;
// First, check if we are propagating gRPC tags.
TagsBin tagsBin = metadata.get(GRPC_TAGS_BIN);
diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java
index b73e93787f..42fc011ffa 100644
--- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java
+++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerRequest.java
@@ -13,63 +13,79 @@
*/
package brave.grpc;
-import brave.internal.Nullable;
import brave.propagation.Propagation.Getter;
import brave.rpc.RpcServerRequest;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
-import io.grpc.MethodDescriptor;
+import io.grpc.ServerCall;
+import io.grpc.ServerInterceptor;
import java.util.Map;
// intentionally not yet public until we add tag parsing functionality
final class GrpcServerRequest extends RpcServerRequest {
- static final Getter GETTER =
- new Getter() { // retrolambda no like
- @Override public String get(GrpcServerRequest request, String key) {
- return request.getMetadata(key);
- }
+ static final Getter GETTER = new Getter() {
+ @Override public String get(GrpcServerRequest request, String key) {
+ return request.propagationField(key);
+ }
- @Override public String toString() {
- return "GrpcServerRequest::getMetadata";
- }
- };
+ @Override public String toString() {
+ return "GrpcServerRequest::propagationField";
+ }
+ };
final Map> nameToKey;
- final String fullMethodName;
- final Metadata metadata;
+ final ServerCall, ?> call;
+ final Metadata headers;
- GrpcServerRequest(
- Map> nameToKey,
- MethodDescriptor, ?> methodDescriptor,
- Metadata metadata
- ) {
+ GrpcServerRequest(Map> nameToKey, ServerCall, ?> call, Metadata headers) {
if (nameToKey == null) throw new NullPointerException("nameToKey == null");
- if (methodDescriptor == null) throw new NullPointerException("methodDescriptor == null");
- if (metadata == null) throw new NullPointerException("metadata == null");
+ if (call == null) throw new NullPointerException("call == null");
+ if (headers == null) throw new NullPointerException("headers == null");
this.nameToKey = nameToKey;
- this.fullMethodName = methodDescriptor.getFullMethodName();
- this.metadata = metadata;
+ this.call = call;
+ this.headers = headers;
}
+ /** Returns the {@link #call()} */
@Override public Object unwrap() {
- return this;
+ return call;
}
@Override public String method() {
- return GrpcParser.method(fullMethodName);
+ return GrpcParser.method(call.getMethodDescriptor().getFullMethodName());
}
@Override public String service() {
- return GrpcParser.service(fullMethodName);
+ // MethodDescriptor.getServiceName() is not in our floor version: gRPC 1.2
+ return GrpcParser.service(call.getMethodDescriptor().getFullMethodName());
+ }
+
+ /**
+ * Returns the {@linkplain ServerCall server call} passed to {@link
+ * ServerInterceptor#interceptCall}.
+ *
+ * @since 5.12
+ */
+ public ServerCall, ?> call() {
+ return call;
+ }
+
+ /**
+ * Returns the {@linkplain Metadata headers} passed to {@link ServerInterceptor#interceptCall}.
+ *
+ * @since 5.12
+ */
+ public Metadata headers() {
+ return headers;
}
- @Nullable String getMetadata(String name) {
- if (name == null) throw new NullPointerException("name == null");
- Key key = nameToKey.get(name);
+ String propagationField(String keyName) {
+ if (keyName == null) throw new NullPointerException("keyName == null");
+ Key key = nameToKey.get(keyName);
if (key == null) {
- assert false : "We currently don't support getting metadata except propagation fields";
+ assert false : "We currently don't support getting headers except propagation fields";
return null;
}
- return metadata.get(key);
+ return headers.get(key);
}
}
diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerResponse.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerResponse.java
new file mode 100644
index 0000000000..7e8551909d
--- /dev/null
+++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcServerResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2013-2020 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package brave.grpc;
+
+import brave.Response;
+import brave.Span;
+import brave.internal.Nullable;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+
+// intentionally not yet public until we add tag parsing functionality
+final class GrpcServerResponse extends Response {
+ final GrpcServerRequest request;
+ @Nullable final Status status;
+ @Nullable final Metadata trailers;
+ @Nullable final Throwable error;
+
+ GrpcServerResponse(GrpcServerRequest request,
+ @Nullable Status status, @Nullable Metadata trailers, @Nullable Throwable error) {
+ if (request == null) throw new NullPointerException("request == null");
+ this.request = request;
+ this.status = status;
+ this.trailers = trailers;
+ this.error = error != null ? error : status != null ? status.getCause() : null;
+ }
+
+ /** Returns the {@link #status()} */
+ @Override @Nullable public Status unwrap() {
+ return status;
+ }
+
+ @Override public Span.Kind spanKind() {
+ return Span.Kind.SERVER;
+ }
+
+ @Override public GrpcServerRequest request() {
+ return request;
+ }
+
+ @Override @Nullable public Throwable error() {
+ return error;
+ }
+
+ /**
+ * Returns the string form of the {@link Status#getCode()} or {@code null} when not {@link
+ * Status#isOk()} or {@link #error()}.
+ */
+ @Nullable public String errorCode() {
+ if (status == null || status.isOk()) return null;
+ return status.getCode().name();
+ }
+
+ /**
+ * Returns the status passed to{@link ServerCall#close(Status, Metadata)} or {@code null} on
+ * {@link #error()}.
+ *
+ * @since 5.12
+ */
+ @Nullable public Status status() {
+ return status;
+ }
+
+ /**
+ * Returns the trailers passed to {@link ServerCall#close(Status, Metadata)} or {@code null} on
+ * {@link #error()}.
+ *
+ * @since 5.12
+ */
+ @Nullable public Metadata trailers() {
+ return trailers;
+ }
+}
diff --git a/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java b/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java
index 829c8df664..aecee09a72 100644
--- a/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java
+++ b/instrumentation/grpc/src/main/java/brave/grpc/GrpcTracing.java
@@ -13,7 +13,6 @@
*/
package brave.grpc;
-import brave.ErrorParser;
import brave.Tracing;
import brave.propagation.Propagation;
import brave.rpc.RpcTracing;
@@ -41,36 +40,38 @@ public static Builder newBuilder(RpcTracing rpcTracing) {
public static final class Builder {
final RpcTracing rpcTracing;
- GrpcClientParser clientParser;
- GrpcServerParser serverParser;
+ GrpcClientParser clientParser = new GrpcClientParser();
+ GrpcServerParser serverParser = new GrpcServerParser();
boolean grpcPropagationFormatEnabled = false;
+ // for interop with old parsers
+ MessageProcessor clientMessageProcessor = MessageProcessor.NOOP;
+ MessageProcessor serverMessageProcessor = MessageProcessor.NOOP;
+
Builder(RpcTracing rpcTracing) {
if (rpcTracing == null) throw new NullPointerException("rpcTracing == null");
this.rpcTracing = rpcTracing;
- // override to re-use any custom error parser from the tracing component
- ErrorParser errorParser = rpcTracing.tracing().errorParser();
- clientParser = new GrpcClientParser() {
- @Override protected ErrorParser errorParser() {
- return errorParser;
- }
- };
- serverParser = new GrpcServerParser() {
- @Override protected ErrorParser errorParser() {
- return errorParser;
- }
- };
+ }
+
+ Builder(GrpcTracing grpcTracing) {
+ rpcTracing = grpcTracing.rpcTracing;
+ clientParser = grpcTracing.clientParser;
+ serverParser = grpcTracing.serverParser;
+ clientMessageProcessor = grpcTracing.clientMessageProcessor;
+ serverMessageProcessor = grpcTracing.serverMessageProcessor;
}
public Builder clientParser(GrpcClientParser clientParser) {
if (clientParser == null) throw new NullPointerException("clientParser == null");
this.clientParser = clientParser;
+ this.clientMessageProcessor = clientParser;
return this;
}
public Builder serverParser(GrpcServerParser serverParser) {
if (serverParser == null) throw new NullPointerException("serverParser == null");
this.serverParser = serverParser;
+ this.serverMessageProcessor = serverParser;
return this;
}
@@ -105,6 +106,9 @@ public GrpcTracing build() {
final Map> nameToKey;
final boolean grpcPropagationFormatEnabled;
+ // for toBuilder()
+ final MessageProcessor clientMessageProcessor, serverMessageProcessor;
+
GrpcTracing(Builder builder) { // intentionally hidden constructor
rpcTracing = builder.rpcTracing;
grpcPropagationFormatEnabled = builder.grpcPropagationFormatEnabled;
@@ -116,12 +120,12 @@ public GrpcTracing build() {
nameToKey = GrpcPropagation.nameToKey(propagation);
clientParser = builder.clientParser;
serverParser = builder.serverParser;
+ clientMessageProcessor = builder.clientMessageProcessor;
+ serverMessageProcessor = builder.serverMessageProcessor;
}
public Builder toBuilder() {
- return new Builder(rpcTracing)
- .clientParser(clientParser)
- .serverParser(serverParser);
+ return new Builder(this);
}
/** This interceptor traces outbound calls */
diff --git a/instrumentation/grpc/src/main/java/brave/grpc/MessageProcessor.java b/instrumentation/grpc/src/main/java/brave/grpc/MessageProcessor.java
new file mode 100644
index 0000000000..f1bdb0dd0c
--- /dev/null
+++ b/instrumentation/grpc/src/main/java/brave/grpc/MessageProcessor.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013-2020 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package brave.grpc;
+
+import brave.SpanCustomizer;
+
+// only here to deal with deprecated methods.
+abstract class MessageProcessor {
+ static final MessageProcessor NOOP = new MessageProcessor() {
+ @Override public void onMessageSent(Object message, SpanCustomizer span) {
+ }
+
+ @Override public void onMessageReceived(Object message, SpanCustomizer span) {
+ }
+ };
+
+ void onMessageSent(M message, SpanCustomizer span) {
+ }
+
+ void onMessageReceived(M message, SpanCustomizer span) {
+ }
+}
diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java b/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java
index 3c74f132d7..db4eb92eff 100644
--- a/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java
+++ b/instrumentation/grpc/src/main/java/brave/grpc/TracingClientInterceptor.java
@@ -13,7 +13,9 @@
*/
package brave.grpc;
+import brave.NoopSpanCustomizer;
import brave.Span;
+import brave.SpanCustomizer;
import brave.Tracer;
import brave.internal.Nullable;
import brave.propagation.CurrentTraceContext;
@@ -30,95 +32,165 @@
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import static brave.grpc.GrpcClientRequest.SETTER;
// not exposed directly as implementation notably changes between versions 1.2 and 1.3
final class TracingClientInterceptor implements ClientInterceptor {
- final Tracer tracer;
+ final Map> nameToKey;
final CurrentTraceContext currentTraceContext;
+ final Tracer tracer;
final SamplerFunction sampler;
final Injector injector;
final GrpcClientParser parser;
- final Map> nameToKey;
+ final MessageProcessor messageProcessor;
TracingClientInterceptor(GrpcTracing grpcTracing) {
- tracer = grpcTracing.rpcTracing.tracing().tracer();
+ nameToKey = grpcTracing.nameToKey;
currentTraceContext = grpcTracing.rpcTracing.tracing().currentTraceContext();
+ tracer = grpcTracing.rpcTracing.tracing().tracer();
sampler = grpcTracing.rpcTracing.clientSampler();
injector = grpcTracing.propagation.injector(SETTER);
parser = grpcTracing.clientParser;
- nameToKey = grpcTracing.nameToKey;
+ messageProcessor = grpcTracing.clientMessageProcessor;
}
@Override
public ClientCall interceptCall(MethodDescriptor method,
CallOptions callOptions, Channel next) {
- TraceContext invocationContext = currentTraceContext.get();
-
- GrpcClientRequest request = new GrpcClientRequest(nameToKey, method);
- Span span = tracer.nextSpanWithParent(sampler, request, invocationContext);
-
- Throwable error = null;
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- return new SimpleForwardingClientCall(next.newCall(method, callOptions)) {
- @Override public void start(Listener responseListener, Metadata headers) {
- request.metadata = headers;
- injector.inject(span.context(), request);
- span.kind(Span.Kind.CLIENT).start();
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- parser.onStart(method, callOptions, headers, span.customizer());
-
- // See RATIONALE.md which notes response callbacks are in the invocation context
- responseListener = new TraceContextCallListener<>(
- responseListener,
- currentTraceContext,
- invocationContext
- );
-
- super.start(new TracingClientCallListener<>(responseListener, span), headers);
- }
- }
-
- @Override public void sendMessage(ReqT message) {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- super.sendMessage(message);
- parser.onMessageSent(message, span.customizer());
- }
- }
- };
- } catch (Throwable e) {
- error = e;
- throw e;
- } finally {
- if (error != null) span.error(error).finish();
+ return new TracingClientCall<>(
+ method, callOptions, currentTraceContext.get(), next.newCall(method, callOptions));
+ }
+
+ void finish(GrpcClientResponse response, @Nullable Span span) {
+ if (span == null || span.isNoop()) return;
+ Throwable error = response.error();
+ if (error != null) span.error(error);
+ parser.onClose(response.status, response.trailers, span.customizer());
+ span.finish();
+ }
+
+ void finishWithError(@Nullable Span span, Throwable error) {
+ if (span == null || span.isNoop()) return;
+ if (error != null) span.error(error);
+ span.finish();
+ }
+
+ final class TracingClientCall extends SimpleForwardingClientCall {
+ final MethodDescriptor method;
+ final CallOptions callOptions;
+ final TraceContext invocationContext;
+ final AtomicReference spanRef = new AtomicReference<>();
+
+ TracingClientCall(MethodDescriptor method, CallOptions callOptions,
+ TraceContext invocationContext, ClientCall call) {
+ super(call);
+ this.method = method;
+ this.callOptions = callOptions;
+ this.invocationContext = invocationContext;
+ }
+
+ @Override public void start(Listener responseListener, Metadata headers) {
+ GrpcClientRequest request =
+ new GrpcClientRequest(nameToKey, method, callOptions, delegate(), headers);
+
+ Span span = tracer.nextSpanWithParent(sampler, request, invocationContext);
+ injector.inject(span.context(), request);
+ if (!span.isNoop()) {
+ span.kind(Span.Kind.CLIENT).start();
+ parser.onStart(method, callOptions, headers, span.customizer());
+ }
+ spanRef.set(span);
+
+ responseListener = new TracingClientCallListener<>(
+ responseListener,
+ invocationContext,
+ spanRef,
+ request
+ );
+
+ try (Scope scope = currentTraceContext.maybeScope(span.context())) {
+ super.start(responseListener, headers);
+ } catch (Throwable e) {
+ // Another interceptor may throw an exception during start, in which case no other
+ // callbacks are called, so go ahead and close the span here.
+ finishWithError(spanRef.getAndSet(null), e);
+ throw e;
+ }
+ }
+
+ @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {
+ try (Scope scope = maybeScopeClientOrInvocationContext(spanRef, invocationContext)) {
+ delegate().cancel(message, cause);
+ }
+ }
+
+ @Override public void halfClose() {
+ try (Scope scope = maybeScopeClientOrInvocationContext(spanRef, invocationContext)) {
+ delegate().halfClose();
+ } catch (Throwable e) {
+ // If there was an exception executing onHalfClose, we don't expect other lifecycle
+ // commands to succeed. Accordingly, we close the span
+ finishWithError(spanRef.getAndSet(null), e);
+ throw e;
+ }
}
+
+ @Override public void request(int numMessages) {
+ try (Scope scope = maybeScopeClientOrInvocationContext(spanRef, invocationContext)) {
+ delegate().request(numMessages);
+ }
+ }
+
+ @Override public void sendMessage(ReqT message) {
+ try (Scope scope = maybeScopeClientOrInvocationContext(spanRef, invocationContext)) {
+ delegate().sendMessage(message);
+ Span span = spanRef.get(); // could be an error
+ SpanCustomizer customizer = span != null ? span.customizer() : NoopSpanCustomizer.INSTANCE;
+ messageProcessor.onMessageSent(message, customizer);
+ }
+ }
+ }
+
+ /** Scopes the client context or the invocation if the client span finished */
+ Scope maybeScopeClientOrInvocationContext(
+ AtomicReference spanRef,
+ @Nullable TraceContext invocationContext
+ ) {
+ Span span = spanRef.get();
+ TraceContext context = span != null ? span.context() : invocationContext;
+ return currentTraceContext.maybeScope(context);
}
- static final class TraceContextCallListener
- extends SimpleForwardingClientCallListener {
- final CurrentTraceContext currentTraceContext;
+ final class TracingClientCallListener extends SimpleForwardingClientCallListener {
@Nullable final TraceContext invocationContext;
+ final AtomicReference spanRef;
+ final GrpcClientRequest request;
- TraceContextCallListener(
+ TracingClientCallListener(
Listener delegate,
- CurrentTraceContext currentTraceContext,
- @Nullable TraceContext invocationContext
+ @Nullable TraceContext invocationContext,
+ AtomicReference spanRef,
+ GrpcClientRequest request
) {
super(delegate);
- this.currentTraceContext = currentTraceContext;
this.invocationContext = invocationContext;
+ this.spanRef = spanRef;
+ this.request = request;
}
@Override public void onReady() {
- try (Scope scope = currentTraceContext.maybeScope(invocationContext)) {
+ try (Scope scope = maybeScopeClientOrInvocationContext(spanRef, invocationContext)) {
delegate().onReady();
}
}
+ // See instrumentation/RATIONALE.md for why the below response callbacks are invocation context
@Override public void onHeaders(Metadata headers) {
try (Scope scope = currentTraceContext.maybeScope(invocationContext)) {
delegate().onHeaders(headers);
@@ -127,39 +199,21 @@ static final class TraceContextCallListener
@Override public void onMessage(RespT message) {
try (Scope scope = currentTraceContext.maybeScope(invocationContext)) {
+ Span span = spanRef.get(); // could be an error
+ SpanCustomizer customizer = span != null ? span.customizer() : NoopSpanCustomizer.INSTANCE;
+ messageProcessor.onMessageReceived(message, customizer);
delegate().onMessage(message);
}
}
@Override public void onClose(Status status, Metadata trailers) {
+ // See /instrumentation/grpc/RATIONALE.md for why we don't catch exceptions from the delegate
+ GrpcClientResponse response = new GrpcClientResponse(request, status, trailers, null);
+ finish(response, spanRef.getAndSet(null));
+
try (Scope scope = currentTraceContext.maybeScope(invocationContext)) {
delegate().onClose(status, trailers);
}
}
}
-
- final class TracingClientCallListener extends SimpleForwardingClientCallListener {
- final Span span;
-
- TracingClientCallListener(Listener responseListener, Span span) {
- super(responseListener);
- this.span = span;
- }
-
- @Override public void onMessage(RespT message) {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- parser.onMessageReceived(message, span.customizer());
- delegate().onMessage(message);
- }
- }
-
- @Override public void onClose(Status status, Metadata trailers) {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- super.onClose(status, trailers);
- parser.onClose(status, trailers, span.customizer());
- } finally {
- span.finish();
- }
- }
- }
}
diff --git a/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java b/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java
index f3d15d5b5f..c709958684 100644
--- a/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java
+++ b/instrumentation/grpc/src/main/java/brave/grpc/TracingServerInterceptor.java
@@ -13,10 +13,14 @@
*/
package brave.grpc;
+import brave.NoopSpanCustomizer;
import brave.Span;
+import brave.SpanCustomizer;
import brave.Tracer;
+import brave.internal.Nullable;
import brave.propagation.CurrentTraceContext;
import brave.propagation.CurrentTraceContext.Scope;
+import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Extractor;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.rpc.RpcRequest;
@@ -24,57 +28,63 @@
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import static brave.grpc.GrpcServerRequest.GETTER;
// not exposed directly as implementation notably changes between versions 1.2 and 1.3
final class TracingServerInterceptor implements ServerInterceptor {
- final Tracer tracer;
+ final Map> nameToKey;
final CurrentTraceContext currentTraceContext;
+ final Tracer tracer;
final Extractor extractor;
final SamplerFunction sampler;
final GrpcServerParser parser;
- final Map> nameToKey;
final boolean grpcPropagationFormatEnabled;
+ final MessageProcessor messageProcessor;
TracingServerInterceptor(GrpcTracing grpcTracing) {
- tracer = grpcTracing.rpcTracing.tracing().tracer();
+ nameToKey = grpcTracing.nameToKey;
currentTraceContext = grpcTracing.rpcTracing.tracing().currentTraceContext();
+ tracer = grpcTracing.rpcTracing.tracing().tracer();
extractor = grpcTracing.propagation.extractor(GETTER);
sampler = grpcTracing.rpcTracing.serverSampler();
parser = grpcTracing.serverParser;
- nameToKey = grpcTracing.nameToKey;
grpcPropagationFormatEnabled = grpcTracing.grpcPropagationFormatEnabled;
+ messageProcessor = grpcTracing.serverMessageProcessor;
}
@Override
public Listener interceptCall(ServerCall call,
Metadata headers, ServerCallHandler next) {
- GrpcServerRequest request =
- new GrpcServerRequest(nameToKey, call.getMethodDescriptor(), headers);
- TraceContextOrSamplingFlags extracted = extractor.extract(request);
- Span span = nextSpan(extracted, request).kind(Span.Kind.SERVER);
- parser.onStart(call, headers, span.customizer());
+ GrpcServerRequest request = new GrpcServerRequest(nameToKey, call, headers);
+
+ Span span = nextSpan(extractor.extract(request), request);
+ if (!span.isNoop()) {
+ span.kind(Span.Kind.SERVER).start();
+ parser.onStart(call, headers, span.customizer());
+ }
+ AtomicReference spanRef = new AtomicReference<>(span);
+
// startCall invokes user interceptors, so we place the span in scope here
Listener result;
- Throwable error = null;
try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- result = next.startCall(new TracingServerCall<>(span, call), headers);
+ result = next.startCall(new TracingServerCall<>(call, span, spanRef, request), headers);
} catch (Throwable e) {
- error = e;
+ // Another interceptor may throw an exception during startCall, in which case no other
+ // callbacks are called, so go ahead and close the span here.
+ finishWithError(spanRef.getAndSet(null), e);
throw e;
- } finally {
- if (error != null) span.error(error).finish();
}
- // This ensures the server implementation can see the span in scope
- return new TracingServerCallListener<>(result, currentTraceContext, parser, span);
+ return new TracingServerCallListener<>(result, span, spanRef, request);
}
/** Creates a potentially noop span representing this request */
@@ -91,96 +101,116 @@ Span nextSpan(TraceContextOrSamplingFlags extracted, GrpcServerRequest request)
: tracer.nextSpan(extracted);
}
+ void finish(GrpcServerResponse response, @Nullable Span span) {
+ if (span == null || span.isNoop()) return;
+ Throwable error = response.error();
+ if (error != null) span.error(error);
+ parser.onClose(response.status, response.trailers, span.customizer());
+ span.finish();
+ }
+
+ void finishWithError(@Nullable Span span, Throwable error) {
+ if (span == null || span.isNoop()) return;
+ if (error != null) span.error(error);
+ span.finish();
+ }
+
final class TracingServerCall extends SimpleForwardingServerCall {
- final Span span;
+ final TraceContext context;
+ final AtomicReference spanRef;
+ final GrpcServerRequest request;
- TracingServerCall(Span span, ServerCall call) {
- super(call);
- this.span = span;
+ TracingServerCall(ServerCall delegate, Span span, AtomicReference spanRef,
+ GrpcServerRequest request) {
+ super(delegate);
+ this.context = span.context();
+ this.spanRef = spanRef;
+ this.request = request;
}
@Override public void request(int numMessages) {
- span.start();
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- super.request(numMessages);
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
+ delegate().request(numMessages);
}
}
@Override public void sendHeaders(Metadata headers) {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- super.sendHeaders(headers);
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
+ delegate().sendHeaders(headers);
}
}
@Override public void sendMessage(RespT message) {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- super.sendMessage(message);
- parser.onMessageSent(message, span.customizer());
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
+ delegate().sendMessage(message);
+ Span span = spanRef.get(); // could be an error
+ SpanCustomizer customizer = span != null ? span.customizer() : NoopSpanCustomizer.INSTANCE;
+ messageProcessor.onMessageSent(message, customizer);
}
}
@Override public void close(Status status, Metadata trailers) {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- super.close(status, trailers);
- parser.onClose(status, trailers, span.customizer());
- } catch (Throwable e) {
- span.error(e);
- throw e;
- } finally {
- span.finish();
+ // See /instrumentation/grpc/RATIONALE.md for why we don't catch exceptions from the delegate
+ GrpcServerResponse response = new GrpcServerResponse(request, status, trailers, null);
+ finish(response, spanRef.getAndSet(null));
+
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
+ delegate().close(status, trailers);
}
}
}
- static final class TracingServerCallListener
- extends SimpleForwardingServerCallListener {
- final CurrentTraceContext currentTraceContext;
- final Span span;
- final GrpcServerParser parser;
-
- TracingServerCallListener(Listener delegate, CurrentTraceContext currentTraceContext,
- GrpcServerParser parser, Span span) {
+ final class TracingServerCallListener extends SimpleForwardingServerCallListener {
+ final TraceContext context;
+ final AtomicReference spanRef;
+ final GrpcServerRequest request;
+
+ TracingServerCallListener(
+ Listener delegate,
+ Span span,
+ AtomicReference spanRef,
+ GrpcServerRequest request
+ ) {
super(delegate);
- this.currentTraceContext = currentTraceContext;
- this.span = span;
- this.parser = parser;
+ this.context = span.context();
+ this.spanRef = spanRef;
+ this.request = request;
}
- @Override public void onMessage(ReqT message) {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
- parser.onMessageReceived(message, span.customizer());
+ @Override public void onMessage(RespT message) {
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
delegate().onMessage(message);
+ Span span = spanRef.get(); // could be an error
+ SpanCustomizer customizer = span != null ? span.customizer() : NoopSpanCustomizer.INSTANCE;
+ messageProcessor.onMessageReceived(message, customizer);
}
}
@Override public void onHalfClose() {
- Throwable error = null;
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
delegate().onHalfClose();
} catch (Throwable e) {
- error = e;
- throw e;
- } finally {
// If there was an exception executing onHalfClose, we don't expect other lifecycle
// commands to succeed. Accordingly, we close the span
- if (error != null) span.error(error).finish();
+ finishWithError(spanRef.getAndSet(null), e);
+ throw e;
}
}
@Override public void onCancel() {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
delegate().onCancel();
}
}
@Override public void onComplete() {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
delegate().onComplete();
}
}
@Override public void onReady() {
- try (Scope scope = currentTraceContext.maybeScope(span.context())) {
+ try (Scope scope = currentTraceContext.maybeScope(context)) {
delegate().onReady();
}
}
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingClientInterceptor.java b/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingClientInterceptor.java
index 39611de289..cf5e173d56 100644
--- a/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingClientInterceptor.java
+++ b/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingClientInterceptor.java
@@ -58,8 +58,7 @@
import static brave.sampler.Sampler.ALWAYS_SAMPLE;
import static brave.sampler.Sampler.NEVER_SAMPLE;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.catchThrowableOfType;
-import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assume.assumeTrue;
public abstract class BaseITTracingClientInterceptor extends ITRemote {
@@ -228,26 +227,20 @@ ManagedChannel newClient(ClientInterceptor... clientInterceptors) {
@Test public void onTransportException_addsErrorTag() {
server.stop();
- StatusRuntimeException thrown = catchThrowableOfType(
- () -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST),
- StatusRuntimeException.class);
+ assertThatThrownBy(() -> GraterGrpc.newBlockingStub(client).seyHallo(HELLO_REQUEST))
+ .isInstanceOf(StatusRuntimeException.class);
- Span span =
- reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, thrown.getStatus().getCode().toString());
- assertThat(span.tags().get("grpc.status_code"))
- .isEqualTo(span.tags().get("error"));
+ // The error format of the exception message can differ from the span's "error" tag in CI
+ Span span = reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, ".*Connection refused.*");
+ assertThat(span.tags()).containsEntry("grpc.status_code", "UNAVAILABLE");
}
@Test public void addsErrorTag_onUnimplemented() {
- try {
- GraterGrpc.newBlockingStub(client).seyHallo(HELLO_REQUEST);
- failBecauseExceptionWasNotThrown(StatusRuntimeException.class);
- } catch (StatusRuntimeException e) {
- }
+ assertThatThrownBy(() -> GraterGrpc.newBlockingStub(client).seyHallo(HELLO_REQUEST))
+ .isInstanceOf(StatusRuntimeException.class);
Span span = reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, "UNIMPLEMENTED");
- assertThat(span.tags().get("grpc.status_code"))
- .isEqualTo(span.tags().get("error"));
+ assertThat(span.tags().get("grpc.status_code")).isEqualTo("UNIMPLEMENTED");
}
@Test public void addsErrorTag_onCanceledFuture() {
@@ -257,8 +250,7 @@ ManagedChannel newClient(ClientInterceptor... clientInterceptors) {
assumeTrue("lost race on cancel", resp.cancel(true));
Span span = reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, "CANCELLED");
- assertThat(span.tags().get("grpc.status_code"))
- .isEqualTo(span.tags().get("error"));
+ assertThat(span.tags().get("grpc.status_code")).isEqualTo("CANCELLED");
}
/**
@@ -273,14 +265,17 @@ ManagedChannel newClient(ClientInterceptor... clientInterceptors) {
new ClientInterceptor() {
@Override public ClientCall interceptCall(
MethodDescriptor method, CallOptions callOptions, Channel next) {
- tracing.tracer().currentSpanCustomizer().annotate("before");
- return new SimpleForwardingClientCall(
- next.newCall(method, callOptions)) {
+ return new SimpleForwardingClientCall(next.newCall(method, callOptions)) {
@Override
public void start(Listener responseListener, Metadata headers) {
tracing.tracer().currentSpanCustomizer().annotate("start");
super.start(responseListener, headers);
}
+
+ @Override public void sendMessage(ReqT message) {
+ tracing.tracer().currentSpanCustomizer().annotate("sendMessage");
+ super.sendMessage(message);
+ }
};
}
},
@@ -291,7 +286,7 @@ public void start(Listener responseListener, Metadata headers) {
assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).annotations())
.extracting(Annotation::value)
- .containsOnly("before", "start");
+ .containsOnly("start", "sendMessage");
}
@Test public void clientParserTest() {
@@ -318,7 +313,12 @@ protected String spanName(MethodDescriptor methodDesc
}).build();
client = newClient();
- GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);
+ ScopedSpan parent = tracing.tracer().startScopedSpan("parent");
+ try {
+ GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);
+ } finally {
+ parent.finish();
+ }
Span span = reporter.takeRemoteSpan(Span.Kind.CLIENT);
assertThat(span.name()).isEqualTo("unary");
@@ -326,9 +326,10 @@ protected String spanName(MethodDescriptor methodDesc
"grpc.message_received", "grpc.message_sent",
"grpc.message_received.visible", "grpc.message_sent.visible"
);
+ reporter.takeLocalSpan();
}
- @Test public void clientParserTestStreamingResponse() {
+ @Test public void deprecated_clientParserTestStreamingResponse() {
closeClient(client);
grpcTracing = grpcTracing.toBuilder().clientParser(new GrpcClientParser() {
int receiveCount = 0;
@@ -347,6 +348,46 @@ protected String spanName(MethodDescriptor methodDesc
assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).tags()).hasSize(10);
}
+ // Make sure we work well with bad user interceptors.
+
+ @Test public void userInterceptor_throwsOnStart() {
+ closeClient(client);
+ client = newClient(new ClientInterceptor() {
+ @Override public ClientCall interceptCall(
+ MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) {
+ ClientCall call = channel.newCall(methodDescriptor, callOptions);
+ return new SimpleForwardingClientCall(call) {
+ @Override public void start(Listener responseListener, Metadata headers) {
+ throw new IllegalStateException("I'm a bad interceptor.");
+ }
+ };
+ }
+ }, grpcTracing.newClientInterceptor());
+
+ assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
+ .isInstanceOf(IllegalStateException.class);
+ reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, "I'm a bad interceptor.");
+ }
+
+ @Test public void userInterceptor_throwsOnHalfClose() {
+ closeClient(client);
+ client = newClient(new ClientInterceptor() {
+ @Override public ClientCall interceptCall(
+ MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) {
+ ClientCall call = channel.newCall(methodDescriptor, callOptions);
+ return new SimpleForwardingClientCall(call) {
+ @Override public void halfClose() {
+ throw new IllegalStateException("I'm a bad interceptor.");
+ }
+ };
+ }
+ }, grpcTracing.newClientInterceptor());
+
+ assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
+ .isInstanceOf(IllegalStateException.class);
+ reporter.takeRemoteSpanWithError(Span.Kind.CLIENT, "I'm a bad interceptor.");
+ }
+
/**
* This shows that a {@link ClientInterceptor} can see the server server span when processing the
* request and response.
@@ -362,11 +403,11 @@ protected String spanName(MethodDescriptor methodDesc
}
assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).tags())
- .containsOnlyKeys("grpc.message_send.1");
+ .containsKey("grpc.message_send.1");
// Response processing happens on the invocation (parent) trace context
assertThat(reporter.takeLocalSpan().tags())
- .containsOnlyKeys("grpc.message_recv.1");
+ .containsKey("grpc.message_recv.1");
}
@Test public void messageTagging_streaming() {
@@ -382,11 +423,11 @@ protected String spanName(MethodDescriptor methodDesc
}
assertThat(reporter.takeRemoteSpan(Span.Kind.CLIENT).tags())
- .containsOnlyKeys("grpc.message_send.1");
+ .containsKey("grpc.message_send.1");
// Response processing happens on the invocation (parent) trace context
// Intentionally verbose here to show 10 replies
- assertThat(reporter.takeLocalSpan().tags()).containsOnlyKeys(
+ assertThat(reporter.takeLocalSpan().tags()).containsKeys(
"grpc.message_recv.1",
"grpc.message_recv.2",
"grpc.message_recv.3",
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java b/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java
index 64aa2281db..b9255d26f3 100644
--- a/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java
+++ b/instrumentation/grpc/src/test/java/brave/grpc/BaseITTracingServerInterceptor.java
@@ -28,8 +28,8 @@
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
-import io.grpc.ForwardingServerCall;
-import io.grpc.ForwardingServerCallListener;
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
@@ -41,6 +41,7 @@
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
+import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
@@ -62,11 +63,10 @@
import static brave.sampler.Sampler.NEVER_SAMPLE;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
public abstract class BaseITTracingServerInterceptor extends ITRemote {
- RpcTracing rpcTracing = RpcTracing.create(tracing);
- GrpcTracing grpcTracing = GrpcTracing.create(rpcTracing);
+ GrpcTracing grpcTracing = GrpcTracing.create(tracing);
Server server;
ManagedChannel client;
@@ -124,8 +124,7 @@ void init(@Nullable ServerInterceptor userInterceptor) throws IOException {
@Test public void createsChildWhenJoinDisabled() throws IOException {
tracing = tracingBuilder(NEVER_SAMPLE).supportsJoin(false).build();
- rpcTracing = RpcTracing.create(tracing);
- grpcTracing = GrpcTracing.create(rpcTracing);
+ grpcTracing = GrpcTracing.create(tracing);
init();
TraceContext parent = newTraceContext(SamplingFlags.SAMPLED);
@@ -137,8 +136,7 @@ void init(@Nullable ServerInterceptor userInterceptor) throws IOException {
@Test public void samplingDisabled() throws IOException {
tracing = tracingBuilder(NEVER_SAMPLE).build();
- rpcTracing = RpcTracing.create(tracing);
- grpcTracing = GrpcTracing.create(rpcTracing);
+ grpcTracing = GrpcTracing.create(tracing);
init();
GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);
@@ -183,25 +181,22 @@ public ServerCall.Listener interceptCall(ServerCall GreeterGrpc.newBlockingStub(client)
+ .sayHello(HelloRequest.newBuilder().setName("bad").build()));
+
+ Span span = reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "IllegalArgumentException");
+ assertThat(span.tags()).containsEntry("grpc.status_code", "UNKNOWN");
}
@Test public void addsErrorTagOnRuntimeException() {
- try {
- GreeterGrpc.newBlockingStub(client)
- .sayHello(HelloRequest.newBuilder().setName("testerror").build());
- failBecauseExceptionWasNotThrown(StatusRuntimeException.class);
- } catch (StatusRuntimeException e) {
- reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "testerror");
- }
+ assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client)
+ .sayHello(HelloRequest.newBuilder().setName("testerror").build()))
+ .isInstanceOf(StatusRuntimeException.class);
+
+ Span span = reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "testerror");
+ assertThat(span.tags().get("grpc.status_code")).isNull();
}
@Test
@@ -276,6 +271,72 @@ protected String spanName(MethodDescriptor methodDesc
// @After will also check that sayHelloWithManyReplies was not sampled
}
+ // Make sure we work well with bad user interceptors.
+
+ @Test public void userInterceptor_ThrowsOnStartCall() throws IOException {
+ init(new ServerInterceptor() {
+ @Override public ServerCall.Listener interceptCall(
+ ServerCall call, Metadata metadata, ServerCallHandler next) {
+ throw new IllegalStateException("I'm a bad interceptor.");
+ }
+ });
+
+ assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
+ .isInstanceOf(StatusRuntimeException.class);
+ reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor.");
+ }
+
+ @Test public void userInterceptor_throwsOnSendMessage() throws IOException {
+ init(new ServerInterceptor() {
+ @Override public ServerCall.Listener interceptCall(
+ ServerCall call, Metadata metadata, ServerCallHandler next) {
+ return next.startCall(new SimpleForwardingServerCall(call) {
+ @Override public void sendMessage(RespT message) {
+ throw new IllegalStateException("I'm a bad interceptor.");
+ }
+ }, metadata);
+ }
+ });
+
+ assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
+ .isInstanceOf(StatusRuntimeException.class);
+ reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor.");
+ }
+
+ @Test public void userInterceptor_throwsOnClose() throws IOException {
+ init(new ServerInterceptor() {
+ @Override public ServerCall.Listener interceptCall(
+ ServerCall call, Metadata metadata, ServerCallHandler next) {
+ return next.startCall(new SimpleForwardingServerCall(call) {
+ @Override public void close(Status status, Metadata trailers) {
+ throw new IllegalStateException("I'm a bad interceptor.");
+ }
+ }, metadata);
+ }
+ });
+
+ assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
+ .isInstanceOf(StatusRuntimeException.class);
+ reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor.");
+ }
+
+ @Test public void userInterceptor_throwsOnOnHalfClose() throws IOException {
+ init(new ServerInterceptor() {
+ @Override public ServerCall.Listener interceptCall(
+ ServerCall call, Metadata metadata, ServerCallHandler next) {
+ return new SimpleForwardingServerCallListener(next.startCall(call, metadata)) {
+ @Override public void onHalfClose() {
+ throw new IllegalStateException("I'm a bad interceptor.");
+ }
+ };
+ }
+ });
+
+ assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
+ .isInstanceOf(StatusRuntimeException.class);
+ reporter.takeRemoteSpanWithError(Span.Kind.SERVER, "I'm a bad interceptor.");
+ }
+
/**
* This shows that a {@link ServerInterceptor} can see the server server span when processing the
* request and response.
@@ -286,15 +347,16 @@ protected String spanName(MethodDescriptor methodDesc
AtomicInteger recvs = new AtomicInteger();
init(new ServerInterceptor() {
- @Override public ServerCall.Listener interceptCall(ServerCall call,
- Metadata headers, ServerCallHandler next) {
- call = new ForwardingServerCall.SimpleForwardingServerCall(call) {
+ @Override
+ public ServerCall.Listener interceptCall(
+ ServerCall call, Metadata headers, ServerCallHandler next) {
+ call = new SimpleForwardingServerCall(call) {
@Override public void sendMessage(RespT message) {
delegate().sendMessage(message);
customizer.tag("grpc.message_send." + sends.getAndIncrement(), message.toString());
}
};
- return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) {
+ return new SimpleForwardingServerCallListener(next.startCall(call, headers)) {
@Override public void onMessage(ReqT message) {
customizer.tag("grpc.message_recv." + recvs.getAndIncrement(), message.toString());
delegate().onMessage(message);
@@ -305,7 +367,7 @@ protected String spanName(MethodDescriptor methodDesc
GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);
- assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()).containsOnlyKeys(
+ assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()).containsKeys(
"grpc.message_recv.0", "grpc.message_send.0"
);
@@ -314,7 +376,7 @@ protected String spanName(MethodDescriptor methodDesc
assertThat(replies).toIterable().hasSize(10);
// Intentionally verbose here to show that only one recv and 10 replies
- assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()).containsOnlyKeys(
+ assertThat(reporter.takeRemoteSpan(Span.Kind.SERVER).tags()).containsKeys(
"grpc.message_recv.1",
"grpc.message_send.1",
"grpc.message_send.2",
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientRequestSetterTest.java b/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientRequestSetterTest.java
deleted file mode 100644
index 5df365af11..0000000000
--- a/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientRequestSetterTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2013-2020 The OpenZipkin Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package brave.grpc;
-
-import brave.propagation.Propagation.Setter;
-import brave.test.propagation.PropagationSetterTest;
-import io.grpc.Metadata;
-import io.grpc.Metadata.Key;
-import java.util.Collections;
-import java.util.Map;
-
-import static brave.grpc.GrpcClientRequest.SETTER;
-import static brave.grpc.TestObjects.METHOD_DESCRIPTOR;
-
-public class GrpcClientRequestSetterTest extends PropagationSetterTest {
- Map> nameToKey = GrpcPropagation.nameToKey(propagation);
- GrpcClientRequest request =
- new GrpcClientRequest(nameToKey, METHOD_DESCRIPTOR).metadata(new Metadata());
-
- @Override protected GrpcClientRequest request() {
- return request;
- }
-
- @Override protected Setter setter() {
- return SETTER;
- }
-
- @Override protected Iterable read(GrpcClientRequest request, String key) {
- Iterable result = request.metadata.getAll(nameToKey.get(key));
- return result != null ? result : Collections.emptyList();
- }
-}
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientRequestTest.java b/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientRequestTest.java
index 2e4b59f2b5..243f4e29a8 100644
--- a/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientRequestTest.java
+++ b/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientRequestTest.java
@@ -13,39 +13,65 @@
*/
package brave.grpc;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
import org.junit.Test;
-import static brave.grpc.TestObjects.METHOD_DESCRIPTOR;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
public class GrpcClientRequestTest {
Key b3Key = Key.of("b3", Metadata.ASCII_STRING_MARSHALLER);
- Metadata metadata = new Metadata();
- GrpcClientRequest request = new GrpcClientRequest(singletonMap("b3", b3Key), METHOD_DESCRIPTOR);
+ MethodDescriptor, ?> methodDescriptor = TestObjects.METHOD_DESCRIPTOR;
+ CallOptions callOptions = CallOptions.DEFAULT;
+ ClientCall, ?> call = mock(ClientCall.class);
+ Metadata headers = new Metadata();
+ GrpcClientRequest request =
+ new GrpcClientRequest(singletonMap("b3", b3Key), methodDescriptor, callOptions, call, headers);
- @Test public void metadata() {
- request.metadata(metadata).setMetadata("b3", "1");
+ @Test public void service() {
+ assertThat(request.service()).isEqualTo("helloworld.Greeter");
+ }
+
+ @Test public void method() {
+ assertThat(request.method()).isEqualTo("SayHello");
+ }
+
+ @Test public void unwrap() {
+ assertThat(request.unwrap()).isSameAs(call);
+ }
+
+ @Test public void methodDescriptor() {
+ assertThat(request.methodDescriptor()).isSameAs(methodDescriptor);
+ }
+
+ @Test public void callOptions() {
+ assertThat(request.callOptions()).isSameAs(callOptions);
+ }
+
+ @Test public void call() {
+ assertThat(request.call()).isSameAs(call);
+ }
+
+ @Test public void propagationField() {
+ request.propagationField("b3", "d");
- assertThat(metadata.get(b3Key))
- .isEqualTo("1");
+ assertThat(headers.get(b3Key)).isEqualTo("d");
}
- @Test public void metadata_replace() {
- metadata.put(b3Key, "0");
+ @Test public void propagationField_replace() {
+ headers.put(b3Key, "0");
- request.metadata(metadata).setMetadata("b3", "1");
+ request.propagationField("b3", "1");
- assertThat(request.metadata.get(b3Key))
- .isEqualTo("1");
+ assertThat(request.headers.get(b3Key)).isEqualTo("1");
}
- @Test public void metadata_null() {
- assertThatThrownBy(() -> request.setMetadata("b3", "1"))
- .isNotInstanceOf(NullPointerException.class)
- .hasMessage("This code should never be called when null");
+ @Test public void propagationField_null() {
+ assertThat(request.headers.get(b3Key)).isNull();
}
}
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientResponseTest.java b/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientResponseTest.java
new file mode 100644
index 0000000000..44b3bb6a08
--- /dev/null
+++ b/instrumentation/grpc/src/test/java/brave/grpc/GrpcClientResponseTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2013-2020 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package brave.grpc;
+
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import org.junit.Test;
+
+import static java.util.Collections.singletonMap;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class GrpcClientResponseTest {
+ Key b3Key = Key.of("b3", Metadata.ASCII_STRING_MARSHALLER);
+ MethodDescriptor, ?> methodDescriptor = TestObjects.METHOD_DESCRIPTOR;
+ CallOptions callOptions = CallOptions.DEFAULT;
+ ClientCall, ?> call = mock(ClientCall.class);
+ Metadata headers = new Metadata();
+ GrpcClientRequest request =
+ new GrpcClientRequest(singletonMap("b3", b3Key), methodDescriptor, callOptions, call, headers);
+ Status status = Status.CANCELLED;
+ Metadata trailers = new Metadata();
+ Throwable error;
+ GrpcClientResponse response = new GrpcClientResponse(request, status, trailers, error);
+
+ @Test public void request() {
+ assertThat(response.request()).isSameAs(request);
+ }
+
+ @Test public void status() {
+ assertThat(response.status()).isSameAs(status);
+ }
+
+ @Test public void unwrap() {
+ assertThat(response.unwrap()).isSameAs(status);
+ }
+
+ @Test public void trailers() {
+ assertThat(response.trailers()).isSameAs(trailers);
+ }
+
+ @Test public void error_null() {
+ assertThat(response.error()).isNull();
+ }
+
+ @Test public void error() {
+ RuntimeException error = new RuntimeException("noodles");
+ GrpcClientResponse response = new GrpcClientResponse(request, status, trailers, error);
+
+ assertThat(response.error()).isSameAs(error);
+ }
+
+ @Test public void error_fromStatus() {
+ RuntimeException error = new RuntimeException("noodles");
+ status = Status.fromThrowable(error);
+ GrpcClientResponse response = new GrpcClientResponse(request, status, trailers, null);
+
+ assertThat(response.error()).isSameAs(error);
+ assertThat(response.errorCode()).isEqualTo("UNKNOWN");
+ }
+
+ @Test public void errorCode_nullWhenOk() {
+ status = Status.OK;
+ GrpcClientResponse response = new GrpcClientResponse(request, status, trailers, null);
+
+ assertThat(response.errorCode()).isNull();
+ }
+
+ @Test public void errorCode() {
+ assertThat(response.errorCode()).isEqualTo("CANCELLED");
+ }
+}
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerRequestTest.java b/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerRequestTest.java
index 7c99a026fd..2df7759fe1 100644
--- a/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerRequestTest.java
+++ b/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerRequestTest.java
@@ -13,27 +13,60 @@
*/
package brave.grpc;
+import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCall;
import org.junit.Test;
-import static brave.grpc.TestObjects.METHOD_DESCRIPTOR;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class GrpcServerRequestTest {
Key b3Key = Key.of("b3", Metadata.ASCII_STRING_MARSHALLER);
+ MethodDescriptor, ?> methodDescriptor = TestObjects.METHOD_DESCRIPTOR;
+ ServerCall call = mock(ServerCall.class);
+ Metadata headers = new Metadata();
GrpcServerRequest request =
- new GrpcServerRequest(singletonMap("b3", b3Key), METHOD_DESCRIPTOR, new Metadata());
+ new GrpcServerRequest(singletonMap("b3", b3Key), call, headers);
- @Test public void metadata() {
- request.metadata.put(b3Key, "1");
+ @Test public void service() {
+ when(call.getMethodDescriptor()).thenReturn(methodDescriptor);
- assertThat(request.getMetadata("b3"))
- .isEqualTo("1");
+ assertThat(request.service()).isEqualTo("helloworld.Greeter");
}
- @Test public void metadata_null() {
- assertThat(request.getMetadata("b3")).isNull();
+ @Test public void method() {
+ when(call.getMethodDescriptor()).thenReturn(methodDescriptor);
+
+ assertThat(request.service()).isEqualTo("helloworld.Greeter");
+ }
+
+ @Test public void unwrap() {
+ assertThat(request.unwrap()).isSameAs(call);
+ }
+
+ @Test public void call() {
+ assertThat(request.call()).isSameAs(call);
+ }
+
+ @Test public void propagationField() {
+ headers.put(b3Key, "1");
+
+ assertThat(request.propagationField("b3")).isEqualTo("1");
+ }
+
+ @Test public void propagationField_null() {
+ assertThat(request.propagationField("b3")).isNull();
+ }
+
+ @Test public void propagationField_lastValue() {
+ headers.put(b3Key, "0");
+ headers.put(b3Key, "1");
+
+ assertThat(request.propagationField("b3")).isEqualTo("1");
}
}
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerResponseTest.java b/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerResponseTest.java
new file mode 100644
index 0000000000..5e92990f0f
--- /dev/null
+++ b/instrumentation/grpc/src/test/java/brave/grpc/GrpcServerResponseTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2013-2020 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package brave.grpc;
+
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import org.junit.Test;
+
+import static java.util.Collections.singletonMap;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class GrpcServerResponseTest {
+ Key b3Key = Key.of("b3", Metadata.ASCII_STRING_MARSHALLER);
+ ServerCall call = mock(ServerCall.class);
+ Metadata headers = new Metadata();
+ GrpcServerRequest request =
+ new GrpcServerRequest(singletonMap("b3", b3Key), call, headers);
+ Status status = Status.CANCELLED;
+ Metadata trailers = new Metadata();
+ GrpcServerResponse response = new GrpcServerResponse(request, status, trailers, null);
+
+ @Test public void request() {
+ assertThat(response.request()).isSameAs(request);
+ }
+
+ @Test public void status() {
+ assertThat(response.status()).isSameAs(status);
+ }
+
+ @Test public void unwrap() {
+ assertThat(response.unwrap()).isSameAs(status);
+ }
+
+ @Test public void trailers() {
+ assertThat(response.trailers()).isSameAs(trailers);
+ }
+
+ @Test public void error_null() {
+ assertThat(response.error()).isNull();
+ }
+
+ @Test public void error() {
+ RuntimeException error = new RuntimeException("noodles");
+ GrpcServerResponse response = new GrpcServerResponse(request, status, trailers, error);
+
+ assertThat(response.error()).isSameAs(error);
+ }
+
+ @Test public void error_fromStatus() {
+ RuntimeException error = new RuntimeException("noodles");
+ status = Status.fromThrowable(error);
+ GrpcServerResponse response = new GrpcServerResponse(request, status, trailers, null);
+
+ assertThat(response.error()).isSameAs(error);
+ assertThat(response.errorCode()).isEqualTo("UNKNOWN");
+ }
+
+ @Test public void errorCode_nullWhenOk() {
+ status = Status.OK;
+ GrpcServerResponse response = new GrpcServerResponse(request, status, trailers, null);
+
+ assertThat(response.errorCode()).isNull();
+ }
+
+ @Test public void errorCode() {
+ assertThat(response.errorCode()).isEqualTo("CANCELLED");
+ }
+}
diff --git a/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java b/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java
index 5ef8d0e607..4751fe0a5b 100644
--- a/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java
+++ b/instrumentation/grpc/src/test/java/brave/grpc/TestServer.java
@@ -37,13 +37,13 @@ class TestServer {
final Server server;
TestServer(Map> nameToKey, Propagation propagation) {
- extractor = propagation.extractor(GrpcServerRequest.GETTER);
+ extractor = propagation.extractor(GrpcServerRequest::propagationField);
server = ServerBuilder.forPort(PickUnusedPort.get())
.addService(ServerInterceptors.intercept(new GreeterImpl(null), new ServerInterceptor() {
@Override
public ServerCall.Listener interceptCall(ServerCall call,
- Metadata metadata, ServerCallHandler next) {
+ Metadata headers, ServerCallHandler next) {
Long delay = delayQueue.poll();
if (delay != null) {
try {
@@ -53,12 +53,8 @@ public ServerCall.Listener interceptCall(ServerCall4.3.1
4.5.11
- 1.27.0
+ 1.29.0
- 4.1.42.Final
+ 4.1.48.Final
2.9.1
4.13