Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve performance of count_data #1091

Merged
merged 13 commits into from
Aug 30, 2022
2 changes: 2 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ struct get_scanner_request
11:optional bool validate_partition_hash;
12:optional bool return_expire_ts;
13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise
14:optional bool only_return_count = false;
}

struct scan_request
Expand All @@ -294,6 +295,7 @@ struct scan_response
4:i32 app_id;
5:i32 partition_index;
6:string server;
7:optional i32 kv_count;
}

struct duplicate_request
Expand Down
16 changes: 16 additions & 0 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ class pegasus_client_impl : public pegasus_client
std::string &value,
internal_info *info = nullptr) override;

int next(int32_t &count, internal_info *info = nullptr) override;

void async_next(async_scan_next_callback_t &&) override;

bool safe_destructible() const override;
Expand All @@ -277,6 +279,13 @@ class pegasus_client_impl : public pegasus_client
bool full_scan);

private:
// Delivery mode of this scanner, consulted when queued async_next callbacks are served.
enum class async_scan_type : char
{
// Initial state set by the constructor: results are handed out as key-value pairs.
NORMAL,
// Entered when a scan response carries `kv_count`: a count is waiting to be
// delivered, so the fetch loop must not advance to another request yet.
COUNT_ONLY,
// Entered after a callback has been invoked while the option only_return_count
// is set; the fetch loop is allowed to advance again.
COUNT_ONLY_FINISHED
};

::dsn::apps::rrdb_client *_client;
::dsn::blob _start_key;
::dsn::blob _stop_key;
Expand All @@ -287,13 +296,15 @@ class pegasus_client_impl : public pegasus_client
std::vector<::dsn::apps::key_value> _kvs;
internal_info _info;
int32_t _p;
int32_t _kv_count;

int64_t _context;
mutable ::dsn::zlock _lock;
std::list<async_scan_next_callback_t> _queue;
volatile bool _rpc_started;
bool _validate_partition_hash;
bool _full_scan;
async_scan_type _type;

void _async_next_internal();
void _start_scan();
Expand All @@ -320,6 +331,11 @@ class pegasus_client_impl : public pegasus_client

void async_next(async_scan_next_callback_t &&callback) override;

// Count-only variant of next(): forwards to `_p->next(count, info)` and returns
// its result unchanged.
int next(int32_t &count, internal_info *info = nullptr) override
{
return _p->next(count, info);
}

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
Expand Down
73 changes: 59 additions & 14 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,38 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_options(options),
_splits_hash(std::move(hash)),
_p(-1),
_kv_count(-1),
_context(SCAN_CONTEXT_ID_COMPLETED),
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)
_full_scan(full_scan),
_type(async_scan_type::NORMAL)
{
}

int pegasus_client_impl::pegasus_scanner_impl::next(int32_t &count, internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err,
std::string &&hash,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
if (info != nullptr) {
*info = std::move(ii);
}
count = kv_count;
op_completed.notify();
};
async_next(std::move(callback));
op_completed.wait();
return ret;
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
std::string &sortkey,
std::string &value,
Expand All @@ -70,7 +95,8 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
Expand Down Expand Up @@ -120,7 +146,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()

std::list<async_scan_next_callback_t> temp;
while (true) {
while (++_p >= _kvs.size()) {
// count_only means should calculate kv counts once
while (++_p >= _kvs.size() && _type != async_scan_type::COUNT_ONLY) {
if (_context == SCAN_CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_splits_hash.empty()) {
Expand All @@ -139,7 +166,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string(),
std::string(),
std::move(info),
0);
0,
-1);
}
}
return;
Expand All @@ -162,13 +190,16 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
}

// valid data got
std::string hash_key, sort_key;
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());
uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds
? static_cast<uint32_t>(_kvs[_p].expire_ts_seconds)
: 0;
std::string hash_key, sort_key, value;
uint32_t expire_ts_seconds = 0;

