Skip to content

Commit

Permalink
Start process span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 20, 2024
1 parent a005057 commit 47c9d9a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 1 deletion.
25 changes: 24 additions & 1 deletion google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ def __init__(self, message: PubsubMessage):
self._scheduler_span: Optional[trace.Span] = None

# This will be set by `start_subscribe_span` method and will be used
# for other spans, such as
# for other spans, such as process span.
self._subscription_id: Optional[str] = None

# This will be set by `start_process_span` method.
self._process_span: Optional[trace.Span] = None

@property
def subscription_id(self):
return self._subscription_id
Expand Down Expand Up @@ -136,3 +139,23 @@ def start_subscribe_scheduler_span(self) -> None:
def end_subscribe_scheduler_span(self) -> None:
assert self._scheduler_span is not None
self._scheduler_span.end()

def start_process_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
publish_create_span_link: Optional[trace.Link] = (
trace.Link(self._subscribe_span.parent)
if self._subscribe_span.parent
else None
)
with tracer.start_as_current_span(
name=f"{self._subscription_id} process",
attributes={
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
},
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._subscribe_span),
links=[publish_create_span_link] if publish_create_span_link else None,
end_on_exit=False,
) as process_span:
self._process_span = process_span
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def _wrap_callback_errors(
try:
if message.opentelemetry_data:
message.opentelemetry_data.end_subscribe_concurrency_control_span()
message.opentelemetry_data.start_process_span()
callback(message)
except BaseException as exc:
# Note: the likelihood of this failing is extremely low. This just adds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,10 @@ def test_opentelemetry_end_subscribe_scheduler_span_assertion_error():
opentelemetry_data = SubscribeOpenTelemetry(msg)
with pytest.raises(AssertionError):
opentelemetry_data.end_subscribe_scheduler_span()


def test_opentelemetry_start_process_span_assertion_error():
msg = create_message(b"foo")
opentelemetry_data = SubscribeOpenTelemetry(msg)
with pytest.raises(AssertionError):
opentelemetry_data.start_process_span()

0 comments on commit 47c9d9a

Please sign in to comment.