Trying to consume with OAUTHBEARER but issues with AbstractTokenProvider, maybe #691

Open
davidmontgom opened this issue Nov 27, 2020 · 5 comments

davidmontgom commented Nov 27, 2020

Hi,

kafka-python simply works with OAUTHBEARER. With aiokafka there are zero examples, so I am flying blind.

Did I implement CustomTokenProvider correctly? The create_ssl_context() function is what I use with kafka-python.

Below is the error I get:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 376, in _on_read_task_error
    read_task.result()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 512, in _read
    resp = await reader.readexactly(4)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/Documents/workspace/hedge-project/nasdaqstreamer/ai.py", line 109, in <module>
    loop.run_until_complete(consume())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users//Documents/workspace/hedge-project/nasdaqstreamer/ai.py", line 102, in consume
    await consumer.start()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 341, in start
    await self._client.bootstrap()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/client.py", line 215, in bootstrap
    version_hint=version_hint)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 97, in create_conn
    await conn.connect()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 235, in connect
    await self._do_sasl_handshake()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/aiokafka/conn.py", line 316, in _do_sasl_handshake
    payload, expect_response
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py", line 416, in wait_for
    return fut.result()
kafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at test.com:9094 closed
{"access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJtNDV5a1RZRXhXZzdfZmUybHdBcVRDSGV2bnQtMWMwNjRiTTl2UlN3NVY0In0.eyJleHAiOjE2MDcxMjQzNTYsImlhdCI6MTYwNjUxOTU1NiwianRpIjoiN2M3OGU0ZmItNmQ0ZC00YTAyLWI1YzYtZGVmOTA1OGY0MmY0IiwiaXNzIjoiaHR0cHM6Ly9jbG91ZGRhdGFzZXJ2aWNlLmF1dGgubmFzZGFxLmNvbS9hdXRoL3JlYWxtcy9wcm8tcmVhbG0iLCJzdWIiOiI0YzIzOTNhOC0yM2FmLTQ1MDc
0NDU5ZDJhM2E5IiwiaXNzIjoiaHR0cHM6Ly9jbG91ZGRhdGFzZXJ2aWNlLmF1dGgubmFzZGFxLmNvbS9hdXRoL3JlYWxtcy9wcm8tcmVhbG0iLCJhdWQiOiJodHRwczovL2Nsb3VkZGF0YXNlcnZpY2UuYXV0aC5uYXNkYXEuY29tL2F1dGgvcmVhbG1zL3Byby1yZWFsbSIsInN1YiI6IjRjMjM5M2E4LTIzYWYtNDUwNy1iYmI2LTFlZTkwODhmZDBiMiIsInR5cCI6IlJlZnJlc2giLCJhenAiOiJzeW5lcmdpc2NhcGl0YWwtZGF2aWQtc2Nob29sZXkiLCJzZXNzaW9uX3N0YXRlIjoiZmQ2NDkyZTMtMzRmMi00ZjEzLWI5MDgtM2M1NmU1OWQwYWYzIiwic2NvcGUiOiJlbWFpbCBwcm9maWxlIn0.5zZw0OZCuDUKzjWVZ-23s-DagXhOtzsvefJ9q8Q7vXA","token_type":"bearer","not-before-policy":0,"session_state":"fd6492e3-34f2-4f13-b908-3c56e59d0af3","scope":"email profile"}
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7f9c183e7c88>

Below is my code:

import asyncio
import ssl

from aiokafka import AIOKafkaConsumer
from aiokafka.abc import AbstractTokenProvider
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session

def create_ssl_context():
    
    _ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    _ssl_context.options |= ssl.OP_NO_SSLv2
    _ssl_context.options |= ssl.OP_NO_SSLv3
    _ssl_context.verify_mode = ssl.CERT_NONE
    _ssl_context.check_hostname = False
    _ssl_context.load_default_certs()
    
    return _ssl_context

class CustomTokenProvider(AbstractTokenProvider):
        async def token(self):
            return asyncio.get_running_loop().run_in_executor(
                None, self._token)
        def _token(self):
            token_url = 'https://test.com/auth/realms/pro-realm/protocol/openid-connect/token'
            client = BackendApplicationClient(client_id='adfafd')
            oauth = OAuth2Session(client=client)
            token_json = oauth.fetch_token(token_url=token_url, client_id='adfasdf', client_secret='adfadf')
            token = token_json['access_token']

            return token

loop = asyncio.get_event_loop()

async def consume():
    consumer = AIOKafkaConsumer(
        "test-4.stream", 
        loop=loop, 
        sasl_oauth_token_provider=CustomTokenProvider(),
        security_protocol="SASL_SSL", 
        sasl_mechanism="OAUTHBEARER",
        enable_auto_commit=False,
        ssl_context=create_ssl_context(),
        bootstrap_servers='test.com:9094')
    # Get cluster layout and topic/partition allocation
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()

loop.run_until_complete(consume())

krlx commented Dec 15, 2020

Any updates on this? Having the same issue

@chrisbarker

@davidmontgom & @krlx - I had a similar problem and resolved it by awaiting the future in CustomTokenProvider. Try this change:

        async def token(self):
            return await asyncio.get_running_loop().run_in_executor(
                None, self._token)
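
To see why the missing `await` matters, here is a minimal sketch that uses only `asyncio`, with no broker or aiokafka involved. `fetch_token_blocking`, `BrokenProvider`, and `FixedProvider` are hypothetical stand-ins for `_token` and `CustomTokenProvider` from the original post, and the token string is made up. Without the `await`, `token()` hands back an `asyncio.Future` instead of the token string, which may be why the broker closed the connection during the SASL handshake.

```python
import asyncio


def fetch_token_blocking():
    # Hypothetical stand-in for the blocking OAuth2 request made in _token().
    return "dummy-access-token"


class BrokenProvider:
    async def token(self):
        # Missing await: the caller receives the Future, not its result.
        return asyncio.get_running_loop().run_in_executor(
            None, fetch_token_blocking)


class FixedProvider:
    async def token(self):
        # Awaiting the Future yields the string returned by the worker thread.
        return await asyncio.get_running_loop().run_in_executor(
            None, fetch_token_blocking)


async def compare():
    broken = await BrokenProvider().token()
    fixed = await FixedProvider().token()
    broken_is_future = isinstance(broken, asyncio.Future)
    # Let the pending executor call finish so nothing is left dangling.
    await broken
    return broken_is_future, fixed


broken_is_future, fixed = asyncio.run(compare())
print(broken_is_future)
print(fixed)
```

A quick check like this, run against a real provider before passing it as `sasl_oauth_token_provider`, can confirm that `await provider.token()` yields a plain `str`.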

@benschumacher

It seems that the documentation is inconsistent with the fixes provided by @mtomilov.

https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.abc.AbstractTokenProvider.token

I'd be happy to submit a PR for this. Should this issue be closed?

Collaborator

ods commented Sep 13, 2024

Hi @benschumacher,

The docs build was broken at the time of the latest release, which is why the stable docs weren't updated. But you can see these changes in the latest docs: https://aiokafka.readthedocs.io/en/latest/api.html#aiokafka.abc.AbstractTokenProvider.token

If you still think something has to be fixed, contributions are welcome.

benschumacher commented Sep 13, 2024

Thanks for the reply. We made a change to await from the token provider and it is working as expected. I was confused about the mismatch between the published docs and what the code comments had, but the doc build being broken clarifies that. Cheers.
