Skip to content

Commit

Permalink
cloudwatch_logs: remove sequence tokens from API calls
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <falamatt@amazon.com>
  • Loading branch information
matthewfala committed Jan 27, 2023
1 parent 760956f commit 3aa6cd4
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 140 deletions.
5 changes: 4 additions & 1 deletion include/fluent-bit/flb_aws_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,11 @@ flb_sds_t flb_aws_error(char *response, size_t response_len);
* Similar to 'flb_aws_error', except it prints the JSON error type and message
* to the user in a error log.
* 'api' is the name of the API that was called; this is used in the error log.
*
* Returns 0 if an error cannot be found in the response,
* and 1 if the error is found and printed.
*/
void flb_aws_print_error(char *response, size_t response_len,
int flb_aws_print_error(char *response, size_t response_len,
char *api, struct flb_output_instance *ins);

/* Similar to 'flb_aws_print_error', but for APIs that return XML */
Expand Down
104 changes: 11 additions & 93 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@
#include "cloudwatch_api.h"

#define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException"
#define ERR_CODE_INVALID_SEQUENCE_TOKEN "InvalidSequenceTokenException"
#define ERR_CODE_NOT_FOUND "ResourceNotFoundException"
#define ERR_CODE_DATA_ALREADY_ACCEPTED "DataAlreadyAcceptedException"

#define AMZN_REQUEST_ID_HEADER "x-amzn-RequestId"

Expand Down Expand Up @@ -228,23 +226,6 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
goto error;
}

if (stream->sequence_token) {
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"sequenceToken\":\"", 17)) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
stream->sequence_token, 0)) {
goto error;
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\",", 2)) {
goto error;
}
}

if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
"\"logEvents\":[", 13)) {
goto error;
Expand Down Expand Up @@ -492,9 +473,6 @@ void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
if (buf->current_stream != NULL) {
buf->data_size += strlen(buf->current_stream->name);
buf->data_size += strlen(buf->current_stream->group);
if (buf->current_stream->sequence_token) {
buf->data_size += strlen(buf->current_stream->sequence_token);
}
}
}

Expand Down Expand Up @@ -1151,7 +1129,7 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream
struct flb_aws_client *cw_client;
flb_sds_t body;
flb_sds_t tmp;
flb_sds_t error;
int is_error_parsed;

flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", stream->group, ctx->log_retention_days);

Expand Down Expand Up @@ -1194,14 +1172,10 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutRetentionPolicy", ctx->ins);
flb_sds_destroy(error);
}
else {
/* some other error occurred; notify user */
is_error_parsed = flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutRetentionPolicy", ctx->ins);
if (!is_error_parsed) {
/* error can not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
}
Expand Down Expand Up @@ -1415,8 +1389,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
}

/*
* Returns -1 on failure, 0 on success, and 1 for a sequence token error,
* which means the caller can retry.
* Returns -1 on failure, 0 on success
*/
int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct log_stream *stream, size_t payload_size)
Expand All @@ -1425,7 +1398,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
struct flb_http_client *c = NULL;
struct flb_aws_client *cw_client;
flb_sds_t tmp;
flb_sds_t error;
int is_error_parsed;
int num_headers = 1;
int retry = FLB_TRUE;

Expand Down Expand Up @@ -1458,8 +1431,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
if (c->resp.data == NULL || c->resp.data_len == 0 || strstr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) {
/* code was 200, but response is invalid, treat as failure */
if (c->resp.data != NULL) {
flb_plg_debug(ctx->ins, "Could not find sequence token in "
"response: response body is empty: full data: `%.*s`", c->resp.data_len, c->resp.data);
flb_plg_debug(ctx->ins, "Invalid response: full data: `%.*s`", c->resp.data_len, c->resp.data);
}
flb_http_client_destroy(c);

Expand All @@ -1472,70 +1444,16 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
AMZN_REQUEST_ID_HEADER);
return -1;
}


