Skip to content

Commit

Permalink
Add Topic Id to Metadata Response (#4300)
Browse files Browse the repository at this point in the history
* Added new data type UUID for topic id. 
* Added topic id support in Metadata Response.
  • Loading branch information
pranavrth authored Aug 7, 2023
1 parent c07a335 commit 07262c4
Show file tree
Hide file tree
Showing 14 changed files with 305 additions and 99 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# librdkafka v2.2.1

librdkafka v2.2.1 is a maintenance release:

* Added topic id to the metadata response, as specified in [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers).



# librdkafka v2.2.0

librdkafka v2.2.0 is a feature release:
Expand Down
7 changes: 4 additions & 3 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1876,7 +1876,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-84 - SASL SCRAM | 0.10.2.0 | Supported |
| KIP-85 - SASL config properties | 0.10.2.0 | Supported |
| KIP-86 - Configurable SASL callbacks | 2.0.0 | Not supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
| KIP-91 - Intuitive timeouts in Producer | 2.1.0 | Supported |
| KIP-92 - Per-partition lag metrics in Consumer | 0.10.2.0 | Supported |
| KIP-97 - Backwards compatibility with older brokers | 0.10.2.0 | Supported |
Expand All @@ -1900,7 +1900,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported |
| KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported |
| KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported |
| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-249 - AdminAPI: Delegation Tokens | 2.0.0 | Not supported |
| KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported |
| KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) |
Expand Down Expand Up @@ -1938,6 +1938,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-496 - AdminAPI: delete offsets | 2.4.0 | Supported |
| KIP-511 - Collect Client's Name and Version | 2.4.0 | Supported |
| KIP-514 - Bounded flush() | 2.4.0 | Supported |
| KIP-516 - Topic Identifiers | 2.8.0 (WIP) | Partially Supported |
| KIP-517 - Consumer poll() metrics | 2.4.0 | Not supported |
| KIP-518 - Allow listing consumer groups per state | 2.6.0 | Supported |
| KIP-519 - Make SSL engine configurable | 2.6.0 | Supported |
Expand Down Expand Up @@ -1975,7 +1976,7 @@ release of librdkafka.
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 5 |
| 3 | Metadata | 12 | 9 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(
rdbuf.c
rdcrc32.c
rdfnv1a.c
rdbase64.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c \
Expand Down
127 changes: 127 additions & 0 deletions src/rdbase64.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdbase64.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif

/**
 * @brief Base64 encode binary input \p in, and write the base64-encoded
 *        string and its size to \p out. out->ptr will be NULL in case of
 *        some issue with the conversion or the conversion is not supported.
 *
 * @remark out->ptr must be freed after use.
 */
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
        size_t max_len;

        /* OpenSSL takes an |int| argument so the input cannot exceed that. */
        if (in->size > INT_MAX) {
                out->ptr = NULL;
                return;
        }

        /* Worst-case encoded size: 4 output chars per 3 input bytes
         * (rounded up), plus one byte for the terminating NUL. */
        max_len  = (((in->size + 2) / 3) * 4) + 1;
        out->ptr = rd_malloc(max_len);

        /* EVP_EncodeBlock() returns the number of characters written,
         * excluding the terminating NUL it also appends. */
        out->size = EVP_EncodeBlock((unsigned char *)out->ptr,
                                    (unsigned char *)in->ptr, (int)in->size);

        rd_assert(out->size < max_len);
        out->ptr[out->size] = 0;
#else
        /* Base64 encoding requires OpenSSL: signal "not supported". */
        out->ptr = NULL;
#endif
}


/**
 * @brief Base64 encode binary input \p in.
 * @returns a newly allocated, base64-encoded string or NULL in case of some
 *          issue with the conversion or the conversion is not supported.
 *
 * @remark Returned string must be freed after use.
 */
char *rd_base64_encode_str(const rd_chariov_t *in) {
        /* Thin convenience wrapper: delegate to rd_base64_encode() and
         * hand the encoded buffer (or NULL on failure) to the caller. */
        rd_chariov_t encoded;

        rd_base64_encode(in, &encoded);

        return encoded.ptr;
}


/**
 * @brief Base64 decode input string \p in.
 * @returns * 0 on successes in which case a newly allocated binary string is
 *            set in \p out (and size).
 *          * -1 on invalid Base64.
 *          * -2 on conversion not supported.
 *
 * @remark On success out->ptr must be freed after use.
 */
