Skip to content

Commit

Permalink
Fix to allow topic to be passed via kwargs (#2901)
Browse files Browse the repository at this point in the history
* Fix to allow topic to be imported from kwargs

* add changelog

* lint

* separate assert function
  • Loading branch information
bourbonkk authored Oct 22, 2024
1 parent e4ece57 commit cef28d6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__`
([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874))
- `opentelemetry-instrumentation-confluent-kafka` Fix to allow `topic` to be extracted from `kwargs` in `produce()`
([#2901])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2901)

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ def wrap_produce(func, instance, tracer, args, kwargs):
headers = []
kwargs["headers"] = headers

topic = KafkaPropertiesExtractor.extract_produce_topic(args)
topic = KafkaPropertiesExtractor.extract_produce_topic(
args, kwargs
)
_enrich_span(
span,
topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ def _extract_argument(key, position, default_value, args, kwargs):
return kwargs.get(key, default_value)

@staticmethod
def extract_produce_topic(args):
def extract_produce_topic(args, kwargs):
"""extract topic from `produce` method arguments in Producer class"""
if len(args) > 0:
return args[0]
return "unknown"
return kwargs.get("topic") or (args[0] if args else "unknown")

@staticmethod
def extract_produce_headers(args, kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,15 @@ def _compare_spans(self, spans, expected_spans):
expected_attribute_value, span.attributes[attribute_key]
)

def _assert_topic(self, span, expected_topic: str) -> None:
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
expected_topic,
)

def _assert_span_count(self, span_list, expected_count: int) -> None:
self.assertEqual(len(span_list), expected_count)

def test_producer_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []
Expand All @@ -299,6 +308,9 @@ def test_producer_poll(self) -> None:
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.poll()
self.assertIsNotNone(msg)
span_list = self.memory_exporter.get_finished_spans()
self._assert_span_count(span_list, 1)
self._assert_topic(span_list[0], "topic-1")

def test_producer_flush(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
Expand All @@ -315,3 +327,6 @@ def test_producer_flush(self) -> None:
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.flush()
self.assertIsNotNone(msg)
span_list = self.memory_exporter.get_finished_spans()
self._assert_span_count(span_list, 1)
self._assert_topic(span_list[0], "topic-1")

0 comments on commit cef28d6

Please sign in to comment.