Skip to content

Commit

Permalink
Align gRPC server status code to span status code
Browse files Browse the repository at this point in the history
  • Loading branch information
dragon3 committed Nov 3, 2023
1 parent 61d17dd commit 60704dd
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 4 deletions.
37 changes: 33 additions & 4 deletions interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
)
}
attributes = attributeFilter(req, attributes...)
span.SetStatus(spanStatus(protocol, err))
if isClient {
span.SetStatus(clientSpanStatus(protocol, err))
} else {
span.SetStatus(serverSpanStatus(protocol, err))
}
span.SetAttributes(attributes...)
instrumentation.duration.Record(ctx, i.config.now().Sub(requestStartTime).Milliseconds(), metric.WithAttributes(attributes...))
instrumentation.requestSize.Record(ctx, int64(requestSize), metric.WithAttributes(attributes...))
Expand Down Expand Up @@ -239,7 +243,7 @@ func (i *Interceptor) WrapStreamingClient(next connect.StreamingClientFunc) conn
}
span.SetAttributes(state.attributes...)
span.SetAttributes(headerAttributes(protocol, responseKey, conn.ResponseHeader(), i.config.responseHeaderKeys)...)
span.SetStatus(spanStatus(protocol, state.error))
span.SetStatus(clientSpanStatus(protocol, state.error))
span.End()
instrumentation.duration.Record(ctx, i.config.now().Sub(requestStartTime).Milliseconds(), metric.WithAttributes(state.attributes...))
},
Expand Down Expand Up @@ -323,7 +327,7 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co
}
span.SetAttributes(state.attributes...)
span.SetAttributes(headerAttributes(protocol, responseKey, conn.ResponseHeader(), i.config.responseHeaderKeys)...)
span.SetStatus(spanStatus(protocol, err))
span.SetStatus(serverSpanStatus(protocol, err))
instrumentation.duration.Record(ctx, i.config.now().Sub(requestStartTime).Milliseconds(), metric.WithAttributes(state.attributes...))
return err
}
Expand All @@ -342,7 +346,7 @@ func protocolToSemConv(protocol string) string {
}
}

func spanStatus(protocol string, err error) (codes.Code, string) {
func clientSpanStatus(protocol string, err error) (codes.Code, string) {
if err == nil {
return codes.Unset, ""
}
Expand All @@ -354,3 +358,28 @@ func spanStatus(protocol string, err error) (codes.Code, string) {
}
return codes.Error, err.Error()
}

func serverSpanStatus(protocol string, err error) (codes.Code, string) {
if err == nil {
return codes.Unset, ""
}
if protocol == connectProtocol && connect.IsNotModifiedError(err) {
return codes.Unset, ""
}

if connectErr := new(connect.Error); errors.As(err, &connectErr) {
switch connectErr.Code() {
case connect.CodeUnknown,
connect.CodeDeadlineExceeded,
connect.CodeUnimplemented,
connect.CodeInternal,
connect.CodeUnavailable,
connect.CodeDataLoss:
return codes.Error, connectErr.Message()
default:
return codes.Unset, ""
}
}

return codes.Error, err.Error()
}
100 changes: 100 additions & 0 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otelconnect

import (
"context"
"errors"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -1828,6 +1829,82 @@ func TestWithoutTraceEventsUnary(t *testing.T) {
}, spanRecorder.Ended())
}

func TestServerSpanStatus(t *testing.T) {
t.Parallel()
var propagator propagation.TraceContext
for _, tc := range serverSpanStatusTestCases {
spanRecorder := tracetest.NewSpanRecorder()
traceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(spanRecorder))
clientSpanRecorder := tracetest.NewSpanRecorder()
clientTraceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(clientSpanRecorder))
pingClient, _, _ := startServer([]connect.HandlerOption{
connect.WithInterceptors(NewInterceptor(
WithTracerProvider(traceProvider),
WithoutTraceEvents(),
)),
}, []connect.ClientOption{
connect.WithInterceptors(
NewInterceptor(
WithPropagator(propagator),
WithTracerProvider(clientTraceProvider),
)),
}, &pluggablePingServer{
ping: func(ctx context.Context, r *connect.Request[pingv1.PingRequest]) (*connect.Response[pingv1.PingResponse], error) {
return nil, connect.NewError(tc.connectCode, errors.New(tc.connectCode.String()))
},
})
if _, err := pingClient.Ping(context.Background(), requestOfSize(1, 0)); err == nil {
t.Error("want error")
}
require.Len(t, spanRecorder.Ended(), 1)
require.Equal(t, codes.Error, clientSpanRecorder.Ended()[0].Status().Code)
require.Equal(t, tc.wantServerSpanCode, spanRecorder.Ended()[0].Status().Code)
require.Equal(t, tc.wantServerSpanDescription, spanRecorder.Ended()[0].Status().Description)
}
}

