Skip to content

Commit

Permalink
feat: improve performance of count_data (#1091)
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless authored Aug 30, 2022
1 parent 689fd6f commit 7e26c9f
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 67 deletions.
2 changes: 2 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ struct get_scanner_request
11:optional bool validate_partition_hash;
12:optional bool return_expire_ts;
13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise
14:optional bool only_return_count = false;
}

struct scan_request
Expand All @@ -294,6 +295,7 @@ struct scan_response
4:i32 app_id;
5:i32 partition_index;
6:string server;
7:optional i32 kv_count;
}

struct duplicate_request
Expand Down
16 changes: 16 additions & 0 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ class pegasus_client_impl : public pegasus_client
std::string &value,
internal_info *info = nullptr) override;

int next(int32_t &count, internal_info *info = nullptr) override;

void async_next(async_scan_next_callback_t &&) override;

bool safe_destructible() const override;
Expand All @@ -277,6 +279,13 @@ class pegasus_client_impl : public pegasus_client
bool full_scan);

private:
// Mode of the scanner with respect to "count only" scans; stored in `_type`.
enum class async_scan_type : char
{
// Initial mode set by the constructor.
NORMAL,
// Set in _on_scan_response() when the scan response carries `kv_count`;
// while in this mode _async_next_internal() does not enter its fetch loop.
COUNT_ONLY,
// Set in _async_next_internal() after a callback has been invoked and
// `_options.only_return_count` is true.
COUNT_ONLY_FINISHED
};

::dsn::apps::rrdb_client *_client;
::dsn::blob _start_key;
::dsn::blob _stop_key;
Expand All @@ -287,13 +296,15 @@ class pegasus_client_impl : public pegasus_client
std::vector<::dsn::apps::key_value> _kvs;
internal_info _info;
int32_t _p;
int32_t _kv_count;

int64_t _context;
mutable ::dsn::zlock _lock;
std::list<async_scan_next_callback_t> _queue;
volatile bool _rpc_started;
bool _validate_partition_hash;
bool _full_scan;
async_scan_type _type;

void _async_next_internal();
void _start_scan();
Expand All @@ -320,6 +331,11 @@ class pegasus_client_impl : public pegasus_client

void async_next(async_scan_next_callback_t &&callback) override;

// Count-only overload of next(): forwards `count` and `info` unchanged to
// the next(count, info) of the object held in `_p` and returns its result.
int next(int32_t &count, internal_info *info = nullptr) override
{
return _p->next(count, info);
}

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
Expand Down
73 changes: 59 additions & 14 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,38 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_options(options),
_splits_hash(std::move(hash)),
_p(-1),
_kv_count(-1),
_context(SCAN_CONTEXT_ID_COMPLETED),
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)
_full_scan(full_scan),
_type(async_scan_type::NORMAL)
{
}

int pegasus_client_impl::pegasus_scanner_impl::next(int32_t &count, internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err,
std::string &&hash,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
if (info != nullptr) {
*info = std::move(ii);
}
count = kv_count;
op_completed.notify();
};
async_next(std::move(callback));
op_completed.wait();
return ret;
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &sortkey,
std::string &value,
Expand All @@ -70,7 +95,8 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
Expand Down Expand Up @@ -120,7 +146,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()

std::list<async_scan_next_callback_t> temp;
while (true) {
while (++_p >= _kvs.size()) {
// count_only means should calculate kv counts once
while (++_p >= _kvs.size() && _type != async_scan_type::COUNT_ONLY) {
if (_context == SCAN_CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_splits_hash.empty()) {
Expand All @@ -139,7 +166,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string(),
std::string(),
std::move(info),
0);
0,
-1);
}
}
return;
Expand All @@ -162,13 +190,16 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
}

// valid data got
std::string hash_key, sort_key;
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());
uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds
? static_cast<uint32_t>(_kvs[_p].expire_ts_seconds)
: 0;
std::string hash_key, sort_key, value;
uint32_t expire_ts_seconds = 0;

