Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve performance of count_data #1091

Merged
merged 13 commits into from
Aug 30, 2022
2 changes: 2 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ struct get_scanner_request
11:optional bool validate_partition_hash;
12:optional bool return_expire_ts;
13:optional bool full_scan; // true means the client wants to build a 'full scan' context with the server side; false otherwise
14:optional bool only_return_count = false;
}

struct scan_request
Expand All @@ -294,6 +295,7 @@ struct scan_response
4:i32 app_id;
5:i32 partition_index;
6:string server;
7:optional i32 kv_count = -1;
}

struct duplicate_request
Expand Down
17 changes: 14 additions & 3 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,21 @@ class pegasus_client_impl : public pegasus_client

static void init_error();

// Tracks how the scanner should treat the current scan response: as a batch of
// key/value data, or as a bare count of matching entries.
enum class async_scan_type : char
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
{
// Initial state set by the scanner constructor: responses are consumed as
// key/value data.
normal,
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
// Set when a scan response carries a kv_count other than -1; while in this
// state the batch-fetching loop in _async_next_internal() is bypassed.
count_only,
// Set after the callback has been invoked with the count when
// only_return_count is enabled in the scan options.
count_only_finished
};

class pegasus_scanner_impl : public pegasus_scanner
{
public:
int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info = nullptr) override;
internal_info *info = nullptr,
int32_t *count = nullptr) override;
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved

void async_next(async_scan_next_callback_t &&) override;

Expand Down Expand Up @@ -287,13 +295,15 @@ class pegasus_client_impl : public pegasus_client
std::vector<::dsn::apps::key_value> _kvs;
internal_info _info;
int32_t _p;
int32_t _kv_count;

int64_t _context;
mutable ::dsn::zlock _lock;
std::list<async_scan_next_callback_t> _queue;
volatile bool _rpc_started;
bool _validate_partition_hash;
bool _full_scan;
async_scan_type _type;

void _async_next_internal();
void _start_scan();
Expand Down Expand Up @@ -323,9 +333,10 @@ class pegasus_client_impl : public pegasus_client
int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info) override
internal_info *info,
int32_t *count = nullptr) override
{
return _p->next(hashkey, sortkey, value, info);
return _p->next(hashkey, sortkey, value, info, count);
}
};

Expand Down
60 changes: 45 additions & 15 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,20 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_options(options),
_splits_hash(std::move(hash)),
_p(-1),
_kv_count(-1),
_context(SCAN_CONTEXT_ID_COMPLETED),
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)
_full_scan(full_scan),
_type(async_scan_type::normal)
{
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
std::string &sortkey,
std::string &value,
internal_info *info)
internal_info *info,
int32_t *count)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
Expand All @@ -70,14 +73,18 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
value = std::move(val);
if (info) {
(*info) = std::move(ii);
}
if (count) {
(*count) = kv_count;
}
op_completed.notify();
};
async_next(std::move(callback));
Expand Down Expand Up @@ -120,7 +127,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()

std::list<async_scan_next_callback_t> temp;
while (true) {
while (++_p >= _kvs.size()) {
// in count_only state, skip the fetch loop so that the received kv count is reported once
while (++_p >= _kvs.size() && _type != async_scan_type::count_only) {
if (_context == SCAN_CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_splits_hash.empty()) {
Expand All @@ -139,7 +147,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string(),
std::string(),
std::move(info),
0);
0,
-1);
}
}
return;
Expand All @@ -162,13 +171,16 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
}

// valid data got
std::string hash_key, sort_key;
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());
uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds
? static_cast<uint32_t>(_kvs[_p].expire_ts_seconds)
: 0;
std::string hash_key, sort_key, value;
uint32_t expire_ts_seconds = 0;