/* success */
if (c->resp.payload_size > 0) {
flb_plg_debug(ctx->ins, "Sent events to %s", stream->name);
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
"nextSequenceToken");
if (tmp) {
if (stream->sequence_token != NULL) {
flb_sds_destroy(stream->sequence_token);
}
stream->sequence_token = tmp;

flb_http_client_destroy(c);
return 0;
}
else {
flb_plg_error(ctx->ins, "Could not find sequence token in "
"response: %s", c->resp.payload);
}
}

flb_http_client_destroy(c);
return 0;
}

/* Check error */
if (c->resp.payload_size > 0) {
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
if (error != NULL) {
if (strcmp(error, ERR_CODE_INVALID_SEQUENCE_TOKEN) == 0) {
/*
* This case will happen when we do not know the correct
* sequence token; we can find it in the error response
* and retry.
*/
flb_plg_debug(ctx->ins, "Sequence token was invalid, "
"will retry");
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
"expectedSequenceToken");
if (tmp) {
if (stream->sequence_token != NULL) {
flb_sds_destroy(stream->sequence_token);
}
stream->sequence_token = tmp;
flb_sds_destroy(error);
flb_http_client_destroy(c);
/* tell the caller to retry */
return 1;
}
} else if (strcmp(error, ERR_CODE_DATA_ALREADY_ACCEPTED) == 0) {
/* not sure what causes this but it counts as success */
flb_plg_info(ctx->ins, "Got %s, a previous retry must have succeeded asychronously", ERR_CODE_DATA_ALREADY_ACCEPTED);
flb_sds_destroy(error);
flb_http_client_destroy(c);
/* success */
return 0;
}
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutLogEvents", ctx->ins);
flb_sds_destroy(error);
}
else {
is_error_parsed = flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"PutLogEvents", ctx->ins);
if (!is_error_parsed) {
/* error could not be parsed, print raw response to debug */
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
}
Expand Down
71 changes: 32 additions & 39 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
const char *tmp;
char *session_name = NULL;
struct flb_cloudwatch *ctx = NULL;
struct cw_flush *buf = NULL;
int ret;
(void) config;
(void) data;
Expand Down Expand Up @@ -348,50 +347,53 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
flb_output_upstream_set(upstream, ctx->ins);
ctx->cw_client->host = ctx->endpoint;

/* alloc the payload/processing buffer */
/* Export context */
flb_output_set_context(ins, ctx);

return 0;

error:
flb_free(session_name);
flb_plg_error(ctx->ins, "Initialization failed");
flb_cloudwatch_ctx_destroy(ctx);
return -1;
}

struct cw_flush *new_buffer()
{
struct cw_flush *buf;

buf = flb_calloc(1, sizeof(struct cw_flush));
if (!buf) {
flb_errno();
goto error;
return NULL;
}

buf->out_buf = flb_malloc(PUT_LOG_EVENTS_PAYLOAD_SIZE);
if (!buf->out_buf) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->out_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;

buf->tmp_buf = flb_malloc(sizeof(char) * PUT_LOG_EVENTS_PAYLOAD_SIZE);
if (!buf->tmp_buf) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->tmp_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;

buf->events = flb_malloc(sizeof(struct cw_event) * MAX_EVENTS_PER_PUT);
if (!buf->events) {
flb_errno();
cw_flush_destroy(buf);
goto error;
return NULL;
}
buf->events_capacity = MAX_EVENTS_PER_PUT;

ctx->buf = buf;


/* Export context */
flb_output_set_context(ins, ctx);

return 0;

error:
flb_free(session_name);
flb_plg_error(ctx->ins, "Initialization failed");
flb_cloudwatch_ctx_destroy(ctx);
return -1;
return buf;
}

static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
Expand All @@ -405,15 +407,21 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
(void) i_ins;
(void) config;

event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, event_chunk->tag,
event_chunk->data, event_chunk->size);
struct cw_flush *buf;

