Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mem limiter #368

Merged
merged 79 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
1e44954
initial buffer reuse
DmitriyMusatkin Nov 6, 2023
0f59f52
test fixes
DmitriyMusatkin Nov 6, 2023
511ce35
more test fixes
DmitriyMusatkin Nov 6, 2023
a524b08
lets not be too fancy
DmitriyMusatkin Nov 6, 2023
5804c39
enable for puts
DmitriyMusatkin Nov 6, 2023
bd17c6d
bump limits
DmitriyMusatkin Nov 6, 2023
690fc85
dont reinit buffer
DmitriyMusatkin Nov 6, 2023
be631c6
fixes
DmitriyMusatkin Nov 6, 2023
d1df7ce
remove logging
DmitriyMusatkin Nov 6, 2023
bc9b126
cleaning up
DmitriyMusatkin Nov 9, 2023
891cd90
test fixes
DmitriyMusatkin Nov 9, 2023
c695744
32 bit fix
DmitriyMusatkin Nov 9, 2023
6c9e6ae
test fixes
DmitriyMusatkin Nov 9, 2023
e7166e9
fix small buffer for gets
DmitriyMusatkin Nov 9, 2023
9c40011
dont cancel trim
DmitriyMusatkin Nov 9, 2023
5207457
move around trim canceling
DmitriyMusatkin Nov 10, 2023
6f7286c
typo
DmitriyMusatkin Nov 10, 2023
22f38eb
lets check metrics inside synced block
DmitriyMusatkin Nov 10, 2023
43f12e6
data race
DmitriyMusatkin Nov 10, 2023
3d467d8
addressing comments
DmitriyMusatkin Nov 10, 2023
a7b9e7f
logging
DmitriyMusatkin Nov 10, 2023
ca9e597
low mem limits on 32
DmitriyMusatkin Nov 11, 2023
b0a9d83
typo
DmitriyMusatkin Nov 11, 2023
d9197dc
add more logging
DmitriyMusatkin Nov 13, 2023
3d755bd
build warning
DmitriyMusatkin Nov 13, 2023
02bae74
comment out correctly
DmitriyMusatkin Nov 13, 2023
dc9373e
comment out unused
DmitriyMusatkin Nov 13, 2023
c62cec1
more logging
DmitriyMusatkin Nov 13, 2023
607e28e
correct specifier
DmitriyMusatkin Nov 13, 2023
1c3345f
fix default mem config
DmitriyMusatkin Nov 13, 2023
36ad5c0
fixup mem usage stats
DmitriyMusatkin Nov 13, 2023
7ca1995
more logging
DmitriyMusatkin Nov 13, 2023
20f5c35
more logging
DmitriyMusatkin Nov 13, 2023
c5991e3
telemetry callback logs
DmitriyMusatkin Nov 13, 2023
7b98645
remove trim cancelling
DmitriyMusatkin Nov 13, 2023
df65c14
scheduling change
DmitriyMusatkin Nov 13, 2023
bb3c04a
switch over to reserving
DmitriyMusatkin Nov 14, 2023
3228670
unused params
DmitriyMusatkin Nov 14, 2023
6d68522
test fixes
DmitriyMusatkin Nov 14, 2023
cfc5f47
fix tests
DmitriyMusatkin Nov 14, 2023
b1f372f
remove assert
DmitriyMusatkin Nov 14, 2023
834f629
remove log
DmitriyMusatkin Nov 14, 2023
6d427a3
tweak reserve algo
DmitriyMusatkin Nov 15, 2023
8a7c617
fix
DmitriyMusatkin Nov 15, 2023
efef34d
docs
DmitriyMusatkin Nov 16, 2023
8e7f0db
Merge branch 'main' into mem_ticket
DmitriyMusatkin Nov 16, 2023
6a6fa88
fix test
DmitriyMusatkin Nov 16, 2023
b32a2fb
move test back
DmitriyMusatkin Nov 16, 2023
59e58fa
addressing comments
DmitriyMusatkin Nov 16, 2023
6617ab1
fix block size check
DmitriyMusatkin Nov 17, 2023
a94d0eb
fix buf limits test
DmitriyMusatkin Nov 17, 2023
9d783fd
add logging for debug purposes
DmitriyMusatkin Nov 17, 2023
7fd8358
more logs
DmitriyMusatkin Nov 17, 2023
4078601
moar logs
DmitriyMusatkin Nov 17, 2023
30b3069
telemetry callback makes no sense
DmitriyMusatkin Nov 17, 2023
3cfceb7
lets wait for meta req to shutdown
DmitriyMusatkin Nov 17, 2023
cb943cd
addressing comments
DmitriyMusatkin Nov 18, 2023
4f1488b
address comments
DmitriyMusatkin Nov 19, 2023
b450054
fix test
DmitriyMusatkin Nov 19, 2023
db19f07
another test
DmitriyMusatkin Nov 19, 2023
f3e896a
data race
DmitriyMusatkin Nov 19, 2023
1a8d329
data race
DmitriyMusatkin Nov 19, 2023
c600680
reenable trim
DmitriyMusatkin Nov 20, 2023
849d655
tweak buffer
DmitriyMusatkin Nov 20, 2023
e0b0ab7
fix build error
DmitriyMusatkin Nov 20, 2023
7f14806
lint and update docs
DmitriyMusatkin Nov 20, 2023
3739d1c
more lint
DmitriyMusatkin Nov 20, 2023
00cc3c1
adjust validation
DmitriyMusatkin Nov 20, 2023
ed5baa3
trim test
DmitriyMusatkin Nov 20, 2023
ab23ab6
lint
DmitriyMusatkin Nov 20, 2023
8fd62e4
net test case
DmitriyMusatkin Nov 20, 2023
52ffae7
Update source/s3_buffer_pool.c
DmitriyMusatkin Nov 21, 2023
fccca7c
addressing comments
DmitriyMusatkin Nov 21, 2023
1630751
addressing comments
DmitriyMusatkin Nov 21, 2023
0028246
lint, fix docs
DmitriyMusatkin Nov 21, 2023
fb3e351
even more lint
DmitriyMusatkin Nov 21, 2023
79977a5
address comments
DmitriyMusatkin Nov 21, 2023
bb5c139
remove 0 size buffer test
DmitriyMusatkin Nov 21, 2023
a6c69c2
typo
DmitriyMusatkin Nov 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/memory_aware_scheduling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TODO: high level picture on how scheduling + buffer pooling works
96 changes: 96 additions & 0 deletions include/aws/s3/private/s3_buffer_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#ifndef AWS_S3_BUFFER_ALLOCATOR_H
#define AWS_S3_BUFFER_ALLOCATOR_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/s3/s3.h>

