Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MSK IAM Authentication #74

Merged
merged 7 commits into from
Nov 29, 2023
Merged

Conversation

danielpops
Copy link
Member

@danielpops danielpops commented Nov 3, 2023

Feature or Description

Roughly adapted from this upstream PR that was never merged.

Changes from that PR:

  • Dynamically determine region based on the broker endpoint. The above PR was relying on AWS_DEFAULT_REGION which may not always be set
  • Always call boto_session.get_credentials() to ensure that role credentials that can be automatically refreshed once they expire (from e.g. instance profile, container pod identity, boto profile that do role assumption, etc) will be automatically refreshed
  • Support passing down arbitrary kafka configuration properties through SimpleClient so fields like security_protocol and sasl_mechanism can be passed through to the BrokerConnection

Caveats

  • If authentication is not set up correctly, the current behavior is that a kafka.errors.NoBrokersAvailable: NoBrokersAvailable error is raised. Basically the failures to authenticate don't bubble up correctly and it just gets treated as if all brokers timed out or otherwise couldn't be connected. I think this can probably be fixed, but my initial attempt to fix it with minimal changes to the overall control flow was unsuccessful

Testing Done

KafkaProducer and KafkaConsumer

Producer

$ python
Python 3.10.6 (main, Mar 10 2023, 10:55:28) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers="b-3.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-1.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-2.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098", security_protocol="SASL_SSL", sasl_mechanism="AWS_MSK_IAM")
>>> producer.send('dpopes-iam-1', b'test message from Yelp/kafka-python')
<kafka.producer.future.FutureRecordMetadata object at 0x7f792f022680>
>>> producer.send('dpopes-iam-1', b'test message from Yelp/kafka-python')
<kafka.producer.future.FutureRecordMetadata object at 0x7f792f068a30>

Consumer

$ python
Python 3.10.6 (main, Mar 10 2023, 10:55:28) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('dpopes-iam-1', bootstrap_servers="b-2.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-3.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-1.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098", security_protocol="SASL_SSL", sasl_mechanism="AWS_MSK_IAM")
>>> for message in consumer:
...  print(message)
...
ConsumerRecord(topic='dpopes-iam-1', partition=0, offset=282, timestamp=1698977783799, timestamp_type=0, key=None, value=b'test message from Yelp/kafka-python', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=35, serialized_header_size=-1)
ConsumerRecord(topic='dpopes-iam-1', partition=0, offset=283, timestamp=1698977788359, timestamp_type=0, key=None, value=b'test message from Yelp/kafka-python', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=35, serialized_header_size=-1)

SimpleClient Producer and Consumer

Producer

$ python
Python 3.10.6 (main, Mar 10 2023, 10:55:28) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka.client import SimpleClient
>>> client = SimpleClient(hosts="b-3.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-1.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-2.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098", security_protocol="SASL_SSL", sasl_mechanism="AWS_MSK_IAM", api_version=None)
>>> from kafka.structs import ProduceRequestPayload
>>> import kafka.protocol.message
>>> client.send_produce_request([ProduceRequestPayload("dpopes-iam-1", 0, [kafka.protocol.message.Message(b"a")])])
[ProduceResponsePayload(topic='dpopes-iam-1', partition=0, error=0, offset=284)]
>>> client.send_produce_request([ProduceRequestPayload("dpopes-iam-1", 0, [kafka.protocol.message.Message(b"b")])])
[ProduceResponsePayload(topic='dpopes-iam-1', partition=0, error=0, offset=285)]

Consumer

