diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 1ec142077..96f2c53e5 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -45,9 +45,12 @@ def __init__(self, message: PubsubMessage): self._scheduler_span: Optional[trace.Span] = None # This will be set by `start_subscribe_span` method and will be used - # for other spans, such as + # for other spans, such as process span. self._subscription_id: Optional[str] = None + # This will be set by `start_process_span` method. + self._process_span: Optional[trace.Span] = None + @property def subscription_id(self): return self._subscription_id @@ -136,3 +139,23 @@ def start_subscribe_scheduler_span(self) -> None: def end_subscribe_scheduler_span(self) -> None: assert self._scheduler_span is not None self._scheduler_span.end() + + def start_process_span(self) -> None: + assert self._subscribe_span is not None + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + publish_create_span_link: Optional[trace.Link] = ( + trace.Link(self._subscribe_span.parent) + if self._subscribe_span.parent + else None + ) + with tracer.start_as_current_span( + name=f"{self._subscription_id} process", + attributes={ + "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, + }, + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(self._subscribe_span), + links=[publish_create_span_link] if publish_create_span_link else None, + end_on_exit=False, + ) as process_span: + self._process_span = process_span diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 4b3df6df0..cc815f114 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -128,6 +128,7 @@ def _wrap_callback_errors( try: if message.opentelemetry_data: message.opentelemetry_data.end_subscribe_concurrency_control_span() + message.opentelemetry_data.start_process_span() callback(message) except BaseException as exc: # Note: the likelihood of this failing is extremely low. This just adds diff --git a/tests/unit/pubsub_v1/subscriber/test_subscribe_opentelemetry.py b/tests/unit/pubsub_v1/subscriber/test_subscribe_opentelemetry.py index aad87ebdf..8a5542493 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscribe_opentelemetry.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscribe_opentelemetry.py @@ -124,3 +124,10 @@ def test_opentelemetry_end_subscribe_scheduler_span_assertion_error(): opentelemetry_data = SubscribeOpenTelemetry(msg) with pytest.raises(AssertionError): opentelemetry_data.end_subscribe_scheduler_span() + + +def test_opentelemetry_start_process_span_assertion_error(): + msg = create_message(b"foo") + opentelemetry_data = SubscribeOpenTelemetry(msg) + with pytest.raises(AssertionError): + opentelemetry_data.start_process_span()