/*
* S3 buffer pool.
* Buffer pool used for pooling part sized buffers for Put/Get.
* Provides additional functionally for limiting overall memory usage by setting
* upper bound beyond which buffer acquisition will fail.
*/

AWS_EXTERN_C_BEGIN

struct aws_s3_buffer_pool;

struct aws_s3_pooled_buffer {
size_t size;
uint8_t *ptr;
};
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved

struct aws_s3_buffer_pool_usage_stats {
/* Max size limit. Same value as provided during creation. */
size_t max_size;
/* Approximate amount of memory used. Should be fairly close to actual
* memory usage, but amount of remaining memory can vary based on
* allocations being performed, ex. pool has several blocks with space
* available to them, but allocation does not fit into any of them and new
* block cannot be allocated.
* Exercise caution when using this number and plan for allocations failing
* even if pool seemingly has enough memory. */
size_t approx_used;
};

/*
* Create new buffer pool.
* block_size - specifies size of the blocks in the primary storage. any acquires
* bigger than block size are allocated off secondary storage.
* max_mem_usage - limit on how much mem buffer pool can use. once limit is hit,
* buffer pool will start return empty buffers.
* Returns buffer pool pointer on success and NULL on failure.
*/
AWS_S3_API struct aws_s3_buffer_pool *aws_s3_buffer_pool_new(
struct aws_allocator *allocator,
size_t block_size,
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved
size_t max_mem_usage);

/*
* Destroys buffer pool.
* Does nothing if buffer_pool is NULL.
*/
AWS_S3_API void aws_s3_buffer_pool_destroy(struct aws_s3_buffer_pool *buffer_pool);

