Skip to content

Commit

Permalink
Refactored tmpabuf and fixed an insufficient buffer allocation (#4449)
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab authored Sep 28, 2023
1 parent 32747f5 commit cca5e75
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 68 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ librdkafka v2.3.0 is a feature release:
are partition leader changes and a stale leader epoch is received (#4429).
* Fix a segmentation fault when closing a consumer using the
cooperative-sticky assignor before the first assignment (#4381).
* Fix for insufficient buffer allocation when allocating rack information (@wolfchimneyrock, #4449).


## Fixes

### General fixes

* An assertion failed with insufficient buffer size when allocating
  rack information on 32-bit architectures.
Solved by aligning all allocations to the maximum allowed word size (#4449).



Expand Down
25 changes: 20 additions & 5 deletions src/rdkafka_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,36 @@ typedef struct rd_tmpabuf_s {
size_t of;
char *buf;
int failed;
int assert_on_fail;
rd_bool_t assert_on_fail;
} rd_tmpabuf_t;

/**
* @brief Allocate new tmpabuf with \p size bytes pre-allocated.
* @brief Initialize new tmpabuf of non-final \p size bytes.
*/
static RD_UNUSED void
rd_tmpabuf_new(rd_tmpabuf_t *tab, size_t size, int assert_on_fail) {
tab->buf = rd_malloc(size);
tab->size = size;
rd_tmpabuf_new(rd_tmpabuf_t *tab, size_t size, rd_bool_t assert_on_fail) {
tab->buf = NULL;
tab->size = RD_ROUNDUP(size, 8);
tab->of = 0;
tab->failed = 0;
tab->assert_on_fail = assert_on_fail;
}

/**
 * @brief Add a new allocation of \p _size bytes,
 *        rounded up to maximum word size,
 *        for \p _times times.
 *
 * NOTE: \p _times is parenthesized in the expansion; without it an
 *       argument such as `a + b` would bind as
 *       `RD_ROUNDUP(_size, 8) * a + b` due to operator precedence.
 *       The whole expansion is parenthesized so the macro is a single
 *       expression (CERT PRE01-C / PRE02-C).
 */
#define rd_tmpabuf_add_alloc_times(_tab, _size, _times)                        \
        ((_tab)->size += RD_ROUNDUP(_size, 8) * (_times))

#define rd_tmpabuf_add_alloc(_tab, _size)                                      \
        rd_tmpabuf_add_alloc_times(_tab, _size, 1)

/**
 * @brief Finalize tmpabuf pre-allocating tab->size bytes.
 *
 * Must be called exactly once, after all rd_tmpabuf_add_alloc() calls and
 * before the first allocation from the buffer.
 */
#define rd_tmpabuf_finalize(_tab) ((_tab)->buf = rd_malloc((_tab)->size))

/**
* @brief Free memory allocated by tmpabuf
*/
Expand Down
59 changes: 32 additions & 27 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ static rd_kafka_metadata_internal_t *rd_kafka_metadata_copy_internal(
* Because of this we copy all the structs verbatim but
* any pointer fields needs to be copied explicitly to update
* the pointer address. */
rd_tmpabuf_new(&tbuf, size, 1 /*assert on fail*/);
rd_tmpabuf_new(&tbuf, size, rd_true /*assert on fail*/);
rd_tmpabuf_finalize(&tbuf);
mdi = rd_tmpabuf_write(&tbuf, src, sizeof(*mdi));
md = &mdi->metadata;

Expand Down Expand Up @@ -523,11 +524,13 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
* no more than 4 times larger than the wire representation.
* This is increased to 5 times in case if we want to compute partition
* to rack mapping. */
rd_tmpabuf_new(&tbuf,
sizeof(*mdi) + rkb_namelen +
(rkbuf->rkbuf_totlen * 4 +
(compute_racks ? rkbuf->rkbuf_totlen : 0)),
0 /*dont assert on fail*/);
rd_tmpabuf_new(&tbuf, 0, rd_false /*dont assert on fail*/);
rd_tmpabuf_add_alloc(&tbuf, sizeof(*mdi));
rd_tmpabuf_add_alloc(&tbuf, rkb_namelen);
rd_tmpabuf_add_alloc(&tbuf, rkbuf->rkbuf_totlen *
(4 + (compute_racks ? 1 : 0)));

rd_tmpabuf_finalize(&tbuf);

if (!(mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi)))) {
rd_kafka_broker_unlock(rkb);
Expand Down Expand Up @@ -1692,35 +1695,37 @@ rd_kafka_metadata_new_topic_mock(const rd_kafka_metadata_topic_t *topics,
rd_kafka_metadata_internal_t *mdi;
rd_kafka_metadata_t *md;
rd_tmpabuf_t tbuf;
size_t topic_names_size = 0;
int total_partition_cnt = 0;
size_t i;
int curr_broker = 0;

/* Calculate total partition count and topic names size before
* allocating memory. */
for (i = 0; i < topic_cnt; i++) {
topic_names_size += 1 + strlen(topics[i].topic);
total_partition_cnt += topics[i].partition_cnt;
}

/* If the replication factor is given, num_brokers must also be given */
rd_assert(replication_factor <= 0 || num_brokers > 0);

/* Allocate contiguous buffer which will back all the memory
* needed by the final metadata_t object */
rd_tmpabuf_new(
&tbuf,
sizeof(*mdi) + (sizeof(*md->topics) * topic_cnt) +
topic_names_size + (64 /*topic name size..*/ * topic_cnt) +
(sizeof(*md->topics[0].partitions) * total_partition_cnt) +
(sizeof(*mdi->topics) * topic_cnt) +
(sizeof(*mdi->topics[0].partitions) * total_partition_cnt) +
(sizeof(*mdi->brokers) * RD_ROUNDUP(num_brokers, 8)) +
(replication_factor > 0 ? RD_ROUNDUP(replication_factor, 8) *
total_partition_cnt * sizeof(int)
: 0),
1 /*assert on fail*/);
rd_tmpabuf_new(&tbuf, sizeof(*mdi), rd_true /*assert on fail*/);

rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*md->topics));
rd_tmpabuf_add_alloc(&tbuf, topic_cnt * sizeof(*mdi->topics));
rd_tmpabuf_add_alloc(&tbuf, num_brokers * sizeof(*md->brokers));

/* Calculate total partition count and topic names size before
* allocating memory. */
for (i = 0; i < topic_cnt; i++) {
rd_tmpabuf_add_alloc(&tbuf, 1 + strlen(topics[i].topic));
rd_tmpabuf_add_alloc(&tbuf,
topics[i].partition_cnt *
sizeof(*md->topics[i].partitions));
rd_tmpabuf_add_alloc(&tbuf,
topics[i].partition_cnt *
sizeof(*mdi->topics[i].partitions));
if (replication_factor > 0)
rd_tmpabuf_add_alloc_times(
&tbuf, replication_factor * sizeof(int),
topics[i].partition_cnt);
}

rd_tmpabuf_finalize(&tbuf);

mdi = rd_tmpabuf_alloc(&tbuf, sizeof(*mdi));
memset(mdi, 0, sizeof(*mdi));
Expand Down
38 changes: 17 additions & 21 deletions src/rdkafka_metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,6 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert(
rd_kafka_metadata_broker_internal_t *brokers_internal,
size_t broker_cnt) {
struct rd_kafka_metadata_cache_entry *rkmce, *old;
size_t topic_len;
size_t racks_size = 0;
rd_tmpabuf_t tbuf;
int i;

Expand All @@ -261,34 +259,32 @@ static struct rd_kafka_metadata_cache_entry *rd_kafka_metadata_cache_insert(
* any pointer fields needs to be copied explicitly to update
* the pointer address.
* See also rd_kafka_metadata_cache_delete which frees this. */
topic_len = strlen(mtopic->topic) + 1;
rd_tmpabuf_new(&tbuf, 0, rd_true /*assert on fail*/);

rd_tmpabuf_add_alloc(&tbuf, sizeof(*rkmce));
rd_tmpabuf_add_alloc(&tbuf, strlen(mtopic->topic) + 1);
rd_tmpabuf_add_alloc(&tbuf, mtopic->partition_cnt *
sizeof(*mtopic->partitions));
rd_tmpabuf_add_alloc(&tbuf,
mtopic->partition_cnt *
sizeof(*metadata_internal_topic->partitions));

for (i = 0; include_racks && i < mtopic->partition_cnt; i++) {
size_t j;
racks_size += RD_ROUNDUP(
metadata_internal_topic->partitions[i].racks_cnt *
sizeof(char *),
8);
rd_tmpabuf_add_alloc(
&tbuf, metadata_internal_topic->partitions[i].racks_cnt *
sizeof(char *));
for (j = 0;
j < metadata_internal_topic->partitions[i].racks_cnt;
j++) {
racks_size += RD_ROUNDUP(
strlen(metadata_internal_topic->partitions[i]
.racks[j]) +
1,
8);
rd_tmpabuf_add_alloc(
&tbuf, strlen(metadata_internal_topic->partitions[i]
.racks[j]) +
1);
}
}

rd_tmpabuf_new(
&tbuf,
RD_ROUNDUP(sizeof(*rkmce), 8) + RD_ROUNDUP(topic_len, 8) +
(mtopic->partition_cnt *
RD_ROUNDUP(sizeof(*mtopic->partitions), 8)) +
(mtopic->partition_cnt *
RD_ROUNDUP(sizeof(*metadata_internal_topic->partitions), 8)) +
racks_size,
1 /*assert on fail*/);
rd_tmpabuf_finalize(&tbuf);

rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce));

Expand Down
36 changes: 21 additions & 15 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1831,38 +1831,44 @@ rd_kafka_topic_info_t *rd_kafka_topic_info_new_with_rack(
const rd_kafka_metadata_partition_internal_t *mdpi) {
rd_kafka_topic_info_t *ti;
rd_tmpabuf_t tbuf;
size_t tlen = RD_ROUNDUP(strlen(topic) + 1, 8);
size_t total_racks_size = 0;
int i;
rd_bool_t has_racks = rd_false;

rd_tmpabuf_new(&tbuf, 0, rd_true /* assert on fail */);

rd_tmpabuf_add_alloc(&tbuf, sizeof(*ti));
rd_tmpabuf_add_alloc(&tbuf, strlen(topic) + 1);
for (i = 0; i < partition_cnt; i++) {
size_t j;
if (!mdpi[i].racks)
continue;

if (unlikely(!has_racks))
has_racks = rd_true;

for (j = 0; j < mdpi[i].racks_cnt; j++) {
total_racks_size +=
RD_ROUNDUP(strlen(mdpi[i].racks[j]) + 1, 8);
rd_tmpabuf_add_alloc(&tbuf,
strlen(mdpi[i].racks[j]) + 1);
}
total_racks_size +=
RD_ROUNDUP(sizeof(char *) * mdpi[i].racks_cnt, 8);
rd_tmpabuf_add_alloc(&tbuf, sizeof(char *) * mdpi[i].racks_cnt);
}

/* Only bother allocating this if at least one
* rack is there. */
if (has_racks) {
rd_tmpabuf_add_alloc(
&tbuf, sizeof(rd_kafka_metadata_partition_internal_t) *
partition_cnt);
}

if (total_racks_size) /* Only bother allocating this if at least one
rack is there. */
total_racks_size +=
RD_ROUNDUP(sizeof(rd_kafka_metadata_partition_internal_t) *
partition_cnt,
8);
rd_tmpabuf_finalize(&tbuf);

rd_tmpabuf_new(&tbuf, sizeof(*ti) + tlen + total_racks_size,
1 /* assert on fail */);
ti = rd_tmpabuf_alloc(&tbuf, sizeof(*ti));
ti->topic = rd_tmpabuf_write_str(&tbuf, topic);
ti->partition_cnt = partition_cnt;
ti->partitions_internal = NULL;

if (total_racks_size) {
if (has_racks) {
ti->partitions_internal = rd_tmpabuf_alloc(
&tbuf, sizeof(*ti->partitions_internal) * partition_cnt);

Expand Down

0 comments on commit cca5e75

Please sign in to comment.