Skip to content

Commit df5d8c1

Browse files
authored
[ut](spill) add UT for partitioned hash join (apache#48432)
1 parent 6fbe9d4 commit df5d8c1

16 files changed

+2334
-20
lines changed

be/src/pipeline/exec/hashjoin_build_sink.h

+5 -4
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717

1818
#pragma once
1919

20+
#include "common/be_mock_util.h"
2021
#include "exprs/runtime_filter_slots.h"
2122
#include "join_build_sink_operator.h"
2223
#include "operator.h"
@@ -25,7 +26,7 @@ namespace doris::pipeline {
2526
#include "common/compile_check_begin.h"
2627
class HashJoinBuildSinkOperatorX;
2728

28-
class HashJoinBuildSinkLocalState final
29+
class HashJoinBuildSinkLocalState MOCK_REMOVE(final)
2930
: public JoinBuildSinkLocalState<HashJoinSharedState, HashJoinBuildSinkLocalState> {
3031
public:
3132
ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
@@ -56,7 +57,7 @@ class HashJoinBuildSinkLocalState final
5657

5758
Status disable_runtime_filters(RuntimeState* state);
5859

59-
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
60+
[[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState* state, bool eos);
6061

6162
protected:
6263
Status _hash_table_init(RuntimeState* state);
@@ -109,7 +110,7 @@ class HashJoinBuildSinkLocalState final
109110
RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
110111
};
111112

112-
class HashJoinBuildSinkOperatorX final
113+
class HashJoinBuildSinkOperatorX MOCK_REMOVE(final)
113114
: public JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState> {
114115
public:
115116
HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
@@ -129,7 +130,7 @@ class HashJoinBuildSinkOperatorX final
129130

130131
[[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;
131132

132-
std::string get_memory_usage_debug_str(RuntimeState* state) const;
133+
MOCK_FUNCTION std::string get_memory_usage_debug_str(RuntimeState* state) const;
133134

134135
bool should_dry_run(RuntimeState* state) override {
135136
return _is_broadcast_join && !state->get_sink_local_state()

be/src/pipeline/exec/hashjoin_probe_operator.h

+4 -2
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818

1919
#include <stdint.h>
2020

21+
#include "common/be_mock_util.h"
2122
#include "common/status.h"
2223
#include "operator.h"
2324
#include "pipeline/exec/join_probe_operator.h"
@@ -43,7 +44,7 @@ using HashTableCtxVariants =
4344
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
4445

4546
class HashJoinProbeOperatorX;
46-
class HashJoinProbeLocalState final
47+
class HashJoinProbeLocalState MOCK_REMOVE(final)
4748
: public JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState> {
4849
public:
4950
using Parent = HashJoinProbeOperatorX;
@@ -124,7 +125,8 @@ class HashJoinProbeLocalState final
124125
RuntimeProfile::Counter* _non_equal_join_conjuncts_timer = nullptr;
125126
};
126127

127-
class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLocalState> {
128+
class HashJoinProbeOperatorX MOCK_REMOVE(final)
129+
: public JoinProbeOperatorX<HashJoinProbeLocalState> {
128130
public:
129131
HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
130132
const DescriptorTbl& descs);

be/src/pipeline/exec/operator.h

+6 -2
Original file line number | Diff line number | Diff line change
@@ -284,7 +284,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
284284
Status init(RuntimeState* state, LocalStateInfo& info) override {
285285
RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info));
286286

287-
_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);
288287
init_spill_read_counters();
289288

290289
return Status::OK();
@@ -317,6 +316,8 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
317316
}
318317

319318
void init_spill_read_counters() {
319+
_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);
320+
320321
// Spill read counters
321322
_spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1);
322323

@@ -669,7 +670,11 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA
669670

670671
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
671672
RETURN_IF_ERROR(Base::init(state, info));
673+
init_spill_counters();
674+
return Status::OK();
675+
}
672676

677+
void init_spill_counters() {
673678
_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);
674679

675680
_spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1);
@@ -696,7 +701,6 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA
696701
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1);
697702
_spill_min_rows_of_partition =
698703
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMinRowsOfPartition", TUnit::UNIT, 1);
699-
return Status::OK();
700704
}
701705

702706
std::vector<Dependency*> dependencies() const override {

be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp

+7 -3
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,11 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
5858
"HashJoinProbeSpillDependency", true);
5959
state->get_task()->add_spill_dependency(_spill_dependency.get());
6060

61+
init_counters();
62+
return Status::OK();
63+
}
64+
65+
void PartitionedHashJoinProbeLocalState::init_counters() {
6166
_partition_timer = ADD_TIMER(profile(), "SpillPartitionTime");
6267
_partition_shuffle_timer = ADD_TIMER(profile(), "SpillPartitionShuffleTime");
6368
_spill_build_rows = ADD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT);
@@ -78,7 +83,6 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
7883
ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBloksBytesInMem", TUnit::BYTES, 1);
7984
_memory_usage_reserved =
8085
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
81-
return Status::OK();
8286
}
8387

