Skip to content

Commit 738ab8c

Browse files
committed
1
1 parent 71b7dcf commit 738ab8c

File tree

3 files changed

+31
-4
lines changed

3 files changed

+31
-4
lines changed

be/src/common/daemon.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363

6464
namespace doris {
6565

66+
CountDownLatch Daemon::_je_purge_dirty_pages_thread_latch {1};
67+
6668
void Daemon::tcmalloc_gc_thread() {
6769
// TODO All cache GC wish to be supported
6870
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \
@@ -352,6 +354,18 @@ void Daemon::block_spill_gc_thread() {
352354
}
353355
}
354356

357+
void Daemon::je_purge_dirty_pages_thread() const {
358+
_je_purge_dirty_pages_thread_latch.reset(1);
359+
do {
360+
_je_purge_dirty_pages_thread_latch.wait();
361+
if (_is_stopped) {
362+
break;
363+
}
364+
doris::MemInfo::je_purge_all_arena_dirty_pages();
365+
_je_purge_dirty_pages_thread_latch.reset(1);
366+
} while (true);
367+
}
368+
355369
void Daemon::start() {
356370
Status st;
357371
st = Thread::create(
@@ -381,6 +395,9 @@ void Daemon::start() {
381395
st = Thread::create(
382396
"Daemon", "block_spill_gc_thread", [this]() { this->block_spill_gc_thread(); },
383397
&_threads.emplace_back());
398+
st = Thread::create(
399+
"Daemon", "je_purge_dirty_pages_thread",
400+
[this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
384401
CHECK(st.ok()) << st;
385402
}
386403

@@ -390,7 +407,9 @@ void Daemon::stop() {
390407
LOG(INFO) << "Doris daemon stop returned since no bg threads latch.";
391408
return;
392409
}
410+
_is_stopped = true;
393411
_stop_background_threads_latch.count_down();
412+
_je_purge_dirty_pages_thread_latch.count_down();
394413
for (auto&& t : _threads) {
395414
if (t) {
396415
t->join();

be/src/common/daemon.h

+7
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,22 @@ class Daemon {
3636
// Stop background threads
3737
void stop();
3838

39+
static void count_down_je_purge_dirty_pages_thread_latch() {
40+
_je_purge_dirty_pages_thread_latch.count_down();
41+
}
42+
3943
private:
4044
void tcmalloc_gc_thread();
4145
void memory_maintenance_thread();
4246
void memory_gc_thread();
4347
void memtable_memory_limiter_tracker_refresh_thread();
4448
void calculate_metrics_thread();
4549
void block_spill_gc_thread();
50+
void je_purge_dirty_pages_thread() const;
4651

4752
CountDownLatch _stop_background_threads_latch;
53+
static CountDownLatch _je_purge_dirty_pages_thread_latch;
54+
bool _is_stopped {};
4855
std::vector<scoped_refptr<Thread>> _threads;
4956
};
5057
} // namespace doris

be/src/util/mem_info.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <vector>
3838

3939
#include "common/config.h"
40+
#include "common/daemon.h"
4041
#include "common/status.h"
4142
#include "gutil/strings/split.h"
4243
#include "runtime/exec_env.h"
@@ -129,7 +130,7 @@ bool MemInfo::process_minor_gc() {
129130
std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
130131

131132
Defer defer {[&]() {
132-
je_purge_all_arena_dirty_pages();
133+
Daemon::count_down_je_purge_dirty_pages_thread_latch();
133134
std::stringstream ss;
134135
profile->pretty_print(&ss);
135136
LOG(INFO) << fmt::format(
@@ -139,7 +140,7 @@ bool MemInfo::process_minor_gc() {
139140
}};
140141

141142
freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
142-
je_purge_all_arena_dirty_pages();
143+
Daemon::count_down_je_purge_dirty_pages_thread_latch();
143144
if (freed_mem > _s_process_minor_gc_size) {
144145
return true;
145146
}
@@ -180,7 +181,7 @@ bool MemInfo::process_full_gc() {
180181
std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
181182

182183
Defer defer {[&]() {
183-
je_purge_all_arena_dirty_pages();
184+
Daemon::count_down_je_purge_dirty_pages_thread_latch();
184185
std::stringstream ss;
185186
profile->pretty_print(&ss);
186187
LOG(INFO) << fmt::format(
@@ -190,7 +191,7 @@ bool MemInfo::process_full_gc() {
190191
}};
191192

192193
freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
193-
je_purge_all_arena_dirty_pages();
194+
Daemon::count_down_je_purge_dirty_pages_thread_latch();
194195
if (freed_mem > _s_process_full_gc_size) {
195196
return true;
196197
}

0 commit comments

Comments
 (0)