Skip to content

Commit

Permalink
Added new data type UUID for topic id. Added topic id support in Meta…
Browse files Browse the repository at this point in the history
…data Response.
  • Loading branch information
pranavrth committed Aug 2, 2023
1 parent c07a335 commit b6a7b45
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 99 deletions.
2 changes: 1 addition & 1 deletion INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1975,7 +1975,7 @@ release of librdkafka.
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 5 |
| 3 | Metadata | 12 | 9 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(
rdbuf.c
rdcrc32.c
rdfnv1a.c
rdbase64.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c \
Expand Down
127 changes: 127 additions & 0 deletions src/rdbase64.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdkafka_int.h"
#include "rdbase64.h"

/**
* @brief Base64 encode binary input \p in, and write base64-encoded string
* and it's size to \p out. out->ptr will be NULL in case of some issue
* with the conversion or the conversion is not supported.
*
* @post out->ptr must be freed after use.
*/
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {

#if !WITH_SSL
out->ptr = NULL;
return;
#endif

size_t max_len;

/* OpenSSL takes an |int| argument so the input cannot exceed that. */
if (in->size > INT_MAX) {
out->ptr = NULL;
return;
}

max_len = (((in->size + 2) / 3) * 4) + 1;
out->ptr = rd_malloc(max_len);
if (out->ptr == NULL)
return;

out->size = EVP_EncodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr,
(int)in->size);

rd_assert(out->size < max_len);
out->ptr[out->size] = 0;
}


/**
* @brief Base64 encode binary input \p in.
* @returns a newly allocated, base64-encoded string or NULL in case of some issue
* with the conversion or the conversion is not supported.
*
* @post Returned string must be freed after use.
*/
char *rd_base64_encode_str(const rd_chariov_t *in) {
rd_chariov_t out;
rd_base64_encode(in, &out);
return out.ptr;
}


/**
* @brief Base64 decode input string \p in. Ignores leading and trailing
* whitespace.
* @returns * 0 on successes in which case a newly allocated binary string is set
* in out (and size).
* * -1 on invalid Base64.
* * -2 on conversion not supported.
*/
int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) {

#if !WITH_SSL
return -2;
#endif

size_t ret_len;

/* OpenSSL takes an |int| argument, so |in->size| must not exceed
* that. */
if (in->size % 4 != 0 || in->size > INT_MAX) {
return -1;
}

ret_len = ((in->size / 4) * 3);
out->ptr = rd_malloc(ret_len + 1);

if (EVP_DecodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr,
(int)in->size) == -1) {
rd_free(out->ptr);
out->ptr = NULL;
return -1;
}

/* EVP_DecodeBlock will pad the output with trailing NULs and count
* them in the return value. */
if (in->size > 1 && in->ptr[in->size - 1] == '=') {
if (in->size > 2 && in->ptr[in->size - 2] == '=') {
ret_len -= 2;
} else {
ret_len -= 1;
}
}

out->ptr[ret_len] = 0;
out->size = ret_len;

return 0;
}
35 changes: 35 additions & 0 deletions src/rdbase64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdkafka_int.h"

void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out);

char *rd_base64_encode_str(const rd_chariov_t *in);

int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out);
17 changes: 16 additions & 1 deletion src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "rdlist.h"
#include "rdbuf.h"
#include "rdkafka_msgbatch.h"
#include "rdbase64.h"

typedef struct rd_kafka_broker_s rd_kafka_broker_t;

Expand Down Expand Up @@ -1206,7 +1207,6 @@ rd_kafka_buf_update_i64(rd_kafka_buf_t *rkbuf, size_t of, int64_t v) {
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
* @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
*
Expand Down Expand Up @@ -1428,4 +1428,19 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
rd_kafka_make_req_cb_t *make_cb,
void *make_opaque,
void (*free_make_opaque_cb)(void *make_opaque));


#define rd_kafka_buf_read_uuid(rkbuf, uuid) \
do { \
rd_kafka_buf_read_i64(rkbuf, &((uuid)->most_significant_bits)); \
rd_kafka_buf_read_i64(rkbuf, &((uuid)->least_significant_bits)); \
(uuid)->base64str[0] = '\0'; \
} while (0)

static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
rd_kafka_uuid_t *uuid) {
rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits);
rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits);
}

#endif /* _RDKAFKA_BUF_H_ */
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
md->topics[i].topic);

if (ApiVersion >= 10) {
rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id);
}

