Skip to content

Commit

Permalink
Use topic id where it's needed.
Browse files Browse the repository at this point in the history
Needs to be updated after #4300
  • Loading branch information
emasab committed Aug 3, 2023
1 parent 51bb202 commit a0afd7f
Show file tree
Hide file tree
Showing 13 changed files with 484 additions and 54 deletions.
21 changes: 21 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,27 @@ RD_EXPORT
int32_t rd_kafka_topic_partition_get_leader_epoch(
const rd_kafka_topic_partition_t *rktpar);

/**
* @brief TODO: write
*
* @param rktpar
* @param topic_id
* @return
*/
RD_EXPORT
void rd_kafka_topic_partition_set_topic_id(rd_kafka_topic_partition_t *rktpar,
int64_t topic_id);

/**
* @brief TODO: write
*
* @param rktpar
* @return
*/
RD_EXPORT
int64_t
rd_kafka_topic_partition_get_topic_id(const rd_kafka_topic_partition_t *rktpar);

/**
* @brief A growable list of Topic+Partitions.
*
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
rd_kafka_buf_write_topic_partitions(
rkbuf, owned_partitions,
rd_false /*don't skip invalid offsets*/,
rd_false /*any offset*/, fields);
rd_false /*any offset*/, rd_false /* use_topic name */,
fields);
}

/* Following data is ignored by consumer version < 2 */
Expand Down
3 changes: 1 addition & 2 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,7 @@ static void rd_kafka_mock_connection_close(rd_kafka_mock_connection_t *mconn,
rd_sockaddr2str(&mconn->peer, RD_SOCKADDR2STR_F_PORT),
reason);

rd_kafka_mock_cgrps_generic_connection_closed(mconn->broker->cluster,
mconn);
rd_kafka_mock_cgrps_connection_closed(mconn->broker->cluster, mconn);

rd_kafka_timer_stop(&mconn->broker->cluster->timers, &mconn->write_tmr,
rd_true);
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ rd_kafka_mock_set_apiversion(rd_kafka_mock_cluster_t *mcluster,
* @param rktparlist Partitions to assign to the member.
*/
void rd_kafka_mock_cgrp_consumer_assignment(
const rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
const char *member_id,
const rd_kafka_topic_partition_list_t *rktparlist);
Expand Down
257 changes: 237 additions & 20 deletions src/rdkafka_mock_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ rd_kafka_mock_cgrp_generic_get(rd_kafka_mock_cluster_t *mcluster,


/**
* @brief A client connection closed, check if any cgrp has any state
* @brief A client connection closed, check if any generic cgrp has any state
* for this connection that needs to be cleared.
*/
void rd_kafka_mock_cgrps_generic_connection_closed(
Expand All @@ -722,10 +722,187 @@ void rd_kafka_mock_cgrps_generic_connection_closed(
}
}

// static void rd_kafka_mock_cgrp_consumer_member_assignment(
// rd_kafka_mock_cgrp_consumer_t *mcgrp,
// rd_kafka_mock_cgrp_consumer_member_t *member) {
// }
/**
* @brief TODO: write
*
* @param mcgrp
* @param rktparlist
*/
static void rd_kafka_mock_cgrp_consumer_member_assignment_set(
rd_kafka_mock_cgrp_consumer_t *cgrp,
rd_kafka_mock_cgrp_consumer_member_t *member,
const rd_kafka_topic_partition_list_t *rktparlist) {
rd_kafka_topic_partition_t *rktpar;
if (member->target_assignment) {
rd_kafka_topic_partition_list_destroy(
member->target_assignment);
}
member->target_member_epoch++;
member->target_assignment =
rd_kafka_topic_partition_list_copy(rktparlist);

/* If not present, fill topic ids using names */
RD_KAFKA_TPLIST_FOREACH(rktpar, member->target_assignment) {
int64_t topic_id =
rd_kafka_topic_partition_get_topic_id(rktpar);
if (!topic_id) {
rd_kafka_mock_topic_t *mtopic =
rd_kafka_mock_topic_find(cgrp->cluster,
rktpar->topic);
if (mtopic)
rd_kafka_topic_partition_set_topic_id(
rktpar, mtopic->id);
}
}
}

/**
* @brief TODO: write
*
* @param member
* @param current_assignment
*/
static void rd_kafka_mock_cgrp_consumer_member_current_assignment_set(
rd_kafka_mock_cgrp_consumer_member_t *member,
const rd_kafka_topic_partition_list_t *current_assignment) {
if (member->current_assignment) {
rd_kafka_topic_partition_list_destroy(
member->current_assignment);
}

member->current_assignment =
current_assignment
? rd_kafka_topic_partition_list_copy(current_assignment)
: NULL;
}

/**
* @brief TODO: write
*
* @param member
* @param returned_assignment
*/
static void rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(
rd_kafka_mock_cgrp_consumer_member_t *member,
const rd_kafka_topic_partition_list_t *returned_assignment) {
if (member->returned_assignment) {
rd_kafka_topic_partition_list_destroy(
member->returned_assignment);
}
member->returned_assignment =
returned_assignment
? rd_kafka_topic_partition_list_copy(returned_assignment)
: NULL;
}

static rd_kafka_topic_partition_list_t *
rd_kafka_mock_cgrp_consumer_member_assignment_filter(
rd_kafka_topic_partition_list_t *assignment) {
rd_kafka_topic_partition_t *rktpar;
rd_kafka_topic_partition_list_t *ret =
rd_kafka_topic_partition_list_new(assignment->cnt);

RD_KAFKA_TPLIST_FOREACH(rktpar, assignment) {
int64_t topic_id =
rd_kafka_topic_partition_get_topic_id(rktpar);
if (topic_id) {
rd_kafka_topic_partition_list_add_copy(ret, rktpar);
}
}

return ret;
}

/**
* @brief Calculates next assignment and member epoch for a \p member,
* given \p current_assignment.
*
* @param member The group member.
* @param current_assignment The assignment sent by the member, or NULL if it
* didn't change. Must be NULL if *member_epoch is 0.
* @param member_epoch Pointer to reported member epoch. Can be updated.
*
* @return The new assignment to return to the member.
*/
rd_kafka_topic_partition_list_t *
rd_kafka_mock_cgrp_consumer_member_next_assignment(
rd_kafka_mock_cgrp_consumer_member_t *member,
rd_kafka_topic_partition_list_t *current_assignment,
int *member_epoch) {
rd_kafka_topic_partition_t *rktpar;
rd_kafka_topic_partition_list_t *returned_assignment = NULL;

if (current_assignment) {
/* Update current assignment to reflect what is provided
* by the client. */
rd_kafka_mock_cgrp_consumer_member_current_assignment_set(
member, current_assignment);
}

if (*member_epoch != member->current_member_epoch ||
member->current_member_epoch != member->target_member_epoch) {
/* Epochs are different, that means we have to bump the epoch
* immediately or do some revocations before that. */

rd_kafka_topic_partition_list_t *intersection =
rd_kafka_topic_partition_list_new(
member->target_assignment->cnt);
RD_KAFKA_TPLIST_FOREACH(rktpar, member->current_assignment) {
int64_t topic_id =
rd_kafka_topic_partition_get_topic_id(rktpar);
if (rd_kafka_topic_partition_list_find_by_id_idx(
member->target_assignment, topic_id,
rktpar->partition) >= 0) {
rd_kafka_topic_partition_list_add_copy(
intersection, rktpar);
}
}

if (intersection->cnt == member->current_assignment->cnt) {
/* No partitions to remove, return target assignment
* and reconcile the epochs */
member->current_member_epoch =
member->target_member_epoch;
returned_assignment =
rd_kafka_mock_cgrp_consumer_member_assignment_filter(
member->target_assignment);
rd_kafka_topic_partition_list_destroy(intersection);
} else {
/* Some partitions to remove, return intersection and
* leave the same epoch. */
returned_assignment = intersection;
}
} else if (!member->returned_assignment) {
/* If all the epochs are the same, the only case
* where we have to return the assignment is
* after a disconnection, when returned_assignment has been
* reset to NULL. */
returned_assignment =
rd_kafka_mock_cgrp_consumer_member_assignment_filter(
member->target_assignment);
}

*member_epoch = member->current_member_epoch;
if (returned_assignment) {
/* Compare returned_assignment with last returned_assignment.
* If equal return NULL, otherwise return returned_assignment
* and update last returned_assignment. */
rd_bool_t same_returned_assignment =
member->returned_assignment &&
!rd_kafka_topic_partition_list_cmp(
member->returned_assignment, returned_assignment,
rd_kafka_topic_partition_by_id_cmp);
if (same_returned_assignment) {
rd_kafka_topic_partition_list_destroy(
returned_assignment);
returned_assignment = NULL;
} else {
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(
member, returned_assignment);
}
}
return returned_assignment;
}

/**
* @brief Mark member as active (restart session timer)
Expand Down Expand Up @@ -753,6 +930,7 @@ rd_kafka_mock_cgrp_consumer_member_t *rd_kafka_mock_cgrp_consumer_member_find(

rd_kafka_mock_cgrp_consumer_member_t *
rd_kafka_mock_cgrp_consumer_member_add(rd_kafka_mock_cgrp_consumer_t *mcgrp,
struct rd_kafka_mock_connection_s *conn,
rd_kafka_buf_t *resp,
const rd_kafkap_str_t *MemberId,
const rd_kafkap_str_t *InstanceId,
Expand Down Expand Up @@ -782,6 +960,8 @@ rd_kafka_mock_cgrp_consumer_member_add(rd_kafka_mock_cgrp_consumer_t *mcgrp,

mcgrp->session_timeout_ms = session_timeout_ms;

mcgrp->conn = conn;

rd_kafka_mock_cgrp_consumer_member_active(mcgrp, member);

return member;
Expand All @@ -799,6 +979,13 @@ static void rd_kafka_mock_cgrp_consumer_member_destroy(
if (member->instance_id)
rd_free(member->instance_id);

RD_IF_FREE(member->target_assignment,
rd_kafka_topic_partition_list_destroy);
RD_IF_FREE(member->current_assignment,
rd_kafka_topic_partition_list_destroy);
RD_IF_FREE(member->returned_assignment,
rd_kafka_topic_partition_list_destroy);

rd_free(member);
}

Expand All @@ -815,9 +1002,6 @@ rd_kafka_resp_err_t rd_kafka_mock_cgrp_consumer_member_leave(

rd_kafka_mock_cgrp_consumer_member_destroy(mcgrp, member);

// rd_kafka_mock_cgrp_consumer_rebalance(mcgrp, "explicit member
// leave");

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

Expand Down Expand Up @@ -857,10 +1041,6 @@ static void rd_kafka_mock_cgrp_consumer_session_tmr_cb(rd_kafka_timers_t *rkts,
rd_kafka_mock_cgrp_consumer_member_destroy(mcgrp, member);
timeout_cnt++;
}

// if (timeout_cnt)
// rd_kafka_mock_cgrp_generic_rebalance(mcgrp, "member
// timeout");
}


Expand Down Expand Up @@ -891,31 +1071,58 @@ rd_kafka_mock_cgrp_consumer_get(rd_kafka_mock_cluster_t *mcluster,


void rd_kafka_mock_cgrp_consumer_assignment(
const rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_cluster_t *mcluster,
const char *group_id,
const char *member_id,
const rd_kafka_topic_partition_list_t *rktparlist) {
rd_kafka_mock_cgrp_consumer_member_t *cgrp_member;
rd_kafka_mock_cgrp_consumer_t *cgrp;
rd_kafka_mock_cgrp_consumer_member_t *member;
rd_kafkap_str_t *group_id_str =
rd_kafkap_str_new(group_id, strlen(group_id));
rd_kafkap_str_t *member_id_str =
rd_kafkap_str_new(member_id, strlen(member_id));
rd_kafka_mock_cgrp_consumer_t *cgrp =
rd_kafka_mock_cgrp_consumer_find(mcluster, group_id_str);

mtx_lock(&mcluster->lock);

cgrp = rd_kafka_mock_cgrp_consumer_find(mcluster, group_id_str);
if (!cgrp)
goto destroy;

cgrp_member =
rd_kafka_mock_cgrp_consumer_member_find(cgrp, member_id_str);
member = rd_kafka_mock_cgrp_consumer_member_find(cgrp, member_id_str);

if (!cgrp_member)
if (!member)
goto destroy;


rd_kafka_mock_cgrp_consumer_member_assignment_set(cgrp, member,
rktparlist);

destroy:
rd_kafkap_str_destroy(group_id_str);
rd_kafkap_str_destroy(member_id_str);
mtx_unlock(&mcluster->lock);
}

/**
* @brief A client connection closed, check if any consumer cgrp has any state
* for this connection that needs to be cleared.
*/
void rd_kafka_mock_cgrps_consumer_connection_closed(
rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_connection_t *mconn) {
rd_kafka_mock_cgrp_consumer_t *mcgrp;

TAILQ_FOREACH(mcgrp, &mcluster->cgrps_consumer, link) {
rd_kafka_mock_cgrp_consumer_member_t *member, *tmp;
TAILQ_FOREACH_SAFE(member, &mcgrp->members, link, tmp) {
if (member->conn == mconn) {
member->conn = NULL;
rd_kafka_mock_cgrp_consumer_member_returned_assignment_set(
member, NULL);
rd_kafka_mock_cgrp_consumer_member_current_assignment_set(
member, NULL);
}
}
}
}

void rd_kafka_mock_cgrp_consumer_destroy(rd_kafka_mock_cgrp_consumer_t *mcgrp) {
Expand All @@ -932,3 +1139,13 @@ void rd_kafka_mock_cgrp_consumer_destroy(rd_kafka_mock_cgrp_consumer_t *mcgrp) {
rd_kafka_mock_cgrp_consumer_member_destroy(mcgrp, member);
rd_free(mcgrp);
}

/**
* @brief A client connection closed, check if any cgrp has any state
* for this connection that needs to be cleared.
*/
void rd_kafka_mock_cgrps_connection_closed(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_connection_t *mconn) {
rd_kafka_mock_cgrps_generic_connection_closed(mcluster, mconn);
rd_kafka_mock_cgrps_consumer_connection_closed(mcluster, mconn);
}
Loading

0 comments on commit a0afd7f

Please sign in to comment.