if (!_options.only_return_count) {
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
if (_kvs[_p].__isset.expire_ts_seconds) {
expire_ts_seconds = static_cast<uint32_t>(_kvs[_p].expire_ts_seconds);
}
}
auto &callback = _queue.front();
if (callback) {
internal_info info(_info);
Expand All @@ -178,7 +209,11 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
_kv_count);
if (_options.only_return_count) {
_type = async_scan_type::COUNT_ONLY_FINISHED;
}
_lock.lock();
if (_queue.size() == 1) {
// keep the last callback until exit this function
Expand Down Expand Up @@ -230,6 +265,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.__set_validate_partition_hash(_validate_partition_hash);
req.__set_return_expire_ts(_options.return_expire_ts);
req.__set_full_scan(_full_scan);
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down Expand Up @@ -261,6 +297,13 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
// If `kv_count` exists in response, then:
// 1) server side supports only counting size, and
// 2) `kvs` in response must be empty
if (response.__isset.kv_count) {
_type = async_scan_type::COUNT_ONLY;
_kv_count = response.kv_count;
}
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down Expand Up @@ -288,7 +331,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c

for (auto &callback : temp) {
if (callback) {
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0);
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1);
}
}
}
Expand Down Expand Up @@ -323,13 +366,15 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
std::string &&sort_key,
std::string &&value,
internal_info &&info,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
user_callback(error_code,
std::move(hash_key),
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
kv_count);
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
std::string &&geo_sort_key,
std::string &&value,
pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds) mutable {
uint32_t expire_ts_seconds,
int32_t kv_count) mutable {
if (ret == PERR_SCAN_COMPLETE) {
cb();
return;
Expand Down
25 changes: 22 additions & 3 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class pegasus_client
std::string sort_key_filter_pattern;
bool no_value; // only fetch hash_key and sort_key, but not fetch value
bool return_expire_ts;
bool only_return_count;
scan_options()
: timeout_ms(5000),
batch_size(100),
Expand All @@ -260,7 +261,8 @@ class pegasus_client
hash_key_filter_type(FT_NO_FILTER),
sort_key_filter_type(FT_NO_FILTER),
no_value(false),
return_expire_ts(false)
return_expire_ts(false),
only_return_count(false)
{
}
scan_options(const scan_options &o)
Expand All @@ -273,7 +275,8 @@ class pegasus_client
sort_key_filter_type(o.sort_key_filter_type),
sort_key_filter_pattern(o.sort_key_filter_pattern),
no_value(o.no_value),
return_expire_ts(o.return_expire_ts)
return_expire_ts(o.return_expire_ts),
only_return_count(o.only_return_count)
{
}
};
Expand Down Expand Up @@ -312,7 +315,8 @@ class pegasus_client
std::string && /*sort_key*/,
std::string && /*value*/,
internal_info && /*info*/,
uint32_t /*expire_ts_seconds*/)>
uint32_t /*expire_ts_seconds*/,
int32_t /*kv_count*/)>
async_scan_next_callback_t;
typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
async_get_scanner_callback_t;
Expand Down Expand Up @@ -343,6 +347,21 @@ class pegasus_client
std::string &value,
internal_info *info = nullptr) = 0;

///
/// \brief get the next k-v pair count of this scanner
/// only used for a scanner whose option only_return_count is true
/// thread-safe
/// \param count
/// data count value
/// \param info
/// optional; when not null, it receives the internal info of this operation
/// \return
/// int, the error indicates whether or not the operation succeeded.
/// this error can be converted to a string using get_error_string()
/// PERR_OK means a valid k-v pair count was got
/// PERR_SCAN_COMPLETE means all k-v pair counts have been returned before this call
/// otherwise some error occurred
///
virtual int next(int32_t &count, internal_info *info = nullptr) = 0;

///
/// \brief async get the next key-value pair of this scanner
/// thread-safe
Expand Down
7 changes: 5 additions & 2 deletions src/server/pegasus_scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ struct pegasus_scan_context
int32_t batch_size_,
bool no_value_,
bool validate_partition_hash_,
bool return_expire_ts_)
bool return_expire_ts_,
bool only_return_count_)
: _stop_holder(std::move(stop_)),
_hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
_sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
Expand All @@ -59,7 +60,8 @@ struct pegasus_scan_context
batch_size(batch_size_),
no_value(no_value_),
validate_partition_hash(validate_partition_hash_),
return_expire_ts(return_expire_ts_)
return_expire_ts(return_expire_ts_),
only_return_count(only_return_count_)
{
}

Expand All @@ -80,6 +82,7 @@ struct pegasus_scan_context
bool no_value;
bool validate_partition_hash;
bool return_expire_ts;
bool only_return_count;
};

class pegasus_context_cache
Expand Down
Loading