Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](scanner) Fix incorrect _max_thread_num in scanner context #40569

Merged
merged 9 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
RETURN_IF_ERROR(status);
if (_scanner_ctx) {
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(_scanner_ctx->init(p.ignore_data_distribution()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可能把,这个参数,作为scanner context的构造函数的参数,传递给scanner context,这样你往2.1 pick的时候会比较方便。否则2.1 在多种pipeline 模型下,代码不好pick

}
_opened = true;
return status;
Expand Down Expand Up @@ -994,18 +994,9 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
state()->scan_queue_mem_limit(), _scan_dependency,
// NOTE: This will logic makes _max_thread_num of ScannerContext to be C(num of cores) * 2
// For a query with C/2 instance and M scan node, scan task of this query will be C/2 * M * C*2
// and will be C*C*N at most.
// 1. If data distribution is ignored , we use 1 instance to scan.
// 2. Else if this operator is not file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan.
// 3. Else, file scanner will consume much memory so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan.
p.ignore_data_distribution() || !p.is_file_scan_operator()
? 1
: state()->query_parallel_instance_num());
_scanner_ctx = vectorized::ScannerContext::create_shared(state(), this, p._output_tuple_desc,
p.output_row_descriptor(), scanners,
p.limit(), _scan_dependency);
return Status::OK();
}

Expand Down
122 changes: 71 additions & 51 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ ScannerContext::ScannerContext(
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, std::shared_ptr<pipeline::Dependency> dependency,
const int num_parallel_instances)
std::shared_ptr<pipeline::Dependency> dependency)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
Expand All @@ -53,53 +52,102 @@ ScannerContext::ScannerContext(
_output_row_descriptor(output_row_descriptor),
_batch_size(state->batch_size()),
limit(limit_),
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances) {
_all_scanners(scanners.begin(), scanners.end()) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
// Provide more memory for wide tables, increase proportionally by multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
if (scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
_scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (limit < 0) {
limit = -1;
}
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
_query_thread_context = {_query_id, _state->query_mem_tracker(),
_state->get_query_ctx()->workload_group()};
_dependency = dependency;
}

// After init function call, should not access _parent
Status ScannerContext::init(bool ignore_data_distribution) {
_scanner_profile = _local_state->_scanner_profile;
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;

#ifndef BE_TEST
// 3. get thread token
if (_state->get_query_ctx()) {
thread_token = _state->get_query_ctx()->get_token();
_simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
if (_simple_scan_scheduler) {
_should_reset_thread_name = false;
}
_remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
}
#endif
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

int num_parallel_instances = _state->query_parallel_instance_num();

// NOTE: When ignore_data_distribution is true, the parallelism
// of the scan operator is regarded as 1 (actually maybe not).
// That will make the number of scan task can be submitted to the scheduler
// in a vary large value. This logicl is kept from the older implementation.
// https://github.com/apache/doris/pull/28266
if (ignore_data_distribution) {
num_parallel_instances = 1;
}

// _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
// is larger than 10MB.
_max_bytes_in_queue =
std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

// Provide more memory for wide tables, increase proportionally by multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

// TODO: Where is the proper position to place this code?
if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}

// _max_thread_num controls how many scanners of this ScanOperator can be submitted to scheduler at a time.
// The overall target of our system is to make full utilization of the resources.
// At the same time, we dont want too many tasks are queued by scheduler, that makes the query
// waiting too long, and existing task can not be scheduled in time.
// First of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
// config::doris_scanner_thread_pool_thread_num.
// At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
// So, first of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
// 2 * config::doris_scanner_thread_pool_thread_num, so that we can make all io threads busy.
// For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
// and the num_parallel_instances of this scan operator will be 64/2=32.
// For a query who has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
// We have 32 instances of this scan operator, so for the ScanNode, we have 4 * 32 = 128 scanner tasks can be submitted at a time.
// Remember that we have to ScanNode in this query, so the total number of scanner tasks can be submitted at a time is 128 * 2 = 256.
// For a query who has one scan nodes, the _max_thread_num of each scan node instance will be 2 * 128 / 32 = 8.
// We have 32 instances of this scan operator, so for the ScanNode, we have 8 * 32 = 256 scanner tasks can be submitted at a time.
// The thread pool of scanner is 128, that means we will have 128 tasks running in parallel and another 128 tasks are waiting in the queue.
// When first 128 tasks are finished, the next 128 tasks will be extricated from the queue and be executed,
// and another 128 tasks will be submitted to the queue if there are remaining.
_max_thread_num =
_state->num_scanner_threads() > 0
? _state->num_scanner_threads()
: config::doris_scanner_thread_pool_thread_num / num_parallel_instances;
: 2 * (config::doris_scanner_thread_pool_thread_num / num_parallel_instances);
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
// In some situation, there are not too many big tablets involed, so we can reduce the thread number.
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0.
_max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());

// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if (_local_state->should_run_serial()) {
_max_thread_num = 1;
}

// when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
// becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
// you can refer https://github.com/apache/doris/issues/35340 for details.
int32_t max_column_reader_num = state->query_options().max_column_reader_num;
int32_t max_column_reader_num = _state->query_options().max_column_reader_num;
if (_max_thread_num != 1 && max_column_reader_num > 0) {
int32_t scan_column_num = _output_tuple_desc->slots().size();
int32_t current_column_num = scan_column_num * _max_thread_num;
Expand All @@ -109,43 +157,15 @@ ScannerContext::ScannerContext(
if (new_max_thread_num < _max_thread_num) {
int32_t origin_max_thread_num = _max_thread_num;
_max_thread_num = new_max_thread_num;
LOG(INFO) << "downgrade query:" << print_id(state->query_id())
LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
<< " scan's max_thread_num from " << origin_max_thread_num << " to "
<< _max_thread_num << ",column num: " << scan_column_num
<< ", max_column_reader_num: " << max_column_reader_num;
}
}
}

_query_thread_context = {_query_id, _state->query_mem_tracker(),
_state->get_query_ctx()->workload_group()};
_dependency = dependency;
}

// After init function call, should not access _parent
Status ScannerContext::init() {
_scanner_profile = _local_state->_scanner_profile;
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;

#ifndef BE_TEST
// 3. get thread token
if (_state->get_query_ctx()) {
thread_token = _state->get_query_ctx()->get_token();
_simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
if (_simple_scan_scheduler) {
_should_reset_thread_name = false;
}
_remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
}
#endif

COUNTER_SET(_local_state->_max_scanner_thread_num, (int64_t)_max_thread_num);
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

// submit `_max_thread_num` running scanners to `ScannerScheduler`
// When a running scanners is finished, it will submit one of the remaining scanners.
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, int64_t max_bytes_in_blocks_queue,
std::shared_ptr<pipeline::Dependency> dependency,
const int num_parallel_instances);
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency);

~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
Expand All @@ -118,7 +116,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
}
block.reset();
}
Status init();
Status init(bool ignore_data_distribution);

vectorized::BlockUPtr get_free_block(bool force);
void return_free_block(vectorized::BlockUPtr block);
Expand Down Expand Up @@ -210,7 +208,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
int64_t limit;

int32_t _max_thread_num = 0;
int64_t _max_bytes_in_queue;
int64_t _max_bytes_in_queue = 0;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
Expand All @@ -220,7 +218,6 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
int32_t _num_running_scanners = 0;
// weak pointer for _scanners, used in stop function
std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
const int _num_parallel_instances;
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
public long maxExecMemByte = 2147483648L;

@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block",
"How many bytes of block can be saved in the block queue of each Scan Instance"})
// 100MB
public long maxScanQueueMemByte = 2147483648L / 20;

@VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, description = {
Expand Down
Loading