Skip to content

Commit

Permalink
http_client: Add Ability to Process Chunked Stream
Browse files Browse the repository at this point in the history
Current flb_http_do processes chunked streams, but
requires that all chunks are received before allowing
interaction with the response payload.

This change allows a user to only initiate the http
request with flb_http_do_request and then process
the live stream of data by fetching available chunks
with flb_http_get_available_chunks.

Signed-off-by: ryanohnemus <ryanohnemus@gmail.com>
  • Loading branch information
ryanohnemus committed Dec 20, 2023
1 parent bcd3281 commit d1fb005
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 31 deletions.
5 changes: 3 additions & 2 deletions include/fluent-bit/flb_http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#define FLB_HTTP_MORE 0
#define FLB_HTTP_OK 1
#define FLB_HTTP_NOT_FOUND 2 /* header not found */
#define FLB_HTTP_CHUNK_AVAILABLE 3 /* means chunk is available, but there is more data. end of all chunks returns FLB_HTTP_OK */

/* Useful headers */
#define FLB_HTTP_HEADER_AUTH "Authorization"
Expand All @@ -66,8 +67,6 @@ struct flb_http_response {
int content_length; /* Content length set by headers */
int chunked_encoding; /* Chunked transfer encoding ? */
int connection_close; /* connection: close ? */
long chunked_cur_size;
long chunked_exp_size; /* expected chunked size */
char *chunk_processed_end; /* Position to mark last chunk */
char *headers_end; /* Headers end (\r\n\r\n) */

Expand Down Expand Up @@ -162,6 +161,8 @@ int flb_http_set_content_encoding_gzip(struct flb_http_client *c);
int flb_http_set_callback_context(struct flb_http_client *c,
struct flb_callback *cb_ctx);

int flb_http_get_available_chunks(struct flb_http_client *c, size_t bytes_consumed);
int flb_http_do_request(struct flb_http_client *c, size_t *bytes);
int flb_http_do(struct flb_http_client *c, size_t *bytes);
int flb_http_client_proxy_connect(struct flb_connection *u_conn);
void flb_http_client_destroy(struct flb_http_client *c);
Expand Down
116 changes: 89 additions & 27 deletions src/flb_http_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ static int process_chunked_data(struct flb_http_client *c)
long val;
char *p;
char tmp[32];
struct flb_http_response *r = &c->resp;
struct flb_http_response *r = &c->resp;
int found_full_chunk = FLB_FALSE;

chunk_start:
p = strstr(r->chunk_processed_end, "\r\n");
Expand Down Expand Up @@ -327,6 +328,7 @@ static int process_chunked_data(struct flb_http_client *c)
* 3. remove chunk ending \r\n
*/

found_full_chunk = FLB_TRUE;
/* 1. Validate ending chunk */
if (val - 2 == 0) {
/*
Expand Down Expand Up @@ -365,10 +367,11 @@ static int process_chunked_data(struct flb_http_client *c)
/* Always append a NULL byte */
r->data[r->data_len] = '\0';

/* Always update payload size after full chunk */
r->payload_size = r->data_len - (r->headers_end - r->data);

/* Is this the last chunk ? */
if ((val - 2 == 0)) {
/* Update payload size */
r->payload_size = r->data_len - (r->headers_end - r->data);
return FLB_HTTP_OK;
}

Expand All @@ -378,7 +381,10 @@ static int process_chunked_data(struct flb_http_client *c)
goto chunk_start;
}

return FLB_HTTP_MORE;
if (found_full_chunk == FLB_TRUE) {
return FLB_HTTP_CHUNK_AVAILABLE;
}
return FLB_HTTP_MORE;
}

static int process_data(struct flb_http_client *c)
Expand Down Expand Up @@ -460,8 +466,8 @@ static int process_data(struct flb_http_client *c)
if (ret == FLB_HTTP_ERROR) {
return FLB_HTTP_ERROR;
}
else if (ret == FLB_HTTP_OK) {
return FLB_HTTP_OK;
else if (ret == FLB_HTTP_OK || ret == FLB_HTTP_CHUNK_AVAILABLE) {
return ret;
}
}
else {
Expand Down Expand Up @@ -1173,15 +1179,16 @@ int flb_http_bearer_auth(struct flb_http_client *c, const char *token)
return result;
}


int flb_http_do(struct flb_http_client *c, size_t *bytes)
/* flb_http_do_request only sends the http request the data.
* This is useful for processing the chunked responses on your own.
* If you do not want to process the response on your own or expect
* all response data before you process data, use flb_http_do instead.
*/
int flb_http_do_request(struct flb_http_client *c, size_t *bytes)
{
int ret;
int r_bytes;
int crlf = 2;
int new_size;
ssize_t available;
size_t out_size;
size_t bytes_header = 0;
size_t bytes_body = 0;
char *tmp;
Expand All @@ -1192,7 +1199,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
/* Append pending headers */
ret = http_headers_compose(c);
if (ret == -1) {
return -1;
return FLB_HTTP_ERROR;
}

/* check enough space for the ending CRLF */
Expand All @@ -1201,7 +1208,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
tmp = flb_realloc(c->header_buf, new_size);
if (!tmp) {
flb_errno();
return -1;
return FLB_HTTP_ERROR;
}
c->header_buf = tmp;
c->header_size = new_size;
Expand Down Expand Up @@ -1230,7 +1237,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
if (errno != 0) {
flb_errno();
}
return -1;
return FLB_HTTP_ERROR;
}

