From af6e1034643db4c1e980cd4bef491c2789490c61 Mon Sep 17 00:00:00 2001 From: Daniel Popescu Date: Tue, 31 Oct 2023 20:03:30 -0700 Subject: [PATCH 1/7] MSK IAM Authentication implementation --- kafka/conn.py | 50 ++++++++++++- kafka/msk.py | 197 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 kafka/msk.py diff --git a/kafka/conn.py b/kafka/conn.py index a06de4910..d352c84a9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -25,6 +25,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate +from kafka.msk import AwsMskIamClient from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import OffsetFetchRequest @@ -81,6 +82,12 @@ class SSLWantWriteError(Exception): gssapi = None GSSError = None +# needed for AWS_MSK_IAM authentication: +try: + from botocore.session import Session as BotoSession +except ImportError: + # no botocore available, will disable AWS_MSK_IAM mechanism + BotoSession = None AFI_NAMES = { socket.AF_UNSPEC: "unspecified", @@ -224,7 +231,7 @@ class BrokerConnection(object): 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', 'AWS_MSK_IAM') def __init__(self, host, port, afi, **configs): self.host = host @@ -269,6 +276,11 @@ def __init__(self, host, port, afi, **configs): token_provider = self.config['sasl_oauth_token_provider'] assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' + + if self.config['sasl_mechanism'] == 'AWS_MSK_IAM': + assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' + assert self.config['security_protocol'] == 'SASL_SSL', 'AWS_MSK_IAM requires SASL_SSL' + # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -552,6 +564,8 @@ def _handle_sasl_handshake_response(self, future, response): return self._try_authenticate_gssapi(future) elif self.config['sasl_mechanism'] == 'OAUTHBEARER': return self._try_authenticate_oauth(future) + elif self.config['sasl_mechanism'] == 'AWS_MSK_IAM': + return self._try_authenticate_aws_msk_iam(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -652,6 +666,40 @@ def _try_authenticate_plain(self, future): log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) + def _try_authenticate_aws_msk_iam(self, future): + session = BotoSession() + client = AwsMskIamClient( + host=self.host, + boto_session=session, + ) + + msg = client.first_message() + size = Int32.encode(len(msg)) + + err = None + close = False + with self._lock: + if not self._can_send_recv(): + err = Errors.NodeNotReadyError(str(self)) + close = False + else: + try: + self._send_bytes_blocking(size + msg) + data = self._recv_bytes_blocking(4) + data = self._recv_bytes_blocking(struct.unpack('4B', data)[-1]) + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) + close = True + + if err is not None: + if close: + self.close(error=err) + return future.failure(err) + + log.info('%s: Authenticated via AWS_MSK_IAM %s', self, data.decode('utf-8')) + return future.success(True) + def _try_authenticate_gssapi(self, future): kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name diff --git a/kafka/msk.py b/kafka/msk.py new file mode 100644 index 000000000..3351c1409 --- /dev/null +++ b/kafka/msk.py @@ -0,0 +1,197 @@ +import datetime +import hashlib +import hmac +import json +import string + +from kafka.vendor.six.moves import urllib + + +class AwsMskIamClient: + UNRESERVED_CHARS = string.ascii_letters + string.digits + '-._~' + + def __init__(self, host, boto_session): + """ + Arguments: + host (str): The hostname of the broker. + boto_session (botocore.BotoSession) the boto session + """ + self.algorithm = 'AWS4-HMAC-SHA256' + self.expires = '900' + self.hashfunc = hashlib.sha256 + self.headers = [ + ('host', host) + ] + self.version = '2020_10_22' + + self.service = 'kafka-cluster' + self.action = '{}:Connect'.format(self.service) + + now = datetime.datetime.utcnow() + self.datestamp = now.strftime('%Y%m%d') + self.timestamp = now.strftime('%Y%m%dT%H%M%SZ') + + self.host = host + self.boto_session = boto_session + + @property + def access_key(self): + return self.boto_session.get_credentials().access_key + + @property + def secret_key(self): + return self.boto_session.get_credentials().secret_key + + @property + def token(self): + return self.boto_session.get_credentials().token + + @property + def region(self): + # TODO: This logic is not perfect and should be revisited + for host in self.host.split(','): + if 'amazonaws.com' in host: + return host.split('.')[-3] + return 'us-west-2' + + @property + def _credential(self): + return '{0.access_key}/{0._scope}'.format(self) + + @property + def _scope(self): + return '{0.datestamp}/{0.region}/{0.service}/aws4_request'.format(self) + + @property + def _signed_headers(self): + """ + Returns (str): + An alphabetically sorted, semicolon-delimited list of lowercase + request header names. + """ + return ';'.join(sorted(k.lower() for k, _ in self.headers)) + + @property + def _canonical_headers(self): + """ + Returns (str): + A newline-delited list of header names and values. + Header names are lowercased. + """ + return '\n'.join(map(':'.join, self.headers)) + '\n' + + @property + def _canonical_request(self): + """ + Returns (str): + An AWS Signature Version 4 canonical request in the format: + \n + \n + \n + \n + \n + + """ + # The hashed_payload is always an empty string for MSK. + hashed_payload = self.hashfunc(b'').hexdigest() + return '\n'.join(( + 'GET', + '/', + self._canonical_querystring, + self._canonical_headers, + self._signed_headers, + hashed_payload, + )) + + @property + def _canonical_querystring(self): + """ + Returns (str): + A '&'-separated list of URI-encoded key/value pairs. + """ + params = [] + params.append(('Action', self.action)) + params.append(('X-Amz-Algorithm', self.algorithm)) + params.append(('X-Amz-Credential', self._credential)) + params.append(('X-Amz-Date', self.timestamp)) + params.append(('X-Amz-Expires', self.expires)) + if self.token: + params.append(('X-Amz-Security-Token', self.token)) + params.append(('X-Amz-SignedHeaders', self._signed_headers)) + + return '&'.join(self._uriencode(k) + '=' + self._uriencode(v) for k, v in params) + + @property + def _signing_key(self): + """ + Returns (bytes): + An AWS Signature V4 signing key generated from the secret_key, date, + region, service, and request type. + """ + key = self._hmac(('AWS4' + self.secret_key).encode('utf-8'), self.datestamp) + key = self._hmac(key, self.region) + key = self._hmac(key, self.service) + key = self._hmac(key, 'aws4_request') + return key + + @property + def _signing_str(self): + """ + Returns (str): + A string used to sign the AWS Signature V4 payload in the format: + \n + \n + \n + + """ + canonical_request_hash = self.hashfunc(self._canonical_request.encode('utf-8')).hexdigest() + return '\n'.join((self.algorithm, self.timestamp, self._scope, canonical_request_hash)) + + def _uriencode(self, msg): + """ + Arguments: + msg (str): A string to URI-encode. + + Returns (str): + The URI-encoded version of the provided msg, following the encoding + rules specified: https://github.com/aws/aws-msk-iam-auth#uriencode + """ + return urllib.parse.quote(msg, safe=self.UNRESERVED_CHARS) + + def _hmac(self, key, msg): + """ + Arguments: + key (bytes): A key to use for the HMAC digest. + msg (str): A value to include in the HMAC digest. + Returns (bytes): + An HMAC digest of the given key and msg. + """ + return hmac.new(key, msg.encode('utf-8'), digestmod=self.hashfunc).digest() + + def first_message(self): + """ + Returns (bytes): + An encoded JSON authentication payload that can be sent to the + broker. + """ + signature = hmac.new( + self._signing_key, + self._signing_str.encode('utf-8'), + digestmod=self.hashfunc, + ).hexdigest() + msg = { + 'version': self.version, + 'host': self.host, + 'user-agent': 'kafka-python', + 'action': self.action, + 'x-amz-algorithm': self.algorithm, + 'x-amz-credential': self._credential, + 'x-amz-date': self.timestamp, + 'x-amz-signedheaders': self._signed_headers, + 'x-amz-expires': self.expires, + 'x-amz-signature': signature, + } + if self.token: + msg['x-amz-security-token'] = self.token + + return json.dumps(msg, separators=(',', ':')).encode('utf-8') From fc6b277eb90a2ba8f0820beefc08cf8b0296c781 Mon Sep 17 00:00:00 2001 From: Daniel Popescu Date: Thu, 2 Nov 2023 19:00:30 -0700 Subject: [PATCH 2/7] Support arbitrary kafka configuration in SimpleClient to support things like authentication --- kafka/client.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/kafka/client.py b/kafka/client.py index b65019f0b..c66c52bed 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -76,7 +76,7 @@ class SimpleClient(object): # socket timeout. def __init__(self, hosts, client_id=CLIENT_ID, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, - correlation_id=0, metrics=None): + correlation_id=0, metrics=None, **kwargs): # We need one connection to bootstrap self.client_id = client_id self.timeout = timeout @@ -90,6 +90,10 @@ def __init__(self, hosts, client_id=CLIENT_ID, self.topics_to_brokers = {} # TopicPartition -> BrokerMetadata self.topic_partitions = {} # topic -> partition -> leader + # Support arbitrary kwargs to be provided as config to BrokerConnection + # This will allow advanced features like Authentication to work + self.config = kwargs + self.load_metadata_for_topics() # bootstrap with all metadata ################## @@ -108,6 +112,7 @@ def _get_conn(self, host, port, afi, node_id='bootstrap'): metrics=self._metrics_registry, metric_group_prefix='simple-client', node_id=node_id, + **self.config, ) conn = self._conns[host_key] From 950045041fa3e6f534c140b01b32270e0d85c1b0 Mon Sep 17 00:00:00 2001 From: Daniel Popescu Date: Mon, 13 Nov 2023 18:04:00 -0800 Subject: [PATCH 3/7] Add tests from upstream PR --- test/test_msk.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 test/test_msk.py diff --git a/test/test_msk.py b/test/test_msk.py new file mode 100644 index 000000000..d4620e39e --- /dev/null +++ b/test/test_msk.py @@ -0,0 +1,67 @@ +import datetime +import json + +from kafka.msk import AwsMskIamClient + +try: + from unittest import mock +except ImportError: + import mock + + +def client_factory(token=None): + now = datetime.datetime.utcfromtimestamp(1629321911) + with mock.patch('kafka.msk.datetime') as mock_dt: + mock_dt.datetime.utcnow = mock.Mock(return_value=now) + return AwsMskIamClient( + host='localhost', + access_key='XXXXXXXXXXXXXXXXXXXX', + secret_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', + region='us-east-1', + token=token, + ) + + +def test_aws_msk_iam_client_permanent_credentials(): + client = client_factory(token=None) + msg = client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg) + + expected = { + 'version': '2020_10_22', + 'host': 'localhost', + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', + 'x-amz-date': '20210818T212511Z', + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': '0fa42ae3d5693777942a7a4028b564f0b372bafa2f71c1a19ad60680e6cb994b', + } + assert actual == expected + + +def test_aws_msk_iam_client_temporary_credentials(): + client = client_factory(token='XXXXX') + msg = client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg) + + expected = { + 'version': '2020_10_22', + 'host': 'localhost', + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', + 'x-amz-date': '20210818T212511Z', + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': 'b0619c50b7ecb4a7f6f92bd5f733770df5710e97b25146f97015c0b1db783b05', + 'x-amz-security-token': 'XXXXX', + } + assert actual == expected From c12044802d456d31a38546e517a56f25e5c5e168 Mon Sep 17 00:00:00 2001 From: Daniel Popescu Date: Tue, 14 Nov 2023 16:45:02 -0800 Subject: [PATCH 4/7] Make tests less verbose and mock botocore --- test/test_msk.py | 67 ++++++++++++++++++------------------------------ 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/test/test_msk.py b/test/test_msk.py index d4620e39e..72b9737c4 100644 --- a/test/test_msk.py +++ b/test/test_msk.py @@ -1,5 +1,9 @@ import datetime import json +import sys + +import pytest +from unittest import TestCase from kafka.msk import AwsMskIamClient @@ -9,59 +13,38 @@ import mock -def client_factory(token=None): - now = datetime.datetime.utcfromtimestamp(1629321911) - with mock.patch('kafka.msk.datetime') as mock_dt: - mock_dt.datetime.utcnow = mock.Mock(return_value=now) - return AwsMskIamClient( - host='localhost', - access_key='XXXXXXXXXXXXXXXXXXXX', - secret_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', - region='us-east-1', - token=token, - ) - +@pytest.fixture(params=[{'session_token': 'session_token', 'host': 'localhost'}, {'session_token': None, 'host': 'localhost.us-east-1.amazonaws.com'}]) +def msk_client(request): + # To avoid a package dependency on the optional botocore library, we mock the module out + sys.modules['botocore.session'] = mock.MagicMock() + from botocore.session import Session # pylint: disable=import-error -def test_aws_msk_iam_client_permanent_credentials(): - client = client_factory(token=None) - msg = client.first_message() - assert msg - assert isinstance(msg, bytes) - actual = json.loads(msg) - - expected = { - 'version': '2020_10_22', - 'host': 'localhost', - 'user-agent': 'kafka-python', - 'action': 'kafka-cluster:Connect', - 'x-amz-algorithm': 'AWS4-HMAC-SHA256', - 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', - 'x-amz-date': '20210818T212511Z', - 'x-amz-signedheaders': 'host', - 'x-amz-expires': '900', - 'x-amz-signature': '0fa42ae3d5693777942a7a4028b564f0b372bafa2f71c1a19ad60680e6cb994b', - } - assert actual == expected + session = Session() + session.get_credentials = mock.MagicMock(return_value=mock.MagicMock(id='the_actual_credentials', access_key='akia', secret_key='secret', token=request.param['session_token'])) + yield AwsMskIamClient( + host=request.param["host"], + boto_session = session, + ) -def test_aws_msk_iam_client_temporary_credentials(): - client = client_factory(token='XXXXX') - msg = client.first_message() +def test_aws_msk_iam(msk_client): + msg = msk_client.first_message() assert msg assert isinstance(msg, bytes) - actual = json.loads(msg) + actual = json.loads(msg.decode('utf-8')) expected = { 'version': '2020_10_22', - 'host': 'localhost', + 'host': msk_client.host, 'user-agent': 'kafka-python', 'action': 'kafka-cluster:Connect', 'x-amz-algorithm': 'AWS4-HMAC-SHA256', - 'x-amz-credential': 'XXXXXXXXXXXXXXXXXXXX/20210818/us-east-1/kafka-cluster/aws4_request', - 'x-amz-date': '20210818T212511Z', + 'x-amz-credential': '{}/{}/{}/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d'), 'us-west-2' if msk_client.host == 'localhost' else 'us-east-1'), + 'x-amz-date': mock.ANY, 'x-amz-signedheaders': 'host', 'x-amz-expires': '900', - 'x-amz-signature': 'b0619c50b7ecb4a7f6f92bd5f733770df5710e97b25146f97015c0b1db783b05', - 'x-amz-security-token': 'XXXXX', + 'x-amz-signature': mock.ANY, } - assert actual == expected + if msk_client.token: + expected['x-amz-security-token'] = msk_client.token + TestCase().assertEqual(actual, expected) From 5ab4bbfc16a7fdc5b5cdbd50eafe7bcdaaff8986 Mon Sep 17 00:00:00 2001 From: Daniel Popescu Date: Wed, 15 Nov 2023 19:00:58 -0800 Subject: [PATCH 5/7] Update region resolution logic and refactor / add more tests --- kafka/msk.py | 19 ++++++++-- test/test_msk.py | 93 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 99 insertions(+), 13 deletions(-) diff --git a/kafka/msk.py b/kafka/msk.py index 3351c1409..a6d8e772d 100644 --- a/kafka/msk.py +++ b/kafka/msk.py @@ -34,6 +34,11 @@ def __init__(self, host, boto_session): self.host = host self.boto_session = boto_session + # This will raise if the region can't be determined + # Do this during init instead of waiting for failures downstream + if self.region: + pass + @property def access_key(self): return self.boto_session.get_credentials().access_key @@ -48,11 +53,21 @@ def token(self): @property def region(self): - # TODO: This logic is not perfect and should be revisited + # Try to get the region information from the broker hostname for host in self.host.split(','): if 'amazonaws.com' in host: return host.split('.')[-3] - return 'us-west-2' + + # If the region can't be determined from hostname, try the boto session + # This will only have a value if: + # - `AWS_DEFAULT_REGION` environment variable is set + # - `~/.aws/config` region variable is set + region = self.boto_session.get_config_variable('region') + if region: + return region + + # Otherwise give up + raise Exception('Could not determine region from broker host(s) or aws configuration') @property def _credential(self): diff --git a/test/test_msk.py b/test/test_msk.py index 72b9737c4..0e5c414de 100644 --- a/test/test_msk.py +++ b/test/test_msk.py @@ -13,21 +13,92 @@ import mock -@pytest.fixture(params=[{'session_token': 'session_token', 'host': 'localhost'}, {'session_token': None, 'host': 'localhost.us-east-1.amazonaws.com'}]) -def msk_client(request): +@pytest.fixture +def boto_session(): # To avoid a package dependency on the optional botocore library, we mock the module out sys.modules['botocore.session'] = mock.MagicMock() from botocore.session import Session # pylint: disable=import-error - session = Session() - session.get_credentials = mock.MagicMock(return_value=mock.MagicMock(id='the_actual_credentials', access_key='akia', secret_key='secret', token=request.param['session_token'])) - yield AwsMskIamClient( - host=request.param["host"], - boto_session = session, + boto_session = Session() + boto_session.get_credentials = mock.MagicMock(return_value=mock.MagicMock(id='the_actual_credentials', access_key='akia', secret_key='secret', token=None)) + yield boto_session + + +def test_aws_msk_iam_region_from_config(boto_session): + # Region determined by configuration + boto_session.get_config_variable = mock.MagicMock(return_value='us-west-2') + msk_client = AwsMskIamClient( + host='localhost', + boto_session = boto_session, ) + msg = msk_client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg.decode('utf-8')) + + expected = { + 'version': '2020_10_22', + 'host': msk_client.host, + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': '{}/{}/us-west-2/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d')), + 'x-amz-date': mock.ANY, + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': mock.ANY, + } + TestCase().assertEqual(actual, expected) -def test_aws_msk_iam(msk_client): +def test_aws_msk_iam_region_from_hostname(boto_session): + # Region determined by hostname + msk_client = AwsMskIamClient( + host='localhost.us-east-1.amazonaws.com', + boto_session = boto_session, + ) + msg = msk_client.first_message() + assert msg + assert isinstance(msg, bytes) + actual = json.loads(msg.decode('utf-8')) + + expected = { + 'version': '2020_10_22', + 'host': msk_client.host, + 'user-agent': 'kafka-python', + 'action': 'kafka-cluster:Connect', + 'x-amz-algorithm': 'AWS4-HMAC-SHA256', + 'x-amz-credential': '{}/{}/us-east-1/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d')), + 'x-amz-date': mock.ANY, + 'x-amz-signedheaders': 'host', + 'x-amz-expires': '900', + 'x-amz-signature': mock.ANY, + } + TestCase().assertEqual(actual, expected) + + +def test_aws_msk_iam_no_region(boto_session): + # No region from config + boto_session.get_config_variable = mock.MagicMock(return_value=None) + + with TestCase().assertRaises(Exception) as e: + # No region from hostname + msk_client = AwsMskIamClient( + host='localhost', + boto_session = boto_session, + ) + assert 'Could not determine region from broker host(s) or aws configuration' == str(e.exception) + + +@pytest.mark.parametrize('session_token', [(None), ('the_token')]) +def test_aws_msk_iam_permanent_and_temporary_credentials(session_token, request): + boto_session = request.getfixturevalue('boto_session') + if session_token: + boto_session.get_credentials.return_value.token = session_token + msk_client = AwsMskIamClient( + host='localhost.us-east-1.amazonaws.com', + boto_session = boto_session, + ) msg = msk_client.first_message() assert msg assert isinstance(msg, bytes) @@ -39,12 +110,12 @@ def test_aws_msk_iam(msk_client): 'user-agent': 'kafka-python', 'action': 'kafka-cluster:Connect', 'x-amz-algorithm': 'AWS4-HMAC-SHA256', - 'x-amz-credential': '{}/{}/{}/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d'), 'us-west-2' if msk_client.host == 'localhost' else 'us-east-1'), + 'x-amz-credential': '{}/{}/us-east-1/kafka-cluster/aws4_request'.format(msk_client.access_key, datetime.datetime.utcnow().strftime('%Y%m%d')), 'x-amz-date': mock.ANY, 'x-amz-signedheaders': 'host', 'x-amz-expires': '900', 'x-amz-signature': mock.ANY, } - if msk_client.token: - expected['x-amz-security-token'] = msk_client.token + if session_token: + expected['x-amz-security-token'] = session_token TestCase().assertEqual(actual, expected) From beaee8602c97d7961853785676af6150a9fa7dab Mon Sep 17 00:00:00 2001 From: Daniel Popescu Date: Wed, 15 Nov 2023 19:04:42 -0800 Subject: [PATCH 6/7] Use different exception type --- kafka/msk.py | 3 ++- test/test_msk.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/msk.py b/kafka/msk.py index a6d8e772d..e058c286e 100644 --- a/kafka/msk.py +++ b/kafka/msk.py @@ -4,6 +4,7 @@ import json import string +from kafka.errors import IllegalArgumentError from kafka.vendor.six.moves import urllib @@ -67,7 +68,7 @@ def region(self): return region # Otherwise give up - raise Exception('Could not determine region from broker host(s) or aws configuration') + raise IllegalArgumentError('Could not determine region from broker host(s) or aws configuration') @property def _credential(self): diff --git a/test/test_msk.py b/test/test_msk.py index 0e5c414de..69855fbb9 100644 --- a/test/test_msk.py +++ b/test/test_msk.py @@ -5,6 +5,7 @@ import pytest from unittest import TestCase +from kafka.errors import IllegalArgumentError from kafka.msk import AwsMskIamClient try: @@ -81,13 +82,13 @@ def test_aws_msk_iam_no_region(boto_session): # No region from config boto_session.get_config_variable = mock.MagicMock(return_value=None) - with TestCase().assertRaises(Exception) as e: + with TestCase().assertRaises(IllegalArgumentError) as e: # No region from hostname msk_client = AwsMskIamClient( host='localhost', boto_session = boto_session, ) - assert 'Could not determine region from broker host(s) or aws configuration' == str(e.exception) + assert 'IllegalArgumentError: Could not determine region from broker host(s) or aws configuration' == str(e.exception) @pytest.mark.parametrize('session_token', [(None), ('the_token')]) From c062282be24dbaef619eb7fcd0c1ac9296b8ac0e Mon Sep 17 00:00:00 2001 From: dpopes Date: Wed, 29 Nov 2023 08:49:39 -0800 Subject: [PATCH 7/7] Bump version to post5 --- kafka/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/version.py b/kafka/version.py index ccf7bda8c..e47fd1c67 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '1.4.7.post4' +__version__ = '1.4.7.post5'