Skip to content

Commit

Permalink
cr4
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 16, 2022
1 parent 501ce57 commit 12d1ec6
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ class pegasus_client_impl : public pegasus_client
volatile bool _rpc_started;
bool _validate_partition_hash;
bool _full_scan;
bool _only_calculate_count;
bool _already_add_count;

void _async_next_internal();
void _start_scan();
Expand Down
27 changes: 22 additions & 5 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_options(options),
_splits_hash(std::move(hash)),
_p(-1),
_kv_count(-1),
_context(SCAN_CONTEXT_ID_COMPLETED),
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)
_full_scan(full_scan),
_only_calculate_count(false),
_already_add_count(true)
{
}

Expand Down Expand Up @@ -125,7 +128,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()

std::list<async_scan_next_callback_t> temp;
while (true) {
while (++_p >= _kvs.size()) {
while (++_p >= _kvs.size() && _already_add_count) {
if (_context == SCAN_CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_splits_hash.empty()) {
Expand Down Expand Up @@ -170,8 +173,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
// valid data got
std::string hash_key, sort_key, value;
uint32_t expire_ts_seconds = 0;
// _kv_count > -1 means req just want to get data counts, not include data value
if (_kv_count == -1) {

if (!_only_calculate_count) {
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
if (_kvs[_p].__isset.expire_ts_seconds) {
Expand All @@ -189,6 +192,9 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(info),
expire_ts_seconds,
_kv_count);
if (_only_calculate_count) {
_already_add_count = true;
}
_lock.lock();
if (_queue.size() == 1) {
// keep the last callback until exit this function
Expand Down Expand Up @@ -272,7 +278,18 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
_kv_count = response.kv_count;
// 1. kv_count exist on response means server is newer version (added counting size only
// implementation)
// 1> kv_count == -1 indicates response still have key and value data
// 2> kv_count > -1 indicates response only have kv size count, but not key && value
// 2. kv_count is not existed means server is older version
if (response.__isset.kv_count) {
if (response.kv_count != -1) {
_only_calculate_count = true;
_already_add_count = false;
_kv_count = response.kv_count;
}
}
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down
10 changes: 4 additions & 6 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
resp.kvs.reserve(batch_count);

bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
bool only_return_count = request.__isset.only_return_count ? request.only_return_count : false;

std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
Expand Down Expand Up @@ -1200,7 +1201,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
switch (state) {
case range_iteration_state::kNormal:
count++;
if (!request.only_return_count) {
if (!only_return_count) {
append_key_value(
resp.kvs, it->key(), it->value(), request.no_value, return_expire_ts);
}
Expand All @@ -1223,9 +1224,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)

it->Next();
}
if (request.only_return_count) {
// add a null data avoid to refactor the client code
resp.kvs.emplace_back(::dsn::apps::key_value());
if (only_return_count) {
resp.__set_kv_count(count);
}

Expand Down Expand Up @@ -1282,7 +1281,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts,
request.only_return_count));
only_return_count));
int64_t handle = _context_cache.put(std::move(context));
resp.context_id = handle;
// if the context is used, it will be fetched and re-put into cache,
Expand Down Expand Up @@ -1401,7 +1400,6 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
}

if (only_return_count) {
resp.kvs.emplace_back(::dsn::apps::key_value());
resp.__set_kv_count(count);
}

Expand Down
2 changes: 1 addition & 1 deletion src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ inline void scan_data_next(scan_data_context *context)
uint32_t expire_ts_seconds,
int32_t kv_count) {
if (ret == pegasus::PERR_OK) {
if (kv_count == -1 || validate_filter(context, sort_key, value)) {
if (kv_count != -1 || validate_filter(context, sort_key, value)) {
bool ts_expired = false;
int ttl_seconds = 0;
switch (context->op) {
Expand Down
1 change: 0 additions & 1 deletion src/test/function_test/test_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ TEST_F(scan, OVERALL_COUNT_ONLY)
std::string sort_key;
std::string value;
int32_t data_counts = 0;
std::map<std::string, std::map<std::string, std::string>> data;
for (auto scanner : scanners) {
ASSERT_NE(nullptr, scanner);
int32_t kv_count;
Expand Down

0 comments on commit 12d1ec6

Please sign in to comment.