Skip to content

Commit 07a2765

Browse files
committed
2
1 parent 90d2a17 commit 07a2765

File tree

9 files changed

+26
-25
lines changed

9 files changed

+26
-25
lines changed

be/src/olap/rowset/segment_v2/segment_iterator.cpp

+16-7
Original file line numberDiff line numberDiff line change
@@ -1287,6 +1287,7 @@ void SegmentIterator::_vec_init_lazy_materialization() {
12871287
std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(),
12881288
_non_predicate_columns.end());
12891289

1290+
DCHECK(_second_read_column_ids.empty());
12901291
// _second_read_column_ids must be empty. Otherwise _lazy_materialization_read must not false.
12911292
for (int i = 0; i < _schema.num_column_ids(); i++) {
12921293
auto cid = _schema.column_id(i);
@@ -1704,15 +1705,19 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
17041705
}
17051706

17061707
// block->rows() takes the size of the first column by default. If the first column is no predicate column,
1707-
// it has not been read yet. Specify the column that has been read to calculate rows().
1708-
block->set_effective_col(_schema_block_id_map[*_common_expr_columns.begin()]);
1708+
// it has not been read yet. add a const column that has been read to calculate rows().
1709+
if (block->rows() == 0) {
1710+
auto res_column = vectorized::ColumnString::create();
1711+
res_column->insert_data("", 0);
1712+
auto col_const = vectorized::ColumnConst::create(std::move(res_column),
1713+
selected_size);
1714+
block->replace_by_position(0, std::move(col_const));
1715+
}
17091716
DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]);
17101717
_output_index_result_column(sel_rowid_idx, selected_size, block);
17111718

17121719
block->shrink_char_type_column_suffix_zero(_char_type_idx);
17131720
RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
1714-
1715-
block->set_effective_col(INT_MIN);
17161721
}
17171722
} else if (_is_need_expr_eval) {
17181723
for (auto cid : _second_read_column_ids) {
@@ -1731,12 +1736,16 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
17311736
sel_rowid_idx[i] = i;
17321737
}
17331738

1734-
block->set_effective_col(_schema_block_id_map[*_common_expr_columns.begin()]);
1739+
if (block->rows() == 0) {
1740+
auto res_column = vectorized::ColumnString::create();
1741+
res_column->insert_data("", 0);
1742+
auto col_const =
1743+
vectorized::ColumnConst::create(std::move(res_column), selected_size);
1744+
block->replace_by_position(0, std::move(col_const));
1745+
}
17351746
_output_index_result_column(nullptr, 0, block);
17361747
block->shrink_char_type_column_suffix_zero(_char_type_idx);
17371748
RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
1738-
1739-
block->set_effective_col(INT_MIN);
17401749
}
17411750