/*
* Acquire buffer of specified size.
* Returned buffer is empty (0 ptr and size) if buffer cannot be allocated.
* Warning: Always release buffer using release api. Its fine to wrap pooled
* buffer in another struct as long as it does not try to free underlying pointer
* (ex. aws_byte_buf_from_pooled_buffer)
*/
AWS_S3_API struct aws_s3_pooled_buffer aws_s3_buffer_pool_acquire_buffer(
struct aws_s3_buffer_pool *buffer_pool,
size_t size);
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved

/*
* Release buffer back to the pool.
*/
AWS_S3_API void aws_s3_buffer_pool_release_buffer(
struct aws_s3_buffer_pool *buffer_pool,
struct aws_s3_pooled_buffer pooled_buffer);

/*
* Get pool memory usage stats.
*/
AWS_S3_API struct aws_s3_buffer_pool_usage_stats aws_s3_buffer_pool_get_usage(struct aws_s3_buffer_pool *buffer_pool);

/*
* Wraps pooled buffer in a static byte_buf.
*/
AWS_S3_API struct aws_byte_buf aws_byte_buf_from_pooled_buffer(struct aws_s3_pooled_buffer pooled_buffer);

/*
* Trims all unused mem from the pool.
* Warning: fairly slow operation, do not use in critical path.
* TODO: partial trimming? ex. only trim down to 50% of max?
*/
AWS_S3_API void aws_s3_buffer_pool_trim(struct aws_s3_buffer_pool *buffer_pool);

AWS_EXTERN_C_END

