Skip to content

Commit

Permalink
add filter on server
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 3, 2022
1 parent 71da038 commit 1be846c
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 10 deletions.
7 changes: 5 additions & 2 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ struct get_scanner_request
11:optional bool validate_partition_hash;
12:optional bool return_expire_ts;
13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise
// Field ids are part of the wire format, so only_return_count keeps the id 14
// it already uses and the new value-filter fields are appended after it.
14:optional bool only_return_count = false;
15:optional filter_type value_filter_type;
16:optional dsn.blob value_filter_pattern;
}

struct scan_request
Expand All @@ -296,7 +298,8 @@ struct scan_response
4:i32 app_id;
5:i32 partition_index;
6:string server;
// kv_count keeps its existing field id 7; the new flag takes the next free id
// so that peers built against the previous definition still decode kv_count.
7:optional i32 kv_count = -1;
8:optional bool filter_on_server = false;
}

struct duplicate_request
Expand Down
1 change: 1 addition & 0 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ class pegasus_client_impl : public pegasus_client
internal_info _info;
int32_t _p;
int32_t _kv_count;
bool _filter_on_server;

int64_t _context;
mutable ::dsn::zlock _lock;
Expand Down
19 changes: 18 additions & 1 deletion src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
bool filter_on_server,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
Expand All @@ -98,6 +99,7 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
bool filter_on_server,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
Expand Down Expand Up @@ -168,6 +170,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string(),
std::move(info),
0,
_filter_on_server,
-1);
}
}
Expand Down Expand Up @@ -210,6 +213,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(value),
std::move(info),
expire_ts_seconds,
_filter_on_server,
_kv_count);
_lock.lock();
if (_queue.size() == 1) {
Expand Down Expand Up @@ -259,6 +263,9 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.sort_key_filter_type = (dsn::apps::filter_type::type)_options.sort_key_filter_type;
req.sort_key_filter_pattern = ::dsn::blob(
_options.sort_key_filter_pattern.data(), 0, _options.sort_key_filter_pattern.size());
// The sort key filter is already assigned just above; what still has to be sent
// is the value filter, which is an optional field and therefore must go through
// its __set_ accessor to be marked as present in the request.
req.__set_value_filter_type(
    static_cast<dsn::apps::filter_type::type>(_options.value_filter_type));
req.__set_value_filter_pattern(::dsn::blob(
    _options.value_filter_pattern.data(), 0, _options.value_filter_pattern.size()));
req.no_value = _options.no_value;
req.__set_validate_partition_hash(_validate_partition_hash);
req.__set_return_expire_ts(_options.return_expire_ts);
Expand Down Expand Up @@ -296,6 +303,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_p = -1;
_context = response.context_id;
_kv_count = response.kv_count;
_filter_on_server = response.filter_on_server;
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down Expand Up @@ -323,7 +331,14 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c

for (auto &callback : temp) {
if (callback) {
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1);
callback(ret,
std::string(),
std::string(),
std::string(),
internal_info(info),
0,
_filter_on_server,
-1);
}
}
}
Expand Down Expand Up @@ -359,13 +374,15 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
std::string &&value,
internal_info &&info,
uint32_t expire_ts_seconds,
bool filter_on_server,
int32_t kv_count) {
user_callback(error_code,
std::move(hash_key),
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds,
filter_on_server,
kv_count);
});
}
Expand Down
1 change: 1 addition & 0 deletions src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
std::string &&value,
pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds,
bool filter_on_server,
int32_t kv_count) mutable {
if (ret == PERR_SCAN_COMPLETE) {
cb();
Expand Down
6 changes: 6 additions & 0 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ class pegasus_client
std::string hash_key_filter_pattern;
filter_type sort_key_filter_type;
std::string sort_key_filter_pattern;
filter_type value_filter_type;
std::string value_filter_pattern;
bool no_value; // only fetch hash_key and sort_key, but not fetch value
bool return_expire_ts;
bool only_return_count;
Expand All @@ -260,6 +262,7 @@ class pegasus_client
stop_inclusive(false),
hash_key_filter_type(FT_NO_FILTER),
sort_key_filter_type(FT_NO_FILTER),
value_filter_type(FT_NO_FILTER),
no_value(false),
return_expire_ts(false),
only_return_count(false)
Expand All @@ -274,6 +277,8 @@ class pegasus_client
hash_key_filter_pattern(o.hash_key_filter_pattern),
sort_key_filter_type(o.sort_key_filter_type),
sort_key_filter_pattern(o.sort_key_filter_pattern),
value_filter_type(o.value_filter_type),
value_filter_pattern(o.value_filter_pattern),
no_value(o.no_value),
return_expire_ts(o.return_expire_ts),
only_return_count(o.only_return_count)
Expand Down Expand Up @@ -316,6 +321,7 @@ class pegasus_client
std::string && /*value*/,
internal_info && /*info*/,
uint32_t /*expire_ts_seconds*/,
bool /*filter_on_server*/,
int32_t /*kv_count*/)>
async_scan_next_callback_t;
typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
Expand Down
9 changes: 9 additions & 0 deletions src/server/pegasus_scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ struct pegasus_scan_context
const std::string &&hash_key_filter_pattern_,
::dsn::apps::filter_type::type sort_key_filter_type_,
const std::string &&sort_key_filter_pattern_,
::dsn::apps::filter_type::type value_filter_type_,
const std::string &&value_filter_pattern_,
int32_t batch_size_,
bool no_value_,
bool validate_partition_hash_,
bool return_expire_ts_)
: _stop_holder(std::move(stop_)),
_hash_key_filter_pattern_holder(std::move(hash_key_filter_pattern_)),
_sort_key_filter_pattern_holder(std::move(sort_key_filter_pattern_)),
_value_filter_pattern_holder(std::move(value_filter_pattern_)),
iterator(std::move(iterator_)),
stop(_stop_holder.data(), _stop_holder.size()),
stop_inclusive(stop_inclusive_),
Expand All @@ -56,6 +59,9 @@ struct pegasus_scan_context
sort_key_filter_type(sort_key_filter_type_),
sort_key_filter_pattern(
_sort_key_filter_pattern_holder.data(), 0, _sort_key_filter_pattern_holder.length()),
value_filter_type(value_filter_type_),
value_filter_pattern(
_value_filter_pattern_holder.data(), 0, _value_filter_pattern_holder.length()),
batch_size(batch_size_),
no_value(no_value_),
validate_partition_hash(validate_partition_hash_),
Expand All @@ -67,6 +73,7 @@ struct pegasus_scan_context
std::string _stop_holder;
std::string _hash_key_filter_pattern_holder;
std::string _sort_key_filter_pattern_holder;
std::string _value_filter_pattern_holder;

