Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-951] Review comments for the tests #4767

Merged
merged 5 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -2094,7 +2094,7 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {

rd_kafka_dbg(rk, METADATA, "METADATAUPDATE",
"Partition %s(%s)[%" PRId32
"]: "
"]:"
" updated with leader %" PRId32
" and epoch %" PRId32,
topic, rd_kafka_Uuid_base64str(&topic_id),
Expand Down
217 changes: 203 additions & 14 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,58 @@



void rd_kafka_mock_Produce_reply_tags_partition_write(
rd_kafka_buf_t *rkbuf,
int tagtype,
rd_kafka_mock_partition_t *mpart) {
switch (tagtype) {
case 0: /* CurrentLeader */
/* Leader id */
rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
/* Leader epoch */
rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
/* Field tags */
rd_kafka_buf_write_tags_empty(rkbuf);
break;
default:
break;
}
}

void rd_kafka_mock_Produce_reply_tags_write(
rd_kafka_buf_t *rkbuf,
int tagtype,
rd_kafka_mock_broker_t **changed_leaders,
int changed_leader_cnt) {
int i;
switch (tagtype) {
case 0: /* NodeEndpoints */
/* #NodeEndpoints */
rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt);
for (i = 0; i < changed_leader_cnt; i++) {
rd_kafka_mock_broker_t *changed_leader =
changed_leaders[i];
/* Leader id */
rd_kafka_buf_write_i32(rkbuf, changed_leader->id);
/* Leader Hostname */
rd_kafka_buf_write_str(
rkbuf, changed_leader->advertised_listener, -1);

/* Leader Port number */
rd_kafka_buf_write_i32(rkbuf,
(int32_t)changed_leader->port);

/* Leader Rack */
rd_kafka_buf_write_str(rkbuf, changed_leader->rack, -1);

/* Field tags */
rd_kafka_buf_write_tags_empty(rkbuf);
}
default:
break;
}
}

/**
* @brief Handle ProduceRequest
*/
Expand All @@ -55,6 +107,12 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
int16_t Acks;
int32_t TimeoutMs;
rd_kafka_resp_err_t all_err;
int32_t tags_to_write[1] = {0};
size_t tags_to_write_cnt = 0;
int changed_leaders_cnt = 0;
rd_kafka_mock_broker_t **changed_leaders =
rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders));


if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3)
rd_kafka_buf_read_str(rkbuf, &TransactionalId);
Expand All @@ -78,7 +136,6 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_str(rkbuf, &Topic);
rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt,
RD_KAFKAP_PARTITIONS_MAX);

mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic);

/* Response: Topic */
Expand All @@ -92,6 +149,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
rd_kafkap_bytes_t records;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
int64_t BaseOffset = -1;
int32_t partition_tags_to_write[1] = {0};
size_t partition_tags_to_write_cnt = 0;

rd_kafka_buf_read_i32(rkbuf, &Partition);

Expand All @@ -100,10 +159,8 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
Partition);

rd_kafka_buf_read_kbytes(rkbuf, &records);

/* Partition Tags */
rd_kafka_buf_skip_tags(rkbuf);

/* Response: Partition */
rd_kafka_buf_write_i32(resp, Partition);

Expand Down Expand Up @@ -161,8 +218,38 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
/* Response: ErrorMessage */
rd_kafka_buf_write_str(resp, NULL, 0);
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 10 &&
err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION) {
int changed_leader_idx;
/* See if this leader is already included */
for (changed_leader_idx = 0;
changed_leader_idx < changed_leaders_cnt;
changed_leader_idx++) {
if (changed_leaders[changed_leader_idx]
->id == mpart->leader->id)
break;
}
if (changed_leader_idx == changed_leaders_cnt) {
/* Add the new leader that wasn't
* present */
changed_leaders[changed_leaders_cnt] =
mpart->leader;
changed_leaders_cnt++;
}

partition_tags_to_write
[partition_tags_to_write_cnt] =
0 /* CurrentLeader */;
partition_tags_to_write_cnt++;
}

