Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API version determination code for newer kafka brokers #2024

Closed
wants to merge 1 commit into from

Conversation

tvoinarovskyi
Copy link
Collaborator

@tvoinarovskyi tvoinarovskyi commented Mar 19, 2020

This change is Reviewable

Copy link
Collaborator

@jeffwidman jeffwidman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't actually understand the point of this _infer_broker_version_from_api_versions struct... it appears that it's checking the broker version, but that's not true because from Kafka 0.10 onwards we know broker version via the APIVersion protocol call... right?

So is this somehow doing a list of when new API protocol calls are supported by the broker? If so, then we should rename this function to something like _identify_supported_struct_for_broker_version()...

What am I missing here?

@tvoinarovskyi
Copy link
Collaborator Author

@jeffwidman The PR is bad, I copy-pasted and never checked, sorry =)

The problem is that we never changed the code to work with ApiVersions results. APIVersions give us explicitly all versions supported by the broker for each API call. Problem is, we never have such mapping for older clients (we only know it's 0.9.0.1, but not API versions). So to change the logic to use new mappings we need to do some refactoring. Another problem is that we don't support varying versions of the broker's inside of the cluster (if one node is 2.2.0 and the other is 2.3.1), we just assume they are all the same.

So, for now, we need code like this to make it broker-agnostic supporting older versions. Let's take this part in Producer:

        kwargs = {}
        if self.config['api_version'] >= (2, 1):
            version = 7
        elif self.config['api_version'] >= (2, 0):
            version = 6
        elif self.config['api_version'] >= (1, 1):
            version = 5
        elif self.config['api_version'] >= (1, 0):
            version = 4
        elif self.config['api_version'] >= (0, 11):
            version = 3
            kwargs = dict(transactional_id=None)
        elif self.config['api_version'] >= (0, 10):
            version = 2
        elif self.config['api_version'] == (0, 9):
            version = 1
        else:
            version = 0
        return ProduceRequest[version](
            required_acks=acks,
            timeout=timeout,
            topics=[(topic, list(partition_info.items()))
                    for topic, partition_info
                    in six.iteritems(produce_records_by_partition)],
            **kwargs
        )

If we assume all brokers are the same version a better way to do this would be something like:

        kwargs = {}
        elif self.config['api_version'] >= (0, 10):
            version = self._client.pick_best(ProduceRequest[0].API_KEY)
        elif self.config['api_version'] == (0, 9):
            version = 1
        else:
            version = 0
        if version >= 3:
            kwargs = dict(transactional_id=None)
        return ProduceRequest[version](
            required_acks=acks,
            timeout=timeout,
            topics=[(topic, list(partition_info.items()))
                    for topic, partition_info
                    in six.iteritems(produce_records_by_partition)],
            **kwargs
        )

Where pick_best would get the version from ApiVersions mapping we got from the first bootstrap.

@tvoinarovskyi
Copy link
Collaborator Author

tvoinarovskyi commented Mar 23, 2020

Kafka 0.10 onwards we know broker version via the APIVersion protocol call... right?

We don't ) We know versions of each API call supported by this broker.

@tvoinarovskyi
Copy link
Collaborator Author

ApiVersions Response (Version: 2) => error_code [api_keys] throttle_time_ms 
  error_code => INT16
  api_keys => api_key min_version max_version 
    api_key => INT16
    min_version => INT16
    max_version => INT16
  throttle_time_ms => INT32

It's just API_KEY to MIN and MAX supported version.

@gabriel-tincu
Copy link
Contributor

The way i see it a broker version to api version mapping and a reverse one is still needed, since both producer and consumer expose api_version as a constructor parameter, which means there's probably people relying on it (myself included). So i guess maybe keep the conditionals currently in place that do the broker -> api mapping and add the min max in as a fallback if the consumer / producer was built without a specified broker api_version. Hope I'm making sense here, since it's pretty late...

@edenhill
Copy link

FWIW, pre-ApiVersionRequest broker version to protocol mappings:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_feature.c#L234

@tvoinarovskyi
Copy link
Collaborator Author

@edenhill You are a saviour!

@jeffwidman
Copy link
Collaborator

jeffwidman commented Apr 11, 2020

@tvoinarovskyi are you planning to take another look at this?

It'd be nice to land this before #2021...

@gabriel-tincu
Copy link
Contributor

gabriel-tincu commented Apr 20, 2020

@tvoinarovskyi are you planning to take another look at this?

It'd be nice to land this before #2021...

@tvoinarovskyi @jeffwidman mind if i have a crack at it? Should have caught it in my initial PR anyway

@tvoinarovskyi
Copy link
Collaborator Author

closing in favour of #2038

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants