Skip to content

Commit

Permalink
cr6
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 18, 2022
1 parent f45b0d7 commit b0585c3
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
9 changes: 8 additions & 1 deletion src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ class pegasus_client_impl : public pegasus_client

static void init_error();

enum class async_scan_type : char
{
normal,
count_only,
count_only_finished
};

class pegasus_scanner_impl : public pegasus_scanner
{
public:
Expand Down Expand Up @@ -296,7 +303,7 @@ class pegasus_client_impl : public pegasus_client
volatile bool _rpc_started;
bool _validate_partition_hash;
bool _full_scan;
bool _already_add_count;
async_scan_type _type;

void _async_next_internal();
void _start_scan();
Expand Down
9 changes: 5 additions & 4 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan),
_already_add_count(true)
_type(async_scan_type::normal)
{
}

Expand Down Expand Up @@ -127,7 +127,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()

std::list<async_scan_next_callback_t> temp;
while (true) {
while (++_p >= _kvs.size() && _already_add_count) {
// count_only means should calculate kv counts once
while (++_p >= _kvs.size() && _type != async_scan_type::count_only) {
if (_context == SCAN_CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_splits_hash.empty()) {
Expand Down Expand Up @@ -192,7 +193,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
expire_ts_seconds,
_kv_count);
if (_options.only_return_count) {
_already_add_count = true;
_type = async_scan_type::count_only_finished;
}
_lock.lock();
if (_queue.size() == 1) {
Expand Down Expand Up @@ -284,7 +285,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
// 2. kv_count is not existed means server is older version
if (response.__isset.kv_count) {
if (response.kv_count != -1) {
_already_add_count = false;
_type = async_scan_type::count_only;
_kv_count = response.kv_count;
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/shell/commands/data_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,6 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
tp.output(std::cout, tp_output_format::kTabular);
return true;
}
options.only_return_count = true;

if (max_batch_count <= 1) {
fprintf(stderr, "ERROR: max_batch_count should be greater than 1\n");
Expand Down Expand Up @@ -2404,8 +2403,8 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
if (diff_hash_key || stat_size || value_filter_type != pegasus::pegasus_client::FT_NO_FILTER ||
sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT) {
options.only_return_count = false;
}
if (options.only_return_count) {
} else {
options.only_return_count = true;
fprintf(stderr, "INFO: scanner only return kv count, not return value\n");
}

Expand Down

0 comments on commit b0585c3

Please sign in to comment.