Skip to content

Commit

Permalink
out_loki: add support for structured metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Tackaberry <tack@urandom.ca>
  • Loading branch information
jtackaberry committed Jul 4, 2024
1 parent 3f0b7c1 commit ab94f02
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 76 deletions.
214 changes: 143 additions & 71 deletions plugins/out_loki/loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void flb_loki_kv_destroy(struct flb_loki_kv *kv)
flb_free(kv);
}

int flb_loki_kv_append(struct flb_loki *ctx, char *key, char *val)
int flb_loki_kv_append(struct flb_loki *ctx, struct mk_list *list, char *key, char *val)
{
int ra_count = 0;
int k_len;
Expand Down Expand Up @@ -276,7 +276,7 @@ int flb_loki_kv_append(struct flb_loki *ctx, char *key, char *val)
return -1;
}
}
mk_list_add(&kv->_head, &ctx->labels_list);
mk_list_add(&kv->_head, list);

/* return the number of record accessor values */
return ra_count;
Expand All @@ -291,6 +291,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
mk_list_foreach_safe(head, tmp, &ctx->labels_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
}
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
Expand Down Expand Up @@ -337,25 +344,17 @@ static int pack_label_key(msgpack_packer *mp_pck, char *key, int key_len)
return 0;
}

static flb_sds_t pack_labels(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map)
static void pack_kv(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map,
struct flb_mp_map_header *mh,
struct mk_list *list)
{
int i;
flb_sds_t ra_val;
struct mk_list *head;
struct flb_ra_value *rval = NULL;
struct flb_loki_kv *kv;
msgpack_object k;
msgpack_object v;
struct flb_mp_map_header mh;


/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);

mk_list_foreach(head, &ctx->labels_list) {
flb_sds_t ra_val;
mk_list_foreach(head, list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* record accessor key/value pair */
Expand All @@ -369,7 +368,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
}
else {
/* Pack the key and value */
flb_mp_map_header_append(&mh);
flb_mp_map_header_append(mh);

/* We skip the first '$' character since it won't be valid in Loki */
pack_label_key(mp_pck, kv->key_normalized,
Expand All @@ -390,7 +389,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
* invalid or empty value, on that case the k/v is skipped.
*/
if (kv->val_type == FLB_LOKI_KV_STR) {
flb_mp_map_header_append(&mh);
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(kv->key));
msgpack_pack_str_body(mp_pck, kv->key, flb_sds_len(kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(kv->str_val));
Expand All @@ -403,7 +402,7 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
flb_plg_debug(ctx->ins, "could not translate record accessor");
}
else {
flb_mp_map_header_append(&mh);
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(kv->key));
msgpack_pack_str_body(mp_pck, kv->key, flb_sds_len(kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(ra_val));
Expand All @@ -415,6 +414,35 @@ static flb_sds_t pack_labels(struct flb_loki *ctx,
}
}
}
}

static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map)
{
struct flb_mp_map_header mh;
/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
flb_mp_map_header_end(&mh);
return 0;
}

static flb_sds_t pack_labels(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map)
{
int i;
struct flb_ra_value *rval = NULL;
msgpack_object k;
msgpack_object v;
struct flb_mp_map_header mh;

/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->labels_list);

