From 668b5d9bae48ed31f4009b10d43873428eb579aa Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 8 Sep 2023 12:30:46 +0530 Subject: [PATCH 1/3] Fix loop of OffsetForLeaderEpoch requests on quick leader changes Fixes #4425 --- src/rdkafka_topic.c | 8 +-- tests/0139-offset_validation_mock.c | 91 +++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 4 deletions(-) diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 4341637bc0..6ec9fb5fce 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -681,9 +681,7 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, } } - if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) - need_epoch_validation = rd_true; - else if (leader_epoch > rktp->rktp_leader_epoch) { + if (leader_epoch > rktp->rktp_leader_epoch) { rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", "%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32 " -> leader %" PRId32 @@ -693,7 +691,9 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_leader_epoch, leader_id, leader_epoch); rktp->rktp_leader_epoch = leader_epoch; need_epoch_validation = rd_true; - } + } else if (rktp->rktp_fetch_state == + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + need_epoch_validation = rd_true; fetching_from_follower = leader != NULL && rktp->rktp_broker != NULL && diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index d1619634b1..f81b9fb46d 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -212,6 +212,95 @@ static void do_test_ssl_error_retried(void) { } +/** + * @brief If there's an OffsetForLeaderEpoch request which fails, and the leader + * changes meanwhile, we end up in an infinite loop of OffsetForLeaderEpoch + * requests. + * Specifically: + * a. Leader Change - causes OffsetForLeaderEpoch + * request 'A'. + * b. Request 'A' fails with a retriable error, and we retry it. + * c. While waiting for Request 'A', the leader changes again, and we send a + * Request 'B', but the leader epoch is not updated correctly in this + * request, causing a loop. + * + * See #4425. + */ +static void do_test_two_leader_changes(void) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + int msg_cnt = 5; + uint64_t testid = test_id_generate(); + rd_kafka_conf_t *conf; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(2, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_conf_init(&conf, NULL, 60); + + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "debug", "protocol"); + + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + test_consumer_subscribe(c1, topic); + + /* Consume initial messages and join the group, etc. */ + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + + /* The leader will change from 1->2, and the OffsetForLeaderEpoch will + * be sent to broker 2. We need to first fail it with + * an error, and then give enough time to change the leader before + * returning a success. 
*/ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 900); + + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_NO_ERROR, 1000); + + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + rd_kafka_poll(c1, 1000); + /* Enough time to make a request, fail with a retriable error, and + * retry. */ + rd_sleep(1); + + /* Reset leader. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_poll(c1, 1000); + rd_sleep(1); + + /* There should be no infinite loop of OffsetForLeaderEpoch, and + * consequently, we should be able to consume these messages as a sign + * of success. */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + + + rd_kafka_destroy(c1); + + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} + + int main_0139_offset_validation_mock(int argc, char **argv) { if (test_needs_auth()) { @@ -223,5 +312,7 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_ssl_error_retried(); + do_test_two_leader_changes(); + return 0; } From b40e9c3ead324bc11ee0300ab64f81532f3da3ee Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 29 Sep 2023 09:58:12 +0530 Subject: [PATCH 2/3] Address review comments --- tests/0139-offset_validation_mock.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index f81b9fb46d..2185eaded6 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -248,10 +248,8 @@ static void do_test_two_leader_changes(void) { "batch.num.messages", "1", NULL); test_conf_init(&conf, NULL, 60); - test_conf_set(conf, "bootstrap.servers", bootstraps); test_conf_set(conf, "auto.offset.reset", "earliest"); - test_conf_set(conf, "debug", "protocol"); c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); test_consumer_subscribe(c1, topic); @@ -264,11 +262,8 @@ static void do_test_two_leader_changes(void) { * an error, and then give enough time to change the leader before * returning a success. */ rd_kafka_mock_broker_push_request_error_rtts( - mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, - RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 900); - - rd_kafka_mock_broker_push_request_error_rtts( - mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 2, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 900, RD_KAFKA_RESP_ERR_NO_ERROR, 1000); rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); @@ -293,7 +288,6 @@ static void do_test_two_leader_changes(void) { rd_kafka_destroy(c1); - test_mock_cluster_destroy(mcluster); TEST_LATER_CHECK(); From 23bab946eee25199279a9eae8a1ea8e9bc52948a Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 29 Sep 2023 10:05:23 +0530 Subject: [PATCH 3/3] Add CHANGELOG.md entry --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8eb3797e8c..a6c6efa18a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ librdkafka v2.3.0 is a feature release: * Fix a segmentation fault when closing a consumer using the cooperative-sticky assignor before the first assignment (#4381). 
* Fix for insufficient buffer allocation when allocating rack information (@wolfchimneyrock, #4449).
+ * Fix for infinite loop of OffsetForLeaderEpoch requests on quick leader changes (#4433).

 ## Fixes

@@ -29,6 +30,12 @@
    rack information on 32bit architectures. Solved by aligning all
    allocations to the maximum allowed word size (#4449).

+### Consumer Fixes
+
+ * If an OffsetForLeaderEpoch request was being retried, and the leader changed
+   while the retry was in-flight, an infinite loop of requests was triggered,
+   because we weren't updating the leader epoch correctly.
+   Fixed by updating the leader epoch before sending the request (#4433).


# librdkafka v2.2.0
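
For readers skimming the series: the core of PATCH 1/3 is an ordering change in rd_kafka_toppar_leader_update(). A newer leader epoch reported by metadata is now checked (and recorded) before the VALIDATE_EPOCH_WAIT fallback, so a pending validation can no longer keep re-issuing OffsetForLeaderEpoch with a stale epoch. Below is a minimal, self-contained C sketch of that decision logic only; the struct, enum, and function names are invented for illustration and are not librdkafka's internal API.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical, simplified stand-ins for librdkafka's internal state. */
typedef enum {
        FETCH_ACTIVE,
        FETCH_VALIDATE_EPOCH_WAIT, /* an offset validation is in flight */
} fetch_state_t;

typedef struct {
        int32_t cached_leader_epoch; /* last leader epoch we recorded */
        fetch_state_t fetch_state;
} partition_t;

/* Returns true when a leader update should trigger (re)validation.
 * Mirrors the fixed ordering: a newer epoch from metadata wins and is
 * recorded immediately, so a pending validation cannot keep reusing a
 * stale epoch and loop forever. */
static bool leader_update_needs_validation(partition_t *p,
                                           int32_t metadata_leader_epoch) {
        if (metadata_leader_epoch > p->cached_leader_epoch) {
                /* Record the new epoch *before* deciding to validate. */
                p->cached_leader_epoch = metadata_leader_epoch;
                return true;
        } else if (p->fetch_state == FETCH_VALIDATE_EPOCH_WAIT) {
                /* Already waiting on a validation: re-issue it with the
                 * (unchanged) cached epoch. */
                return true;
        }
        return false;
}

int main(void) {
        partition_t p = {.cached_leader_epoch = 0,
                         .fetch_state    = FETCH_VALIDATE_EPOCH_WAIT};

        /* Two quick leader changes: epoch 1, then epoch 2. With the epoch
         * check first, the second update records epoch 2 instead of
         * re-validating against a stale epoch. */
        printf("validate: %d (epoch now %d)\n",
               leader_update_needs_validation(&p, 1), p.cached_leader_epoch);
        printf("validate: %d (epoch now %d)\n",
               leader_update_needs_validation(&p, 2), p.cached_leader_epoch);
        return 0;
}
```

With the pre-fix ordering (state check first), the second call above would request validation without ever bumping the cached epoch, which is the request loop reported in #4425 and exercised by do_test_two_leader_changes() in the test patch.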