From 47a70f4b4af3df93260799f01c5de075a38b58a9 Mon Sep 17 00:00:00 2001 From: "allen.k1m" Date: Sat, 12 Oct 2024 00:22:10 +0900 Subject: [PATCH 1/4] Fix to allow topic to be imported from kwargs --- .../instrumentation/confluent_kafka/__init__.py | 2 +- .../instrumentation/confluent_kafka/utils.py | 6 ++---- .../tests/test_instrumentation.py | 10 ++++++++++ 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 3d1cc79c93..19f8742cfe 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -363,7 +363,7 @@ def wrap_produce(func, instance, tracer, args, kwargs): headers = [] kwargs["headers"] = headers - topic = KafkaPropertiesExtractor.extract_produce_topic(args) + topic = KafkaPropertiesExtractor.extract_produce_topic(args, kwargs) _enrich_span( span, topic, diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 4769f2a88f..60dc13e675 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -25,11 +25,9 @@ def _extract_argument(key, position, default_value, args, kwargs): return kwargs.get(key, default_value) @staticmethod - def extract_produce_topic(args): + def extract_produce_topic(args, kwargs): """extract topic from `produce` method arguments in Producer class""" - if len(args) > 0: - return args[0] - return "unknown" + return kwargs.get("topic") or (args[0] if args else "unknown") @staticmethod def extract_produce_headers(args, kwargs): diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 27653d6777..fc08db78b1 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -284,6 +284,13 @@ def _compare_spans(self, spans, expected_spans): expected_attribute_value, span.attributes[attribute_key] ) + def _assert_topic(self, expected_topic: str) -> None: + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + span = span_list[0] + self.assertEqual(span.attributes[SpanAttributes.MESSAGING_DESTINATION], expected_topic) + + def test_producer_poll(self) -> None: instrumentation = ConfluentKafkaInstrumentor() message_queue = [] @@ -299,6 +306,8 @@ def test_producer_poll(self) -> None: producer.produce(topic="topic-1", key="key-1", value="value-1") msg = producer.poll() self.assertIsNotNone(msg) + self._assert_topic("topic-1") + def test_producer_flush(self) -> None: instrumentation = ConfluentKafkaInstrumentor() @@ -315,3 +324,4 @@ def test_producer_flush(self) -> None: producer.produce(topic="topic-1", key="key-1", value="value-1") msg = producer.flush() self.assertIsNotNone(msg) + self._assert_topic("topic-1") From 9e17c43d6275110020242cdfc6eb2fe3d1852f07 Mon Sep 17 00:00:00 2001 From: "allen.k1m" Date: Sat, 12 Oct 2024 00:35:38 +0900 Subject: [PATCH 2/4] add changelog --- CHANGELOG.md | 2 ++ .../instrumentation/confluent_kafka/__init__.py | 4 +++- .../tests/test_instrumentation.py | 6 ++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94b6c22af8..1367cab7e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__` ([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874)) +- `opentelemetry-instrumentation-confluent-kafka` Fix to allow `topic` to be extracted from `kwargs` in `produce()` + ([#2901])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2901) ## Version 1.27.0/0.48b0 (2024-08-28) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 19f8742cfe..95a14627b3 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -363,7 +363,9 @@ def wrap_produce(func, instance, tracer, args, kwargs): headers = [] kwargs["headers"] = headers - topic = KafkaPropertiesExtractor.extract_produce_topic(args, kwargs) + topic = KafkaPropertiesExtractor.extract_produce_topic( + args, kwargs + ) _enrich_span( span, topic, diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index fc08db78b1..9417738435 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -288,7 +288,10 @@ def _assert_topic(self, expected_topic: str) -> None: span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 1) span = span_list[0] - self.assertEqual(span.attributes[SpanAttributes.MESSAGING_DESTINATION], expected_topic) + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + expected_topic, + ) def test_producer_poll(self) -> None: @@ -308,7 +311,6 @@ def test_producer_poll(self) -> None: self.assertIsNotNone(msg) self._assert_topic("topic-1") - def test_producer_flush(self) -> None: instrumentation = ConfluentKafkaInstrumentor() message_queue = [] From b4af64c3af2d310cfacf1de1e2605559b7dfc243 Mon Sep 17 00:00:00 2001 From: "allen.k1m" Date: Sat, 12 Oct 2024 00:41:46 +0900 Subject: [PATCH 3/4] lint --- .../tests/test_instrumentation.py | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 9417738435..7dad228001 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -293,7 +293,6 @@ def _assert_topic(self, expected_topic: str) -> None: expected_topic, ) - def test_producer_poll(self) -> None: instrumentation = ConfluentKafkaInstrumentor() message_queue = [] From 9ae469d8234dfc5a80ce5ab53c368776eeb0663f Mon Sep 17 00:00:00 2001 From: "allen.k1m" Date: Sat, 12 Oct 2024 14:03:11 +0900 Subject: [PATCH 4/4] separate assert function --- .../tests/test_instrumentation.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 7dad228001..986116900d 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -284,15 +284,15 @@ def _compare_spans(self, spans, expected_spans): expected_attribute_value, span.attributes[attribute_key] ) - def _assert_topic(self, expected_topic: str) -> None: - span_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(span_list), 1) - span = span_list[0] + def _assert_topic(self, span, expected_topic: str) -> None: self.assertEqual( span.attributes[SpanAttributes.MESSAGING_DESTINATION], expected_topic, ) + def _assert_span_count(self, span_list, expected_count: int) -> None: + self.assertEqual(len(span_list), expected_count) + def test_producer_poll(self) -> None: instrumentation = ConfluentKafkaInstrumentor() message_queue = [] @@ -308,7 +308,9 @@ def test_producer_poll(self) -> None: producer.produce(topic="topic-1", key="key-1", value="value-1") msg = producer.poll() self.assertIsNotNone(msg) - self._assert_topic("topic-1") + span_list = self.memory_exporter.get_finished_spans() + self._assert_span_count(span_list, 1) + self._assert_topic(span_list[0], "topic-1") def test_producer_flush(self) -> None: instrumentation = ConfluentKafkaInstrumentor() @@ -325,4 +327,6 @@ def test_producer_flush(self) -> None: producer.produce(topic="topic-1", key="key-1", value="value-1") msg = producer.flush() self.assertIsNotNone(msg) - self._assert_topic("topic-1") + span_list = self.memory_exporter.get_finished_spans() + self._assert_span_count(span_list, 1) + self._assert_topic(span_list[0], "topic-1")