if (ApiVersion >= 1) {
int8_t is_internal;
rd_kafka_buf_read_i8(rkbuf, &is_internal);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s {
* same count as metadata.topics[i].partition_cnt.
* Sorted by Partition Id. */
rd_kafka_metadata_partition_internal_t *partitions;
rd_kafka_uuid_t topic_id;
} rd_kafka_metadata_topic_internal_t;


Expand Down
67 changes: 67 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

/* Protocol defines */
#include "rdkafka_protocol.h"
#include "rdbase64.h"
#include "rdstring.h"



Expand Down Expand Up @@ -565,6 +567,71 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t;
(8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4)


/**
* @brief UUID
*
* @param most_significant_bits - most significant 64 bits for the UUID
* @param least_significant_bits - least significant 64 bits for the UUID
* @param base64str - base64 encoding for the uuid. By default, it is lazy loaded.
* Use function `rd_kafka_uuid_base64str()` as a getter for
* this field.
*/
typedef struct rd_kafka_uuid_s {
int64_t most_significant_bits;
int64_t least_significant_bits;
char base64str[23];
} rd_kafka_uuid_t;

#define RD_KAFKA_ZERO_UUID \
{ 0, 0, "" }

#define RD_KAFKA_METADATA_TOPIC_ID \
{ 0, 1, "" }

/**
* Initialize given UUID to zero UUID.
*
* @param uuid - UUID to initialize.
*/
static RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) {
memset(uuid,0,sizeof(*uuid));
}

/**
* @brief Computes base64 encoding for the given uuid string.
* @param uuid UUID for which base64 encoding is required.
*
* @return base64 encoded string for the given UUID or NULL in case of some
* issue with the conversion or the conversion is not supported.
*/
static RD_UNUSED char *rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) {
if(strlen(uuid->base64str))
return uuid->base64str;

rd_chariov_t in_base64;
char *out_base64_str;
char *uuid_bytes;
uint64_t input_uuid[2];

input_uuid[0] = htobe64(uuid->most_significant_bits);
input_uuid[1] = htobe64(uuid->least_significant_bits);
uuid_bytes = (char *)input_uuid;
in_base64.ptr = uuid_bytes;
in_base64.size = sizeof(uuid->most_significant_bits) + sizeof(uuid->least_significant_bits);

out_base64_str = rd_base64_encode_str(&in_base64);
if(!out_base64_str)
return NULL;

rd_strlcpy(uuid->base64str, out_base64_str, 23 /* Removing extra ('=') padding */);
rd_free(out_base64_str);
return uuid->base64str;
}

static RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) {
rd_free(uuid);
}


/**
* @name Producer ID and Epoch for the Idempotent Producer
Expand Down
15 changes: 11 additions & 4 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2212,11 +2212,11 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
int16_t ApiVersion = 0;
size_t of_TopicArrayCnt;
int features;
int topic_cnt = topics ? rd_list_cnt(topics) : 0;
int *full_incr = NULL;
int topic_cnt = topics ? rd_list_cnt(topics) : 0;
int *full_incr = NULL;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_Metadata, 0, 9, &features);
rkb, RD_KAFKAP_Metadata, 0, 12, &features);

rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
4 + (50 * topic_cnt) + 1,
Expand Down Expand Up @@ -2305,13 +2305,20 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
if (topic_cnt > 0) {
char *topic;
int i;
rd_kafka_uuid_t zero_uuid = RD_KAFKA_ZERO_UUID;

/* Maintain a copy of the topics list so we can purge
* hints from the metadata cache on error. */
rkbuf->rkbuf_u.Metadata.topics =
rd_list_copy(topics, rd_list_string_copy, NULL);

RD_LIST_FOREACH(topic, topics, i) {
if (ApiVersion >= 10) {
/* FIXME: Not supporting topic id in the request
* right now. Update this to correct topic
* id once KIP-516 is fully implemented. */
rd_kafka_buf_write_uuid(rkbuf, &zero_uuid);
}
rd_kafka_buf_write_str(rkbuf, topic, -1);
/* Tags for previous topic */
rd_kafka_buf_write_tags(rkbuf);
Expand All @@ -2337,7 +2344,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
"on broker auto.create.topics.enable configuration");
}

if (ApiVersion >= 8 && ApiVersion < 10) {
if (ApiVersion >= 8 && ApiVersion <= 10) {
/* TODO: implement KIP-430 */
/* IncludeClusterAuthorizedOperations */
rd_kafka_buf_write_bool(rkbuf, rd_false);
Expand Down
Loading

0 comments on commit b6a7b45

Please sign in to comment.