Skip to content

Commit 07eb1d0

Browse files
xinyiZzzstephen
authored and
stephen
committed
[fix](memory) Add thread asynchronous purge jemalloc dirty pages (apache#28655)
jemallctl purge all arena dirty pages may take several seconds, which will block memory GC and cause OOM. So purge asynchronously in a thread.
1 parent ed7b288 commit 07eb1d0

File tree

4 files changed

+35
-4
lines changed

4 files changed

+35
-4
lines changed

be/src/common/daemon.cpp

+18
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,21 @@ void Daemon::block_spill_gc_thread() {
352352
}
353353
}
354354

355+
void Daemon::je_purge_dirty_pages_thread() const {
356+
do {
357+
std::unique_lock<std::mutex> l(doris::MemInfo::je_purge_dirty_pages_lock);
358+
while (_stop_background_threads_latch.count() != 0 &&
359+
!doris::MemInfo::je_purge_dirty_pages_notify.load(std::memory_order_relaxed)) {
360+
doris::MemInfo::je_purge_dirty_pages_cv.wait_for(l, std::chrono::seconds(1));
361+
}
362+
if (_stop_background_threads_latch.count() == 0) {
363+
break;
364+
}
365+
doris::MemInfo::je_purge_all_arena_dirty_pages();
366+
doris::MemInfo::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed);
367+
} while (true);
368+
}
369+
355370
void Daemon::start() {
356371
Status st;
357372
st = Thread::create(
@@ -381,6 +396,9 @@ void Daemon::start() {
381396
st = Thread::create(
382397
"Daemon", "block_spill_gc_thread", [this]() { this->block_spill_gc_thread(); },
383398
&_threads.emplace_back());
399+
st = Thread::create(
400+
"Daemon", "je_purge_dirty_pages_thread",
401+
[this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
384402
CHECK(st.ok()) << st;
385403
}
386404

be/src/common/daemon.h

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class Daemon {
4343
void memtable_memory_limiter_tracker_refresh_thread();
4444
void calculate_metrics_thread();
4545
void block_spill_gc_thread();
46+
void je_purge_dirty_pages_thread() const;
4647

4748
CountDownLatch _stop_background_threads_latch;
4849
std::vector<scoped_refptr<Thread>> _threads;

be/src/util/mem_info.cpp

+7-4
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
8080
int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
8181
int64_t MemInfo::_s_process_minor_gc_size = -1;
8282
int64_t MemInfo::_s_process_full_gc_size = -1;
83+
std::mutex MemInfo::je_purge_dirty_pages_lock;
84+
std::condition_variable MemInfo::je_purge_dirty_pages_cv;
85+
std::atomic<bool> MemInfo::je_purge_dirty_pages_notify {false};
8386

8487
void MemInfo::refresh_allocator_mem() {
8588
#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
@@ -129,7 +132,7 @@ bool MemInfo::process_minor_gc() {
129132
std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
130133

131134
Defer defer {[&]() {
132-
je_purge_all_arena_dirty_pages();
135+
notify_je_purge_dirty_pages();
133136
std::stringstream ss;
134137
profile->pretty_print(&ss);
135138
LOG(INFO) << fmt::format(
@@ -139,7 +142,7 @@ bool MemInfo::process_minor_gc() {
139142
}};
140143

141144
freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
142-
je_purge_all_arena_dirty_pages();
145+
notify_je_purge_dirty_pages();
143146
if (freed_mem > _s_process_minor_gc_size) {
144147
return true;
145148
}
@@ -180,7 +183,7 @@ bool MemInfo::process_full_gc() {
180183
std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
181184

182185
Defer defer {[&]() {
183-
je_purge_all_arena_dirty_pages();
186+
notify_je_purge_dirty_pages();
184187
std::stringstream ss;
185188
profile->pretty_print(&ss);
186189
LOG(INFO) << fmt::format(
@@ -190,7 +193,7 @@ bool MemInfo::process_full_gc() {
190193
}};
191194

192195
freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
193-
je_purge_all_arena_dirty_pages();
196+
notify_je_purge_dirty_pages();
194197
if (freed_mem > _s_process_full_gc_size) {
195198
return true;
196199
}

be/src/util/mem_info.h

+9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <stdint.h>
2525

2626
#include <atomic>
27+
#include <condition_variable>
2728
#include <string>
2829

2930
#if !defined(__APPLE__) || !defined(_POSIX_C_SOURCE)
@@ -127,6 +128,14 @@ class MemInfo {
127128
#endif
128129
}
129130

131+
static std::mutex je_purge_dirty_pages_lock;
132+
static std::condition_variable je_purge_dirty_pages_cv;
133+
static std::atomic<bool> je_purge_dirty_pages_notify;
134+
static void notify_je_purge_dirty_pages() {
135+
je_purge_dirty_pages_notify.store(true, std::memory_order_relaxed);
136+
je_purge_dirty_pages_cv.notify_all();
137+
}
138+
130139
static inline size_t allocator_virtual_mem() {
131140
return _s_virtual_memory_used.load(std::memory_order_relaxed);
132141
}

0 commit comments

Comments
 (0)