Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean buffer after rebalancing for batch queue #3269

Merged
merged 18 commits into from
Apr 13, 2021
Merged

Clean buffer after rebalancing for batch queue #3269

merged 18 commits into from
Apr 13, 2021

Conversation

jliunyu
Copy link
Contributor

@jliunyu jliunyu commented Feb 18, 2021

Summary: This is bug fix for confluentinc/confluent-kafka-python#1013

Issue: Buffer is not cleaned after rebalance if messages are polled using the batch queue method, so the consumer will still get old messages.

Solution: when assign happens, a new op event with type RD_KAFKA_OP_BARRIER will be created, a new version is been created at mean time. If the consumer met this event, will clean the buffer by comparing the version of msgs and the new version just created.

Test passed in local test:
Testing:
[0122_buffer_cleaning_after_rebalance/ 62.283s] ================= Test 0122_buffer_cleaning_after_rebalance PASSED =================
[

/ 63.161s] ALL-TESTS: duration 63160.668ms
TEST 20210224223610 (bare, scenario default) SUMMARY
#==================================================================#
| | PASSED | 63.161s |
| 0122_buffer_cleaning_after_rebalance | PASSED | 62.283s |
#==================================================================#
[ / 63.163s] 0 thread(s) in use by librdkafka
[ / 63.163s]
============== ALL TESTS PASSED ==============

./test-runner in bare mode PASSED!

@jliunyu jliunyu marked this pull request as draft February 18, 2021 02:13
@jliunyu jliunyu marked this pull request as ready for review February 18, 2021 02:13
src/rdkafka_partition.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work!
See comments.

src/rdkafka.c Show resolved Hide resolved
src/rdkafka_msg.c Outdated Show resolved Hide resolved
src/rdkafka_op.c Outdated Show resolved Hide resolved
src/rdkafka_op.h Outdated Show resolved Hide resolved
src/rdkafka_op.h Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Show resolved Hide resolved
src/rdkafka.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
Issue: Buffer is not cleaned after rebalance if messages are polled using the batch queue method, so the consumer will still get old messages.

Solution: when assign happens, a new op event with type RD_KAFKA_OP_BARRIER will be created, a new version is been created at mean time. If the consumer met this event, will clean the buffer by comparing the version of msgs and the new version just created.
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there! Good work!

src/rdkafka_op.c Outdated Show resolved Hide resolved
src/rdkafka_partition.c Outdated Show resolved Hide resolved
src/rdkafka_partition.c Show resolved Hide resolved
src/rdkafka_partition.c Outdated Show resolved Hide resolved
src/rdkafka_partition.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/test.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Show resolved Hide resolved
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good stuff!

I'm a bit concerned with the test, I don't think it actually triggers/verifies the buggy behaviour.

src/rdkafka_op.c Outdated Show resolved Hide resolved
src/rdkafka_partition.c Outdated Show resolved Hide resolved
src/rdkafka_partition.c Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
if (rd_kafka_op_version_outdated(rko, version)) {
/* This also destroys the corresponding rkmessage. */
rd_kafka_op_destroy(rko);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps as an optimization to avoid a write shuffle: else if (i > valid_count++)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Magnus, thanks for pointing this out! I think copy should before valid_count + 1, so I change this part to the below logic:

if (rd_kafka_op_version_outdated(rko, version)) {
/* This also destroys the corresponding rkmessage. */
rd_kafka_op_destroy(rko);
} else if ((size_t)i > valid_count) {
rkmessages[valid_count++] = rkmessages[i];
} else {
valid_count++;
}

src/rdkafka_queue.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Show resolved Hide resolved
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really close now!

src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
tests/0122-buffer_cleaning_after_rebalance.c Outdated Show resolved Hide resolved
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great! Good work on this rather hairy bug, Jing!



/**
* @brief Produce 400 messages and consume 500 messages totally by 2 consumers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good !

@edenhill edenhill merged commit b31363f into confluentinc:master Apr 13, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants