Skip to content

Commit

Permalink
Align gRPC server span status codes to OTEL specs (#2019)
Browse files Browse the repository at this point in the history
  • Loading branch information
FilipNikolovski authored Mar 7, 2024
1 parent 46a8c59 commit 2e74619
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Fixed
- Align gRPC span status codes to OTEL specification ([#1756](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1756))

## Version 1.23.0/0.44b0 (2024-02-23)

- Drop support for 3.7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import wrapt

from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode

from ._server import OpenTelemetryServerInterceptor, _wrap_rpc_behavior
from ._utilities import _server_status


# pylint:disable=abstract-method
Expand All @@ -36,12 +36,8 @@ async def abort(self, code, details="", trailing_metadata=tuple()):
self._self_active_span.set_attribute(
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
self._self_active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}:{details}",
)
)
status = _server_status(code, details)
self._self_active_span.set_status(status)
return await self.__wrapped__.abort(code, details, trailing_metadata)

def set_code(self, code):
Expand All @@ -51,23 +47,15 @@ def set_code(self, code):
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
if code != grpc.StatusCode.OK:
self._self_active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}:{details}",
)
)
status = _server_status(code, details)
self._self_active_span.set_status(status)
return self.__wrapped__.set_code(code)

def set_details(self, details):
self._self_details = details
if self._self_code != grpc.StatusCode.OK:
self._self_active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{self._self_code}:{details}",
)
)
status = _server_status(self._self_code, details)
self._self_active_span.set_status(status)
return self.__wrapped__.set_details(details)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
from opentelemetry.context import attach, detach
from opentelemetry.propagate import extract
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode

from ._utilities import _server_status

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -124,12 +125,8 @@ def abort(self, code, details):
self._active_span.set_attribute(
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}:{details}",
)
)
status = _server_status(code, details)
self._active_span.set_status(status)
return self._servicer_context.abort(code, details)

def abort_with_status(self, status):
Expand Down Expand Up @@ -158,23 +155,15 @@ def set_code(self, code):
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
if code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}:{details}",
)
)
status = _server_status(code, details)
self._active_span.set_status(status)
return self._servicer_context.set_code(code)

def set_details(self, details):
self._details = details
if self._code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{self._code}:{details}",
)
)
status = _server_status(self._code, details)
self._active_span.set_status(status)
return self._servicer_context.set_details(details)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

"""Internal utilities."""

import grpc

from opentelemetry.trace.status import Status, StatusCode


class RpcInfo:
def __init__(
Expand All @@ -31,3 +35,21 @@ def __init__(
self.request = request
self.response = response
self.error = error


def _server_status(code, details):
error_status = Status(
status_code=StatusCode.ERROR, description=f"{code}:{details}"
)
status_codes = {
grpc.StatusCode.UNKNOWN: error_status,
grpc.StatusCode.DEADLINE_EXCEEDED: error_status,
grpc.StatusCode.UNIMPLEMENTED: error_status,
grpc.StatusCode.INTERNAL: error_status,
grpc.StatusCode.UNAVAILABLE: error_status,
grpc.StatusCode.DATA_LOSS: error_status,
}

return status_codes.get(
code, Status(status_code=StatusCode.UNSET, description="")
)
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,7 @@ async def test_abort(self):
class AbortServicer(GRPCTestServerServicer):
# pylint:disable=C0103
async def SimpleMethod(self, request, context):
await context.abort(
grpc.StatusCode.FAILED_PRECONDITION, failure_message
)
await context.abort(grpc.StatusCode.INTERNAL, failure_message)

testcase = self

Expand All @@ -520,9 +518,7 @@ async def request(channel):
with testcase.assertRaises(grpc.RpcError) as cm:
await channel.unary_unary(rpc_call)(msg)

self.assertEqual(
cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION
)
self.assertEqual(cm.exception.code(), grpc.StatusCode.INTERNAL)
self.assertEqual(cm.exception.details(), failure_message)

await run_with_test_server(request, servicer=AbortServicer())
Expand All @@ -543,7 +539,7 @@ async def request(channel):
self.assertEqual(span.status.status_code, StatusCode.ERROR)
self.assertEqual(
span.status.description,
f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}",
f"{grpc.StatusCode.INTERNAL}:{failure_message}",
)

# Check attributes
Expand All @@ -555,7 +551,7 @@ async def request(channel):
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.INTERNAL.value[
0
],
},
Expand Down Expand Up @@ -605,11 +601,8 @@ async def request(channel):
)

# make sure this span errored, with the right status and detail
self.assertEqual(span.status.status_code, StatusCode.ERROR)
self.assertEqual(
span.status.description,
f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}",
)
self.assertEqual(span.status.status_code, StatusCode.UNSET)
self.assertEqual(span.status.description, None)

# Check attributes
self.assertSpanHasAttributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,28 +552,45 @@ def test_abort(self):
# our detailed failure message
failure_message = "This is a test failure"

# aborting RPC handler
def handler(request, context):
# aborting RPC handlers
def error_status_handler(request, context):
context.abort(grpc.StatusCode.INTERNAL, failure_message)

def unset_status_handler(request, context):
context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message)

with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
rpc_call = "TestServicer/handler"
rpc_call_error = "TestServicer/error_status_handler"
rpc_call_unset = "TestServicer/unset_status_handler"

rpc_calls = {
rpc_call_error: error_status_handler,
rpc_call_unset: unset_status_handler,
}

for rpc_call, handler in rpc_calls.items():
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
server.add_generic_rpc_handlers(
(UnaryUnaryRpcHandler(handler),)
)

server.start()
# unfortunately, these are just bare exceptions in grpc...
with self.assertRaises(Exception):
channel.unary_unary(rpc_call)(b"")
server.stop(None)
server.start()

with self.assertRaises(Exception):
channel.unary_unary(rpc_call)(b"")

# unfortunately, these are just bare exceptions in grpc...
server.stop(None)

spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
self.assertEqual(len(spans_list), 2)

# check error span
span = spans_list[0]

self.assertEqual(span.name, rpc_call)
self.assertEqual(span.name, rpc_call_error)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
Expand All @@ -585,15 +602,43 @@ def handler(request, context):
self.assertEqual(span.status.status_code, StatusCode.ERROR)
self.assertEqual(
span.status.description,
f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}",
f"{grpc.StatusCode.INTERNAL}:{failure_message}",
)

# Check attributes
self.assertSpanHasAttributes(
span,
{
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "error_status_handler",
SpanAttributes.RPC_SERVICE: "TestServicer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.INTERNAL.value[
0
],
},
)

# check unset status span
span = spans_list[1]

self.assertEqual(span.name, rpc_call_unset)
self.assertIs(span.kind, trace.SpanKind.SERVER)

# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)

self.assertEqual(span.status.description, None)
self.assertEqual(span.status.status_code, StatusCode.UNSET)

# Check attributes
self.assertSpanHasAttributes(
span,
{
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "handler",
SpanAttributes.RPC_METHOD: "unset_status_handler",
SpanAttributes.RPC_SERVICE: "TestServicer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[
Expand Down

0 comments on commit 2e74619

Please sign in to comment.