Skip to content

Commit

Permalink
Reset offsets in buffer list for retries (#5220)
Browse files Browse the repository at this point in the history
There's a bug in the retry logic when a 503 is encountered while
uploading data. Our curl client fails to reset its BufferList state back
to zero in this case and we end up sending a bunch of garbage data to
the cloud service.

This PR fixes the issue and adds a regression test that failed before
and passes with the fix.

[sc-49128]

---
TYPE: BUG
DESC: Reset offsets in buffer list for retries
  • Loading branch information
ypatia authored Aug 2, 2024
1 parent 24cae8f commit 2a30fbe
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 26 deletions.
45 changes: 42 additions & 3 deletions test/src/unit-capi-incomplete.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ void IncompleteFx::check_sparse_unsplittable_complete() {
TEST_CASE_METHOD(
IncompleteFx,
"C API: Test incomplete read queries, dense",
"[capi][incomplete][dense-incomplete][serialization][rest]") {
"[capi][incomplete][dense-incomplete][rest]") {
create_dense_array();
write_dense_full();
check_dense_incomplete();
Expand All @@ -1159,7 +1159,7 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
IncompleteFx,
"C API: Test incomplete read queries, sparse",
"[capi][incomplete][sparse][serialization][rest]") {
"[capi][incomplete][sparse][rest]") {
create_sparse_array();
write_sparse_full();
check_sparse_incomplete();
Expand All @@ -1171,8 +1171,47 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
IncompleteFx,
"C API: Test incomplete read queries, dense, serialized",
"[capi][incomplete][dense][serialization][rest]") {
"[capi][incomplete][dense][rest]") {
create_dense_array();
write_dense_full();
check_dense_incomplete();
}

TEST_CASE_METHOD(
    IncompleteFx,
    "C API: Test incomplete read queries, sparse",
    "[capi][incomplete][sparse][retries][sc-49128][rest]") {
  // Regression test for sc-49128: it exercises the REST client's retry
  // path, so there is nothing to verify unless the fixture targets REST.
  if (!vfs_test_setup_.is_rest()) {
    return;
  }

  tiledb_config_t* config;
  tiledb_error_t* error = nullptr;
  int status = tiledb_config_alloc(&config, &error);
  REQUIRE(status == TILEDB_OK);
  REQUIRE(error == nullptr);

  // Sets a single config parameter and requires that the call succeeded.
  auto set_param = [&](const char* key, const char* value) {
    status = tiledb_config_set(config, key, value, &error);
    REQUIRE(status == TILEDB_OK);
    REQUIRE(error == nullptr);
  };

  // Mark HTTP 200 as a retryable status so that even successful requests
  // are retried; this checks that the request buffers are reset correctly
  // before the data is sent again.
  set_param("rest.retry_http_codes", "200");
  set_param("rest.retry_count", "1");
  set_param("rest.retry_initial_delay_ms", "5");

  // Apply the config to the test setup and switch to its context.
  vfs_test_setup_.update_config(config);
  ctx_ = vfs_test_setup_.ctx_c;
  tiledb_config_free(&config);

  // Same sequence of checks as the regular sparse incomplete-query test.
  create_sparse_array();
  write_sparse_full();
  check_sparse_incomplete();
  check_sparse_until_complete();
  check_sparse_unsplittable_overflow();
  check_sparse_unsplittable_complete();
}
10 changes: 10 additions & 0 deletions tiledb/sm/buffer/buffer_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ void BufferList::reset_offset() {
current_relative_offset_ = 0;
}

// Overwrites the current read position with the given buffer index and
// the given offset inside that buffer. Neither value is validated here.
void BufferList::set_offset(
    const size_t current_buffer_index, const uint64_t current_relative_offset) {
  current_relative_offset_ = current_relative_offset;
  current_buffer_index_ = current_buffer_index;
}

// Reports the current read position as a (buffer index, offset within that
// buffer) pair, in that order.
std::tuple<size_t, uint64_t> BufferList::get_offset() const {
  return std::tuple<size_t, uint64_t>(
      current_buffer_index_, current_relative_offset_);
}

uint64_t BufferList::total_size() const {
uint64_t size = 0;
for (const auto& b : buffers_)
Expand Down
20 changes: 20 additions & 0 deletions tiledb/sm/buffer/buffer_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ class BufferList {
/** Returns the sum of sizes of all buffers in the list. */
uint64_t total_size() const;

/**
* Sets the current offsets for reading.
*
* The values are stored as given; no bounds checking is performed against
* the buffers held in the list.
*
* @param current_buffer_index The index of the current buffer in the list.
* @param current_relative_offset The current relative offset within the
* current buffer.
*/
void set_offset(
const size_t current_buffer_index,
const uint64_t current_relative_offset);

/**
* Returns the current offsets for reading.
*
* @return A tuple whose first element is the index of the current buffer in
* the list and whose second element is the current relative offset within
* that buffer.
*/
std::tuple<size_t, uint64_t> get_offset() const;

private:
/** The underlying list of Buffers. */
std::vector<Buffer> buffers_;
Expand Down
53 changes: 37 additions & 16 deletions tiledb/sm/rest/curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,13 @@ Status Curl::make_curl_request(
stats::Stats* const stats,
const char* url,
CURLcode* curl_code,
BufferList* data,
Buffer* returned_data) const {
return make_curl_request_common(
stats,
url,
curl_code,
data,
&write_memory_callback,
static_cast<void*>(returned_data));
}
Expand All @@ -438,11 +440,13 @@ Status Curl::make_curl_request(
stats::Stats* const stats,
const char* url,
CURLcode* curl_code,
BufferList* data,
PostResponseCb&& cb) const {
return make_curl_request_common(
stats,
url,
curl_code,
data,
&write_memory_callback_cb,
static_cast<void*>(&cb));
}
Expand Down Expand Up @@ -478,6 +482,7 @@ Status Curl::make_curl_request_common(
stats::Stats* const stats,
const char* const url,
CURLcode* const curl_code,
BufferList* data,
size_t (*write_cb)(void*, size_t, size_t, void*),
void* const write_cb_arg) const {
CURL* curl = curl_.get();
Expand All @@ -487,11 +492,27 @@ Status Curl::make_curl_request_common(

*curl_code = CURLE_OK;
uint64_t retry_delay = retry_initial_delay_ms_;

// Save the offsets before the request in case we need to retry
size_t current_buffer_index = 0;
uint64_t current_relative_offset = 0;
if (data != nullptr) {
std::tie(current_buffer_index, current_relative_offset) =
data->get_offset();
}

// <= because the 0th retry is actually the initial request
for (uint8_t i = 0; i <= retry_count_; i++) {
WriteCbState write_cb_state;
write_cb_state.arg = write_cb_arg;

// When retries are enabled, rewind the data buffer list to the position
// saved before the first attempt so that every attempt sends the correct
// data. On the initial attempt the offsets are unchanged, so this is a no-op.
if (data != nullptr && retry_count_ > 0) {
data->set_offset(current_buffer_index, current_relative_offset);
}

/* set url to fetch */
curl_easy_setopt(curl, CURLOPT_URL, url);

Expand Down Expand Up @@ -548,7 +569,7 @@ Status Curl::make_curl_request_common(
/* fetch the url */
CURLcode tmp_curl_code = curl_easy_perform_instrumented(url, i);

bool retry;
bool retry = false;
RETURN_NOT_OK(should_retry_based_on_http_status(&retry));

/* If Curl call was successful (not http status, but no socket error, etc)
Expand Down Expand Up @@ -704,7 +725,7 @@ Status Curl::post_data(
stats::Stats* const stats,
const std::string& url,
const SerializationType serialization_type,
const BufferList* data,
BufferList* data,
Buffer* const returned_data,
const std::string& res_uri) {
struct curl_slist* headers;
Expand All @@ -714,7 +735,7 @@ Status Curl::post_data(

CURLcode ret;
headerData.uri = &res_uri;
auto st = make_curl_request(stats, url.c_str(), &ret, returned_data);
auto st = make_curl_request(stats, url.c_str(), &ret, data, returned_data);
curl_slist_free_all(headers);
RETURN_NOT_OK(st);

Expand All @@ -728,7 +749,7 @@ Status Curl::post_data(
stats::Stats* const stats,
const std::string& url,
const SerializationType serialization_type,
const BufferList* data,
BufferList* data,
Buffer* const returned_data,
PostResponseCb&& cb,
const std::string& res_uri) {
Expand All @@ -737,7 +758,7 @@ Status Curl::post_data(

CURLcode ret;
headerData.uri = &res_uri;
auto st = make_curl_request(stats, url.c_str(), &ret, std::move(cb));
auto st = make_curl_request(stats, url.c_str(), &ret, data, std::move(cb));
curl_slist_free_all(headers);
RETURN_NOT_OK(st);

Expand All @@ -749,7 +770,7 @@ Status Curl::post_data(

Status Curl::post_data_common(
const SerializationType serialization_type,
const BufferList* data,
BufferList* data,
struct curl_slist** headers) {
CURL* curl = curl_.get();
if (curl == nullptr)
Expand All @@ -771,7 +792,7 @@ Status Curl::post_data_common(
set_content_type(serialization_type, headers),
curl_slist_free_all(*headers));

/* HTTP PUT please */
/* HTTP POST please */
curl_easy_setopt(curl, CURLOPT_POST, 1L);
curl_easy_setopt(
curl, CURLOPT_READFUNCTION, buffer_list_read_memory_callback);
Expand Down Expand Up @@ -810,7 +831,7 @@ Status Curl::get_data(

CURLcode ret;
headerData.uri = &res_ns_uri;
auto st = make_curl_request(stats, url.c_str(), &ret, returned_data);
auto st = make_curl_request(stats, url.c_str(), &ret, nullptr, returned_data);
curl_slist_free_all(headers);
RETURN_NOT_OK(st);

Expand Down Expand Up @@ -849,7 +870,7 @@ Status Curl::options(

CURLcode ret;
headerData.uri = &res_ns_uri;
auto st = make_curl_request(stats, url.c_str(), &ret, returned_data);
auto st = make_curl_request(stats, url.c_str(), &ret, nullptr, returned_data);
curl_slist_free_all(headers);
RETURN_NOT_OK(st);

Expand Down Expand Up @@ -885,7 +906,7 @@ Status Curl::delete_data(

CURLcode ret;
headerData.uri = &res_uri;
auto st = make_curl_request(stats, url.c_str(), &ret, returned_data);
auto st = make_curl_request(stats, url.c_str(), &ret, nullptr, returned_data);

// Erase record in case of de-registered array
std::unique_lock<std::mutex> rd_lck(*(headerData.redirect_uri_map_lock));
Expand All @@ -903,15 +924,15 @@ Status Curl::patch_data(
stats::Stats* const stats,
const std::string& url,
const SerializationType serialization_type,
const BufferList* data,
BufferList* data,
Buffer* const returned_data,
const std::string& res_uri) {
struct curl_slist* headers;
RETURN_NOT_OK(patch_data_common(serialization_type, data, &headers));

CURLcode ret;
headerData.uri = &res_uri;
auto st = make_curl_request(stats, url.c_str(), &ret, returned_data);
auto st = make_curl_request(stats, url.c_str(), &ret, data, returned_data);
curl_slist_free_all(headers);
RETURN_NOT_OK(st);

Expand All @@ -923,7 +944,7 @@ Status Curl::patch_data(

Status Curl::patch_data_common(
const SerializationType serialization_type,
const BufferList* data,
BufferList* data,
struct curl_slist** headers) {
CURL* curl = curl_.get();
if (curl == nullptr)
Expand Down Expand Up @@ -968,15 +989,15 @@ Status Curl::put_data(
stats::Stats* const stats,
const std::string& url,
const SerializationType serialization_type,
const BufferList* data,
BufferList* data,
Buffer* const returned_data,
const std::string& res_uri) {
struct curl_slist* headers;
RETURN_NOT_OK(put_data_common(serialization_type, data, &headers));

CURLcode ret;
headerData.uri = &res_uri;
auto st = make_curl_request(stats, url.c_str(), &ret, returned_data);
auto st = make_curl_request(stats, url.c_str(), &ret, data, returned_data);
curl_slist_free_all(headers);
RETURN_NOT_OK(st);

Expand All @@ -988,7 +1009,7 @@ Status Curl::put_data(

Status Curl::put_data_common(
const SerializationType serialization_type,
const BufferList* data,
BufferList* data,
struct curl_slist** headers) {
CURL* curl = curl_.get();
if (curl == nullptr)
Expand Down
Loading

0 comments on commit 2a30fbe

Please sign in to comment.