diff --git a/CHANGELOG.md b/CHANGELOG.md
index 25808865fd..9c6620c57e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,20 +2,17 @@
 librdkafka v2.3.0 is a feature release:
 
- * Added Topic id to the metadata response which is part of the [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
- * Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`
-   (#4240, @jainruchir).
- * [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
-   Return authorized operations in Describe Responses.
-   (#4240, @jainruchir).
- * Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`
+ * [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
+   Partial support for topic identifiers. Topic identifiers in the metadata
+   response are available through the new `rd_kafka_DescribeTopics` function (#4300, #4451).
+ * [KIP-117](https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations) Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`
   (#4240, @jainruchir).
 * [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
   Return authorized operations in Describe Responses.
   (#4240, @jainruchir).
 * [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added
   Exponential Backoff mechanism for retriable requests with
   `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the
-  maximum backoff, with 20% jitter(#4422).
+  maximum backoff, with 20% jitter (#4422).
 * Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0.
 * Add missing destroy that leads to leaking partition structure memory when
   there are partition leader changes and a stale leader epoch is received (#4429).
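Reviewer note: a minimal sketch (not part of the patch) of how the new `rd_kafka_DescribeTopics` topic-id accessors described in the changelog above compose. It uses only functions added by this patch; obtaining `result` from a completed DescribeTopics admin event is assumed and elided (see examples/describe_topics.c for the full event flow).

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Walk a DescribeTopics result and print each topic's name together
 * with its base64-encoded topic id. `result` is assumed to come from
 * a completed DescribeTopics admin event. */
static void print_topic_ids(const rd_kafka_DescribeTopics_result_t *result) {
        size_t i, cnt;
        const rd_kafka_TopicDescription_t **topics =
            rd_kafka_DescribeTopics_result_topics(result, &cnt);

        for (i = 0; i < cnt; i++) {
                const rd_kafka_Uuid_t *topic_id =
                    rd_kafka_TopicDescription_topic_id(topics[i]);
                /* May be NULL if base64 conversion is unavailable. */
                const char *id_str = rd_kafka_Uuid_base64str(topic_id);

                printf("%s: %s\n",
                       rd_kafka_TopicDescription_name(topics[i]),
                       id_str ? id_str : "(no topic id string)");
        }
}
```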
diff --git a/examples/describe_topics.c b/examples/describe_topics.c
index 7008693d82..cf38a70e21 100644
--- a/examples/describe_topics.c
+++ b/examples/describe_topics.c
@@ -198,17 +198,19 @@ print_partition_info(const rd_kafka_TopicPartitionInfo_t *partition) {
  */
 static void print_topic_info(const rd_kafka_TopicDescription_t *topic) {
         size_t j;
-        const rd_kafka_error_t *error;
-        const char *topic_name = rd_kafka_TopicDescription_name(topic);
-        error = rd_kafka_TopicDescription_error(topic);
+        const char *topic_name = rd_kafka_TopicDescription_name(topic);
+        const rd_kafka_error_t *error = rd_kafka_TopicDescription_error(topic);
         const rd_kafka_AclOperation_t *authorized_operations;
         size_t authorized_operations_cnt;
         const rd_kafka_TopicPartitionInfo_t **partitions;
         size_t partition_cnt;
+        const rd_kafka_Uuid_t *topic_id =
+            rd_kafka_TopicDescription_topic_id(topic);
+        const char *topic_id_str = rd_kafka_Uuid_base64str(topic_id);
 
         if (rd_kafka_error_code(error)) {
-                printf("Topic: %s has error[%" PRId32 "]: %s\n", topic_name,
-                       rd_kafka_error_code(error),
+                printf("Topic: %s (Topic Id: %s) has error[%" PRId32 "]: %s\n",
+                       topic_name, topic_id_str, rd_kafka_error_code(error),
                        rd_kafka_error_string(error));
                 return;
         }
@@ -217,9 +219,9 @@ static void print_topic_info(const rd_kafka_TopicDescription_t *topic) {
                 topic, &authorized_operations_cnt);
 
         printf(
-            "Topic: %s succeeded, has %d topic authorized operations "
+            "Topic: %s (Topic Id: %s) succeeded, has %d authorized operations "
             "allowed, they are:\n",
-            topic_name, (int)authorized_operations_cnt);
+            topic_name, topic_id_str, (int)authorized_operations_cnt);
 
         for (j = 0; j < authorized_operations_cnt; j++)
                 printf("\t%s operation is allowed\n",
diff --git a/src/rdbase64.c b/src/rdbase64.c
index d81858418f..aaf2fb138e 100644
--- a/src/rdbase64.c
+++ b/src/rdbase64.c
@@ -30,6 +30,47 @@
 
 #if WITH_SSL
 #include <openssl/evp.h>
+#else
+
+#define conv_bin2ascii(a, table) ((table)[(a)&0x3f])
+
+static const unsigned char data_bin2ascii[65] =
+    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+
+static int base64_encoding_conversion(unsigned char *out,
+                                      const unsigned char *in,
+                                      int dlen) {
+        int i, ret = 0;
+        unsigned long l;
+
+        for (i = dlen; i > 0; i -= 3) {
+                if (i >= 3) {
+                        l = (((unsigned long)in[0]) << 16L) |
+                            (((unsigned long)in[1]) << 8L) | in[2];
+                        *(out++) = conv_bin2ascii(l >> 18L, data_bin2ascii);
+                        *(out++) = conv_bin2ascii(l >> 12L, data_bin2ascii);
+                        *(out++) = conv_bin2ascii(l >> 6L, data_bin2ascii);
+                        *(out++) = conv_bin2ascii(l, data_bin2ascii);
+                } else {
+                        l = ((unsigned long)in[0]) << 16L;
+                        if (i == 2)
+                                l |= ((unsigned long)in[1] << 8L);
+
+                        *(out++) = conv_bin2ascii(l >> 18L, data_bin2ascii);
+                        *(out++) = conv_bin2ascii(l >> 12L, data_bin2ascii);
+                        *(out++) =
+                            (i == 1) ? '='
+                                     : conv_bin2ascii(l >> 6L, data_bin2ascii);
+                        *(out++) = '=';
+                }
+                ret += 4;
+                in += 3;
+        }
+
+        *out = '\0';
+        return ret;
+}
+
 #endif
 
 /**
@@ -41,7 +82,6 @@
  */
 void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {
 
-#if WITH_SSL
         size_t max_len;
         /* OpenSSL takes an |int| argument so the input cannot exceed that.
          */
@@ -53,14 +93,16 @@ void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out) {
 
         max_len = (((in->size + 2) / 3) * 4) + 1;
         out->ptr = rd_malloc(max_len);
 
+#if WITH_SSL
         out->size = EVP_EncodeBlock((unsigned char *)out->ptr,
                                     (unsigned char *)in->ptr, (int)in->size);
+#else
+        out->size = base64_encoding_conversion(
+            (unsigned char *)out->ptr, (unsigned char *)in->ptr, (int)in->size);
+#endif
 
         rd_assert(out->size < max_len);
         out->ptr[out->size] = 0;
-#else
-        out->ptr = NULL;
-#endif
 }
 
diff --git a/src/rdkafka.c b/src/rdkafka.c
index 8cc891b1e1..64b2bfec6c 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -5032,3 +5032,77 @@ int rd_kafka_errno(void) {
 int rd_kafka_unittest(void) {
         return rd_unittest();
 }
+
+
+/**
+ * Creates a new UUID.
+ *
+ * @return A newly allocated UUID.
+ */
+rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits,
+                                   int64_t least_significant_bits) {
+        rd_kafka_Uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_Uuid_t));
+        uuid->most_significant_bits  = most_significant_bits;
+        uuid->least_significant_bits = least_significant_bits;
+        return uuid;
+}
+
+/**
+ * Returns a newly allocated copy of the given UUID.
+ *
+ * @param uuid UUID to copy.
+ * @return Copy of the provided UUID.
+ *
+ * @remark Dynamically allocated. Deallocate (free) after use.
+ */
+rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) {
+        rd_kafka_Uuid_t *copy_uuid = rd_kafka_Uuid_new(
+            uuid->most_significant_bits, uuid->least_significant_bits);
+        if (*uuid->base64str)
+                memcpy(copy_uuid->base64str, uuid->base64str, 23);
+        return copy_uuid;
+}
+
+/**
+ * @brief Destroy the provided UUID.
+ *
+ * @param uuid UUID
+ */
+void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) {
+        rd_free(uuid);
+}
+
+const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
+        if (*uuid->base64str)
+                return uuid->base64str;
+
+        rd_chariov_t in_base64;
+        char *out_base64_str;
+        char *uuid_bytes;
+        uint64_t input_uuid[2];
+
+        input_uuid[0] = htobe64(uuid->most_significant_bits);
+        input_uuid[1] = htobe64(uuid->least_significant_bits);
+        uuid_bytes     = (char *)input_uuid;
+        in_base64.ptr  = uuid_bytes;
+        in_base64.size = sizeof(uuid->most_significant_bits) +
+                         sizeof(uuid->least_significant_bits);
+
+        out_base64_str = rd_base64_encode_str(&in_base64);
+        if (!out_base64_str)
+                return NULL;
+
+        rd_strlcpy((char *)uuid->base64str, out_base64_str,
+                   23 /* Removing extra ('=') padding */);
+        rd_free(out_base64_str);
+        return uuid->base64str;
+}
+
+int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {
+        return uuid->least_significant_bits;
+}
+
+
+int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) {
+        return uuid->most_significant_bits;
+}
\ No newline at end of file
diff --git a/src/rdkafka.h b/src/rdkafka.h
index d66f242307..0802d6507d 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -262,6 +262,7 @@ typedef struct rd_kafka_error_s rd_kafka_error_t;
 typedef struct rd_kafka_headers_s rd_kafka_headers_t;
 typedef struct rd_kafka_group_result_s rd_kafka_group_result_t;
 typedef struct rd_kafka_acl_result_s rd_kafka_acl_result_t;
+typedef struct rd_kafka_Uuid_s rd_kafka_Uuid_t;
 
 /* @endcond */
 
@@ -1631,6 +1632,75 @@ rd_kafka_message_leader_epoch(const rd_kafka_message_t *rkmessage);
 
 /**@}*/
 
 
+/**
+ * @name UUID
+ * @{
+ *
+ */
+
+/**
+ * @brief Computes the base64 encoding for the given UUID.
+ * @param uuid UUID for which the base64 encoding is required.
+ *
+ * @return The base64-encoded string for the given UUID, or NULL if the
+ *         conversion failed or is not supported.
+ */
+RD_EXPORT const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid);
+
+/**
+ * @brief Gets the least significant 64 bits of the given UUID.
+ *
+ * @param uuid UUID
+ *
+ * @return The least significant 64 bits of the given UUID.
+ */
+RD_EXPORT int64_t
+rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid);
+
+
+/**
+ * @brief Gets the most significant 64 bits of the given UUID.
+ *
+ * @param uuid UUID
+ *
+ * @return The most significant 64 bits of the given UUID.
+ */
+RD_EXPORT int64_t
+rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid);
+
+
+/**
+ * @brief Creates a new UUID.
+ *
+ * @param most_significant_bits most significant 64 bits of the 128-bit UUID.
+ * @param least_significant_bits least significant 64 bits of the 128-bit UUID.
+ *
+ * @return A newly allocated UUID.
+ * @remark Must be freed after use with rd_kafka_Uuid_destroy().
+ */
+RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits,
+                                             int64_t least_significant_bits);
+
+/**
+ * @brief Copies the given UUID.
+ *
+ * @param uuid UUID to be copied.
+ *
+ * @return A newly allocated copy of the provided UUID.
+ * @remark Must be freed after use with rd_kafka_Uuid_destroy().
+ */
+RD_EXPORT rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid);
+
+/**
+ * @brief Destroy the provided UUID.
+ *
+ * @param uuid UUID
+ */
+RD_EXPORT void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid);
+
+/**@}*/
+
+
 /**
  * @name Configuration interface
  * @{
@@ -8172,6 +8242,18 @@
 RD_EXPORT const char *
 rd_kafka_TopicDescription_name(const rd_kafka_TopicDescription_t *topicdesc);
 
+/**
+ * @brief Gets the topic id for the \p topicdesc topic.
+ *
+ * @param topicdesc The topic description.
+ * @return The topic id.
+ *
+ * @remark The lifetime of the returned memory is the same
+ *         as the lifetime of the \p topicdesc object.
+ */
+RD_EXPORT const rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id(
+    const rd_kafka_TopicDescription_t *topicdesc);
+
 /**
  * @brief Gets if the \p topicdesc topic is internal.
  *
diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c
index 3107332a7f..93e4e7d6d3 100644
--- a/src/rdkafka_admin.c
+++ b/src/rdkafka_admin.c
@@ -8010,6 +8010,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) {
  * @brief Create a new TopicDescription object.
  *
  * @param topic topic name
+ * @param topic_id topic id
  * @param partitions Array of partition metadata (rd_kafka_metadata_partition).
  * @param partition_cnt Number of partitions in partition metadata.
  * @param authorized_operations acl operations allowed for topic.
@@ -8019,6 +8020,7 @@ rd_kafka_TopicPartitionInfo_destroy(rd_kafka_TopicPartitionInfo_t *pinfo) {
  */
 static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
     const char *topic,
+    rd_kafka_Uuid_t topic_id,
     const struct rd_kafka_metadata_partition *partitions,
     int partition_cnt,
     const struct rd_kafka_metadata_broker *brokers_sorted,
@@ -8032,6 +8034,7 @@ static rd_kafka_TopicDescription_t *rd_kafka_TopicDescription_new(
         int i;
         topicdesc = rd_calloc(1, sizeof(*topicdesc));
         topicdesc->topic = rd_strdup(topic);
+        topicdesc->topic_id = topic_id;
         topicdesc->partition_cnt = partition_cnt;
         topicdesc->is_internal = is_internal;
         if (error)
@@ -8063,9 +8066,10 @@
  */
 static rd_kafka_TopicDescription_t *
 rd_kafka_TopicDescription_new_error(const char *topic,
+                                    rd_kafka_Uuid_t topic_id,
                                     rd_kafka_error_t *error) {
-        return rd_kafka_TopicDescription_new(topic, NULL, 0, NULL, NULL, 0,
-                                             NULL, 0, rd_false, error);
+        return rd_kafka_TopicDescription_new(topic, topic_id, NULL, 0, NULL,
+                                             NULL, 0, NULL, 0, rd_false, error);
 }
 
 static void
@@ -8075,7 +8079,6 @@ rd_kafka_TopicDescription_destroy(rd_kafka_TopicDescription_t *topicdesc) {
 
         RD_IF_FREE(topicdesc->topic, rd_free);
         RD_IF_FREE(topicdesc->error, rd_kafka_error_destroy);
         RD_IF_FREE(topicdesc->authorized_operations, rd_free);
-
         for (i = 0; i < topicdesc->partition_cnt; i++)
                 rd_kafka_TopicPartitionInfo_destroy(topicdesc->partitions[i]);
         rd_free(topicdesc->partitions);
@@ -8142,6 +8145,11 @@
 rd_kafka_TopicDescription_error(const rd_kafka_TopicDescription_t *topicdesc) {
         return topicdesc->error;
 }
 
+const rd_kafka_Uuid_t *rd_kafka_TopicDescription_topic_id(
+    const rd_kafka_TopicDescription_t *topicdesc) {
+        return &topicdesc->topic_id;
+}
+
 const rd_kafka_TopicDescription_t **rd_kafka_DescribeTopics_result_topics(
     const rd_kafka_DescribeTopics_result_t *result,
     size_t *cntp) {
@@ -8240,7 +8248,8 @@ rd_kafka_DescribeTopicsResponse_parse(rd_kafka_op_t *rko_req,
                             mdi->topics[i].topic_authorized_operations,
                             &authorized_operation_cnt);
 
                        topicdesc = rd_kafka_TopicDescription_new(
-                            md->topics[i].topic, md->topics[i].partitions,
+                            md->topics[i].topic, mdi->topics[i].topic_id,
+                            md->topics[i].partitions,
                             md->topics[i].partition_cnt, mdi->brokers_sorted,
                             mdi->brokers, md->broker_cnt,
                             authorized_operations, authorized_operation_cnt,
@@ -8251,7 +8260,8 @@
                             md->topics[i].err, "%s",
                             rd_kafka_err2str(md->topics[i].err));
                        topicdesc = rd_kafka_TopicDescription_new_error(
-                            md->topics[i].topic, error);
+                            md->topics[i].topic, mdi->topics[i].topic_id,
+                            error);
                        rd_kafka_error_destroy(error);
                 }
                 orig_pos = rd_list_index(&rko_result->rko_u.admin_result.args,
diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h
index 4eb015fad0..3e7378af56 100644
--- a/src/rdkafka_admin.h
+++ b/src/rdkafka_admin.h
@@ -525,9 +525,10 @@ struct rd_kafka_TopicPartitionInfo_s {
  * @struct DescribeTopics result
  */
 struct rd_kafka_TopicDescription_s {
-        char *topic;           /**< Topic name */
-        int partition_cnt;     /**< Number of partitions in \p partitions*/
-        rd_bool_t is_internal; /**< Is the topic is internal to Kafka? */
+        char *topic;              /**< Topic name */
+        rd_kafka_Uuid_t topic_id; /**< Topic Id */
+        int partition_cnt;        /**< Number of partitions in \p partitions */
+        rd_bool_t is_internal;    /**< Is the topic internal to Kafka? */
         rd_kafka_TopicPartitionInfo_t **partitions; /**< Partitions */
         rd_kafka_error_t *error; /**< Topic error reported by broker */
         int authorized_operations_cnt; /**< Count of operations allowed for
diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h
index 623ec49ae0..b6568b0ca9 100644
--- a/src/rdkafka_buf.h
+++ b/src/rdkafka_buf.h
@@ -1454,7 +1454,7 @@ void rd_kafka_buf_set_maker(rd_kafka_buf_t *rkbuf,
         } while (0)
 
 static RD_UNUSED void rd_kafka_buf_write_uuid(rd_kafka_buf_t *rkbuf,
-                                              rd_kafka_uuid_t *uuid) {
+                                              rd_kafka_Uuid_t *uuid) {
         rd_kafka_buf_write_i64(rkbuf, uuid->most_significant_bits);
         rd_kafka_buf_write_i64(rkbuf, uuid->least_significant_bits);
 }
diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h
index 2b81e0ddec..ded83bb14c 100644
--- a/src/rdkafka_metadata.h
+++ b/src/rdkafka_metadata.h
@@ -54,7 +54,7 @@ typedef struct rd_kafka_metadata_topic_internal_s {
          * same count as metadata.topics[i].partition_cnt.
          * Sorted by Partition Id. */
         rd_kafka_metadata_partition_internal_t *partitions;
-        rd_kafka_uuid_t topic_id;
+        rd_kafka_Uuid_t topic_id;
         int32_t topic_authorized_operations; /**< ACL operations allowed
                                               * for topic, -1 if not
                                               * supported by broker */
diff --git a/src/rdkafka_proto.h b/src/rdkafka_proto.h
index 6ee948f2cb..e6caf509e3 100644
--- a/src/rdkafka_proto.h
+++ b/src/rdkafka_proto.h
@@ -572,16 +572,16 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t;
 /**
  * @struct Struct representing UUID protocol primitive type.
  */
-typedef struct rd_kafka_uuid_s {
+typedef struct rd_kafka_Uuid_s {
         int64_t most_significant_bits;  /**< Most significant 64 bits
                                          *   for the UUID */
         int64_t least_significant_bits; /**< Least significant 64 bits
                                          *   for the UUID */
         char base64str[23]; /**< base64 encoding for the uuid. By default, it
                              *   is lazy loaded. Use function
-                             *   `rd_kafka_uuid_base64str()` as a getter for this
+                             *   `rd_kafka_Uuid_base64str()` as a getter for this
                              *   field. */
-} rd_kafka_uuid_t;
+} rd_kafka_Uuid_t;
 
 #define RD_KAFKA_UUID_ZERO                                                     \
         { 0, 0, "" }
@@ -590,64 +590,6 @@ typedef struct rd_kafka_uuid_s {
         { 0, 1, "" }
 
 
-/**
- * Creates a new UUID.
- *
- * @return A newly allocated UUID.
- */
-static RD_INLINE RD_UNUSED rd_kafka_uuid_t *rd_kafka_uuid_new() {
-        rd_kafka_uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_uuid_t *));
-        return uuid;
-}
-
-/**
- * Initialize given UUID to zero UUID.
- *
- * @param uuid UUID to initialize.
- */
-static RD_INLINE RD_UNUSED void rd_kafka_uuid_init(rd_kafka_uuid_t *uuid) {
-        memset(uuid, 0, sizeof(*uuid));
-}
-
-/**
- * @brief Computes base64 encoding for the given uuid string.
- * @param uuid UUID for which base64 encoding is required.
- *
- * @return base64 encoded string for the given UUID or NULL in case of some
- *         issue with the conversion or the conversion is not supported.
- */
-static RD_INLINE RD_UNUSED char *
-rd_kafka_uuid_base64str(rd_kafka_uuid_t *uuid) {
-        if (*uuid->base64str)
-                return uuid->base64str;
-
-        rd_chariov_t in_base64;
-        char *out_base64_str;
-        char *uuid_bytes;
-        uint64_t input_uuid[2];
-
-        input_uuid[0] = htobe64(uuid->most_significant_bits);
-        input_uuid[1] = htobe64(uuid->least_significant_bits);
-        uuid_bytes    = (char *)input_uuid;
-        in_base64.ptr = uuid_bytes;
-        in_base64.size = sizeof(uuid->most_significant_bits) +
-                         sizeof(uuid->least_significant_bits);
-
-        out_base64_str = rd_base64_encode_str(&in_base64);
-        if (!out_base64_str)
-                return NULL;
-
-        rd_strlcpy(uuid->base64str, out_base64_str,
-                   23 /* Removing extra ('=') padding */);
-        rd_free(out_base64_str);
-        return uuid->base64str;
-}
-
-static RD_INLINE RD_UNUSED void rd_kafka_uuid_destroy(rd_kafka_uuid_t *uuid) {
-        rd_free(uuid);
-}
-
-
 /**
  * @name Producer ID and Epoch for the Idempotent Producer
  * @{
diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c
index 46c9ed4cc3..ca99349e46 100644
--- a/src/rdkafka_request.c
+++ b/src/rdkafka_request.c
@@ -2321,7 +2321,7 @@ rd_kafka_MetadataRequest0(rd_kafka_broker_t *rkb,
 
         if (topic_cnt > 0) {
                 char *topic;
                 int i;
-                rd_kafka_uuid_t zero_uuid = RD_KAFKA_UUID_ZERO;
+                rd_kafka_Uuid_t zero_uuid = RD_KAFKA_UUID_ZERO;
 
                 /* Maintain a copy of the topics list so we can purge
                  * hints from the metadata cache on error. */
diff --git a/tests/0081-admin.c b/tests/0081-admin.c
index 3107c048b0..c8c6fcc7ab 100644
--- a/tests/0081-admin.c
+++ b/tests/0081-admin.c
@@ -3169,6 +3169,7 @@ static void do_test_DescribeTopics(const char *what,
         const rd_kafka_DescribeTopics_result_t *res;
         const rd_kafka_TopicDescription_t **result_topics;
         const rd_kafka_TopicPartitionInfo_t **partitions;
+        const rd_kafka_Uuid_t *topic_id;
         size_t partitions_cnt;
         size_t result_topics_cnt;
         char errstr[128];
@@ -3261,6 +3262,10 @@ static void do_test_DescribeTopics(const char *what,
                     "Expected topic name %s, got %s", topic_names[0],
                     rd_kafka_TopicDescription_name(result_topics[0]));
 
+        topic_id = rd_kafka_TopicDescription_topic_id(result_topics[0]);
+
+        TEST_ASSERT(topic_id, "Expected Topic Id to be present.");
+
        partitions = rd_kafka_TopicDescription_partitions(result_topics[0],
                                                          &partitions_cnt);
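Reviewer note on the `base64str[23]` sizing that both the struct and the `rd_strlcpy(..., 23)` call rely on: 16 UUID bytes encode to ((16 + 2) / 3) * 4 = 24 base64 characters, the last two of which are '=' padding for the trailing 1-byte group, so 22 significant characters plus the NUL terminator fit exactly in 23 bytes. Below is a minimal sketch (not part of the patch) exercising the new public UUID API, assuming the standard installed include path and arbitrary example bits:

```c
#include <inttypes.h>
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        /* Arbitrary 128-bit value split into two 64-bit halves. */
        rd_kafka_Uuid_t *uuid =
            rd_kafka_Uuid_new(0x0123456789abcdefLL, 0x0fedcba987654321LL);
        rd_kafka_Uuid_t *copy = rd_kafka_Uuid_copy(uuid);

        /* Lazily computed and cached inside the UUID: 22 base64 chars,
         * with the trailing '=' padding stripped. May return NULL if
         * the conversion fails. */
        const char *b64 = rd_kafka_Uuid_base64str(uuid);

        printf("msb=%" PRId64 " lsb=%" PRId64 " base64=%s\n",
               rd_kafka_Uuid_most_significant_bits(copy),
               rd_kafka_Uuid_least_significant_bits(copy),
               b64 ? b64 : "(unavailable)");

        /* Both objects were allocated by the API and must be destroyed. */
        rd_kafka_Uuid_destroy(copy);
        rd_kafka_Uuid_destroy(uuid);
        return 0;
}
```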