buf = new_buffer();
if (!buf) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}

event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size);
if (event_count < 0) {
flb_plg_error(ctx->ins, "Failed to send events");
cw_flush_destroy(buf);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

// TODO: this msg is innaccurate if events are skipped
flb_plg_debug(ctx->ins, "Sent %d events to CloudWatch", event_count);
cw_flush_destroy(buf);

FLB_OUTPUT_RETURN(FLB_OK);
}
Expand All @@ -429,10 +437,6 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
flb_aws_provider_destroy(ctx->base_aws_provider);
}

if (ctx->buf) {
cw_flush_destroy(ctx->buf);
}

if (ctx->aws_provider) {
flb_aws_provider_destroy(ctx->aws_provider);
}
Expand Down Expand Up @@ -477,9 +481,6 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
if (ctx->stream.name) {
flb_sds_destroy(ctx->stream.name);
}
if (ctx->stream.sequence_token) {
flb_sds_destroy(ctx->stream.sequence_token);
}
} else {
mk_list_foreach_safe(head, tmp, &ctx->streams) {
stream = mk_list_entry(head, struct log_stream, _head);
Expand All @@ -505,9 +506,6 @@ void log_stream_destroy(struct log_stream *stream)
if (stream->name) {
flb_sds_destroy(stream->name);
}
if (stream->sequence_token) {
flb_sds_destroy(stream->sequence_token);
}
if (stream->group) {
flb_sds_destroy(stream->group);
}
Expand Down Expand Up @@ -659,12 +657,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
.cb_init = cb_cloudwatch_init,
.cb_flush = cb_cloudwatch_flush,
.cb_exit = cb_cloudwatch_exit,

/*
* Allow cloudwatch to use async network stack synchronously by opting into
* FLB_OUTPUT_SYNCHRONOUS synchronous task scheduler
*/
.flags = FLB_OUTPUT_SYNCHRONOUS,
.flags = 0,
.workers = 1,

/* Configuration */
Expand Down
8 changes: 3 additions & 5 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct cw_event {
struct log_stream {
flb_sds_t name;
flb_sds_t group;
flb_sds_t sequence_token;

/*
* log streams in CloudWatch do not expire; but our internal representations
* of them are periodically cleaned up if they have been unused for too long
Expand All @@ -87,8 +87,6 @@ struct log_stream {
struct mk_list _head;
};

void log_stream_destroy(struct log_stream *stream);

struct flb_cloudwatch {
/*
* TLS instances can not be re-used. So we have one for:
Expand Down Expand Up @@ -140,8 +138,6 @@ struct flb_cloudwatch {
/* if the log stream is dynamic, we'll use this */
struct mk_list streams;

/* buffers for data processing and request payload */
struct cw_flush *buf;
/* The namespace to use for the metric */
flb_sds_t metric_namespace;

Expand All @@ -157,4 +153,6 @@ struct flb_cloudwatch {

void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);

void log_stream_destroy(struct log_stream *stream);

#endif
5 changes: 3 additions & 2 deletions src/aws/flb_aws_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -568,15 +568,15 @@ flb_sds_t flb_xml_get_val(char *response, size_t response_len, char *tag)
return val;
}

void flb_aws_print_error(char *response, size_t response_len,
int flb_aws_print_error(char *response, size_t response_len,
char *api, struct flb_output_instance *ins)
{
flb_sds_t error;
flb_sds_t message;

error = flb_json_get_val(response, response_len, "__type");
if (!error) {
return;
return 0;
}

message = flb_json_get_val(response, response_len, "message");
Expand All @@ -591,6 +591,7 @@ void flb_aws_print_error(char *response, size_t response_len,
}

flb_sds_destroy(error);
return 1;
}

/* parses AWS JSON API error responses and returns the value of the __type field */
Expand Down

0 comments on commit 3aa6cd4

Please sign in to comment.