diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 94722516e0..ca3916cf24 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -142,6 +142,9 @@ def trace_call_end_lazily( ctx_manager.__enter__() def discard(exc_type=None, exc_value=None, exc_traceback=None): + if not exc_type: + span.set_status(Status(StatusCode.OK)) + ctx_manager.__exit__(exc_type, exc_value, exc_traceback) return discard @@ -175,7 +178,12 @@ def trace_call(name, session=None, extra_attributes=None, observability_options= span.record_exception(error) raise else: - span.set_status(Status(StatusCode.OK)) + if span._status.status_code == StatusCode.UNSET: + # OpenTelemetry-Python only allows a status change + # if the current code is UNSET or ERROR. At the end + # of the generator's consumption, only set it to OK + # it wasn't previously set otherwise + span.set_status(Status(StatusCode.OK)) def set_span_status_error(span, error): @@ -204,5 +212,5 @@ def add_event_on_current_span(event_name, attributes=None, span=None): def record_span_exception_and_status(span, exc): if span: - span.set_status(Status(StatusCode.ERROR, "foo")) + span.set_status(Status(StatusCode.ERROR, str(exc))) span.record_exception(exc) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index b66571182d..86b2dcb7d5 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -50,7 +50,7 @@ class _BatchBase(_SessionWrapper): def __init__(self, session): super(_BatchBase, self).__init__(session) self._mutations = [] - self.__discard_span = trace_call_end_lazily( + self.__base_discard_span = trace_call_end_lazily( f"CloudSpanner.{type(self).__name__}", self._session, None, @@ -157,6 +157,11 @@ def delete(self, table, keyset): dict(table=table), ) + def _discard_on_end(self, exc_type=None, exc_val=None, exc_traceback=None): + if self.__base_discard_span: + self.__base_discard_span(exc_type, exc_val, exc_traceback) + self.__base_discard_span = None + class Batch(_BatchBase): """Accumulate mutations for transmission during :meth:`commit`.""" @@ -253,6 +258,7 @@ def commit( ) self.committed = response.commit_timestamp self.commit_stats = response.commit_stats + self._discard_on_end() return self.committed def __enter__(self): @@ -276,6 +282,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): if self.__discard_span: self.__discard_span(exc_type, exc_val, exc_tb) self.__discard_span = None + self._discard_on_end() class MutationGroup(_BatchBase): diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index cedfe5babf..e12b597378 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -180,7 +180,6 @@ def exists(self): ) observability_options = getattr(self._database, "observability_options", None) - print(f"obsopts {observability_options}") with trace_call( "CloudSpanner.GetSession", self, observability_options=observability_options ) as span: diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 3e950332a3..2a43dbfa63 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -90,6 +90,7 @@ def _restart_on_unavailable( span_name, session, attributes, observability_options=observability_options ): iterator = method(request=request) + while True: try: for item in iterator: diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 00339e1344..2bf8d1040a 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -206,6 +206,7 @@ def rollback(self): ) self.rolled_back = True del self._session._transaction + self._discard_on_end() def commit( self, return_commit_stats=False, request_options=None, max_commit_delay=None @@ -286,6 +287,7 @@ def commit( if return_commit_stats: self.commit_stats = response.commit_stats del self._session._transaction + self._discard_on_end() return self.committed @staticmethod diff --git a/tests/system/test_observability_options.py b/tests/system/test_observability_options.py index d1323ebb28..009f4eb01e 100644 --- a/tests/system/test_observability_options.py +++ b/tests/system/test_observability_options.py @@ -137,3 +137,48 @@ def _make_credentials(): from google.auth.credentials import AnonymousCredentials return AnonymousCredentials() + + +from tests import _helpers as ot_helpers + + +@pytest.mark.skipif( + not ot_helpers.HAS_OPENTELEMETRY_INSTALLED, + reason="Tracing requires OpenTelemetry", +) +def test_trace_call_keeps_span_error_status(): + # Verifies that after our span's status was set to ERROR + # that it doesn't unconditionally get changed to OK + # per https://github.com/googleapis/python-spanner/issues/1246 + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + from google.cloud.spanner_v1._opentelemetry_tracing import trace_call + from opentelemetry.trace.status import Status, StatusCode + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + from opentelemetry import trace + + tracer_provider = TracerProvider(sampler=ALWAYS_ON) + trace_exporter = InMemorySpanExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter)) + observability_options = dict(tracer_provider=tracer_provider) + + with trace_call( + "VerifyBehavior", observability_options=observability_options + ) as span: + span.set_status(Status(StatusCode.ERROR, "Our error exhibit")) + + span_list = trace_exporter.get_finished_spans() + got_statuses = [] + + for span in span_list: + got_statuses.append( + (span.name, span.status.status_code, span.status.description) + ) + + want_statuses = [ + ("VerifyBehavior", StatusCode.ERROR, "Our error exhibit"), + ] + assert got_statuses == want_statuses diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 47f3d6d445..ce725137dc 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -437,8 +437,6 @@ def test_batch_insert_then_read(sessions_database, ot_exporter): if ot_exporter is not None: span_list = ot_exporter.get_finished_spans() - assert len(span_list) == 6 - assert_span_attributes( ot_exporter, "CloudSpanner.GetSession", @@ -453,28 +451,34 @@ def test_batch_insert_then_read(sessions_database, ot_exporter): ) assert_span_attributes( ot_exporter, - "CloudSpanner.Database.batch", + "CloudSpanner.Batch", attributes=_make_attributes(db_name), span=span_list[2], ) + assert_span_attributes( + ot_exporter, + "CloudSpanner.Database.batch", + attributes=_make_attributes(db_name), + span=span_list[3], + ) assert_span_attributes( ot_exporter, "CloudSpanner.GetSession", attributes=_make_attributes(db_name, session_found=True), - span=span_list[3], + span=span_list[4], ) assert_span_attributes( ot_exporter, "CloudSpanner.Snapshot.read", attributes=_make_attributes(db_name, columns=sd.COLUMNS, table_id=sd.TABLE), - span=span_list[4], + span=span_list[5], ) assert_span_attributes( ot_exporter, "CloudSpanner.Database.snapshot", attributes=_make_attributes(db_name, multi_use=False), - span=span_list[5], + span=span_list[6], ) @@ -621,8 +625,6 @@ def test_transaction_read_and_insert_then_rollback( if ot_exporter is not None: span_list = ot_exporter.get_finished_spans() - print("got_span_names", [span.name for span in span_list]) - # assert len(span_list) == 8 assert_span_attributes( ot_exporter, @@ -644,16 +646,22 @@ def test_transaction_read_and_insert_then_rollback( ) assert_span_attributes( ot_exporter, - "CloudSpanner.Database.batch", + "CloudSpanner.Batch", attributes=_make_attributes(db_name), span=span_list[3], ) assert_span_attributes( ot_exporter, - "CloudSpanner.BeginTransaction", + "CloudSpanner.Database.batch", attributes=_make_attributes(db_name), span=span_list[4], ) + assert_span_attributes( + ot_exporter, + "CloudSpanner.BeginTransaction", + attributes=_make_attributes(db_name), + span=span_list[5], + ) assert_span_attributes( ot_exporter, @@ -663,7 +671,7 @@ def test_transaction_read_and_insert_then_rollback( table_id=sd.TABLE, columns=sd.COLUMNS, ), - span=span_list[5], + span=span_list[6], ) assert_span_attributes( ot_exporter, @@ -673,13 +681,19 @@ def test_transaction_read_and_insert_then_rollback( table_id=sd.TABLE, columns=sd.COLUMNS, ), - span=span_list[6], + span=span_list[7], ) assert_span_attributes( ot_exporter, "CloudSpanner.Transaction.rollback", attributes=_make_attributes(db_name), - span=span_list[7], + span=span_list[8], + ) + assert_span_attributes( + ot_exporter, + "CloudSpanner.Transaction", + attributes=_make_attributes(db_name), + span=span_list[9], ) assert_span_attributes( ot_exporter, @@ -689,7 +703,7 @@ def test_transaction_read_and_insert_then_rollback( table_id=sd.TABLE, columns=sd.COLUMNS, ), - span=span_list[8], + span=span_list[10], ) @@ -720,6 +734,159 @@ def _transaction_read_then_raise(transaction): assert rows == [] +@pytest.mark.skipif( + not _helpers.USE_EMULATOR, + reason="Emulator needed to run this tests", +) +@pytest.mark.skipif( + not ot_helpers.HAS_OPENTELEMETRY_INSTALLED, + reason="Tracing requires OpenTelemetry", +) +def test_transaction_abort_then_retry_spans(sessions_database, ot_exporter): + from google.auth.credentials import AnonymousCredentials + from google.api_core.exceptions import Aborted + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + from opentelemetry.trace.status import StatusCode + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + from opentelemetry import trace + + PROJECT = _helpers.EMULATOR_PROJECT + CONFIGURATION_NAME = "config-name" + INSTANCE_ID = _helpers.INSTANCE_ID + DISPLAY_NAME = "display-name" + DATABASE_ID = _helpers.unique_id("temp_db") + NODE_COUNT = 5 + LABELS = {"test": "true"} + + counters = dict(aborted=0) + already_aborted = False + + def select_in_txn(txn): + from google.rpc import error_details_pb2 + + results = txn.execute_sql("SELECT 1") + for row in results: + _ = row + + if counters["aborted"] == 0: + counters["aborted"] = 1 + raise Aborted( + "Thrown from ClientInterceptor for testing", + errors=[FauxCall(code_pb2.ABORTED)], + ) + + tracer_provider = TracerProvider(sampler=ALWAYS_ON) + trace_exporter = InMemorySpanExporter() + tracer_provider.add_span_processor(SimpleSpanProcessor(trace_exporter)) + observability_options = dict( + tracer_provider=tracer_provider, + enable_extended_tracing=True, + ) + + client = spanner_v1.Client( + project=PROJECT, + observability_options=observability_options, + credentials=AnonymousCredentials(), + ) + + instance = client.instance( + INSTANCE_ID, + CONFIGURATION_NAME, + display_name=DISPLAY_NAME, + node_count=NODE_COUNT, + labels=LABELS, + ) + + try: + instance.create() + except Exception: + pass + + db = instance.database(DATABASE_ID) + try: + db.create() + except Exception: + pass + + db.run_in_transaction(select_in_txn) + + span_list = trace_exporter.get_finished_spans() + got_span_names = [span.name for span in span_list] + want_span_names = [ + "CloudSpanner.CreateSession", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction", + "CloudSpanner.Transaction.execute_streaming_sql", + "CloudSpanner.Transaction.commit", + "CloudSpanner.Transaction", + "CloudSpanner.ReadWriteTransaction", + "CloudSpanner.Database.run_in_transaction", + ] + + assert got_span_names == want_span_names + + # Let's check for the series of events + want_events = [ + ("Creating Transaction", {}), + ("Using Transaction", {"attempt": 1}), + ( + "exception", + { + "exception.type": "google.api_core.exceptions.Aborted", + "exception.message": "409 Thrown from ClientInterceptor for testing", + "exception.stacktrace": "EPHEMERAL", + "exception.escaped": "False", + }, + ), + ( + "Transaction was aborted, retrying", + {"delay_seconds": "EPHEMERAL", "attempt": 1}, + ), + ("Creating Transaction", {}), + ("Using Transaction", {"attempt": 2}), + ] + got_events = [] + got_statuses = [] + + # Some event attributes are noisy/highly ephemeral + # and can't be directly compared against. + imprecise_event_attributes = ["exception.stacktrace", "delay_seconds"] + for span in span_list: + got_statuses.append( + (span.name, span.status.status_code, span.status.description) + ) + for event in span.events: + evt_attributes = event.attributes.copy() + for attr_name in imprecise_event_attributes: + if attr_name in evt_attributes: + evt_attributes[attr_name] = "EPHEMERAL" + + got_events.append((event.name, evt_attributes)) + + assert got_events == want_events + + codes = StatusCode + want_statuses = [ + ("CloudSpanner.CreateSession", codes.OK, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction", codes.UNSET, None), + ("CloudSpanner.Transaction.execute_streaming_sql", codes.OK, None), + ("CloudSpanner.Transaction.commit", codes.OK, None), + ("CloudSpanner.Transaction", codes.OK, None), + ( + "CloudSpanner.ReadWriteTransaction", + codes.ERROR, + "409 Thrown from ClientInterceptor for testing", + ), + ("CloudSpanner.Database.run_in_transaction", codes.OK, None), + ] + assert got_statuses == want_statuses + + @_helpers.retry_mabye_conflict def test_transaction_read_and_insert_or_update_then_commit( sessions_database, @@ -1208,13 +1375,15 @@ def unit_of_work(transaction): "CloudSpanner.CreateSession", "CloudSpanner.Batch.commit", "CloudSpanner.Batch", + "CloudSpanner.Batch", "CloudSpanner.DMLTransaction", "CloudSpanner.Transaction.commit", + "CloudSpanner.Transaction", "CloudSpanner.ReadWriteTransaction", "Test Span", ] - got_spans = [span.name for span in span_list] - assert got_spans == expected_span_names + got_span_names = [span.name for span in span_list] + assert got_span_names == expected_span_names # [CreateSession --> Batch] should have their own trace. session_parent_span = span_list[0]