From 445e7a2727d5cde323a598c1c8be4ce0705c5818 Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Mon, 19 Apr 2021 16:28:11 +0200 Subject: [PATCH] Sticky assignor: fix balancing and reassignment issues due to incorrect count comparison (#3306) --- CHANGELOG.md | 2 + src/rdkafka_cgrp.c | 2 +- src/rdkafka_sticky_assignor.c | 131 ++++++++++++++++++++++++++++++++-- 3 files changed, 130 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bbe3e42ec..1732592a74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,8 @@ librdkafka v1.7.0 is feature release: accumulated messages for revoked partitions were not purged, which would pass messages to the application for partitions that were no longer owned by the consumer. Fixed by @jliunyu. #3340. + * Fix balancing and reassignment issues with the cooperative-sticky assignor. + #3306. * The consumer group deemed cached metadata up to date by checking `topic.metadata.refresh.interval.ms`: if this property was set too low it would cause cached metadata to be unusable and new metadata to be fetched, diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 4b05a1fd56..a76bbfac5a 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -1154,7 +1154,7 @@ static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg, const char *fmt, ...) { * @param member_cnt Number of elements in members. * @param par_cnt The total number of partitions expected to be collected. * @param collect_owned If rd_true, rkgm_owned partitions will be collected, - * else rdgm_assignment partitions will be collected. + * else rkgm_assignment partitions will be collected. */ static map_toppar_member_info_t * rd_kafka_collect_partitions (const rd_kafka_group_member_t *members, diff --git a/src/rdkafka_sticky_assignor.c b/src/rdkafka_sticky_assignor.c index fa8314c435..1bafdb78f5 100644 --- a/src/rdkafka_sticky_assignor.c +++ b/src/rdkafka_sticky_assignor.c @@ -733,15 +733,15 @@ isBalanced (rd_kafka_t *rk, consumerPartitions = (const rd_kafka_topic_partition_list_t *) elem->value; + potentialTopicPartitions = + RD_MAP_GET(consumer2AllPotentialPartitions, consumer); + /* Skip if this consumer already has all the topic partitions * it can get. */ - if (consumerPartitions->cnt == - (int)RD_MAP_CNT(consumer2AllPotentialPartitions)) + if (consumerPartitions->cnt == potentialTopicPartitions->cnt) continue; /* Otherwise make sure it can't get any more partitions */ - potentialTopicPartitions = - RD_MAP_GET(consumer2AllPotentialPartitions, consumer); for (i = 0 ; i < potentialTopicPartitions->cnt ; i++) { const rd_kafka_topic_partition_t *partition = @@ -3293,6 +3293,128 @@ static int ut_testStickiness (rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { } +/** + * @brief Verify stickiness across three rebalances. + */ +static int +ut_testStickiness2 (rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { + rd_kafka_resp_err_t err; + char errstr[512]; + rd_kafka_metadata_t *metadata; + rd_kafka_group_member_t members[3]; + int member_cnt = RD_ARRAYSIZE(members); + int i; + + metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 6); + + ut_init_member(&members[0], "consumer1", "topic1", NULL); + ut_init_member(&members[1], "consumer2", "topic1", NULL); + ut_init_member(&members[2], "consumer3", "topic1", NULL); + + /* Just consumer1 */ + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, + members, 1, + errstr, sizeof(errstr)); + RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); + + verifyValidityAndBalance(members, 1, metadata); + isFullyBalanced(members, 1); + verifyAssignment(&members[0], + "topic1", 0, + "topic1", 1, + "topic1", 2, + "topic1", 3, + "topic1", 4, + "topic1", 5, + NULL); + + /* consumer1 and consumer2 */ + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, + members, 2, + errstr, sizeof(errstr)); + RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); + + verifyValidityAndBalance(members, 2, metadata); + isFullyBalanced(members, 2); + verifyAssignment(&members[0], + "topic1", 3, + "topic1", 4, + "topic1", 5, + NULL); + verifyAssignment(&members[1], + "topic1", 0, + "topic1", 1, + "topic1", 2, + NULL); + + /* Run it twice, should be stable. */ + for (i = 0 ; i < 2 ; i++) { + /* consumer1, consumer2, and consumer3 */ + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, + members, 3, + errstr, sizeof(errstr)); + RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); + + verifyValidityAndBalance(members, 3, metadata); + isFullyBalanced(members, 3); + verifyAssignment(&members[0], + "topic1", 4, + "topic1", 5, + NULL); + verifyAssignment(&members[1], + "topic1", 1, + "topic1", 2, + NULL); + verifyAssignment(&members[2], + "topic1", 0, + "topic1", 3, + NULL); + } + + /* Remove consumer1 */ + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, + &members[1], 2, + errstr, sizeof(errstr)); + RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); + + verifyValidityAndBalance(&members[1], 2, metadata); + isFullyBalanced(&members[1], 2); + verifyAssignment(&members[1], + "topic1", 1, + "topic1", 2, + "topic1", 5, + NULL); + verifyAssignment(&members[2], + "topic1", 0, + "topic1", 3, + "topic1", 4, + NULL); + + /* Remove consumer2 */ + err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata, + &members[2], 1, + errstr, sizeof(errstr)); + RD_UT_ASSERT(!err, "assignor run failed: %s", errstr); + + verifyValidityAndBalance(&members[2], 1, metadata); + isFullyBalanced(&members[2], 1); + verifyAssignment(&members[2], + "topic1", 0, + "topic1", 1, + "topic1", 2, + "topic1", 3, + "topic1", 4, + "topic1", 5, + NULL); + + for (i = 0 ; i < member_cnt ; i++) + rd_kafka_group_member_clear(&members[i]); + rd_kafka_metadata_destroy(metadata); + + RD_UT_PASS(); +} + + static int ut_testAssignmentUpdatedForDeletedTopic (rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { @@ -3458,6 +3580,7 @@ static int rd_kafka_sticky_assignor_unittest (void) { ut_testNewSubscription, ut_testMoveExistingAssignments, ut_testStickiness, + ut_testStickiness2, ut_testAssignmentUpdatedForDeletedTopic, ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted, ut_testConflictingPreviousAssignments,