From 8de3a442304bfc9f6cda192ec9fd0e30ec124f74 Mon Sep 17 00:00:00 2001 From: liguohao <948193394@qq.com> Date: Tue, 23 Aug 2022 14:35:19 +0800 Subject: [PATCH] cr8 --- idl/rrdb.thrift | 2 +- src/client_lib/pegasus_client_impl.h | 29 +++++++++------- src/client_lib/pegasus_scanner_impl.cpp | 46 +++++++++++++++++-------- src/include/pegasus/client.h | 18 ++++++++-- src/test/function_test/test_scan.cpp | 5 +-- 5 files changed, 67 insertions(+), 33 deletions(-) diff --git a/idl/rrdb.thrift b/idl/rrdb.thrift index b2aef17317..bff0d69313 100644 --- a/idl/rrdb.thrift +++ b/idl/rrdb.thrift @@ -295,7 +295,7 @@ struct scan_response 4:i32 app_id; 5:i32 partition_index; 6:string server; - 7:optional i32 kv_count = -1; + 7:optional i32 kv_count; } struct duplicate_request diff --git a/src/client_lib/pegasus_client_impl.h b/src/client_lib/pegasus_client_impl.h index a200949b57..42cc3462d0 100644 --- a/src/client_lib/pegasus_client_impl.h +++ b/src/client_lib/pegasus_client_impl.h @@ -247,21 +247,15 @@ class pegasus_client_impl : public pegasus_client static void init_error(); - enum class async_scan_type : char - { - NORMAL, - COUNT_ONLY, - COUNT_ONLY_FINISHED - }; - class pegasus_scanner_impl : public pegasus_scanner { public: int next(std::string &hashkey, std::string &sortkey, std::string &value, - internal_info *info = nullptr, - int32_t *count = nullptr) override; + internal_info *info = nullptr) override; + + int next(int32_t &count, internal_info *info = nullptr) override; void async_next(async_scan_next_callback_t &&) override; @@ -285,6 +279,13 @@ class pegasus_client_impl : public pegasus_client bool full_scan); private: + enum class async_scan_type : char + { + NORMAL, + COUNT_ONLY, + COUNT_ONLY_FINISHED + }; + ::dsn::apps::rrdb_client *_client; ::dsn::blob _start_key; ::dsn::blob _stop_key; @@ -330,13 +331,17 @@ class pegasus_client_impl : public pegasus_client void async_next(async_scan_next_callback_t &&callback) override; + int next(int32_t &count, internal_info *info = nullptr) override + { + return _p->next(count, info); + } + int next(std::string &hashkey, std::string &sortkey, std::string &value, - internal_info *info, - int32_t *count = nullptr) override + internal_info *info) override { - return _p->next(hashkey, sortkey, value, info, count); + return _p->next(hashkey, sortkey, value, info); } }; diff --git a/src/client_lib/pegasus_scanner_impl.cpp b/src/client_lib/pegasus_scanner_impl.cpp index 6f970e8fe6..0164f98b63 100644 --- a/src/client_lib/pegasus_scanner_impl.cpp +++ b/src/client_lib/pegasus_scanner_impl.cpp @@ -60,11 +60,33 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd { } +int pegasus_client_impl::pegasus_scanner_impl::next(int32_t &count, internal_info *info) +{ + ::dsn::utils::notify_event op_completed; + int ret = -1; + auto callback = [&](int err, + std::string &&hash, + std::string &&sort, + std::string &&val, + internal_info &&ii, + uint32_t expire_ts_seconds, + int32_t kv_count) { + ret = err; + if (info) { + (*info) = std::move(ii); + } + count = kv_count; + op_completed.notify(); + }; + async_next(std::move(callback)); + op_completed.wait(); + return ret; +} + int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey, std::string &sortkey, std::string &value, - internal_info *info, - int32_t *count) + internal_info *info) { ::dsn::utils::notify_event op_completed; int ret = -1; @@ -82,9 +104,6 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey, if (info) { (*info) = std::move(ii); } - if (count) { - (*count) = kv_count; - } op_completed.notify(); }; async_next(std::move(callback)); @@ -278,16 +297,15 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c _kvs = std::move(response.kvs); _p = -1; _context = response.context_id; - // 1. kv_count exist on response means server is newer version (added counting size only - // implementation) - // 1> kv_count == -1 indicates response still have key and value data - // 2> kv_count > -1 indicates response only have kv size count, but not key && value - // 2. kv_count is not existed means server is older version + // 1. kv_count exist on response mean (a && b): + // a> server is newer version (added counting size only implementation) + // b> response only have kv size count, but not key && value + // 2. kv_count is not existed means (a || b): + // a> server is older version + // b> response still have key and value data if (response.__isset.kv_count) { - if (response.kv_count != -1) { - _type = async_scan_type::COUNT_ONLY; - _kv_count = response.kv_count; - } + _type = async_scan_type::COUNT_ONLY; + _kv_count = response.kv_count; } _async_next_internal(); return; diff --git a/src/include/pegasus/client.h b/src/include/pegasus/client.h index d91a1746a8..68c9a38fef 100644 --- a/src/include/pegasus/client.h +++ b/src/include/pegasus/client.h @@ -345,8 +345,22 @@ class pegasus_client virtual int next(std::string &hashkey, std::string &sortkey, std::string &value, - internal_info *info = nullptr, - int32_t *count = nullptr) = 0; + internal_info *info = nullptr) = 0; + + /// + /// \brief get the next k-v pair count of this scanner + // only used for scanner which option only_return_count is true + /// thread-safe + /// \param count + /// data count value + /// \return + /// int, the error indicates whether or not the operation is succeeded. + /// this error can be converted to a string using get_error_string() + /// PERR_OK means a valid k-v pair count got + /// PERR_SCAN_COMPLETE means all k-v pair count have been return before this call + /// otherwise some error orrured + /// + virtual int next(int32_t &count, internal_info *info = nullptr) = 0; /// /// \brief async get the next key-value pair of this scanner diff --git a/src/test/function_test/test_scan.cpp b/src/test/function_test/test_scan.cpp index e3ed191a2e..410e447ae0 100644 --- a/src/test/function_test/test_scan.cpp +++ b/src/test/function_test/test_scan.cpp @@ -179,14 +179,11 @@ TEST_F(scan, OVERALL_COUNT_ONLY) << client->get_error_string(ret); ASSERT_LE(scanners.size(), 3); - std::string hash_key; - std::string sort_key; - std::string value; int32_t data_count = 0; for (auto scanner : scanners) { ASSERT_NE(nullptr, scanner); int32_t kv_count; - while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value, nullptr, &kv_count)))) { + while (PERR_OK == (ret = (scanner->next(kv_count)))) { data_count += kv_count; } ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="