diff --git a/src/flb_filter.c b/src/flb_filter.c index 485387c2ea7..8dfcf80ad02 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -123,11 +123,11 @@ void flb_filter_do(struct flb_input_chunk *ic, #ifdef FLB_HAVE_METRICS /* timestamp */ ts = cfl_time_now(); +#endif /* Count number of incoming records */ in_records = ic->added_records; pre_records = ic->total_records - in_records; -#endif /* Iterate filters */ mk_list_foreach(head, &config->filters) { @@ -201,10 +201,9 @@ void flb_filter_do(struct flb_input_chunk *ic, } #endif /* FLB_HAVE_CHUNK_TRACE */ - -#ifdef FLB_HAVE_METRICS ic->total_records = pre_records; +#ifdef FLB_HAVE_METRICS /* cmetrics */ cmt_counter_add(f_ins->cmt_drop_records, ts, in_records, 1, (char *[]) {name}); @@ -216,8 +215,9 @@ void flb_filter_do(struct flb_input_chunk *ic, break; } else { -#ifdef FLB_HAVE_METRICS out_records = flb_mp_count(out_buf, out_size); + +#ifdef FLB_HAVE_METRICS if (out_records > in_records) { diff = (out_records - in_records); @@ -240,11 +240,11 @@ void flb_filter_do(struct flb_input_chunk *ic, flb_metrics_sum(FLB_METRIC_N_DROPPED, diff, f_ins->metrics); } +#endif /* set number of records in new chunk */ in_records = out_records; ic->total_records = pre_records + in_records; -#endif } #ifdef FLB_HAVE_CHUNK_TRACE diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 959e5fd396e..7437ca77654 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -1499,31 +1499,6 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_chunk_trace_do_input(ic); #endif /* FLB_HAVE_CHUNK_TRACE */ - /* Update 'input' metrics */ -#ifdef FLB_HAVE_METRICS - if (ret == CIO_OK) { - ic->added_records = n_records; - ic->total_records += n_records; - } - - if (ic->total_records > 0) { - /* timestamp */ - ts = cfl_time_now(); - - /* fluentbit_input_records_total */ - cmt_counter_add(in->cmt_records, ts, ic->added_records, - 1, (char *[]) {(char *) flb_input_name(in)}); - - /* fluentbit_input_bytes_total */ - cmt_counter_add(in->cmt_bytes, ts, buf_size, - 1, (char *[]) {(char *) flb_input_name(in)}); - - /* OLD api */ - flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics); - flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); - } -#endif - filtered_data_buffer = NULL; final_data_buffer = (char *) buf; final_data_size = buf_size; @@ -1555,6 +1530,31 @@ static int input_chunk_append_raw(struct flb_input_instance *in, flb_free(filtered_data_buffer); } + if (ret == CIO_OK) { + ic->added_records = n_records; + ic->total_records += n_records; + } + + /* Update 'input' metrics */ +#ifdef FLB_HAVE_METRICS + if (ic->total_records > 0) { + /* timestamp */ + ts = cfl_time_now(); + + /* fluentbit_input_records_total */ + cmt_counter_add(in->cmt_records, ts, ic->added_records, + 1, (char *[]) {(char *) flb_input_name(in)}); + + /* fluentbit_input_bytes_total */ + cmt_counter_add(in->cmt_bytes, ts, buf_size, + 1, (char *[]) {(char *) flb_input_name(in)}); + + /* OLD api */ + flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics); + flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics); + } +#endif + if (ret == -1) { flb_error("[input chunk] error writing data from %s instance", in->name); diff --git a/src/flb_task.c b/src/flb_task.c index ed2ca5cb7e3..d10df9015db 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -362,9 +362,7 @@ struct flb_task *flb_task_create(uint64_t ref_id, return NULL; } -#ifdef FLB_HAVE_METRICS total_events = ((struct flb_input_chunk *) ic)->total_records; -#endif /* event chunk */ evc = flb_event_chunk_create(ic->event_type, diff --git a/tests/internal/input_chunk.c b/tests/internal/input_chunk.c index 7154b324baf..2e2b5d04bef 100644 --- a/tests/internal/input_chunk.c +++ b/tests/internal/input_chunk.c @@ -463,7 +463,7 @@ void flb_test_input_chunk_fs_chunks_size_real() flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, 256, "dummy", 4, (void *)buf, 256); msgpack_sbuffer_destroy(&mp_sbuf); - /* clean up test chunks */ + /* Check each test chunk for size discrepancy */ mk_list_foreach_safe(head, tmp, &i_ins->chunks) { ic = mk_list_entry(head, struct flb_input_chunk, _head); if (cio_chunk_get_real_size(ic->chunk) != cio_chunk_get_content_size(ic->chunk)) { @@ -506,11 +506,99 @@ void flb_test_input_chunk_fs_chunks_size_real() flb_config_exit(cfg); } +/* This tests uses the subsystems of the engine directly + * to avoid threading issues when submitting chunks. + */ +void flb_test_input_chunk_correct_total_records(void) +{ + int records; + struct flb_input_instance *i_ins; + struct flb_output_instance *o_ins; + struct mk_list *tmp; + struct mk_list *head; + struct flb_input_chunk *ic; + struct flb_task *task; + struct flb_config *cfg; + struct cio_ctx *cio; + msgpack_sbuffer mp_sbuf; + char buf[262144]; + struct mk_event_loop *evl; + struct cio_options opts = {0}; + + flb_init_env(); + cfg = flb_config_init(); + evl = mk_event_loop_create(256); + + TEST_CHECK(evl != NULL); + cfg->evl = evl; + + flb_log_create(cfg, FLB_LOG_STDERR, FLB_LOG_DEBUG, NULL); + + i_ins = flb_input_new(cfg, "dummy", NULL, FLB_TRUE); + i_ins->storage_type = CIO_STORE_FS; + + cio_options_init(&opts); + + opts.root_path = "/tmp/input-chunk-fs_chunks-size_real"; + opts.log_cb = log_cb; + opts.log_level = CIO_LOG_DEBUG; + opts.flags = CIO_OPEN; + + cio = cio_create(&opts); + flb_storage_input_create(cio, i_ins); + flb_input_init_all(cfg); + + o_ins = flb_output_new(cfg, "http", NULL, FLB_TRUE); + // not the right way to do this + o_ins->id = 1; + TEST_CHECK_(o_ins != NULL, "unable to instance output"); + flb_output_set_property(o_ins, "match", "*"); + flb_output_set_property(o_ins, "storage.total_limit_size", "1M"); + + TEST_CHECK_((flb_router_io_set(cfg) != -1), "unable to router"); + + /* fill up the chunk ... */ + memset((void *)buf, 0x41, sizeof(buf)); + msgpack_sbuffer_init(&mp_sbuf); + gen_buf(&mp_sbuf, buf, sizeof(buf)); + + records = flb_mp_count(buf, sizeof(buf)); + flb_input_chunk_append_raw(i_ins, FLB_INPUT_LOGS, records, "dummy", 4, (void *)buf, sizeof(buf)); + msgpack_sbuffer_destroy(&mp_sbuf); + + /* Check each chunk's total records */ + mk_list_foreach_safe(head, tmp, &i_ins->chunks) { + ic = mk_list_entry(head, struct flb_input_chunk, _head); + TEST_CHECK_(ic->total_records > 0, "found input chunk with 0 total records"); + } + + /* FORCE clean up test tasks*/ + mk_list_foreach_safe(head, tmp, &i_ins->tasks) { + task = mk_list_entry(head, struct flb_task, _head); + flb_info("[task] cleanup test task"); + flb_task_destroy(task, FLB_TRUE); + } + + /* clean up test chunks */ + mk_list_foreach_safe(head, tmp, &i_ins->chunks) { + ic = mk_list_entry(head, struct flb_input_chunk, _head); + flb_input_chunk_destroy(ic, FLB_TRUE); + } + + cio_destroy(cio); + flb_router_exit(cfg); + flb_input_exit_all(cfg); + flb_output_exit(cfg); + flb_config_exit(cfg); +} + + /* Test list */ TEST_LIST = { {"input_chunk_exceed_limit", flb_test_input_chunk_exceed_limit}, {"input_chunk_buffer_valid", flb_test_input_chunk_buffer_valid}, {"input_chunk_dropping_chunks", flb_test_input_chunk_dropping_chunks}, {"input_chunk_fs_chunk_size_real", flb_test_input_chunk_fs_chunks_size_real}, + {"input_chunk_correct_total_records", flb_test_input_chunk_correct_total_records}, {NULL, NULL} };