#endif /* AWS_S3_BUFFER_ALLOCATOR_H */
14 changes: 14 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ struct aws_s3_client_vtable {
struct aws_s3_client {
struct aws_allocator *allocator;

struct aws_s3_buffer_pool *buffer_pool;

struct aws_s3_client_vtable *vtable;

struct aws_ref_count ref_count;
Expand Down Expand Up @@ -304,12 +306,18 @@ struct aws_s3_client {
* List contains aws_s3_meta_request_work */
struct aws_linked_list pending_meta_request_work;

/* aws_s3_requests waiting for memory. */
struct aws_priority_queue requests_waiting_for_mem;

/* aws_s3_request that are prepared and ready to be put in the threaded_data request queue. */
struct aws_linked_list prepared_requests;

/* Task for processing requests from meta requests on connections. */
struct aws_task process_work_task;

/* Task for trimming buffer bool. */
struct aws_task trim_buffer_pool_task;

/* Number of endpoints currently allocated. Used during clean up to know how many endpoints are still in
* memory.*/
uint32_t num_endpoints_allocated;
Expand All @@ -323,6 +331,9 @@ struct aws_s3_client {
/* Whether or not work processing is currently scheduled. */
uint32_t process_work_task_scheduled : 1;

/* Whether or not work processing is currently scheduled. */
uint32_t trim_buffer_pool_task_scheduled : 1;

/* Whether or not work process is currently in progress. */
uint32_t process_work_task_in_progress : 1;

Expand All @@ -342,6 +353,9 @@ struct aws_s3_client {
/* Client list of ongoing aws_s3_meta_requests. */
struct aws_linked_list meta_requests;

/* aws_s3_requests waiting for memory. */
struct aws_priority_queue requests_waiting_for_mem;

/* Number of requests in the request_queue linked_list. */
uint32_t request_queue_size;

Expand Down
1 change: 1 addition & 0 deletions include/aws/s3/private/s3_meta_request_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ struct aws_s3_meta_request_vtable {
*/
struct aws_s3_meta_request {
struct aws_allocator *allocator;
struct aws_s3_buffer_pool *buffer_pool;
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved

struct aws_ref_count ref_count;

Expand Down
12 changes: 12 additions & 0 deletions include/aws/s3/private/s3_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <aws/common/thread.h>
#include <aws/s3/s3.h>

#include <aws/s3/private/s3_buffer_pool.h>
#include <aws/s3/private/s3_checksums.h>

struct aws_http_message;
Expand All @@ -22,6 +23,7 @@ enum aws_s3_request_flags {
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS = 0x00000001,
AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY = 0x00000002,
AWS_S3_REQUEST_FLAG_ALWAYS_SEND = 0x00000004,
AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY = 0x00000008,
};

/**
Expand Down Expand Up @@ -105,13 +107,17 @@ struct aws_s3_request {

struct aws_allocator *allocator;

struct aws_s3_buffer_pool *buffer_pool;

/* Owning meta request. */
struct aws_s3_meta_request *meta_request;

/* Request body to use when sending the request. The contents of this body will be re-used if a request is
* retried.*/
struct aws_byte_buf request_body;

struct aws_s3_pooled_buffer pooled_buffer;

/* Beginning range of this part. */
/* TODO currently only used by auto_range_get, could be hooked up to auto_range_put as well. */
uint64_t part_range_start;
Expand All @@ -126,6 +132,10 @@ struct aws_s3_request {
*/
uint32_t part_number;

size_t part_size;

uint32_t num_times_tried_buffer_acquire;
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved

/* Number of times aws_s3_meta_request_prepare has been called for a request. During the first call to the virtual
* prepare function, this will be 0.*/
uint32_t num_times_prepared;
Expand Down Expand Up @@ -205,6 +215,8 @@ struct aws_s3_request {

/* When true, this request has already been uploaded. we still prepare the request to check the durability. */
uint32_t was_previously_uploaded : 1;

uint32_t part_size_request_body : 1;
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved
};

AWS_EXTERN_C_BEGIN
Expand Down
10 changes: 5 additions & 5 deletions include/aws/s3/private/s3_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
#else
# define ASSERT_SYNCED_DATA_LOCK_HELD(object)
#endif
#define KB_TO_BYTES(kb) ((kb)*1024)
#define MB_TO_BYTES(mb) ((mb)*1024 * 1024)
#define GB_TO_BYTES(gb) ((gb)*1024 * 1024 * 1024ULL)
#define KB_TO_BYTES(kb) ((kb) * 1024)
#define MB_TO_BYTES(mb) ((mb) * 1024 * 1024)
#define GB_TO_BYTES(gb) ((gb) * 1024 * 1024 * 1024ULL)

#define MS_TO_NS(ms) ((uint64_t)(ms)*1000000)
#define SEC_TO_NS(ms) ((uint64_t)(ms)*1000000000)
#define MS_TO_NS(ms) ((uint64_t)(ms) * 1000000)
#define SEC_TO_NS(ms) ((uint64_t)(ms) * 1000000000)

struct aws_allocator;
struct aws_http_stream;
Expand Down
1 change: 1 addition & 0 deletions include/aws/s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ enum aws_s3_errors {
AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH,
AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED,
AWS_ERROR_S3_FILE_MODIFIED,
AWS_ERROR_S3_INSUFFICIENT_MEMORY,
AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
};

Expand Down
10 changes: 10 additions & 0 deletions include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ enum aws_s3_checksum_location {
AWS_SCL_TRAILER,
};

enum aws_s3_memory_limiter_mode {
AWS_S3_MEMORY_LIMITER_DISABLED = 0,
AWS_S3_MEMORY_LIMITER_ENABLED_DEFAULT,
AWS_S3_MEMORY_LIMITER_ENABLED
};

/**
* Info about a single part, for you to review before the upload completes.
*/
Expand Down Expand Up @@ -344,6 +350,10 @@ struct aws_s3_client_config {
/* Throughput target in Gbps that we are trying to reach. */
double throughput_target_gbps;

enum aws_s3_memory_limiter_mode memory_limiter_mode;

size_t max_memory_limit;
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved

/* Retry strategy to use. If NULL, a default retry strategy will be used. */
struct aws_retry_strategy *retry_strategy;

Expand Down
1 change: 1 addition & 0 deletions source/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ static struct aws_error_info s_errors[] = {
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH, "Request body length must match Content-Length header."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED, "RequestTimeTooSkewed error received from S3."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_FILE_MODIFIED, "The file was modified during upload."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INSUFFICIENT_MEMORY, "Request cannot be processed due to insufficient memory"),
};
/* clang-format on */

Expand Down
16 changes: 14 additions & 2 deletions source/s3_auto_ranged_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ static bool s_s3_auto_ranged_get_update(
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_RESPONSE_BODY);

request->discovers_object_size = true;
request->part_size = meta_request->part_size;
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved

auto_ranged_get->synced_data.head_object_sent = true;
}
Expand All @@ -194,6 +195,7 @@ static bool s_s3_auto_ranged_get_update(

request->part_range_start = 0;
request->part_range_end = meta_request->part_size - 1; /* range-end is inclusive */
request->part_size = meta_request->part_size;
DmitriyMusatkin marked this conversation as resolved.
Show resolved Hide resolved
request->discovers_object_size = true;

++auto_ranged_get->synced_data.num_parts_requested;
Expand Down Expand Up @@ -267,6 +269,8 @@ static bool s_s3_auto_ranged_get_update(
&request->part_range_start,
&request->part_range_end);

request->part_size = meta_request->part_size;

++auto_ranged_get->synced_data.num_parts_requested;
goto has_work_remaining;
}
Expand Down Expand Up @@ -368,13 +372,21 @@ static struct aws_future_void *s_s3_auto_ranged_get_prepare_request(struct aws_s
aws_http_message_set_request_method(message, g_head_method);
}
break;
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART:
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_PART: {
request->pooled_buffer = aws_s3_buffer_pool_acquire_buffer(request->buffer_pool, meta_request->part_size);
if (request->pooled_buffer.ptr != NULL) {
request->send_data.response_body = aws_byte_buf_from_pooled_buffer(request->pooled_buffer);
} else {
aws_raise_error(AWS_ERROR_S3_INSUFFICIENT_MEMORY);
goto finish;
}

message = aws_s3_ranged_get_object_message_new(
meta_request->allocator,
meta_request->initial_request_message,
request->part_range_start,
request->part_range_end);
break;
} break;
case AWS_S3_AUTO_RANGE_GET_REQUEST_TYPE_INITIAL_MESSAGE:
message = aws_s3_message_util_copy_http_message_no_body_all_headers(
meta_request->allocator, meta_request->initial_request_message);
Expand Down
22 changes: 17 additions & 5 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,10 @@ static bool s_s3_auto_ranged_put_update(
meta_request,
AWS_S3_AUTO_RANGED_PUT_REQUEST_TAG_PART,
0,
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS);
AWS_S3_REQUEST_FLAG_RECORD_RESPONSE_HEADERS | AWS_S3_REQUEST_FLAG_PART_SIZE_REQUEST_BODY);

request->part_number = auto_ranged_put->threaded_update_data.next_part_number;
request->part_size = meta_request->part_size;

/* If request was previously uploaded, we prepare it to ensure checksums still match,
* but ultimately it gets marked no-op and we don't send it */
Expand Down Expand Up @@ -924,7 +925,15 @@ struct aws_future_http_message *s_s3_prepare_upload_part(struct aws_s3_request *
/* Read the body */
uint64_t offset = 0;
size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
aws_byte_buf_init(&request->request_body, meta_request->allocator, request_body_size);
if (request->request_body.capacity == 0) {
request->pooled_buffer = aws_s3_buffer_pool_acquire_buffer(request->buffer_pool, request_body_size);
if (request->pooled_buffer.ptr != NULL) {
request->request_body = aws_byte_buf_from_pooled_buffer(request->pooled_buffer);
} else {
s_s3_prepare_upload_part_finish(part_prep, AWS_ERROR_S3_INSUFFICIENT_MEMORY);
return message_future;
}
}

part_prep->asyncstep_read_part = aws_s3_meta_request_read_body(meta_request, offset, &request->request_body);
aws_future_bool_register_callback(
Expand Down Expand Up @@ -952,17 +961,20 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
if (error_code != AWS_ERROR_SUCCESS) {
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
"id=%p: Failed reading request body, error %d (%s)",
"id=%p: Failed reading request body, error %d (%s) req cap %zu",
(void *)meta_request,
error_code,
aws_error_str(error_code));
aws_error_str(error_code),
request->request_body.capacity);
goto on_done;
}
/* Reading succeeded. */
bool is_body_stream_at_end = aws_future_bool_get_result(part_prep->asyncstep_read_part);

uint64_t offset = 0;
size_t request_body_size = s_compute_request_body_size(meta_request, request->part_number, &offset);
/* If Content-Length is defined, check that we read the expected amount */
if (has_content_length && (request->request_body.len < request->request_body.capacity)) {
if (has_content_length && (request->request_body.len < request_body_size)) {
error_code = AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH;
AWS_LOGF_ERROR(
AWS_LS_S3_META_REQUEST,
Expand Down
Loading
Loading