int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
        size_t ret_len;

        /* OpenSSL takes an |int| argument, so |in->size| must not exceed
         * that. The input must also be a whole number of 4-character
         * base64 groups. */
        if (in->size % 4 != 0 || in->size > INT_MAX) {
                return -1;
        }

        /* Maximum decoded size: 3 bytes per 4 input characters, plus one
         * byte for a terminating NUL. */
        ret_len  = ((in->size / 4) * 3);
        out->ptr = rd_malloc(ret_len + 1);

        if (EVP_DecodeBlock((unsigned char *)out->ptr, (unsigned char *)in->ptr,
                            (int)in->size) == -1) {
                rd_free(out->ptr);
                out->ptr = NULL;
                return -1;
        }

        /* EVP_DecodeBlock will pad the output with trailing NULs and count
         * them in the return value: shrink the reported size by the number
         * of '=' padding characters in the input (at most two). */
        if (in->size > 1 && in->ptr[in->size - 1] == '=') {
                if (in->size > 2 && in->ptr[in->size - 2] == '=') {
                        ret_len -= 2;
                } else {
                        ret_len -= 1;
                }
        }

        out->ptr[ret_len] = 0;
        out->size         = ret_len;

        return 0;
#else
        /* Base64 decoding requires OpenSSL. */
        return -2;
#endif
}
41 changes: 41 additions & 0 deletions src/rdbase64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/


#ifndef _RDBASE64_H_
#define _RDBASE64_H_

#include "rd.h"

/**
 * @brief Base64 encode binary input \p in into \p out.
 *        out->ptr is NULL on failure or when base64 support is not
 *        compiled in; otherwise it must be freed after use.
 */
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out);

/**
 * @brief Base64 encode binary input \p in.
 * @returns newly allocated encoded string (caller frees), or NULL on
 *          failure / when base64 support is not compiled in.
 */
char *rd_base64_encode_str(const rd_chariov_t *in);

/**
 * @brief Base64 decode input string \p in into \p out.
 * @returns 0 on success (caller frees out->ptr), -1 on invalid base64,
 *          -2 if conversion is not supported.
 */
int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out);

#endif /* _RDBASE64_H_ */
18 changes: 17 additions & 1 deletion src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,6 @@ rd_kafka_buf_update_i64(rd_kafka_buf_t *rkbuf, size_t of, int64_t v) {
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
* @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
*
Expand Down Expand Up @@ -1428,4 +1427,21 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
rd_kafka_make_req_cb_t *make_cb,
void *make_opaque,
void (*free_make_opaque_cb)(void *make_opaque));


/**
 * @brief Read a UUID protocol primitive (two int64 fields: most
 *        significant bits first, then least significant bits) from
 *        \p rkbuf into \p uuid.
 *
 * @remark Resets the cached base64 representation so it is lazily
 *         recomputed by rd_kafka_uuid_base64str().
 * @remark NOTE(review): rd_kafka_buf_read_i64() presumably performs the
 *         buffer's standard parse-failure handling (non-local jump) —
 *         confirm against its definition before relying on fall-through.
 */
#define rd_kafka_buf_read_uuid(rkbuf, uuid)                                    \
        do {                                                                   \
                rd_kafka_buf_read_i64(rkbuf,                                   \
                                      &((uuid)->most_significant_bits));       \
                rd_kafka_buf_read_i64(rkbuf,                                   \
                                      &((uuid)->least_significant_bits));      \
                (uuid)->base64str[0] = '\0';                                   \
        } while (0)

/**
 * @brief Write UUID \p uuid to \p rkbuf as two int64 fields: most
 *        significant bits first, then least significant bits (mirrors
 *        rd_kafka_buf_read_uuid()).
 */
static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
                                              rd_kafka_uuid_t *uuid) {
        rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits);
        rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits);
}

#endif /* _RDKAFKA_BUF_H_ */
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
md->topics[i].topic);

if (ApiVersion >= 10) {
rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id);
}

