Skip to content

Commit

Permalink
Create scaled timers through the Dispatcher
Browse files Browse the repository at this point in the history
When creating the Dispatcher, use a factory returned by the
OverloadManager to construct the ScaledRangeTimerManager.

Signed-off-by: Alex Konradi <akonradi@google.com>
  • Loading branch information
akonradi committed Jan 12, 2021
1 parent 842d5fa commit 9d167b1
Show file tree
Hide file tree
Showing 26 changed files with 178 additions and 109 deletions.
12 changes: 12 additions & 0 deletions include/envoy/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ class Api {
*/
virtual Event::DispatcherPtr allocateDispatcher(const std::string& name) PURE;

/**
* Allocate a dispatcher.
* @param name the identity name for a dispatcher, e.g. "worker_2" or "main_thread".
* This name will appear in per-handler/worker statistics, such as
* "server.worker_2.watchdog_miss".
* @param scaled_timer_factory the factory to use when creating the scaled timer manager.
* @return Event::DispatcherPtr which is owned by the caller.
*/
virtual Event::DispatcherPtr
allocateDispatcher(const std::string& name,
const Event::ScaledRangeTimerManagerFactory& scaled_timer_factory) PURE;

/**
* Allocate a dispatcher.
* @param name the identity name for a dispatcher, e.g. "worker_2" or "main_thread".
Expand Down
3 changes: 2 additions & 1 deletion include/envoy/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ envoy_cc_library(
deps = [
":deferred_deletable",
":file_event_interface",
":scaled_range_timer_manager_interface",
":schedulable_cb_interface",
":signal_interface",
":timer_interface",
"//include/envoy/common:scope_tracker_interface",
"//include/envoy/common:time_interface",
"//include/envoy/event:timer_interface",
"//include/envoy/filesystem:watcher_interface",
"//include/envoy/network:connection_handler_interface",
"//include/envoy/network:connection_interface",
Expand Down
16 changes: 16 additions & 0 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/common/scope_tracker.h"
#include "envoy/common/time.h"
#include "envoy/event/file_event.h"
#include "envoy/event/scaled_range_timer_manager.h"
#include "envoy/event/schedulable_cb.h"
#include "envoy/event/signal.h"
#include "envoy/event/timer.h"
Expand Down Expand Up @@ -77,6 +78,21 @@ class DispatcherBase {
*/
virtual Event::TimerPtr createTimer(TimerCb cb) PURE;

/**
* Allocates a scaled timer. @see Timer for docs on how to use the timer.
* @param timer_type the type of timer to create.
* @param cb supplies the callback to invoke when the timer fires.
*/
virtual Event::TimerPtr createScaledTimer(Event::ScaledRangeTimerManager::TimerType timer_type,
TimerCb cb) PURE;

/**
* Allocates a scaled timer. @see Timer for docs on how to use the timer.
* @param minimum the rule for computing the minimum value of the timer.
* @param cb supplies the callback to invoke when the timer fires.
*/
virtual Event::TimerPtr createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) PURE;

/**
* Allocates a schedulable callback. @see SchedulableCallback for docs on how to use the wrapped
* callback.
Expand Down
3 changes: 3 additions & 0 deletions include/envoy/event/scaled_range_timer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,8 @@ class ScaledRangeTimerManager {

using ScaledRangeTimerManagerPtr = std::unique_ptr<ScaledRangeTimerManager>;

class Dispatcher;
using ScaledRangeTimerManagerFactory = std::function<ScaledRangeTimerManagerPtr(Dispatcher&)>;

} // namespace Event
} // namespace Envoy
6 changes: 6 additions & 0 deletions include/envoy/server/overload/overload_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/common/pure.h"
#include "envoy/event/dispatcher.h"
#include "envoy/event/scaled_range_timer_manager.h"
#include "envoy/server/overload/thread_local_overload_state.h"

#include "common/singleton/const_singleton.h"
Expand Down Expand Up @@ -69,6 +70,11 @@ class OverloadManager {
* an alternative to registering a callback for overload action state changes.
*/
virtual ThreadLocalOverloadState& getThreadLocalOverloadState() PURE;

