Event callbacks #137
Comments
+1 on this. I want to be able to log state changes in topic+partition leadership, so I can say whether the topic+partition is "up" or "down".
+1
@DEvil0000 Pause/Resume is now available on the master branch.
Thanks, is the state now also observable?
No sir
+1 Some error states (such as misconfiguration of the broker URL) cause our application log to be spammed with errors such as:
FAIL: 127.0.0.1:10002/0: Connect to ipv4#127.0.0.1:10002 failed: Connection refused
To avoid this we use a heuristic approach: if we see an error, we set a state variable to 'in error' and increment an error count; once the count reaches 10, we stop logging until we come out of the error state. Detecting that exit is the tricky bit. At the moment the best we have come up with is to watch the output queue (our application is a producer): if it has got shorter, or if it is empty and we have seen no new errors for a few seconds, we log that the error state is over and reset the error count. This mostly works, but very occasionally it flips spuriously between states, especially if we are not producing any messages at the time.
We would find it very useful indeed to have a way of knowing whether the application is connected to at least one broker.
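The heuristic described in this comment might look roughly like the sketch below. It is not the commenter's actual code: all names (err_state_t, err_state_on_error, err_state_check_recovered) are hypothetical, the quiet period of 3 seconds stands in for "a few seconds", and the sketch assumes the first 10 errors are logged before suppression starts. In a librdkafka producer the queue length passed in would presumably come from rd_kafka_outq_len().

```c
#include <stdbool.h>
#include <stddef.h>

#define ERR_LOG_LIMIT 10 /* log at most this many errors per error state */
#define QUIET_SECS 3     /* assumed value for "a few seconds" */

typedef struct {
    bool in_error;        /* true while the connection is believed bad */
    int err_count;        /* errors logged in the current error state */
    long last_err_time;   /* time of the most recent error, in seconds */
    size_t last_outq_len; /* output queue length at the previous check */
} err_state_t;

/* Call for every error. Returns true if this error should be logged. */
bool err_state_on_error(err_state_t *s, long now) {
    s->in_error = true;
    s->last_err_time = now;
    if (s->err_count < ERR_LOG_LIMIT) {
        s->err_count++;
        return true;
    }
    return false; /* limit reached: suppress until recovery */
}

/* Call periodically with the current output queue length.
 * Returns true exactly when the error state is judged to be over. */
bool err_state_check_recovered(err_state_t *s, size_t outq_len, long now) {
    bool recovered = false;
    if (s->in_error) {
        /* Queue got shorter: something was delivered. */
        bool shrank = outq_len < s->last_outq_len;
        /* Queue empty and no new errors for a while. */
        bool quiet = outq_len == 0 &&
                     (now - s->last_err_time) >= QUIET_SECS;
        if (shrank || quiet) {
            s->in_error = false;
            s->err_count = 0;
            recovered = true;
        }
    }
    s->last_outq_len = outq_len;
    return recovered;
}
```

The "quiet" branch is the source of the spurious flips mentioned above: an idle producer with an empty queue looks recovered after QUIET_SECS even if no broker is reachable.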
Repeated errors for the same broker (and address!) should be suppressed, but if the broker resolves to multiple addresses or multiple address families (which localhost does on some OSes, providing both ipv4 and ipv6 addresses, while Kafka typically only listens on one family) there might be an endless ping-pong of logs, since librdkafka round-robins all addresses for a broker.
I'm guessing you want this to be push notifications (e.g., a callback or event served through ..poll()) rather than an API you need to call to check it, right? If so, for an event-like thing, what would be the triggering factors and what info should the event contain?
We could use either a push notification or a query API. If it was a push event, then the trigger and info you suggest would suit us very well.
If all one needs is whether the broker is up or down, one may call rd_kafka_metadata() to detect an unreachable broker; it returns RD_KAFKA_RESP_ERR__TRANSPORT if the broker is down. Is there anything wrong with that approach?
@asharma339 This will do the trick, but the question is when to do it and how often (there is a cost involved on the broker).
For the consumer, I was thinking of doing it every time a message is fetched.
Shouldn't ALL_BROKERS_DOWN be sufficient for that?
I never see ALL_BROKERS_DOWN or any other error while producing or consuming if I deliberately pass junk URLs to rd_kafka_brokers_add(). All I see is that any call to rd_kafka_metadata() returns RD_KAFKA_RESP_ERR__TRANSPORT. Otherwise, all produce and consume calls succeed. The consumer poll simply retrieves 0 messages. My message delivery callback for the producer never gets called, with either success or failure. This is all with the C API.
If you register an error_cb you should get an ALL_BROKERS_DOWN.
Certainly. Since the application learns about failures asynchronously, it needs some state to communicate them back to the main thread. The issue is specifically with the producer, which may want to retry on such a failure. But presumably the failed message would still be in the output queue and could be reprocessed if we want to retry?
Yes, the error_cb works well to let us know that things have gone bad and the output queue handles error recovery. Our problem is reliably detecting when the error has been recovered, in order to reset our logging state. Maybe calling rd_kafka_metadata() every few seconds until it succeeds would work for that - as long as it's failing it doesn't cost the brokers anything.
@rolandyoung That sounds like a good idea.
A metadata query has quite a high cost on the broker and the network, depending on the number of partitions and topics.
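One way to reconcile "probe until it succeeds" with the cost concern is to probe only while in the error state, and at most once per interval. The sketch below is an illustration under assumptions, not librdkafka API: recovery_t, recovery_mark_down, recovery_tick and probe_fn are hypothetical names. The probe is passed in as a function pointer; in a real application it could wrap rd_kafka_metadata() and report success when no error is returned, and recovery_mark_down() could be driven by the error_cb. The probe_from_flag helper is only a stand-in so the sketch can be exercised without a broker.

```c
#include <stdbool.h>

/* Hypothetical probe: returns true if the cluster answered. */
typedef bool (*probe_fn)(void *opaque);

typedef struct {
    bool down;       /* set when a failure has been reported */
    long next_probe; /* earliest time (seconds) for the next probe */
    long interval;   /* seconds between probes while down */
} recovery_t;

/* Record a failure, e.g. from the error callback via shared state. */
void recovery_mark_down(recovery_t *r, long now) {
    if (!r->down) {
        r->down = true;
        r->next_probe = now + r->interval;
    }
}

/* Call from the main loop. Probes only while down and at most once
 * per interval. Returns true when recovery is first observed. */
bool recovery_tick(recovery_t *r, long now, probe_fn probe, void *opaque) {
    if (!r->down || now < r->next_probe)
        return false; /* healthy, or too soon to probe again */
    if (probe(opaque)) {
        r->down = false;
        return true;
    }
    r->next_probe = now + r->interval;
    return false;
}

/* Stand-in probe for illustration: reads a bool through opaque. */
bool probe_from_flag(void *opaque) {
    return *(bool *)opaque;
}
```

Because no probe is issued while the state is healthy, a working cluster sees no extra metadata requests; the interval bounds the load once brokers start answering again.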
Events:
Callback set with rd_kafka_event_cb_set(rk, ..).
Served by rd_kafka_poll().