func TestStreamingServerSpanStatus(t *testing.T) {
t.Parallel()
var propagator propagation.TraceContext
for _, tc := range serverSpanStatusTestCases {
handlerSpanRecorder := tracetest.NewSpanRecorder()
handlerTraceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(handlerSpanRecorder))
clientSpanRecorder := tracetest.NewSpanRecorder()
clientTraceProvider := trace.NewTracerProvider(trace.WithSpanProcessor(clientSpanRecorder))
client, _, _ := startServer(
[]connect.HandlerOption{
connect.WithInterceptors(
NewInterceptor(
WithPropagator(propagator),
WithTracerProvider(handlerTraceProvider),
)),
}, []connect.ClientOption{
connect.WithInterceptors(
NewInterceptor(
WithPropagator(propagator),
WithTracerProvider(clientTraceProvider),
)),
}, &pluggablePingServer{
pingStream: func(ctx context.Context, bs *connect.BidiStream[pingv1.PingStreamRequest, pingv1.PingStreamResponse]) error {
return connect.NewError(tc.connectCode, errors.New(tc.connectCode.String()))
},
})
stream := client.PingStream(context.Background())
assert.NoError(t, stream.Send(&pingv1.PingStreamRequest{
Data: []byte("Hello, otel!"),
}))
_, err := stream.Receive()
assert.Error(t, err)
assert.NoError(t, stream.CloseRequest())
assert.NoError(t, stream.CloseResponse())
assert.Equal(t, len(handlerSpanRecorder.Ended()), 1)
assert.Equal(t, len(clientSpanRecorder.Ended()), 1)
assert.Equal(t, tc.wantServerSpanCode, handlerSpanRecorder.Ended()[0].Status().Code)
assert.Equal(t, tc.wantServerSpanDescription, handlerSpanRecorder.Ended()[0].Status().Description)
assert.Equal(t, clientSpanRecorder.Ended()[0].Status().Code, codes.Error)
}
}

// streamingHandlerInterceptorFunc is a simple Interceptor implementation that only
// wraps streaming handler RPCs. It has no effect on unary or streaming client RPCs.
type streamingHandlerInterceptorFunc func(connect.StreamingHandlerFunc) connect.StreamingHandlerFunc
Expand Down Expand Up @@ -1991,3 +2068,26 @@ func metricResource() *resource.Resource {
attribute.String("telemetry.sdk.version", otel.Version()),
)
}

var serverSpanStatusTestCases = []struct {
connectCode connect.Code
wantServerSpanCode codes.Code
wantServerSpanDescription string
}{
{connectCode: connect.CodeCanceled, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeUnknown, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeUnknown.String()},
{connectCode: connect.CodeInvalidArgument, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeDeadlineExceeded, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeDeadlineExceeded.String()},
{connectCode: connect.CodeNotFound, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeAlreadyExists, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodePermissionDenied, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeResourceExhausted, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeFailedPrecondition, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeAborted, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeOutOfRange, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
{connectCode: connect.CodeUnimplemented, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeUnimplemented.String()},
{connectCode: connect.CodeInternal, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeInternal.String()},
{connectCode: connect.CodeUnavailable, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeUnavailable.String()},
{connectCode: connect.CodeDataLoss, wantServerSpanCode: codes.Error, wantServerSpanDescription: connect.CodeDataLoss.String()},
{connectCode: connect.CodeUnauthenticated, wantServerSpanCode: codes.Unset, wantServerSpanDescription: ""},
}

0 comments on commit 60704dd

Please sign in to comment.