Skip to content

Commit

Permalink
fix((confluent-kafka): Extracted some shared logic between poll and c…
Browse files Browse the repository at this point in the history
…onsume methods.
  • Loading branch information
javferrod committed Aug 13, 2023
1 parent 8a76540 commit 1815057
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
from .package import _instruments
from .utils import (
KafkaPropertiesExtractor,
_end_current_consume_span,
_create_new_consume_span,
_enrich_span,
_get_links_from_records,
_get_span_name,
_kafka_getter,
_kafka_setter,
Expand Down Expand Up @@ -348,23 +349,14 @@ def wrap_produce(func, instance, tracer, args, kwargs):
@staticmethod
def wrap_poll(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
context.detach(instance._current_context_token)
instance._current_context_token = None
instance._current_consume_span.end()
instance._current_consume_span = None
_end_current_consume_span(instance)

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
record = func(*args, **kwargs)
if record:
links = _get_links_from_records([record])
instance._current_consume_span = tracer.start_span(
name=f"{record.topic()} process",
links=links,
kind=SpanKind.CONSUMER,
)

_create_new_consume_span(instance, tracer, [record])
_enrich_span(
instance._current_consume_span,
record.topic(),
Expand All @@ -381,23 +373,14 @@ def wrap_poll(func, instance, tracer, args, kwargs):
@staticmethod
def wrap_consume(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
context.detach(instance._current_context_token)
instance._current_context_token = None
instance._current_consume_span.end()
instance._current_consume_span = None
_end_current_consume_span(instance)

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
records = func(*args, **kwargs)
if len(records) > 0:
links = _get_links_from_records(records)
instance._current_consume_span = tracer.start_span(
name=f"{records[0].topic()} process",
links=links,
kind=SpanKind.CONSUMER,
)

_create_new_consume_span(instance, tracer, records)
_enrich_span(
instance._current_consume_span,
records[0].topic(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from logging import getLogger
from typing import List, Optional

from opentelemetry import propagate
from opentelemetry import context, propagate
from opentelemetry.trace import SpanKind, Link
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import (
Expand Down Expand Up @@ -83,6 +83,22 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
_kafka_getter = KafkaContextGetter()


def _end_current_consume_span(instance):
context.detach(instance._current_context_token)
instance._current_context_token = None
instance._current_consume_span.end()
instance._current_consume_span = None


def _create_new_consume_span(instance, tracer, records):
links = _get_links_from_records(records)
instance._current_consume_span = tracer.start_span(
name=f"{records[0].topic()} process",
links=links,
kind=SpanKind.CONSUMER,
)


def _enrich_span(
span,
topic,
Expand Down

0 comments on commit 1815057

Please sign in to comment.