Skip to content

Commit

Permalink
All add much deeper tests to check for span statuses plus retries+abort
Browse files Browse the repository at this point in the history
Also while here, fixed #1246
  • Loading branch information
odeke-em committed Dec 3, 2024
1 parent 9e49db9 commit 8d6dad0
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 20 deletions.
12 changes: 10 additions & 2 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ def trace_call_end_lazily(
ctx_manager.__enter__()

def discard(exc_type=None, exc_value=None, exc_traceback=None):
if not exc_type:
span.set_status(Status(StatusCode.OK))

ctx_manager.__exit__(exc_type, exc_value, exc_traceback)

return discard
Expand Down Expand Up @@ -175,7 +178,12 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
span.record_exception(error)
raise
else:
span.set_status(Status(StatusCode.OK))
if span._status.status_code == StatusCode.UNSET:
# OpenTelemetry-Python only allows a status change
# if the current code is UNSET or ERROR. At the end
# of the generator's consumption, only set it to OK
# it wasn't previously set otherwise
span.set_status(Status(StatusCode.OK))


def set_span_status_error(span, error):
Expand Down Expand Up @@ -204,5 +212,5 @@ def add_event_on_current_span(event_name, attributes=None, span=None):

def record_span_exception_and_status(span, exc):
if span:
span.set_status(Status(StatusCode.ERROR, "foo"))
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)
9 changes: 8 additions & 1 deletion google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class _BatchBase(_SessionWrapper):
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
self.__discard_span = trace_call_end_lazily(
self.__base_discard_span = trace_call_end_lazily(
f"CloudSpanner.{type(self).__name__}",
self._session,
None,
Expand Down Expand Up @@ -157,6 +157,11 @@ def delete(self, table, keyset):
dict(table=table),
)

def _discard_on_end(self, exc_type=None, exc_val=None, exc_traceback=None):
if self.__base_discard_span:
self.__base_discard_span(exc_type, exc_val, exc_traceback)
self.__base_discard_span = None


class Batch(_BatchBase):
"""Accumulate mutations for transmission during :meth:`commit`."""
Expand Down Expand Up @@ -253,6 +258,7 @@ def commit(
)
self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats
self._discard_on_end()
return self.committed

def __enter__(self):
Expand All @@ -276,6 +282,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if self.__discard_span:
self.__discard_span(exc_type, exc_val, exc_tb)
self.__discard_span = None
self._discard_on_end()


class MutationGroup(_BatchBase):
Expand Down
1 change: 0 additions & 1 deletion google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def exists(self):
)

observability_options = getattr(self._database, "observability_options", None)
print(f"obsopts {observability_options}")
with trace_call(
"CloudSpanner.GetSession", self, observability_options=observability_options
) as span:
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def _restart_on_unavailable(
span_name, session, attributes, observability_options=observability_options
):
iterator = method(request=request)

while True:
try:
for item in iterator:
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def rollback(self):
)
self.rolled_back = True
del self._session._transaction
self._discard_on_end()

def commit(
self, return_commit_stats=False, request_options=None, max_commit_delay=None
Expand Down Expand Up @@ -286,6 +287,7 @@ def commit(
if return_commit_stats:
self.commit_stats = response.commit_stats
del self._session._transaction
self._discard_on_end()
return self.committed

@staticmethod
Expand Down
45 changes: 45 additions & 0 deletions tests/system/test_observability_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,48 @@ def _make_credentials():
from google.auth.credentials import AnonymousCredentials

return AnonymousCredentials()


from tests import _helpers as ot_helpers


@pytest.mark.skipif(
not ot_helpers.HAS_OPENTELEMETRY_INSTALLED,
reason="Tracing requires OpenTelemetry",
)
def test_trace_call_keeps_span_error_status():
# Verifies that after our span's status was set to ERROR
# that it doesn't unconditionally get changed to OK
# per https://github.com/googleapis/python-spanner/issues/1246
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry import trace

tracer_provider = TracerProvider(sampler=ALWAYS_ON)
trace_exporter = InMemorySpanExporter()
tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter))
observability_options = dict(tracer_provider=tracer_provider)

with trace_call(
"VerifyBehavior", observability_options=observability_options
) as span:
span.set_status(Status(StatusCode.ERROR, "Our error exhibit"))

span_list = trace_exporter.get_finished_spans()
got_statuses = []

for span in span_list:
got_statuses.append(
(span.name, span.status.status_code, span.status.description)
)

want_statuses = [
("VerifyBehavior", StatusCode.ERROR, "Our error exhibit"),
]
assert got_statuses == want_statuses
Loading

0 comments on commit 8d6dad0

Please sign in to comment.