Allow committed offsets refresh #375
Conversation
I worked with the reporter of #301 and I can confirm that this would perfectly fix our issue.
Hoping to see it in a future version of reactive-kafka.
We'll soon have @ennru join us and spend considerable time on Alpakka projects, including this one, which means quicker turnaround in merging things very soon :-) Thanks for your patience; Alpakka has proven itself and is going to get the attention it deserves very soon :-)
Good work and welcome to the Kafka connector project! Could you rebase, please?
Force-pushed 83803c4 to 9040389
Thanks, @ennru. Rebased.
Great, thank you.
Force-pushed 9040389 to 05e0d8e
Makes sense, I added parameters to the
LGTM
# If enabled the consumer will re-send last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-enabled = false
commit-refresh-interval = 1m
Would it not be nicer to have one value, with infinite meaning "disabled"? That's usually how we have such settings in Akka and Akka HTTP.
It is indeed much nicer, but I didn't want to introduce custom config behavior. As you say, it is already done like this in Akka, so I will change the code here to use a single config value too.
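A single-setting version of the snippet above could then look like this (a sketch only; the source confirms the move to one config value, while the use of `infinite` as the disabled value is the convention the reviewer attributes to Akka and Akka HTTP):

    # Re-send last committed offsets periodically for all assigned partitions.
    # See https://issues.apache.org/jira/browse/KAFKA-4682.
    # Assumed convention: set to `infinite` to disable the refresh.
    commit-refresh-interval = infinite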
One comment to simplify the settings; looks good otherwise.
LGTM, better with just one setting.
Thank you for your contribution!
This pull request solves the problem described in https://issues.apache.org/jira/browse/KAFKA-3806 and #301.
In some workloads, it is possible that no events are sent to a given topic/partition for 24 hours. By default, Kafka evicts committed offsets after this period, so if the consumer is then restarted it has no stored offset to start from. Depending on the auto.offset.reset setting, this may result in re-processing the whole history, losing some recent events, or a failure. This pull request allows a consumer to refresh its committed offsets on a regular basis even if there are no new messages in the topic. The following logic applies:
The feature must be explicitly enabled by calling withCommitRefreshInterval on the ConsumerSettings, so existing clients won't be affected. There is an integration test for this feature, but it takes a few minutes to run because the eviction timeout is configured in minutes and cannot be fractional.
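As a sketch of the opt-in, enabling the refresh might look like the following. Only withCommitRefreshInterval comes from this PR; the factory call, deserializers, bootstrap server, and group id are illustrative placeholders, and `system` is assumed to be an ActorSystem in scope:

```scala
import scala.concurrent.duration._
import akka.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder settings; servers, group id, and deserializers are illustrative.
val settings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-group")
    // Opt in to periodic re-sending of the last committed offsets,
    // so they are not evicted during long idle periods.
    .withCommitRefreshInterval(10.minutes)
```

Consumers that never call withCommitRefreshInterval keep the current behavior, which is why existing clients are unaffected.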