Skip to content

Commit

Permalink
[improve](load) limit flush thread num by CPU count (apache#33325)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Apr 27, 2024
1 parent d52a426 commit a9040c8
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,9 @@ DEFINE_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
DEFINE_Int32(flush_thread_num_per_store, "6");
// number of thread for flushing memtable per store, for high priority load task
DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
// number of threads = min(flush_thread_num_per_store * num_store,
// max_flush_thread_num_per_cpu * num_cpu)
DEFINE_Int32(max_flush_thread_num_per_cpu, "4");

// config for tablet meta checkpoint
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,9 @@ DECLARE_mInt64(storage_flood_stage_left_capacity_bytes); // 1GB
DECLARE_Int32(flush_thread_num_per_store);
// number of thread for flushing memtable per store, for high priority load task
DECLARE_Int32(high_priority_flush_thread_num_per_store);
// number of threads = min(flush_thread_num_per_store * num_store,
// max_flush_thread_num_per_cpu * num_cpu)
DECLARE_Int32(max_flush_thread_num_per_cpu);

// config for tablet meta checkpoint
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,20 @@ void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t

void MemTableFlushExecutor::init(int num_disk) {
num_disk = std::max(1, num_disk);
size_t min_threads = std::max(1, config::flush_thread_num_per_store);
size_t max_threads = num_disk * min_threads;
int num_cpus = std::thread::hardware_concurrency();
int min_threads = std::max(1, config::flush_thread_num_per_store);
int max_threads = num_cpus == 0 ? num_disk * min_threads
: std::min(num_disk * min_threads,
num_cpus * config::max_flush_thread_num_per_cpu);
static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_flush_pool));

min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
max_threads = num_disk * min_threads;
max_threads = num_cpus == 0 ? num_disk * min_threads
: std::min(num_disk * min_threads,
num_cpus * config::max_flush_thread_num_per_cpu);
static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
.set_min_threads(min_threads)
.set_max_threads(max_threads)
Expand Down
8 changes: 6 additions & 2 deletions be/src/runtime/load_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
: _num_threads(segment_file_writer_thread_num),
_heavy_work_pool(heavy_work_pool),
_light_work_pool(light_work_pool) {
uint32_t num_cpu = std::thread::hardware_concurrency();
uint32_t thread_num = num_cpu == 0 ? segment_file_writer_thread_num
: std::min(segment_file_writer_thread_num,
num_cpu * config::max_flush_thread_num_per_cpu);
static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
.set_min_threads(segment_file_writer_thread_num)
.set_max_threads(segment_file_writer_thread_num)
.set_min_threads(thread_num)
.set_max_threads(thread_num)
.build(&_file_writer_thread_pool));
}

Expand Down

0 comments on commit a9040c8

Please sign in to comment.