public:
std::unique_ptr<rocksdb::Iterator> iterator;
Expand All @@ -76,6 +83,8 @@ struct pegasus_scan_context
dsn::blob hash_key_filter_pattern;
::dsn::apps::filter_type::type sort_key_filter_type;
dsn::blob sort_key_filter_pattern;
::dsn::apps::filter_type::type value_filter_type;
dsn::blob value_filter_pattern;
int32_t batch_size;
bool no_value;
bool validate_partition_hash;
Expand Down
32 changes: 32 additions & 0 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
resp.app_id = _gpid.get_app_id();
resp.partition_index = _gpid.get_partition_index();
resp.server = _primary_address;
resp.__set_filter_on_server(true);

if (!_read_size_throttling_controller->available()) {
rpc.error() = dsn::ERR_BUSY;
Expand Down Expand Up @@ -1087,6 +1088,18 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)

return;
}
if (!is_filter_type_supported(request.value_filter_type)) {
derror("%s: invalid argument for get_scanner from %s: "
"value filter type %d not supported",
replica_name(),
rpc.remote_address().to_string(),
request.value_filter_type);
resp.error = rocksdb::Status::kInvalidArgument;
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
_pfc_scan_latency->set(dsn_now_ns() - start_time);

return;
}

rocksdb::ReadOptions rd_opts(_data_cf_rd_opts);
if (_data_cf_opts.prefix_extractor) {
Expand Down Expand Up @@ -1195,6 +1208,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.hash_key_filter_pattern,
request.sort_key_filter_type,
request.sort_key_filter_pattern,
request.value_filter_type,
request.value_filter_pattern,
epoch_now,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
Expand Down Expand Up @@ -1276,6 +1291,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.sort_key_filter_type,
std::string(request.sort_key_filter_pattern.data(),
request.sort_key_filter_pattern.length()),
request.value_filter_type,
std::string(request.value_filter_pattern.data(), request.value_filter_pattern.length()),
batch_count,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
Expand Down Expand Up @@ -1336,6 +1353,8 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
const ::dsn::blob &hash_key_filter_pattern = context->hash_key_filter_pattern;
::dsn::apps::filter_type::type sort_key_filter_type = context->sort_key_filter_type;
const ::dsn::blob &sort_key_filter_pattern = context->sort_key_filter_pattern;
// Read the value filter from the context's own value_filter_* members, not from
// the sort key filter, so follow-up scan batches filter on the requested value pattern.
::dsn::apps::filter_type::type value_filter_type = context->value_filter_type;
const ::dsn::blob &value_filter_pattern = context->value_filter_pattern;
bool no_value = context->no_value;
bool validate_hash = context->validate_partition_hash;
bool return_expire_ts = context->return_expire_ts;
Expand Down Expand Up @@ -1370,6 +1389,8 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
hash_key_filter_pattern,
sort_key_filter_type,
sort_key_filter_pattern,
value_filter_type,
value_filter_pattern,
epoch_now,
no_value,
validate_hash,
Expand Down Expand Up @@ -2288,6 +2309,8 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
::dsn::apps::filter_type::type value_filter_type,
const ::dsn::blob &value_filter_pattern,
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
Expand All @@ -2314,6 +2337,7 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
// extract raw key
::dsn::blob raw_key(key.data(), 0, key.size());
if (hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER ||
value_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER ||
sort_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(raw_key, hash_key, sort_key);
Expand All @@ -2331,6 +2355,14 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
}
return range_iteration_state::kFiltered;
}
// Wrap the value bytes in a blob (no copy) so they can be passed to validate_filter.
// NOTE(review): `value` looks like the raw stored value here. If the stored
// encoding carries a header (e.g. expire timestamp) in front of the user data,
// a prefix match would be compared against the header bytes rather than the
// user data -- confirm, and strip the header before filtering if so.
::dsn::blob value_(value.data(), 0, value.size());
// Drop the record when a value filter is requested and the value does not match it.
if (value_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER &&
!validate_filter(value_filter_type, value_filter_pattern, value_)) {
if (_verbose_log) {
derror("%s: value filtered for scan", replica_name());
}
return range_iteration_state::kFiltered;
}
}
if (fill_value) {
::dsn::apps::key_value kv;
Expand Down
2 changes: 2 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ class pegasus_server_impl : public pegasus_read_service
const ::dsn::blob &hash_key_filter_pattern,
::dsn::apps::filter_type::type sort_key_filter_type,
const ::dsn::blob &sort_key_filter_pattern,
::dsn::apps::filter_type::type value_filter_type,
const ::dsn::blob &value_filter_pattern,
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
Expand Down
24 changes: 18 additions & 6 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,25 @@ inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type,
return false;
}
// return true if the data is valid for the filter
inline bool
validate_filter(scan_data_context *context, const std::string &sort_key, const std::string &value)
inline bool validate_filter(scan_data_context *context,
const std::string &sort_key,
const std::string &value,
bool filter_on_server)
{
// for sort key, we only need to check MATCH_EXACT, because it is not supported
// we only need to check MATCH_EXACT, because it is not supported
// on the server side, but MATCH_PREFIX is already satisified.
if (context->sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT &&
sort_key.length() > context->sort_key_filter_pattern.length())
return false;
return validate_filter(context->value_filter_type, context->value_filter_pattern, value);

if (!filter_on_server) {
return validate_filter(context->value_filter_type, context->value_filter_pattern, value);
}

if (context->value_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT &&
value.length() > context->value_filter_pattern.length())
return false;
return true;
}