/**
* Get a factory for constructing scaled timer managers that respond to overload state.
*/
virtual Event::ScaledRangeTimerManagerFactory scaledTimerFactory() PURE;
};

} // namespace Server
Expand Down
8 changes: 0 additions & 8 deletions include/envoy/server/overload/thread_local_overload_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ class ThreadLocalOverloadState : public ThreadLocal::ThreadLocalObject {
public:
// Get a thread-local reference to the value for the given action key.
virtual const OverloadActionState& getState(const std::string& action) PURE;

// Get a scaled timer whose minimum corresponds to the configured value for the given timer type.
virtual Event::TimerPtr createScaledTimer(Event::ScaledRangeTimerManager::TimerType timer_type,
Event::TimerCb callback) PURE;

// Get a scaled timer whose minimum is determined by the given scaling rule.
virtual Event::TimerPtr createScaledTimer(Event::ScaledTimerMinimum minimum,
Event::TimerCb callback) PURE;
};

} // namespace Server
Expand Down
12 changes: 10 additions & 2 deletions source/common/api/api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ Impl::Impl(Thread::ThreadFactory& thread_factory, Stats::Store& store,
process_context_(process_context), watermark_factory_(std::move(watermark_factory)) {}

Event::DispatcherPtr Impl::allocateDispatcher(const std::string& name) {
return std::make_unique<Event::DispatcherImpl>(name, *this, time_system_, watermark_factory_);
return std::make_unique<Event::DispatcherImpl>(name, *this, time_system_, nullptr,
watermark_factory_);
}
Event::DispatcherPtr
Impl::allocateDispatcher(const std::string& name,
const Event::ScaledRangeTimerManagerFactory& scaled_timer_factory) {
return std::make_unique<Event::DispatcherImpl>(name, *this, time_system_, scaled_timer_factory,
watermark_factory_);
}

Event::DispatcherPtr Impl::allocateDispatcher(const std::string& name,
Buffer::WatermarkFactoryPtr&& factory) {
return std::make_unique<Event::DispatcherImpl>(name, *this, time_system_, std::move(factory));
return std::make_unique<Event::DispatcherImpl>(name, *this, time_system_, nullptr,
std::move(factory));
}

} // namespace Api
Expand Down
3 changes: 3 additions & 0 deletions source/common/api/api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class Impl : public Api {

// Api::Api
Event::DispatcherPtr allocateDispatcher(const std::string& name) override;
Event::DispatcherPtr
allocateDispatcher(const std::string& name,
const Event::ScaledRangeTimerManagerFactory& scaled_timer_factory) override;
Event::DispatcherPtr allocateDispatcher(const std::string& name,
Buffer::WatermarkFactoryPtr&& watermark_factory) override;
Thread::ThreadFactory& threadFactory() override { return thread_factory_; }
Expand Down
1 change: 1 addition & 0 deletions source/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ envoy_cc_library(
":libevent_scheduler_lib",
":real_time_system_lib",
":signal_lib",
":scaled_range_timer_manager_lib",
"//include/envoy/common:scope_tracker_interface",
"//include/envoy/common:time_interface",
"//include/envoy/event:signal_interface",
Expand Down
29 changes: 23 additions & 6 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/common/thread.h"
#include "common/event/file_event_impl.h"
#include "common/event/libevent_scheduler.h"
#include "common/event/scaled_range_timer_manager_impl.h"
#include "common/event/signal_impl.h"
#include "common/event/timer_impl.h"
#include "common/filesystem/watcher_impl.h"
Expand All @@ -37,17 +38,22 @@
namespace Envoy {
namespace Event {

DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
Event::TimeSystem& time_system,
const Buffer::WatermarkFactorySharedPtr& factory)
DispatcherImpl::DispatcherImpl(
const std::string& name, Api::Api& api, Event::TimeSystem& time_system,
const std::function<ScaledRangeTimerManagerPtr(Dispatcher&)>& scaled_timer_factory,
const Buffer::WatermarkFactorySharedPtr& watermark_factory)
: name_(name), api_(api),
buffer_factory_(factory != nullptr ? factory
: std::make_shared<Buffer::WatermarkBufferFactory>()),
buffer_factory_(watermark_factory != nullptr
? watermark_factory
: std::make_shared<Buffer::WatermarkBufferFactory>()),
scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
[this]() -> void { clearDeferredDeleteList(); })),
post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
current_to_delete_(&to_delete_1_) {
current_to_delete_(&to_delete_1_),
scaled_timer_manager_(scaled_timer_factory != nullptr
? scaled_timer_factory(*this)
: std::make_unique<ScaledRangeTimerManagerImpl>(*this)) {
ASSERT(!name_.empty());
FatalErrorHandler::registerFatalErrorHandler(*this);
updateApproximateMonotonicTimeInternal();
Expand Down Expand Up @@ -190,6 +196,17 @@ TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
return createTimerInternal(cb);
}

TimerPtr DispatcherImpl::createScaledTimer(ScaledRangeTimerManager::TimerType timer_type,
TimerCb cb) {
ASSERT(isThreadSafe());
return scaled_timer_manager_->createTimer(timer_type, std::move(cb));
}

TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) {
ASSERT(isThreadSafe());
return scaled_timer_manager_->createTimer(minimum, std::move(cb));
}

Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function<void()> cb) {
ASSERT(isThreadSafe());
return base_scheduler_.createSchedulableCallback([this, cb]() {
Expand Down
7 changes: 6 additions & 1 deletion source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
public FatalErrorHandlerInterface {
public:
DispatcherImpl(const std::string& name, Api::Api& api, Event::TimeSystem& time_system,
const Buffer::WatermarkFactorySharedPtr& factory = nullptr);
const ScaledRangeTimerManagerFactory& scaled_timer_factory = nullptr,
const Buffer::WatermarkFactorySharedPtr& watermark_factory = nullptr);
~DispatcherImpl() override;

/**
Expand Down Expand Up @@ -67,6 +68,9 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket,
Network::UdpListenerCallbacks& cb) override;
TimerPtr createTimer(TimerCb cb) override;
TimerPtr createScaledTimer(ScaledRangeTimerManager::TimerType timer_type, TimerCb cb) override;
TimerPtr createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) override;

Event::SchedulableCallbackPtr createSchedulableCallback(std::function<void()> cb) override;
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
Expand Down Expand Up @@ -154,6 +158,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
bool deferred_deleting_{};
MonotonicTime approximate_monotonic_time_;
WatchdogRegistrationPtr watchdog_registration_;
const ScaledRangeTimerManagerPtr scaled_timer_manager_;
};

} // namespace Event
Expand Down
16 changes: 9 additions & 7 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCal
read_callbacks_->connection().addConnectionCallbacks(*this);

if (config_.idleTimeout()) {
connection_idle_timer_ = overload_state_.createScaledTimer(
connection_idle_timer_ = read_callbacks_->connection().dispatcher().createScaledTimer(
Event::ScaledRangeTimerManager::TimerType::HttpDownstreamIdleConnectionTimeout,
[this]() -> void { onIdleTimeout(); });
connection_idle_timer_->enableTimer(config_.idleTimeout().value());
Expand Down Expand Up @@ -628,9 +628,10 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect

if (connection_manager_.config_.streamIdleTimeout().count()) {
idle_timeout_ms_ = connection_manager_.config_.streamIdleTimeout();
stream_idle_timer_ = connection_manager_.overload_state_.createScaledTimer(
Event::ScaledRangeTimerManager::TimerType::HttpDownstreamIdleStreamTimeout,
[this]() -> void { onIdleTimeout(); });
stream_idle_timer_ =
connection_manager_.read_callbacks_->connection().dispatcher().createScaledTimer(
Event::ScaledRangeTimerManager::TimerType::HttpDownstreamIdleStreamTimeout,
[this]() -> void { onIdleTimeout(); });
resetIdleTimer();
}

Expand Down Expand Up @@ -1052,9 +1053,10 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& he
if (idle_timeout_ms_.count()) {
// If we have a route-level idle timeout but no global stream idle timeout, create a timer.
if (stream_idle_timer_ == nullptr) {
stream_idle_timer_ = connection_manager_.overload_state_.createScaledTimer(
Event::ScaledRangeTimerManager::TimerType::HttpDownstreamIdleStreamTimeout,
[this]() -> void { onIdleTimeout(); });
stream_idle_timer_ =
connection_manager_.read_callbacks_->connection().dispatcher().createScaledTimer(
Event::ScaledRangeTimerManager::TimerType::HttpDownstreamIdleStreamTimeout,
[this]() -> void { onIdleTimeout(); });
}
} else if (stream_idle_timer_ != nullptr) {
// If we had a global stream idle timeout but the route-level idle timeout is set to zero
Expand Down
11 changes: 2 additions & 9 deletions source/server/admin/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,6 @@ class AdminImpl : public Admin,
struct NullThreadLocalOverloadState : public ThreadLocalOverloadState {
NullThreadLocalOverloadState(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
const OverloadActionState& getState(const std::string&) override { return inactive_; }
Event::TimerPtr createScaledTimer(Event::ScaledRangeTimerManager::TimerType,
Event::TimerCb callback) override {
return dispatcher_.createTimer(callback);
}
Event::TimerPtr createScaledTimer(Event::ScaledTimerMinimum,
Event::TimerCb callback) override {
return dispatcher_.createTimer(callback);
}

Event::Dispatcher& dispatcher_;
const OverloadActionState inactive_ = OverloadActionState::inactive();
};
Expand All @@ -282,6 +273,8 @@ class AdminImpl : public Admin,
return tls_->getTyped<NullThreadLocalOverloadState>();
}

Event::ScaledRangeTimerManagerFactory scaledTimerFactory() override { return nullptr; }

bool registerForAction(const std::string&, Event::Dispatcher&, OverloadActionCb) override {
// This method shouldn't be called by the admin listener
NOT_REACHED_GCOVR_EXCL_LINE;
Expand Down
37 changes: 14 additions & 23 deletions source/server/overload_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ using TimerTypeMap = Event::ScaledRangeTimerManagerImpl::TimerTypeMap;
*/
class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState {
public:
ThreadLocalOverloadStateImpl(Event::ScaledRangeTimerManagerPtr scaled_timer_manager,
const NamedOverloadActionSymbolTable& action_symbol_table)
explicit ThreadLocalOverloadStateImpl(const NamedOverloadActionSymbolTable& action_symbol_table)
: action_symbol_table_(action_symbol_table),
actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())),
scaled_timer_action_(action_symbol_table.lookup(OverloadActionNames::get().ReduceTimeouts)),
scaled_timer_manager_(std::move(scaled_timer_manager)) {}
actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())) {}

