Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Topic Id to Metadata Response #4300

Merged
merged 7 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# librdkafka v2.2.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

version 2.2.1 for the moment as it doesn't change public API

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping 2.3.0 as we don't have plan for 2.2.1 right now.


librdkafka v2.2.0 is a feature release:

* Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)

emasab marked this conversation as resolved.
Show resolved Hide resolved
# librdkafka v2.2.0

librdkafka v2.2.0 is a feature release:

* Fix a segmentation fault when subscribing to non-existent topics and
Expand Down
225 changes: 113 additions & 112 deletions INTRODUCTION.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(
rdbuf.c
rdcrc32.c
rdfnv1a.c
rdbase64.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_msgset_writer.c rdkafka_msgset_reader.c \
rdkafka_header.c rdkafka_admin.c rdkafka_aux.c \
rdkafka_background.c rdkafka_idempotence.c rdkafka_cert.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c \
Expand Down
130 changes: 130 additions & 0 deletions src/rdbase64.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rdbase64.h"

#if WITH_SSL
#include <openssl/ssl.h>
#endif

/**
* @brief Base64 encode binary input \p in, and write base64-encoded string
* and it's size to \p out. out->ptr will be NULL in case of some issue
* with the conversion or the conversion is not supported.
*
* @post out->ptr must be freed after use.
emasab marked this conversation as resolved.
Show resolved Hide resolved
*/
void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
size_t max_len;

/* OpenSSL takes an |int| argument so the input cannot exceed that. */
if (in->size > INT_MAX) {
out->ptr = NULL;
return;
}

max_len = (((in->size + 2) / 3) * 4) + 1;
emasab marked this conversation as resolved.
Show resolved Hide resolved
out->ptr = rd_malloc(max_len);
if (out->ptr == NULL)
emasab marked this conversation as resolved.
Show resolved Hide resolved
return;

out->size = EVP_EncodeBlock((unsigned char *)out->ptr,
(unsigned char *)in->ptr, (int)in->size);

rd_assert(out->size < max_len);
out->ptr[out->size] = 0;
#else
out->ptr = NULL;
return;
emasab marked this conversation as resolved.
Show resolved Hide resolved
#endif
}


/**
* @brief Base64 encode binary input \p in.
* @returns a newly allocated, base64-encoded string or NULL in case of some
* issue with the conversion or the conversion is not supported.
emasab marked this conversation as resolved.
Show resolved Hide resolved
*
* @post Returned string must be freed after use.
*/
char *rd_base64_encode_str(const rd_chariov_t *in) {
rd_chariov_t out;
rd_base64_encode(in, &out);
return out.ptr;
}


/**
* @brief Base64 decode input string \p in. Ignores leading and trailing
* whitespace.
* @returns * 0 on successes in which case a newly allocated binary string is
* set in out (and size).
emasab marked this conversation as resolved.
Show resolved Hide resolved
* * -1 on invalid Base64.
* * -2 on conversion not supported.
*/
int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) {

#if WITH_SSL
size_t ret_len;

/* OpenSSL takes an |int| argument, so |in->size| must not exceed
* that. */
if (in->size % 4 != 0 || in->size > INT_MAX) {
return -1;
}

ret_len = ((in->size / 4) * 3);
out->ptr = rd_malloc(ret_len + 1);

if (EVP_DecodeBlock((unsigned char *)out->ptr, (unsigned char *)in->ptr,
(int)in->size) == -1) {
rd_free(out->ptr);
out->ptr = NULL;
return -1;
}

/* EVP_DecodeBlock will pad the output with trailing NULs and count
* them in the return value. */
if (in->size > 1 && in->ptr[in->size - 1] == '=') {
if (in->size > 2 && in->ptr[in->size - 2] == '=') {
ret_len -= 2;
} else {
ret_len -= 1;
}
}

out->ptr[ret_len] = 0;
out->size = ret_len;

return 0;
#else
return -2;
#endif
}
41 changes: 41 additions & 0 deletions src/rdbase64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* librdkafka - The Apache Kafka C/C++ library
*
* Copyright (c) 2023 Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/


#ifndef _RDBASE64_H_
#define _RDBASE64_H_

#include "rd.h"

void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out);

char *rd_base64_encode_str(const rd_chariov_t *in);

int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out);

#endif /* _RDBASE64_H_ */
18 changes: 17 additions & 1 deletion src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,6 @@ rd_kafka_buf_update_i64(rd_kafka_buf_t *rkbuf, size_t of, int64_t v) {
rd_kafka_buf_update(rkbuf, of, &v, sizeof(v));
}


