From 2a30fbef1ca67d15d13924cd22fa31c95aebad0d Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Fri, 2 Aug 2024 10:47:29 +0300 Subject: [PATCH] Reset offsets in buffer list for retries (#5220) There's a bug in the retry logic when a 503 is encountered while uploading data. Our curl client fails to reset its BufferList state back to zero in this case and we end up sending a bunch of garbage data to the cloud service. This PR fixes the issue and adds a regression test that failed before and passes with the fix. [sc-49128] --- TYPE: BUG DESC: Reset offsets in buffer list for retries --- test/src/unit-capi-incomplete.cc | 45 +++++++++++++++++++++++++-- tiledb/sm/buffer/buffer_list.cc | 10 ++++++ tiledb/sm/buffer/buffer_list.h | 20 ++++++++++++ tiledb/sm/rest/curl.cc | 53 ++++++++++++++++++++++---------- tiledb/sm/rest/curl.h | 20 +++++++----- 5 files changed, 122 insertions(+), 26 deletions(-) diff --git a/test/src/unit-capi-incomplete.cc b/test/src/unit-capi-incomplete.cc index fe26c1f8c04..d45b02692f3 100644 --- a/test/src/unit-capi-incomplete.cc +++ b/test/src/unit-capi-incomplete.cc @@ -1145,7 +1145,7 @@ void IncompleteFx::check_sparse_unsplittable_complete() { TEST_CASE_METHOD( IncompleteFx, "C API: Test incomplete read queries, dense", - "[capi][incomplete][dense-incomplete][serialization][rest]") { + "[capi][incomplete][dense-incomplete][rest]") { create_dense_array(); write_dense_full(); check_dense_incomplete(); @@ -1159,7 +1159,7 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( IncompleteFx, "C API: Test incomplete read queries, sparse", - "[capi][incomplete][sparse][serialization][rest]") { + "[capi][incomplete][sparse][rest]") { create_sparse_array(); write_sparse_full(); check_sparse_incomplete(); @@ -1171,8 +1171,47 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( IncompleteFx, "C API: Test incomplete read queries, dense, serialized", - "[capi][incomplete][dense][serialization][rest]") { + "[capi][incomplete][dense][rest]") { create_dense_array(); write_dense_full(); 
check_dense_incomplete(); } + +TEST_CASE_METHOD( + IncompleteFx, + "C API: Test incomplete read queries, sparse, retries", + "[capi][incomplete][sparse][retries][sc-49128][rest]") { + // This test is testing CURL logic and only makes sense on REST-CI + if (!vfs_test_setup_.is_rest()) { + return; + } + + // Force retries on successful requests to verify that the retry logic + // correctly resets the offsets of the buffers being sent + tiledb_config_t* cfg = nullptr; + tiledb_error_t* err = nullptr; + int rc = tiledb_config_alloc(&cfg, &err); + REQUIRE(rc == TILEDB_OK); + REQUIRE(err == nullptr); + rc = tiledb_config_set(cfg, "rest.retry_http_codes", "200", &err); + REQUIRE(rc == TILEDB_OK); + REQUIRE(err == nullptr); + rc = tiledb_config_set(cfg, "rest.retry_count", "1", &err); + REQUIRE(rc == TILEDB_OK); + REQUIRE(err == nullptr); + rc = tiledb_config_set(cfg, "rest.retry_initial_delay_ms", "5", &err); + REQUIRE(rc == TILEDB_OK); + REQUIRE(err == nullptr); + + // Update the context with config + vfs_test_setup_.update_config(cfg); + ctx_ = vfs_test_setup_.ctx_c; + tiledb_config_free(&cfg); + + create_sparse_array(); + write_sparse_full(); + check_sparse_incomplete(); + check_sparse_until_complete(); + check_sparse_unsplittable_overflow(); + check_sparse_unsplittable_complete(); +} diff --git a/tiledb/sm/buffer/buffer_list.cc b/tiledb/sm/buffer/buffer_list.cc index 19206acecb0..769b2c53bdf 100644 --- a/tiledb/sm/buffer/buffer_list.cc +++ b/tiledb/sm/buffer/buffer_list.cc @@ -145,6 +145,16 @@ void BufferList::reset_offset() { current_relative_offset_ = 0; } +void BufferList::set_offset( + const size_t current_buffer_index, const uint64_t current_relative_offset) { + current_buffer_index_ = current_buffer_index; + current_relative_offset_ = current_relative_offset; +} + +std::tuple<size_t, uint64_t> BufferList::get_offset() const { + return {current_buffer_index_, current_relative_offset_}; +} + uint64_t BufferList::total_size() const { uint64_t size = 0; for (const auto& b : buffers_) diff --git 
a/tiledb/sm/buffer/buffer_list.h b/tiledb/sm/buffer/buffer_list.h index b15a4f435a1..471205343f2 100644 --- a/tiledb/sm/buffer/buffer_list.h +++ b/tiledb/sm/buffer/buffer_list.h @@ -129,6 +129,26 @@ class BufferList { /** Returns the sum of sizes of all buffers in the list. */ uint64_t total_size() const; + /** + * Sets the current offsets for reading. + * + * @param current_buffer_index The index of the current buffer in the list. + * @param current_relative_offset The current relative offset within the + * current buffer. + * + * */ + void set_offset( + const size_t current_buffer_index, + const uint64_t current_relative_offset); + + /** + * Returns the current offsets. + * + * @return A tuple holding the index of the current buffer in the list and + * the current relative offset within that buffer, in this order. + */ + std::tuple<size_t, uint64_t> get_offset() const; + private: /** The underlying list of Buffers. */ std::vector<Buffer> buffers_; diff --git a/tiledb/sm/rest/curl.cc b/tiledb/sm/rest/curl.cc index 12eee7e447c..f30ce80051f 100644 --- a/tiledb/sm/rest/curl.cc +++ b/tiledb/sm/rest/curl.cc @@ -425,11 +425,13 @@ Status Curl::make_curl_request( stats::Stats* const stats, const char* url, CURLcode* curl_code, + BufferList* data, Buffer* returned_data) const { return make_curl_request_common( stats, url, curl_code, + data, &write_memory_callback, static_cast<void*>(returned_data)); } @@ -438,11 +440,13 @@ Status Curl::make_curl_request( stats::Stats* const stats, const char* url, CURLcode* curl_code, + BufferList* data, PostResponseCb&& cb) const { return make_curl_request_common( stats, url, curl_code, + data, &write_memory_callback_cb, static_cast<void*>(&cb)); } @@ -478,6 +482,7 @@ Status Curl::make_curl_request_common( stats::Stats* const stats, const char* const url, CURLcode* const curl_code, + BufferList* data, size_t (*write_cb)(void*, size_t, size_t, void*), void* const write_cb_arg) const { CURL* curl = curl_.get(); @@ -487,11 +492,27 @@ Status Curl::make_curl_request_common( *curl_code = CURLE_OK; 
uint64_t retry_delay = retry_initial_delay_ms_; + + // Save the offsets before the request in case we need to retry + size_t current_buffer_index = 0; + uint64_t current_relative_offset = 0; + if (data != nullptr) { + std::tie(current_buffer_index, current_relative_offset) = + data->get_offset(); + } + // <= because the 0ths retry is actually the initial request for (uint8_t i = 0; i <= retry_count_; i++) { WriteCbState write_cb_state; write_cb_state.arg = write_cb_arg; + // If this is a retry we need to reset the offsets in the data buffer list + // to the initial position before the failed request so that we send the + // correct data. + if (data != nullptr && i > 0) { + data->set_offset(current_buffer_index, current_relative_offset); + } + /* set url to fetch */ curl_easy_setopt(curl, CURLOPT_URL, url); @@ -548,7 +569,7 @@ Status Curl::make_curl_request_common( /* fetch the url */ CURLcode tmp_curl_code = curl_easy_perform_instrumented(url, i); - bool retry; + bool retry = false; RETURN_NOT_OK(should_retry_based_on_http_status(&retry)); /* If Curl call was successful (not http status, but no socket error, etc) @@ -704,7 +725,7 @@ Status Curl::post_data( stats::Stats* const stats, const std::string& url, const SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* const returned_data, const std::string& res_uri) { struct curl_slist* headers; @@ -714,7 +735,7 @@ Status Curl::post_data( CURLcode ret; headerData.uri = &res_uri; - auto st = make_curl_request(stats, url.c_str(), &ret, returned_data); + auto st = make_curl_request(stats, url.c_str(), &ret, data, returned_data); curl_slist_free_all(headers); RETURN_NOT_OK(st); @@ -728,7 +749,7 @@ Status Curl::post_data( stats::Stats* const stats, const std::string& url, const SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* const returned_data, PostResponseCb&& cb, const std::string& res_uri) { @@ -737,7 +758,7 @@ Status 
Curl::post_data( CURLcode ret; headerData.uri = &res_uri; - auto st = make_curl_request(stats, url.c_str(), &ret, std::move(cb)); + auto st = make_curl_request(stats, url.c_str(), &ret, data, std::move(cb)); curl_slist_free_all(headers); RETURN_NOT_OK(st); @@ -749,7 +770,7 @@ Status Curl::post_data( Status Curl::post_data_common( const SerializationType serialization_type, - const BufferList* data, + BufferList* data, struct curl_slist** headers) { CURL* curl = curl_.get(); if (curl == nullptr) @@ -771,7 +792,7 @@ Status Curl::post_data_common( set_content_type(serialization_type, headers), curl_slist_free_all(*headers)); - /* HTTP PUT please */ + /* HTTP POST please */ curl_easy_setopt(curl, CURLOPT_POST, 1L); curl_easy_setopt( curl, CURLOPT_READFUNCTION, buffer_list_read_memory_callback); @@ -810,7 +831,7 @@ Status Curl::get_data( CURLcode ret; headerData.uri = &res_ns_uri; - auto st = make_curl_request(stats, url.c_str(), &ret, returned_data); + auto st = make_curl_request(stats, url.c_str(), &ret, nullptr, returned_data); curl_slist_free_all(headers); RETURN_NOT_OK(st); @@ -849,7 +870,7 @@ Status Curl::options( CURLcode ret; headerData.uri = &res_ns_uri; - auto st = make_curl_request(stats, url.c_str(), &ret, returned_data); + auto st = make_curl_request(stats, url.c_str(), &ret, nullptr, returned_data); curl_slist_free_all(headers); RETURN_NOT_OK(st); @@ -885,7 +906,7 @@ Status Curl::delete_data( CURLcode ret; headerData.uri = &res_uri; - auto st = make_curl_request(stats, url.c_str(), &ret, returned_data); + auto st = make_curl_request(stats, url.c_str(), &ret, nullptr, returned_data); // Erase record in case of de-registered array std::unique_lock rd_lck(*(headerData.redirect_uri_map_lock)); @@ -903,7 +924,7 @@ Status Curl::patch_data( stats::Stats* const stats, const std::string& url, const SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* const returned_data, const std::string& res_uri) { struct curl_slist* 
headers; @@ -911,7 +932,7 @@ Status Curl::patch_data( CURLcode ret; headerData.uri = &res_uri; - auto st = make_curl_request(stats, url.c_str(), &ret, returned_data); + auto st = make_curl_request(stats, url.c_str(), &ret, data, returned_data); curl_slist_free_all(headers); RETURN_NOT_OK(st); @@ -923,7 +944,7 @@ Status Curl::patch_data( Status Curl::patch_data_common( const SerializationType serialization_type, - const BufferList* data, + BufferList* data, struct curl_slist** headers) { CURL* curl = curl_.get(); if (curl == nullptr) @@ -968,7 +989,7 @@ Status Curl::put_data( stats::Stats* const stats, const std::string& url, const SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* const returned_data, const std::string& res_uri) { struct curl_slist* headers; @@ -976,7 +997,7 @@ Status Curl::put_data( CURLcode ret; headerData.uri = &res_uri; - auto st = make_curl_request(stats, url.c_str(), &ret, returned_data); + auto st = make_curl_request(stats, url.c_str(), &ret, data, returned_data); curl_slist_free_all(headers); RETURN_NOT_OK(st); @@ -988,7 +1009,7 @@ Status Curl::put_data( Status Curl::put_data_common( const SerializationType serialization_type, - const BufferList* data, + BufferList* data, struct curl_slist** headers) { CURL* curl = curl_.get(); if (curl == nullptr) diff --git a/tiledb/sm/rest/curl.h b/tiledb/sm/rest/curl.h index 3270f3ce5b6..9defebb2642 100644 --- a/tiledb/sm/rest/curl.h +++ b/tiledb/sm/rest/curl.h @@ -154,7 +154,7 @@ class Curl { stats::Stats* stats, const std::string& url, SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* returned_data, const std::string& res_ns_uri); @@ -174,7 +174,7 @@ class Curl { stats::Stats* stats, const std::string& url, SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* returned_data, const std::string& res_ns_uri); @@ -194,7 +194,7 @@ class Curl { stats::Stats* stats, const std::string& url, 
SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* returned_data, const std::string& res_ns_uri); @@ -247,7 +247,7 @@ class Curl { stats::Stats* stats, const std::string& url, SerializationType serialization_type, - const BufferList* data, + BufferList* data, Buffer* returned_data, PostResponseCb&& write_cb, const std::string& res_ns_uri); @@ -264,7 +264,7 @@ class Curl { */ Status patch_data_common( SerializationType serialization_type, - const BufferList* data, + BufferList* data, struct curl_slist** headers); /** @@ -279,7 +279,7 @@ class Curl { */ Status put_data_common( SerializationType serialization_type, - const BufferList* data, + BufferList* data, struct curl_slist** headers); /** @@ -294,7 +294,7 @@ class Curl { */ Status post_data_common( SerializationType serialization_type, - const BufferList* data, + BufferList* data, struct curl_slist** headers); /** @@ -412,6 +412,7 @@ class Curl { * @param stats The stats instance to record into * @param url URL to fetch * @param curl_code Set to the return value of the curl call + * @param data Encoded data buffer for sending, if applicable to the request * @param returned_data Buffer that will store the response data * @return Status */ @@ -419,6 +420,7 @@ class Curl { stats::Stats* const stats, const char* url, CURLcode* curl_code, + BufferList* data, Buffer* returned_data) const; /** @@ -428,6 +430,7 @@ class Curl { * @param stats The stats instance to record into * @param url URL to fetch * @param curl_code Set to the return value of the curl call + * @param data Encoded data buffer for sending, if applicable to the request * @param write_cb Callback to invoke as response data is received * @return Status */ @@ -435,6 +438,7 @@ class Curl { stats::Stats* stats, const char* url, CURLcode* curl_code, + BufferList* data, PostResponseCb&& write_cb) const; /** @@ -443,6 +447,7 @@ class Curl { * @param stats The stats instance to record into * @param url URL to fetch * 
@param curl_code Set to the return value of the curl call + * @param data Encoded data buffer for sending, if applicable to the request * @param write_cb Callback to invoke as response data is received. * @param write_arg Opaque memory address passed to 'write_cb'. * @return Status @@ -451,6 +456,7 @@ class Curl { stats::Stats* stats, const char* url, CURLcode* curl_code, + BufferList* data, size_t (*write_cb)(void*, size_t, size_t, void*), void* write_arg) const;