oauth token not refreshing #1485
Comments
I have a similar issue when using the oauth_cb configuration on version 1.9.2. I start my application and get a new token with an expiration of
I consume some messages successfully. Then at
But at
So something appears to be happening where the new token is not being used after it is refreshed. |
I confirmed that this is also happening on version 1.9.0. |
Went all the way back to version 1.6.1 and it continued to have this issue. I'm having a hard time believing that this would be a bug in the library for almost 2 years. I wonder if there's something else going on. |
Something I noticed that may be contributing to this is that librdkafka copies the token value from the handle into another state object: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_sasl_oauthbearer.c#L1256 If that copying only happens the first time we connect, and the library only uses the state's token value, then the first token fetched could be the only one ever passed to the broker. I haven't been able to prove this, though, as I haven't set up the project to debug locally. |
Confirmed that the issue also happens without the
|
Confirmed this is still happening in version 2.0.2. |
Circling back here, the above change didn't actually work in the production environment. Both our production and development environment broker configurations are identical. We've also confirmed that the newly generated tokens in the |
Providing some logs here. You can see the library states that the token was refreshed at 15:01:51.976
You then see the group authorization errors starting at 15:07:51 (1683659271.105), which is exactly the time the first token expires.
After a while of this, the session times out and the broker hangs up the connection.
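As a quick cross-check of the timestamps quoted above, the epoch value from the authorization errors can be converted back to wall-clock time. This is only a sketch: the UTC-4 offset is an assumption about where the logs were written, not something stated in the thread.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the log lines use a local time zone of UTC-4.
LOCAL_TZ = timezone(timedelta(hours=-4))

# Epoch timestamp quoted alongside the first group authorization error.
first_error = datetime.fromtimestamp(1683659271.105, tz=LOCAL_TZ)
formatted = first_error.strftime("%H:%M:%S")
print(formatted)
```

Under that assumption, the result lines up with the 15:07:51 wall-clock time reported for the first error.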
|
@edenhill I apologize for the direct mention, but I could use some help. Is it possible that there is a bug in which the old token is used instead of the refreshed token that is fetched from the
confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.0.2
Operating system: Rocky Linux 8.5 (Green Obsidian) |
I've also seen instances where the consumer stops consuming messages altogether with the following error after the initial token expires.
|
It appears that, according to KIP-255, the initial token remains in use as long as the initial connection remains intact. Does this mean that, to maintain a long-lived connection, we need to explicitly track token expiration and restart consumers/producers, or is there some other way to trigger a reconnection? The token refresh callback doesn't seem particularly useful if the new token is not used and we need to track our own expiration dates. It's also quite misleading from an API perspective. |
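The "track token expiration and restart" idea raised above could look roughly like the sketch below. `TokenExpiryTracker`, `margin_s`, and the injected `now` clock are hypothetical names for illustration; nothing here is part of confluent-kafka-python, and whether a restart is actually required is the open question in this thread.

```python
import time
from typing import Callable, Optional


class TokenExpiryTracker:
    """Remembers when the current token expires and flags when to reconnect."""

    def __init__(self, margin_s: float = 60.0,
                 now: Callable[[], float] = time.time) -> None:
        self._margin_s = margin_s          # reconnect this long before expiry
        self._now = now                    # injectable clock, eases testing
        self._expiry: Optional[float] = None

    def record(self, expiry_epoch_s: float) -> None:
        # Call this from the token callback with the absolute expiry time.
        self._expiry = expiry_epoch_s

    def should_reconnect(self) -> bool:
        # No token recorded yet means there is nothing to expire.
        if self._expiry is None:
            return False
        return self._now() >= self._expiry - self._margin_s


# Demonstration with a fake clock instead of real time.
clock = [0.0]
tracker = TokenExpiryTracker(margin_s=60.0, now=lambda: clock[0])
tracker.record(1800.0)

clock[0] = 1000.0
early = tracker.should_reconnect()

clock[0] = 1750.0
late = tracker.should_reconnect()
print(early, late)
```

In a poll loop, an application might check `should_reconnect()` between polls and, when it returns true, close the existing consumer and create a new one so that the new connection authenticates with a fresh token.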
I have the same issue with broker version 3.4.0 and confluent-kafka-python 2.1.1. Are there any updates on this? EDIT:
To sum up: |
It looks like librdkafka 2.2.0 included a possible fix for this issue:
|
We've seen evidence that the issue lies within the A working alternative on version 2.2.0 is to use the "sasl.oauthbearer.method": "oidc", "sasl.oauthbearer.client.id", "sasl.oauthbearer.client.secret" and "sasl.oauthbearer.token.endpoint.url" configuration settings to authenticate. This configuration does not exhibit the authentication issues seen when using the Ideally, the bug with |
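For reference, the OIDC-based alternative described above might be assembled as in the following sketch. The four `sasl.oauthbearer.*` keys come from the comment; `build_oidc_config`, the `security.protocol` and `sasl.mechanisms` values, and all example hostnames and credentials are assumptions that would need to match the actual broker setup.

```python
from typing import Dict


def build_oidc_config(bootstrap_servers: str, group_id: str, client_id: str,
                      client_secret: str, token_url: str) -> Dict[str, str]:
    """Return a client config that lets librdkafka fetch tokens via OIDC."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        # Assumption: the broker listener uses SASL over TLS.
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "OAUTHBEARER",
        # Settings named in the comment above.
        "sasl.oauthbearer.method": "oidc",
        "sasl.oauthbearer.client.id": client_id,
        "sasl.oauthbearer.client.secret": client_secret,
        "sasl.oauthbearer.token.endpoint.url": token_url,
    }


# Placeholder values only; none of these come from the thread.
config = build_oidc_config(
    bootstrap_servers="broker.example.com:9093",
    group_id="example-group",
    client_id="example-client",
    client_secret="example-secret",
    token_url="https://idp.example.com/oauth2/token",
)
print(config["sasl.oauthbearer.method"])
```

The resulting dictionary would be passed to the `Consumer` or `Producer` constructor in place of a configuration that supplies a token callback.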
Description
In my project, I am using confluent-kafka-python 1.9.2 to consume and produce messages on a Kafka topic. The OAuth provider has already been set up by another team, and the token expires after 30 minutes. In my code I have used this configuration for the consumer.
_get_token function:
Consumer code:
When I run the consumer, it fetches the token the first time and consumes messages without any issue. But once the token expires after 30 minutes, I start getting the following error:
confluent_kafka.error.ConsumeError: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Fetch from broker 31 failed: Broker: Topic authorization failed"}
Even if I set expires_in to 30 seconds or 1 minute, I still get the above error. I don't understand why: _get_token is called every minute, yet after 30 minutes I still get the above error.
I also tried to set oauthbearer_token_refresh_cb, but got this error:
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}
So I would like to know how to refresh the token.
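For context, in confluent-kafka-python the token callback is supplied through the `oauth_cb` configuration key rather than `oauthbearer_token_refresh_cb`, and it is expected to return a `(token, expiry)` tuple where the expiry is an absolute time in seconds since the epoch. The sketch below is not the reporter's code: `make_oauth_cb`, `fake_fetch`, and the `access_token`/`expires_in` response shape are assumptions for illustration.

```python
import time
from typing import Callable, Dict, Tuple

# Hypothetical provider response: {"access_token": str, "expires_in": seconds}.
TokenFetcher = Callable[[], Dict[str, object]]


def make_oauth_cb(fetch_token: TokenFetcher,
                  now: Callable[[], float] = time.time
                  ) -> Callable[[str], Tuple[str, float]]:
    """Build a callback suitable for the 'oauth_cb' config key."""

    def oauth_cb(config_str: str) -> Tuple[str, float]:
        # config_str carries the 'sasl.oauthbearer.config' value; unused here.
        payload = fetch_token()
        token = str(payload["access_token"])
        # Convert the relative lifetime into an absolute expiry timestamp.
        expiry = now() + float(payload["expires_in"])
        return token, expiry

    return oauth_cb


def fake_fetch() -> Dict[str, object]:
    # Stand-in for a real HTTP request to the OAuth provider.
    return {"access_token": "example-token", "expires_in": 1800}


# Fixed clock so the demonstration is deterministic.
callback = make_oauth_cb(fake_fetch, now=lambda: 1_000_000.0)
token, expiry = callback("")
print(token, expiry)
```

A client configuration would then include `"oauth_cb": make_oauth_cb(real_fetch)`, with `real_fetch` being the application's own provider call. As the comments in this thread report, a correctly invoked callback did not by itself stop the authorization errors after the first token expired.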