Skip to content

Commit

Permalink
in_elasticsearch: add support for chunked transfer encoding
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <eduardo@calyptia.com>
  • Loading branch information
edsiper committed Oct 10, 2024
1 parent 241f1c9 commit 6a0ef2b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 38 deletions.
65 changes: 31 additions & 34 deletions plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ static int in_elasticsearch_bulk_conn_event(void *data)
ssize_t available;
ssize_t bytes;
char *tmp;
char *request_end;
size_t request_len;
struct flb_connection *connection;
struct in_elasticsearch_bulk_conn *conn;
Expand Down Expand Up @@ -98,47 +97,45 @@ static int in_elasticsearch_bulk_conn_event(void *data)
/* Do more logic parsing and checks for this request */
in_elasticsearch_bulk_prot_handle(ctx, conn, &conn->session, &conn->request);

/* Evict the processed request from the connection buffer and reinitialize
/*
* Evict the processed request from the connection buffer and reinitialize
* the HTTP parser.
*/

request_end = NULL;
/* Use the last parser position as the request length */
request_len = mk_http_parser_request_size(&conn->session.parser,
conn->buf_data,
conn->buf_len);

if (NULL != conn->request.data.data) {
request_end = &conn->request.data.data[conn->request.data.len];
if (request_len == -1 || (request_len > conn->buf_len)) {
/* Unexpected but let's make sure things are safe */
conn->buf_len = 0;
flb_plg_debug(ctx->ins, "request length exceeds buffer length, closing connection");
in_elasticsearch_bulk_conn_del(conn);
return -1;
}
else {
request_end = strstr(conn->buf_data, "\r\n\r\n");

if(NULL != request_end) {
request_end = &request_end[4];
}
}
/* If we have extra bytes in our bytes, adjust the extra bytes */
if (0 < (conn->buf_len - request_len)) {
memmove(conn->buf_data, &conn->buf_data[request_len],
conn->buf_len - request_len);

if (NULL != request_end) {
request_len = (size_t)(request_end - conn->buf_data);

if (0 < (conn->buf_len - request_len)) {
memmove(conn->buf_data, &conn->buf_data[request_len],
conn->buf_len - request_len);

conn->buf_data[conn->buf_len - request_len] = '\0';
conn->buf_len -= request_len;
}
else {
memset(conn->buf_data, 0, request_len);

conn->buf_len = 0;
}

/* Reinitialize the parser so the next request is properly
* handled, the additional memset intends to wipe any left over data
* from the headers parsed in the previous request.
*/
memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
mk_http_parser_init(&conn->session.parser);
in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request);
conn->buf_data[conn->buf_len - request_len] = '\0';
conn->buf_len -= request_len;
}
else {
memset(conn->buf_data, 0, request_len);
conn->buf_len = 0;
}

/*
* Reinitialize the parser so the next request is properly
* handled, the additional memset intends to wipe any left over data
* from the headers parsed in the previous request.
*/
memset(&conn->session.parser, 0, sizeof(struct mk_http_parser));
mk_http_parser_init(&conn->session.parser);
in_elasticsearch_bulk_conn_request_init(&conn->session, &conn->request);
}
else if (status == MK_HTTP_PARSER_ERROR) {
in_elasticsearch_bulk_prot_handle_error(ctx, conn, &conn->session, &conn->request);
Expand Down
43 changes: 39 additions & 4 deletions plugins/in_elasticsearch/in_elasticsearch_bulk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse
int gzip_compressed = FLB_FALSE;
void *gz_data = NULL;
size_t gz_size = -1;
char *out_chunked = NULL;
size_t out_chunked_size = 0;
char *payload_buf;
size_t payload_size;

header = &session->parser.headers[MK_HEADER_CONTENT_TYPE];
if (header->key.data == NULL) {
Expand All @@ -643,7 +647,7 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse
return -1;
}

if (request->data.len <= 0) {
if (request->data.len <= 0 && !mk_http_parser_is_content_chunked(&session->parser)) {
send_response(conn, 400, "error: no payload found\n");
return -1;
}
Expand All @@ -664,8 +668,32 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse
}

if (type == HTTP_CONTENT_NDJSON || type == HTTP_CONTENT_JSON) {
/* Check if the data is chunked */
payload_buf = NULL;
payload_size = 0;

if (mk_http_parser_is_content_chunked(&session->parser)) {
ret = mk_http_parser_chunked_decode(&session->parser,
conn->buf_data,
conn->buf_len,
&out_chunked,
&out_chunked_size);

if (ret == -1) {
send_response(conn, 400, "error: invalid chunked data\n");
return -1;
}

payload_buf = out_chunked;
payload_size = out_chunked_size;
}
else {
payload_buf = request->data.data;
payload_size = request->data.len;
}

if (gzip_compressed == FLB_TRUE) {
ret = flb_gzip_uncompress((void *) request->data.data, request->data.len,
ret = flb_gzip_uncompress((void *) payload_buf, payload_size,
&gz_data, &gz_size);
if (ret == -1) {
flb_error("[elasticsearch_bulk_prot] gzip uncompress is failed");
Expand All @@ -675,10 +703,15 @@ static int process_payload(struct flb_in_elasticsearch *ctx, struct in_elasticse
flb_free(gz_data);
}
else {
parse_payload_ndjson(ctx, tag, request->data.data, request->data.len, bulk_statuses);
parse_payload_ndjson(ctx, tag, payload_buf, payload_size, bulk_statuses);
}
}

/* release chunked data if has been set */
if (out_chunked) {
mk_mem_free(out_chunked);
}

return 0;
}

Expand Down Expand Up @@ -856,7 +889,8 @@ int in_elasticsearch_bulk_prot_handle(struct flb_in_elasticsearch *ctx,
mk_mem_free(uri);
return -1;
}
} else {
}
else {
flb_sds_destroy(tag);
mk_mem_free(uri);

Expand Down Expand Up @@ -1056,6 +1090,7 @@ static int process_payload_ng(struct flb_http_request *request,
return -1;
}

printf("Processing payload 2 : %s\n", request->body);
parse_payload_ndjson(context, tag, request->body, cfl_sds_len(request->body), bulk_statuses);

return 0;
Expand Down

0 comments on commit 6a0ef2b

Please sign in to comment.