diff --git a/include/envoy/server/overload_manager.h b/include/envoy/server/overload_manager.h index 068ee8e36fff..7f8e08076651 100644 --- a/include/envoy/server/overload_manager.h +++ b/include/envoy/server/overload_manager.h @@ -1,6 +1,9 @@ #pragma once +#include + #include "envoy/common/pure.h" +#include "envoy/thread_local/thread_local.h" namespace Envoy { namespace Server { @@ -21,6 +24,32 @@ enum class OverloadActionState { */ typedef std::function OverloadActionCb; +/** + * Thread-local copy of the state of each configured overload action. + */ +class ThreadLocalOverloadState : public ThreadLocal::ThreadLocalObject { +public: + const OverloadActionState& getState(const std::string& action) { + auto it = actions_.find(action); + if (it == actions_.end()) { + it = actions_.insert(std::make_pair(action, OverloadActionState::Inactive)).first; + } + return it->second; + } + + void setState(const std::string& action, OverloadActionState state) { + auto it = actions_.find(action); + if (it == actions_.end()) { + actions_[action] = state; + } else { + it->second = state; + } + } + +private: + std::unordered_map actions_; +}; + /** * The OverloadManager protects the Envoy instance from being overwhelmed by client * requests. It monitors a set of resources and notifies registered listeners if @@ -46,6 +75,12 @@ class OverloadManager { */ virtual void registerForAction(const std::string& action, Event::Dispatcher& dispatcher, OverloadActionCb callback) PURE; + + /** + * Get the thread-local overload action states. Lookups in this object can be used as + * an alternative to registering a callback for overload action state changes. + */ + virtual ThreadLocalOverloadState& getThreadLocalOverloadState() PURE; }; } // namespace Server diff --git a/source/server/BUILD b/source/server/BUILD index 648c17a4e0f4..e66a51cdf3ce 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -167,6 +167,7 @@ envoy_cc_library( deps = [ "//include/envoy/server:overload_manager_interface", "//include/envoy/stats:stats_interface", + "//include/envoy/thread_local:thread_local_interface", "//source/common/common:logger_lib", "//source/common/config:utility_lib", "//source/server:resource_monitor_config_lib", diff --git a/source/server/overload_manager_impl.cc b/source/server/overload_manager_impl.cc index 1a10199e0acf..bfed61f32557 100644 --- a/source/server/overload_manager_impl.cc +++ b/source/server/overload_manager_impl.cc @@ -84,8 +84,9 @@ bool OverloadAction::isActive() const { return !fired_triggers_.empty(); } OverloadManagerImpl::OverloadManagerImpl( Event::Dispatcher& dispatcher, Stats::Scope& stats_scope, + ThreadLocal::SlotAllocator& slot_allocator, const envoy::config::overload::v2alpha::OverloadManager& config) - : started_(false), dispatcher_(dispatcher), + : started_(false), dispatcher_(dispatcher), tls_(slot_allocator.allocateSlot()), refresh_interval_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))) { Configuration::ResourceMonitorFactoryContextImpl context(dispatcher); @@ -125,6 +126,10 @@ OverloadManagerImpl::OverloadManagerImpl( resource_to_actions_.insert(std::make_pair(resource, name)); } } + + tls_->set([](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { + return std::make_shared(); + }); } void OverloadManagerImpl::start() { @@ -157,6 +162,10 @@ void OverloadManagerImpl::registerForAction(const std::string& action, std::forward_as_tuple(dispatcher, callback)); } +ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { + return tls_->getTyped(); +} + void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure) { auto action_range = resource_to_actions_.equal_range(resource); std::for_each(action_range.first, action_range.second, @@ -170,6 +179,9 @@ void OverloadManagerImpl::updateResourcePressure(const std::string& resource, do is_active ? OverloadActionState::Active : OverloadActionState::Inactive; ENVOY_LOG(info, "Overload action {} has become {}", action, is_active ? "active" : "inactive"); + tls_->runOnAllThreads([this, action, state] { + tls_->getTyped().setState(action, state); + }); auto callback_range = action_to_callbacks_.equal_range(action); std::for_each(callback_range.first, callback_range.second, [&](ActionToCallbackMap::value_type& cb_entry) { diff --git a/source/server/overload_manager_impl.h b/source/server/overload_manager_impl.h index 32f1076747e8..e419ef4f03e8 100644 --- a/source/server/overload_manager_impl.h +++ b/source/server/overload_manager_impl.h @@ -11,6 +11,7 @@ #include "envoy/server/resource_monitor.h" #include "envoy/stats/scope.h" #include "envoy/stats/stats.h" +#include "envoy/thread_local/thread_local.h" #include "common/common/logger.h" @@ -50,12 +51,14 @@ class OverloadAction { class OverloadManagerImpl : Logger::Loggable, public OverloadManager { public: OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope, + ThreadLocal::SlotAllocator& slot_allocator, const envoy::config::overload::v2alpha::OverloadManager& config); // Server::OverloadManager void start() override; void registerForAction(const std::string& action, Event::Dispatcher& dispatcher, OverloadActionCb callback) override; + ThreadLocalOverloadState& getThreadLocalOverloadState() override; private: class Resource : public ResourceMonitor::Callbacks { @@ -90,6 +93,7 @@ class OverloadManagerImpl : Logger::Loggable, public OverloadM bool started_; Event::Dispatcher& dispatcher_; + ThreadLocal::SlotPtr tls_; const std::chrono::milliseconds refresh_interval_; Event::TimerPtr timer_; std::unordered_map resources_; diff --git a/source/server/server.cc b/source/server/server.cc index 447362b148e1..cdd3d605a64f 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -247,10 +247,6 @@ void InstanceImpl::initialize(Options& options, loadServerFlags(initial_config.flagsPath()); - // Initialize the overload manager early so other modules can register for actions. - overload_manager_.reset( - new OverloadManagerImpl(dispatcher(), stats(), bootstrap_.overload_manager())); - // Workers get created first so they register for thread local updates. listener_manager_.reset(new ListenerManagerImpl( *this, listener_component_factory_, worker_factory_, ProdSystemTimeSource::instance_)); @@ -259,6 +255,10 @@ void InstanceImpl::initialize(Options& options, // whether it runs on the main thread or on workers can still use TLS. thread_local_.registerThread(*dispatcher_, true); + // Initialize the overload manager early so other modules can register for actions. + overload_manager_.reset( + new OverloadManagerImpl(dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager())); + // We can now initialize stats for threading. stats_store_.initializeThreading(*dispatcher_, thread_local_); diff --git a/test/mocks/server/mocks.h b/test/mocks/server/mocks.h index 61b28be98c0a..b43691cbd2b1 100644 --- a/test/mocks/server/mocks.h +++ b/test/mocks/server/mocks.h @@ -280,6 +280,7 @@ class MockOverloadManager : public OverloadManager { MOCK_METHOD0(start, void()); MOCK_METHOD3(registerForAction, void(const std::string& action, Event::Dispatcher& dispatcher, OverloadActionCb callback)); + MOCK_METHOD0(getThreadLocalOverloadState, ThreadLocalOverloadState&()); }; class MockInstance : public Instance { diff --git a/test/server/BUILD b/test/server/BUILD index db7f50b52471..064fe8fa72ef 100644 --- a/test/server/BUILD +++ b/test/server/BUILD @@ -122,6 +122,7 @@ envoy_cc_test( "//source/extensions/resource_monitors/common:factory_base_lib", "//source/server:overload_manager_lib", "//test/mocks/event:event_mocks", + "//test/mocks/thread_local:thread_local_mocks", "//test/test_common:registry_lib", "//test/test_common:utility_lib", ], diff --git a/test/server/overload_manager_impl_test.cc b/test/server/overload_manager_impl_test.cc index 5012879a0f9a..3d3b5f37cabc 100644 --- a/test/server/overload_manager_impl_test.cc +++ b/test/server/overload_manager_impl_test.cc @@ -8,6 +8,7 @@ #include "extensions/resource_monitors/common/factory_base.h" #include "test/mocks/event/mocks.h" +#include "test/mocks/thread_local/mocks.h" #include "test/test_common/registry.h" #include "test/test_common/utility.h" @@ -118,39 +119,48 @@ class OverloadManagerImplTest : public testing::Test { )EOF"; } + std::unique_ptr createOverloadManager(const std::string& config) { + return std::make_unique(dispatcher_, stats_, thread_local_, + parseConfig(config)); + } + FakeResourceMonitorFactory factory1_; FakeResourceMonitorFactory factory2_; Registry::InjectFactory register_factory1_; Registry::InjectFactory register_factory2_; NiceMock dispatcher_; Stats::IsolatedStoreImpl stats_; + NiceMock thread_local_; Event::TimerCb timer_cb_; }; TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { setDispatcherExpectation(); - OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig())); + auto manager(createOverloadManager(getConfig())); bool is_active = false; int cb_count = 0; - manager.registerForAction("envoy.overload_actions.dummy_action", dispatcher_, - [&](OverloadActionState state) { - is_active = state == OverloadActionState::Active; - cb_count++; - }); - manager.registerForAction("envoy.overload_actions.unknown_action", dispatcher_, - [&](OverloadActionState) { EXPECT_TRUE(false); }); - manager.start(); + manager->registerForAction("envoy.overload_actions.dummy_action", dispatcher_, + [&](OverloadActionState state) { + is_active = state == OverloadActionState::Active; + cb_count++; + }); + manager->registerForAction("envoy.overload_actions.unknown_action", dispatcher_, + [&](OverloadActionState) { EXPECT_TRUE(false); }); + manager->start(); Stats::Gauge& active_gauge = stats_.gauge("overload.envoy.overload_actions.dummy_action.active"); Stats::Gauge& pressure_gauge1 = stats_.gauge("overload.envoy.resource_monitors.fake_resource1.pressure"); Stats::Gauge& pressure_gauge2 = stats_.gauge("overload.envoy.resource_monitors.fake_resource2.pressure"); + const OverloadActionState& action_state = + manager->getThreadLocalOverloadState().getState("envoy.overload_actions.dummy_action"); factory1_.monitor_->setPressure(0.5); timer_cb_(); EXPECT_FALSE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Inactive); EXPECT_EQ(0, cb_count); EXPECT_EQ(0, active_gauge.value()); EXPECT_EQ(50, pressure_gauge1.value()); @@ -158,6 +168,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory1_.monitor_->setPressure(0.95); timer_cb_(); EXPECT_TRUE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Active); EXPECT_EQ(1, cb_count); EXPECT_EQ(1, active_gauge.value()); EXPECT_EQ(95, pressure_gauge1.value()); @@ -166,6 +177,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory1_.monitor_->setPressure(0.94); timer_cb_(); EXPECT_TRUE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Active); EXPECT_EQ(1, cb_count); EXPECT_EQ(94, pressure_gauge1.value()); @@ -174,6 +186,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory2_.monitor_->setPressure(0.9); timer_cb_(); EXPECT_TRUE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Active); EXPECT_EQ(1, cb_count); EXPECT_EQ(50, pressure_gauge1.value()); EXPECT_EQ(90, pressure_gauge2.value()); @@ -181,6 +194,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { factory2_.monitor_->setPressure(0.4); timer_cb_(); EXPECT_FALSE(is_active); + EXPECT_EQ(action_state, OverloadActionState::Inactive); EXPECT_EQ(2, cb_count); EXPECT_EQ(0, active_gauge.value()); EXPECT_EQ(40, pressure_gauge2.value()); @@ -188,8 +202,8 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) { TEST_F(OverloadManagerImplTest, FailedUpdates) { setDispatcherExpectation(); - OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig())); - manager.start(); + auto manager(createOverloadManager(getConfig())); + manager->start(); Stats::Counter& failed_updates = stats_.counter("overload.envoy.resource_monitors.fake_resource1.failed_updates"); @@ -207,8 +221,8 @@ TEST_F(OverloadManagerImplTest, SkippedUpdates) { Event::PostCb post_cb; ON_CALL(dispatcher_, post(_)).WillByDefault(Invoke([&](Event::PostCb cb) { post_cb = cb; })); - OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig())); - manager.start(); + auto manager(createOverloadManager(getConfig())); + manager->start(); Stats::Counter& skipped_updates = stats_.counter("overload.envoy.resource_monitors.fake_resource1.skipped_updates"); @@ -233,8 +247,8 @@ TEST_F(OverloadManagerImplTest, DuplicateResourceMonitor) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Duplicate resource monitor .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, + "Duplicate resource monitor .*"); } TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) { @@ -247,8 +261,8 @@ TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Duplicate overload action .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, + "Duplicate overload action .*"); } TEST_F(OverloadManagerImplTest, UnknownTrigger) { @@ -264,8 +278,8 @@ TEST_F(OverloadManagerImplTest, UnknownTrigger) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Unknown trigger resource .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, + "Unknown trigger resource .*"); } TEST_F(OverloadManagerImplTest, DuplicateTrigger) { @@ -290,8 +304,7 @@ TEST_F(OverloadManagerImplTest, DuplicateTrigger) { } )EOF"; - EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)), - EnvoyException, "Duplicate trigger .*"); + EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, "Duplicate trigger .*"); } } // namespace } // namespace Server