Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bypass for CreateSession reqeust #384

Merged
merged 8 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 55 additions & 23 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,54 @@ static void s_s3_client_prepare_callback_queue_request(
int error_code,
void *user_data);

static bool s_s3_client_should_update_meta_request(
struct aws_s3_client *client,
struct aws_s3_meta_request *meta_request,
uint32_t num_requests_in_flight,
const uint32_t max_requests_in_flight,
const uint32_t max_requests_prepare) {

/* CreateSession has high priority to bypass the checks. */
if (meta_request->type == AWS_S3_META_REQUEST_TYPE_DEFAULT) {
struct aws_s3_meta_request_default *meta_request_default = meta_request->impl;
if (aws_string_eq_c_str(meta_request_default->operation_name, "CreateSession")) {
graebm marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
}

/**
* If number of being-prepared + already-prepared-and-queued requests is more than the max that can
* be in the preparation stage.
* Or total number of requests tracked by the client is more than the max tracked ("in flight")
* requests.
*
* We cannot create more requests for this meta request.
*/
if ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) >=
max_requests_prepare) {
return false;
}
if (num_requests_in_flight >= max_requests_in_flight) {
return false;
TingDaoK marked this conversation as resolved.
Show resolved Hide resolved
}

/* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
* ramping up requests just yet. If there is already enough in the queue for one address (even if those
* aren't for this particular endpoint) we skip over this meta request for now. */
struct aws_s3_endpoint *endpoint = meta_request->endpoint;
AWS_ASSERT(endpoint != NULL);
AWS_ASSERT(client->vtable->get_host_address_count);
size_t num_known_vips = client->vtable->get_host_address_count(
client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
return false;
}

/* Nothing blocks the meta request to create more requests */
return true;
}

