From 7ce7e4c5d4973a1abda6b737345cc54db3ce1dfa Mon Sep 17 00:00:00 2001 From: braydonk Date: Thu, 12 Sep 2024 21:22:25 +0000 Subject: [PATCH 1/8] non-functioning skeleton of the idea --- src/flb_input_chunk.c | 5 +- src/flb_input_log.c | 164 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 166 insertions(+), 3 deletions(-) diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index ca02e6fca68..01bdc4f9fda 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -1124,7 +1125,9 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in, } if (id >= 0) { - if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk)) { + if (ic->busy == FLB_TRUE || cio_chunk_is_locked(ic->chunk) || + (flb_input_chunk_get_real_size(ic) + chunk_size) > + FLB_INPUT_CHUNK_FS_MAX_SIZE) { ic = NULL; } else if (cio_chunk_is_up(ic->chunk) == CIO_FALSE) { diff --git a/src/flb_input_log.c b/src/flb_input_log.c index ac4b5cdfa67..cea58ffb75f 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -23,6 +23,127 @@ #include #include #include +#include +#include + +#include + +struct buffer_entry { + char *buf; + size_t buf_size; + struct mk_list _head; +}; + +static struct buffer_entry *new_buffer_entry(const void *buf, size_t buf_size) +{ + struct buffer_entry *new_entry = flb_malloc(sizeof(struct buffer_entry)); + new_entry->buf_size = buf_size; + new_entry->buf = buf; + return new_entry; +} + +// static struct buffer_entry *init_buffer_entry(size_t buf_size) { +// struct *buffer_entry new_entry = flb_malloc(sizeof(struct buffer_entry)); +// new_entry->buf_size = buf_size; +// new_entry->buf = flb_malloc(buf_size); +// return new_entry; +// } + +static int split_buffer_entry(struct buffer_entry *entry, + struct buffer_entry ***entries) +{ + int ret; + int encoder_result; + size_t bytes_consumed = 0; + struct buffer_entry *split_entries[2]; + struct buffer_entry *curr; + void *tmp_encoder_buf; + size_t tmp_encoder_buf_size; + size_t split_size = entry->buf_size / 2; + struct flb_log_event_encoder log_encoder; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + size_t bytes_left; + + ret = flb_log_event_decoder_init(&log_decoder, entry->buf, entry->buf_size); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_error("Log event decoder initialization error : %d", ret); + + return FLB_FALSE; + } + + ret = flb_log_event_encoder_init(&log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("Log event encoder initialization error : %d", ret); + + flb_log_event_decoder_destroy(&log_decoder); + + return FLB_FALSE; + } + + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + encoder_result = flb_log_event_encoder_begin_record(&log_encoder); + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = flb_log_event_encoder_set_timestamp( + &log_encoder, &log_event.timestamp); + } + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = \ + flb_log_event_encoder_set_metadata_from_msgpack_object( + &log_encoder, log_event.metadata); + } + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = \ + flb_log_event_encoder_set_body_from_msgpack_object( + &log_encoder, log_event.body); + } + + if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { + encoder_result = flb_log_event_encoder_commit_record(&log_encoder); + } + + if (encoder_result != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("log event encoder error : %d", encoder_result); + continue; + } + + if (log_encoder.output_length >= split_size) { + tmp_encoder_buf = log_encoder.output_buffer; + tmp_encoder_buf_size = log_encoder.output_length; + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + split_entries[0] = new_buffer_entry(tmp_encoder_buf, + tmp_encoder_buf_size); + flb_log_event_encoder_reset(&log_encoder); + } + } + + if (log_encoder.output_length >= 0) { + tmp_encoder_buf = log_encoder.output_buffer; + tmp_encoder_buf_size = log_encoder.output_length; + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + split_entries[1] = new_buffer_entry(tmp_encoder_buf, + tmp_encoder_buf_size); + flb_log_event_encoder_reset(&log_encoder); + } + + flb_log_event_encoder_destroy(&log_encoder); + flb_log_event_decoder_destroy(&log_decoder); + *entries = split_entries; + return FLB_TRUE; + +error: + flb_free(split_entries[0]); + flb_free(split_entries[1]); + flb_log_event_encoder_destroy(&log_encoder); + flb_log_event_decoder_destroy(&log_decoder); + return FLB_FALSE; +} + static int input_log_append(struct flb_input_instance *ins, size_t processor_starting_stage, @@ -34,6 +155,14 @@ static int input_log_append(struct flb_input_instance *ins, int processor_is_active; void *out_buf = (void *) buf; size_t out_size = buf_size; + struct mk_list buffers; + struct mk_list *head; + struct mk_list *tmp; + struct buffer_entry *curr_buffer; + struct buffer_entry *new_buffer; + struct buffer_entry **split_entries = flb_calloc(2, sizeof(struct buffer_entry*)); + int all_buffers_sized; + int something_resized; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -68,9 +197,40 @@ static int input_log_append(struct flb_input_instance *ins, } } - ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, out_buf, out_size); + flb_info("start with buffer size %zu", buf_size); + + mk_list_init(&buffers); + new_buffer = new_buffer_entry(buf, buf_size); + mk_list_add(&new_buffer->_head, &buffers); + + all_buffers_sized = FLB_FALSE; + while (all_buffers_sized != FLB_TRUE) { + something_resized = FLB_FALSE; + mk_list_foreach_safe(head, tmp, &buffers) { + curr_buffer = mk_list_entry(head, struct buffer_entry, _head); + + if (curr_buffer->buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { + ret = split_buffer_entry(curr_buffer, &split_entries); + if (ret == FLB_TRUE) { + mk_list_add(&(split_entries[0]->_head), &buffers); + mk_list_add(&(split_entries[1]->_head), &buffers); + mk_list_del(&curr_buffer->_head); + // flb_free(curr_buffer); + something_resized = FLB_TRUE; + } + } + } + if (something_resized == FLB_FALSE) { + all_buffers_sized = FLB_TRUE; + } + } + mk_list_foreach_safe(head, tmp, &buffers) { + curr_buffer = mk_list_entry(head, struct buffer_entry, _head); + flb_info("appending buf size %zu", curr_buffer->buf_size); + ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, + tag, tag_len, curr_buffer->buf, curr_buffer->buf_size); + } if (processor_is_active && buf != out_buf) { flb_free(out_buf); From 6f87ef41c4b029dff23e5a0f8314986eb9093820 Mon Sep 17 00:00:00 2001 From: braydonk Date: Fri, 13 Sep 2024 16:25:32 +0000 Subject: [PATCH 2/8] lossy version working --- plugins/in_tail/tail_file.c | 2 +- src/flb_input_log.c | 94 ++++++++++++++++++++++++------------- 2 files changed, 63 insertions(+), 33 deletions(-) diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index c20594c7f1c..a14ec433c22 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -615,7 +615,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) file->tag_len, file->sl_log_event_encoder->output_buffer, file->sl_log_event_encoder->output_length); - + flb_log_event_encoder_claim_internal_buffer_ownership(file->sl_log_event_encoder); flb_log_event_encoder_reset(file->sl_log_event_encoder); } } diff --git a/src/flb_input_log.c b/src/flb_input_log.c index cea58ffb75f..c6e80b56f25 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -42,28 +42,30 @@ static struct buffer_entry *new_buffer_entry(const void *buf, size_t buf_size) return new_entry; } -// static struct buffer_entry *init_buffer_entry(size_t buf_size) { -// struct *buffer_entry new_entry = flb_malloc(sizeof(struct buffer_entry)); -// new_entry->buf_size = buf_size; -// new_entry->buf = flb_malloc(buf_size); -// return new_entry; -// } +static void buffer_entry_destroy(struct buffer_entry *entry) { + if (!entry) { + return; + } + if (entry->buf) { + flb_free(entry->buf); + } + mk_list_del(&entry->_head); + flb_free(entry); +} static int split_buffer_entry(struct buffer_entry *entry, struct buffer_entry ***entries) { int ret; int encoder_result; - size_t bytes_consumed = 0; - struct buffer_entry *split_entries[2]; - struct buffer_entry *curr; + struct buffer_entry **split_entries; void *tmp_encoder_buf; size_t tmp_encoder_buf_size; size_t split_size = entry->buf_size / 2; struct flb_log_event_encoder log_encoder; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; - size_t bytes_left; + int entries_processed; ret = flb_log_event_decoder_init(&log_decoder, entry->buf, entry->buf_size); if (ret != FLB_EVENT_DECODER_SUCCESS) { @@ -82,6 +84,9 @@ static int split_buffer_entry(struct buffer_entry *entry, return FLB_FALSE; } + split_entries = flb_calloc(2, sizeof(struct buffer_entry*)); + + entries_processed = 0; while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { @@ -113,35 +118,44 @@ static int split_buffer_entry(struct buffer_entry *entry, } if (log_encoder.output_length >= split_size) { - tmp_encoder_buf = log_encoder.output_buffer; tmp_encoder_buf_size = log_encoder.output_length; - flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + tmp_encoder_buf = flb_malloc(tmp_encoder_buf_size); + memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); split_entries[0] = new_buffer_entry(tmp_encoder_buf, - tmp_encoder_buf_size); + tmp_encoder_buf_size); + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); flb_log_event_encoder_reset(&log_encoder); } + + entries_processed++; + } + + /** + * Edge case: If only one entry was processed, that means this buffer of data + * is one entry that exceeds the chunk max size. + */ + if (entries_processed <= 1) { + buffer_entry_destroy(split_entries[0]); + buffer_entry_destroy(split_entries[1]); + flb_free(split_entries); + flb_log_event_encoder_destroy(&log_encoder); + flb_log_event_decoder_destroy(&log_decoder); + return FLB_FALSE; } if (log_encoder.output_length >= 0) { - tmp_encoder_buf = log_encoder.output_buffer; tmp_encoder_buf_size = log_encoder.output_length; - flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + tmp_encoder_buf = flb_malloc(tmp_encoder_buf_size); + memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); split_entries[1] = new_buffer_entry(tmp_encoder_buf, - tmp_encoder_buf_size); - flb_log_event_encoder_reset(&log_encoder); + tmp_encoder_buf_size); + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); } flb_log_event_encoder_destroy(&log_encoder); flb_log_event_decoder_destroy(&log_decoder); *entries = split_entries; return FLB_TRUE; - -error: - flb_free(split_entries[0]); - flb_free(split_entries[1]); - flb_log_event_encoder_destroy(&log_encoder); - flb_log_event_decoder_destroy(&log_decoder); - return FLB_FALSE; } @@ -156,10 +170,11 @@ static int input_log_append(struct flb_input_instance *ins, void *out_buf = (void *) buf; size_t out_size = buf_size; struct mk_list buffers; + struct mk_list buffers_keep; + struct mk_list buffers_discard; struct mk_list *head; struct mk_list *tmp; struct buffer_entry *curr_buffer; - struct buffer_entry *new_buffer; struct buffer_entry **split_entries = flb_calloc(2, sizeof(struct buffer_entry*)); int all_buffers_sized; int something_resized; @@ -200,27 +215,42 @@ static int input_log_append(struct flb_input_instance *ins, flb_info("start with buffer size %zu", buf_size); mk_list_init(&buffers); - new_buffer = new_buffer_entry(buf, buf_size); - mk_list_add(&new_buffer->_head, &buffers); + curr_buffer = new_buffer_entry(buf, buf_size); + mk_list_add(&curr_buffer->_head, &buffers); all_buffers_sized = FLB_FALSE; while (all_buffers_sized != FLB_TRUE) { something_resized = FLB_FALSE; + mk_list_init(&buffers_keep); + mk_list_init(&buffers_discard); mk_list_foreach_safe(head, tmp, &buffers) { curr_buffer = mk_list_entry(head, struct buffer_entry, _head); if (curr_buffer->buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { ret = split_buffer_entry(curr_buffer, &split_entries); if (ret == FLB_TRUE) { - mk_list_add(&(split_entries[0]->_head), &buffers); - mk_list_add(&(split_entries[1]->_head), &buffers); - mk_list_del(&curr_buffer->_head); - // flb_free(curr_buffer); + flb_info("split to size %zu and %zu", split_entries[0]->buf_size, split_entries[1]->buf_size); + mk_list_add(&(split_entries[0]->_head), &buffers_keep); + mk_list_add(&(split_entries[1]->_head), &buffers_keep); + mk_list_add(&curr_buffer->_head, &buffers_discard); something_resized = FLB_TRUE; + } else { + mk_list_add(&curr_buffer->_head, &buffers_keep); } } } - if (something_resized == FLB_FALSE) { + + if (something_resized == FLB_TRUE) { + mk_list_foreach_safe(head, tmp, &buffers_discard) { + curr_buffer = mk_list_entry(head, struct buffer_entry, _head); + buffer_entry_destroy(curr_buffer); + } + mk_list_init(&buffers); + mk_list_foreach_safe(head, tmp, &buffers_keep) { + curr_buffer = mk_list_entry(head, struct buffer_entry, _head); + mk_list_add(&curr_buffer->_head, &buffers); + } + } else { all_buffers_sized = FLB_TRUE; } } From aeb43e2fa837b860073c199c147629b326430191 Mon Sep 17 00:00:00 2001 From: braydonk Date: Fri, 13 Sep 2024 16:36:06 +0000 Subject: [PATCH 3/8] I wasted a lot of time not doing it this way --- src/flb_input_log.c | 87 +++++++++------------------------------------ 1 file changed, 16 insertions(+), 71 deletions(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index c6e80b56f25..30bbfb388c2 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -54,18 +54,17 @@ static void buffer_entry_destroy(struct buffer_entry *entry) { } static int split_buffer_entry(struct buffer_entry *entry, - struct buffer_entry ***entries) + struct mk_list *entries) { int ret; int encoder_result; - struct buffer_entry **split_entries; void *tmp_encoder_buf; size_t tmp_encoder_buf_size; - size_t split_size = entry->buf_size / 2; struct flb_log_event_encoder log_encoder; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; int entries_processed; + struct buffer_entry *new_buffer; ret = flb_log_event_decoder_init(&log_decoder, entry->buf, entry->buf_size); if (ret != FLB_EVENT_DECODER_SUCCESS) { @@ -84,8 +83,6 @@ static int split_buffer_entry(struct buffer_entry *entry, return FLB_FALSE; } - split_entries = flb_calloc(2, sizeof(struct buffer_entry*)); - entries_processed = 0; while ((ret = flb_log_event_decoder_next( &log_decoder, @@ -117,12 +114,12 @@ static int split_buffer_entry(struct buffer_entry *entry, continue; } - if (log_encoder.output_length >= split_size) { + if (log_encoder.output_length >= FLB_INPUT_CHUNK_FS_MAX_SIZE) { tmp_encoder_buf_size = log_encoder.output_length; tmp_encoder_buf = flb_malloc(tmp_encoder_buf_size); memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); - split_entries[0] = new_buffer_entry(tmp_encoder_buf, - tmp_encoder_buf_size); + new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size); + mk_list_add(&new_buffer->_head, entries); flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); flb_log_event_encoder_reset(&log_encoder); } @@ -130,31 +127,17 @@ static int split_buffer_entry(struct buffer_entry *entry, entries_processed++; } - /** - * Edge case: If only one entry was processed, that means this buffer of data - * is one entry that exceeds the chunk max size. - */ - if (entries_processed <= 1) { - buffer_entry_destroy(split_entries[0]); - buffer_entry_destroy(split_entries[1]); - flb_free(split_entries); - flb_log_event_encoder_destroy(&log_encoder); - flb_log_event_decoder_destroy(&log_decoder); - return FLB_FALSE; - } - if (log_encoder.output_length >= 0) { tmp_encoder_buf_size = log_encoder.output_length; tmp_encoder_buf = flb_malloc(tmp_encoder_buf_size); memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); - split_entries[1] = new_buffer_entry(tmp_encoder_buf, - tmp_encoder_buf_size); + new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size); + mk_list_add(&new_buffer->_head, entries); flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); } flb_log_event_encoder_destroy(&log_encoder); flb_log_event_decoder_destroy(&log_decoder); - *entries = split_entries; return FLB_TRUE; } @@ -170,14 +153,10 @@ static int input_log_append(struct flb_input_instance *ins, void *out_buf = (void *) buf; size_t out_size = buf_size; struct mk_list buffers; - struct mk_list buffers_keep; - struct mk_list buffers_discard; struct mk_list *head; struct mk_list *tmp; struct buffer_entry *curr_buffer; struct buffer_entry **split_entries = flb_calloc(2, sizeof(struct buffer_entry*)); - int all_buffers_sized; - int something_resized; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -213,53 +192,19 @@ static int input_log_append(struct flb_input_instance *ins, } flb_info("start with buffer size %zu", buf_size); - - mk_list_init(&buffers); - curr_buffer = new_buffer_entry(buf, buf_size); - mk_list_add(&curr_buffer->_head, &buffers); - - all_buffers_sized = FLB_FALSE; - while (all_buffers_sized != FLB_TRUE) { - something_resized = FLB_FALSE; - mk_list_init(&buffers_keep); - mk_list_init(&buffers_discard); + if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { + mk_list_init(&buffers); + curr_buffer = new_buffer_entry(buf, buf_size); + split_buffer_entry(curr_buffer, &buffers); mk_list_foreach_safe(head, tmp, &buffers) { curr_buffer = mk_list_entry(head, struct buffer_entry, _head); - - if (curr_buffer->buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { - ret = split_buffer_entry(curr_buffer, &split_entries); - if (ret == FLB_TRUE) { - flb_info("split to size %zu and %zu", split_entries[0]->buf_size, split_entries[1]->buf_size); - mk_list_add(&(split_entries[0]->_head), &buffers_keep); - mk_list_add(&(split_entries[1]->_head), &buffers_keep); - mk_list_add(&curr_buffer->_head, &buffers_discard); - something_resized = FLB_TRUE; - } else { - mk_list_add(&curr_buffer->_head, &buffers_keep); - } - } + flb_info("appending buf size %zu", curr_buffer->buf_size); + ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, + tag, tag_len, curr_buffer->buf, curr_buffer->buf_size); } - - if (something_resized == FLB_TRUE) { - mk_list_foreach_safe(head, tmp, &buffers_discard) { - curr_buffer = mk_list_entry(head, struct buffer_entry, _head); - buffer_entry_destroy(curr_buffer); - } - mk_list_init(&buffers); - mk_list_foreach_safe(head, tmp, &buffers_keep) { - curr_buffer = mk_list_entry(head, struct buffer_entry, _head); - mk_list_add(&curr_buffer->_head, &buffers); - } - } else { - all_buffers_sized = FLB_TRUE; - } - } - - mk_list_foreach_safe(head, tmp, &buffers) { - curr_buffer = mk_list_entry(head, struct buffer_entry, _head); - flb_info("appending buf size %zu", curr_buffer->buf_size); + } else { ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, curr_buffer->buf, curr_buffer->buf_size); + tag, tag_len, buf, buf_size); } if (processor_is_active && buf != out_buf) { From cbcd35c9dd09da7ff7cf7a48000c09e02a5f8802 Mon Sep 17 00:00:00 2001 From: braydonk Date: Fri, 13 Sep 2024 18:37:32 +0000 Subject: [PATCH 4/8] working version with one memory leak left --- plugins/in_tail/tail_file.c | 2 +- src/flb_input_log.c | 51 ++++++++++++++++++++++--------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index a14ec433c22..7c6dee06ba3 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -615,8 +615,8 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) file->tag_len, file->sl_log_event_encoder->output_buffer, file->sl_log_event_encoder->output_length); - flb_log_event_encoder_claim_internal_buffer_ownership(file->sl_log_event_encoder); flb_log_event_encoder_reset(file->sl_log_event_encoder); + flb_log_event_encoder_claim_internal_buffer_ownership(file->sl_log_event_encoder); } } else if (file->skip_next) { diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 30bbfb388c2..67731215dd4 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -61,6 +61,7 @@ static int split_buffer_entry(struct buffer_entry *entry, void *tmp_encoder_buf; size_t tmp_encoder_buf_size; struct flb_log_event_encoder log_encoder; + int new_encoder = FLB_TRUE; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; int entries_processed; @@ -73,20 +74,25 @@ static int split_buffer_entry(struct buffer_entry *entry, return FLB_FALSE; } - ret = flb_log_event_encoder_init(&log_encoder, - FLB_LOG_EVENT_FORMAT_DEFAULT); - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_error("Log event encoder initialization error : %d", ret); - - flb_log_event_decoder_destroy(&log_decoder); - - return FLB_FALSE; - } - entries_processed = 0; while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + if (new_encoder == FLB_TRUE) { + ret = flb_log_event_encoder_init(&log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("Log event encoder initialization error : %d", ret); + + flb_log_event_decoder_destroy(&log_decoder); + + return FLB_FALSE; + } + + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); + new_encoder = FLB_FALSE; + } + encoder_result = flb_log_event_encoder_begin_record(&log_encoder); if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { encoder_result = flb_log_event_encoder_set_timestamp( @@ -120,8 +126,8 @@ static int split_buffer_entry(struct buffer_entry *entry, memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size); mk_list_add(&new_buffer->_head, entries); - flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); - flb_log_event_encoder_reset(&log_encoder); + flb_log_event_encoder_destroy(&log_encoder); + new_encoder = FLB_TRUE; } entries_processed++; @@ -133,7 +139,6 @@ static int split_buffer_entry(struct buffer_entry *entry, memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size); mk_list_add(&new_buffer->_head, entries); - flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); } flb_log_event_encoder_destroy(&log_encoder); @@ -146,7 +151,7 @@ static int input_log_append(struct flb_input_instance *ins, size_t processor_starting_stage, size_t records, const char *tag, size_t tag_len, - const void *buf, size_t buf_size) + void *buf, size_t buf_size) { int ret; int processor_is_active; @@ -155,8 +160,8 @@ static int input_log_append(struct flb_input_instance *ins, struct mk_list buffers; struct mk_list *head; struct mk_list *tmp; - struct buffer_entry *curr_buffer; - struct buffer_entry **split_entries = flb_calloc(2, sizeof(struct buffer_entry*)); + struct buffer_entry *start_buffer; + struct buffer_entry *iter_buffer; processor_is_active = flb_processor_is_active(ins->processor); if (processor_is_active) { @@ -194,13 +199,17 @@ static int input_log_append(struct flb_input_instance *ins, flb_info("start with buffer size %zu", buf_size); if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { mk_list_init(&buffers); - curr_buffer = new_buffer_entry(buf, buf_size); - split_buffer_entry(curr_buffer, &buffers); + start_buffer = new_buffer_entry(buf, buf_size); + split_buffer_entry(start_buffer, &buffers); + flb_free(start_buffer); + flb_free(buf); mk_list_foreach_safe(head, tmp, &buffers) { - curr_buffer = mk_list_entry(head, struct buffer_entry, _head); - flb_info("appending buf size %zu", curr_buffer->buf_size); + iter_buffer = mk_list_entry(head, struct buffer_entry, _head); + flb_info("appending buf size %zu", iter_buffer->buf_size); + records = flb_mp_count(iter_buffer->buf, iter_buffer->buf_size); ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, curr_buffer->buf, curr_buffer->buf_size); + tag, tag_len, iter_buffer->buf, iter_buffer->buf_size); + buffer_entry_destroy(iter_buffer); } } else { ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, From 8e9102f650bab13af2f3fe6e60f278d0d11ef604 Mon Sep 17 00:00:00 2001 From: braydonk Date: Fri, 13 Sep 2024 19:26:03 +0000 Subject: [PATCH 5/8] do the buffer ownership stuff properly --- plugins/in_tail/tail_file.c | 1 + src/flb_input_log.c | 34 +++++++++++++--------------------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 7c6dee06ba3..ef40e4045ce 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -616,6 +616,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes) file->sl_log_event_encoder->output_buffer, file->sl_log_event_encoder->output_length); flb_log_event_encoder_reset(file->sl_log_event_encoder); + flb_free(file->sl_log_event_encoder->output_buffer); flb_log_event_encoder_claim_internal_buffer_ownership(file->sl_log_event_encoder); } } diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 67731215dd4..f1c172d6f6c 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -74,25 +74,20 @@ static int split_buffer_entry(struct buffer_entry *entry, return FLB_FALSE; } + ret = flb_log_event_encoder_init(&log_encoder, + FLB_LOG_EVENT_FORMAT_DEFAULT); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_error("Log event encoder initialization error : %d", ret); + + flb_log_event_decoder_destroy(&log_decoder); + + return FLB_FALSE; + } + entries_processed = 0; while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - if (new_encoder == FLB_TRUE) { - ret = flb_log_event_encoder_init(&log_encoder, - FLB_LOG_EVENT_FORMAT_DEFAULT); - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_error("Log event encoder initialization error : %d", ret); - - flb_log_event_decoder_destroy(&log_decoder); - - return FLB_FALSE; - } - - flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); - new_encoder = FLB_FALSE; - } - encoder_result = flb_log_event_encoder_begin_record(&log_encoder); if (encoder_result == FLB_EVENT_ENCODER_SUCCESS) { encoder_result = flb_log_event_encoder_set_timestamp( @@ -122,12 +117,10 @@ static int split_buffer_entry(struct buffer_entry *entry, if (log_encoder.output_length >= FLB_INPUT_CHUNK_FS_MAX_SIZE) { tmp_encoder_buf_size = log_encoder.output_length; - tmp_encoder_buf = flb_malloc(tmp_encoder_buf_size); - memcpy(tmp_encoder_buf, log_encoder.output_buffer, tmp_encoder_buf_size); + tmp_encoder_buf = log_encoder.output_buffer; + flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); new_buffer = new_buffer_entry(tmp_encoder_buf, tmp_encoder_buf_size); mk_list_add(&new_buffer->_head, entries); - flb_log_event_encoder_destroy(&log_encoder); - new_encoder = FLB_TRUE; } entries_processed++; @@ -151,7 +144,7 @@ static int input_log_append(struct flb_input_instance *ins, size_t processor_starting_stage, size_t records, const char *tag, size_t tag_len, - void *buf, size_t buf_size) + const void *buf, size_t buf_size) { int ret; int processor_is_active; @@ -202,7 +195,6 @@ static int input_log_append(struct flb_input_instance *ins, start_buffer = new_buffer_entry(buf, buf_size); split_buffer_entry(start_buffer, &buffers); flb_free(start_buffer); - flb_free(buf); mk_list_foreach_safe(head, tmp, &buffers) { iter_buffer = mk_list_entry(head, struct buffer_entry, _head); flb_info("appending buf size %zu", iter_buffer->buf_size); From 756b73363065a9c0bc2bc44c6e9d8d13b7761fbc Mon Sep 17 00:00:00 2001 From: braydonk Date: Fri, 13 Sep 2024 19:35:00 +0000 Subject: [PATCH 6/8] fix const discard warnings --- include/fluent-bit/flb_input_log.h | 6 +++--- src/flb_input_log.c | 11 +++++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/include/fluent-bit/flb_input_log.h b/include/fluent-bit/flb_input_log.h index 91839981d27..b62d3f65e60 100644 --- a/include/fluent-bit/flb_input_log.h +++ b/include/fluent-bit/flb_input_log.h @@ -25,17 +25,17 @@ int flb_input_log_append(struct flb_input_instance *ins, const char *tag, size_t tag_len, - const void *buf, size_t buf_size); + void *buf, size_t buf_size); int flb_input_log_append_records(struct flb_input_instance *ins, size_t records, const char *tag, size_t tag_len, - const void *buf, size_t buf_size); + void *buf, size_t buf_size); int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, size_t processor_starting_stage, const char *tag, size_t tag_len, - const void *buf, + void *buf, size_t buf_size); #endif diff --git a/src/flb_input_log.c b/src/flb_input_log.c index f1c172d6f6c..331da6f9328 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -34,7 +34,7 @@ struct buffer_entry { struct mk_list _head; }; -static struct buffer_entry *new_buffer_entry(const void *buf, size_t buf_size) +static struct buffer_entry *new_buffer_entry(void *buf, size_t buf_size) { struct buffer_entry *new_entry = flb_malloc(sizeof(struct buffer_entry)); new_entry->buf_size = buf_size; @@ -61,7 +61,6 @@ static int split_buffer_entry(struct buffer_entry *entry, void *tmp_encoder_buf; size_t tmp_encoder_buf_size; struct flb_log_event_encoder log_encoder; - int new_encoder = FLB_TRUE; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; int entries_processed; @@ -144,7 +143,7 @@ static int input_log_append(struct flb_input_instance *ins, size_t processor_starting_stage, size_t records, const char *tag, size_t tag_len, - const void *buf, size_t buf_size) + void *buf, size_t buf_size) { int ret; int processor_is_active; @@ -217,7 +216,7 @@ static int input_log_append(struct flb_input_instance *ins, /* Take a msgpack serialized record and enqueue it as a chunk */ int flb_input_log_append(struct flb_input_instance *ins, const char *tag, size_t tag_len, - const void *buf, size_t buf_size) + void *buf, size_t buf_size) { int ret; size_t records; @@ -232,7 +231,7 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, size_t processor_starting_stage, const char *tag, size_t tag_len, - const void *buf, + void *buf, size_t buf_size) { return input_log_append(ins, @@ -248,7 +247,7 @@ int flb_input_log_append_skip_processor_stages(struct flb_input_instance *ins, int flb_input_log_append_records(struct flb_input_instance *ins, size_t records, const char *tag, size_t tag_len, - const void *buf, size_t buf_size) + void *buf, size_t buf_size) { int ret; From 161e83485bebf5ffc0636981f1529ac19961c9fb Mon Sep 17 00:00:00 2001 From: braydonk Date: Fri, 13 Sep 2024 19:47:05 +0000 Subject: [PATCH 7/8] remove testing log statements --- src/flb_input_log.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 331da6f9328..9fc248f2020 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -188,7 +188,6 @@ static int input_log_append(struct flb_input_instance *ins, } } - flb_info("start with buffer size %zu", buf_size); if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { mk_list_init(&buffers); start_buffer = new_buffer_entry(buf, buf_size); @@ -196,7 +195,6 @@ static int input_log_append(struct flb_input_instance *ins, flb_free(start_buffer); mk_list_foreach_safe(head, tmp, &buffers) { iter_buffer = mk_list_entry(head, struct buffer_entry, _head); - flb_info("appending buf size %zu", iter_buffer->buf_size); records = flb_mp_count(iter_buffer->buf, iter_buffer->buf_size); ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, tag, tag_len, iter_buffer->buf, iter_buffer->buf_size); From c2e626a88bb2d2da715eba5f7f787fac46748a7e Mon Sep 17 00:00:00 2001 From: braydonk Date: Tue, 1 Oct 2024 13:07:14 +0000 Subject: [PATCH 8/8] config: add storage.chunk_max_size Add a configuration value for the storage chunk max size. Signed-off-by: braydonk --- include/fluent-bit/flb_config.h | 24 +++++++++++++----------- src/flb_config.c | 5 +++++ src/flb_input_log.c | 10 ++++++---- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 82003386d6e..7dfa4bf17c5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -48,11 +48,11 @@ struct flb_config { int is_running; /* service running ? */ double flush; /* Flush timeout */ - /* - * Maximum grace time on shutdown. If set to -1, the engine will + /* + * Maximum grace time on shutdown. If set to -1, the engine will * shutdown when all remaining tasks are flushed */ - int grace; + int grace; int grace_count; /* Count of grace shutdown tries */ flb_pipefd_t flush_fd; /* Timer FD associated to flush */ int convert_nan_to_null; /* convert null to nan ? */ @@ -227,6 +227,7 @@ struct flb_config { char *storage_bl_mem_limit; /* storage backlog memory limit */ struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */ int storage_trim_files; /* enable/disable file trimming */ + size_t storage_chunk_max_size; /* The max chunk size */ /* Embedded SQL Database support (SQLite3) */ #ifdef FLB_HAVE_SQLDB @@ -354,15 +355,16 @@ enum conf_type { #define FLB_CONF_DNS_PREFER_IPV6 "dns.prefer_ipv6" /* Storage / Chunk I/O */ -#define FLB_CONF_STORAGE_PATH "storage.path" -#define FLB_CONF_STORAGE_SYNC "storage.sync" -#define FLB_CONF_STORAGE_METRICS "storage.metrics" -#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" -#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" -#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" +#define FLB_CONF_STORAGE_PATH "storage.path" +#define FLB_CONF_STORAGE_SYNC "storage.sync" +#define FLB_CONF_STORAGE_METRICS "storage.metrics" +#define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" +#define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" +#define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" #define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \ - "storage.delete_irrecoverable_chunks" -#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" + "storage.delete_irrecoverable_chunks" +#define FLB_CONF_STORAGE_TRIM_FILES "storage.trim_files" +#define FLB_CONF_STORAGE_CHUNK_MAX_SIZE "storage.chunk_max_size" /* Coroutines */ #define FLB_CONF_STR_CORO_STACK_SIZE "Coro_Stack_Size" diff --git a/src/flb_config.c b/src/flb_config.c index 747d855cf08..89184201a1b 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -44,6 +44,7 @@ #include #include #include +#include const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL"; @@ -154,6 +155,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_TRIM_FILES, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, storage_trim_files)}, + {FLB_CONF_STORAGE_CHUNK_MAX_SIZE, + FLB_CONF_TYPE_INT, + offsetof(struct flb_config, storage_chunk_max_size)}, /* Coroutines */ {FLB_CONF_STR_CORO_STACK_SIZE, @@ -278,6 +282,7 @@ struct flb_config *flb_config_init() config->storage_path = NULL; config->storage_input_plugin = NULL; config->storage_metrics = FLB_TRUE; + config->storage_chunk_max_size = FLB_INPUT_CHUNK_FS_MAX_SIZE; config->sched_cap = FLB_SCHED_CAP; config->sched_base = FLB_SCHED_BASE; diff --git a/src/flb_input_log.c b/src/flb_input_log.c index 9fc248f2020..f33ede87aeb 100644 --- a/src/flb_input_log.c +++ b/src/flb_input_log.c @@ -54,7 +54,8 @@ static void buffer_entry_destroy(struct buffer_entry *entry) { } static int split_buffer_entry(struct buffer_entry *entry, - struct mk_list *entries) + struct mk_list *entries, + int buf_entry_max_size) { int ret; int encoder_result; @@ -114,7 +115,7 @@ static int split_buffer_entry(struct buffer_entry *entry, continue; } - if (log_encoder.output_length >= FLB_INPUT_CHUNK_FS_MAX_SIZE) { + if (log_encoder.output_length >= buf_entry_max_size) { tmp_encoder_buf_size = log_encoder.output_length; tmp_encoder_buf = log_encoder.output_buffer; flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); @@ -191,13 +192,14 @@ static int input_log_append(struct flb_input_instance *ins, if (buf_size > FLB_INPUT_CHUNK_FS_MAX_SIZE) { mk_list_init(&buffers); start_buffer = new_buffer_entry(buf, buf_size); - split_buffer_entry(start_buffer, &buffers); + split_buffer_entry(start_buffer, &buffers, ins->config->storage_chunk_max_size); flb_free(start_buffer); mk_list_foreach_safe(head, tmp, &buffers) { iter_buffer = mk_list_entry(head, struct buffer_entry, _head); records = flb_mp_count(iter_buffer->buf, iter_buffer->buf_size); ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records, - tag, tag_len, iter_buffer->buf, iter_buffer->buf_size); + tag, tag_len, + iter_buffer->buf, iter_buffer->buf_size); buffer_entry_destroy(iter_buffer); } } else {