Skip to content

Commit

Permalink
Handle overflow in rd_buf_write_remains
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Apr 16, 2024
1 parent 8532a0e commit 66fa00b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
6 changes: 5 additions & 1 deletion src/rdbuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ static RD_INLINE RD_UNUSED size_t rd_buf_write_pos(const rd_buf_t *rbuf) {
* @returns the number of bytes available for writing (before growing).
*/
static RD_INLINE RD_UNUSED size_t rd_buf_write_remains(const rd_buf_t *rbuf) {
return rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased);
ssize_t remaining =
rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased);
if (remaining < 0)
return 0;
return remaining;
}


Expand Down
21 changes: 20 additions & 1 deletion tests/0011-produce_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,22 @@ static void test_single_partition(void) {
int failcnt = 0;
int i;
rd_kafka_message_t *rkmessages;
const int topicSuffixLength = 56, clientIdLength = 239;
char *topicSuffix, *clientId;

SUB_TEST_QUICK();

msgid_next = 0;

test_conf_init(&conf, &topic_conf, 20);

clientId = (char *)malloc(clientIdLength + 1 * sizeof(char));
for (i = 0; i < clientIdLength; i++) {
clientId[i] = 'c';
}
clientId[clientIdLength] = '\0';
rd_kafka_conf_set(conf, "client.id", clientId, NULL, 0);

/* Set delivery report callback */
rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb);

Expand All @@ -106,7 +115,14 @@ static void test_single_partition(void) {
TEST_SAY("test_single_partition: Created kafka instance %s\n",
rd_kafka_name(rk));

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf);
topicSuffix = (char *)malloc(topicSuffixLength + 1 * sizeof(char));
for (i = 0; i < topicSuffixLength; i++) {
topicSuffix[i] = 'b';
}
topicSuffix[topicSuffixLength] = '\0';

rkt = rd_kafka_topic_new(rk, test_mk_topic_name(topicSuffix, 0),
topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));

Expand Down Expand Up @@ -178,6 +194,9 @@ static void test_single_partition(void) {
TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
rd_kafka_destroy(rk);

free(clientId);
free(topicSuffix);

SUB_TEST_PASS();
}

Expand Down

0 comments on commit 66fa00b

Please sign in to comment.