/* Response: Partition tags */
rd_kafka_buf_write_tags_empty(resp);
rd_kafka_buf_write_tags(
resp,
rd_kafka_mock_Produce_reply_tags_partition_write,
partition_tags_to_write,
partition_tags_to_write_cnt, mpart);
}

/* Topic tags */
Expand All @@ -177,17 +264,76 @@ static int rd_kafka_mock_handle_Produce(rd_kafka_mock_connection_t *mconn,
}

/* Response: Top level tags */
rd_kafka_buf_write_tags_empty(resp);
if (changed_leaders_cnt) {
tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */;
tags_to_write_cnt++;
}

rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
rd_kafka_buf_write_tags(resp, rd_kafka_mock_Produce_reply_tags_write,
tags_to_write, tags_to_write_cnt,
changed_leaders, changed_leaders_cnt);

rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
rd_free(changed_leaders);
return 0;

err_parse:
rd_free(changed_leaders);
rd_kafka_buf_destroy(resp);
return -1;
}

void rd_kafka_mock_Fetch_reply_tags_partition_write(
rd_kafka_buf_t *rkbuf,
int tagtype,
rd_kafka_mock_partition_t *mpart) {
switch (tagtype) {
case 1: /* CurrentLeader */
/* Leader id */
rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
/* Leader epoch */
rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
/* Field tags */
rd_kafka_buf_write_tags_empty(rkbuf);
break;
default:
break;
}
}

void rd_kafka_mock_Fetch_reply_tags_write(
rd_kafka_buf_t *rkbuf,
int tagtype,
rd_kafka_mock_broker_t **changed_leaders,
int changed_leader_cnt) {
int i;
switch (tagtype) {
case 0: /* NodeEndpoints */
/* #NodeEndpoints */
rd_kafka_buf_write_arraycnt(rkbuf, changed_leader_cnt);
for (i = 0; i < changed_leader_cnt; i++) {
rd_kafka_mock_broker_t *changed_leader =
changed_leaders[i];
/* Leader id */
rd_kafka_buf_write_i32(rkbuf, changed_leader->id);
/* Leader Hostname */
rd_kafka_buf_write_str(
rkbuf, changed_leader->advertised_listener, -1);

/* Leader Port number */
rd_kafka_buf_write_i32(rkbuf,
(int32_t)changed_leader->port);

/* Leader Rack */
rd_kafka_buf_write_str(rkbuf, changed_leader->rack, -1);

/* Field tags */
rd_kafka_buf_write_tags_empty(rkbuf);
}
default:
break;
}
}


/**
Expand All @@ -204,6 +350,13 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
int8_t IsolationLevel;
size_t totsize = 0;

int32_t tags_to_write[1] = {0};
uint64_t tags_to_write_cnt = 0;

int changed_leaders_cnt = 0;
rd_kafka_mock_broker_t **changed_leaders =
rd_calloc(mcluster->broker_cnt, sizeof(*changed_leaders));

if (rkbuf->rkbuf_reqhdr.ApiVersion <= 14) {
rd_kafka_buf_read_i32(rkbuf, &ReplicaId);
}
Expand Down Expand Up @@ -281,8 +434,10 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
rd_kafka_mock_partition_t *mpart = NULL;
rd_kafka_resp_err_t err = all_err;
rd_bool_t on_follower;
size_t partsize = 0;
const rd_kafka_mock_msgset_t *mset = NULL;
size_t partsize = 0;
const rd_kafka_mock_msgset_t *mset = NULL;
int32_t partition_tags_to_write[1] = {0};
uint64_t partition_tags_to_write_cnt = 0;

rd_kafka_buf_read_i32(rkbuf, &Partition);

Expand Down Expand Up @@ -422,14 +577,39 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_write_arraycnt(resp, 0);
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 12 &&
err == RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER) {
int changed_leader_idx;
for (changed_leader_idx = 0;
changed_leader_idx < changed_leaders_cnt;
changed_leader_idx++) {
if (changed_leaders[changed_leader_idx]
->id == mpart->leader->id)
break;
}
if (changed_leader_idx == changed_leaders_cnt) {
changed_leaders[changed_leaders_cnt] =
mpart->leader;
changed_leaders_cnt++;
}
/* CurrentLeader */
partition_tags_to_write
[partition_tags_to_write_cnt] = 1;
partition_tags_to_write_cnt++;
}

