From 42ad01683afcc9f4cbd14a5b2385990c7ff5fdc1 Mon Sep 17 00:00:00 2001 From: liguohao <948193394@qq.com> Date: Thu, 28 Jul 2022 15:30:18 +0800 Subject: [PATCH] improve performance of count_data --- idl/rrdb.thrift | 3 ++ src/client_lib/pegasus_client_impl.h | 14 +++++ src/client_lib/pegasus_scanner_impl.cpp | 63 ++++++++++++++++++----- src/geo/lib/geo_client.cpp | 3 +- src/include/pegasus/client.h | 15 ++++-- src/server/pegasus_server_impl.cpp | 62 ++++++++++++++-------- src/server/pegasus_server_impl.h | 3 +- src/shell/command_helper.h | 68 ++++++++++++++----------- src/shell/commands/data_operations.cpp | 14 ++++- src/shell/main.cpp | 3 +- src/test/function_test/test_scan.cpp | 32 +++++++++++- 11 files changed, 207 insertions(+), 73 deletions(-) diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift index 3fc82803d7..fbeb552755 100644 --- a/idl/rrdb.thrift +++ b/idl/rrdb.thrift @@ -279,11 +279,13 @@ struct get_scanner_request 11:optional bool validate_partition_hash; 12:optional bool return_expire_ts; 13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise + 14:optional bool only_return_count; } struct scan_request { 1:i64 context_id; + 2:optional bool only_return_count; } struct scan_response @@ -294,6 +296,7 @@ struct scan_response 4:i32 app_id; 5:i32 partition_index; 6:string server; + 7:optional i32 kv_count = -1; } struct duplicate_request diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h index adbbc715ab..9160d90b00 100644 --- a/src/client_lib/pegasus_client_impl.h +++ b/src/client_lib/pegasus_client_impl.h @@ -255,6 +255,11 @@ class pegasus_client_impl : public pegasus_client std::string &value, internal_info *info = nullptr) override; + int next(std::string &hashkey, + std::string &sortkey, + std::string &value, + int32_t &count_number) override; + void async_next(async_scan_next_callback_t &&) override; bool safe_destructible() const override; @@ -287,6 +292,7 @@ class pegasus_client_impl : public pegasus_client std::vector<::dsn::apps::key_value> _kvs; internal_info _info; int32_t _p; + int32_t _kv_count; int64_t _context; mutable ::dsn::zlock _lock; @@ -320,6 +326,14 @@ class pegasus_client_impl : public pegasus_client void async_next(async_scan_next_callback_t &&callback) override; + int next(std::string &hashkey, + std::string &sortkey, + std::string &value, + int32_t &count_number) override + { + return _p->next(hashkey, sortkey, value, count_number); + } + int next(std::string &hashkey, std::string &sortkey, std::string &value, diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp index ce9baa5087..801d830422 100644 --- a/src/client_lib/pegasus_scanner_impl.cpp +++ b/src/client_lib/pegasus_scanner_impl.cpp @@ -55,7 +55,34 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd _rpc_started(false), _validate_partition_hash(validate_partition_hash), _full_scan(full_scan) + +{ +} + +int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey, + std::string &sortkey, + std::string &value, + int32_t &count_number) { + ::dsn::utils::notify_event op_completed; + int ret = -1; + auto callback = [&](int err, + std::string &&hash, + std::string &&sort, + std::string &&val, + internal_info &&ii, + uint32_t expire_ts_seconds, + int32_t kv_count) { + ret = err; + hashkey = std::move(hash); + sortkey = std::move(sort); + value = std::move(val); + count_number = kv_count; + op_completed.notify(); + }; + async_next(std::move(callback)); + op_completed.wait(); + return ret; } int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey, @@ -70,7 +97,8 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey, std::string &&sort, std::string &&val, internal_info &&ii, - uint32_t expire_ts_seconds) { + uint32_t expire_ts_seconds, + int32_t kv_count) { ret = err; hashkey = std::move(hash); sortkey = std::move(sort); @@ -139,7 +167,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal() std::string(), std::string(), std::move(info), - 0); + 0, + -1); } } return; @@ -162,13 +191,15 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal() } // valid data got - std::string hash_key, sort_key; - pegasus_restore_key(_kvs[_p].key, hash_key, sort_key); - std::string value(_kvs[_p].value.data(), _kvs[_p].value.length()); - uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds - ? static_cast(_kvs[_p].expire_ts_seconds) - : 0; - + std::string hash_key, sort_key, value = ""; + uint32_t expire_ts_seconds = 0; + if (_kv_count == -1) { + pegasus_restore_key(_kvs[_p].key, hash_key, sort_key); + value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length()); + if (_kvs[_p].__isset.expire_ts_seconds) { + expire_ts_seconds = static_cast(_kvs[_p].expire_ts_seconds); + } + } auto &callback = _queue.front(); if (callback) { internal_info info(_info); @@ -178,7 +209,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal() std::move(sort_key), std::move(value), std::move(info), - expire_ts_seconds); + expire_ts_seconds, + _kv_count); _lock.lock(); if (_queue.size() == 1) { // keep the last callback until exit this function @@ -196,6 +228,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_next_batch() { ::dsn::apps::scan_request req; req.context_id = _context; + req.__set_only_return_count(_options.only_return_count); dassert(!_rpc_started, ""); _rpc_started = true; @@ -230,6 +263,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan() req.__set_validate_partition_hash(_validate_partition_hash); req.__set_return_expire_ts(_options.return_expire_ts); req.__set_full_scan(_full_scan); + req.__set_only_return_count(_options.only_return_count); dassert(!_rpc_started, ""); _rpc_started = true; @@ -261,6 +295,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c _kvs = std::move(response.kvs); _p = -1; _context = response.context_id; + _kv_count = response.kv_count; _async_next_internal(); return; } else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) { @@ -288,7 +323,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c for (auto &callback : temp) { if (callback) { - callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0); + callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1); } } } @@ -323,13 +358,15 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next( std::string &&sort_key, std::string &&value, internal_info &&info, - uint32_t expire_ts_seconds) { + uint32_t expire_ts_seconds, + int32_t kv_count) { user_callback(error_code, std::move(hash_key), std::move(sort_key), std::move(value), std::move(info), - expire_ts_seconds); + expire_ts_seconds, + kv_count); }); } diff --git a/src/geo/lib/geo_client.cpp b/src/geo/lib/geo_client.cpp index e8b60be5de..d4ab34d07f 100644 --- a/src/geo/lib/geo_client.cpp +++ b/src/geo/lib/geo_client.cpp @@ -862,7 +862,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper std::string &&geo_sort_key, std::string &&value, pegasus_client::internal_info &&info, - uint32_t expire_ts_seconds) mutable { + uint32_t expire_ts_seconds, + int32_t kv_count) mutable { if (ret == PERR_SCAN_COMPLETE) { cb(); return; diff --git a/src/include/pegasus/client.h b/src/include/pegasus/client.h index 22e945c260..a4fc651768 100644 --- a/src/include/pegasus/client.h +++ b/src/include/pegasus/client.h @@ -252,6 +252,7 @@ class pegasus_client std::string sort_key_filter_pattern; bool no_value; // only fetch hash_key and sort_key, but not fetch value bool return_expire_ts; + bool only_return_count; scan_options() : timeout_ms(5000), batch_size(100), @@ -260,7 +261,8 @@ class pegasus_client hash_key_filter_type(FT_NO_FILTER), sort_key_filter_type(FT_NO_FILTER), no_value(false), - return_expire_ts(false) + return_expire_ts(false), + only_return_count(false) { } scan_options(const scan_options &o) @@ -273,7 +275,8 @@ class pegasus_client sort_key_filter_type(o.sort_key_filter_type), sort_key_filter_pattern(o.sort_key_filter_pattern), no_value(o.no_value), - return_expire_ts(o.return_expire_ts) + return_expire_ts(o.return_expire_ts), + only_return_count(o.only_return_count) { } }; @@ -312,7 +315,8 @@ class pegasus_client std::string && /*sort_key*/, std::string && /*value*/, internal_info && /*info*/, - uint32_t /*expire_ts_seconds*/)> + uint32_t /*expire_ts_seconds*/, + int32_t /*kv_count*/)> async_scan_next_callback_t; typedef std::function async_get_scanner_callback_t; @@ -343,6 +347,11 @@ class pegasus_client std::string &value, internal_info *info = nullptr) = 0; + virtual int next(std::string &hashkey, + std::string &sortkey, + std::string &value, + int32_t &count_number) = 0; + /// /// \brief async get the next key-value pair of this scanner /// thread-safe diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 019854b637..5d446bcdd6 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1198,7 +1198,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) epoch_now, request.no_value, request.__isset.validate_partition_hash ? request.validate_partition_hash : true, - return_expire_ts); + return_expire_ts, + request.only_return_count ? false : true); switch (state) { case range_iteration_state::kNormal: count++; @@ -1221,6 +1222,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) it->Next(); } + if (request.only_return_count) { + resp.kvs.emplace_back(::dsn::apps::key_value()); + resp.__set_kv_count(count); + } // check iteration time whether exceed limit if (!complete) { @@ -1297,7 +1302,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc) _pfc_recent_filter_count->add(filter_count); } - _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); + // abandon calculate capacity unit + if (!request.only_return_count) { + _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); + } _pfc_scan_latency->set(dsn_now_ns() - start_time); } @@ -1365,7 +1373,8 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) epoch_now, no_value, validate_hash, - return_expire_ts); + return_expire_ts, + request.only_return_count ? false : true); switch (state) { case range_iteration_state::kNormal: count++; @@ -1389,6 +1398,11 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) it->Next(); } + if (request.only_return_count) { + resp.kvs.emplace_back(::dsn::apps::key_value()); + resp.__set_kv_count(count); + } + // check iteration time whether exceed limit if (!complete) { limiter->time_check_after_incomplete_scan(); @@ -1449,7 +1463,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) resp.error = rocksdb::Status::Code::kNotFound; } - _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); + // abandon calculate capacity unit + if (request.only_return_count) { + _cu_calculator->add_scan_cu(req, resp.error, resp.kvs); + } _pfc_scan_latency->set(dsn_now_ns() - start_time); } @@ -2274,7 +2291,8 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu uint32_t epoch_now, bool no_value, bool request_validate_hash, - bool request_expire_ts) + bool request_expire_ts, + bool fill_value) { if (check_if_record_expired(epoch_now, value)) { if (_verbose_log) { @@ -2293,8 +2311,6 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu } } - ::dsn::apps::key_value kv; - // extract raw key ::dsn::blob raw_key(key.data(), 0, key.size()); if (hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER || @@ -2316,24 +2332,28 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu return range_iteration_state::kFiltered; } } - std::shared_ptr key_buf(::dsn::utils::make_shared_array(raw_key.length())); - ::memcpy(key_buf.get(), raw_key.data(), raw_key.length()); - kv.key.assign(std::move(key_buf), 0, raw_key.length()); + if (fill_value) { + ::dsn::apps::key_value kv; - // extract expire ts if necessary - if (request_expire_ts) { - auto expire_ts_seconds = - pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value)); - kv.__set_expire_ts_seconds(static_cast(expire_ts_seconds)); - } + std::shared_ptr key_buf(::dsn::utils::make_shared_array(raw_key.length())); + ::memcpy(key_buf.get(), raw_key.data(), raw_key.length()); + kv.key.assign(std::move(key_buf), 0, raw_key.length()); - // extract value - if (!no_value) { - std::string value_buf(value.data(), value.size()); - pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value); + // extract expire ts if necessary + if (request_expire_ts) { + auto expire_ts_seconds = + pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value)); + kv.__set_expire_ts_seconds(static_cast(expire_ts_seconds)); + } + + // extract value + if (!no_value) { + std::string value_buf(value.data(), value.size()); + pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value); + } + kvs.emplace_back(std::move(kv)); } - kvs.emplace_back(std::move(kv)); return range_iteration_state::kNormal; } diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index d7f955b4a8..7f304dd213 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -235,7 +235,8 @@ class pegasus_server_impl : public pegasus_read_service uint32_t epoch_now, bool no_value, bool request_validate_hash, - bool request_expire_ts); + bool request_expire_ts, + bool fill_value); range_iteration_state append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs, diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 04f326562f..bb76f1765d 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -322,7 +322,8 @@ inline void scan_multi_data_next(scan_data_context *context) std::string &&sort_key, std::string &&value, pegasus::pegasus_client::internal_info &&info, - uint32_t expire_ts_seconds) { + uint32_t expire_ts_seconds, + uint32_t kv_count) { if (ret == pegasus::PERR_OK) { if (validate_filter(context, sort_key, value)) { bool ts_expired = false; @@ -401,9 +402,10 @@ inline void scan_data_next(scan_data_context *context) std::string &&sort_key, std::string &&value, pegasus::pegasus_client::internal_info &&info, - uint32_t expire_ts_seconds) { + uint32_t expire_ts_seconds, + int32_t kv_count) { if (ret == pegasus::PERR_OK) { - if (validate_filter(context, sort_key, value)) { + if (kv_count == -1 || validate_filter(context, sort_key, value)) { bool ts_expired = false; int ttl_seconds = 0; switch (context->op) { @@ -499,36 +501,40 @@ inline void scan_data_next(scan_data_context *context) context->timeout_ms); break; case SCAN_COUNT: - context->split_rows++; - if (context->stat_size && context->statistics) { - long hash_key_size = hash_key.size(); - context->statistics->measureTime( - static_cast(histogram_type::HASH_KEY_SIZE), - hash_key_size); - - long sort_key_size = sort_key.size(); - context->statistics->measureTime( - static_cast(histogram_type::SORT_KEY_SIZE), - sort_key_size); - - long value_size = value.size(); - context->statistics->measureTime( - static_cast(histogram_type::VALUE_SIZE), value_size); - - long row_size = hash_key_size + sort_key_size + value_size; - context->statistics->measureTime( - static_cast(histogram_type::ROW_SIZE), row_size); - - if (context->top_count > 0) { - context->top_rows.push( - std::move(hash_key), std::move(sort_key), row_size); + if (kv_count == -1) { + context->split_rows++; + if (context->stat_size && context->statistics) { + long hash_key_size = hash_key.size(); + context->statistics->measureTime( + static_cast(histogram_type::HASH_KEY_SIZE), + hash_key_size); + + long sort_key_size = sort_key.size(); + context->statistics->measureTime( + static_cast(histogram_type::SORT_KEY_SIZE), + sort_key_size); + + long value_size = value.size(); + context->statistics->measureTime( + static_cast(histogram_type::VALUE_SIZE), value_size); + + long row_size = hash_key_size + sort_key_size + value_size; + context->statistics->measureTime( + static_cast(histogram_type::ROW_SIZE), row_size); + + if (context->top_count > 0) { + context->top_rows.push( + std::move(hash_key), std::move(sort_key), row_size); + } } - } - if (context->count_hash_key) { - if (hash_key != context->last_hash_key) { - context->split_hash_key_count++; - context->last_hash_key = std::move(hash_key); + if (context->count_hash_key) { + if (hash_key != context->last_hash_key) { + context->split_hash_key_count++; + context->last_hash_key = std::move(hash_key); + } } + } else { + context->split_rows += kv_count; } scan_data_next(context); break; diff --git a/src/shell/commands/data_operations.cpp b/src/shell/commands/data_operations.cpp index d8cd3de946..7965b41636 100644 --- a/src/shell/commands/data_operations.cpp +++ b/src/shell/commands/data_operations.cpp @@ -2169,6 +2169,7 @@ bool clear_data(command_executor *e, shell_context *sc, arguments args) bool count_data(command_executor *e, shell_context *sc, arguments args) { static struct option long_options[] = {{"precise", no_argument, 0, 'c'}, + {"only_return_data_count", no_argument, 0, 'o'}, {"partition", required_argument, 0, 'p'}, {"max_batch_count", required_argument, 0, 'b'}, {"timeout_ms", required_argument, 0, 't'}, @@ -2211,7 +2212,7 @@ bool count_data(command_executor *e, shell_context *sc, arguments args) int option_index = 0; int c; c = getopt_long( - args.argc, args.argv, "cp:b:t:h:x:s:y:v:z:dan:r:", long_options, &option_index); + args.argc, args.argv, "cop:b:t:h:x:s:y:v:z:dan:r:", long_options, &option_index); if (c == -1) break; // input any valid parameter means you want to get precise count by scanning. @@ -2220,6 +2221,9 @@ bool count_data(command_executor *e, shell_context *sc, arguments args) case 'c': precise = true; break; + case 'o': + options.only_return_count = true; + break; case 'p': if (!dsn::buf2int32(optarg, partition)) { fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg); @@ -2397,6 +2401,14 @@ bool count_data(command_executor *e, shell_context *sc, arguments args) options.no_value = false; else options.no_value = true; + + if (diff_hash_key || stat_size || value_filter_type != pegasus::pegasus_client::FT_NO_FILTER || sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) { + options.only_return_count = false; + } + if (options.only_return_count) { + fprintf(stderr, "INFO: scanner only return kv count, not return value\n"); + } + int ret = sc->pg_client->get_unordered_scanners(INT_MAX, options, raw_scanners); if (ret != pegasus::PERR_OK) { fprintf( diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 0f2a3bd42e..53d1731e40 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -304,7 +304,8 @@ static command_executor commands[] = { "[-y|--sort_key_filter_pattern str] " "[-v|--value_filter_type anywhere|prefix|postfix|exact] " "[-z|--value_filter_pattern str][-d|--diff_hash_key] " - "[-a|--stat_size] [-n|--top_count num] [-r|--run_seconds num]", + "[-a|--stat_size] [-n|--top_count num] " + "[-o|--only_return_data_count] [-r|--run_seconds num]", data_operations, }, { diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp index 853a098c5a..f5a658ef9a 100644 --- a/src/test/function_test/test_scan.cpp +++ b/src/test/function_test/test_scan.cpp @@ -168,6 +168,35 @@ class scan : public testing::Test } }; +TEST_F(scan, OVERALL_COUNT_ONLY) +{ + ddebug("TEST OVERALL_SCAN_COUNT_ONLY..."); + pegasus_client::scan_options options; + options.only_return_count = true; + std::vector scanners; + int ret = client->get_unordered_scanners(3, options, scanners); + ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error=" + << client->get_error_string(ret); + ASSERT_LE(scanners.size(), 3); + + std::string hash_key; + std::string sort_key; + std::string value; + int32_t data_counts = 0; + std::map> data; + for (auto scanner : scanners) { + ASSERT_NE(nullptr, scanner); + int32_t kv_count; + while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value, kv_count)))) { + data_counts += kv_count; + } + ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error=" + << client->get_error_string(ret); + delete scanner; + } + ASSERT_EQ(10990, data_counts); +} + TEST_F(scan, ALL_SORT_KEY) { ddebug("TESTING_HASH_SCAN, ALL SORT_KEYS ...."); @@ -402,7 +431,8 @@ TEST_F(scan, REQUEST_EXPIRE_TS) std::string &&sort_key, std::string &&value, pegasus::pegasus_client::internal_info &&info, - uint32_t expire_ts_seconds) { + uint32_t expire_ts_seconds, + uint32_t kv_count) { if (err == pegasus::PERR_OK) { check_and_put(data, hash_key, sort_key, value); if (expire_ts_seconds > 0) {