Skip to content

Commit

Permalink
Add thread local cache of overload action states (#4090)
Browse files Browse the repository at this point in the history
Lookups in this cache can be used as an alternative to registering a callback for overload action state changes. Useful for objects (like the http connection manager) with dynamic lifetimes that are created after envoy initialization - currently callback registration must be done during initialization and there isn't support for unregistration. Those restrictions could be relaxed if we need to in the future but for now this keeps things simpler. For issue #373.

Risk Level: low
Testing: unit tests

Signed-off-by: Elisha Ziskind <eziskind@google.com>
  • Loading branch information
eziskind authored and htuch committed Aug 12, 2018
1 parent 3bb7fbc commit 5fe4e14
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 26 deletions.
35 changes: 35 additions & 0 deletions include/envoy/server/overload_manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include <unordered_map>

#include "envoy/common/pure.h"
#include "envoy/thread_local/thread_local.h"

namespace Envoy {
namespace Server {
Expand All @@ -21,6 +24,32 @@ enum class OverloadActionState {
*/
typedef std::function<void(OverloadActionState)> OverloadActionCb;

/**
* Thread-local copy of the state of each configured overload action.
*/
class ThreadLocalOverloadState : public ThreadLocal::ThreadLocalObject {
public:
const OverloadActionState& getState(const std::string& action) {
auto it = actions_.find(action);
if (it == actions_.end()) {
it = actions_.insert(std::make_pair(action, OverloadActionState::Inactive)).first;
}
return it->second;
}

void setState(const std::string& action, OverloadActionState state) {
auto it = actions_.find(action);
if (it == actions_.end()) {
actions_[action] = state;
} else {
it->second = state;
}
}

private:
std::unordered_map<std::string, OverloadActionState> actions_;
};

/**
* The OverloadManager protects the Envoy instance from being overwhelmed by client
* requests. It monitors a set of resources and notifies registered listeners if
Expand All @@ -46,6 +75,12 @@ class OverloadManager {
*/
virtual void registerForAction(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback) PURE;

/**
* Get the thread-local overload action states. Lookups in this object can be used as
* an alternative to registering a callback for overload action state changes.
*/
virtual ThreadLocalOverloadState& getThreadLocalOverloadState() PURE;
};

} // namespace Server
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ envoy_cc_library(
deps = [
"//include/envoy/server:overload_manager_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/thread_local:thread_local_interface",
"//source/common/common:logger_lib",
"//source/common/config:utility_lib",
"//source/server:resource_monitor_config_lib",
Expand Down
14 changes: 13 additions & 1 deletion source/server/overload_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ bool OverloadAction::isActive() const { return !fired_triggers_.empty(); }

OverloadManagerImpl::OverloadManagerImpl(
Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
ThreadLocal::SlotAllocator& slot_allocator,
const envoy::config::overload::v2alpha::OverloadManager& config)
: started_(false), dispatcher_(dispatcher),
: started_(false), dispatcher_(dispatcher), tls_(slot_allocator.allocateSlot()),
refresh_interval_(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))) {
Configuration::ResourceMonitorFactoryContextImpl context(dispatcher);
Expand Down Expand Up @@ -125,6 +126,10 @@ OverloadManagerImpl::OverloadManagerImpl(
resource_to_actions_.insert(std::make_pair(resource, name));
}
}

tls_->set([](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ThreadLocalOverloadState>();
});
}

void OverloadManagerImpl::start() {
Expand Down Expand Up @@ -157,6 +162,10 @@ void OverloadManagerImpl::registerForAction(const std::string& action,
std::forward_as_tuple(dispatcher, callback));
}

ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() {
return tls_->getTyped<ThreadLocalOverloadState>();
}

void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure) {
auto action_range = resource_to_actions_.equal_range(resource);
std::for_each(action_range.first, action_range.second,
Expand All @@ -170,6 +179,9 @@ void OverloadManagerImpl::updateResourcePressure(const std::string& resource, do
is_active ? OverloadActionState::Active : OverloadActionState::Inactive;
ENVOY_LOG(info, "Overload action {} has become {}", action,
is_active ? "active" : "inactive");
tls_->runOnAllThreads([this, action, state] {
tls_->getTyped<ThreadLocalOverloadState>().setState(action, state);
});
auto callback_range = action_to_callbacks_.equal_range(action);
std::for_each(callback_range.first, callback_range.second,
[&](ActionToCallbackMap::value_type& cb_entry) {
Expand Down
4 changes: 4 additions & 0 deletions source/server/overload_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "envoy/server/resource_monitor.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats.h"
#include "envoy/thread_local/thread_local.h"

#include "common/common/logger.h"

Expand Down Expand Up @@ -50,12 +51,14 @@ class OverloadAction {
class OverloadManagerImpl : Logger::Loggable<Logger::Id::main>, public OverloadManager {
public:
OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
ThreadLocal::SlotAllocator& slot_allocator,
const envoy::config::overload::v2alpha::OverloadManager& config);

// Server::OverloadManager
void start() override;
void registerForAction(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback) override;
ThreadLocalOverloadState& getThreadLocalOverloadState() override;

private:
class Resource : public ResourceMonitor::Callbacks {
Expand Down Expand Up @@ -90,6 +93,7 @@ class OverloadManagerImpl : Logger::Loggable<Logger::Id::main>, public OverloadM

bool started_;
Event::Dispatcher& dispatcher_;
ThreadLocal::SlotPtr tls_;
const std::chrono::milliseconds refresh_interval_;
Event::TimerPtr timer_;
std::unordered_map<std::string, Resource> resources_;
Expand Down
8 changes: 4 additions & 4 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ void InstanceImpl::initialize(Options& options,

loadServerFlags(initial_config.flagsPath());

// Initialize the overload manager early so other modules can register for actions.
overload_manager_.reset(
new OverloadManagerImpl(dispatcher(), stats(), bootstrap_.overload_manager()));

// Workers get created first so they register for thread local updates.
listener_manager_.reset(new ListenerManagerImpl(
*this, listener_component_factory_, worker_factory_, ProdSystemTimeSource::instance_));
Expand All @@ -259,6 +255,10 @@ void InstanceImpl::initialize(Options& options,
// whether it runs on the main thread or on workers can still use TLS.
thread_local_.registerThread(*dispatcher_, true);

// Initialize the overload manager early so other modules can register for actions.
overload_manager_.reset(
new OverloadManagerImpl(dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager()));

// We can now initialize stats for threading.
stats_store_.initializeThreading(*dispatcher_, thread_local_);

Expand Down
1 change: 1 addition & 0 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class MockOverloadManager : public OverloadManager {
MOCK_METHOD0(start, void());
MOCK_METHOD3(registerForAction, void(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback));
MOCK_METHOD0(getThreadLocalOverloadState, ThreadLocalOverloadState&());
};

class MockInstance : public Instance {
Expand Down
1 change: 1 addition & 0 deletions test/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ envoy_cc_test(
"//source/extensions/resource_monitors/common:factory_base_lib",
"//source/server:overload_manager_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/thread_local:thread_local_mocks",
"//test/test_common:registry_lib",
"//test/test_common:utility_lib",
],
Expand Down
55 changes: 34 additions & 21 deletions test/server/overload_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "extensions/resource_monitors/common/factory_base.h"

#include "test/mocks/event/mocks.h"
#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/registry.h"
#include "test/test_common/utility.h"

Expand Down Expand Up @@ -118,46 +119,56 @@ class OverloadManagerImplTest : public testing::Test {
)EOF";
}

std::unique_ptr<OverloadManagerImpl> createOverloadManager(const std::string& config) {
return std::make_unique<OverloadManagerImpl>(dispatcher_, stats_, thread_local_,
parseConfig(config));
}

FakeResourceMonitorFactory factory1_;
FakeResourceMonitorFactory factory2_;
Registry::InjectFactory<Configuration::ResourceMonitorFactory> register_factory1_;
Registry::InjectFactory<Configuration::ResourceMonitorFactory> register_factory2_;
NiceMock<Event::MockDispatcher> dispatcher_;
Stats::IsolatedStoreImpl stats_;
NiceMock<ThreadLocal::MockInstance> thread_local_;
Event::TimerCb timer_cb_;
};

TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
setDispatcherExpectation();

OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
auto manager(createOverloadManager(getConfig()));
bool is_active = false;
int cb_count = 0;
manager.registerForAction("envoy.overload_actions.dummy_action", dispatcher_,
[&](OverloadActionState state) {
is_active = state == OverloadActionState::Active;
cb_count++;
});
manager.registerForAction("envoy.overload_actions.unknown_action", dispatcher_,
[&](OverloadActionState) { EXPECT_TRUE(false); });
manager.start();
manager->registerForAction("envoy.overload_actions.dummy_action", dispatcher_,
[&](OverloadActionState state) {
is_active = state == OverloadActionState::Active;
cb_count++;
});
manager->registerForAction("envoy.overload_actions.unknown_action", dispatcher_,
[&](OverloadActionState) { EXPECT_TRUE(false); });
manager->start();

Stats::Gauge& active_gauge = stats_.gauge("overload.envoy.overload_actions.dummy_action.active");
Stats::Gauge& pressure_gauge1 =
stats_.gauge("overload.envoy.resource_monitors.fake_resource1.pressure");
Stats::Gauge& pressure_gauge2 =
stats_.gauge("overload.envoy.resource_monitors.fake_resource2.pressure");
const OverloadActionState& action_state =
manager->getThreadLocalOverloadState().getState("envoy.overload_actions.dummy_action");

factory1_.monitor_->setPressure(0.5);
timer_cb_();
EXPECT_FALSE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Inactive);
EXPECT_EQ(0, cb_count);
EXPECT_EQ(0, active_gauge.value());
EXPECT_EQ(50, pressure_gauge1.value());

factory1_.monitor_->setPressure(0.95);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Active);
EXPECT_EQ(1, cb_count);
EXPECT_EQ(1, active_gauge.value());
EXPECT_EQ(95, pressure_gauge1.value());
Expand All @@ -166,6 +177,7 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
factory1_.monitor_->setPressure(0.94);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Active);
EXPECT_EQ(1, cb_count);
EXPECT_EQ(94, pressure_gauge1.value());

Expand All @@ -174,22 +186,24 @@ TEST_F(OverloadManagerImplTest, CallbackOnlyFiresWhenStateChanges) {
factory2_.monitor_->setPressure(0.9);
timer_cb_();
EXPECT_TRUE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Active);
EXPECT_EQ(1, cb_count);
EXPECT_EQ(50, pressure_gauge1.value());
EXPECT_EQ(90, pressure_gauge2.value());

factory2_.monitor_->setPressure(0.4);
timer_cb_();
EXPECT_FALSE(is_active);
EXPECT_EQ(action_state, OverloadActionState::Inactive);
EXPECT_EQ(2, cb_count);
EXPECT_EQ(0, active_gauge.value());
EXPECT_EQ(40, pressure_gauge2.value());
}

TEST_F(OverloadManagerImplTest, FailedUpdates) {
setDispatcherExpectation();
OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
manager.start();
auto manager(createOverloadManager(getConfig()));
manager->start();
Stats::Counter& failed_updates =
stats_.counter("overload.envoy.resource_monitors.fake_resource1.failed_updates");

Expand All @@ -207,8 +221,8 @@ TEST_F(OverloadManagerImplTest, SkippedUpdates) {
Event::PostCb post_cb;
ON_CALL(dispatcher_, post(_)).WillByDefault(Invoke([&](Event::PostCb cb) { post_cb = cb; }));

OverloadManagerImpl manager(dispatcher_, stats_, parseConfig(getConfig()));
manager.start();
auto manager(createOverloadManager(getConfig()));
manager->start();
Stats::Counter& skipped_updates =
stats_.counter("overload.envoy.resource_monitors.fake_resource1.skipped_updates");

Expand All @@ -233,8 +247,8 @@ TEST_F(OverloadManagerImplTest, DuplicateResourceMonitor) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate resource monitor .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Duplicate resource monitor .*");
}

TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) {
Expand All @@ -247,8 +261,8 @@ TEST_F(OverloadManagerImplTest, DuplicateOverloadAction) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate overload action .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Duplicate overload action .*");
}

TEST_F(OverloadManagerImplTest, UnknownTrigger) {
Expand All @@ -264,8 +278,8 @@ TEST_F(OverloadManagerImplTest, UnknownTrigger) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Unknown trigger resource .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException,
"Unknown trigger resource .*");
}

TEST_F(OverloadManagerImplTest, DuplicateTrigger) {
Expand All @@ -290,8 +304,7 @@ TEST_F(OverloadManagerImplTest, DuplicateTrigger) {
}
)EOF";

EXPECT_THROW_WITH_REGEX(OverloadManagerImpl(dispatcher_, stats_, parseConfig(config)),
EnvoyException, "Duplicate trigger .*");
EXPECT_THROW_WITH_REGEX(createOverloadManager(config), EnvoyException, "Duplicate trigger .*");
}
} // namespace
} // namespace Server
Expand Down

0 comments on commit 5fe4e14

Please sign in to comment.