if (ctx->auto_kubernetes_labels == FLB_TRUE) {
rval = flb_ra_get_value_object(ctx->ra_k8s, *map);
Expand Down Expand Up @@ -490,7 +518,7 @@ static int create_label_map_entry(struct flb_loki *ctx,
printf("label_key=%s val_str=%s\n", label_key, val_str);
*/

ret = flb_loki_kv_append(ctx, label_key, val_str);
ret = flb_loki_kv_append(ctx, &ctx->labels_list, label_key, val_str);
flb_sds_destroy(label_key);
flb_sds_destroy(val_str);
if (ret == -1) {
Expand Down Expand Up @@ -686,68 +714,92 @@ static int load_label_map_path(struct flb_loki *ctx, flb_sds_t path, int *ra_use
return 0;
}

static int parse_labels(struct flb_loki *ctx)
static int parse_kv(struct flb_loki *ctx, struct mk_list *kv, struct mk_list *list, int *ra_used)
{
int ret;
int ra_used = 0;
char *p;
flb_sds_t key;
flb_sds_t val;
struct mk_list *head;
struct flb_slist_entry *entry;

flb_loki_kv_init(&ctx->labels_list);

if (ctx->labels) {
mk_list_foreach(head, ctx->labels) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);

/* record accessor label key ? */
if (entry->str[0] == '$') {
ret = flb_loki_kv_append(ctx, entry->str, NULL);
if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
continue;
}
if (ctx == NULL || list == NULL || ra_used == NULL) {
return -1;
}

p = strchr(entry->str, '=');
if (!p) {
flb_plg_error(ctx->ins, "invalid key value pair on '%s'",
entry->str);
return -1;
}
mk_list_foreach(head, kv) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);

key = flb_sds_create_size((p - entry->str) + 1);
flb_sds_cat(key, entry->str, p - entry->str);
val = flb_sds_create(p + 1);
if (!key) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
/* record accessor label key ? */
if (entry->str[0] == '$') {
ret = flb_loki_kv_append(ctx, list, entry->str, NULL);
if (ret == -1) {
return -1;
}
if (!val || flb_sds_len(val) == 0) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
flb_sds_destroy(key);
return -1;
else if (ret > 0) {
(*ra_used)++;
}
continue;
}

p = strchr(entry->str, '=');
if (!p) {
flb_plg_error(ctx->ins, "invalid key value pair on '%s'",
entry->str);
return -1;
}

ret = flb_loki_kv_append(ctx, key, val);
key = flb_sds_create_size((p - entry->str) + 1);
flb_sds_cat(key, entry->str, p - entry->str);
val = flb_sds_create(p + 1);
if (!key) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
return -1;
}
if (!val || flb_sds_len(val) == 0) {
flb_plg_error(ctx->ins,
"invalid key value pair on '%s'",
entry->str);
flb_sds_destroy(key);
flb_sds_destroy(val);
return -1;
}
ret = flb_loki_kv_append(ctx, list, key, val);
flb_sds_destroy(key);
flb_sds_destroy(val);

if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
if (ret == -1) {
return -1;
}
else if (ret > 0) {
(*ra_used)++;
}
}
return 0;
}

static int parse_labels(struct flb_loki *ctx)
{
int ret;
int ra_used = 0;
struct mk_list *head;
struct flb_slist_entry *entry;

flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);

if (ctx->structured_metadata) {
ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used);
if (ret == -1) {
return -1;
}
}

if (ctx->labels) {
ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used);
if (ret == -1) {
return -1;
}
}

Expand All @@ -761,7 +813,7 @@ static int parse_labels(struct flb_loki *ctx)
return -1;
}

ret = flb_loki_kv_append(ctx, entry->str, NULL);
ret = flb_loki_kv_append(ctx, &ctx->labels_list, entry->str, NULL);
if (ret == -1) {
return -1;
}
Expand Down Expand Up @@ -918,6 +970,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
}
ctx->ins = ins;
flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);
Expand Down Expand Up @@ -1431,6 +1484,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
* }
* ]
* }
*
* As of Loki 3.0, log entries may optionally contain a third element which is a JSON
* object indicating structured metadata:
*
* "values": [
* [ "<unix epoch in nanoseconds>", "<log line>", {"trace_id": "0242ac120002"}]
* ]
*/

ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
Expand Down Expand Up @@ -1479,11 +1539,14 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_pack_array(&mp_pck, 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
}
}
}
else {
Expand Down Expand Up @@ -1512,11 +1575,14 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
msgpack_pack_str_body(&mp_pck, "values", 6);
msgpack_pack_array(&mp_pck, 1);

msgpack_pack_array(&mp_pck, 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
}
}
}

Expand Down Expand Up @@ -1808,6 +1874,12 @@ static struct flb_config_map config_map[] = {
"labels for API requests. If no value is set, the default label is 'job=fluent-bit'"
},

{
FLB_CONFIG_MAP_CLIST, "structured_metadata", NULL,
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata),
"optional structured metadata fields for API requests."
},

{
FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false",
0, FLB_TRUE, offsetof(struct flb_loki, auto_kubernetes_labels),
Expand Down
12 changes: 7 additions & 5 deletions plugins/out_loki/loki.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct flb_loki {
/* Labels */
struct mk_list *labels;
struct mk_list *label_keys;
struct mk_list *structured_metadata;
struct mk_list *remove_keys;

flb_sds_t label_map_path;
Expand All @@ -86,11 +87,12 @@ struct flb_loki {
char *tcp_host;
int out_line_format;
int out_drop_single_key;
int ra_used; /* number of record accessor label keys */
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
int ra_used; /* number of record accessor label keys */
struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */
struct mk_list labels_list; /* list of flb_loki_kv nodes */
struct mk_list structured_metadata_list; /* list of flb_loki_kv nodes */
struct mk_list remove_keys_derived; /* remove_keys with label RAs */
struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */
struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */

struct cfl_list dynamic_tenant_list;
Expand Down

0 comments on commit ab94f02

Please sign in to comment.