From 5fe4e14f011969061bc4f9b4bea8f83e134b9b93 Mon Sep 17 00:00:00 2001 From: Elisha Ziskind Date: Sat, 11 Aug 2018 21:15:54 -0400 Subject: [PATCH] Add thread local cache of overload action states (#4090) Lookups in this cache can be used as an alternative to registering a callback for overload action state changes. Useful for objects (like the http connection manager) with dynamic lifetimes that are created after envoy initialization - currently callback registration must be done during initialization and there isn't support for unregistration. Those restrictions could be relaxed if we need to in the future but for now this keeps things simpler. For issue #373. Risk Level: low Testing: unit tests Signed-off-by: Elisha Ziskind --- include/envoy/server/overload_manager.h | 35 +++++++++++++++ source/server/BUILD | 1 + source/server/overload_manager_impl.cc | 14 +++++- source/server/overload_manager_impl.h | 4 ++ source/server/server.cc | 8 ++-- test/mocks/server/mocks.h | 1 + test/server/BUILD | 1 + test/server/overload_manager_impl_test.cc | 55 ++++++++++++++--------- 8 files changed, 93 insertions(+), 26 deletions(-) diff --git a/include/envoy/server/overload_manager.h b/include/envoy/server/overload_manager.h index 068ee8e36fff..7f8e08076651 100644 --- a/include/envoy/server/overload_manager.h +++ b/include/envoy/server/overload_manager.h @@ -1,6 +1,9 @@ #pragma once +#include + #include "envoy/common/pure.h" +#include "envoy/thread_local/thread_local.h" namespace Envoy { namespace Server { @@ -21,6 +24,32 @@ enum class OverloadActionState { */ typedef std::function OverloadActionCb; +/** + * Thread-local copy of the state of each configured overload action. + */ +class ThreadLocalOverloadState : public ThreadLocal::ThreadLocalObject { +public: + const OverloadActionState& getState(const std::string& action) { + auto it = actions_.find(action); + if (it == actions_.end()) { + it = actions_.insert(std::make_pair(action, OverloadActionState::Inactive)).first; + } + return it->second; + } + + void setState(const std::string& action, OverloadActionState state) { + auto it = actions_.find(action); + if (it == actions_.end()) { + actions_[action] = state; + } else { + it->second = state; + } + } + +private: + std::unordered_map actions_; +}; + /** * The OverloadManager protects the Envoy instance from being overwhelmed by client * requests. It monitors a set of resources and notifies registered listeners if @@ -46,6 +75,12 @@ class OverloadManager { */ virtual void registerForAction(const std::string& action, Event::Dispatcher& dispatcher, OverloadActionCb callback) PURE; + + /** + * Get the thread-local overload action states. Lookups in this object can be used as + * an alternative to registering a callback for overload action state changes. + */ + virtual ThreadLocalOverloadState& getThreadLocalOverloadState() PURE; }; } // namespace Server diff --git a/source/server/BUILD b/source/server/BUILD index 648c17a4e0f4..e66a51cdf3ce 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -167,6 +167,7 @@ envoy_cc_library( deps = [ "//include/envoy/server:overload_manager_interface", "//include/envoy/stats:stats_interface", + "//include/envoy/thread_local:thread_local_interface", "//source/common/common:logger_lib", "//source/common/config:utility_lib", "//source/server:resource_monitor_config_lib", diff --git a/source/server/overload_manager_impl.cc b/source/server/overload_manager_impl.cc index 1a10199e0acf..bfed61f32557 100644 --- a/source/server/overload_manager_impl.cc +++ b/source/server/overload_manager_impl.cc @@ -84,8 +84,9 @@ bool OverloadAction::isActive() const { return !fired_triggers_.empty(); } OverloadManagerImpl::OverloadManagerImpl( Event::Dispatcher& dispatcher, Stats::Scope& stats_scope, + ThreadLocal::SlotAllocator& slot_allocator, const envoy::config::overload::v2alpha::OverloadManager& config) - : started_(false), dispatcher_(dispatcher), + : started_(false), dispatcher_(dispatcher), tls_(slot_allocator.allocateSlot()), refresh_interval_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))) { Configuration::ResourceMonitorFactoryContextImpl context(dispatcher); @@ -125,6 +126,10 @@ OverloadManagerImpl::OverloadManagerImpl( resource_to_actions_.insert(std::make_pair(resource, name)); } } + + tls_->set([](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { + return std::make_shared(); + }); } void OverloadManagerImpl::start() { @@ -157,6 +162,10 @@ void OverloadManagerImpl::registerForAction(const std::string& action, std::forward_as_tuple(dispatcher, callback)); } +ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { + return tls_->getTyped(); +} + void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure) { auto action_range = resource_to_actions_.equal_range(resource); std::for_each(action_range.first, action_range.second, @@ -170,6 +179,9 @@ void OverloadManagerImpl::updateResourcePressure(const std::string& resource, do is_active ? OverloadActionState::Active : OverloadActionState::Inactive; ENVOY_LOG(info, "Overload action {} has become {}", action, is_active ? "active" : "inactive"); + tls_->runOnAllThreads([this, action, state] { + tls_->getTyped().setState(action, state); + }); auto callback_range = action_to_callbacks_.equal_range(action); std::for_each(callback_range.first, callback_range.second, [&](ActionToCallbackMap::value_type& cb_entry) { diff --git a/source/server/overload_manager_impl.h b/source/server/overload_manager_impl.h index 32f1076747e8..e419ef4f03e8 100644 --- a/source/server/overload_manager_impl.h +++ b/source/server/overload_manager_impl.h @@ -11,6 +11,7 @@ #include "envoy/server/resource_monitor.h" #include "envoy/stats/scope.h" #include "envoy/stats/stats.h" +#include "envoy/thread_local/thread_local.h" #include "common/common/logger.h" @@ -50,12 +51,14 @@ class OverloadAction { class OverloadManagerImpl : Logger::Loggable, public OverloadManager { public: OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope, + ThreadLocal::SlotAllocator& slot_allocator, const envoy::config::overload::v2alpha::OverloadManager& config); // Server::OverloadManager void start() override; void registerForAction(const std::string& action, Event::Dispatcher& dispatcher, OverloadActionCb callback) override; + ThreadLocalOverloadState& getThreadLocalOverloadState() override; private: class Resource : public ResourceMonitor::Callbacks { @@ -90,6 +93,7 @@ class OverloadManagerImpl : Logger::Loggable, public OverloadM bool started_; Event::Dispatcher& dispatcher_; + ThreadLocal::SlotPtr tls_; const std::chrono::milliseconds refresh_interval_; Event::TimerPtr timer_; std::unordered_map resources_; diff --git a/source/server/server.cc b/source/server/server.cc index 447362b148e1..cdd3d605a64f 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -247,10 +247,6 @@ void InstanceImpl::initialize(Options& options, loadServerFlags(initial_config.flagsPath()); - // Initialize the overload manager early so other modules can register for actions. - overload_manager_.reset( - new OverloadManagerImpl(dispatcher(), stats(), bootstrap_.overload_manager())); - // Workers get created first so they register for thread local updates. listener_manager_.reset(new ListenerManagerImpl( *this, listener_component_factory_, worker_factory_, ProdSystemTimeSource::instance_)); @@ -259,6 +255,10 @@ void InstanceImpl::initialize(Options& options, // whether it runs on the main thread or on workers can still use TLS. thread_local_.registerThread(*dispatcher_, true); + // Initialize the overload manager early so other modules can register for actions. + overload_manager_.reset( + new OverloadManagerImpl(dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager())); + // We can now initialize stats for threading. stats_store_.initializeThreading(*dispatcher_, thread_local_); diff --git a/test/mocks/server/mocks.h b/test/mocks/server/mocks.h index 61b28be98c0a..b43691cbd2b1 100644 --- a/test/mocks/server/mocks.h +++ b/test/mocks/server/mocks.h @@ -280,6 +280,7 @@ class MockOverloadManager : public OverloadManager { MOCK_METHOD0(start, void()); MOCK_METHOD3(registerForAction, void(const std::string& action, Event::Dispatcher& dispatcher, OverloadActionCb callback)); + MOCK_METHOD0(getThreadLocalOverloadState, ThreadLocalOverloadState&()); }; class MockInstance : public Instance { diff --git a/test/server/BUILD b/test/server/BUILD index db7f50b52471..064fe8fa72ef 100644 --- a/test/server/BUILD +++ b/test/server/BUILD @@ -122,6 +122,7 @@ envoy_cc_test( "//source/extensions/resource_monitors/common:factory_base_lib", "//source/server:overload_manager_lib", "//test/mocks/event:event_mocks", + "//test/mocks/thread_local:thread_local_mocks", "//test/test_common:registry_lib", "//test/test_common:utility_lib", ], diff --git a/test/server/overload_manager_impl_test.cc b/test/server/overload_manager_impl_test.cc index 5012879a0f9a..3d3b5f37cabc 100644 --- a/test/server/overload_manager_impl_test.cc +++ b/test/server/overload_manager_impl_test.cc @@ -8,6 +8,7 @@ #include "extensions/resource_monitors/common/factory_base.h" #include "test/mocks/event/mocks.h" +#include "test/mocks/thread_local/mocks.h" #include "test/test_common/registry.h" #include "test/test_common/utility.h" @@ -118,39 +119,48 @@ class OverloadManagerImplTest : public testing::Test { )EOF"; } + std::unique_ptr createOverloadManager(const std::string& config) { + return std::make_unique(dispatcher_, stats_, thread_local_, + parseConfig(config)); + } + FakeResourceMonitorFactory factory1_; FakeResourceMonitorFactory factory2_; Registry::InjectFactory register_factory1_; Registry::InjectFactory register_factory2_; NiceMock dispatcher_; Stats::IsolatedStoreImpl stats_; + NiceMock thread_local_; Event::TimerCb timer_cb_; }; TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { setDispatcherExpectation(); - OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig())); + auto manager(createOverloadManager(getConfig())); bool is_active = false; int cb_count = 0; - manager.registerForAction("envoy.overload_actions.dummy_action", dispatcher_, - [&](OverloadActionState state) { - is_active = state == OverloadActionState::Active; - cb_count++; - }); - manager.registerForAction("envoy.overload_actions.unknown_action", dispatcher_, - [&](OverloadActionState) { EXPECT_TRUE(false); }); - manager.start(); + manager->registerForAction("envoy.overload_actions.dummy_action", dispatcher_, + [&](OverloadActionState state) { + is_active = state == OverloadActionState::Active; + cb_count++; + }); + manager->registerForAction("envoy.overload_actions.unknown_action", dispatcher_, + [&](OverloadActionState) { EXPECT_TRUE(false); }); + manager->start(); Stats::Gauge& active_gauge = stats_.gauge("overload.envoy.overload_actions.dummy_action.active"); Stats::Gauge& pressure_gauge1 = stats_.gauge("overload.envoy.resource_monitors.fake_resource1.pressure"); Stats::Gauge& pressure_gauge2 = stats_.gauge("overload.envoy.resource_monitors.fake_resource2.pressure"); + const OverloadActionState& action_state = + manager->getThreadLocalOverloadState().getState("envoy.overload_actions.dummy_action"); factory1_.monitor_->setPressure(0.5); timer_cb_(); EXPECT_FALSE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Inactive); EXPECT_EQ(0, cb_count); EXPECT_EQ(0, active_gauge.value()); EXPECT_EQ(50, pressure_gauge1.value()); @@ -158,6 +168,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory1_.monitor_->setPressure(0.95); timer_cb_(); EXPECT_TRUE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Active); EXPECT_EQ(1, cb_count); EXPECT_EQ(1, active_gauge.value()); EXPECT_EQ(95, pressure_gauge1.value()); @@ -166,6 +177,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory1_.monitor_->setPressure(0.94); timer_cb_(); EXPECT_TRUE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Active); EXPECT_EQ(1, cb_count); EXPECT_EQ(94, pressure_gauge1.value()); @@ -174,6 +186,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory2_.monitor_->setPressure(0.9); timer_cb_(); EXPECT_TRUE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Active); EXPECT_EQ(1, cb_count); EXPECT_EQ(50, pressure_gauge1.value()); EXPECT_EQ(90, pressure_gauge2.value()); @@ -181,6 +194,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory2_.monitor_->setPressure(0.4); timer_cb_(); EXPECT_FALSE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Inactive); EXPECT_EQ(2, cb_count); EXPECT_EQ(0, active_gauge.value()); EXPECT_EQ(40, pressure_gauge2.value()); @@ -188,8 +202,8 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { TEST_F(OverloadManagerImplTest, FailedUpdates) { setDispatcherExpectation(); - OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig())); - manager.start(); + auto manager(createOverloadManager(getConfig())); + manager->start(); Stats::Counter& failed_updates = stats_.counter("overload.envoy.resource_monitors.fake_resource1.failed_updates"); @@ -207,8 +221,8 @@ TEST_F(OverloadManagerImplTest, SkippedUpdates) { Event::PostCb post_cb; ON_CALL(dispatcher_, post(_)).WillByDefault(Invoke([&](Event::PostCb cb) { post_cb = cb; })); - OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig())); - manager.start(); + auto manager(createOverloadManager(getConfig())); + manager->start(); Stats::Counter& skipped_updates = stats_.counter("overload.envoy.resource_monitors.fake_resource1.skipped_updates"); @@ -233,8 +247,8 @@ TEST_F(OverloadManagerImplTest, DuplicateResourceMonitor) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Duplicate resource monitor .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, + "Duplicate resource monitor .*"); } TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) { @@ -247,8 +261,8 @@ TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Duplicate overload action .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, + "Duplicate overload action .*"); } TEST_F(OverloadManagerImplTest, UnknownTrigger) { @@ -264,8 +278,8 @@ TEST_F(OverloadManagerImplTest, UnknownTrigger) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Unknown trigger resource .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, + "Unknown trigger resource .*"); } TEST_F(OverloadManagerImplTest, DuplicateTrigger) { @@ -290,8 +304,7 @@ TEST_F(OverloadManagerImplTest, DuplicateTrigger) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Duplicate trigger .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, "Duplicate trigger .*"); } } // namespace } // namespace Server