inline int compute_ttl_seconds(uint32_t expire_ts_seconds, bool &ts_expired)
Expand Down Expand Up @@ -323,9 +333,10 @@ inline void scan_multi_data_next(scan_data_context *context)
std::string &&value,
pegasus::pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds,
bool filter_on_server,
uint32_t kv_count) {
if (ret == pegasus::PERR_OK) {
if (validate_filter(context, sort_key, value)) {
if (validate_filter(context, sort_key, value, filter_on_server)) {
bool ts_expired = false;
int ttl_seconds = 0;
ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
Expand Down Expand Up @@ -403,9 +414,10 @@ inline void scan_data_next(scan_data_context *context)
std::string &&value,
pegasus::pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds,
bool filter_on_server,
int32_t kv_count) {
if (ret == pegasus::PERR_OK) {
if (kv_count == -1 || validate_filter(context, sort_key, value)) {
if (validate_filter(context, sort_key, value, filter_on_server)) {
bool ts_expired = false;
int ttl_seconds = 0;
switch (context->op) {
Expand Down
10 changes: 9 additions & 1 deletion src/shell/commands/data_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2394,12 +2394,20 @@ bool count_data(command_executor *e, shell_context *sc, arguments args)
options.sort_key_filter_type = sort_key_filter_type;
options.sort_key_filter_pattern = sort_key_filter_pattern;
}
if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) {
if (value_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT)
options.value_filter_type = pegasus::pegasus_client::FT_MATCH_PREFIX;
else
options.value_filter_type = value_filter_type;
options.value_filter_pattern = value_filter_pattern;
}
if (stat_size || value_filter_type != pegasus::pegasus_client::FT_NO_FILTER)
options.no_value = false;
else
options.no_value = true;

if (diff_hash_key || stat_size || value_filter_type != pegasus::pegasus_client::FT_NO_FILTER ||
if (diff_hash_key || stat_size ||
value_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT ||
sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT) {
options.only_return_count = false;
}
Expand Down

0 comments on commit 1be846c

Please sign in to comment.