From c0aa1899f4139a91f7374c395e3cdcf3be9f7aaf Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 18 Oct 2023 01:18:32 +0900 Subject: [PATCH] input: Add ingestion_paused metrics to confirm whether an input plugin is paused or not (#8044) --- include/fluent-bit/flb_input.h | 3 +++ src/flb_input.c | 31 +++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 1b4dff82254..62d5851e8ab 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -326,6 +326,9 @@ struct flb_input_instance { /* is the input instance overlimit ?: 1 or 0 */ struct cmt_gauge *cmt_storage_overlimit; + /* is the input instance paused or not ?: 1 or 0 */ + struct cmt_gauge *cmt_ingestion_paused; + /* memory bytes used by chunks */ struct cmt_gauge *cmt_storage_memory_bytes; diff --git a/src/flb_input.c b/src/flb_input.c index 1c4faad4d3f..a3ae1d75bbd 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -970,6 +970,15 @@ int flb_input_instance_init(struct flb_input_instance *ins, 1, (char *[]) {"name"}); cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name}); + /* fluentbit_input_ingestion_paused */ + ins->cmt_ingestion_paused = \ + cmt_gauge_create(ins->cmt, + "fluentbit", "input", + "ingestion_paused", + "Is the input paused or not?", + 1, (char *[]) {"name"}); + cmt_gauge_set(ins->cmt_ingestion_paused, ts, 0, 1, (char *[]) {name}); + /* Storage Metrics */ if (ctx->storage_metrics == FLB_TRUE) { /* fluentbit_input_storage_overlimit */ @@ -1670,6 +1679,24 @@ int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_second return 0; } +static void flb_input_ingestion_paused(struct flb_input_instance *ins) +{ + if (ins->cmt_ingestion_paused != NULL) { + /* cmetrics */ + cmt_gauge_set(ins->cmt_ingestion_paused, cfl_time_now(), 1, + 1, (char *[]) {flb_input_name(ins)}); + } +} + +static void flb_input_ingestion_resumed(struct flb_input_instance *ins) +{ + if (ins->cmt_ingestion_paused != NULL) { + /* cmetrics */ + cmt_gauge_set(ins->cmt_ingestion_paused, cfl_time_now(), 0, + 1, (char *[]) {flb_input_name(ins)}); + } +} + int flb_input_pause(struct flb_input_instance *ins) { /* if the instance is already paused, just return */ @@ -1689,6 +1716,8 @@ int flb_input_pause(struct flb_input_instance *ins) } } + flb_input_ingestion_paused(ins); + return 0; } @@ -1704,6 +1733,8 @@ int flb_input_resume(struct flb_input_instance *ins) } } + flb_input_ingestion_resumed(ins); + return 0; }