Skip to content

Commit

Permalink
input: Add ingestion_paused metrics to confirm whether an input plugi…
Browse files Browse the repository at this point in the history
…n is paused or not (#8044)
  • Loading branch information
cosmo0920 authored Oct 17, 2023
1 parent 284a4eb commit 92b9053
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ struct flb_input_instance {
/* is the input instance overlimit ?: 1 or 0 */
struct cmt_gauge *cmt_storage_overlimit;

/* is the input instance paused or not ?: 1 or 0 */
struct cmt_gauge *cmt_ingestion_paused;

/* memory bytes used by chunks */
struct cmt_gauge *cmt_storage_memory_bytes;

Expand Down
31 changes: 31 additions & 0 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,15 @@ int flb_input_instance_init(struct flb_input_instance *ins,
1, (char *[]) {"name"});
cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name});

/* fluentbit_input_ingestion_paused */
ins->cmt_ingestion_paused = \
cmt_gauge_create(ins->cmt,
"fluentbit", "input",
"ingestion_paused",
"Is the input paused or not?",
1, (char *[]) {"name"});
cmt_gauge_set(ins->cmt_ingestion_paused, ts, 0, 1, (char *[]) {name});

/* Storage Metrics */
if (ctx->storage_metrics == FLB_TRUE) {
/* fluentbit_input_storage_overlimit */
Expand Down Expand Up @@ -1670,6 +1679,24 @@ int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_second
return 0;
}

static void flb_input_ingestion_paused(struct flb_input_instance *ins)
{
if (ins->cmt_ingestion_paused != NULL) {
/* cmetrics */
cmt_gauge_set(ins->cmt_ingestion_paused, cfl_time_now(), 1,
1, (char *[]) {flb_input_name(ins)});
}
}

static void flb_input_ingestion_resumed(struct flb_input_instance *ins)
{
if (ins->cmt_ingestion_paused != NULL) {
/* cmetrics */
cmt_gauge_set(ins->cmt_ingestion_paused, cfl_time_now(), 0,
1, (char *[]) {flb_input_name(ins)});
}
}

int flb_input_pause(struct flb_input_instance *ins)
{
/* if the instance is already paused, just return */
Expand All @@ -1689,6 +1716,8 @@ int flb_input_pause(struct flb_input_instance *ins)
}
}

flb_input_ingestion_paused(ins);

return 0;
}

Expand All @@ -1704,6 +1733,8 @@ int flb_input_resume(struct flb_input_instance *ins)
}
}

flb_input_ingestion_resumed(ins);

return 0;
}

Expand Down

0 comments on commit 92b9053

Please sign in to comment.