Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Topic Id to Metadata Response #4300

Merged
merged 7 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# librdkafka v2.3.0

librdkafka v2.3.0 is a feature release:

* Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)

emasab marked this conversation as resolved.
Show resolved Hide resolved


# librdkafka v2.2.0

librdkafka v2.2.0 is a feature release:
Expand Down
7 changes: 4 additions & 3 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1876,7 +1876,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-84 - SASL SCRAM | 0.10.2.0 | Supported |
| KIP-85 - SASL config properties | 0.10.2.0 | Supported |
| KIP-86 - Configurable SASL callbacks | 2.0.0 | Not supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
| KIP-88 - AdminAPI: ListGroupOffsets | 0.10.2.0 | Supported |
| KIP-91 - Intuitive timeouts in Producer | 2.1.0 | Supported |
| KIP-92 - Per-partition lag metrics in Consumer | 0.10.2.0 | Supported |
| KIP-97 - Backwards compatibility with older brokers | 0.10.2.0 | Supported |
Expand All @@ -1900,7 +1900,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-226 - AdminAPI: Dynamic broker config | 1.1.0 | Supported |
| KIP-227 - Consumer Incremental Fetch | 1.1.0 | Not supported |
| KIP-229 - AdminAPI: DeleteGroups | 1.1.0 | Supported |
| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-235 - DNS alias for secure connections | 2.1.0 | Supported |
| KIP-249 - AdminAPI: Deletegation Tokens | 2.0.0 | Not supported |
| KIP-255 - SASL OAUTHBEARER | 2.0.0 | Supported |
| KIP-266 - Fix indefinite consumer timeouts | 2.0.0 | Supported (bound by session.timeout.ms and max.poll.interval.ms) |
Expand Down Expand Up @@ -1938,6 +1938,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-496 - AdminAPI: delete offsets | 2.4.0 | Supported |
| KIP-511 - Collect Client's Name and Version | 2.4.0 | Supported |
| KIP-514 - Bounded flush() | 2.4.0 | Supported |
| KIP-516 - Topic Identifiers | 2.8.0 (WIP) | Partially Supported |
| KIP-517 - Consumer poll() metrics | 2.4.0 | Not supported |
| KIP-518 - Allow listing consumer groups per state | 2.6.0 | Supported |
| KIP-519 - Make SSL engine configurable | 2.6.0 | Supported |
Expand Down Expand Up @@ -1975,7 +1976,7 @@ release of librdkafka.
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 15 | 11 |
| 2 | ListOffsets | 8 | 5 |
| 3 | Metadata | 12 | 9 |
| 3 | Metadata | 12 | 12 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(
rdbuf.c
rdcrc32.c
rdfnv1a.c
rdbase64.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c \
Expand Down
127 changes: 127 additions & 0 deletions src/rdbase64.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdbase64.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif

/**
* @brief Base64 encode binary input \p in, and write base64-encoded string
* and it's size to \p out. out->ptr will be NULL in case of some issue
* with the conversion or the conversion is not supported.
*
* @remark out->ptr must be freed after use.
*/
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
size_t max_len;

/* OpenSSL takes an |int| argument so the input cannot exceed that. */
if (in->size > INT_MAX) {
out->ptr = NULL;
return;
}

max_len = (((in->size + 2) / 3) * 4) + 1;
emasab marked this conversation as resolved.
Show resolved Hide resolved
out->ptr = rd_malloc(max_len);

out->size = EVP_EncodeBlock((unsigned char *)out->ptr,
(unsigned char *)in->ptr, (int)in->size);

rd_assert(out->size < max_len);
out->ptr[out->size] = 0;
#else
out->ptr = NULL;
#endif
}


/**
* @brief Base64 encode binary input \p in.
* @returns a newly allocated, base64-encoded string or NULL in case of some
* issue with the conversion or the conversion is not supported.
*
* @remark Returned string must be freed after use.
*/
char *rd_base64_encode_str(const rd_chariov_t *in) {
rd_chariov_t out;
rd_base64_encode(in, &out);
return out.ptr;
}


/**
* @brief Base64 decode input string \p in. Ignores leading and trailing
* whitespace.
* @returns * 0 on successes in which case a newly allocated binary string is
* set in \p out (and size).
* * -1 on invalid Base64.
* * -2 on conversion not supported.
*/
int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
size_t ret_len;

/* OpenSSL takes an |int| argument, so |in->size| must not exceed
* that. */
if (in->size % 4 != 0 || in->size > INT_MAX) {
return -1;
}

ret_len = ((in->size / 4) * 3);
out->ptr = rd_malloc(ret_len + 1);

if (EVP_DecodeBlock((unsigned char *)out->ptr, (unsigned char *)in->ptr,
(int)in->size) == -1) {
rd_free(out->ptr);
out->ptr = NULL;
return -1;
}

