Skip to content

Commit

Permalink
Failing test
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Sep 21, 2023
1 parent 49f180a commit 2a67470
Showing 1 changed file with 132 additions and 0 deletions.
132 changes: 132 additions & 0 deletions tests/0139-offset_validation_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,136 @@ static void do_test_ssl_error_retried(void) {
}


/**
* @brief Storing an offset without leader epoch should still be allowed
* and the greater than check should apply only to the offset.
* See #4384.
*/
static void do_test_store_offset_without_leader_epoch(void) {
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_conf_t *conf;
const char *bootstraps;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
const char *c1_groupid = topic;
rd_kafka_t *c1;
rd_kafka_topic_t *rdk_topic;
uint64_t testid = test_id_generate();
rd_kafka_topic_partition_list_t *rktpars;
rd_kafka_topic_partition_t *rktpar;
int32_t leader_epoch;

SUB_TEST_QUICK();

mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "enable.auto.commit", "false");
test_conf_set(conf, "enable.auto.offset.store", "false");
test_conf_set(conf, "enable.partition.eof", "true");

c1 = test_create_consumer(c1_groupid, NULL, conf, NULL);
test_consumer_subscribe(c1, topic);

/* Leader epoch becomes 1. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);

/* Read EOF. */
test_consumer_poll("MSG_ALL", c1, testid, 1, 0, 0, NULL);

TEST_SAY(
"Storing offset without leader epoch with rd_kafka_offset_store");
rdk_topic = rd_kafka_topic_new(c1, topic, NULL);
/* Legacy function stores offset + 1 */
rd_kafka_offset_store(rdk_topic, 0, 1);
rd_kafka_topic_destroy(rdk_topic);

rd_kafka_commit(c1, NULL, rd_false);

rktpars = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(rktpars, topic, 0);
rd_kafka_committed(c1, rktpars, -1);

TEST_ASSERT(rktpars->elems[0].offset == 2, "expected %d, got %" PRId64,
2, rktpars->elems[0].offset);
leader_epoch =
rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]);

/* OffsetFetch returns the leader epoch even if not set. */
TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1,
leader_epoch);
rd_kafka_topic_partition_list_destroy(rktpars);

TEST_SAY(
"Storing offset without leader epoch with rd_kafka_offsets_store");
rktpars = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(rktpars, topic, 0)->offset = 5;
rd_kafka_offsets_store(c1, rktpars);
rd_kafka_topic_partition_list_destroy(rktpars);

rd_kafka_commit(c1, NULL, rd_false);

rktpars = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(rktpars, topic, 0);
rd_kafka_committed(c1, rktpars, -1);

TEST_ASSERT(rktpars->elems[0].offset == 5, "expected %d, got %" PRId64,
5, rktpars->elems[0].offset);
leader_epoch =
rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]);
/* OffsetFetch returns the leader epoch even if not set. */
TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1,
leader_epoch);
rd_kafka_topic_partition_list_destroy(rktpars);

TEST_SAY(
"While storing offset with leader epoch it should check that value "
"first");
/* Setting it to (6,1), as last one has epoch -1. */
rktpars = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(rktpars, topic, 0);
rktpar->offset = 6;
rd_kafka_topic_partition_set_leader_epoch(rktpar, 1);
rd_kafka_offsets_store(c1, rktpars);
rd_kafka_topic_partition_list_destroy(rktpars);

rd_kafka_commit(c1, NULL, rd_false);

/* Trying to store (7,0), it should skip the commit. */
rktpars = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(rktpars, topic, 0);
rktpar->offset = 7;
rd_kafka_topic_partition_set_leader_epoch(rktpar, 0);
rd_kafka_offsets_store(c1, rktpars);
rd_kafka_topic_partition_list_destroy(rktpars);

rd_kafka_commit(c1, NULL, rd_false);

/* Committed offset is (6,1). */
rktpars = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(rktpars, topic, 0);
rd_kafka_committed(c1, rktpars, -1);

TEST_ASSERT(rktpars->elems[0].offset == 6, "expected %d, got %" PRId64,
6, rktpars->elems[0].offset);
leader_epoch =
rd_kafka_topic_partition_get_leader_epoch(&rktpars->elems[0]);
TEST_ASSERT(leader_epoch == 1, "expected %d, got %" PRId32, 1,
leader_epoch);
rd_kafka_topic_partition_list_destroy(rktpars);

rd_kafka_destroy(c1);

test_mock_cluster_destroy(mcluster);

TEST_LATER_CHECK();
SUB_TEST_PASS();
}


int main_0139_offset_validation_mock(int argc, char **argv) {

if (test_needs_auth()) {
Expand All @@ -223,5 +353,7 @@ int main_0139_offset_validation_mock(int argc, char **argv) {

do_test_ssl_error_retried();

do_test_store_offset_without_leader_epoch();

return 0;
}

0 comments on commit 2a67470

Please sign in to comment.