Skip to content

Commit

Permalink
[SKV-618] fix(ddl_client): sleep for a while before retry once meta s…
Browse files Browse the repository at this point in the history
…erver is busy

https://jira.sensorsdata.cn/browse/SKV-618

社区issue:apache#1449

社区pr:apache#1453

Under some circumstances, creating/dropping table might spend more time,
which could even be greater than RPC timeout. Then, once DDL client launched
another requests to meta server, it would receive `ERR_BUSY_CREATING` or
`ERR_BUSY_DROPPING`. Between these two attempts there is no interval, thus
these attempts are ineffective.

Therefore, a retry mechanism is introduced to resolve this problem:

- the max number of attempts is configurable; 
- the retry mechanism for failed sending request to meta server due to
`ERR_NETWORK_FAILURE` or `ERR_TIMEOUT` is unchanged, and the max
number of attempts is restrict by the configuration;
- once response is `ERR_BUSY_CREATING` or `ERR_BUSY_DROPPING`, DDL
client would also retry, and between two attempts there is an configurable
interval.
  • Loading branch information
王聃 committed May 23, 2023
1 parent 7e99c01 commit ede7854
Show file tree
Hide file tree
Showing 12 changed files with 566 additions and 115 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/lint_and_test_cpp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ jobs:
- base_test
- dsn_aio_test
- dsn_block_service_test
- dsn_client_test
- dsn.failure_detector.tests
- dsn_http_test
- dsn_meta_state_tests
Expand Down Expand Up @@ -216,6 +217,7 @@ jobs:
- base_test
- dsn_aio_test
- dsn_block_service_test
- dsn_client_test
- dsn.failure_detector.tests
- dsn_http_test
- dsn_meta_state_tests
Expand Down Expand Up @@ -316,6 +318,7 @@ jobs:
- base_test
- dsn_aio_test
- dsn_block_service_test
- dsn_client_test
- dsn.failure_detector.tests
- dsn_http_test
- dsn_meta_state_tests
Expand Down
2 changes: 1 addition & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ function run_test()

BUILD_DIR=$ROOT/src/builder
if [ "$test_modules" == "" ]; then
test_modules="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test,dsn_replica_split_test,pegasus_unit_test,pegasus_function_test"
test_modules="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test,dsn_replica_split_test,dsn_client_test,pegasus_unit_test,pegasus_function_test"
fi
echo "test_modules=$test_modules"

Expand Down
110 changes: 102 additions & 8 deletions src/rdsn/include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,25 @@
#include <dsn/dist/replication.h>

#include <cctype>
#include <chrono>
#include <deque>
#include <gtest/gtest_prod.h>
#include <string>
#include <map>
#include <thread>
#include <vector>

#include <dsn/tool-api/task_tracker.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/errors.h>
#include <dsn/utility/fail_point.h>
#include <dsn/utility/flags.h>
#include <dsn/utility/ports.h>
#include <dsn/utility/string_view.h>

DSN_DECLARE_uint32(ddl_client_max_attempt_count);
DSN_DECLARE_uint32(ddl_client_retry_interval_ms);

