-
Notifications
You must be signed in to change notification settings - Fork 312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server: add check_and_mutate() interface and implementation #161
Changes from 23 commits
64685b4
02e8965
6f55951
02e1b92
8081041
7f62ae9
ebf111b
b8e66c7
0b6594b
d62f9ba
2c9bc76
eb30735
b924587
110555e
9507cd7
aee1daa
5fc6ea6
0f83bec
7e2a266
14c7171
2a4e4c3
37c2d1f
793beff
544760f
21b978a
91d6d91
a7e5ae0
9362f1f
90bc568
0fb9684
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -876,6 +876,14 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key, | |
return; | ||
} | ||
|
||
if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) == | ||
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) { | ||
derror("invalid check type: %d", (int)check_type); | ||
if (callback != nullptr) | ||
callback(PERR_INVALID_ARGUMENT, check_and_set_results(), internal_info()); | ||
return; | ||
} | ||
|
||
::dsn::apps::check_and_set_request req; | ||
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size()); | ||
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size()); | ||
|
@@ -938,6 +946,123 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key, | |
partition_hash); | ||
} | ||
|
||
int pegasus_client_impl::check_and_mutate(const std::string &hash_key, | ||
const std::string &check_sort_key, | ||
cas_check_type check_type, | ||
const std::string &check_operand, | ||
const mutations &mutations, | ||
const check_and_mutate_options &options, | ||
check_and_mutate_results &results, | ||
int timeout_milliseconds, | ||
internal_info *info) | ||
{ | ||
::dsn::utils::notify_event op_completed; | ||
int ret = -1; | ||
auto callback = [&](int _err, check_and_mutate_results &&_results, internal_info &&_info) { | ||
ret = _err; | ||
results = std::move(_results); | ||
if (info != nullptr) | ||
(*info) = std::move(_info); | ||
op_completed.notify(); | ||
}; | ||
async_check_and_mutate(hash_key, | ||
check_sort_key, | ||
check_type, | ||
check_operand, | ||
mutations, | ||
options, | ||
std::move(callback), | ||
timeout_milliseconds); | ||
op_completed.wait(); | ||
return ret; | ||
} | ||
|
||
void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key, | ||
const std::string &check_sort_key, | ||
cas_check_type check_type, | ||
const std::string &check_operand, | ||
const mutations &mutations, | ||
const check_and_mutate_options &options, | ||
async_check_and_mutate_callback_t &&callback, | ||
int timeout_milliseconds) | ||
{ | ||
// check params | ||
if (hash_key.size() >= UINT16_MAX) { | ||
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", | ||
(int)hash_key.size()); | ||
if (callback != nullptr) | ||
callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info()); | ||
return; | ||
} | ||
|
||
if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) == | ||
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) { | ||
derror("invalid check type: %d", (int)check_type); | ||
if (callback != nullptr) | ||
callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info()); | ||
return; | ||
} | ||
if (mutations.is_empty()) { | ||
derror("invalid mutations: mutations should not be empty."); | ||
if (callback != nullptr) | ||
callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 不是PERR_INVALID_HASH_KEY There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 应当是PERR_INVALID_ARGUMENT |
||
return; | ||
} | ||
|
||
::dsn::apps::check_and_mutate_request req; | ||
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size()); | ||
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size()); | ||
req.check_type = (dsn::apps::cas_check_type::type)check_type; | ||
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size()); | ||
mutations.get_mutations(req.mutate_list); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. get_mutations名字是不是不大合适? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 我觉得还可以,java中也是这么写的。毕竟也是const函数,不修改原值。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. get_mutations这里考虑到的两点是 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 我觉得现在这样就可以 |
||
req.return_check_value = options.return_check_value; | ||
|
||
::dsn::blob tmp_key; | ||
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob()); | ||
auto partition_hash = pegasus_key_hash(tmp_key); | ||
auto new_callback = [user_callback = std::move(callback)]( | ||
::dsn::error_code err, dsn_message_t req, dsn_message_t resp) | ||
{ | ||
if (user_callback == nullptr) { | ||
return; | ||
} | ||
check_and_mutate_results results; | ||
internal_info info; | ||
::dsn::apps::check_and_mutate_response response; | ||
if (err == ::dsn::ERR_OK) { | ||
::dsn::unmarshall(resp, response); | ||
if (response.error == 0) { | ||
results.mutate_succeed = true; | ||
} else if (response.error == 13) { // kTryAgain | ||
results.mutate_succeed = false; | ||
response.error = 0; | ||
} else { | ||
results.mutate_succeed = false; | ||
} | ||
if (response.check_value_returned) { | ||
results.check_value_returned = true; | ||
if (response.check_value_exist) { | ||
results.check_value_exist = true; | ||
results.check_value.assign(response.check_value.data(), | ||
response.check_value.length()); | ||
} | ||
} | ||
info.app_id = response.app_id; | ||
info.partition_index = response.partition_index; | ||
info.decree = response.decree; | ||
info.server = response.server; | ||
} | ||
int ret = | ||
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err)); | ||
user_callback(ret, std::move(results), std::move(info)); | ||
}; | ||
_client->check_and_mutate(req, | ||
std::move(new_callback), | ||
std::chrono::milliseconds(timeout_milliseconds), | ||
0, | ||
partition_hash); | ||
} | ||
|
||
int pegasus_client_impl::ttl(const std::string &hash_key, | ||
const std::string &sort_key, | ||
int &ttl_seconds, | ||
|
@@ -1171,5 +1296,5 @@ const char *pegasus_client_impl::get_error_string(int error_code) const | |
{ | ||
return (rocskdb_error == 0) ? 0 : ROCSKDB_ERROR_START - rocskdb_error; | ||
} | ||
} | ||
} // namespace | ||
} // namespace client | ||
} // namespace pegasus |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,12 @@ enum cas_check_type | |
CT_VALUE_INT_GREATER // int compare: value > operand | ||
} | ||
|
||
enum mutate_operation | ||
{ | ||
MO_PUT, | ||
MO_DELETE | ||
} | ||
|
||
struct update_request | ||
{ | ||
1:dsn.blob key; | ||
|
@@ -181,6 +187,38 @@ struct check_and_set_response | |
8:string server; | ||
} | ||
|
||
struct mutate | ||
{ | ||
1:mutate_operation operation; | ||
2:dsn.blob sort_key; | ||
3:dsn.blob value; // set null if operation is MO_DELETE | ||
4:i32 set_expire_ts_seconds; // set 0 if operation is MO_DELETE | ||
} | ||
|
||
struct check_and_mutate_request | ||
{ | ||
1:dsn.blob hash_key; | ||
2:dsn.blob check_sort_key; | ||
3:cas_check_type check_type; | ||
4:dsn.blob check_operand; | ||
5:list<mutate> mutate_list; | ||
6:bool return_check_value; | ||
} | ||
|
||
struct check_and_mutate_response | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个可以直接使用 check_and_set_response,不用新增struct了 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 感觉这个留着好。以后更有可能的是用check_and_mutate把check_and_set给实现了,到时候还是得改。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 在 XiaoMi/rdsn#149 这个修复提交后,就可以移除 check_and_set,并发布新版本1.11.0 。这样java client 1.10.0在访问server 1.11.0,check_and_set操作不存在,也能返回handle not found的错误码,用户很容易就知道是什么问题,然后升级到java client 1.11.0就可以了。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 和伟杰讨论下,还是按照现在的方案,同时保留check_and_set和check_and_mutate两个接口,以保持各个版本间的兼容性。不过在pegasus_write_service的实现里面,这两个函数的实现上绝大部分逻辑都是一样的,尽量将重复的代码合并。 |
||
{ | ||
1:i32 error; // return kTryAgain if check not passed. | ||
// return kInvalidArgument if check type is int compare and | ||
// check_operand/check_value is not integer or out of range. | ||
2:bool check_value_returned; | ||
3:bool check_value_exist; // used only if check_value_returned is true | ||
4:dsn.blob check_value; // used only if check_value_returned and check_value_exist is true | ||
5:i32 app_id; | ||
6:i32 partition_index; | ||
7:i64 decree; | ||
8:string server; | ||
} | ||
|
||
struct get_scanner_request | ||
{ | ||
1:dsn.blob start_key; | ||
|
@@ -218,6 +256,7 @@ service rrdb | |
multi_remove_response multi_remove(1:multi_remove_request request); | ||
incr_response incr(1:incr_request request); | ||
check_and_set_response check_and_set(1:check_and_set_request request); | ||
check_and_mutate_response check_and_mutate(1:check_and_mutate_request request); | ||
read_response get(1:dsn.blob key); | ||
multi_get_response multi_get(1:multi_get_request request); | ||
count_response sortkey_count(1:dsn.blob hash_key); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
那这里也要改了,建议给这个改动加个单测
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里为什么要改?