17421751
if (UNLIKELY(_opts.record_rowids)) {

be/src/vec/core/block.cpp

-5
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ void Block::erase(size_t position) {
198198
DCHECK(position < data.size()) << fmt::format(
199199
"Position out of bound in Block::erase(), max position = {}", data.size() - 1);
200200

201-
DCHECK(position != _effective_col);
202201
erase_impl(position);
203202
}
204203

@@ -329,9 +328,6 @@ void Block::check_number_of_rows(bool allow_null_columns) const {
329328
}
330329

331330
size_t Block::rows() const {
332-
if (_effective_col != INT_MIN) {
333-
return data[_effective_col].column->size();
334-
}
335331
for (const auto& elem : data) {
336332
if (elem.column) {
337333
return elem.column->size();
@@ -628,7 +624,6 @@ void Block::clear() {
628624
data.clear();
629625
index_by_name.clear();
630626
row_same_bit.clear();
631-
_effective_col = INT_MIN;
632627
}
633628

634629
void Block::clear_column_data(int column_size) noexcept {

be/src/vec/core/block.h

-4
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ class Block {
7575
IndexByName index_by_name;
7676
std::vector<bool> row_same_bit;
7777

78-
int _effective_col = INT_MIN;
7978
int64_t _decompress_time_ns = 0;
8079
int64_t _decompressed_bytes = 0;
8180

@@ -203,9 +202,6 @@ class Block {
203202

204203
std::string each_col_size() const;
205204

206-
// Specify column index to calculate rows(), default rows() uses the size of the first column.
207-
void set_effective_col(int col) { _effective_col = col; }
208-
209205
// Cut the rows in block, use in LIMIT operation
210206
void set_num_rows(size_t length);
211207

be/src/vec/exec/scan/new_olap_scan_node.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
443443
// add scanner to pool before doing prepare.
444444
// so that scanner can be automatically deconstructed if prepare failed.
445445
_scanner_pool.add(scanner);
446-
RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(),
447-
_olap_filters, _filter_predicates,
448-
_push_down_functions, _common_vexpr_ctxs_pushdown));
446+
RETURN_IF_ERROR(scanner->prepare(
447+
*scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), _olap_filters,
448+
_filter_predicates, _push_down_functions, _common_vexpr_ctxs_pushdown.get()));
449449
scanners->push_back((VScanner*)scanner);
450450
disk_set.insert(scanner->scan_disk());
451451
}

be/src/vec/exec/scan/new_olap_scanner.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
5252
const std::vector<TCondition>& filters,
5353
const FilterPredicates& filter_predicates,
5454
const std::vector<FunctionFilter>& function_filters,
55-
VExprContext* common_vexpr_ctxs_pushdown) {
55+
VExprContext** common_vexpr_ctxs_pushdown) {
5656
if (common_vexpr_ctxs_pushdown != nullptr) {
5757
// Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary.
58-
RETURN_IF_ERROR(common_vexpr_ctxs_pushdown->clone(_state, &_common_vexpr_ctxs_pushdown));
58+
RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state, &_common_vexpr_ctxs_pushdown));
5959
}
6060

6161
// set limit to reduce end of rowset and segment mem use

be/src/vec/exec/scan/new_olap_scanner.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class NewOlapScanner : public VScanner {
4545
VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
4646
const FilterPredicates& filter_predicates,
4747
const std::vector<FunctionFilter>& function_filters,
48-
VExprContext* common_vexpr_ctxs_pushdown);
48+
VExprContext** common_vexpr_ctxs_pushdown);
4949

5050
const std::string& scan_disk() const { return _tablet->data_dir()->path(); }
5151

be/src/vec/exec/scan/vscan_node.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ void VScanNode::release_resource(RuntimeState* state) {
389389
(*ctx)->close(state);
390390
}
391391
if (_common_vexpr_ctxs_pushdown) {
392-
_common_vexpr_ctxs_pushdown->close(state);
392+
(*_common_vexpr_ctxs_pushdown)->close(state);
393393
}
394394
_scanner_pool.clear();
395395

@@ -461,7 +461,7 @@ Status VScanNode::_normalize_conjuncts() {
461461
if (new_root) {
462462
(*_vconjunct_ctx_ptr)->set_root(new_root);
463463
if (_should_push_down_common_expr()) {
464-
_common_vexpr_ctxs_pushdown = *_vconjunct_ctx_ptr;
464+
_common_vexpr_ctxs_pushdown = std::move(_vconjunct_ctx_ptr);
465465
_vconjunct_ctx_ptr.reset(nullptr);
466466
}
467467
} else { // All conjucts are pushed down as predicate column

be/src/vec/exec/scan/vscan_node.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ class VScanNode : public ExecNode {
261261
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
262262
// so that it will be destroyed uniformly at the end of the query.
263263
std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
264-
VExprContext* _common_vexpr_ctxs_pushdown = nullptr;
264+
std::unique_ptr<VExprContext*> _common_vexpr_ctxs_pushdown = nullptr;
265265

266266
// If sort info is set, push limit to each scanner;
267267
int64_t _limit_per_scanner = -1;

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

+1
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,7 @@ public class SessionVariable implements Serializable, Writable {
769769
public void initFuzzyModeVariables() {
770770
Random random = new Random(System.currentTimeMillis());
771771
this.parallelExecInstanceNum = random.nextInt(8) + 1;
772+
this.enableCommonExprPushdown = random.nextBoolean();
772773
this.enableLocalExchange = random.nextBoolean();
773774
// This will cause be dead loop, disable it first
774775
// this.disableJoinReorder = random.nextBoolean();

0 commit comments

Comments
 (0)