Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_splunk: Add switch for storing in metadata or records and handle multiple tokens on in splunk #8900

12 changes: 12 additions & 0 deletions plugins/in_splunk/splunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,18 @@ static struct flb_config_map config_map[] = {
"Set valid Splunk HEC tokens for the requests"
},

{
FLB_CONFIG_MAP_BOOL, "store_token_to_metadata", "true",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest a little change to `store_token_in_metadata"

0, FLB_TRUE, offsetof(struct flb_splunk, store_token_to_metadata),
"Store Splunk HEC tokens to matadata. If set as false, they will be stored into records."
},

{
FLB_CONFIG_MAP_STR, "splunk_token_key", "@splunk_token",
0, FLB_TRUE, offsetof(struct flb_splunk, store_token_key),
"Set a record key for storing Splunk HEC token for the request"
},

{
FLB_CONFIG_MAP_STR, "tag_key", NULL,
0, FLB_TRUE, offsetof(struct flb_splunk, tag_key),
Expand Down
9 changes: 8 additions & 1 deletion plugins/in_splunk/splunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
#define HTTP_BUFFER_MAX_SIZE "4M"
#define HTTP_BUFFER_CHUNK_SIZE "512K"

struct flb_splunk_tokens {
flb_sds_t header;
struct mk_list _head;
};

struct flb_splunk {
flb_sds_t listen;
flb_sds_t tcp_port;
Expand All @@ -41,8 +46,10 @@ struct flb_splunk {
struct mk_list *success_headers;

/* Token Auth */
flb_sds_t auth_header;
struct mk_list auth_tokens;
flb_sds_t ingested_auth_header;
int store_token_to_metadata;
flb_sds_t store_token_key;

struct flb_log_event_encoder log_encoder;

Expand Down
87 changes: 67 additions & 20 deletions plugins/in_splunk/splunk_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,66 @@
#include "splunk_conn.h"
#include "splunk_config.h"

static void delete_hec_tokens(struct flb_splunk *ctx)
{
struct mk_list *tmp;
struct mk_list *head;
struct flb_splunk_tokens *splunk_token;

mk_list_foreach_safe(head, tmp, &ctx->auth_tokens) {
splunk_token = mk_list_entry(head, struct flb_splunk_tokens, _head);
flb_sds_destroy(splunk_token->header);
mk_list_del(&splunk_token->_head);
flb_free(&splunk_token);
}
}

static int setup_hec_tokens(struct flb_splunk *ctx)
{
int ret;
const char *tmp;
char *tmp_tokens;
char *token;
flb_sds_t auth_header;
struct flb_splunk_tokens *splunk_token;

tmp = flb_input_get_property("splunk_token", ctx->ins);
if (tmp) {
tmp_tokens = flb_strdup(tmp);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate the return value


token = strtok(tmp_tokens, ",");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid strtok(). you can use flb_utls_split instead

while (token) {
auth_header = flb_sds_create("Splunk ");
if (auth_header == NULL) {
flb_plg_error(ctx->ins, "error on prefix of auth_header generation");
return -1;
}

ret = flb_sds_cat_safe(&auth_header, tmp, strlen(tmp));
if (ret < 0) {
flb_plg_error(ctx->ins, "error on token generation");
return -1;
}

/* Create a new token */
splunk_token = flb_malloc(sizeof(struct flb_splunk_tokens));
if (!splunk_token) {
flb_errno();
return -1;
}

splunk_token->header = auth_header;

/* Link to parent list */
mk_list_add(&splunk_token->_head, &ctx->auth_tokens);

token = strtok(NULL, ",");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we avoid using strtok(2) function since it modifies the original buffer.

}
}

return 0;
}

struct flb_splunk *splunk_config_create(struct flb_input_instance *ins)
{
struct mk_list *header_iterator;
Expand All @@ -33,7 +93,6 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins)
char port[8];
int ret;
struct flb_splunk *ctx;
const char *tmp;

ctx = flb_calloc(1, sizeof(struct flb_splunk));
if (!ctx) {
Expand All @@ -42,6 +101,7 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins)
}
ctx->ins = ins;
mk_list_init(&ctx->connections);
mk_list_init(&ctx->auth_tokens);

/* Load the config map */
ret = flb_input_config_map_set(ins, (void *) ctx);
Expand All @@ -50,22 +110,12 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins)
return NULL;
}

ctx->auth_header = NULL;
ctx->ingested_auth_header = NULL;
tmp = flb_input_get_property("splunk_token", ins);
if (tmp) {
ctx->auth_header = flb_sds_create("Splunk ");
if (ctx->auth_header == NULL) {
flb_plg_error(ctx->ins, "error on prefix of auth_header generation");
splunk_config_destroy(ctx);
return NULL;
}
ret = flb_sds_cat_safe(&ctx->auth_header, tmp, strlen(tmp));
if (ret < 0) {
flb_plg_error(ctx->ins, "error on token generation");
splunk_config_destroy(ctx);
return NULL;
}

ret = setup_hec_tokens(ctx);
if (ret != 0) {
splunk_config_destroy(ctx);
return NULL;
}

/* Listen interface (if not set, defaults to 0.0.0.0:8088) */
Expand Down Expand Up @@ -161,10 +211,6 @@ int splunk_config_destroy(struct flb_splunk *ctx)
ctx->collector_id = -1;
}

if (ctx->auth_header != NULL) {
flb_sds_destroy(ctx->auth_header);
}

if (ctx->downstream != NULL) {
flb_downstream_destroy(ctx->downstream);
}
Expand All @@ -181,6 +227,7 @@ int splunk_config_destroy(struct flb_splunk *ctx)
flb_sds_destroy(ctx->success_headers_str);
}

delete_hec_tokens(ctx);

flb_free(ctx->listen);
flb_free(ctx->tcp_port);
Expand Down
133 changes: 95 additions & 38 deletions plugins/in_splunk/splunk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,19 +226,36 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
ret = flb_log_event_encoder_set_current_timestamp(&ctx->log_encoder);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("log"),
FLB_LOG_EVENT_STRING_VALUE(buf, size));
if (ctx->store_token_to_metadata == FLB_TRUE) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("log"),
FLB_LOG_EVENT_STRING_VALUE(buf, size));
}
}

if (ctx->ingested_auth_header != NULL) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_metadata_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("hec_token"),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->ingested_auth_header));
if (ctx->store_token_to_metadata == FLB_TRUE) {
if (ctx->ingested_auth_header != NULL) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_metadata_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("hec_token"),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->ingested_auth_header));
}
}
}
else {
if (ctx->ingested_auth_header != NULL) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE(ctx->store_token_key),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->ingested_auth_header),
FLB_LOG_EVENT_CSTRING_VALUE("log"),
FLB_LOG_EVENT_STRING_VALUE(buf, size));

}
}
}

Expand Down Expand Up @@ -275,6 +292,8 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor
flb_sds_t tag, flb_sds_t tag_from_record,
struct flb_time tm) {
int ret;
int i;
msgpack_object_kv *kv;

ret = flb_log_event_encoder_begin_record(&ctx->log_encoder);

Expand All @@ -284,18 +303,46 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor
&tm);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
&ctx->log_encoder,
record);
if (ctx->store_token_to_metadata == FLB_TRUE) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
&ctx->log_encoder,
record);
}

if (ctx->ingested_auth_header != NULL) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_metadata_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("hec_token"),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->ingested_auth_header));
}
}
}
else {
if (ctx->ingested_auth_header != NULL) {
/* iterate through the old record map to create the appendable new buffer */
kv = record->via.map.ptr;
for(i = 0; i < record->via.map.size && ret == FLB_EVENT_ENCODER_SUCCESS; i++) {
ret = flb_log_event_encoder_append_body_values(
&ctx->log_encoder,
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].key),
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&kv[i].val));
}

if (ctx->ingested_auth_header != NULL) {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_metadata_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE("hec_token"),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->ingested_auth_header));
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_values(
&ctx->log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE(ctx->store_token_key),
FLB_LOG_EVENT_CSTRING_VALUE(ctx->ingested_auth_header));
}
}
else {
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_set_body_from_msgpack_object(
&ctx->log_encoder,
record);
}
}
}

Expand Down Expand Up @@ -434,9 +481,12 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,

static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *request)
{
struct mk_list *tmp;
struct mk_list *head;
struct mk_http_header *auth_header = NULL;
struct flb_splunk_tokens *splunk_token;

if (ctx->auth_header == NULL) {
if (mk_list_size(&ctx->auth_tokens) == 0) {
return SPLUNK_AUTH_UNAUTH;
}

Expand All @@ -447,14 +497,16 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *
}

if (auth_header != NULL && auth_header->val.len > 0) {
if (strncmp(ctx->auth_header,
auth_header->val.data,
strlen(ctx->auth_header)) == 0) {
return SPLUNK_AUTH_SUCCESS;
}
else {
return SPLUNK_AUTH_UNAUTHORIZED;
mk_list_foreach_safe(head, tmp, &ctx->auth_tokens) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the list entry won't be deleted, you can use mk_list_foreach() , the _safe_ version aims to provide an extra pointer to deal with cases where entries/nodes are being removed.

splunk_token = mk_list_entry(head, struct flb_splunk_tokens, _head);
if (strncmp(splunk_token->header,
auth_header->val.data,
strlen(splunk_token->header)) == 0) {
return SPLUNK_AUTH_SUCCESS;
}
}

return SPLUNK_AUTH_UNAUTHORIZED;
}
else {
return SPLUNK_AUTH_MISSING_CRED;
Expand Down Expand Up @@ -888,27 +940,32 @@ static int send_json_message_response_ng(struct flb_http_response *response,

static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_request *request)
{
struct mk_list *tmp;
struct mk_list *head;
char *auth_header;
struct flb_splunk_tokens *splunk_token;

if (ctx->auth_header == NULL) {
if (mk_list_size(&ctx->auth_tokens) == 0) {
return SPLUNK_AUTH_UNAUTH;
}

auth_header = flb_http_request_get_header(request, "authorization");

if (auth_header == NULL) {
return SPLUNK_AUTH_MISSING_CRED;
}

if (auth_header != NULL && strlen(auth_header) > 0) {
if (strncmp(ctx->auth_header,
auth_header,
strlen(ctx->auth_header)) == 0) {
return SPLUNK_AUTH_SUCCESS;
}
else {
return SPLUNK_AUTH_UNAUTHORIZED;
mk_list_foreach_safe(head, tmp, &ctx->auth_tokens) {
splunk_token = mk_list_entry(head, struct flb_splunk_tokens, _head);
if (strncmp(splunk_token->header,
auth_header,
strlen(splunk_token->header)) == 0) {
return SPLUNK_AUTH_SUCCESS;
}
}

return SPLUNK_AUTH_UNAUTHORIZED;
}
else {
return SPLUNK_AUTH_MISSING_CRED;
Expand Down
Loading
Loading