if (ApiVersion >= 1) {
int8_t is_internal;
rd_kafka_buf_read_i8(rkbuf, &is_internal);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s {
* same count as metadata.topics[i].partition_cnt.
* Sorted by Partition Id. */
rd_kafka_metadata_partition_internal_t *partitions;
rd_kafka_uuid_t topic_id;
} rd_kafka_metadata_topic_internal_t;


Expand Down
82 changes: 82 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -30,8 +32,10 @@
#define _RDKAFKA_PROTO_H_


#include "rdstring.h"
#include "rdendian.h"
#include "rdvarint.h"
#include "rdbase64.h"

/* Protocol defines */
#include "rdkafka_protocol.h"
Expand Down Expand Up @@ -565,6 +569,84 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t;
(8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4)


/**
 * @struct Struct representing the UUID protocol primitive type (KIP-516
 *         topic ids): 128 bits split into two int64 halves, plus a lazily
 *         computed base64 string cache.
 */
typedef struct rd_kafka_uuid_s {
        int64_t
            most_significant_bits; /**< Most significant 64 bits for the UUID */
        int64_t least_significant_bits; /**< Least significant 64 bits for the
                                           UUID */
        char base64str[23]; /**< base64 encoding for the uuid. By default, it is
                               lazy loaded. Use function
                               `rd_kafka_uuid_base64str()` as a getter for this
                               field. 22 significant chars + NUL: the trailing
                               "==" padding of the 24-char encoding of 16 bytes
                               is dropped by rd_kafka_uuid_base64str(). */
} rd_kafka_uuid_t;

/** Static initializer: all-zero UUID with an empty (uncached) base64 string. */
#define RD_KAFKA_UUID_ZERO                                                     \
        { 0, 0, "" }

/** Static initializer: reserved UUID (0,1).
 *  NOTE(review): presumably the KIP-516 metadata topic id — confirm. */
#define RD_KAFKA_UUID_METADATA_TOPIC_ID                                        \
        { 0, 1, "" }


/**
 * Creates a new UUID, zero-initialized (including the cached base64
 * string, which is lazily populated by rd_kafka_uuid_base64str()).
 *
 * @return A newly allocated UUID. Free with rd_kafka_uuid_destroy().
 */
static RD_INLINE RD_UNUSED rd_kafka_uuid_t *rd_kafka_uuid_new() {
        /* FIX: was rd_calloc(1, sizeof(rd_kafka_uuid_t *)), which only
         * allocated pointer-size (8) bytes instead of the full struct,
         * causing heap corruption as soon as any field was written.
         * Use sizeof(*uuid) so the size is tied to the variable's type. */
        rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(*uuid));
        return uuid;
}

/**
 * Initialize given UUID to the zero UUID (both 64-bit halves zero and
 * the cached base64 string empty).
 *
 * @param uuid UUID to initialize.
 */
static RD_INLINE RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) {
        /* Assign an empty compound literal: zeroes every field. */
        *uuid = (rd_kafka_uuid_t){0};
}

/**
 * @brief Computes (and caches in \p uuid) the base64 encoding of the
 *        given UUID.
 * @param uuid UUID for which base64 encoding is required.
 *
 * @return base64 encoded string for the given UUID or NULL in case of some
 *         issue with the conversion or the conversion is not supported.
 *
 * @remark The returned pointer aliases uuid->base64str; do not free it.
 */
static RD_INLINE RD_UNUSED char *
rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) {
        rd_chariov_t in_base64;
        char *out_base64_str;
        uint64_t input_uuid[2];

        /* Return the cached representation if already computed. */
        if (*uuid->base64str)
                return uuid->base64str;

        /* Serialize the two 64-bit halves in big-endian byte order,
         * most significant bits first. */
        input_uuid[0]  = htobe64(uuid->most_significant_bits);
        input_uuid[1]  = htobe64(uuid->least_significant_bits);
        in_base64.ptr  = (char *)input_uuid;
        in_base64.size = sizeof(input_uuid); /* 16 bytes */

        out_base64_str = rd_base64_encode_str(&in_base64);
        if (!out_base64_str)
                return NULL;

        /* Copy at most 22 chars + NUL (sizeof base64str == 23), dropping
         * the trailing "==" padding of the 24-char encoding of 16 bytes. */
        rd_strlcpy(uuid->base64str, out_base64_str,
                   sizeof(uuid->base64str));
        rd_free(out_base64_str);
        return uuid->base64str;
}

/**
 * @brief Destroys (frees) a UUID allocated with rd_kafka_uuid_new().
 */
static RD_INLINE RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) {
        rd_free(uuid);
}


/**
* @name Producer ID and Epoch for the Idempotent Producer
Expand Down
Loading

0 comments on commit 07262c4

Please sign in to comment.