Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric callback improve #390

Merged
merged 18 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct aws_s3_meta_request_event {
enum aws_s3_meta_request_event_type {
AWS_S3_META_REQUEST_EVENT_RESPONSE_BODY, /* body_callback */
AWS_S3_META_REQUEST_EVENT_PROGRESS, /* progress_callback */
/* TODO: AWS_S3_META_REQUEST_EVENT_TELEMETRY */
AWS_S3_META_REQUEST_EVENT_TELEMETRY, /* telemetry_callback */
} type;

union {
Expand All @@ -72,6 +72,11 @@ struct aws_s3_meta_request_event {
struct {
struct aws_s3_meta_request_progress info;
} progress;

/* data for AWS_S3_META_REQUEST_EVENT_TELEMETRY */
struct {
struct aws_s3_request_metrics *metrics;
} telemetry;
} u;
};

Expand Down
5 changes: 5 additions & 0 deletions include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ struct aws_http_stream;
struct aws_http_headers;
struct aws_http_message;
struct aws_s3_client;
struct aws_s3_request;
struct aws_s3_meta_request;

enum aws_s3_response_status {
AWS_S3_RESPONSE_STATUS_SUCCESS = 200,
Expand Down Expand Up @@ -284,6 +286,9 @@ void aws_s3_get_part_range(
AWS_S3_API
int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor error_code_string);

AWS_S3_API
void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request);

AWS_EXTERN_C_END

#endif /* AWS_S3_UTIL_H */
6 changes: 1 addition & 5 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ typedef void(aws_s3_meta_request_progress_fn)(
void *user_data);

/**
* Invoked to report the telemetry of the meta request once a single request finishes. Invoked from the thread of the
* connection that request made from.
* Invoked to report the telemetry of the meta request once a single request finishes.
* Note: *metrics is only valid for the duration of the callback. If you need to keep it around, use
* `aws_s3_request_metrics_acquire`
*/
Expand Down Expand Up @@ -652,9 +651,6 @@ struct aws_s3_meta_request_options {
* If set the request will keep track of the metrics from `aws_s3_request_metrics`, and fire the callback when the
* request finishes receiving response.
* See `aws_s3_meta_request_telemetry_fn`
*
* Notes:
* - The callback will be invoked multiple times from different threads.
*/
aws_s3_meta_request_telemetry_fn *telemetry_callback;

Expand Down
1 change: 1 addition & 0 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ static void s_s3_auto_ranged_get_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);

/* If the object range was found, then record it. */
if (found_object_size) {
Expand Down
5 changes: 5 additions & 0 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,7 @@ static void s_s3_auto_ranged_put_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved

bool has_more_results = false;

Expand Down Expand Up @@ -1467,6 +1468,7 @@ static void s_s3_auto_ranged_put_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);

AWS_ASSERT(auto_ranged_put->synced_data.needed_response_headers == NULL);
auto_ranged_put->synced_data.needed_response_headers = needed_response_headers;
Expand Down Expand Up @@ -1519,6 +1521,7 @@ static void s_s3_auto_ranged_put_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);

++auto_ranged_put->synced_data.num_parts_completed;

Expand Down Expand Up @@ -1636,6 +1639,7 @@ static void s_s3_auto_ranged_put_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);
auto_ranged_put->synced_data.complete_multipart_upload_completed = true;
auto_ranged_put->synced_data.complete_multipart_upload_error_code = error_code;

Expand All @@ -1652,6 +1656,7 @@ static void s_s3_auto_ranged_put_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);
auto_ranged_put->synced_data.abort_multipart_upload_error_code = error_code;
auto_ranged_put->synced_data.abort_multipart_upload_completed = true;
aws_s3_meta_request_unlock_synced_data(meta_request);
Expand Down
1 change: 1 addition & 0 deletions source/s3_copy_object.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ static void s_s3_copy_object_request_finished(

struct aws_s3_copy_object *copy_object = meta_request->impl;
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);

