diff --git a/src/yb/consensus/log_cache-test.cc b/src/yb/consensus/log_cache-test.cc index fc025bf144e0..63ae461d3a3b 100644 --- a/src/yb/consensus/log_cache-test.cc +++ b/src/yb/consensus/log_cache-test.cc @@ -60,6 +60,7 @@ using std::thread; DECLARE_int32(log_cache_size_limit_mb); DECLARE_int32(global_log_cache_size_limit_mb); +DECLARE_int32(global_log_cache_size_limit_percentage); METRIC_DECLARE_entity(tablet); @@ -306,11 +307,12 @@ TEST_F(LogCacheTest, TestMemoryLimit) { ASSERT_EQ(cache_->BytesUsed(), 0); } -TEST_F(LogCacheTest, TestGlobalMemoryLimit) { - FLAGS_global_log_cache_size_limit_mb = 4; +TEST_F(LogCacheTest, TestGlobalMemoryLimitMB) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_global_log_cache_size_limit_mb) = 4; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_global_log_cache_size_limit_percentage) = 100; CloseAndReopenCache(MinimumOpId()); - // Exceed the global hard limit. + // Consume all but 1 MB of cache space. ScopedTrackedConsumption consumption(cache_->parent_tracker_, 3_MB); const int kPayloadSize = 768_KB; @@ -323,6 +325,26 @@ TEST_F(LogCacheTest, TestGlobalMemoryLimit) { ASSERT_LE(cache_->BytesUsed(), 1_MB); } +TEST_F(LogCacheTest, TestGlobalMemoryLimitPercentage) { + FLAGS_global_log_cache_size_limit_mb = INT32_MAX; + FLAGS_global_log_cache_size_limit_percentage = 5; + const int64_t root_mem_limit = MemTracker::GetRootTracker()->limit(); + + CloseAndReopenCache(MinimumOpId()); + + // Consume all but 1 MB of cache space. + ScopedTrackedConsumption consumption(cache_->parent_tracker_, root_mem_limit * 0.05 - 1_MB); + + const int kPayloadSize = 768_KB; + + // Should succeed, but only end up caching one of the two ops because of the global limit. + ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize)); + ASSERT_OK(log_->WaitUntilAllFlushed()); + + ASSERT_EQ(1, cache_->num_cached_ops()); + ASSERT_LE(cache_->BytesUsed(), 1_MB); +} + // Test that the log cache properly replaces messages when an index // is reused. This is a regression test for a bug where the memtracker's // consumption wasn't properly managed when messages were replaced. diff --git a/src/yb/consensus/log_cache.cc b/src/yb/consensus/log_cache.cc index 4d8a3775964b..0dc6f3986fe2 100644 --- a/src/yb/consensus/log_cache.cc +++ b/src/yb/consensus/log_cache.cc @@ -73,6 +73,11 @@ DEFINE_int32(global_log_cache_size_limit_mb, 1024, "caching log entries across all tablets is kept under this threshold."); TAG_FLAG(global_log_cache_size_limit_mb, advanced); +DEFINE_int32(global_log_cache_size_limit_percentage, 5, + "The maximum percentage of root process memory that can be used for caching log " + "entries across all tablets. Default is 5."); +TAG_FLAG(global_log_cache_size_limit_percentage, advanced); + DEFINE_test_flag(bool, log_cache_skip_eviction, false, "Don't evict log entries in tests."); @@ -132,7 +137,17 @@ LogCache::LogCache(const scoped_refptr& metric_entity, } MemTrackerPtr LogCache::GetServerMemTracker(const MemTrackerPtr& server_tracker) { - const int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1_MB; + CHECK(FLAGS_global_log_cache_size_limit_percentage > 0 && + FLAGS_global_log_cache_size_limit_percentage <= 100) + << Substitute("Flag FLAGS_global_log_cache_size_limit_percentage must be between 0 and 100. ", + "Current value: $0", + FLAGS_global_log_cache_size_limit_percentage); + + int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1_MB; + int64_t root_mem_limit = MemTracker::GetRootTracker()->limit(); + global_max_ops_size_bytes = std::min( + global_max_ops_size_bytes, + root_mem_limit * FLAGS_global_log_cache_size_limit_percentage / 100); return MemTracker::FindOrCreateTracker( global_max_ops_size_bytes, kParentMemTrackerId, server_tracker); } diff --git a/src/yb/consensus/log_cache.h b/src/yb/consensus/log_cache.h index a3dfe7a57dca..38b71e77f055 100644 --- a/src/yb/consensus/log_cache.h +++ b/src/yb/consensus/log_cache.h @@ -173,7 +173,8 @@ class LogCache { private: FRIEND_TEST(LogCacheTest, TestAppendAndGetMessages); - FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimit); + FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimitMB); + FRIEND_TEST(LogCacheTest, TestGlobalMemoryLimitPercentage); FRIEND_TEST(LogCacheTest, TestReplaceMessages); friend class LogCacheTest; diff --git a/src/yb/tserver/tablet_memory_manager.cc b/src/yb/tserver/tablet_memory_manager.cc index 19606c36d39a..0a93c5434b91 100644 --- a/src/yb/tserver/tablet_memory_manager.cc +++ b/src/yb/tserver/tablet_memory_manager.cc @@ -210,7 +210,6 @@ void TabletMemoryManager::InitLogCacheGC() { void TabletMemoryManager::ConfigureBackgroundTask(tablet::TabletOptions* options) { // Calculate memstore_size_bytes based on total RAM available and global percentage. - bool should_count_memory = FLAGS_global_memstore_size_percentage > 0; CHECK(FLAGS_global_memstore_size_percentage > 0 && FLAGS_global_memstore_size_percentage <= 100) << Substitute( "Flag tablet_block_cache_size_percentage must be between 0 and 100. Current value: " @@ -226,16 +225,15 @@ void TabletMemoryManager::ConfigureBackgroundTask(tablet::TabletOptions* options // Add memory monitor and background thread for flushing. // TODO(zhaoalex): replace task with Poller - if (should_count_memory) { - background_task_.reset(new BackgroundTask( - std::function([this]() { FlushTabletIfLimitExceeded(); }), - "tablet manager", - "flush scheduler bgtask")); - options->memory_monitor = std::make_shared( - memstore_size_bytes, - std::function([this](){ - YB_WARN_NOT_OK(background_task_->Wake(), "Wakeup error"); })); - } + background_task_.reset(new BackgroundTask( + std::function([this]() { FlushTabletIfLimitExceeded(); }), + "tablet manager", + "flush scheduler bgtask")); + options->memory_monitor = std::make_shared( + memstore_size_bytes, + std::function([this](){ + YB_WARN_NOT_OK(background_task_->Wake(), "Wakeup error"); })); + // Must assign memory_monitor_ after configuring the background task. memory_monitor_ = options->memory_monitor; }