const OverloadActionState& getState(const std::string& action) override {
if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) {
Expand All @@ -41,29 +38,14 @@ class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState {
return always_inactive_;
}

Event::TimerPtr createScaledTimer(Event::ScaledRangeTimerManager::TimerType timer_type,
Event::TimerCb callback) override {
return scaled_timer_manager_->createTimer(timer_type, std::move(callback));
}

Event::TimerPtr createScaledTimer(Event::ScaledTimerMinimum minimum,
Event::TimerCb callback) override {
return scaled_timer_manager_->createTimer(minimum, std::move(callback));
}

void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) {
actions_[action.index()] = state;
if (scaled_timer_action_.has_value() && scaled_timer_action_.value() == action) {
scaled_timer_manager_->setScaleFactor(1 - state.value());
}
}

private:
static const OverloadActionState always_inactive_;
const NamedOverloadActionSymbolTable& action_symbol_table_;
std::vector<OverloadActionState> actions_;
absl::optional<NamedOverloadActionSymbolTable::Symbol> scaled_timer_action_;
const Event::ScaledRangeTimerManagerPtr scaled_timer_manager_;
};

const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()};
Expand Down Expand Up @@ -332,9 +314,8 @@ void OverloadManagerImpl::start() {
ASSERT(!started_);
started_ = true;

tls_.set([this](Event::Dispatcher& dispatcher) {
return std::make_shared<ThreadLocalOverloadStateImpl>(createScaledRangeTimerManager(dispatcher),
action_symbol_table_);
tls_.set([this](Event::Dispatcher&) {
return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_);
});

if (resources_.empty()) {
Expand Down Expand Up @@ -386,6 +367,16 @@ bool OverloadManagerImpl::registerForAction(const std::string& action,
}

ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { return *tls_; }
Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() {
return [this](Event::Dispatcher& dispatcher) {
auto manager = createScaledRangeTimerManager(dispatcher);
registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher,
[manager = manager.get()](OverloadActionState scale_state) {
manager->setScaleFactor(1 - scale_state.value());
});
return manager;
};
}

Event::ScaledRangeTimerManagerPtr
OverloadManagerImpl::createScaledRangeTimerManager(Event::Dispatcher& dispatcher) const {
Expand Down
1 change: 1 addition & 0 deletions source/server/overload_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class OverloadManagerImpl : Logger::Loggable<Logger::Id::main>, public OverloadM
bool registerForAction(const std::string& action, Event::Dispatcher& dispatcher,
OverloadActionCb callback) override;
ThreadLocalOverloadState& getThreadLocalOverloadState() override;
Event::ScaledRangeTimerManagerFactory scaledTimerFactory() override;

// Stop the overload manager timer and wait for any pending resource updates to complete.
// After this returns, overload manager clients should not receive any more callbacks
Expand Down
3 changes: 2 additions & 1 deletion source/server/worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ namespace Server {

WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
const std::string& worker_name) {
Event::DispatcherPtr dispatcher(api_.allocateDispatcher(worker_name));
Event::DispatcherPtr dispatcher(
api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher),
std::make_unique<ConnectionHandlerImpl>(*dispatcher, index),
overload_manager, api_);
Expand Down
2 changes: 1 addition & 1 deletion test/common/event/scaled_range_timer_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ TEST_F(ScaledRangeTimerManagerTest, LooksUpConfiguredMinimums) {
// one calls into this one after looking up the minimum.
class TestScaledRangeTimerManager : public ScaledRangeTimerManagerImpl {
public:
using ScaledRangeTimerManagerImpl::ScaledRangeTimerManagerImpl;
using ScaledRangeTimerManagerImpl::createTimer;
using ScaledRangeTimerManagerImpl::ScaledRangeTimerManagerImpl;
TimerPtr createTimer(ScaledTimerMinimum minimum, TimerCb callback) override {
return createScaledTimer(minimum, callback);
}
Expand Down
Loading

0 comments on commit 9d167b1

Please sign in to comment.