Skip to content

Commit

Permalink
cr8
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 23, 2022
1 parent 0d0c64b commit 8de3a44
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 33 deletions.
2 changes: 1 addition & 1 deletion idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ struct scan_response
4:i32 app_id;
5:i32 partition_index;
6:string server;
7:optional i32 kv_count = -1;
7:optional i32 kv_count;
}

struct duplicate_request
Expand Down
29 changes: 17 additions & 12 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,21 +247,15 @@ class pegasus_client_impl : public pegasus_client

static void init_error();

enum class async_scan_type : char
{
NORMAL,
COUNT_ONLY,
COUNT_ONLY_FINISHED
};

class pegasus_scanner_impl : public pegasus_scanner
{
public:
int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info = nullptr,
int32_t *count = nullptr) override;
internal_info *info = nullptr) override;

int next(int32_t &count, internal_info *info = nullptr) override;

void async_next(async_scan_next_callback_t &&) override;

Expand All @@ -285,6 +279,13 @@ class pegasus_client_impl : public pegasus_client
bool full_scan);

private:
enum class async_scan_type : char
{
NORMAL,
COUNT_ONLY,
COUNT_ONLY_FINISHED
};

::dsn::apps::rrdb_client *_client;
::dsn::blob _start_key;
::dsn::blob _stop_key;
Expand Down Expand Up @@ -330,13 +331,17 @@ class pegasus_client_impl : public pegasus_client

void async_next(async_scan_next_callback_t &&callback) override;

int next(int32_t &count, internal_info *info = nullptr) override
{
return _p->next(count, info);
}

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info,
int32_t *count = nullptr) override
internal_info *info) override
{
return _p->next(hashkey, sortkey, value, info, count);
return _p->next(hashkey, sortkey, value, info);
}
};

Expand Down
46 changes: 32 additions & 14 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,33 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
{
}

int pegasus_client_impl::pegasus_scanner_impl::next(int32_t &count, internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err,
std::string &&hash,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
if (info) {
(*info) = std::move(ii);
}
count = kv_count;
op_completed.notify();
};
async_next(std::move(callback));
op_completed.wait();
return ret;
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info,
int32_t *count)
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
Expand All @@ -82,9 +104,6 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
if (info) {
(*info) = std::move(ii);
}
if (count) {
(*count) = kv_count;
}
op_completed.notify();
};
async_next(std::move(callback));
Expand Down Expand Up @@ -278,16 +297,15 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
// 1. kv_count exist on response means server is newer version (added counting size only
// implementation)
// 1> kv_count == -1 indicates response still have key and value data
// 2> kv_count > -1 indicates response only have kv size count, but not key && value
// 2. kv_count is not existed means server is older version
// 1. kv_count exist on response mean (a && b):
// a> server is newer version (added counting size only implementation)
// b> response only have kv size count, but not key && value
// 2. kv_count is not existed means (a || b):
// a> server is older version
// b> response still have key and value data
if (response.__isset.kv_count) {
if (response.kv_count != -1) {
_type = async_scan_type::COUNT_ONLY;
_kv_count = response.kv_count;
}
_type = async_scan_type::COUNT_ONLY;
_kv_count = response.kv_count;
}
_async_next_internal();
return;
Expand Down
18 changes: 16 additions & 2 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,22 @@ class pegasus_client
virtual int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info = nullptr,
int32_t *count = nullptr) = 0;
internal_info *info = nullptr) = 0;

///
/// \brief get the next k-v pair count of this scanner
// only used for scanner which option only_return_count is true
/// thread-safe
/// \param count
/// data count value
/// \return
/// int, the error indicates whether or not the operation is succeeded.
/// this error can be converted to a string using get_error_string()
/// PERR_OK means a valid k-v pair count got
/// PERR_SCAN_COMPLETE means all k-v pair count have been return before this call
/// otherwise some error orrured
///
virtual int next(int32_t &count, internal_info *info = nullptr) = 0;

///
/// \brief async get the next key-value pair of this scanner
Expand Down
5 changes: 1 addition & 4 deletions src/test/function_test/test_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,11 @@ TEST_F(scan, OVERALL_COUNT_ONLY)
<< client->get_error_string(ret);
ASSERT_LE(scanners.size(), 3);

std::string hash_key;
std::string sort_key;
std::string value;
int32_t data_count = 0;
for (auto scanner : scanners) {
ASSERT_NE(nullptr, scanner);
int32_t kv_count;
while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value, nullptr, &kv_count)))) {
while (PERR_OK == (ret = (scanner->next(kv_count)))) {
data_count += kv_count;
}
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
Expand Down

0 comments on commit 8de3a44

Please sign in to comment.