Skip to content

Commit

Permalink
feat: improve performance of count_data
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 1, 2022
1 parent 422cb2d commit 12798d8
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 73 deletions.
3 changes: 3 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,13 @@ struct get_scanner_request
11:optional bool validate_partition_hash;
12:optional bool return_expire_ts;
13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise
14:optional bool only_return_count;
}

// Request sent by the client to fetch the next batch of an already-established scan context.
struct scan_request
{
// id of the server-side scan context to continue
1:i64 context_id;
// true means the client only wants the number of matched records (reported via scan_response.kv_count), not the records themselves
2:optional bool only_return_count;
}

struct scan_response
Expand All @@ -294,6 +296,7 @@ struct scan_response
4:i32 app_id;
5:i32 partition_index;
6:string server;
7:optional i32 kv_count = -1;
}

struct duplicate_request
Expand Down
14 changes: 14 additions & 0 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ class pegasus_client_impl : public pegasus_client
std::string &value,
internal_info *info = nullptr) override;

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) override;

void async_next(async_scan_next_callback_t &&) override;

bool safe_destructible() const override;
Expand Down Expand Up @@ -287,6 +292,7 @@ class pegasus_client_impl : public pegasus_client
std::vector<::dsn::apps::key_value> _kvs;
internal_info _info;
int32_t _p;
int32_t _kv_count;

int64_t _context;
mutable ::dsn::zlock _lock;
Expand Down Expand Up @@ -320,6 +326,14 @@ class pegasus_client_impl : public pegasus_client

void async_next(async_scan_next_callback_t &&callback) override;

// Count-reporting overload of next(): pure delegation to the object held in _p,
// passing all four output references through unchanged and returning its result.
int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) override
{
return _p->next(hashkey, sortkey, value, count_number);
}

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
Expand Down
63 changes: 50 additions & 13 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,34 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)

{
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err,
std::string &&hash,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
value = std::move(val);
count_number = kv_count;
op_completed.notify();
};
async_next(std::move(callback));
op_completed.wait();
return ret;
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
Expand All @@ -70,7 +97,8 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
Expand Down Expand Up @@ -139,7 +167,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string(),
std::string(),
std::move(info),
0);
0,
-1);
}
}
return;
Expand All @@ -162,13 +191,15 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
}

// valid data got
std::string hash_key, sort_key;
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());
uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds
? static_cast<uint32_t>(_kvs[_p].expire_ts_seconds)
: 0;

std::string hash_key, sort_key, value = "";
uint32_t expire_ts_seconds = 0;
if (_kv_count == -1) {
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
if (_kvs[_p].__isset.expire_ts_seconds) {
expire_ts_seconds = static_cast<uint32_t>(_kvs[_p].expire_ts_seconds);
}
}
auto &callback = _queue.front();
if (callback) {
internal_info info(_info);
Expand All @@ -178,7 +209,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
_kv_count);
_lock.lock();
if (_queue.size() == 1) {
// keep the last callback until exit this function
Expand All @@ -196,6 +228,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_next_batch()
{
::dsn::apps::scan_request req;
req.context_id = _context;
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down Expand Up @@ -230,6 +263,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.__set_validate_partition_hash(_validate_partition_hash);
req.__set_return_expire_ts(_options.return_expire_ts);
req.__set_full_scan(_full_scan);
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down Expand Up @@ -261,6 +295,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
_kv_count = response.kv_count;
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down Expand Up @@ -288,7 +323,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c

for (auto &callback : temp) {
if (callback) {
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0);
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1);
}
}
}
Expand Down Expand Up @@ -323,13 +358,15 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
std::string &&sort_key,
std::string &&value,
internal_info &&info,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
user_callback(error_code,
std::move(hash_key),
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
kv_count);
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
std::string &&geo_sort_key,
std::string &&value,
pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds) mutable {
uint32_t expire_ts_seconds,
int32_t kv_count) mutable {
if (ret == PERR_SCAN_COMPLETE) {
cb();
return;
Expand Down
15 changes: 12 additions & 3 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class pegasus_client
std::string sort_key_filter_pattern;
bool no_value; // only fetch hash_key and sort_key, but not fetch value
bool return_expire_ts;
bool only_return_count;
scan_options()
: timeout_ms(5000),
batch_size(100),
Expand All @@ -260,7 +261,8 @@ class pegasus_client
hash_key_filter_type(FT_NO_FILTER),
sort_key_filter_type(FT_NO_FILTER),
no_value(false),
return_expire_ts(false)
return_expire_ts(false),
only_return_count(false)
{
}
scan_options(const scan_options &o)
Expand All @@ -273,7 +275,8 @@ class pegasus_client
sort_key_filter_type(o.sort_key_filter_type),
sort_key_filter_pattern(o.sort_key_filter_pattern),
no_value(o.no_value),
return_expire_ts(o.return_expire_ts)
return_expire_ts(o.return_expire_ts),
only_return_count(o.only_return_count)
{
}
};
Expand Down Expand Up @@ -312,7 +315,8 @@ class pegasus_client
std::string && /*sort_key*/,
std::string && /*value*/,
internal_info && /*info*/,
uint32_t /*expire_ts_seconds*/)>
uint32_t /*expire_ts_seconds*/,
int32_t /*kv_count*/)>
async_scan_next_callback_t;
typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
async_get_scanner_callback_t;
Expand Down Expand Up @@ -343,6 +347,11 @@ class pegasus_client
std::string &value,
internal_info *info = nullptr) = 0;