if (!_options.only_return_count) {
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
if (_kvs[_p].__isset.expire_ts_seconds) {
expire_ts_seconds = static_cast<uint32_t>(_kvs[_p].expire_ts_seconds);
}
}
auto &callback = _queue.front();
if (callback) {
internal_info info(_info);
Expand All @@ -178,7 +209,11 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
_kv_count);
if (_options.only_return_count) {
_type = async_scan_type::COUNT_ONLY_FINISHED;
}
_lock.lock();
if (_queue.size() == 1) {
// keep the last callback until exit this function
Expand Down Expand Up @@ -230,6 +265,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.__set_validate_partition_hash(_validate_partition_hash);
req.__set_return_expire_ts(_options.return_expire_ts);
req.__set_full_scan(_full_scan);
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down Expand Up @@ -261,6 +297,13 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
// If `kv_count` exists in response, then:
// 1) server side supports only counting size, and
// 2) `kvs` in response must be empty
if (response.__isset.kv_count) {
_type = async_scan_type::COUNT_ONLY;
_kv_count = response.kv_count;
}
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down Expand Up @@ -288,7 +331,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c

for (auto &callback : temp) {
if (callback) {
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0);
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1);
}
}
}
Expand Down Expand Up @@ -323,13 +366,15 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
std::string &&sort_key,
std::string &&value,
internal_info &&info,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
user_callback(error_code,
std::move(hash_key),
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
kv_count);
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
std::string &&geo_sort_key,
std::string &&value,
pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds) mutable {
uint32_t expire_ts_seconds,
int32_t kv_count) mutable {
if (ret == PERR_SCAN_COMPLETE) {
cb();
return;
Expand Down
25 changes: 22 additions & 3 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class pegasus_client
std::string sort_key_filter_pattern;
bool no_value; // only fetch hash_key and sort_key, but not fetch value
bool return_expire_ts;
bool only_return_count;
scan_options()
: timeout_ms(5000),
batch_size(100),
Expand All @@ -260,7 +261,8 @@ class pegasus_client
hash_key_filter_type(FT_NO_FILTER),
sort_key_filter_type(FT_NO_FILTER),
no_value(false),
return_expire_ts(false)
return_expire_ts(false),
only_return_count(false)
{
}
scan_options(const scan_options &o)
Expand All @@ -273,7 +275,8 @@ class pegasus_client
sort_key_filter_type(o.sort_key_filter_type),
sort_key_filter_pattern(o.sort_key_filter_pattern),
no_value(o.no_value),
return_expire_ts(o.return_expire_ts)
return_expire_ts(o.return_expire_ts),
only_return_count(o.only_return_count)
{
}
};
Expand Down Expand Up @@ -312,7 +315,8 @@ class pegasus_client
std::string && /*sort_key*/,
std::string && /*value*/,
internal_info && /*info*/,
uint32_t /*expire_ts_seconds*/)>
uint32_t /*expire_ts_seconds*/,
int32_t /*kv_count*/)>
async_scan_next_callback_t;
typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
async_get_scanner_callback_t;
Expand Down Expand Up @@ -343,6 +347,21 @@ class pegasus_client
std::string &value,
internal_info *info = nullptr) = 0;

///
/// \brief get the next k-v pair count of this scanner
/// only used for scanners whose option only_return_count is true
/// thread-safe
/// \param count
/// data count value
/// \param info
/// optional; if not null, it is filled with the internal info of this call
/// \return
/// int, the error indicates whether or not the operation succeeded.
/// this error can be converted to a string using get_error_string()
/// PERR_OK means a valid k-v pair count was obtained
/// PERR_SCAN_COMPLETE means all k-v pair counts have been returned before this call
/// otherwise some error occurred
///
virtual int next(int32_t &count, internal_info *info = nullptr) = 0;

///
/// \brief async get the next key-value pair of this scanner
/// thread-safe
Expand Down
7 changes: 5 additions & 2 deletions src/server/pegasus_scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ struct pegasus_scan_context
int32_t batch_size_,
bool no_value_,
bool validate_partition_hash_,
bool return_expire_ts_)
bool return_expire_ts_,
bool only_return_count_)
: _stop_holder(std::move(stop_)),
_hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
_sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
Expand All @@ -59,7 +60,8 @@ struct pegasus_scan_context
batch_size(batch_size_),
no_value(no_value_),
validate_partition_hash(validate_partition_hash_),
return_expire_ts(return_expire_ts_)
return_expire_ts(return_expire_ts_),
only_return_count(only_return_count_)
{
}

Expand All @@ -80,6 +82,7 @@ struct pegasus_scan_context
bool no_value;
bool validate_partition_hash;
bool return_expire_ts;
bool only_return_count;
};

class pegasus_context_cache
Expand Down
Loading

0 comments on commit 7e26c9f

Please sign in to comment.