if (!_options.only_return_count) {
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
if (_kvs[_p].__isset.expire_ts_seconds) {
expire_ts_seconds = static_cast<uint32_t>(_kvs[_p].expire_ts_seconds);
}
}
auto &callback = _queue.front();
if (callback) {
internal_info info(_info);
Expand All @@ -178,7 +190,11 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
_kv_count);
if (_options.only_return_count) {
_type = async_scan_type::count_only_finished;
}
_lock.lock();
if (_queue.size() == 1) {
// keep the last callback until exit this function
Expand Down Expand Up @@ -230,6 +246,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.__set_validate_partition_hash(_validate_partition_hash);
req.__set_return_expire_ts(_options.return_expire_ts);
req.__set_full_scan(_full_scan);
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down Expand Up @@ -261,6 +278,17 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
// 1. kv_count being set on the response means the server is a newer version (one that
// implements count-only scans)
// 1> kv_count == -1 indicates the response still carries key and value data
// 2> kv_count > -1 indicates the response carries only the kv count, not keys and values
// 2. kv_count not being set means the server is an older version
if (response.__isset.kv_count) {
if (response.kv_count != -1) {
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
_type = async_scan_type::count_only;
_kv_count = response.kv_count;
}
}
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down Expand Up @@ -288,7 +316,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c

for (auto &callback : temp) {
if (callback) {
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0);
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1);
}
}
}
Expand Down Expand Up @@ -323,13 +351,15 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
std::string &&sort_key,
std::string &&value,
internal_info &&info,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
user_callback(error_code,
std::move(hash_key),
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
kv_count);
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
std::string &&geo_sort_key,
std::string &&value,
pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds) mutable {
uint32_t expire_ts_seconds,
int32_t kv_count) mutable {
if (ret == PERR_SCAN_COMPLETE) {
cb();
return;
Expand Down
13 changes: 9 additions & 4 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class pegasus_client
std::string sort_key_filter_pattern;
bool no_value; // only fetch hash_key and sort_key, but not fetch value
bool return_expire_ts;
bool only_return_count;
scan_options()
: timeout_ms(5000),
batch_size(100),
Expand All @@ -260,7 +261,8 @@ class pegasus_client
hash_key_filter_type(FT_NO_FILTER),
sort_key_filter_type(FT_NO_FILTER),
no_value(false),
return_expire_ts(false)
return_expire_ts(false),
only_return_count(false)
{
}
scan_options(const scan_options &o)
Expand All @@ -273,7 +275,8 @@ class pegasus_client
sort_key_filter_type(o.sort_key_filter_type),
sort_key_filter_pattern(o.sort_key_filter_pattern),
no_value(o.no_value),
return_expire_ts(o.return_expire_ts)
return_expire_ts(o.return_expire_ts),
only_return_count(o.only_return_count)
{
}
};
Expand Down Expand Up @@ -312,7 +315,8 @@ class pegasus_client
std::string && /*sort_key*/,
std::string && /*value*/,
internal_info && /*info*/,
uint32_t /*expire_ts_seconds*/)>
uint32_t /*expire_ts_seconds*/,
int32_t /*kv_count*/)>
async_scan_next_callback_t;
typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
async_get_scanner_callback_t;
Expand Down Expand Up @@ -341,7 +345,8 @@ class pegasus_client
virtual int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info = nullptr) = 0;
internal_info *info = nullptr,
int32_t *count = nullptr) = 0;

///
/// \brief async get the next key-value pair of this scanner
Expand Down
7 changes: 5 additions & 2 deletions src/server/pegasus_scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ struct pegasus_scan_context
int32_t batch_size_,
bool no_value_,
bool validate_partition_hash_,
bool return_expire_ts_)
bool return_expire_ts_,
bool only_return_count_)
: _stop_holder(std::move(stop_)),
_hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
_sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
Expand All @@ -59,7 +60,8 @@ struct pegasus_scan_context
batch_size(batch_size_),
no_value(no_value_),
validate_partition_hash(validate_partition_hash_),
return_expire_ts(return_expire_ts_)
return_expire_ts(return_expire_ts_),
only_return_count(only_return_count_)
{
}

Expand All @@ -80,6 +82,7 @@ struct pegasus_scan_context
bool no_value;
bool validate_partition_hash;
bool return_expire_ts;
bool only_return_count;
};

class pegasus_context_cache
Expand Down
Loading