Skip to content

Commit

Permalink
[opt](parquet) add predicate filter time for parquet reader
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Aug 27, 2024
1 parent f75f7ae commit 30d6a8a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
38 changes: 29 additions & 9 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,12 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
_filter_conjuncts, &filters, block, &result_filter, &can_filter_all));

{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
_filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
}

if (can_filter_all) {
for (auto& col : columns_to_filter) {
Expand Down Expand Up @@ -450,7 +454,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
columns_to_filter[i] = i;
}
IColumn::Filter result_filter;
while (true) {
while (!_state->is_cancelled()) {
// read predicate columns
pre_read_rows = 0;
pre_eof = false;
Expand Down Expand Up @@ -485,8 +489,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
for (auto& conjunct : _filter_conjuncts) {
filter_contexts.emplace_back(conjunct);
}
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
&result_filter, &can_filter_all));

{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
&result_filter, &can_filter_all));
}

if (_lazy_read_ctx.resize_first_column) {
// We have to clean the first column to insert right data.
Expand Down Expand Up @@ -523,6 +531,10 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
break;
}
}
if (_state->is_cancelled()) {
return Status::Cancelled("cancelled");
}

if (select_vector_ptr == nullptr) {
DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
*read_rows = 0;
Expand Down Expand Up @@ -584,8 +596,13 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
if (!_not_single_slot_filter_conjuncts.empty()) {
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter, origin_column_num)));
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter,
origin_column_num)));
}
}
return Status::OK();
}
Expand Down Expand Up @@ -843,8 +860,11 @@ Status RowGroupReader::_rewrite_dict_predicates() {
// The following process may be tricky and time-consuming, but we have no other way.
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
ctxs, &temp_block, columns_to_filter, column_to_keep));
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
ctxs, &temp_block, columns_to_filter, column_to_keep));
}
if (dict_pos != 0) {
// We have to clean the first column to insert right data.
temp_block.get_by_position(0).column->assume_mutable()->clear();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class RowGroupReader : public ProfileCollector {
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
int64_t predicate_filter_time() const { return _predicate_filter_time; }

ParquetColumnReader::Statistics statistics();
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
Expand Down Expand Up @@ -211,6 +212,7 @@ class RowGroupReader : public ProfileCollector {

const LazyReadContext& _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
int64_t _predicate_filter_time = 0;
// If continuous batches are skipped, we can cache them to skip a whole page
size_t _cached_filtered_rows = 0;
std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr;
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ void ParquetReader::_init_profile() {
_profile, "SkipPageHeaderNum", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.parse_page_header_num = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "ParsePageHeaderNum", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.predicate_filter_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", parquet_profile, 1);
}
}

Expand Down Expand Up @@ -595,6 +597,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
auto column_st = _current_group_reader->statistics();
_column_statistics.merge(column_st);
_statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
_statistics.predicate_filter_time += _current_group_reader->predicate_filter_time();
if (_read_row_groups.size() == 0) {
*eof = true;
} else {
Expand Down Expand Up @@ -1042,6 +1045,7 @@ void ParquetReader::_collect_profile() {
COUNTER_UPDATE(_parquet_profile.skip_page_header_num, _column_statistics.skip_page_header_num);
COUNTER_UPDATE(_parquet_profile.parse_page_header_num,
_column_statistics.parse_page_header_num);
COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time);
COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time);
COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls);
COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls);
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class ParquetReader : public GenericReader {
int64_t page_index_filter_time = 0;
int64_t read_page_index_time = 0;
int64_t parse_page_index_time = 0;
int64_t predicate_filter_time = 0;
};

ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
Expand Down Expand Up @@ -188,6 +189,7 @@ class ParquetReader : public GenericReader {
RuntimeProfile::Counter* decode_null_map_time = nullptr;
RuntimeProfile::Counter* skip_page_header_num = nullptr;
RuntimeProfile::Counter* parse_page_header_num = nullptr;
RuntimeProfile::Counter* predicate_filter_time = nullptr;
};

Status _open_file();
Expand Down

0 comments on commit 30d6a8a

Please sign in to comment.