switch (request->request_tag) {

Expand Down
2 changes: 2 additions & 0 deletions source/s3_default_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ static void s_s3_meta_request_default_request_finished(
/* BEGIN CRITICAL SECTION */
{
aws_s3_meta_request_lock_synced_data(meta_request);
aws_s3_request_finish_up_metrics_synced(request, meta_request);

meta_request_default->synced_data.cached_response_status = request->send_data.response_status;
meta_request_default->synced_data.request_completed = true;
meta_request_default->synced_data.request_error_code = error_code;
Expand Down
7 changes: 7 additions & 0 deletions source/s3_meta_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,13 @@ static void s_s3_meta_request_event_delivery_task(struct aws_task *task, void *a
}
} break;

case AWS_S3_META_REQUEST_EVENT_TELEMETRY: {
AWS_FATAL_ASSERT(meta_request->telemetry_callback != NULL);
AWS_FATAL_ASSERT(event.u.telemetry.metrics != NULL);
meta_request->telemetry_callback(meta_request, event.u.telemetry.metrics, meta_request->user_data);
event.u.telemetry.metrics = aws_s3_request_metrics_release(event.u.telemetry.metrics);
} break;

default:
AWS_FATAL_ASSERT(false);
}
Expand Down
33 changes: 20 additions & 13 deletions source/s3_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ void aws_s3_request_setup_send_data(struct aws_s3_request *request, struct aws_h
AWS_PRECONDITION(request);
AWS_PRECONDITION(message);

if (request != NULL && request->send_data.metrics != NULL) {
/* If there is a metrics from previous attempt, complete it now. */
struct aws_s3_request_metrics *metric = request->send_data.metrics;
aws_high_res_clock_get_ticks((uint64_t *)&metric->time_metrics.end_timestamp_ns);
metric->time_metrics.total_duration_ns =
metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns;

struct aws_s3_meta_request *meta_request = request->meta_request;
if (meta_request != NULL && meta_request->telemetry_callback != NULL) {

aws_s3_meta_request_lock_synced_data(meta_request);
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_TELEMETRY};
event.u.telemetry.metrics = aws_s3_request_metrics_acquire(metric);
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
aws_s3_meta_request_unlock_synced_data(meta_request);
}
request->send_data.metrics = aws_s3_request_metrics_release(metric);
}
aws_s3_request_clean_up_send_data(request);

request->send_data.message = message;
Expand All @@ -76,24 +94,13 @@ static void s_s3_request_clean_up_send_data_message(struct aws_s3_request *reque

void aws_s3_request_clean_up_send_data(struct aws_s3_request *request) {
AWS_PRECONDITION(request);
/* The metrics should be collected and provided to user before reaching here */
AWS_FATAL_ASSERT(request->send_data.metrics == NULL);

s_s3_request_clean_up_send_data_message(request);

aws_signable_destroy(request->send_data.signable);
request->send_data.signable = NULL;
if (request->send_data.metrics) {
/* invoke callback */
struct aws_s3_meta_request *meta_request = request->meta_request;
struct aws_s3_request_metrics *metric = request->send_data.metrics;
aws_high_res_clock_get_ticks((uint64_t *)&metric->time_metrics.end_timestamp_ns);
metric->time_metrics.total_duration_ns =
metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns;
if (meta_request->telemetry_callback) {
meta_request->telemetry_callback(meta_request, metric, meta_request->user_data);
}
request->send_data.metrics = aws_s3_request_metrics_release(metric);
}

aws_http_headers_release(request->send_data.response_headers);
request->send_data.response_headers = NULL;

Expand Down
23 changes: 23 additions & 0 deletions source/s3_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

#include "aws/s3/private/s3_util.h"
#include "aws/s3/private/s3_client_impl.h"
#include "aws/s3/private/s3_meta_request_impl.h"
#include "aws/s3/private/s3_request.h"
#include <aws/auth/credentials.h>
#include <aws/common/clock.h>
#include <aws/common/string.h>
#include <aws/common/xml_parser.h>
#include <aws/http/request_response.h>
Expand Down Expand Up @@ -623,3 +626,23 @@ int aws_s3_crt_error_code_from_server_error_code_string(struct aws_byte_cursor e
}
return AWS_ERROR_UNKNOWN;
}

void aws_s3_request_finish_up_metrics_synced(struct aws_s3_request *request, struct aws_s3_meta_request *meta_request) {
AWS_PRECONDITION(meta_request);
AWS_PRECONDITION(request);

if (request->send_data.metrics != NULL) {
/* Request is done, complete the metrics for the request now. */
struct aws_s3_request_metrics *metric = request->send_data.metrics;
aws_high_res_clock_get_ticks((uint64_t *)&metric->time_metrics.end_timestamp_ns);
metric->time_metrics.total_duration_ns =
metric->time_metrics.end_timestamp_ns - metric->time_metrics.start_timestamp_ns;

if (meta_request->telemetry_callback != NULL) {
struct aws_s3_meta_request_event event = {.type = AWS_S3_META_REQUEST_EVENT_TELEMETRY};
event.u.telemetry.metrics = aws_s3_request_metrics_acquire(metric);
aws_s3_meta_request_add_event_for_delivery_synced(meta_request, &event);
}
request->send_data.metrics = aws_s3_request_metrics_release(metric);
}
}
20 changes: 7 additions & 13 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ static int s_test_s3_request_create_destroy(struct aws_allocator *allocator, voi

request->send_data.response_headers = aws_http_headers_new(allocator);
ASSERT_TRUE(request->send_data.response_headers != NULL);
ASSERT_TRUE(request->send_data.metrics != NULL);
request->send_data.metrics = aws_s3_request_metrics_release(request->send_data.metrics);

aws_s3_request_clean_up_send_data(request);

Expand Down Expand Up @@ -3964,25 +3966,17 @@ static int s_test_s3_meta_request_default(struct aws_allocator *allocator, void

aws_s3_tester_unlock_synced_data(&tester);

ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(&meta_request_test_results, 0));

meta_request = aws_s3_meta_request_release(meta_request);

aws_s3_tester_wait_for_meta_request_shutdown(&tester);

/*
* TODO: telemetry is sent from request destructor, http threads hold on to
* req for a little bit after on_req_finished callback and its possible that
* telemetry callback will be invoked after meta reqs on_finished callback.
* Moving the telemetry check to after meta req shutdown callback. Need to
* figure out whether current behavior can be improved.
*/
/* Check the size of the metrics should be the same as the number of
requests, which should be 1 */
ASSERT_UINT_EQUALS(1, aws_array_list_length(&meta_request_test_results.synced_data.metrics));
struct aws_s3_request_metrics *metrics = NULL;
aws_array_list_back(&meta_request_test_results.synced_data.metrics, (void **)&metrics);

ASSERT_SUCCESS(aws_s3_tester_validate_get_object_results(&meta_request_test_results, 0));

meta_request = aws_s3_meta_request_release(meta_request);

aws_s3_tester_wait_for_meta_request_shutdown(&tester);
aws_s3_meta_request_test_results_clean_up(&meta_request_test_results);

aws_http_message_release(message);
Expand Down