Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize] Improve performance like/not like filter through pushdown function to storage engine #10355

Merged
merged 10 commits into from
Jul 19, 2022
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ CONF_Int32(quick_compaction_batch_size, "10");
// do compaction min rowsets
CONF_Int32(quick_compaction_min_rowsets, "10");

// Whether OlapScanNode pushes function predicates down to the storage engine.
// When true, OlapScanNode::start_scan additionally runs build_function_filters()
// after building the key ranges and olap filters. Disabled by default.
CONF_mBool(enable_function_pushdown, "false");

// cooldown task configs
CONF_Int32(cooldown_thread_num, "5");
CONF_mInt64(generate_cooldown_task_interval_sec, "20");
Expand Down
93 changes: 82 additions & 11 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,9 @@ Status OlapScanNode::close(RuntimeState* state) {
}

VLOG_CRITICAL << "OlapScanNode::close()";
// pushed functions close
Expr::close(_pushed_func_conjunct_ctxs, state);

return ScanNode::close(state);
}

Expand Down Expand Up @@ -477,8 +480,10 @@ Status OlapScanNode::start_scan(RuntimeState* state) {
}

VLOG_CRITICAL << "BuildKeyRangesAndFilters";
// 3. Using ColumnValueRange to Build StorageEngine filters
// 3.1 Using ColumnValueRange to Build StorageEngine filters
RETURN_IF_ERROR(build_key_ranges_and_filters());
// 3.2 Function pushdown
if (config::enable_function_pushdown) RETURN_IF_ERROR(build_function_filters());

VLOG_CRITICAL << "Filter idle conjuncts";
// 4. Filter idle conjunct which already trans to olap filters
Expand All @@ -505,29 +510,34 @@ bool OlapScanNode::is_key_column(const std::string& key_name) {
}

void OlapScanNode::remove_pushed_conjuncts(RuntimeState* state) {
if (_pushed_conjuncts_index.empty()) {
if (_pushed_conjuncts_index.empty() && _pushed_func_conjuncts_index.empty()) {
return;
}

// dispose direct conjunct first
std::vector<ExprContext*> new_conjunct_ctxs;
for (int i = 0; i < _direct_conjunct_size; ++i) {
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
_pushed_conjuncts_index.cend()) {
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) {
_conjunct_ctxs[i]->close(state); // pushed condition, just close
} else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) {
_pushed_func_conjunct_ctxs.emplace_back(
_conjunct_ctxs[i]); // pushed functions, need keep ctxs
} else {
_conjunct_ctxs[i]->close(state);
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
}
}

auto new_direct_conjunct_size = new_conjunct_ctxs.size();

// dispose hash join push down conjunct second
for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) {
if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) ==
_pushed_conjuncts_index.cend()) {
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
if (!_pushed_conjuncts_index.empty() && _pushed_conjuncts_index.count(i)) {
_conjunct_ctxs[i]->close(state); // pushed condition, just close
} else if (!_pushed_func_conjuncts_index.empty() && _pushed_func_conjuncts_index.count(i)) {
_pushed_func_conjunct_ctxs.emplace_back(
_conjunct_ctxs[i]); // pushed functions, need keep ctxs
} else {
_conjunct_ctxs[i]->close(state);
new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]);
}
}

Expand Down Expand Up @@ -689,6 +699,67 @@ static std::string olap_filters_to_string(const std::vector<doris::TCondition>&
return filters_string;
}

// Scans _conjunct_ctxs for predicates that can be evaluated by the storage engine and
// records them in _push_down_functions.
//
// Currently only `<slot> LIKE '<string literal>'` is recognised, optionally wrapped in a
// single NOT (recorded through the filter's `opposite` flag). For every conjunct that is
// recorded, its index is added to _pushed_func_conjuncts_index.
//
// Conjuncts that do not match this shape are silently skipped; they stay in the scan node's
// conjunct list. Always returns Status::OK().
Status OlapScanNode::build_function_filters() {
    for (size_t conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) {
        ExprContext* ex_ctx = _conjunct_ctxs[conj_idx];
        Expr* fn_expr = ex_ctx->root();
        bool opposite = false;

        // Unwrap a leading NOT so that `NOT (col LIKE 'x')` is handled as a negated LIKE.
        if (TExprNodeType::COMPOUND_PRED == fn_expr->node_type() &&
            TExprOpcode::COMPOUND_NOT == fn_expr->op()) {
            fn_expr = fn_expr->get_child(0);
            opposite = true;
        }

        // currently only support like / not like
        if (TExprNodeType::FUNCTION_CALL != fn_expr->node_type() ||
            "like" != fn_expr->fn().name.function_name) {
            continue;
        }

        doris_udf::FunctionContext* func_cxt =
                ex_ctx->fn_context(fn_expr->get_fn_context_index());
        if (!func_cxt) {
            continue;
        }
        if (fn_expr->children().size() != 2) {
            continue;
        }

        // LIKE is not commutative: the left operand is the value being tested and the right
        // operand is the pattern. Only `<slot> LIKE '<literal>'` may be pushed down as a
        // (column, pattern) pair; `'<literal>' LIKE <slot>` uses the column as the pattern
        // and must keep being evaluated by the regular conjunct path.
        Expr* value_expr = fn_expr->get_child(0);
        Expr* pattern_expr = fn_expr->get_child(1);
        if (TExprNodeType::SLOT_REF != value_expr->node_type()) {
            continue;
        }
        if (TExprNodeType::STRING_LITERAL != pattern_expr->node_type()) {
            continue;
        }
        SlotRef* slot_ref = static_cast<SlotRef*>(value_expr);

        std::vector<SlotId> slot_ids;
        slot_ref->get_slot_ids(&slot_ids);
        if (slot_ids.empty()) {
            continue;
        }

        // Resolve the referenced slot inside this scan node's tuple to obtain the column name.
        const SlotDescriptor* slot_desc = nullptr;
        for (SlotDescriptor* slot : _tuple_desc->slots()) {
            if (slot->id() == slot_ids[0]) {
                slot_desc = slot;
                break;
            }
        }
        if (!slot_desc) {
            continue;
        }

        StringVal val = pattern_expr->get_string_val(ex_ctx, nullptr);
        _push_down_functions.emplace_back(opposite, slot_desc->col_name(), func_cxt, val);
        _pushed_func_conjuncts_index.insert(static_cast<uint32_t>(conj_idx));
    }
    return Status::OK();
}

