Skip to content

Commit f8c012b

Browse files
yiguoleiDoris-Extras
authored andcommitted
[bugfix](scanmemory)modify scan memory profile (apache#41435)
1. memory usage in scan operator is empty. 2. peak memory usage is not in scan operator , it is in scanner. --------- Co-authored-by: yiguolei <yiguolei@gmail.com>
1 parent 3f578c5 commit f8c012b

File tree

5 files changed

+9
-9
lines changed

5 files changed

+9
-9
lines changed

be/src/pipeline/exec/operator.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ class PipelineXLocalStateBase {
199199
RuntimeProfile::Counter* _projection_timer = nullptr;
200200
RuntimeProfile::Counter* _exec_timer = nullptr;
201201
// Account for peak memory used by this node
202-
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
202+
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
203203
RuntimeProfile::Counter* _init_timer = nullptr;
204204
RuntimeProfile::Counter* _open_timer = nullptr;
205205
RuntimeProfile::Counter* _close_timer = nullptr;
@@ -371,7 +371,7 @@ class PipelineXSinkLocalStateBase {
371371
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
372372
RuntimeProfile::Counter* _exec_timer = nullptr;
373373
RuntimeProfile::Counter* _memory_used_counter = nullptr;
374-
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
374+
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
375375

376376
std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
377377
};

be/src/pipeline/exec/scan_operator.cpp

+2-6
Original file line numberDiff line numberDiff line change
@@ -1052,17 +1052,13 @@ Status ScanLocalState<Derived>::_init_profile() {
10521052
_total_throughput_counter =
10531053
profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter);
10541054
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
1055+
_scanner_peak_memory_usage = _peak_memory_usage_counter;
1056+
//_runtime_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);
10551057

10561058
// 2. counters for scanners
10571059
_scanner_profile.reset(new RuntimeProfile("VScanner"));
10581060
profile()->add_child(_scanner_profile.get(), true, nullptr);
10591061

1060-
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile, "MemoryUsage", 1);
1061-
_free_blocks_memory_usage =
1062-
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage", 1);
1063-
_scanner_peak_memory_usage =
1064-
_scanner_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);
1065-
10661062
_newly_create_free_blocks_num =
10671063
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
10681064
_scale_up_scanners_counter = ADD_COUNTER(_scanner_profile, "NumScaleUpScanners", TUnit::UNIT);

be/src/pipeline/exec/scan_operator.h

-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
119119
// time of filter output block from scanner
120120
RuntimeProfile::Counter* _filter_timer = nullptr;
121121
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
122-
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
123122
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
124123
// rows read from the scanner (including those discarded by (pre)filters)
125124
RuntimeProfile::Counter* _rows_read_counter = nullptr;

be/src/vec/exec/scan/scanner_context.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ Status ScannerContext::init() {
133133
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
134134
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
135135
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
136+
_scanner_memory_used_counter = _local_state->_memory_used_counter;
136137

137138
#ifndef BE_TEST
138139
// 3. get thread token
@@ -172,6 +173,7 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
172173
if (_free_blocks.try_dequeue(block)) {
173174
DCHECK(block->mem_reuse());
174175
_block_memory_usage -= block->allocated_bytes();
176+
_scanner_memory_used_counter->set(_block_memory_usage);
175177
// A free block is reused, so the memory usage should be decreased
176178
// The caller of get_free_block will increase the memory usage
177179
update_peak_memory_usage(-block->allocated_bytes());
@@ -187,6 +189,7 @@ void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
187189
if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
188190
size_t block_size_to_reuse = block->allocated_bytes();
189191
_block_memory_usage += block_size_to_reuse;
192+
_scanner_memory_used_counter->set(_block_memory_usage);
190193
block->clear_column_data();
191194
if (_free_blocks.enqueue(std::move(block))) {
192195
update_peak_memory_usage(block_size_to_reuse);

be/src/vec/exec/scan/scanner_context.h

+2
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
225225
const int _num_parallel_instances;
226226
std::shared_ptr<RuntimeProfile> _scanner_profile;
227227
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
228+
// This counter refers to scan operator's local state
229+
RuntimeProfile::Counter* _scanner_memory_used_counter = nullptr;
228230
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
229231
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
230232
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;

0 commit comments

Comments
 (0)