/* Response: Partition tags */
rd_kafka_buf_write_tags_empty(resp);
rd_kafka_buf_write_tags(
resp,
rd_kafka_mock_Fetch_reply_tags_partition_write,
partition_tags_to_write,
partition_tags_to_write_cnt, mpart);
}

/* Response: Topic tags */
rd_kafka_buf_write_tags_empty(resp);
/* Topic tags */
rd_kafka_buf_skip_tags(rkbuf);
/* Response: Topic tags */
rd_kafka_buf_write_tags_empty(resp);
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 7) {
Expand Down Expand Up @@ -466,8 +646,15 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
/* Matt might do something sensible with this */
}

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 16 && changed_leaders_cnt) {
tags_to_write[tags_to_write_cnt] = 0 /* NodeEndpoints */;
tags_to_write_cnt++;
}

/* Response: Top level tags */
rd_kafka_buf_write_tags_empty(resp);
rd_kafka_buf_write_tags(resp, rd_kafka_mock_Fetch_reply_tags_write,
tags_to_write, tags_to_write_cnt,
changed_leaders, changed_leaders_cnt);

/* If there was no data, delay up to MaxWait.
* This isn't strictly correct since we should cut the wait short
Expand All @@ -478,10 +665,12 @@ static int rd_kafka_mock_handle_Fetch(rd_kafka_mock_connection_t *mconn,
resp->rkbuf_ts_retry = rd_clock() + (MaxWait * 1000);

rd_kafka_mock_connection_send_response0(mconn, resp, rd_true);
rd_free(changed_leaders);
return 0;

err_parse:
rd_kafka_buf_destroy(resp);
rd_free(changed_leaders);
return -1;
}

Expand Down Expand Up @@ -2306,8 +2495,8 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
const struct rd_kafka_mock_api_handler
rd_kafka_mock_api_handlers[RD_KAFKAP__NUM] = {
/* [request-type] = { MinVersion, MaxVersion, FlexVersion, callback } */
[RD_KAFKAP_Produce] = {0, 9, 9, rd_kafka_mock_handle_Produce},
[RD_KAFKAP_Fetch] = {0, 15, 12, rd_kafka_mock_handle_Fetch},
[RD_KAFKAP_Produce] = {0, 10, 9, rd_kafka_mock_handle_Produce},
[RD_KAFKAP_Fetch] = {0, 16, 12, rd_kafka_mock_handle_Fetch},
[RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets},
[RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch},
[RD_KAFKAP_OffsetCommit] = {0, 9, 8, rd_kafka_mock_handle_OffsetCommit},
Expand Down
2 changes: 0 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -3675,10 +3675,8 @@ rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
if (request->rkbuf_reqhdr.ApiVersion >= 10) {
rd_kafkap_Produce_reply_tags_Topic_t *TopicTags =
&ProduceTags.Topic;
;
rd_kafkap_Produce_reply_tags_Partition_t *PartitionTags =
&TopicTags->Partition;
;

/* Partition tags count */
TopicTags->TopicName = RD_KAFKAP_STR_DUP(&TopicName);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1375,7 +1375,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
rd_kafka_toppar_get(rkt, mdt->partitions[j].id, 0);

rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
" Topic %s partition %i Leader %" PRId32
"Topic %s [%" PRId32 "] Leader %" PRId32
" Epoch %" PRId32,
rkt->rkt_topic->str, mdt->partitions[j].id,
mdt->partitions[j].leader, leader_epoch);
Expand Down
Loading