-
Notifications
You must be signed in to change notification settings - Fork 642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: safe kafka partition extraction #872
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,27 +55,36 @@ def extract_send_headers(args, kwargs): | |
@staticmethod | ||
def extract_send_partition(instance, args, kwargs): | ||
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" | ||
topic = KafkaPropertiesExtractor.extract_send_topic(args) | ||
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) | ||
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) | ||
partition = KafkaPropertiesExtractor._extract_argument( | ||
"partition", 4, None, args, kwargs | ||
) | ||
key_bytes = instance._serialize( | ||
instance.config["key_serializer"], topic, key | ||
) | ||
value_bytes = instance._serialize( | ||
instance.config["value_serializer"], topic, value | ||
) | ||
valid_types = (bytes, bytearray, memoryview, type(None)) | ||
if ( | ||
type(key_bytes) not in valid_types | ||
or type(value_bytes) not in valid_types | ||
): | ||
try: | ||
topic = KafkaPropertiesExtractor.extract_send_topic(args) | ||
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) | ||
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) | ||
partition = KafkaPropertiesExtractor._extract_argument( | ||
"partition", 4, None, args, kwargs | ||
) | ||
key_bytes = instance._serialize( | ||
instance.config["key_serializer"], topic, key | ||
) | ||
value_bytes = instance._serialize( | ||
instance.config["value_serializer"], topic, value | ||
) | ||
valid_types = (bytes, bytearray, memoryview, type(None)) | ||
if ( | ||
type(key_bytes) not in valid_types | ||
or type(value_bytes) not in valid_types | ||
): | ||
return None | ||
|
||
all_partitions = instance._metadata.partitions_for_topic(topic) | ||
if all_partitions is None or len(all_partitions) == 0: | ||
return None | ||
|
||
return instance._partition( | ||
topic, partition, key, value, key_bytes, value_bytes | ||
) | ||
except Exception as exception: # pylint: disable=W0703 | ||
_LOG.debug("Unable to extract partition: %s", exception) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be info/warn so we can actually find and fix the issue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the impact of not collecting this data is low, I would not want to "bother" the user with a warning. I can go with info, but debug is also ok - whatever you think is best. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to know what exactly in this function is brittle so we can fix it later instead of wrapping it in a try/except and forgetting about it forever. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I definitely agree - that line that I've added |
||
return None | ||
return instance._partition( | ||
topic, partition, key, value, key_bytes, value_bytes | ||
) | ||
|
||
|
||
ProduceHookT = Optional[Callable[[Span, List, Dict], None]] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this affect users? In case this case does trigger, will we end up not recording a span, omitting some info from the span or will this affect the instrumented service or kafka client in some way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will affect the instrumented service - hence the need to protect it with try/except
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand that. I'm asking how does this fix affect users? Will it result in missing spans, missing attributes on spans or something else entirely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing attribute - the partition