/* EVP_DecodeBlock will pad the output with trailing NULs and count
* them in the return value. */
if (in->size > 1 && in->ptr[in->size - 1] == '=') {
if (in->size > 2 && in->ptr[in->size - 2] == '=') {
ret_len -= 2;
} else {
ret_len -= 1;
}
}

out->ptr[ret_len] = 0;
out->size = ret_len;

return 0;
#else
return -2;
#endif
}
41 changes: 41 additions & 0 deletions src/rdbase64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/


#ifndef _RDBASE64_H_
#define _RDBASE64_H_

#include "rd.h"

void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out);

char *rd_base64_encode_str(const rd_chariov_t *in);

int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out);

#endif /* _RDBASE64_H_ */
18 changes: 17 additions & 1 deletion src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,6 @@ rd_kafka_buf_update_i64(rd_kafka_buf_t *rkbuf, size_t of, int64_t v) {
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
* @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
*
Expand Down Expand Up @@ -1428,4 +1427,21 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
rd_kafka_make_req_cb_t *make_cb,
void *make_opaque,
void (*free_make_opaque_cb)(void *make_opaque));


#define rd_kafka_buf_read_uuid(rkbuf, uuid) \
do { \
rd_kafka_buf_read_i64(rkbuf, \
&((uuid)->most_significant_bits)); \
rd_kafka_buf_read_i64(rkbuf, \
&((uuid)->least_significant_bits)); \
(uuid)->base64str[0] = '\0'; \
} while (0)

static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
emasab marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_uuid_t *uuid) {
rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits);
rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits);
}

#endif /* _RDKAFKA_BUF_H_ */
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
md->topics[i].topic);

if (ApiVersion >= 10) {
rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id);
}

if (ApiVersion >= 1) {
int8_t is_internal;
rd_kafka_buf_read_i8(rkbuf, &is_internal);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s {
* same count as metadata.topics[i].partition_cnt.
* Sorted by Partition Id. */
rd_kafka_metadata_partition_internal_t *partitions;
rd_kafka_uuid_t topic_id;
} rd_kafka_metadata_topic_internal_t;


Expand Down
82 changes: 82 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2022, Magnus Edenhill
* 2023, Confluent Inc.

* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -30,8 +32,10 @@
#define _RDKAFKA_PROTO_H_


#include "rdstring.h"
#include "rdendian.h"
#include "rdvarint.h"
#include "rdbase64.h"

/* Protocol defines */
#include "rdkafka_protocol.h"
Expand Down Expand Up @@ -565,6 +569,84 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t;
(8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4)


/**
emasab marked this conversation as resolved.
Show resolved Hide resolved
* @struct Struct representing UUID protocol primitive type.
*/
typedef struct rd_kafka_uuid_s {
int64_t
most_significant_bits; /**< Most significant 64 bits for the UUID */
int64_t least_significant_bits; /**< Least significant 64 bits for the
UUID */
char base64str[23]; /**< base64 encoding for the uuid. By default, it is
lazy loaded. Use function
`rd_kafka_uuid_base64str()` as a getter for this
field. */
} rd_kafka_uuid_t;

#define RD_KAFKA_UUID_ZERO \
{ 0, 0, "" }

#define RD_KAFKA_UUID_METADATA_TOPIC_ID \
{ 0, 1, "" }


/**
* Creates a new UUID.
*
* @return A newly allocated UUID.
*/
static RD_INLINE RD_UNUSED rd_kafka_uuid_t *rd_kafka_uuid_new() {
rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t *));
return uuid;
}

/**
* Initialize given UUID to zero UUID.
*
* @param uuid UUID to initialize.
*/
static RD_INLINE RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) {
emasab marked this conversation as resolved.
Show resolved Hide resolved
memset(uuid, 0, sizeof(*uuid));
}

/**
* @brief Computes base64 encoding for the given uuid string.
* @param uuid UUID for which base64 encoding is required.
*
* @return base64 encoded string for the given UUID or NULL in case of some
* issue with the conversion or the conversion is not supported.
*/
static RD_INLINE RD_UNUSED char *
rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) {
if (*uuid->base64str)
return uuid->base64str;

rd_chariov_t in_base64;
char *out_base64_str;
char *uuid_bytes;
uint64_t input_uuid[2];

input_uuid[0] = htobe64(uuid->most_significant_bits);
input_uuid[1] = htobe64(uuid->least_significant_bits);
uuid_bytes = (char *)input_uuid;
in_base64.ptr = uuid_bytes;
in_base64.size = sizeof(uuid->most_significant_bits) +
sizeof(uuid->least_significant_bits);

out_base64_str = rd_base64_encode_str(&in_base64);
if (!out_base64_str)
return NULL;

rd_strlcpy(uuid->base64str, out_base64_str,
23 /* Removing extra ('=') padding */);
rd_free(out_base64_str);
return uuid->base64str;
}

static RD_INLINE RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) {
rd_free(uuid);
}


/**
* @name Producer ID and Epoch for the Idempotent Producer
Expand Down
Loading