/**
* @brief Write standard (2-byte header) or KIP-482 COMPACT_STRING to buffer.
*
Expand Down Expand Up @@ -1428,4 +1427,21 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
rd_kafka_make_req_cb_t *make_cb,
void *make_opaque,
void (*free_make_opaque_cb)(void *make_opaque));


#define rd_kafka_buf_read_uuid(rkbuf, uuid) \
do { \
rd_kafka_buf_read_i64(rkbuf, \
&((uuid)->most_significant_bits)); \
rd_kafka_buf_read_i64(rkbuf, \
&((uuid)->least_significant_bits)); \
(uuid)->base64str[0] = '\0'; \
} while (0)

static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
emasab marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_uuid_t *uuid) {
rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits);
rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits);
}

#endif /* _RDKAFKA_BUF_H_ */
5 changes: 5 additions & 0 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,11 @@ rd_kafka_parse_Metadata(rd_kafka_broker_t *rkb,
rd_kafka_buf_read_i16a(rkbuf, md->topics[i].err);
rd_kafka_buf_read_str_tmpabuf(rkbuf, &tbuf,
md->topics[i].topic);

if (ApiVersion >= 10) {
rd_kafka_buf_read_uuid(rkbuf, &mdi->topics[i].topic_id);
}

if (ApiVersion >= 1) {
int8_t is_internal;
rd_kafka_buf_read_i8(rkbuf, &is_internal);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s {
* same count as metadata.topics[i].partition_cnt.
* Sorted by Partition Id. */
rd_kafka_metadata_partition_internal_t *partitions;
rd_kafka_uuid_t topic_id;
} rd_kafka_metadata_topic_internal_t;


Expand Down
69 changes: 69 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
#define _RDKAFKA_PROTO_H_


#include "rdstring.h"
#include "rdendian.h"
#include "rdvarint.h"
#include "rdbase64.h"

/* Protocol defines */
#include "rdkafka_protocol.h"
Expand Down Expand Up @@ -565,6 +567,73 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t;
(8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4)


/**
emasab marked this conversation as resolved.
Show resolved Hide resolved
* @brief UUID
emasab marked this conversation as resolved.
Show resolved Hide resolved
*
* @param most_significant_bits - most significant 64 bits for the UUID
* @param least_significant_bits - least significant 64 bits for the UUID
* @param base64str - base64 encoding for the uuid. By default, it is lazy
emasab marked this conversation as resolved.
Show resolved Hide resolved
* loaded. Use function `rd_kafka_uuid_base64str()` as a getter for this field.
*/
typedef struct rd_kafka_uuid_s {
int64_t most_significant_bits;
int64_t least_significant_bits;
char base64str[23];
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
} rd_kafka_uuid_t;

#define RD_KAFKA_ZERO_UUID \
emasab marked this conversation as resolved.
Show resolved Hide resolved
{ 0, 0, "" }

#define RD_KAFKA_METADATA_TOPIC_ID \
emasab marked this conversation as resolved.
Show resolved Hide resolved
{ 0, 1, "" }

/**
* Initialize given UUID to zero UUID.
*
* @param uuid - UUID to initialize.
emasab marked this conversation as resolved.
Show resolved Hide resolved
*/
static RD_INLINE RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) {
emasab marked this conversation as resolved.
Show resolved Hide resolved
memset(uuid, 0, sizeof(*uuid));
}

/**
* @brief Computes base64 encoding for the given uuid string.
* @param uuid UUID for which base64 encoding is required.
*
* @return base64 encoded string for the given UUID or NULL in case of some
* issue with the conversion or the conversion is not supported.
*/
static RD_INLINE RD_UNUSED char *
rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) {
if (strlen(uuid->base64str))
emasab marked this conversation as resolved.
Show resolved Hide resolved
return uuid->base64str;

rd_chariov_t in_base64;
char *out_base64_str;
char *uuid_bytes;
uint64_t input_uuid[2];

input_uuid[0] = htobe64(uuid->most_significant_bits);
input_uuid[1] = htobe64(uuid->least_significant_bits);
uuid_bytes = (char *)input_uuid;
in_base64.ptr = uuid_bytes;
in_base64.size = sizeof(uuid->most_significant_bits) +
sizeof(uuid->least_significant_bits);

out_base64_str = rd_base64_encode_str(&in_base64);
if (!out_base64_str)
return NULL;

rd_strlcpy(uuid->base64str, out_base64_str,
23 /* Removing extra ('=') padding */);
rd_free(out_base64_str);
return uuid->base64str;
}

static RD_INLINE RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) {
rd_free(uuid);
}


/**
* @name Producer ID and Epoch for the Idempotent Producer
Expand Down
11 changes: 9 additions & 2 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -2216,7 +2216,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
int *full_incr = NULL;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_Metadata, 0, 9, &features);
rkb, RD_KAFKAP_Metadata, 0, 12, &features);

rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
4 + (50 * topic_cnt) + 1,
Expand Down Expand Up @@ -2305,13 +2305,20 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
if (topic_cnt > 0) {
char *topic;
int i;
rd_kafka_uuid_t zero_uuid = RD_KAFKA_ZERO_UUID;

/* Maintain a copy of the topics list so we can purge
* hints from the metadata cache on error. */
rkbuf->rkbuf_u.Metadata.topics =
rd_list_copy(topics, rd_list_string_copy, NULL);

RD_LIST_FOREACH(topic, topics, i) {
if (ApiVersion >= 10) {
/* FIXME: Not supporting topic id in the request
* right now. Update this to correct topic
* id once KIP-516 is fully implemented. */
rd_kafka_buf_write_uuid(rkbuf, &zero_uuid);
}
rd_kafka_buf_write_str(rkbuf, topic, -1);
/* Tags for previous topic */
rd_kafka_buf_write_tags(rkbuf);
Expand All @@ -2337,7 +2344,7 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
"on broker auto.create.topics.enable configuration");
}

if (ApiVersion >= 8 && ApiVersion < 10) {
if (ApiVersion >= 8 && ApiVersion <= 10) {
/* TODO: implement KIP-430 */
/* IncludeClusterAuthorizedOperations */
rd_kafka_buf_write_bool(rkbuf, rd_false);
Expand Down
Loading