Message offset adjust ticket #1010 #1017
Conversation
There are too many files changed in this PR. Please check your coding style. We don't want wildcard imports; each individual package needs to be explicitly specified and imported.
Force-pushed from 564b436 to 4a2f234
Done. Please review the latest one. This one is rebased on the most recent master and keeps my commits easy to read.
I don't think the fix needs to be that complicated.
To distinguish between these two cases:
- the offset progression is no longer sequential because a consumer rebalance happened
- the offset progression jumps because of a transactional message

we can use a ConsumerRebalanceListener to listen for consumer rebalance events. We already use SecorKafkaMessageIterator for those callback events, so we can add a bit more logic there to clear the lastSeenOffsets and remove the underlying files if the partition assignment changes.
If we do that cleanup in the ConsumerRebalanceListener, we can simply change != to < in the adjustOffset() method.
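A minimal sketch of that cleanup, assuming a hypothetical `RebalanceOffsetCache` with string partition keys (not Secor's actual class — the real listener would implement Kafka's `ConsumerRebalanceListener`, receive `TopicPartition` objects, and also delete the underlying local files):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the cleanup described above; names are
// illustrative, not Secor's real API.
class RebalanceOffsetCache {
    // last offset written per topic-partition
    private final Map<String, Long> lastSeenOffsets = new HashMap<>();

    void recordOffset(String topicPartition, long offset) {
        lastSeenOffsets.put(topicPartition, offset);
    }

    Long lastSeenOffset(String topicPartition) {
        return lastSeenOffsets.get(topicPartition);
    }

    // Called when the partition assignment changes: forget the cached
    // offsets (and, in the real code, remove the local files) so the
    // consumer starts fresh after the rebalance.
    void onPartitionsRevoked(List<String> revokedPartitions) {
        for (String tp : revokedPartitions) {
            lastSeenOffsets.remove(tp);
        }
    }
}
```

With this reset in place, the only "offset goes back" case the writer still needs to detect is a genuine inconsistency, which is why the simple `<` comparison suffices.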
How about a small enhancement over the above approach? Instead of removing the underlying files when a rebalance happens, let's upload the file and commit the offset in the partition-revoked notification. That way, consumers don't need to rewrite the same data.
If a rebalance happens, the files the current consumer was working on are no longer the source of truth; another consumer may have already uploaded all or part of those offsets to S3. Consider this situation: a consumer was working on partition 0 for some time, then rebalance event 1 happens and it loses partition 0, but it still keeps some unfinished partition 0 data in the local filesystem. Some time later, rebalance event 2 happens and it gets partition 0 back, but the offset has already jumped ahead. If it continues appending the new partition 0 messages to the local files, those files will have a gap in message offsets. So it's best to clear all local files when an offset inconsistency is discovered. The messages are not lost, because other consumers have already uploaded those offsets to S3. The transactional message case is a somewhat different story, which is why we need to distinguish it from a consumer rebalance.
Thank you! I understand and agree with your points regarding the other consumer taking over, and the possibility of an offset jump when a previously owned partition is assigned back. My thinking is that we don't need to clear the local files to ensure consistency. Instead, we can upload and commit the offset in the "partition revoked" callback, given the promises of Kafka's ConsumerRebalanceListener API (http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html). The API document says "this method will be called before a rebalance operation starts and after the consumer stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a custom offset store to prevent duplicate data". Therefore, we can safely upload, commit the offset, and clear the last-seen offset after processing the existing message buffer in the partition-revoked callback. It won't cause a data inconsistency issue, because the other consumer will start from the newly committed offset; the consumers work sequentially without any message overlap. The benefit is that the other consumer doesn't need to reprocess the same data. Let me know your thoughts. Thank you again for sharing them!
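A sketch of that "upload and commit on partition revoke" idea. `Uploader`, `OffsetCommitter`, and the string partition keys are hypothetical stand-ins; the real listener would implement `ConsumerRebalanceListener.onPartitionsRevoked(Collection<TopicPartition>)` and call Secor's uploader plus `KafkaConsumer.commitSync`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; interfaces and names are illustrative, not Secor's.
class RevokeUploadListener {
    interface Uploader { void uploadLocalFiles(String topicPartition); }
    interface OffsetCommitter { void commit(String topicPartition, long nextOffset); }

    private final Map<String, Long> lastSeenOffsets = new HashMap<>();
    private final Uploader uploader;
    private final OffsetCommitter committer;

    RevokeUploadListener(Uploader uploader, OffsetCommitter committer) {
        this.uploader = uploader;
        this.committer = committer;
    }

    void recordOffset(String topicPartition, long offset) {
        lastSeenOffsets.put(topicPartition, offset);
    }

    // The Kafka javadoc promises this runs after the consumer stops
    // fetching and before the rebalance completes, so committing here
    // prevents the next owner from reprocessing the same data.
    void onPartitionsRevoked(List<String> revokedPartitions) {
        for (String tp : revokedPartitions) {
            Long last = lastSeenOffsets.remove(tp);
            if (last != null) {
                uploader.uploadLocalFiles(tp);  // flush buffered data to S3
                committer.commit(tp, last + 1); // next offset to consume
            }
        }
    }
}
```

The committed value is `lastSeenOffset + 1` because Kafka commit semantics use the offset of the next message to be consumed, not the last one processed.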
If you add the uploading code in the ConsumerRebalanceListener, the upload will clear the local files automatically, so the old check and file clearing might not really be needed. It's probably also a good idea to clear the lastSeen data structure in that rebalance listener so everything starts fresh after a rebalance. We didn't have a consumer rebalance listener before, but with this new listener we can actually do things more cleanly.
Sure, will commit soon.
Looking forward to your PR
Force-pushed from 4a2f234 to dda16af
As discussed, the rebalance logic is added in SecorConsumerRebalanceListener and RebalanceHandler. Please review.
In general, the code change looks fine, but there are too many lines changed, which makes the diff difficult to review. I wasn't quite sure why you needed to restructure the class/inner classes in SecorKafkaMessageIterator; it seems you could just add the new reset logic in the ConsumerRebalanceListener inner class.
        topicPartition.getTopic(), topicPartition.getPartition(), lastSeenOffset, offset);
} else {
    if (offset < lastSeenOffset + 1) {
        LOG.warn("offset for topic {} partition {} goes back from {} to {}",
How about we throw an exception here? I don't think we should see this happen with the code change in the rebalance listener.
For compatibility with LegacyKafkaMessageIterator: since no such listener exists there, we can't throw an exception here. Any ideas?
I would prefer a separate class for the RebalanceListener, since the rebalance and handler logic is a separate concern from the message iterator and is better kept as a completely separate unit.
Let me know if the above is OK; I will push shortly.
#fixed offset
Force-pushed from dda16af to 2c7eec6
Just happened to find that this issue is also related to another ticket, #682.
lgtm, thanks for the effort.
This is for ticket #1010 regarding offsets.
The offset validation logic is refactored a bit to ensure it works in both transactional and non-transactional modes.
The offset logic in the message writer now works as follows: if the offset of the message currently being processed is not greater than the "last seen offset" (i.e. the message has already been written), it means a rebalance happened and/or the broker is replaying messages from the last committed offset or failure point. Since the messages up to that offset are already written, we can safely trim those messages to remove duplicates.
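The check described above reduces to a small predicate (illustrative names, not Secor's exact API):

```java
// Hypothetical sketch of the message-writer offset check; names are
// illustrative, not Secor's actual API.
class OffsetGuard {
    // True when the incoming message was already written for this
    // partition: a rebalance or a broker replay from the last committed
    // offset is re-delivering it, so the writer can safely skip (trim) it.
    static boolean isDuplicate(long messageOffset, long lastSeenOffset) {
        return messageOffset <= lastSeenOffset;
    }
}
```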
The existing commit-offset validation logic in the upload class remains intact. It ensures that after a rebalance, if another consumer has made progress on the same topic/partition, we either trim those files or trim the corresponding completed offsets.