Is pollFrequency of the RunLoop redundant with the fetch.min.bytes setting of KafkaConsumer? #656
Interestingly, your issue made me think about this code and may have helped me find a bug in it: #661. Thanks 🙂
@tnielens Thanks for examining this functionality in detail, it never hurts. Polling serves two needs: fetching new data and keeping up a heartbeat with the broker. The spaced polling serves the second goal. For fetching data, we indeed want to get the records with as little latency as possible, but we also need to take backpressure into account (when downstream cannot process records fast enough). Because of this, we poll as soon as we have a request in the queue and pause partitions for which there is no request. On top of that is a buffering mechanism which drives extra requests; this helps to increase throughput. I suppose we could document this a bit more clearly, together with the settings that affect it.
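To make the backpressure part concrete, here is a minimal Python sketch of the pause/resume decision described above. The real implementation is Scala inside zio-kafka's `RunLoop`; the function and parameter names here are purely illustrative.

```python
def partitions_to_pause_and_resume(assigned, pending_requests):
    """Model of the per-poll decision: partitions that some downstream
    Request is waiting for are resumed; all other assigned partitions
    are paused so poll() fetches nothing for them (backpressure)."""
    resume = assigned & pending_requests
    pause = assigned - pending_requests
    return pause, resume

# Example: downstream only has demand for topic-1.
pause, resume = partitions_to_pause_and_resume(
    assigned={"topic-0", "topic-1", "topic-2"},
    pending_requests={"topic-1"},
)
print(pause)   # {'topic-0', 'topic-2'}
print(resume)  # {'topic-1'}
```

With this scheme a `poll()` call can be issued as soon as any request arrives, without over-fetching for partitions whose consumers are slow.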
Thanks for the comments 🙏. Some more questions on the topic:
About point 1, I think you're right. We could immediately poll again in that case. Could you create a new issue for that? Point 2: not quite. The bufferedRecords in State are records for partitions for which no Request was currently pending. The only situation I know of where this can happen is when new partitions are assigned after rebalancing, in which case there was no chance yet to pause them. To be honest, I'm not exactly sure of other circumstances in which this occurs, since partitions without a request are always paused. This also made me think about the case of a small poll interval and a large lag / high volume of messages to be processed. In that case, both the Requests and the scheduled poll commands are driving the polling. I'm not sure that is efficient, since if there isn't a pending request for every call to poll, it would lead to frequent pausing and resuming. It also relates to #428 (comment). Stuff to be investigated after #590, I suppose.
Will do.
Would be nice to document that on the
Maybe
Thanks guys for this very interesting discussion 🙂 Sorry if my question is stupid. About:
Why do we need to keep a heartbeat? AFAIK, the Kafka clients are already doing this automatically (behind our back, giving us very little, if any, control over it).
As per the section "Detecting Consumer Failures" in the KafkaConsumer javadoc, two different processes signal liveness to the broker; see:
From this line (from the same KafkaConsumer javadoc):
I deduce we do need to continue calling poll as a liveness signal. If we didn't, after max.poll.interval.ms the broker would revoke all partitions assigned to this client.
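The distinction between the two liveness mechanisms can be modeled with a tiny check. The constant below is the real Kafka default for `max.poll.interval.ms` (300000 ms); the function itself is just an illustrative model, since the background heartbeat thread independently keeps `session.timeout.ms` satisfied.

```python
MAX_POLL_INTERVAL_MS = 300_000  # Kafka default for max.poll.interval.ms (5 minutes)

def partitions_revoked(ms_since_last_poll, max_poll_interval_ms=MAX_POLL_INTERVAL_MS):
    """Model of the broker-side consequence: if the application stops
    calling poll() for longer than max.poll.interval.ms, the consumer
    leaves the group and its partitions are revoked, even though the
    background heartbeat thread still signals session-level liveness."""
    return ms_since_last_poll > max_poll_interval_ms

print(partitions_revoked(60_000))   # False: still within the interval
print(partitions_revoked(300_001))  # True: poll loop considered failed
```

So the heartbeat thread alone is not enough; the poll loop itself must keep running, which is what the scheduled polling in the RunLoop guarantees.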
Let's continue the discussion in #664 |
Here are the first lines of the zio-kafka `RunLoop`.

I don't understand the purpose of spacing the `consumer.poll()` calls to the `KafkaConsumer` in time. From the javadoc, the example polls the consumer in a loop without waiting. Isn't that potentially harming latency? If calls to `consumer.poll` are spaced in time, messages reaching the broker right after a `consumer.poll` call returns take a latency hit due to the `pollFrequency` spacing.

For example, given instants `t1 < t2 < t1 + pollTimeout < t3 < t2 + pollFrequency` and a blocking `consumer.poll()` call started at `t1`: when a message arrives on the broker at `t2` and the `poll` call returns, the `RunLoop` waits until `t2 + pollFrequency` before polling again, so a message arriving at `t3` takes a latency hit.

Maybe it was done to let batches grow on the broker instead of fetching small ones? If so, it seems redundant with settings the `KafkaConsumer` already provides; see `fetch.min.bytes` and `fetch.max.wait.ms`. These settings let record batches grow up to a certain threshold before the response is returned to the caller. By default, the `KafkaConsumer` is configured for lowest latency. To increase throughput, at the cost of latency, `fetch.min.bytes` can be increased.