Skip to content

Commit

Permalink
cr 2
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 5, 2022
1 parent 71da038 commit d188ff6
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 61 deletions.
1 change: 0 additions & 1 deletion idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ struct get_scanner_request
struct scan_request
{
1:i64 context_id;
2:optional bool only_return_count;
}

struct scan_response
Expand Down
21 changes: 5 additions & 16 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,8 @@ class pegasus_client_impl : public pegasus_client
int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info = nullptr) override;

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) override;
internal_info *info = nullptr,
int32_t *count = nullptr) override;

void async_next(async_scan_next_callback_t &&) override;

Expand Down Expand Up @@ -329,17 +325,10 @@ class pegasus_client_impl : public pegasus_client
int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) override
{
return _p->next(hashkey, sortkey, value, count_number);
}

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info) override
internal_info *info,
int32_t *count = nullptr) override
{
return _p->next(hashkey, sortkey, value, info);
return _p->next(hashkey, sortkey, value, info, count);
}
};

Expand Down
37 changes: 7 additions & 30 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,40 +55,14 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)

{
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err,
std::string &&hash,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
value = std::move(val);
count_number = kv_count;
op_completed.notify();
};
async_next(std::move(callback));
op_completed.wait();
return ret;
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info)
internal_info *info,
int32_t *count)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
Expand All @@ -106,6 +80,9 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
if (info) {
(*info) = std::move(ii);
}
if (count) {
(*count) = kv_count;
}
op_completed.notify();
};
async_next(std::move(callback));
Expand Down Expand Up @@ -191,8 +168,9 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
}

// valid data got
std::string hash_key, sort_key, value = "";
std::string hash_key, sort_key, value;
uint32_t expire_ts_seconds = 0;
// _kv_count == -1 means req just want to get data counts, not include data value
if (_kv_count == -1) {
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
Expand Down Expand Up @@ -228,7 +206,6 @@ void pegasus_client_impl::pegasus_scanner_impl::_next_batch()
{
::dsn::apps::scan_request req;
req.context_id = _context;
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down
8 changes: 2 additions & 6 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,8 @@ class pegasus_client
virtual int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info = nullptr) = 0;

virtual int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) = 0;
internal_info *info = nullptr,
int32_t *count = nullptr) = 0;

///
/// \brief async get the next key-value pair of this scanner
Expand Down
7 changes: 5 additions & 2 deletions src/server/pegasus_scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ struct pegasus_scan_context
int32_t batch_size_,
bool no_value_,
bool validate_partition_hash_,
bool return_expire_ts_)
bool return_expire_ts_,
bool only_return_count_)
: _stop_holder(std::move(stop_)),
_hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
_sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
Expand All @@ -59,7 +60,8 @@ struct pegasus_scan_context
batch_size(batch_size_),
no_value(no_value_),
validate_partition_hash(validate_partition_hash_),
return_expire_ts(return_expire_ts_)
return_expire_ts(return_expire_ts_),
only_return_count(only_return_count_)
{
}

Expand All @@ -80,6 +82,7 @@ struct pegasus_scan_context
bool no_value;
bool validate_partition_hash;
bool return_expire_ts;
bool only_return_count;
};

class pegasus_context_cache
Expand Down
11 changes: 7 additions & 4 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
batch_count,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts));
return_expire_ts,
request.only_return_count));
int64_t handle = _context_cache.put(std::move(context));
resp.context_id = handle;
// if the context is used, it will be fetched and re-put into cache,
Expand Down Expand Up @@ -1320,6 +1321,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
bool only_return_count = false;

if (!_read_size_throttling_controller->available()) {
rpc.error() = dsn::ERR_BUSY;
Expand All @@ -1332,6 +1334,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
rocksdb::Iterator *it = context->iterator.get();
const rocksdb::Slice &stop = context->stop;
bool stop_inclusive = context->stop_inclusive;
only_return_count = context->only_return_count;
::dsn::apps::filter_type::type hash_key_filter_type = context->hash_key_filter_type;
const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
Expand Down Expand Up @@ -1374,7 +1377,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
no_value,
validate_hash,
return_expire_ts,
!request.only_return_count);
!only_return_count);
switch (state) {
case range_iteration_state::kNormal:
count++;
Expand All @@ -1398,7 +1401,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
it->Next();
}

if (request.only_return_count) {
if (only_return_count) {
resp.kvs.emplace_back(::dsn::apps::key_value());
resp.__set_kv_count(count);
}
Expand Down Expand Up @@ -1464,7 +1467,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
}

// abandon calculate capacity unit
if (request.only_return_count) {
if (only_return_count) {
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}
_pfc_scan_latency->set(dsn_now_ns() - start_time);
Expand Down
4 changes: 2 additions & 2 deletions src/test/function_test/test_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ TEST_F(scan, OVERALL_COUNT_ONLY)
for (auto scanner : scanners) {
ASSERT_NE(nullptr, scanner);
int32_t kv_count;
while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value, kv_count)))) {
while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value, nullptr, &kv_count)))) {
data_counts += kv_count;
}
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
Expand Down Expand Up @@ -432,7 +432,7 @@ TEST_F(scan, REQUEST_EXPIRE_TS)
std::string &&value,
pegasus::pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds,
uint32_t kv_count) {
int32_t kv_count) {
if (err == pegasus::PERR_OK) {
check_and_put(data, hash_key, sort_key, value);
if (expire_ts_seconds > 0) {
Expand Down

0 comments on commit d188ff6

Please sign in to comment.