namespace dsn {
namespace replication {
Expand Down Expand Up @@ -240,32 +252,99 @@ class replication_ddl_client
bool static valid_app_char(int c);

void end_meta_request(const rpc_response_task_ptr &callback,
int retry_times,
error_code err,
uint32_t attempt_count,
const error_code &err,
dsn::message_ex *request,
dsn::message_ex *resp);

template <typename TRequest>
rpc_response_task_ptr request_meta(dsn::task_code code,
rpc_response_task_ptr request_meta(const dsn::task_code &code,
std::shared_ptr<TRequest> &req,
int timeout_milliseconds = 0,
int reply_thread_hash = 0)
{
dsn::message_ex *msg = dsn::message_ex::create_request(code, timeout_milliseconds);
::dsn::marshall(msg, *req);
auto msg = dsn::message_ex::create_request(code, timeout_milliseconds);
dsn::marshall(msg, *req);

rpc_response_task_ptr task = ::dsn::rpc::create_rpc_response_task(
msg, nullptr, empty_rpc_handler, reply_thread_hash);
auto task =
dsn::rpc::create_rpc_response_task(msg, nullptr, empty_rpc_handler, reply_thread_hash);
rpc::call(_meta_server,
msg,
&_tracker,
[this, task](
error_code err, dsn::message_ex *request, dsn::message_ex *response) mutable {
end_meta_request(std::move(task), 0, err, request, response);

FAIL_POINT_INJECT_NOT_RETURN_F(
"ddl_client_request_meta",
[&err, this](dsn::string_view str) { err = pop_mock_error(); });

end_meta_request(std::move(task), 1, err, request, response);
});
return task;
}

static inline bool is_busy(const dsn::error_code &err)
{
return err == dsn::ERR_BUSY_CREATING || err == dsn::ERR_BUSY_DROPPING;
}

template <typename TRequest, typename TResponse>
rpc_response_task_ptr request_meta_and_wait_response(const dsn::task_code &code,
std::shared_ptr<TRequest> &req,
TResponse &resp,
int timeout_milliseconds = 0,
int reply_thread_hash = 0)
{
rpc_response_task_ptr resp_task;
for (uint32_t i = 1; i <= FLAGS_ddl_client_max_attempt_count; ++i) {
resp_task = request_meta(code, req, timeout_milliseconds, reply_thread_hash);
resp_task->wait();

// Failed to send request to meta server. The possible reason might be:
// * cannot connect to meta server (such as ERR_NETWORK_FAILURE);
// * do not receive any response from meta server (such as ERR_TIMEOUT)
if (resp_task->error() != dsn::ERR_OK) {
return resp_task;
}

// Once response is nullptr, it must be mocked by unit tests since network is
// not connected.
if (dsn_likely(resp_task->get_response() != nullptr)) {
// Received the response from meta server successfully, thus deserialize the
// response.
dsn::unmarshall(resp_task->get_response(), resp);
}

FAIL_POINT_INJECT_NOT_RETURN_F(
"ddl_client_request_meta",
[&resp, this](dsn::string_view str) { resp.err = pop_mock_error(); });

ddebug_f("received response from meta server: rpc_code={}, err={}, attempt_count={}, "
"max_attempt_count={}",
code,
resp.err,
i,
FLAGS_ddl_client_max_attempt_count);

// Once `err` field in the received response is ERR_OK or some non-busy error, do not
// attempt again.
if (resp.err == dsn::ERR_OK || !is_busy(resp.err)) {
return resp_task;
}

// Would not sleep for the last attempt.
if (i < FLAGS_ddl_client_max_attempt_count) {
dwarn_f("sleep {} milliseconds before launch another attempt for {}: err={}",
FLAGS_ddl_client_retry_interval_ms,
code,
resp.err);
std::this_thread::sleep_for(
std::chrono::milliseconds(FLAGS_ddl_client_retry_interval_ms));
}
}
return resp_task;
}

/// Send request to meta server synchronously.
template <typename TRpcHolder, typename TResponse = typename TRpcHolder::response_type>
error_with<TResponse> call_rpc_sync(TRpcHolder rpc, int reply_thread_hash = 0)
Expand Down Expand Up @@ -327,6 +406,21 @@ class replication_ddl_client
dsn::rpc_address _meta_server;
dsn::task_tracker _tracker;

// Used only for unit tests.
FRIEND_TEST(DDLClientTest, RetryMetaRequest);
void set_mock_errors(const std::vector<dsn::error_code> &mock_errors)
{
_mock_errors.assign(mock_errors.begin(), mock_errors.end());
}
dsn::error_code pop_mock_error()
{
dassert(!_mock_errors.empty(), "_mock_errors should be non-empty");
auto err = _mock_errors.front();
_mock_errors.pop_front();
return err;
}
std::deque<dsn::error_code> _mock_errors;

typedef rpc_holder<detect_hotkey_request, detect_hotkey_response> detect_hotkey_rpc;
typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
typedef rpc_holder<add_new_disk_request, add_new_disk_response> add_new_disk_rpc;
Expand Down
3 changes: 2 additions & 1 deletion src/rdsn/include/dsn/tool-api/task_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,5 @@ DEFINE_TASK_CODE(TASK_CODE_INVALID, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
// define a task_code "task_code_inlined", it's mainly used in situations when you want execute
// a task with "inline" mode.
DEFINE_TASK_CODE(TASK_CODE_EXEC_INLINED, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
}

} // namespace dsn
25 changes: 17 additions & 8 deletions src/rdsn/include/dsn/utility/fail_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,37 @@
/// A fail point implementation in C++.
/// This lib is ported from https://github.com/pingcap/fail-rs.

#include <dsn/utility/ports.h>
#include <dsn/utility/string_view.h>

/// The only entry to define a fail point with `return` function: lambda function must be
/// return non-void type. When a fail point is defined, it's referenced via the name.
#define FAIL_POINT_INJECT_F(name, lambda) \
// The only entry to define a fail point with `return` function: lambda function must be
// return non-void type. When a fail point is defined, it's referenced via the name.
//
// Lambda function is declare as variadic argument, for the reason that comma(,) might exist in
// lambda expressions (for example, capture or parameter list). If it was declared as a single
// argument, preprocess for this macro would fail for mismatched arguments.
#define FAIL_POINT_INJECT_F(name, ...) \
do { \
if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) \
break; \
auto __Func = lambda; \
auto __Func = __VA_ARGS__; \
auto __Res = ::dsn::fail::eval(name); \
if (__Res != nullptr) { \
return __Func(*__Res); \
} \
} while (0)

/// The only entry to define a fail point with `not return` function: lambda function usually
/// return void type. When a fail point is defined, it's referenced via the name.
#define FAIL_POINT_INJECT_NOT_RETURN_F(name, lambda) \
// The only entry to define a fail point with `not return` function: lambda function usually
// return void type. When a fail point is defined, it's referenced via the name.
//
// Lambda function is declare as variadic argument, for the reason that comma(,) might exist in
// lambda expressions (for example, capture or parameter list). If it was declared as a single
// argument, preprocess for this macro would fail for mismatched arguments.
#define FAIL_POINT_INJECT_NOT_RETURN_F(name, ...) \
do { \
if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) \
break; \
auto __Func = lambda; \
auto __Func = __VA_ARGS__; \
auto __Res = ::dsn::fail::eval(name); \
if (__Res != nullptr) { \
__Func(*__Res); \
Expand Down
2 changes: 2 additions & 0 deletions src/rdsn/src/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ set(MY_PROJ_LIBS "")
set(MY_BINPLACES "")

dsn_add_static_library()

add_subdirectory(test)
Loading

0 comments on commit ede7854

Please sign in to comment.