Skip to content

Commit

Permalink
Fix loop of OffsetForLeaderEpoch requests on quick leader changes (#4433
Browse files Browse the repository at this point in the history
)
  • Loading branch information
milindl authored Sep 29, 2023
1 parent 116b6cf commit 788cd0c
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 4 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ librdkafka v2.3.0 is a feature release:
* Fix a segmentation fault when closing a consumer using the
cooperative-sticky assignor before the first assignment (#4381).
* Fix for insufficient buffer allocation when allocating rack information (@wolfchimneyrock, #4449).
* Fix for infinite loop of OffsetForLeaderEpoch requests on quick leader changes (#4433).
* Fix to add leader epoch to control messages, to make sure they're stored
for committing even without a subsequent fetch message (#4434).
* Fix for stored offsets not being committed if they lacked the leader epoch (#4442).
Expand All @@ -42,6 +43,13 @@ librdkafka v2.3.0 is a feature release:
than committed one, if either stored or committed leader epoch is -1 (#4442).


### Consumer Fixes

* If an OffsetForLeaderEpoch request was being retried, and the leader changed
while the retry was in-flight, an infinite loop of requests was triggered,
because we weren't updating the leader epoch correctly.
Fixed by updating the leader epoch before sending the request (#4433).


# librdkafka v2.2.0

Expand Down
8 changes: 4 additions & 4 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,7 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
}
}

if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
need_epoch_validation = rd_true;
else if (leader_epoch > rktp->rktp_leader_epoch) {
if (leader_epoch > rktp->rktp_leader_epoch) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"%s [%" PRId32 "]: leader %" PRId32
" epoch %" PRId32 " -> leader %" PRId32
Expand All @@ -694,7 +692,9 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
rktp->rktp_leader_epoch, leader_id, leader_epoch);
rktp->rktp_leader_epoch = leader_epoch;
need_epoch_validation = rd_true;
}
} else if (rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
need_epoch_validation = rd_true;

fetching_from_follower =
leader != NULL && rktp->rktp_broker != NULL &&
Expand Down
85 changes: 85 additions & 0 deletions tests/0139-offset_validation_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,89 @@ static void do_test_ssl_error_retried(void) {
}


/**
 * @brief If there's an OffsetForLeaderEpoch request which fails, and the leader
 * changes meanwhile, we end up in an infinite loop of OffsetForLeaderEpoch
 * requests.
 * Specifically:
 * a. Leader Change - causes OffsetForLeaderEpoch
 *    request 'A'.
 * b. Request 'A' fails with a retriable error, and we retry it.
 * c. While waiting for Request 'A', the leader changes again, and we send a
 *    Request 'B', but the leader epoch is not updated correctly in this
 *    request, causing a loop.
 *
 * See #4425.
 */
static void do_test_two_leader_changes(void) {
        const char *topic      = test_mk_topic_name(__FUNCTION__, 1);
        const char *c1_groupid = topic;
        rd_kafka_t *c1;
        const char *bootstraps;
        rd_kafka_mock_cluster_t *mcluster;
        int msg_cnt     = 5;
        uint64_t testid = test_id_generate();
        rd_kafka_conf_t *conf;

        SUB_TEST_QUICK();

        /* Two brokers so the partition leader can move between them. */
        mcluster = test_mock_cluster_new(2, &bootstraps);
        rd_kafka_mock_topic_create(mcluster, topic, 1, 2);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);

        /* Seed the topic with messages */
        test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10,
                                 "bootstrap.servers", bootstraps,
                                 "batch.num.messages", "1", NULL);

        test_conf_init(&conf, NULL, 60);
        test_conf_set(conf, "bootstrap.servers", bootstraps);
        test_conf_set(conf, "auto.offset.reset", "earliest");

        c1 = test_create_consumer(c1_groupid, NULL, conf, NULL);
        test_consumer_subscribe(c1, topic);

        /* Consume initial messages and join the group, etc. */
        test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL);

        /* The leader will change from 1->2, and the OffsetForLeaderEpoch will
         * be sent to broker 2. We need to first fail it with
         * an error, and then give enough time to change the leader before
         * returning a success. */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 2,
            RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 900,
            RD_KAFKA_RESP_ERR_NO_ERROR, 1000);

        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
        rd_kafka_poll(c1, 1000);
        /* Enough time to make a request, fail with a retriable error, and
         * retry. */
        rd_sleep(1);

        /* Reset leader. */
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_poll(c1, 1000);
        rd_sleep(1);

        /* There should be no infinite loop of OffsetForLeaderEpoch, and
         * consequently, we should be able to consume these messages as a sign
         * of success. */
        test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10,
                                 "bootstrap.servers", bootstraps,
                                 "batch.num.messages", "1", NULL);

        /* Distinct label (was a copy-pasted "MSG_INIT"): failure output must
         * distinguish this post-leader-change consume from the initial one. */
        test_consumer_poll("MSG_POST_CHANGES", c1, testid, 0, 0, msg_cnt,
                           NULL);

        rd_kafka_destroy(c1);

        test_mock_cluster_destroy(mcluster);

        TEST_LATER_CHECK();
        SUB_TEST_PASS();
}

/**
* @brief Storing an offset without leader epoch should still be allowed
* and the greater than check should apply only to the offset.
Expand Down Expand Up @@ -353,6 +436,8 @@ int main_0139_offset_validation_mock(int argc, char **argv) {

do_test_ssl_error_retried();

do_test_two_leader_changes();

do_test_store_offset_without_leader_epoch();

return 0;
Expand Down

0 comments on commit 788cd0c

Please sign in to comment.