$ python
Python 3.10.6 (main, Mar 10 2023, 10:55:28) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka.client import SimpleClient
>>> client = SimpleClient(hosts="b-3.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-1.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-2.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098", security_protocol="SASL_SSL", sasl_mechanism="AWS_MSK_IAM", api_version=None)
>>> from kafka.structs import FetchRequestPayload
>>> client.send_fetch_request([FetchRequestPayload("dpopes-iam-1", 0, 284, 1024)])
[FetchResponsePayload(topic='dpopes-iam-1', partition=0, error=0, highwaterMark=286, messages=[OffsetAndMessage(offset=284, message=Message(crc=1373583922, magic=0, attributes=0, timestamp=None, key=None, value=b'a')), OffsetAndMessage(offset=285, message=Message(crc=-925471864, magic=0, attributes=0, timestamp=None, key=None, value=b'b')), OffsetAndMessage(offset=None, message=PartialMessage(b''))])]
>>> client.send_fetch_request([FetchRequestPayload("dpopes-iam-1", 0, 285, 1024)])
[FetchResponsePayload(topic='dpopes-iam-1', partition=0, error=0, highwaterMark=286, messages=[OffsetAndMessage(offset=285, message=Message(crc=-925471864, magic=0, attributes=0, timestamp=None, key=None, value=b'b')), OffsetAndMessage(offset=None, message=PartialMessage(b''))])]

Admin Client with KafkaClient

Create Topic

$ python
Python 3.10.6 (main, Mar 10 2023, 10:55:28) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka.client_async import KafkaClient
>>> from kafka.admin_client import AdminClient
>>> kafka_client = KafkaClient(bootstrap_servers="b-3.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-1.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098,b-2.mycluster.8tc123.c5.kafka.us-west-2.amazonaws.com:9098", security_protocol="SASL_SSL", sasl_mechanism="AWS_MSK_IAM")
>>> admin_client = AdminClient(kafka_client)
>>> admin_client.create_topics([NewTopic("dpopes-iam-3", 1, 1)], 100)
[CreateTopicsResponse_v0(topic_errors=[(topic='dpopes-iam-3', error_code=0)])]

Copy link

@bobtfish bobtfish left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

kafka/conn.py Show resolved Hide resolved
kafka/msk.py Outdated Show resolved Hide resolved
Copy link

@mhaseebmlk mhaseebmlk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have one question, LGTM otherwise.

@danielpops
Copy link
Member Author

@mhaseebmlk @sofyat are there any special considerations for deploying a new version with these changes? Can I just merge to master and expect to see a new version?

@mhaseebmlk
Copy link

@mhaseebmlk @sofyat are there any special considerations for deploying a new version with these changes? Can I just merge to master and expect to see a new version?

@danielpops I think you might want to update the version here (https://github.com/Yelp/kafka-python/blob/master/kafka/version.py) to '1.4.7.post5. Streaming Platform own this repo, so let's confirm with @gkousouris if that's needed?

@danielpops danielpops changed the title Support MSK Authentication Support MSK IAM Authentication Nov 23, 2023
@gkousouris
Copy link

@mhaseebmlk is correct, we need to push the tag and then manually push the package.
on up-to-date master: python setup.py sdist bdist_wheel and then upload-to-pypi dist/.

However, one of our longer term plans is to not use kafka-python for producing / consuming, but rather use confluent-kafka (a librdkafka wrapper) directly. Only use yelp-kafka (or a similar tool if we want to completely replace it) for discovery of kafka brokers as a utilities library but do the kafka-related stuff from confluent-kafka.

Reason is that MSK uses Kafka versions 2.8+ which is not explicitly supported by kafka-python (+ it is no longer maintained).

Having said that, it seems that there's a pushback to not release MSK IAM authentication for librdkafka (see comment). We might need to use a fork that supports it

@danielpops
Copy link
Member Author

@gkousouris thanks!

Having said that, it seems that there's a pushback to not release MSK IAM authentication for librdkafka (see confluentinc/librdkafka#3496 (comment)). We might need to use a fork that supports it

FYI AWS has recently released support for the SASL_OAUTHBEARER mechanism, which means it may no longer be necessary for any changes in librdkafka to enable support for IAM authentication to MSK. Announcement and docs.

Having said that, since afaict a large majority of Yelp use cases are using yelp_kafka and/or kafka-python, I'd like to ship this PR as is to validate that we can migrate at least one extant MSK use case to opt in to MSK with IAM Auth. Once that's been proven out with these libraries, it should be relatively straightforward to do something similar with the librdkafka + SASL_OAUTHBEARER mechanism.

Copy link

@gkousouris gkousouris left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good!

@danielpops danielpops merged commit 427de2d into master Nov 29, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants