Remove warnings and add CHANGELOG
milindl committed Jun 26, 2023
2 parents 3a9a340 + aa50e52 commit 7fff6eb
Showing 15 changed files with 201 additions and 192 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,8 @@ librdkafka v2.2.0 is a feature release:
* Store offset commit metadata in `rd_kafka_offsets_store` (@mathispesch, #4084).
* Fix a bug that happens when skipping tags, causing buffer underflow in
MetadataResponse (#4278).
* Fix a bug where topic leader is not refreshed in the same metadata call even if the leader is
present.
* [KIP-881](https://cwiki.apache.org/confluence/display/KAFKA/KIP-881%3A+Rack-aware+Partition+Assignment+for+Kafka+Consumers):
Add support for rack-aware partition assignment for consumers
(#4184, #4291, #4252).
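Storing commit metadata with `rd_kafka_offsets_store` (the entry above, #4084) can be exercised roughly as follows. This is a minimal sketch against the public C API, not the canonical usage: it assumes a consumer `rk` configured with `enable.auto.offset.store=false`, and that the heap-allocated metadata buffer is owned and freed together with the partition list.

```c
#include <string.h>
#include <librdkafka/rdkafka.h>

/* Sketch: store an offset together with commit metadata for one partition.
 * Assumes `rk` is a consumer created with enable.auto.offset.store=false. */
static rd_kafka_resp_err_t
store_offset_with_metadata(rd_kafka_t *rk, const char *topic,
                           int32_t partition, int64_t next_offset,
                           const char *metadata) {
        rd_kafka_topic_partition_list_t *offsets =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_t *rktpar =
            rd_kafka_topic_partition_list_add(offsets, topic, partition);
        rd_kafka_resp_err_t err;

        rktpar->offset = next_offset;
        /* Metadata is an opaque byte buffer; here we assume the list takes
         * ownership of a heap-allocated copy and frees it on destroy. */
        rktpar->metadata      = strdup(metadata);
        rktpar->metadata_size = strlen(metadata);

        err = rd_kafka_offsets_store(rk, offsets);

        rd_kafka_topic_partition_list_destroy(offsets);
        return err;
}
```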
@@ -19,6 +21,10 @@ librdkafka v2.2.0 is a feature release:
closes as normal ones (#4294).
* Added `fetch.queue.backoff.ms` to the consumer to control how long
  the consumer backs off the next fetch attempt. (@bitemyapp, @edenhill, #2879)
* [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in Describe Responses. This additionally
  includes Admin APIs for DescribeCluster and DescribeTopics.
(#4240, @jainruchir).
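The KIP-430 entry above is what most of this commit touches. Below is a rough, non-authoritative sketch of issuing a DescribeCluster request with authorized operations included; `RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER`, `rd_kafka_AdminOptions_set_include_authorized_operations()` and the event accessor follow the released v2.2.0 public API and may differ slightly at this intermediate commit. Reading the returned description itself is shown in `examples/describe_cluster.c` in this diff.

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: describe the cluster, asking for cluster ACL operations.
 * Assumes `rk` is an existing client instance; names follow the
 * released v2.2.0 public API. */
static void describe_cluster_sketch(rd_kafka_t *rk) {
        char errstr[512];
        rd_kafka_queue_t *queue = rd_kafka_queue_new(rk);
        rd_kafka_AdminOptions_t *opts =
            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER);
        rd_kafka_event_t *rkev;

        rd_kafka_AdminOptions_set_request_timeout(opts, 10 * 1000, errstr,
                                                  sizeof(errstr));
        /* KIP-430: request cluster authorized operations in the response. */
        rd_kafka_AdminOptions_set_include_authorized_operations(opts, 1);

        rd_kafka_DescribeCluster(rk, opts, queue);

        /* Wait for the result event; accessors for the returned description
         * are demonstrated in examples/describe_cluster.c. */
        rkev = rd_kafka_queue_poll(queue, 10 * 1000 /*ms*/);
        if (!rkev || rd_kafka_event_error(rkev))
                fprintf(stderr, "DescribeCluster failed: %s\n",
                        rkev ? rd_kafka_event_error_string(rkev) : "timed out");

        if (rkev)
                rd_kafka_event_destroy(rkev);
        rd_kafka_AdminOptions_destroy(opts);
        rd_kafka_queue_destroy(queue);
}
```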


## Enhancements
1 change: 0 additions & 1 deletion examples/describe_cluster.c
@@ -139,7 +139,6 @@ int64_t parse_int(const char *what, const char *str) {
*/
static int
print_cluster_info(const rd_kafka_DescribeCluster_result_t *clusterdesc) {
size_t i;
int j, acl_operation;
const rd_kafka_ClusterDescription_t *desc;
int controller_id, node_cnt, cluster_authorized_operations_cnt;
3 changes: 1 addition & 2 deletions examples/describe_topics.c
@@ -138,7 +138,7 @@ int64_t parse_int(const char *what, const char *str) {
*/
static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc,
int topic_cnt) {
size_t i, j;
size_t i;
const rd_kafka_TopicDescription_t **result_topics;
size_t result_topics_cnt;
result_topics = rd_kafka_DescribeTopics_result_topics(
@@ -248,7 +248,6 @@ static int print_topics_info(const rd_kafka_DescribeTopics_result_t *topicdesc,
*/
static void cmd_describe_topics(rd_kafka_conf_t *conf, int argc, char **argv) {
rd_kafka_t *rk;
int i;
const char **topics = NULL;
char errstr[512];
rd_kafka_AdminOptions_t *options;
2 changes: 1 addition & 1 deletion src-cpp/rdkafkacpp.h
@@ -111,7 +111,7 @@ namespace RdKafka {
* @remark This value should only be used during compile time,
* for runtime checks of version use RdKafka::version()
*/
#define RD_KAFKA_VERSION 0x020101ff
#define RD_KAFKA_VERSION 0x020200ff

/**
* @brief Returns the librdkafka version as integer.
2 changes: 1 addition & 1 deletion src/rdkafka.h
@@ -166,7 +166,7 @@ typedef SSIZE_T ssize_t;
* @remark This value should only be used during compile time,
* for runtime checks of version use rd_kafka_version()
*/
#define RD_KAFKA_VERSION 0x020101ff
#define RD_KAFKA_VERSION 0x020200ff

/**
* @brief Returns the librdkafka version as integer.
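Both version bumps above move the hex-encoded macro from v2.1.1 to v2.2.0 (interpreted as MM.mm.rr.xx, with 0xff marking a final release). A minimal sketch of the compile-time vs. runtime distinction the surrounding comment makes:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
#if RD_KAFKA_VERSION >= 0x020200ff
        /* Compile-time: the headers used for this build are v2.2.0 or newer. */
        printf("Built against librdkafka >= 2.2.0 headers\n");
#endif
        /* Runtime: the linked library may be older or newer than the headers,
         * so runtime checks should use rd_kafka_version(). */
        printf("Linked librdkafka: %s (0x%08x)\n", rd_kafka_version_str(),
               rd_kafka_version());
        return 0;
}
```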
64 changes: 9 additions & 55 deletions src/rdkafka_admin.c
@@ -6149,7 +6149,7 @@ const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions(
* @param members List of members (rd_kafka_MemberDescription_t) of this
* group.
* @param partition_assignor (optional) Chosen assignor.
* @param authorized_operations authorized operations.
* @param authorized_operations (optional) authorized operations.
* @param state Group state.
* @param coordinator (optional) Group coordinator.
* @param error (optional) Error received for this group.
@@ -6601,22 +6601,20 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
authorized_operations_list,
rd_kafka_consumer_group_state_code(group_state),
node, error);
if (authorized_operations_list)
rd_list_destroy(authorized_operations_list);
} else {
} else
grpdesc = rd_kafka_ConsumerGroupDescription_new_error(
group_id, error);
}

rd_list_add(&rko_result->rko_u.admin_result.results, grpdesc);
if (error)
rd_kafka_error_destroy(error);

rd_list_destroy(&members);
rd_free(group_id);
rd_free(group_state);
rd_free(proto_type);
rd_free(proto);
if (authorized_operations_list)
rd_list_destroy(authorized_operations_list);
RD_IF_FREE(error, rd_kafka_error_destroy);
RD_IF_FREE(authorized_operations_list, rd_list_destroy);

error = NULL;
group_id = NULL;
group_state = NULL;
@@ -7117,10 +7115,6 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
i = 0;
while (cnt--) {
rd_kafka_TopicDescription_t *topicdesc = NULL;
/* topics in md should be in the same order as in
* mdi->topics[i]*/
rd_dassert(strcmp(md->topics[i].topic,
mdi->topics[i].topic_name) == 0);
if (md->topics[i].err == RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_list_t *authorized_operations;
authorized_operations =
@@ -7315,47 +7309,6 @@ rd_kafka_DescribeCluster_result_description(
return clusterdesc;
}

/**
* @brief Copy \p desc ClusterDescription.
*
* @param desc The cluster description to copy.
* @return A new allocated copy of the passed ClusterDescription.
*/
static rd_kafka_ClusterDescription_t *
rd_kafka_ClusterDescription_copy(const rd_kafka_ClusterDescription_t *desc) {
rd_kafka_ClusterDescription_t *clusterdesc;
int i;
clusterdesc = rd_calloc(1, sizeof(*clusterdesc));

clusterdesc->cluster_id = rd_strdup(desc->cluster_id);
clusterdesc->controller_id = desc->controller_id;

if (desc->cluster_authorized_operations) {
clusterdesc->cluster_authorized_operations = rd_list_new(
rd_list_cnt(desc->cluster_authorized_operations), rd_free);
for (i = 0;
i < rd_list_cnt(desc->cluster_authorized_operations);
i++) {
int *entry = rd_list_elem(
desc->cluster_authorized_operations, i);
int *oper = malloc(sizeof(int));
*oper = *entry;
rd_list_add(clusterdesc->cluster_authorized_operations,
oper);
}
} else
clusterdesc->cluster_authorized_operations = NULL;

clusterdesc->node_cnt = desc->node_cnt;
clusterdesc->Nodes = malloc(sizeof(rd_kafka_Node_t) * desc->node_cnt);
for (i = 0; i < desc->node_cnt; i++) {
clusterdesc->Nodes[i].host = rd_strdup(desc->Nodes[i].host);
clusterdesc->Nodes[i].port = desc->Nodes[i].port;
clusterdesc->Nodes[i].id = desc->Nodes[i].id;
}
return clusterdesc;
}

/**
* @brief Create a new ClusterDescription object.
*
@@ -7445,7 +7398,7 @@ rd_kafka_admin_DescribeClusterRequest(rd_kafka_broker_t *rkb,
include_cluster_authorized_operations,
rd_false /*!include topic authorized operations */,
rd_false /*cgrp update*/, rd_false /* force_rack */, NULL, resp_cb,
1, opaque);
1 /* force */, opaque);

if (err) {
rd_snprintf(errstr, errstr_size, "%s", rd_kafka_err2str(err));
@@ -7521,4 +7474,5 @@ void rd_kafka_DescribeCluster(rd_kafka_t *rk,

rd_kafka_q_enq(rk->rk_ops, rko);
}

/**@}*/
3 changes: 2 additions & 1 deletion src/rdkafka_admin.h
@@ -481,7 +481,8 @@ struct rd_kafka_ConsumerGroupDescription_s {
rd_kafka_consumer_group_state_t state;
/** Consumer group coordinator. */
rd_kafka_Node_t *coordinator;
/** Authorized operations. */
/** List of authorized operations.
* Type: rd_kafka_AclOperation_t* */
rd_list_t *authorized_operations;
/** Group specific error. */
rd_kafka_error_t *error;
85 changes: 43 additions & 42 deletions src/rdkafka_metadata.c
@@ -440,8 +440,8 @@ rd_kafka_populate_metadata_topic_racks(rd_tmpabuf_t *tbuf,
/**
* @brief Handle a Metadata response message.
*
* @param topics are the requested topics (may be NULL)
* @param request_topics Use when rd_kafka_buf_t* request is NULL
* @param topics are the requested topics (may be NULL).
* @param request_topics Used when rd_kafka_buf_t* request is NULL.
*
* The metadata will be marshalled into 'rd_kafka_metadata_internal_t *'.
*
@@ -477,8 +477,7 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
? request->rkbuf_u.Metadata.reason
: "(no reason)")
: "(admin request)";
/* changed from request->rkbuf_reqhdr.ApiVersion as request buffer may
* be NULL*/

int ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
int32_t controller_id = -1;
@@ -569,18 +568,19 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_skip_tags(rkbuf);
}

if (ApiVersion >= 2) {
rd_kafka_buf_read_str(rkbuf, &cluster_id);
mdi->cluster_id = RD_KAFKAP_STR_DUP(&cluster_id);
}
if (ApiVersion >= 2)
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf, mdi->cluster_id);
else
mdi->cluster_id = NULL;



if (ApiVersion >= 1) {
rd_kafka_buf_read_i32(rkbuf, &controller_id);
mdi->controller_id = controller_id;
rd_rkb_dbg(rkb, METADATA, "METADATA",
"ClusterId: %.*s, ControllerId: %" PRId32,
RD_KAFKAP_STR_PR(&cluster_id), controller_id);
"ClusterId: %s, ControllerId: %" PRId32,
mdi->cluster_id, controller_id);
}

qsort(mdi->brokers, md->broker_cnt, sizeof(mdi->brokers[i]),
@@ -733,12 +733,44 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
/* TopicAuthorizedOperations */
rd_kafka_buf_read_i32(rkbuf,
&TopicAuthorizedOperations);
mdi->topics[i].topic_name = md->topics[i].topic;
mdi->topics[i].topic_authorized_operations =
TopicAuthorizedOperations;
}

rd_kafka_buf_skip_tags(rkbuf);
}

if (ApiVersion >= 8 && ApiVersion <= 10) {
int32_t ClusterAuthorizedOperations;
/* ClusterAuthorizedOperations */
rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations);
mdi->cluster_authorized_operations =
ClusterAuthorizedOperations;
}

rd_kafka_buf_skip_tags(rkbuf);

/* Entire Metadata response now parsed without errors:
* update our internal state according to the response. */

if (md->broker_cnt == 0 && md->topic_cnt == 0) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
"No brokers or topics in metadata: should retry");
err = RD_KAFKA_RESP_ERR__PARTIAL;
goto err;
}

/* Update our list of brokers. */
for (i = 0; i < md->broker_cnt; i++) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
" Broker #%i/%i: %s:%i NodeId %" PRId32, i,
md->broker_cnt, md->brokers[i].host,
md->brokers[i].port, md->brokers[i].id);
rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
&md->brokers[i], NULL);
}

for (i = 0; i < md->topic_cnt; i++) {

/* Ignore topics in blacklist */
if (rkb->rkb_rk->rk_conf.topic_blacklist &&
@@ -767,7 +799,6 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_parse_Metadata_update_topic(rkb, &md->topics[i],
&mdi->topics[i]);


if (requested_topics) {
rd_list_free_cb(missing_topics,
rd_list_remove_cmp(missing_topics,
@@ -794,36 +825,6 @@ rd_kafka_resp_err_t rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
}
}

if (ApiVersion >= 8 && ApiVersion <= 10) {
int32_t ClusterAuthorizedOperations;
/* ClusterAuthorizedOperations */
rd_kafka_buf_read_i32(rkbuf, &ClusterAuthorizedOperations);
mdi->cluster_authorized_operations =
ClusterAuthorizedOperations;
}

rd_kafka_buf_skip_tags(rkbuf);

/* Entire Metadata response now parsed without errors:
* update our internal state according to the response. */

if (md->broker_cnt == 0 && md->topic_cnt == 0) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
"No brokers or topics in metadata: should retry");
err = RD_KAFKA_RESP_ERR__PARTIAL;
goto err;
}

/* Update our list of brokers. */
for (i = 0; i < md->broker_cnt; i++) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
" Broker #%i/%i: %s:%i NodeId %" PRId32, i,
md->broker_cnt, md->brokers[i].host,
md->brokers[i].port, md->brokers[i].id);
rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
&md->brokers[i], NULL);
}

/* Requested topics not seen in metadata? Propogate to topic code. */
if (missing_topics) {
char *topic;
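The `rd_kafka_parse_Metadata()` changes above move the ClusterAuthorizedOperations read (v8-v10) and the broker/topic state updates so that the whole response is parsed before any internal state is touched. For orientation, here is a purely illustrative C struct mirroring the logical field order the parser walks; it is not a wire codec, and the field names are this sketch's own:

```c
#include <stdint.h>

/* Illustrative only: logical field order of a MetadataResponse as walked by
 * rd_kafka_parse_Metadata(); version gates mirror the checks in the parser. */
typedef struct metadata_response_sketch_s {
        /* brokers[]: NodeId, Host, Port, Rack (v1+) */
        const char *cluster_id;                /* v2+ */
        int32_t controller_id;                 /* v1+ */
        /* topics[]: ErrorCode, Name, IsInternal (v1+), partitions[],
         *           TopicAuthorizedOperations (v8-v10), tagged fields */
        int32_t cluster_authorized_operations; /* v8-v10: follows the topic
                                                * array on the wire, so it is
                                                * read before any state
                                                * updates or early returns */
        /* tagged fields (v9+) */
} metadata_response_sketch_t;
```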
2 changes: 0 additions & 2 deletions src/rdkafka_metadata.h
@@ -53,8 +53,6 @@ typedef struct rd_kafka_metadata_topic_internal_s {
* same count as metadata.topics[i].partition_cnt.
* Sorted by Partition Id. */
rd_kafka_metadata_partition_internal_t *partitions;
/** Topic Name. */
char *topic_name;
int32_t topic_authorized_operations; /**< ACL operations allowed
for topic */
} rd_kafka_metadata_topic_internal_t;
3 changes: 1 addition & 2 deletions src/rdkafka_request.c
@@ -2017,7 +2017,7 @@ rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
* Uses \p max_ApiVersion as maximum API version,
* pass -1 to use the maximum available version.
* Uses \p include_authorized_operations to get
* group ACL authorized operations, 1 to request.
* group ACL authorized operations.
*
* The response (unparsed) will be enqueued on \p replyq
* for handling by \p resp_cb (with \p opaque passed).
@@ -2225,7 +2225,6 @@ rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
void *opaque) {
rd_kafka_buf_t *rkbuf;
int16_t ApiVersion = 0;
int i;
size_t of_TopicArrayCnt;
int features;
int topic_cnt = topics ? rd_list_cnt(topics) : 0;
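The `include_authorized_operations` flag documented above reaches applications through the Admin API. A hedged sketch of requesting it when describing consumer groups follows; the option setter and enum names are taken from the released v2.2.0 API and may not match this intermediate commit exactly.

```c
#include <librdkafka/rdkafka.h>

/* Sketch: request authorized operations when describing consumer groups.
 * Assumes `rk` is an existing client and `queue` an application queue. */
static void describe_groups_sketch(rd_kafka_t *rk, rd_kafka_queue_t *queue) {
        const char *groups[] = {"example-group"};
        rd_kafka_AdminOptions_t *opts = rd_kafka_AdminOptions_new(
            rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);

        /* Maps to IncludeAuthorizedOperations in DescribeGroupsRequest. */
        rd_kafka_AdminOptions_set_include_authorized_operations(opts, 1);

        rd_kafka_DescribeConsumerGroups(rk, groups, 1, opts, queue);
        /* The result arrives on `queue` as an
         * RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT event. */

        rd_kafka_AdminOptions_destroy(opts);
}
```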
2 changes: 0 additions & 2 deletions tests/0080-admin_ut.c
@@ -916,15 +916,13 @@ static void do_test_DescribeCluster(const char *what,
rd_kafka_queue_t *q;
rd_kafka_AdminOptions_t *options = NULL;
int exp_timeout = MY_SOCKET_TIMEOUT_MS;
int i;
char errstr[512];
const char *errstr2;
rd_kafka_resp_err_t err;
rd_kafka_error_t *error;
test_timing_t timing;
rd_kafka_event_t *rkev;
const rd_kafka_DescribeCluster_result_t *res;
const rd_kafka_ClusterDescription_t *clusterdesc;
void *my_opaque = NULL, *opaque;

SUB_TEST_QUICK("%s DescribeCluster with %s, timeout %dms",
Expand Down
(4 more changed files not shown.)
