Skip to content

Commit

Permalink
Sticky assignor: fix balancing and reassignment issues due to incorrect count comparison (#3306)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Apr 19, 2021
1 parent 7faf33a commit 445e7a2
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ librdkafka v1.7.0 is feature release:
accumulated messages for revoked partitions were not purged, which would
pass messages to the application for partitions that were no longer owned
by the consumer. Fixed by @jliunyu. #3340.
* Fix balancing and reassignment issues with the cooperative-sticky assignor.
#3306.
* The consumer group deemed cached metadata up to date by checking
`topic.metadata.refresh.interval.ms`: if this property was set too low
it would cause cached metadata to be unusable and new metadata to be fetched,
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ static void rd_kafka_cgrp_rejoin (rd_kafka_cgrp_t *rkcg, const char *fmt, ...) {
* @param member_cnt Number of elements in members.
* @param par_cnt The total number of partitions expected to be collected.
* @param collect_owned If rd_true, rkgm_owned partitions will be collected,
* else rdgm_assignment partitions will be collected.
* else rkgm_assignment partitions will be collected.
*/
static map_toppar_member_info_t *
rd_kafka_collect_partitions (const rd_kafka_group_member_t *members,
Expand Down
131 changes: 127 additions & 4 deletions src/rdkafka_sticky_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -733,15 +733,15 @@ isBalanced (rd_kafka_t *rk,
consumerPartitions = (const rd_kafka_topic_partition_list_t *)
elem->value;

potentialTopicPartitions =
RD_MAP_GET(consumer2AllPotentialPartitions, consumer);

/* Skip if this consumer already has all the topic partitions
* it can get. */
if (consumerPartitions->cnt ==
(int)RD_MAP_CNT(consumer2AllPotentialPartitions))
if (consumerPartitions->cnt == potentialTopicPartitions->cnt)
continue;

/* Otherwise make sure it can't get any more partitions */
potentialTopicPartitions =
RD_MAP_GET(consumer2AllPotentialPartitions, consumer);

for (i = 0 ; i < potentialTopicPartitions->cnt ; i++) {
const rd_kafka_topic_partition_t *partition =
Expand Down Expand Up @@ -3293,6 +3293,128 @@ static int ut_testStickiness (rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) {
}


/**
 * @brief Verify stickiness as the group grows from one to three consumers
 *        and then shrinks back to a single consumer.
 *
 * The same \c members array is handed to every assignor run, so each run
 * sees whatever state the previous run left in the member objects
 * (presumably the prior assignment is what the assignor treats as
 * "owned" -- confirm against rd_kafka_assignor_run()).
 *
 * Sequence of runs and the assignment each one is expected to produce
 * for "topic1" (partitions 0..5):
 *  1. consumer1 only            -> consumer1: 0,1,2,3,4,5
 *  2. consumer1 + consumer2     -> consumer1: 3,4,5   consumer2: 0,1,2
 *  3. all three (run twice)     -> consumer1: 4,5     consumer2: 1,2
 *                                  consumer3: 0,3
 *  4. consumer2 + consumer3     -> consumer2: 1,2,5   consumer3: 0,3,4
 *  5. consumer3 only            -> consumer3: 0,1,2,3,4,5
 *
 * In every step each remaining consumer's expected set is a subset or a
 * superset of the set it was expected to hold in the previous step.
 *
 * @param rk   Client instance; only rk->rk_cgrp is used, as the consumer
 *             group argument to rd_kafka_assignor_run().
 * @param rkas Assignor under test, passed to rd_kafka_assignor_run().
 *
 * @returns the value produced by RD_UT_PASS() when all checks pass
 *          (failure return paths, if any, are inside the RD_UT_ASSERT /
 *          verify* helpers, which are not visible here).
 */
static int
ut_testStickiness2 (rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) {
rd_kafka_resp_err_t err;
char errstr[512];
rd_kafka_metadata_t *metadata;
rd_kafka_group_member_t members[3];
int member_cnt = RD_ARRAYSIZE(members);
int i;

/* Mock cluster metadata; the expected assignments below cover
 * partitions 0..5 of "topic1" (presumably the arguments mean
 * one topic, "topic1", with 6 partitions -- confirm against
 * rd_kafka_metadata_new_topic_mockv()). */
metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 6);

/* All three members are initialized up front with the same topic
 * argument ("topic1"); individual runs below pass only a sub-range
 * of this array to select which consumers take part. */
ut_init_member(&members[0], "consumer1", "topic1", NULL);
ut_init_member(&members[1], "consumer2", "topic1", NULL);
ut_init_member(&members[2], "consumer3", "topic1", NULL);

/* Just consumer1: it is expected to get every partition. */
err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata,
members, 1,
errstr, sizeof(errstr));
RD_UT_ASSERT(!err, "assignor run failed: %s", errstr);

verifyValidityAndBalance(members, 1, metadata);
isFullyBalanced(members, 1);
verifyAssignment(&members[0],
"topic1", 0,
"topic1", 1,
"topic1", 2,
"topic1", 3,
"topic1", 4,
"topic1", 5,
NULL);

/* consumer1 and consumer2: consumer1 is expected to keep 3,4,5 and
 * hand 0,1,2 over to the newly added consumer2. */
err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata,
members, 2,
errstr, sizeof(errstr));
RD_UT_ASSERT(!err, "assignor run failed: %s", errstr);

verifyValidityAndBalance(members, 2, metadata);
isFullyBalanced(members, 2);
verifyAssignment(&members[0],
"topic1", 3,
"topic1", 4,
"topic1", 5,
NULL);
verifyAssignment(&members[1],
"topic1", 0,
"topic1", 1,
"topic1", 2,
NULL);

/* Run it twice, should be stable: the second iteration must yield
 * exactly the same assignment as the first. */
for (i = 0 ; i < 2 ; i++) {
/* consumer1, consumer2, and consumer3: each existing consumer is
 * expected to give up one partition (3 and 0 respectively) to
 * consumer3 and keep the rest. */
err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata,
members, 3,
errstr, sizeof(errstr));
RD_UT_ASSERT(!err, "assignor run failed: %s", errstr);

verifyValidityAndBalance(members, 3, metadata);
isFullyBalanced(members, 3);
verifyAssignment(&members[0],
"topic1", 4,
"topic1", 5,
NULL);
verifyAssignment(&members[1],
"topic1", 1,
"topic1", 2,
NULL);
verifyAssignment(&members[2],
"topic1", 0,
"topic1", 3,
NULL);
}

/* Remove consumer1 by passing the array starting at members[1]:
 * consumer2 and consumer3 are expected to keep what they had and
 * pick up consumer1's former partitions (5 and 4 respectively). */
err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata,
&members[1], 2,
errstr, sizeof(errstr));
RD_UT_ASSERT(!err, "assignor run failed: %s", errstr);

verifyValidityAndBalance(&members[1], 2, metadata);
isFullyBalanced(&members[1], 2);
verifyAssignment(&members[1],
"topic1", 1,
"topic1", 2,
"topic1", 5,
NULL);
verifyAssignment(&members[2],
"topic1", 0,
"topic1", 3,
"topic1", 4,
NULL);

/* Remove consumer2 by passing the array starting at members[2]:
 * the sole remaining consumer3 is expected to own every partition. */
err = rd_kafka_assignor_run(rk->rk_cgrp, rkas, metadata,
&members[2], 1,
errstr, sizeof(errstr));
RD_UT_ASSERT(!err, "assignor run failed: %s", errstr);

verifyValidityAndBalance(&members[2], 1, metadata);
isFullyBalanced(&members[2], 1);
verifyAssignment(&members[2],
"topic1", 0,
"topic1", 1,
"topic1", 2,
"topic1", 3,
"topic1", 4,
"topic1", 5,
NULL);

/* Release all three members (not just those used in the last run)
 * and the mock metadata.
 * NOTE(review): if RD_UT_ASSERT or the verify helpers return early
 * on failure, this cleanup is skipped on the failure path -- confirm
 * whether that is acceptable for unit tests in this file. */
for (i = 0 ; i < member_cnt ; i++)
rd_kafka_group_member_clear(&members[i]);
rd_kafka_metadata_destroy(metadata);

RD_UT_PASS();
}


static int
ut_testAssignmentUpdatedForDeletedTopic (rd_kafka_t *rk,
const rd_kafka_assignor_t *rkas) {
Expand Down Expand Up @@ -3458,6 +3580,7 @@ static int rd_kafka_sticky_assignor_unittest (void) {
ut_testNewSubscription,
ut_testMoveExistingAssignments,
ut_testStickiness,
ut_testStickiness2,
ut_testAssignmentUpdatedForDeletedTopic,
ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted,
ut_testConflictingPreviousAssignments,
Expand Down

0 comments on commit 445e7a2

Please sign in to comment.