Skip to content

Commit

Permalink
Assignor: de-conflate owned/assigned partitions (#3306)
Browse files Browse the repository at this point in the history
rkgm_assignment, which is the member assignment after running the assignor,
was mixed up with rkgm_owned, which is the current member assignment before
running the assignor. This resulted in the sticky assignor not taking the
current assignment into consideration on rebalance and thus not being able to
provide the stickiness.
  • Loading branch information
edenhill committed Apr 20, 2021
1 parent f18e273 commit fd3d4bc
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 12 deletions.
5 changes: 3 additions & 2 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,14 @@ rd_kafka_assignor_run (rd_kafka_cgrp_t *rkcg,
rd_kafka_dbg(rkcg->rkcg_rk, CGRP|RD_KAFKA_DBG_ASSIGNOR,
"ASSIGN",
" Member \"%.*s\"%s with "
"%d assigned partition(s) and "
"%d owned partition(s) and "
"%d subscribed topic(s):",
RD_KAFKAP_STR_PR(member->rkgm_member_id),
!rd_kafkap_str_cmp(member->rkgm_member_id,
rkcg->rkcg_member_id) ?
" (me)":"",
member->rkgm_assignment->cnt,
member->rkgm_owned ?
member->rkgm_owned->cnt : 0,
member->rkgm_subscription->cnt);
for (j = 0 ; j < member->rkgm_subscription->cnt ; j++) {
const rd_kafka_topic_partition_t *p =
Expand Down
13 changes: 13 additions & 0 deletions src/rdkafka_assignor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,27 @@ typedef enum rd_kafka_rebalance_protocol_t {


typedef struct rd_kafka_group_member_s {
        /** Subscribed topics (partition field is ignored). */
        rd_kafka_topic_partition_list_t *rkgm_subscription;
        /** Partitions assigned to this member after running the assignor,
         *  i.e., the new assignment coming out of the rebalance.
         *  Not to be conflated with \c rkgm_owned . */
        rd_kafka_topic_partition_list_t *rkgm_assignment;
        /** Partitions reported as currently owned by the member, read
         *  from consumer metadata, i.e., the existing assignment going into
         *  the rebalance. Sticky assignors use this to preserve ownership. */
        rd_kafka_topic_partition_list_t *rkgm_owned;
        /** List of eligible topics in subscription, i.e., subscribed topics
         *  that actually exist. */
        rd_list_t rkgm_eligible;
        /** Member id (e.g., client.id-some-uuid). */
        rd_kafkap_str_t *rkgm_member_id;
        /** Group instance id (static group membership), may be NULL. */
        rd_kafkap_str_t *rkgm_group_instance_id;
        /** Member-specific opaque userdata. */
        rd_kafkap_bytes_t *rkgm_userdata;
        /** Serialized member metadata, e.g., the currently owned
         *  partitions. */
        rd_kafkap_bytes_t *rkgm_member_metadata;
        /** Group generation id. */
        int rkgm_generation;
} rd_kafka_group_member_t;

Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_sticky_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1237,11 +1237,11 @@ prepopulateCurrentAssignments (
rd_kafka_topic_partition_list_new(
(int)estimated_partition_cnt));

if (!consumer->rkgm_assignment)
if (!consumer->rkgm_owned)
continue;

for (j = 0 ; j < (int)consumer->rkgm_assignment->cnt ; j++) {
partition = &consumer->rkgm_assignment->elems[j];
for (j = 0 ; j < (int)consumer->rkgm_owned->cnt ; j++) {
partition = &consumer->rkgm_owned->elems[j];

consumers = RD_MAP_GET_OR_SET(
&sortedPartitionConsumersByGeneration,
Expand Down
105 changes: 98 additions & 7 deletions tests/0113-cooperative_rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1895,16 +1895,21 @@ static void o_java_interop() {

if (Test::assignment_partition_count(c, NULL) == 4 &&
java_pid != 0 &&
!changed_subscription &&
rebalance_cb.assign_call_cnt == 3) {
!changed_subscription) {
if (rebalance_cb.assign_call_cnt != 2)
Test::Fail(tostr() << "Expecting consumer's assign_call_cnt to be 2, "
"not " << rebalance_cb.assign_call_cnt);
Test::Say(_C_GRN "Java consumer is now part of the group\n");
Test::subscribe(c, topic_name_1);
changed_subscription = true;
}

if (Test::assignment_partition_count(c, NULL) == 1 &&
changed_subscription && rebalance_cb.assign_call_cnt == 4 &&
changed_subscription && !changed_subscription_done) {
/* Depending on the timing of resubscribe rebalancing and the
* Java consumer terminating we might have one or two rebalances,
* hence the fuzzy <=5 and >=5 checks. */
if (Test::assignment_partition_count(c, NULL) == 2 &&
changed_subscription && rebalance_cb.assign_call_cnt <= 5 &&
!changed_subscription_done) {
/* All topic 1 partitions will be allocated to this consumer whether or not the Java
* consumer has unsubscribed yet because the sticky algorithm attempts to ensure
* partition counts are even. */
Expand All @@ -1913,7 +1918,7 @@ static void o_java_interop() {
}

if (Test::assignment_partition_count(c, NULL) == 2 &&
changed_subscription && rebalance_cb.assign_call_cnt == 5 &&
changed_subscription && rebalance_cb.assign_call_cnt >= 5 &&
changed_subscription_done) {
/* When the java consumer closes, this will cause an empty assign rebalance_cb event,
* allowing detection of when this has happened. */
Expand Down Expand Up @@ -2888,6 +2893,91 @@ extern "C" {
SUB_TEST_PASS();
}


/**
* @brief Verify that incremental rebalances retain stickyness.
*/
static void x_incremental_rebalances (void) {
#define _NUM_CONS 3
rd_kafka_t *c[_NUM_CONS];
rd_kafka_conf_t *conf;
const char *topic = test_mk_topic_name("0113_x", 1);
int i;

SUB_TEST();
test_conf_init(&conf, NULL, 60);

test_create_topic(NULL, topic, 6, 1);

test_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky");
for (i = 0 ; i < _NUM_CONS ; i++) {
char clientid[32];
rd_snprintf(clientid, sizeof(clientid), "consumer%d", i);
test_conf_set(conf, "client.id", clientid);

c[i] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
}
rd_kafka_conf_destroy(conf);

/* First consumer joins group */
TEST_SAY("%s: joining\n", rd_kafka_name(c[0]));
test_consumer_subscribe(c[0], topic);
test_consumer_wait_assignment(c[0], rd_true/*poll*/);
test_consumer_verify_assignment(c[0], rd_true/*fail immediately*/,
topic, 0,
topic, 1,
topic, 2,
topic, 3,
topic, 4,
topic, 5,
NULL);


/* Second consumer joins group */
TEST_SAY("%s: joining\n", rd_kafka_name(c[1]));
test_consumer_subscribe(c[1], topic);
test_consumer_wait_assignment(c[1], rd_true/*poll*/);
rd_sleep(3);
test_consumer_verify_assignment(c[0], rd_false/*fail later*/,
topic, 3,
topic, 4,
topic, 5,
NULL);
test_consumer_verify_assignment(c[1], rd_false/*fail later*/,
topic, 0,
topic, 1,
topic, 2,
NULL);

/* Third consumer joins group */
TEST_SAY("%s: joining\n", rd_kafka_name(c[2]));
test_consumer_subscribe(c[2], topic);
test_consumer_wait_assignment(c[2], rd_true/*poll*/);
rd_sleep(3);
test_consumer_verify_assignment(c[0], rd_false/*fail later*/,
topic, 4,
topic, 5,
NULL);
test_consumer_verify_assignment(c[1], rd_false/*fail later*/,
topic, 1,
topic, 2,
NULL);
test_consumer_verify_assignment(c[2], rd_false/*fail later*/,
topic, 3,
topic, 0,
NULL);

/* Raise any previously failed verify_assignment calls and fail the test */
TEST_LATER_CHECK();

for (i = 0 ; i < _NUM_CONS ; i++)
rd_kafka_destroy(c[i]);

SUB_TEST_PASS();

#undef _NUM_CONS
}

/* Local tests not needing a cluster */
int main_0113_cooperative_rebalance_local (int argc, char **argv) {
a_assign_rapid();
Expand All @@ -2907,7 +2997,7 @@ extern "C" {
c_subscribe_no_cb_test(true/*close consumer*/);

if (test_quick) {
Test::Say("Skipping tests c -> s due to quick mode\n");
Test::Say("Skipping tests >= c_ .. due to quick mode\n");
return 0;
}

Expand Down Expand Up @@ -2941,6 +3031,7 @@ extern "C" {
true/*auto commit*/);
v_commit_during_rebalance(true/*with rebalance callback*/,
false/*manual commit*/);
x_incremental_rebalances();

return 0;
}
Expand Down
61 changes: 61 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2696,6 +2696,67 @@ void test_consumer_wait_assignment (rd_kafka_t *rk, rd_bool_t do_poll) {
}


/**
 * @brief Verify that \p rk 's current assignment exactly matches the
 *        expected set of partitions.
 *
 * The va-list is a NULL-terminated list of (const char *topic, int partition)
 * tuples.
 *
 * Mismatches (missing expected partition, or count disagreement) are
 * recorded with TEST_FAIL_LATER; if \p fail_immediately is true any
 * recorded failure is raised right away via TEST_LATER_CHECK, otherwise
 * it surfaces at the caller's next TEST_LATER_CHECK().
 */
void test_consumer_verify_assignment0 (const char *func, int line,
                                       rd_kafka_t *rk,
                                       rd_bool_t fail_immediately, ...) {
        va_list ap;
        int exp_cnt = 0;
        const char *exp_topic;
        rd_kafka_topic_partition_list_t *assignment;
        rd_kafka_resp_err_t err;
        int i;

        err = rd_kafka_assignment(rk, &assignment);
        if (err)
                TEST_FAIL("%s:%d: Failed to get assignment for %s: %s",
                          func, line, rd_kafka_name(rk),
                          rd_kafka_err2str(err));

        /* Log the actual assignment for easier debugging on failure. */
        TEST_SAY("%s assignment (%d partition(s)):\n", rd_kafka_name(rk),
                 assignment->cnt);
        for (i = 0 ; i < assignment->cnt ; i++)
                TEST_SAY(" %s [%"PRId32"]\n",
                         assignment->elems[i].topic,
                         assignment->elems[i].partition);

        /* Every expected (topic,partition) tuple must be present in the
         * actual assignment. */
        va_start(ap, fail_immediately);
        for (;;) {
                int exp_partition;

                exp_topic = va_arg(ap, const char *);
                if (!exp_topic)
                        break;
                exp_partition = va_arg(ap, int);
                exp_cnt++;

                if (!rd_kafka_topic_partition_list_find(assignment,
                                                        exp_topic,
                                                        exp_partition))
                        TEST_FAIL_LATER(
                                "%s:%d: Expected %s [%d] not found in %s's "
                                "assignment (%d partition(s))",
                                func, line,
                                exp_topic, exp_partition, rd_kafka_name(rk),
                                assignment->cnt);
        }
        va_end(ap);

        /* No unexpected extra partitions: the counts must agree. */
        if (exp_cnt != assignment->cnt)
                TEST_FAIL_LATER(
                        "%s:%d: "
                        "Expected %d assigned partition(s) for %s, not %d",
                        func, line, exp_cnt, rd_kafka_name(rk),
                        assignment->cnt);

        if (fail_immediately)
                TEST_LATER_CHECK();

        rd_kafka_topic_partition_list_destroy(assignment);
}





/**
* @brief Start subscribing for 'topic'
*/
Expand Down
7 changes: 7 additions & 0 deletions tests/test.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,13 @@ int test_consumer_poll (const char *what, rd_kafka_t *rk, uint64_t testid,
test_msgver_t *mv);

void test_consumer_wait_assignment (rd_kafka_t *rk, rd_bool_t do_poll);
/**
 * @brief Verify that the consumer's assignment matches the NULL-terminated
 *        va-list of (const char *topic, int partition) tuples.
 *        Use via the test_consumer_verify_assignment() macro, which
 *        supplies the caller's function name and line number.
 */
void test_consumer_verify_assignment0 (const char *func, int line,
                                       rd_kafka_t *rk,
                                       rd_bool_t fail_immediately, ...);
#define test_consumer_verify_assignment(rk,fail_immediately,...) \
        test_consumer_verify_assignment0(__FUNCTION__,__LINE__,rk, \
                                         fail_immediately,__VA_ARGS__)

void test_consumer_assign (const char *what, rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *parts);
void test_consumer_incremental_assign (const char *what, rd_kafka_t *rk,
Expand Down

0 comments on commit fd3d4bc

Please sign in to comment.