if (c->body_len > 0) {
Expand All @@ -1239,16 +1246,55 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
&bytes_body);
if (ret == -1) {
flb_errno();
return -1;
return FLB_HTTP_ERROR;
}
}

/* number of sent bytes */
*bytes = (bytes_header + bytes_body);

/* Read the server response, we need at least 19 bytes */
/* prep c->resp for incoming data */
c->resp.data_len = 0;
while (1) {

/* at this point we've sent our request so we expect more data in response*/
return FLB_HTTP_MORE;
}

int flb_http_get_available_chunks(struct flb_http_client *c, size_t bytes_consumed)
{
/* returns
* FLB_HTTP_MORE - if no chunks are ready and we need to fetch more data
* FLB_HTTP_CHUNK_AVAILABLE - if one or more chunks have been found and
* it is not the end of the stream, meaning more data will come
* FLB_HTTP_OK - if the 'end' chunk was found signifying end of the stream
* if there is still chunked data to process at the end of
* stream, it will be contained in the response payload
* FLB_HTTP_ERROR - for any error
*/
int ret = FLB_HTTP_MORE;
int r_bytes;
ssize_t available;
size_t out_size;

// if the caller has consumed some of the payload (via bytes_consumed)
// we consume those bytes off the payload
if( bytes_consumed > 0 ) {
if(bytes_consumed > c->resp.payload_size) {
flb_error("[http_client] attempting to consume more bytes than "
"available. Attempted bytes_consumed=%zu payload_size=%zu ",
bytes_consumed,
c->resp.payload_size);
return FLB_HTTP_ERROR;
}

c->resp.payload_size -= bytes_consumed;
c->resp.data_len -= bytes_consumed;
memmove(c->resp.payload, c->resp.payload+bytes_consumed, c->resp.payload_size);
c->resp.chunk_processed_end = c->resp.payload+c->resp.payload_size;
c->resp.data[c->resp.data_len] = '\0';
}

while (ret == FLB_HTTP_MORE) {
available = flb_http_buffer_available(c) - 1;
if (available <= 1) {
/*
Expand All @@ -1267,7 +1313,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
c->resp.data_size + FLB_HTTP_DATA_CHUNK,
c->resp.data_size_max);
flb_upstream_conn_recycle(c->u_conn, FLB_FALSE);
return 0;
return FLB_HTTP_ERROR;
}
available = flb_http_buffer_available(c) - 1;
}
Expand All @@ -1277,7 +1323,7 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
available);
if (r_bytes <= 0) {
if (c->flags & FLB_HTTP_10) {
break;
return FLB_HTTP_OK;
}
}

Expand All @@ -1293,23 +1339,39 @@ int flb_http_do(struct flb_http_client *c, size_t *bytes)
c->u_conn->upstream->tcp_host,
c->u_conn->upstream->tcp_port,
c->u_conn->fd);
return -1;
}
else if (ret == FLB_HTTP_OK) {
break;
}
else if (ret == FLB_HTTP_MORE) {
continue;
return FLB_HTTP_ERROR;
}
}
else {
flb_error("[http_client] broken connection to %s:%i ?",
c->u_conn->upstream->tcp_host,
c->u_conn->upstream->tcp_port);
return -1;
return FLB_HTTP_ERROR;
}
}

return ret;
}

int flb_http_do(struct flb_http_client *c, size_t *bytes)
{
int ret;

ret = flb_http_do_request(c, bytes);
if (ret != 0) {
return ret;
}

/* Read the server response, we need at least 19 bytes */
while (ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) {
/* flb_http_do does not consume any bytes during processing
* so we always pass 0 consumed_bytes because we fetch until
* the end chunk before returning to the caller
*/

ret = flb_http_get_available_chunks(c, 0);
}

/* Check 'Connection' response header */
ret = check_connection(c);
if (ret == FLB_HTTP_OK) {
Expand Down
2 changes: 0 additions & 2 deletions tests/include/aws_client_mock_client_resp.def
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(status, STATUS,
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(content_length, CONTENT_LENGTH, T_INT)) /* Content length set by headers */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_encoding, CHUNKED_ENCODEING, T_INT)) /* Chunked transfer encoding ? */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(connection_close, CONNECTION_CLOSE, T_INT)) /* connection: close ? */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_cur_size, CHUNKED_CUR_SIZE, T_LONG))
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunked_exp_size, CHUNKED_EXP_SIZE, T_LONG)) /* expected chunked size */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(chunk_processed_end, CHUNK_PROCESSED_END, T_CHAR_STAR)) /* Position to mark last chunk */
EVAL1(EXPAND_CLIENT_RESPONSE_PARAMETER(headers_end, HEADERS_END, T_CHAR_STAR)) /* Headers end (\r\n\r\n) */

Expand Down

0 comments on commit d1fb005

Please sign in to comment.