void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {
AWS_PRECONDITION(client);

Expand All @@ -1628,37 +1676,21 @@ void aws_s3_client_update_meta_requests_threaded(struct aws_s3_client *client) {

for (uint32_t pass_index = 0; pass_index < num_passes; ++pass_index) {

/* While:
* * Number of being-prepared + already-prepared-and-queued requests is less than the max that can be in the
* preparation stage.
* * Total number of requests tracked by the client is less than the max tracked ("in flight") requests.
* * There are meta requests to get requests from.
*
* Then update meta requests to get new requests that can then be prepared (reading from any streams, signing,
* etc.) for sending.
/**
* Iterate through the meta requests to update meta requests and get new requests that can then be prepared
+ * (reading from any streams, signing, etc.) for sending.
*/
while ((client->threaded_data.num_requests_being_prepared + client->threaded_data.request_queue_size) <
max_requests_prepare &&
num_requests_in_flight < max_requests_in_flight &&
!aws_linked_list_empty(&client->threaded_data.meta_requests)) {
while (!aws_linked_list_empty(&client->threaded_data.meta_requests)) {

struct aws_linked_list_node *meta_request_node =
aws_linked_list_begin(&client->threaded_data.meta_requests);
struct aws_s3_meta_request *meta_request =
AWS_CONTAINER_OF(meta_request_node, struct aws_s3_meta_request, client_process_work_threaded_data);

struct aws_s3_endpoint *endpoint = meta_request->endpoint;
AWS_ASSERT(endpoint != NULL);

AWS_ASSERT(client->vtable->get_host_address_count);
size_t num_known_vips = client->vtable->get_host_address_count(
client->client_bootstrap->host_resolver, endpoint->host_name, AWS_GET_HOST_ADDRESS_COUNT_RECORD_TYPE_A);
if (!s_s3_client_should_update_meta_request(
client, meta_request, num_requests_in_flight, max_requests_in_flight, max_requests_prepare)) {

/* If this particular endpoint doesn't have any known addresses yet, then we don't want to go full speed in
* ramping up requests just yet. If there is already enough in the queue for one address (even if those
* aren't for this particular endpoint) we skip over this meta request for now. */
if (num_known_vips == 0 && (client->threaded_data.num_requests_being_prepared +
client->threaded_data.request_queue_size) >= g_max_num_connections_per_vip) {
/* Move the meta request to be processed from next loop. */
aws_linked_list_remove(&meta_request->client_process_work_threaded_data.node);
aws_linked_list_push_back(
&meta_requests_work_remaining, &meta_request->client_process_work_threaded_data.node);
Expand Down
1 change: 1 addition & 0 deletions source/s3express_credentials_provider.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ static struct aws_s3express_session_creator *s_session_creator_new(
/* Override endpoint only for tests. */
.endpoint = impl->mock_test.endpoint_override ? impl->mock_test.endpoint_override : NULL,
.user_data = session_creator,
.operation_name = aws_byte_cursor_from_c_str("CreateSession"),
};
session_creator->synced_data.meta_request = aws_s3_client_make_meta_request(impl->client, &options);
AWS_FATAL_ASSERT(session_creator->synced_data.meta_request);
Expand Down
13 changes: 8 additions & 5 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,14 @@ if(ENABLE_MOCK_SERVER_TESTS)
add_net_test_case(request_time_too_skewed_mock_server)
endif()

add_net_test_case(s3express_provider_long_run_real_server)
add_net_test_case(s3express_client_put_test_small_real_server)
add_net_test_case(s3express_client_put_test_large_real_server)
add_net_test_case(s3express_client_put_long_running_test_real_server)
add_net_test_case(s3express_client_get_test_real_server)
add_net_test_case(s3express_provider_long_running_session_refresh)

add_net_test_case(s3express_client_put_object)
add_net_test_case(s3express_client_put_object_multipart)
add_net_test_case(s3express_client_put_object_multipart_multiple)
add_net_test_case(s3express_client_put_object_long_running_session_refresh)
add_net_test_case(s3express_client_get_object)
add_net_test_case(s3express_client_get_object_multiple)

add_net_test_case(meta_request_auto_ranged_get_new_error_handling)
add_net_test_case(meta_request_auto_ranged_put_new_error_handling)
Expand Down
3 changes: 2 additions & 1 deletion tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,8 @@ static int s_test_s3_multipart_put_object_with_acl(struct aws_allocator *allocat

static int s_test_s3_put_object_multiple_helper(struct aws_allocator *allocator, bool file_on_disk) {

#define NUM_REQUESTS 5
enum s_numbers { NUM_REQUESTS = 5 };

struct aws_s3_meta_request *meta_requests[NUM_REQUESTS];
struct aws_s3_meta_request_test_results meta_request_test_results[NUM_REQUESTS];
struct aws_http_message *messages[NUM_REQUESTS];
Expand Down
5 changes: 1 addition & 4 deletions tests/s3_mock_server_s3express_provider_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ TEST_CASE(s3express_provider_stress_mock_server) {

/* Stress about under load, keep hitting 10 hosts */
for (size_t i = 0; i < num_requests; i++) {
/* code */
char key_buffer[128] = "";
snprintf(key_buffer, sizeof(key_buffer), "test-%zu", (size_t)(i % 10));
struct aws_credentials_properties_s3express property = {
Expand All @@ -562,7 +561,6 @@ TEST_CASE(s3express_provider_stress_mock_server) {
/* Stress about over load, keep hitting different hosts */
s_s3express_tester.credentials_callbacks_received = 0;
for (size_t i = 0; i < num_requests; i++) {
/* code */
char key_buffer[128] = "";
snprintf(key_buffer, sizeof(key_buffer), "test-%zu", i);
struct aws_credentials_properties_s3express property = {
Expand All @@ -583,7 +581,7 @@ TEST_CASE(s3express_provider_stress_mock_server) {
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_provider_long_run_real_server) {
TEST_CASE(s3express_provider_long_running_session_refresh) {
(void)ctx;

struct aws_s3_tester tester;
Expand Down Expand Up @@ -637,7 +635,6 @@ TEST_CASE(s3express_provider_long_run_real_server) {
}
/**
* We should have more than 2 different creds.
* Server can return a credentials that expires less than 5 mins.
**/
ASSERT_TRUE(s_s3express_tester.number_of_credentials >= 2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ static int s_s3express_put_object_request(
return AWS_OP_SUCCESS;
}

static int s_s3express_client_put_test_real_server_helper(struct aws_allocator *allocator, size_t content_length) {
static int s_s3express_client_put_test_helper(struct aws_allocator *allocator, size_t content_length) {

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));
Expand Down Expand Up @@ -256,14 +256,104 @@ static int s_s3express_client_put_test_real_server_helper(struct aws_allocator *
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_client_put_test_small_real_server) {
TEST_CASE(s3express_client_put_object) {
(void)ctx;
return s_s3express_client_put_test_real_server_helper(allocator, MB_TO_BYTES(1));
return s_s3express_client_put_test_helper(allocator, MB_TO_BYTES(1));
}

TEST_CASE(s3express_client_put_test_large_real_server) {
TEST_CASE(s3express_client_put_object_multipart) {
(void)ctx;
return s_s3express_client_put_test_real_server_helper(allocator, MB_TO_BYTES(100));
return s_s3express_client_put_test_helper(allocator, MB_TO_BYTES(100));
}

TEST_CASE(s3express_client_put_object_multipart_multiple) {
(void)ctx;

enum s_numbers { NUM_REQUESTS = 100 };

struct aws_s3_meta_request *meta_requests[NUM_REQUESTS];
struct aws_s3_meta_request_test_results meta_request_test_results[NUM_REQUESTS];
struct aws_input_stream *input_streams[NUM_REQUESTS];

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

struct aws_byte_cursor region_cursor = aws_byte_cursor_from_c_str("us-east-1");

char endpoint[] = "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com";
struct aws_byte_cursor host_cursor = aws_byte_cursor_from_c_str(endpoint);
struct aws_byte_cursor key_cursor = aws_byte_cursor_from_c_str("/crt-test");

struct aws_byte_cursor west2_region_cursor = aws_byte_cursor_from_c_str("us-west-2");
char west2_endpoint[] = "crts-west2--usw2-az1--x-s3.s3express-usw2-az1.us-west-2.amazonaws.com";
struct aws_byte_cursor west2_host_cursor = aws_byte_cursor_from_c_str(west2_endpoint);

struct aws_s3_client_config client_config = {
.part_size = MB_TO_BYTES(5),
.enable_s3express = true,
.region = region_cursor,
};

ASSERT_SUCCESS(aws_s3_tester_bind_client(&tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_SIGNING));

struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
input_streams[i] = aws_s3_test_input_stream_new(allocator, MB_TO_BYTES(10));

struct aws_byte_cursor request_region = region_cursor;
struct aws_byte_cursor request_host = host_cursor;
if (i % 2 == 0) {
/* Make half of request to east1 and rest half to west2 */
request_region = west2_region_cursor;
request_host = west2_host_cursor;
}

struct aws_http_message *message = aws_s3_test_put_object_request_new(
allocator, &request_host, key_cursor, g_test_body_content_type, input_streams[i], 0);

struct aws_s3_meta_request_options options;
AWS_ZERO_STRUCT(options);
options.type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT;
options.message = message;
struct aws_signing_config_aws s3express_signing_config = {
.algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS,
.service = g_s3express_service_name,
.region = request_region,
};
options.signing_config = &s3express_signing_config;
aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator);

ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i]));

meta_requests[i] = aws_s3_client_make_meta_request(client, &options);
ASSERT_TRUE(meta_requests[i] != NULL);
aws_http_message_release(message);
}
/* Wait for the request to finish. */
aws_s3_tester_wait_for_meta_request_finish(&tester);
aws_s3_tester_lock_synced_data(&tester);
ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS);
aws_s3_tester_unlock_synced_data(&tester);

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]);
}

aws_s3_tester_wait_for_meta_request_shutdown(&tester);

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
aws_s3_tester_validate_put_object_results(&meta_request_test_results[i], 0);
aws_s3_meta_request_test_results_clean_up(&meta_request_test_results[i]);
}

for (size_t i = 0; i < NUM_REQUESTS; ++i) {
aws_input_stream_release(input_streams[i]);
}

aws_s3_client_release(client);
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}

void s_meta_request_finished_overhead(
Expand Down Expand Up @@ -300,7 +390,7 @@ struct aws_s3express_credentials_provider *s_s3express_provider_mock_factory(
}

/* Long running test to make sure our refresh works properly */
TEST_CASE(s3express_client_put_long_running_test_real_server) {
TEST_CASE(s3express_client_put_object_long_running_session_refresh) {
(void)ctx;

struct aws_s3_tester tester;
Expand Down Expand Up @@ -375,7 +465,7 @@ TEST_CASE(s3express_client_put_long_running_test_real_server) {
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_client_get_test_real_server) {
TEST_CASE(s3express_client_get_object) {
(void)ctx;

struct aws_s3_tester tester;
Expand Down Expand Up @@ -429,3 +519,73 @@ TEST_CASE(s3express_client_get_test_real_server) {
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}

TEST_CASE(s3express_client_get_object_multiple) {
(void)ctx;

struct aws_s3_meta_request *meta_requests[100];
struct aws_s3_meta_request_test_results meta_request_test_results[100];
size_t num_meta_requests = AWS_ARRAY_SIZE(meta_requests);

struct aws_s3_tester tester;
ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

struct aws_byte_cursor region_cursor = aws_byte_cursor_from_c_str("us-east-1");

char endpoint[] = "crts-east1--use1-az4--x-s3.s3express-use1-az4.us-east-1.amazonaws.com";
struct aws_byte_cursor host_cursor = aws_byte_cursor_from_c_str(endpoint);
struct aws_byte_cursor key_cursor = aws_byte_cursor_from_c_str("/crt-download-10MB");

struct aws_s3_client_config client_config = {
.part_size = MB_TO_BYTES(5),
.enable_s3express = true,
.region = region_cursor,
};

ASSERT_SUCCESS(aws_s3_tester_bind_client(&tester, &client_config, AWS_S3_TESTER_BIND_CLIENT_SIGNING));

struct aws_s3_client *client = aws_s3_client_new(allocator, &client_config);

for (size_t i = 0; i < num_meta_requests; ++i) {

struct aws_http_message *message = aws_s3_test_get_object_request_new(allocator, host_cursor, key_cursor);

struct aws_s3_meta_request_options options;
AWS_ZERO_STRUCT(options);
options.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT;
options.message = message;
struct aws_signing_config_aws s3express_signing_config = {
.algorithm = AWS_SIGNING_ALGORITHM_V4_S3EXPRESS,
.service = g_s3express_service_name,
};
options.signing_config = &s3express_signing_config;
aws_s3_meta_request_test_results_init(&meta_request_test_results[i], allocator);

ASSERT_SUCCESS(aws_s3_tester_bind_meta_request(&tester, &options, &meta_request_test_results[i]));

meta_requests[i] = aws_s3_client_make_meta_request(client, &options);
ASSERT_TRUE(meta_requests[i] != NULL);

aws_http_message_release(message);
}
/* Wait for the request to finish. */
aws_s3_tester_wait_for_meta_request_finish(&tester);
aws_s3_tester_lock_synced_data(&tester);
ASSERT_TRUE(tester.synced_data.finish_error_code == AWS_ERROR_SUCCESS);
aws_s3_tester_unlock_synced_data(&tester);

for (size_t i = 0; i < num_meta_requests; ++i) {
meta_requests[i] = aws_s3_meta_request_release(meta_requests[i]);
}

aws_s3_tester_wait_for_meta_request_shutdown(&tester);

for (size_t i = 0; i < num_meta_requests; ++i) {
aws_s3_tester_validate_get_object_results(&meta_request_test_results[i], 0);
aws_s3_meta_request_test_results_clean_up(&meta_request_test_results[i]);
}

aws_s3_client_release(client);
aws_s3_tester_clean_up(&tester);
return AWS_OP_SUCCESS;
}
Loading