8488
#define UPDATE_COUNTER_FROM_INNER(name) \
@@ -417,7 +421,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
417421
st = spilled_stream->read_next_block_sync(&block, &eos);
418422
if (!st.ok()) {
419423
break;
420-
} else {
424+
} else if (!block.empty()) {
421425
COUNTER_UPDATE(_recovery_probe_rows, block.rows());
422426
COUNTER_UPDATE(_recovery_probe_blocks, 1);
423427
read_size += block.allocated_bytes();
@@ -786,7 +790,7 @@ size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat
786790
size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* state) {
787791
auto& local_state = get_local_state(state);
788792
const auto need_to_spill = local_state._shared_state->need_to_spill;
789-
if (!need_to_spill || !local_state._child_eos) {
793+
if (!need_to_spill || local_state._child_eos) {
790794
return Base::get_reserve_mem_size(state);
791795
}
792796

be/src/pipeline/exec/partitioned_hash_join_probe_operator.h

+5 -2
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@
2020
#include <cstdint>
2121
#include <memory>
2222

23+
#include "common/be_mock_util.h"
2324
#include "common/status.h"
2425
#include "operator.h"
2526
#include "pipeline/dependency.h"
@@ -36,7 +37,7 @@ namespace pipeline {
3637

3738
class PartitionedHashJoinProbeOperatorX;
3839

39-
class PartitionedHashJoinProbeLocalState final
40+
class PartitionedHashJoinProbeLocalState MOCK_REMOVE(final)
4041
: public PipelineXSpillLocalState<PartitionedHashJoinSharedState> {
4142
public:
4243
using Parent = PartitionedHashJoinProbeOperatorX;
@@ -65,7 +66,9 @@ class PartitionedHashJoinProbeLocalState final
6566

6667
std::string debug_string(int indentation_level = 0) const override;
6768

68-
void update_profile_from_inner();
69+
MOCK_FUNCTION void update_profile_from_inner();
70+
71+
void init_counters();
6972

7073
friend class PartitionedHashJoinProbeOperatorX;
7174

be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp

+1 -1
Original file line number | Diff line number | Diff line change
@@ -188,7 +188,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
188188
-(inner_sink_state->_hash_table_memory_usage->value() +
189189
inner_sink_state->_build_arena_memory_usage->value()));
190190
}
191-
auto row_desc = p._child->row_desc();
191+
const auto& row_desc = p._child->row_desc();
192192
const auto num_slots = row_desc.num_slots();
193193
vectorized::Block build_block;
194194
int64_t block_old_mem = 0;

be/src/pipeline/exec/partitioned_hash_join_sink_operator.h

+5 -1
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121

2222
#include <atomic>
2323

24+
#include "common/be_mock_util.h"
2425
#include "common/status.h"
2526
#include "operator.h"
2627
#include "pipeline/exec/hashjoin_build_sink.h"
@@ -51,7 +52,7 @@ class PartitionedHashJoinSinkLocalState
5152
size_t revocable_mem_size(RuntimeState* state) const;
5253
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
5354
void update_memory_usage();
54-
void update_profile_from_inner();
55+
MOCK_FUNCTION void update_profile_from_inner();
5556

5657
Dependency* finishdependency() override;
5758

@@ -143,6 +144,9 @@ class PartitionedHashJoinSinkOperatorX
143144

144145
private:
145146
friend class PartitionedHashJoinSinkLocalState;
147+
#ifdef BE_TEST
148+
friend class PartitionedHashJoinSinkOperatorTest;
149+
#endif
146150

147151
const TJoinDistributionType::type _join_distribution;
148152

be/src/pipeline/exec/spill_utils.h

+7 -4
Original file line number | Diff line number | Diff line change
@@ -138,9 +138,16 @@ class SpillRunnable : public Runnable {
138138
return;
139139
}
140140

141+
Defer set_ready_defer([&] {
142+
if (_spill_dependency) {
143+
_spill_dependency->set_ready();
144+
}
145+
});
146+
141147
if (_state->is_cancelled()) {
142148
return;
143149
}
150+
144151
auto status = _spill_exec_func();
145152
if (!status.ok()) {
146153
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status);
@@ -153,10 +160,6 @@ class SpillRunnable : public Runnable {
153160
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status2);
154161
}
155162
}
156-
157-
if (_spill_dependency) {
158-
_spill_dependency->set_ready();
159-
}
160163
}
161164

162165
protected:

be/src/runtime/fragment_mgr.h

+2 -1
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@
3131
#include <unordered_map>
3232
#include <vector>
3333

34+
#include "common/be_mock_util.h"
3435
#include "common/status.h"
3536
#include "gutil/ref_counted.h"
3637
#include "http/rest_monitor_iface.h"
@@ -140,7 +141,7 @@ class FragmentMgr : public RestMonitorIface {
140141
std::shared_ptr<pipeline::PipelineFragmentContext>&&);
141142

142143
// Can be used in both version.
143-
void cancel_query(const TUniqueId query_id, const Status reason);
144+
MOCK_FUNCTION void cancel_query(const TUniqueId query_id, const Status reason);
144145

145146
void cancel_worker();
146147

0 commit comments

Comments
 (0)