From 14b804c457f9bdc82412899462be2c052fb3005e Mon Sep 17 00:00:00 2001 From: ryanohnemus Date: Fri, 22 Dec 2023 08:34:21 -0600 Subject: [PATCH] in_kubernetes_events: consolidate record timestamp logic Signed-off-by: ryanohnemus --- .../in_kubernetes_events/kubernetes_events.c | 36 +++++++++---------- src/flb_http_client.c | 6 ---- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c index 34b8f37d5bc..eded3e3a440 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.c +++ b/plugins/in_kubernetes_events/kubernetes_events.c @@ -210,7 +210,7 @@ static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_ return 0; } -static int record_get_field_time(msgpack_object *obj, const char *fieldname, time_t *val) +static int record_get_field_time(msgpack_object *obj, const char *fieldname, struct flb_time *val) { msgpack_object *v; struct flb_tm tm = { 0 }; @@ -227,7 +227,9 @@ static int record_get_field_time(msgpack_object *obj, const char *fieldname, tim return -2; } - *val = mktime(&tm.tm); + val->tm.tv_sec = flb_parser_tm2time(&tm); + val->tm.tv_nsec = 0; + return 0; } @@ -260,7 +262,7 @@ static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, u return -1; } -static int item_get_timestamp(msgpack_object *obj, time_t *event_time) +static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time) { int ret; msgpack_object *metadata; @@ -291,18 +293,18 @@ static int item_get_timestamp(msgpack_object *obj, time_t *event_time) } static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, - time_t* event_time) + struct flb_time* event_time) { int ret; - time_t now; + uint64_t outdated; msgpack_object *metadata; flb_sds_t uid; uint64_t resource_version; - now = (time_t)(cfl_time_now() / 1000000000); - if (*event_time < (now - ctx->retention_time)) { + outdated = cfl_time_now() - (ctx->retention_time * 1000000000); + if (flb_time_to_nanosec(event_time) < outdated) { flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld", - *event_time, (now - ctx->retention_time)); + flb_time_to_nanosec(event_time), outdated); return FLB_TRUE; } @@ -407,7 +409,7 @@ static int process_event_object(struct k8s_events* ctx, flb_sds_t action, return -1; } - if (check_event_is_filtered(ctx, item, (time_t)&ts) == FLB_TRUE) { + if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) { return 0; } @@ -489,8 +491,6 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s char *buf_data; size_t buf_size; size_t off = 0; - struct flb_time ts; - uint64_t resource_version; msgpack_unpacked result; msgpack_object root; msgpack_object k; @@ -671,7 +671,7 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i { int ret; uint64_t resource_version; - time_t last; + struct flb_time last; msgpack_object *meta; flb_sds_t uid; @@ -709,7 +709,7 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i /* Bind parameters */ sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0); sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version); - sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, (int64_t)last); + sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, flb_time_to_nanosec(&last)); /* Run the insert */ ret = sqlite3_step(ctx->stmt_insert_kubernetes_event); @@ -723,8 +723,8 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i } flb_plg_debug(ctx->ins, - "inserted k8s event: uid=%s, resource_version=%llu, last=%ld", - uid, resource_version, last); + "inserted k8s event: uid=%s, resource_version=%llu, last=%llu", + uid, resource_version, flb_time_to_nanosec(&last)); sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event); sqlite3_reset(ctx->stmt_insert_kubernetes_event); @@ -847,7 +847,7 @@ static int k8s_events_collect(struct flb_input_instance *ins, } while(continue_token != NULL); if (max_resource_version > ctx->last_resource_version) { - flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version); + flb_plg_debug(ctx->ins, "set last resourceVersion=%llu", max_resource_version); ctx->last_resource_version = max_resource_version; } @@ -861,7 +861,7 @@ static int k8s_events_collect(struct flb_input_instance *ins, initialize_http_client(c, ctx); // Watch will stream chunked json data, so we only send - // the http request, then use flb_http_get_available_chunks + // the http request, then use flb_http_get_response_data // to attempt processing on available streamed data b_sent = 0; ret = flb_http_do_request(c, &b_sent); @@ -874,7 +874,7 @@ static int k8s_events_collect(struct flb_input_instance *ins, bytes_consumed = 0; chunk_proc_ret = 0; while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) { - ret = flb_http_get_available_chunks(c, bytes_consumed); + ret = flb_http_get_response_data(c, bytes_consumed); bytes_consumed = 0; if( c->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) { chunk_proc_ret = process_http_chunk(ctx, c, &bytes_consumed); diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 27045181d8e..11d8edd7f9e 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -1179,12 +1179,6 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token) return result; } -/* flb_http_do_request only sends the http request the data. -* This is useful for processing the chunked responses on your own. -* If you do not want to process the response on your own or expect -* all response data before you process data, use flb_http_do instead. -*/ -int flb_http_do_request(struct flb_http_client *c, size_t *bytes) /* flb_http_do_request only sends the http request the data. * This is useful for processing the chunked responses on your own. * If you do not want to process the response on your own or expect