From 0ca73b4be4c8c1105cc34722f1699850b0d44a98 Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Mon, 24 Jan 2022 08:58:47 +0200 Subject: [PATCH 1/2] safe partition extraction --- .../instrumentation/kafka/utils.py | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 858e879a42..019369b42e 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -55,27 +55,36 @@ def extract_send_headers(args, kwargs): @staticmethod def extract_send_partition(instance, args, kwargs): """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" - topic = KafkaPropertiesExtractor.extract_send_topic(args) - key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) - value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) - partition = KafkaPropertiesExtractor._extract_argument( - "partition", 4, None, args, kwargs - ) - key_bytes = instance._serialize( - instance.config["key_serializer"], topic, key - ) - value_bytes = instance._serialize( - instance.config["value_serializer"], topic, value - ) - valid_types = (bytes, bytearray, memoryview, type(None)) - if ( - type(key_bytes) not in valid_types - or type(value_bytes) not in valid_types - ): + try: + topic = KafkaPropertiesExtractor.extract_send_topic(args) + key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) + value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) + partition = KafkaPropertiesExtractor._extract_argument( + "partition", 4, None, args, kwargs + ) + key_bytes = instance._serialize( + instance.config["key_serializer"], topic, key + ) + value_bytes = instance._serialize( + instance.config["value_serializer"], topic, value + ) + valid_types = (bytes, bytearray, memoryview, type(None)) + if ( + type(key_bytes) not in valid_types + or type(value_bytes) not in valid_types + ): + return None + + all_partitions = instance._metadata.partitions_for_topic(topic) + if all_partitions is None or len(all_partitions) == 0: + return None + + return instance._partition( + topic, partition, key, value, key_bytes, value_bytes + ) + except Exception as exception: # pylint: disable=W0703 + _LOG.debug("Unable to extract partition: %s", exception) return None - return instance._partition( - topic, partition, key, value, key_bytes, value_bytes - ) ProduceHookT = Optional[Callable[[Span, List, Dict], None]] From 688941a63ba6716cf2da920b5c4423690906ab7c Mon Sep 17 00:00:00 2001 From: Ran Nozik Date: Wed, 26 Jan 2022 17:01:16 +0200 Subject: [PATCH 2/2] update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 053d96b159..195afa194e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-sqlite3` Instrumentation now works with `dbapi2.connect` +- `opentelemetry-instrumentation-kafka` Kafka: safe kafka partition extraction + ([#872](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/872)) + ## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17 ### Added