Skip to content

Commit

Permalink
cr3
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 11, 2022
1 parent 21d1b66 commit 8d2fe21
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 70 deletions.
114 changes: 56 additions & 58 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1187,22 +1187,23 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)

limiter->add_count();

auto state = append_key_value_for_scan(
resp.kvs,
auto state = validate_key_value_for_scan(
it->key(),
it->value(),
request.hash_key_filter_type,
request.hash_key_filter_pattern,
request.sort_key_filter_type,
request.sort_key_filter_pattern,
epoch_now,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts,
!request.only_return_count);
request.__isset.validate_partition_hash ? request.validate_partition_hash : true);

switch (state) {
case range_iteration_state::kNormal:
count++;
if (!request.only_return_count) {
append_key_value(
resp.kvs, it->key(), it->value(), request.no_value, return_expire_ts);
}
break;
case range_iteration_state::kExpired:
expire_count++;
Expand Down Expand Up @@ -1303,10 +1304,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
_pfc_recent_filter_count->add(filter_count);
}

// abandon calculate capacity unit
if (!request.only_return_count) {
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}

Expand Down Expand Up @@ -1366,21 +1364,21 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)

limiter->add_count();

auto state = append_key_value_for_scan(resp.kvs,
it->key(),
it->value(),
hash_key_filter_type,
hash_key_filter_pattern,
sort_key_filter_type,
sort_key_filter_pattern,
epoch_now,
no_value,
validate_hash,
return_expire_ts,
!only_return_count);
auto state = validate_key_value_for_scan(it->key(),
it->value(),
hash_key_filter_type,
hash_key_filter_pattern,
sort_key_filter_type,
sort_key_filter_pattern,
epoch_now,
validate_hash);

switch (state) {
case range_iteration_state::kNormal:
count++;
if (!only_return_count) {
append_key_value(resp.kvs, it->key(), it->value(), no_value, return_expire_ts);
}
break;
case range_iteration_state::kExpired:
expire_count++;
Expand All @@ -1401,7 +1399,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
it->Next();
}

if (!only_return_count) {
if (only_return_count) {
resp.kvs.emplace_back(::dsn::apps::key_value());
resp.__set_kv_count(count);
}
Expand Down Expand Up @@ -1466,10 +1464,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.error = rocksdb::Status::Code::kNotFound;
}

// abandon calculate capacity unit
if (only_return_count) {
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}

Expand Down Expand Up @@ -2283,19 +2278,15 @@ bool pegasus_server_impl::validate_filter(::dsn::apps::filter_type::type filter_
return false;
}

range_iteration_state
pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type hash_key_filter_type,
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
bool request_expire_ts,
bool fill_value)
range_iteration_state pegasus_server_impl::validate_key_value_for_scan(
const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type hash_key_filter_type,
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool request_validate_hash)
{
if (check_if_record_expired(epoch_now, value)) {
if (_verbose_log) {
Expand Down Expand Up @@ -2335,29 +2326,36 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
return range_iteration_state::kFiltered;
}
}
if (fill_value) {
::dsn::apps::key_value kv;

std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());
return range_iteration_state::kNormal;
}

// extract expire ts if necessary
if (request_expire_ts) {
auto expire_ts_seconds =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
}
void pegasus_server_impl::append_key_value(std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
bool no_value,
bool request_expire_ts)
{
::dsn::apps::key_value kv;
::dsn::blob raw_key(key.data(), 0, key.size());
std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());

// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
}
kvs.emplace_back(std::move(kv));
// extract expire ts if necessary
if (request_expire_ts) {
auto expire_ts_seconds =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
}

return range_iteration_state::kNormal;
// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
}

kvs.emplace_back(std::move(kv));
}

range_iteration_state pegasus_server_impl::append_key_value_for_multi_get(
Expand Down
26 changes: 14 additions & 12 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,21 @@ class pegasus_server_impl : public pegasus_read_service

void set_last_durable_decree(int64_t decree) { _last_durable_decree.store(decree); }

void append_key_value(std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
bool no_value,
bool request_expire_ts);

range_iteration_state
append_key_value_for_scan(std::vector<::dsn::apps::key_value> &kvs,
const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type hash_key_filter_type,
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
bool request_expire_ts,
bool fill_value);
validate_key_value_for_scan(const rocksdb::Slice &key,
const rocksdb::Slice &value,
::dsn::apps::filter_type::type hash_key_filter_type,
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
uint32_t epoch_now,
bool request_validate_hash);

range_iteration_state
append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
Expand Down

0 comments on commit 8d2fe21

Please sign in to comment.