///
/// \brief get the next result of this scanner, additionally reporting a record count
/// \param hashkey
/// output: hash key of the delivered record
/// \param sortkey
/// output: sort key of the delivered record
/// \param value
/// output: value of the delivered record
/// \param count_number
/// output: the kv_count delivered together with the scan result.
/// NOTE(review): -1 appears to mean "no count available" (it is the thrift default and what
/// error/complete paths pass) -- confirm before relying on it.
/// \return
/// int, the error code delivered with the scan result.
///
virtual int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) = 0;

///
/// \brief async get the next key-value pair of this scanner
/// thread-safe
Expand Down
62 changes: 41 additions & 21 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
epoch_now,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts);
return_expire_ts,
request.only_return_count ? false : true);
switch (state) {
case range_iteration_state::kNormal:
count++;
Expand All @@ -1221,6 +1222,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)

it->Next();
}
if (request.only_return_count) {
resp.kvs.emplace_back(::dsn::apps::key_value());
resp.__set_kv_count(count);
}

// check iteration time whether exceed limit
if (!complete) {
Expand Down Expand Up @@ -1297,7 +1302,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
_pfc_recent_filter_count->add(filter_count);
}

_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
// abandon calculate capacity unit
if (!request.only_return_count) {
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}

Expand Down Expand Up @@ -1365,7 +1373,8 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
epoch_now,
no_value,
validate_hash,
return_expire_ts);
return_expire_ts,
request.only_return_count ? false : true);
switch (state) {
case range_iteration_state::kNormal:
count++;
Expand All @@ -1389,6 +1398,11 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
it->Next();
}

if (request.only_return_count) {
resp.kvs.emplace_back(::dsn::apps::key_value());
resp.__set_kv_count(count);
}

// check iteration time whether exceed limit
if (!complete) {
limiter->time_check_after_incomplete_scan();
Expand Down Expand Up @@ -1449,7 +1463,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.error = rocksdb::Status::Code::kNotFound;
}

_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
// Capacity-unit accounting is abandoned for count-only scans and applied to
// every other scan. The guard must therefore be negated, matching the
// equivalent check in on_get_scanner; the previous un-negated condition
// charged only count-only scans and skipped normal ones.
if (!request.only_return_count) {
    _cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}

Expand Down Expand Up @@ -2274,7 +2291,8 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
bool request_expire_ts)
bool request_expire_ts,
bool fill_value)
{
if (check_if_record_expired(epoch_now, value)) {
if (_verbose_log) {
Expand All @@ -2293,8 +2311,6 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
}
}

::dsn::apps::key_value kv;

// extract raw key
::dsn::blob raw_key(key.data(), 0, key.size());
if (hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER ||
Expand All @@ -2316,24 +2332,28 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
return range_iteration_state::kFiltered;
}
}
std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());
if (fill_value) {
::dsn::apps::key_value kv;

// extract expire ts if necessary
if (request_expire_ts) {
auto expire_ts_seconds =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
}
std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());

// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
// extract expire ts if necessary
if (request_expire_ts) {
auto expire_ts_seconds =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
}

// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
}
kvs.emplace_back(std::move(kv));
}

kvs.emplace_back(std::move(kv));
return range_iteration_state::kNormal;
}

Expand Down
3 changes: 2 additions & 1 deletion src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ class pegasus_server_impl : public pegasus_read_service
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
bool request_expire_ts);
bool request_expire_ts,
bool fill_value);

range_iteration_state
append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
Expand Down
Loading

0 comments on commit 12798d8

Please sign in to comment.