Allow committed offsets refresh #375

Merged: 2 commits, May 8, 2018

Conversation

rtimush
Contributor

@rtimush rtimush commented Dec 6, 2017

This pull request solves the problem described in https://issues.apache.org/jira/browse/KAFKA-3806 and #301.

In some workloads, it is possible that no events are sent to a given topic/partition for 24 hours. By default, Kafka evicts committed offsets after this period, so if the consumer is then restarted, it has no stored offset to start from. Depending on the auto.offset.reset setting, this may result in re-processing the whole history, losing some recent events, or a failure.
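
To make the three outcomes concrete, here is a minimal sketch of how a restarted consumer's starting position could be resolved once its committed offset has been evicted. It is an illustrative Python model, not Kafka client code: `resolve_start_position`, `NoOffsetForPartition`, and the parameter names are hypothetical, and only the three `auto.offset.reset` values (`earliest`, `latest`, `none`) come from Kafka itself.

```python
from typing import Optional


class NoOffsetForPartition(Exception):
    """Raised in this model when no committed offset exists and the policy is 'none'."""


def resolve_start_position(committed: Optional[int],
                           log_start: int,
                           log_end: int,
                           auto_offset_reset: str) -> int:
    """Return the offset a restarted consumer would begin reading from.

    `committed` is None when the broker has evicted the stored offset.
    """
    if committed is not None:
        # Normal case: resume exactly where the group left off.
        return committed
    if auto_offset_reset == "earliest":
        # Re-processes the whole retained history.
        return log_start
    if auto_offset_reset == "latest":
        # Skips everything produced while the consumer was down.
        return log_end
    if auto_offset_reset == "none":
        # The consumer fails instead of guessing a position.
        raise NoOffsetForPartition("no committed offset and auto.offset.reset=none")
    raise ValueError(f"unknown auto.offset.reset value: {auto_offset_reset}")
```

With a stored offset the policy is never consulted, which is why keeping the committed offset alive avoids all three failure modes.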

This pull request allows a consumer to refresh the committed offset on a regular basis even if there are no new messages in the topic. The following logic applies:

  • If the last commit has been successfully completed, the offsets from this commit are sent.
  • If there were no messages at all since the consumer started, the original offset for a partition is sent.
  • If the last commit has failed, no offset refresh for the partitions affected happens.
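
The three rules above can be sketched as a single selection function. This is a simplified Python model written for illustration, not the code in this pull request: `offsets_to_refresh` and all of its parameters are hypothetical names, and partitions are represented as plain strings.

```python
from typing import Dict, Set


def offsets_to_refresh(assigned: Set[str],
                       initial_positions: Dict[str, int],
                       last_committed: Dict[str, int],
                       failed: Set[str]) -> Dict[str, int]:
    """Pick the offsets to re-send for the currently assigned partitions.

    assigned:          partitions currently assigned to this consumer
    initial_positions: offset each partition had when the consumer started
    last_committed:    offsets from the last successfully completed commit
    failed:            partitions whose last commit failed
    """
    refresh: Dict[str, int] = {}
    for partition in sorted(assigned):
        if partition in failed:
            # Rule 3: never refresh a partition whose last commit failed.
            continue
        if partition in last_committed:
            # Rule 1: re-send the offset from the last successful commit.
            refresh[partition] = last_committed[partition]
        elif partition in initial_positions:
            # Rule 2: nothing consumed yet, so re-send the original offset.
            refresh[partition] = initial_positions[partition]
    return refresh
```

Skipping failed partitions means a refresh can never overwrite the broker's state with an offset the consumer is unsure about.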

The feature must be explicitly enabled by calling withCommitRefreshInterval on the ConsumerSettings, so existing clients won't be affected.

There is an integration test for this feature, but it takes a few minutes to run because the eviction timeout is configured in minutes and cannot be fractional.

@bsideup

bsideup commented Feb 19, 2018

@rtimush @ktoso @patriknw do you have any updates on the status of this PR?

@fchaillou fchaillou left a comment

I worked with the reporter of #301 and I can confirm that this would perfectly fix our issue.
Hope to see it in a future version of reactive-kafka.

@ktoso
Contributor

ktoso commented Apr 9, 2018

We'll soon have @ennru join us and spend considerable time on Alpakka projects, including this one, which means quicker turnaround in merging things very soon :-) Thanks for your patience. Alpakka has proven itself and is going to get the attention it deserves very soon :-)

@ktoso ktoso added this to the 0.21 milestone Apr 9, 2018
@fchaillou

@ktoso no worries, I'm interested in these features, so I'm doing what I can to help in my own way!
Welcome and good luck to @ennru then 👍

@ennru
Member

ennru commented May 3, 2018

Good work and welcome to the Kafka connector project!

Could you rebase, please?

@rtimush rtimush force-pushed the rtimush-301-offsets-refresh branch from 83803c4 to 9040389 Compare May 3, 2018 09:58
@rtimush
Contributor Author

rtimush commented May 3, 2018

Thanks, @ennru. Rebased.

@ennru
Member

ennru commented May 3, 2018

Great, thank you.
I'd like to see some documentation for this feature. We'd at least need a section in reference.conf.

@rtimush rtimush force-pushed the rtimush-301-offsets-refresh branch from 9040389 to 05e0d8e Compare May 4, 2018 11:34
@rtimush
Contributor Author

rtimush commented May 4, 2018

Makes sense, I added parameters to the reference.conf and also added a small note in the consumer docs.

Member

@ennru ennru left a comment

LGTM

# If enabled the consumer will re-send last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-enabled = false
commit-refresh-interval = 1m
Contributor

Would it not be nicer to have one value, with an infinite interval meaning "disabled"? That's usually how we handle such settings in Akka and Akka HTTP.

Contributor Author

It is indeed much nicer, but I didn't want to introduce a custom config behavior. Since, as you say, it is already done like this in Akka, I will change the code here to use a single config value too.
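
For readers following the thread, here is a rough sketch of what the single-setting approach could look like, with an infinite interval standing in for "disabled". It is a self-contained Python model and not the connector's implementation: `parse_refresh_interval` and `refresh_due` are hypothetical helpers, the literal `infinite` is an assumed spelling of the infinite value, and only short forms such as `1m` (as used in the snippet above) are parsed.

```python
import re
from typing import Optional

# Seconds per unit for the short duration forms this sketch understands.
_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0}


def parse_refresh_interval(value: str) -> Optional[float]:
    """Parse a duration such as '1m' into seconds; 'infinite' means disabled (None)."""
    text = value.strip().lower()
    if text == "infinite":
        return None
    match = re.fullmatch(r"(\d+)\s*([smh])", text)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def refresh_due(interval: Optional[float], last_commit_at: float, now: float) -> bool:
    """Return True when at least `interval` seconds have passed since the last commit."""
    if interval is None:
        # An infinite interval never elapses, so the feature is effectively off.
        return False
    return now - last_commit_at >= interval
```

One value then covers both questions, whether the feature is on and how often it runs, so the separate boolean flag becomes unnecessary.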

Contributor

@ktoso ktoso left a comment

One comment to simplify the settings; looks good otherwise.

Member

@ennru ennru left a comment

LGTM, better with just one setting.

@ennru ennru merged commit 7a22a2e into akka:master May 8, 2018
@ennru
Member

ennru commented May 8, 2018

Thank you for your contribution!
