fix: safe kafka partition extraction #872

Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -45,6 +45,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-instrumentation-sqlite3` Instrumentation now works with `dbapi2.connect`

+- `opentelemetry-instrumentation-kafka` Kafka: safe kafka partition extraction
+  ([#872](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/872))

## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17

### Added
@@ -55,27 +55,36 @@ def extract_send_headers(args, kwargs):
     @staticmethod
     def extract_send_partition(instance, args, kwargs):
         """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
-        topic = KafkaPropertiesExtractor.extract_send_topic(args)
-        key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
-        value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
-        partition = KafkaPropertiesExtractor._extract_argument(
-            "partition", 4, None, args, kwargs
-        )
-        key_bytes = instance._serialize(
-            instance.config["key_serializer"], topic, key
-        )
-        value_bytes = instance._serialize(
-            instance.config["value_serializer"], topic, value
-        )
-        valid_types = (bytes, bytearray, memoryview, type(None))
-        if (
-            type(key_bytes) not in valid_types
-            or type(value_bytes) not in valid_types
-        ):
+        try:
+            topic = KafkaPropertiesExtractor.extract_send_topic(args)
+            key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
+            value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
+            partition = KafkaPropertiesExtractor._extract_argument(
+                "partition", 4, None, args, kwargs
+            )
+            key_bytes = instance._serialize(
+                instance.config["key_serializer"], topic, key
+            )
+            value_bytes = instance._serialize(
+                instance.config["value_serializer"], topic, value
+            )
+            valid_types = (bytes, bytearray, memoryview, type(None))
+            if (
+                type(key_bytes) not in valid_types
+                or type(value_bytes) not in valid_types
+            ):
+                return None
+
+            all_partitions = instance._metadata.partitions_for_topic(topic)
+            if all_partitions is None or len(all_partitions) == 0:
+                return None
+
+            return instance._partition(
+                topic, partition, key, value, key_bytes, value_bytes
+            )
+        except Exception as exception:  # pylint: disable=W0703
Contributor

How does this affect users? If this case does trigger, will we end up not recording a span, omitting some info from the span, or will this affect the instrumented service or Kafka client in some way?

Contributor Author

The unhandled exception would affect the instrumented service, hence the need to protect it with try/except.

Contributor

I understand that. I'm asking how this fix affects users. Will it result in missing spans, missing attributes on spans, or something else entirely?

Contributor Author

A missing attribute: the partition.
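
To make that outcome concrete, here is a minimal sketch of the pattern discussed in this thread. It is not the instrumentation's code: `FakeProducer`, `safe_extract_partition`, and `build_attributes` are hypothetical stand-ins, and the attribute keys are used for illustration only. It assumes the caller simply skips the partition attribute when extraction returns `None`.

```python
import logging

_LOG = logging.getLogger(__name__)


class FakeProducer:
    """Hypothetical stand-in for a producer whose partition lookup fails."""

    def _partition(self, topic):
        raise TypeError("no partition metadata for topic %r" % topic)


def safe_extract_partition(producer, topic):
    """Return the partition, or None if anything goes wrong computing it."""
    try:
        return producer._partition(topic)
    except Exception as exception:  # broad on purpose: never break the caller
        _LOG.debug("Unable to extract partition: %s", exception)
        return None


def build_attributes(producer, topic):
    """Collect span attributes, skipping the partition when unavailable."""
    attributes = {"messaging.destination": topic}  # illustrative key
    partition = safe_extract_partition(producer, topic)
    if partition is not None:
        attributes["messaging.kafka.partition"] = partition  # illustrative key
    return attributes


# The failure is swallowed; only the partition attribute is missing.
attributes = build_attributes(FakeProducer(), "orders")
print(attributes)
```

In this sketch the span-level data is still produced and the caller keeps running. The only visible difference is the absent partition entry.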

+            _LOG.debug("Unable to extract partition: %s", exception)
Contributor

Should this be info/warn so we can actually find and fix the issue?

Contributor Author

Since the impact of not collecting this data is low, I would not want to "bother" the user with a warning. I can go with info, but debug is also ok - whatever you think is best.

Contributor

It would be nice to know what exactly in this function is brittle so we can fix it later instead of wrapping it in a try/except and forgetting about it forever.

Contributor Author

I definitely agree. The line that I've added, `all_partitions = instance._metadata.partitions_for_topic(topic)`, together with the `None`/empty check that follows it, would've solved the crash. But I wanted to be on the safe side.
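
For reference, a small sketch of the failure mode that the explicit guard addresses. This rests on an assumption the thread does not confirm: that the crash came from code iterating over a `None` result of `partitions_for_topic` for a topic with no cached metadata. `FakeMetadata`, `pick_partition_unguarded`, `pick_partition_guarded`, and the logger name are hypothetical and only mimic the shape of the real calls.

```python
import logging

logging.basicConfig(level=logging.DEBUG)  # make the debug message visible
_LOG = logging.getLogger("partition_sketch")  # hypothetical logger name


class FakeMetadata:
    """Hypothetical stand-in for cluster metadata with one known topic."""

    def __init__(self):
        self._partitions = {"known": {0, 1, 2}}

    def partitions_for_topic(self, topic):
        # Returns None when nothing is cached for the topic.
        return self._partitions.get(topic)


def pick_partition_unguarded(metadata, topic):
    # Assumed failure mode: sorted(None) raises TypeError.
    return sorted(metadata.partitions_for_topic(topic))[0]


def pick_partition_guarded(metadata, topic):
    all_partitions = metadata.partitions_for_topic(topic)
    if all_partitions is None or len(all_partitions) == 0:
        _LOG.debug("No partitions known for topic %r", topic)
        return None
    return sorted(all_partitions)[0]


metadata = FakeMetadata()
guarded_known = pick_partition_guarded(metadata, "known")
guarded_unknown = pick_partition_guarded(metadata, "unknown")

try:
    pick_partition_unguarded(metadata, "unknown")
    unguarded_error = None
except TypeError as error:
    unguarded_error = error
```

Under that assumption, the guard handles the known cause explicitly, and the surrounding try/except only acts as a safety net for causes nobody has identified yet. Running with debug logging enabled, as above, is one way to find out which of the two paths is actually being hit.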

             return None
-        return instance._partition(
-            topic, partition, key, value, key_bytes, value_bytes
-        )


ProduceHookT = Optional[Callable[[Span, List, Dict], None]]