Status OlapScanNode::build_key_ranges_and_filters() {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
Expand Down Expand Up @@ -873,7 +944,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
// so that scanner can be automatically deconstructed if prepare failed.
_scanner_pool.add(scanner);
RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
_bloom_filters_push_down));
_bloom_filters_push_down, _push_down_functions));

_olap_scanners.push_back(scanner);
disk_set.insert(scanner->scan_disk());
Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "exec/olap_scanner.h"
#include "exec/scan_node.h"
#include "exprs/bloomfilter_predicate.h"
#include "exprs/function_filter.h"
#include "exprs/in_predicate.h"
#include "runtime/descriptors.h"
#include "util/progress_updater.h"
Expand Down Expand Up @@ -108,6 +109,8 @@ class OlapScanNode : public ScanNode {
void eval_const_conjuncts();
Status normalize_conjuncts();
Status build_key_ranges_and_filters();
Status build_function_filters();

Status start_scan_thread(RuntimeState* state);

template <PrimitiveType T>
Expand Down Expand Up @@ -190,6 +193,15 @@ class OlapScanNode : public ScanNode {
std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>
_bloom_filters_push_down;

// push down functions to storage engine
// only support scalar functions, now just support like / not like
std::vector<FunctionFilter> _push_down_functions;
// indexes of the function conjuncts that have already been pushed down to the storage engine
std::set<uint32_t> _pushed_func_conjuncts_index;
// these conjunct contexts must be kept alive until the scan node is closed,
// since the pushed function filters reference memory owned by them
std::vector<ExprContext*> _pushed_func_conjunct_ctxs;

// Pool for storing allocated scanner objects. We don't want to use the
// runtime pool to ensure that the scanner objects are deleted before this
// object is.
Expand Down
17 changes: 11 additions & 6 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
Status OlapScanner::prepare(
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters) {
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
set_tablet_reader();
// set limit to reduce end of rowset and segment mem use
Expand Down Expand Up @@ -124,8 +124,9 @@ Status OlapScanner::prepare(
}

{
// Initialize tablet_reader_params
RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, bloom_filters));
// Initialize _params
RETURN_IF_ERROR(
_init_tablet_reader_params(key_ranges, filters, bloom_filters, function_filters));
}

return Status::OK();
Expand Down Expand Up @@ -157,8 +158,8 @@ Status OlapScanner::open() {
// it will be called under tablet read lock because capture rs readers need
Status OlapScanner::_init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters) {
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
bool single_version =
(_tablet_reader_params.rs_readers.size() == 1 &&
Expand Down Expand Up @@ -193,6 +194,10 @@ Status OlapScanner::_init_tablet_reader_params(
std::inserter(_tablet_reader_params.bloom_filters,
_tablet_reader_params.bloom_filters.begin()));

std::copy(function_filters.cbegin(), function_filters.cend(),
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));

// Range
for (auto key_range : key_ranges) {
if (key_range->begin_scan_range.size() == 1 &&
Expand Down
8 changes: 6 additions & 2 deletions be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include "exec/exec_node.h"
#include "exec/olap_utils.h"
#include "exprs/bloomfilter_predicate.h"
#include "exprs/expr.h"
#include "exprs/function_filter.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "olap/tuple_reader.h"
Expand All @@ -48,7 +50,8 @@ class OlapScanner {
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
const std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters);
bloom_filters,
const std::vector<FunctionFilter>& function_filters);

Status open();

Expand Down Expand Up @@ -92,7 +95,8 @@ class OlapScanner {
Status _init_tablet_reader_params(
const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters);
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
Status _init_return_columns(bool need_seq_col);
void _convert_row_to_tuple(Tuple* tuple);

Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class Expr {
};

static Expr* copy(ObjectPool* pool, Expr* old_expr);
int get_fn_context_index() { return _fn_context_index; }

protected:
friend class AggFnEvaluator;
Expand Down
43 changes: 43 additions & 0 deletions be/src/exprs/function_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <memory>

#include "udf/udf.h"
#include "udf/udf_internal.h"

namespace doris {

// Plain value holder describing one function predicate that the scan node hands to the
// storage engine. It carries exactly one string argument, which is all the currently
// supported like / not like predicates need.
class FunctionFilter {
public:
    FunctionFilter(bool opposite, const std::string& col_name, doris_udf::FunctionContext* fn_ctx,
                   doris_udf::StringVal string_param)
            : _opposite{opposite},
              _col_name{col_name},
              _fn_ctx{fn_ctx},
              _string_param{string_param} {}

    // Set when the originating conjunct was wrapped in NOT (e.g. not like).
    bool _opposite;
    // Name of the column the function is applied to.
    std::string _col_name;
    // Not owned: the lifetime of this pointer is controlled by the scan node.
    doris_udf::FunctionContext* _fn_ctx;
    // The single parameter taken from the conjunct; one is enough because only the
    // like predicate is supported for now.
    doris_udf::StringVal _string_param;
};

} // namespace doris
Loading