From b9613239137dfb4767081eedef03d7470f2afca0 Mon Sep 17 00:00:00 2001 From: Christoph Pakulski Date: Wed, 3 Mar 2021 09:57:24 -0500 Subject: [PATCH] backport to 1.17: cluster: destroy on main thread (#14954) (#15197) * Dispatcher: keeps a stack of tracked objects. (#14573) Dispatcher will now keep a stack of tracked objects; on crash it'll "unwind" and have those objects dump their state. Moreover, it'll invoke fatal actions with the tracked objects. This allows us to dump more information during crash. See related PR: #14509 Will follow up with another PR dumping information at the codec/parser level. Signed-off-by: Kevin Baichoo Signed-off-by: Christoph Pakulski * cluster: destroy on main thread (#14954) Signed-off-by: Yuchen Dai Signed-off-by: Christoph Pakulski * Updated release notes. Signed-off-by: Christoph Pakulski Co-authored-by: Kevin Baichoo Co-authored-by: Yuchen Dai --- docs/root/version_history/current.rst | 1 + include/envoy/event/BUILD | 6 + include/envoy/event/dispatcher.h | 29 ++- .../envoy/event/dispatcher_thread_deletable.h | 21 +++ include/envoy/server/fatal_action_config.h | 8 +- source/common/common/scope_tracker.h | 25 ++- source/common/event/BUILD | 3 + source/common/event/dispatcher_impl.cc | 101 ++++++++++- source/common/event/dispatcher_impl.h | 47 +++-- source/common/grpc/async_client_impl.cc | 1 + source/common/http/async_client_impl.cc | 3 + source/common/network/connection_impl.cc | 4 + source/common/upstream/upstream_impl.cc | 19 +- source/common/upstream/upstream_impl.h | 4 +- source/server/config_validation/server.cc | 1 + source/server/server.cc | 1 + source/server/worker_impl.cc | 1 + test/common/common/BUILD | 12 ++ test/common/common/scope_tracker_test.cc | 37 ++++ test/common/event/dispatcher_impl_test.cc | 170 +++++++++++++++++- .../scaled_range_timer_manager_impl_test.cc | 10 +- test/common/http/conn_manager_impl_test.cc | 6 +- test/common/http/conn_manager_impl_test_2.cc | 23 ++- test/common/http/filter_manager_test.cc | 5 +- test/common/network/connection_impl_test.cc | 4 +- test/common/router/router_test.cc | 14 +- .../common/router/router_upstream_log_test.cc | 3 +- test/common/signal/fatal_action_test.cc | 12 +- test/common/signal/signals_test.cc | 17 +- .../upstream/cluster_manager_impl_test.cc | 2 + .../upstream/logical_dns_cluster_test.cc | 2 +- .../upstream/original_dst_cluster_test.cc | 2 +- .../filters/http/fault/fault_filter_test.cc | 58 +++--- .../filters/http/squash/squash_filter_test.cc | 17 +- .../sds_dynamic_integration_test.cc | 129 +++++++++++++ test/mocks/event/mocks.cc | 1 + test/mocks/event/mocks.h | 6 +- test/mocks/event/wrapped_dispatcher.h | 14 +- test/mocks/router/router_filter_interface.cc | 3 +- test/server/server_test.cc | 4 +- 40 files changed, 707 insertions(+), 119 deletions(-) create mode 100644 include/envoy/event/dispatcher_thread_deletable.h create mode 100644 test/common/common/scope_tracker_test.cc diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 77b1b5b6d733..3ae7afe1afa8 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -21,6 +21,7 @@ Removed Config or Runtime New Features ------------ +* dispatcher: supports a stack of `Envoy::ScopeTrackedObject` instead of a single tracked object. This will allow Envoy to dump more debug information on crash. Deprecated ---------- diff --git a/include/envoy/event/BUILD b/include/envoy/event/BUILD index a4c865a2d225..75a7b759a9f7 100644 --- a/include/envoy/event/BUILD +++ b/include/envoy/event/BUILD @@ -13,11 +13,17 @@ envoy_cc_library( hdrs = ["deferred_deletable.h"], ) +envoy_cc_library( + name = "dispatcher_thread_deletable", + hdrs = ["dispatcher_thread_deletable.h"], +) + envoy_cc_library( name = "dispatcher_interface", hdrs = ["dispatcher.h"], deps = [ ":deferred_deletable", + ":dispatcher_thread_deletable", ":file_event_interface", ":schedulable_cb_interface", ":signal_interface", diff --git a/include/envoy/event/dispatcher.h b/include/envoy/event/dispatcher.h index 599617e87d28..f6ffdc2ccc8a 100644 --- a/include/envoy/event/dispatcher.h +++ b/include/envoy/event/dispatcher.h @@ -8,6 +8,7 @@ #include "envoy/common/scope_tracker.h" #include "envoy/common/time.h" +#include "envoy/event/dispatcher_thread_deletable.h" #include "envoy/event/file_event.h" #include "envoy/event/schedulable_cb.h" #include "envoy/event/signal.h" @@ -86,15 +87,18 @@ class DispatcherBase { virtual Event::SchedulableCallbackPtr createSchedulableCallback(std::function cb) PURE; /** - * Sets a tracked object, which is currently operating in this Dispatcher. - * This should be cleared with another call to setTrackedObject() when the object is done doing - * work. Calling setTrackedObject(nullptr) results in no object being tracked. + * Appends a tracked object to the current stack of tracked objects operating + * in the dispatcher. * - * This is optimized for performance, to avoid allocation where we do scoped object tracking. - * - * @return The previously tracked object or nullptr if there was none. + * It's recommended to use ScopeTrackerScopeState to manage the object's tracking. If directly + * invoking, there needs to be a subsequent call to popTrackedObject(). */ - virtual const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) PURE; + virtual void pushTrackedObject(const ScopeTrackedObject* object) PURE; + + /** + * Removes the top of the stack of tracked object and asserts that it was expected. + */ + virtual void popTrackedObject(const ScopeTrackedObject* expected_object) PURE; /** * Validates that an operation is thread-safe with respect to this dispatcher; i.e. that the @@ -242,6 +246,12 @@ class Dispatcher : public DispatcherBase { */ virtual void post(PostCb callback) PURE; + /** + * Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on + * the dispatcher's thread before dispatcher destroy. This is safe cross thread. + */ + virtual void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE; + /** * Runs the event loop. This will not return until exit() is called either from within a callback * or from a different thread. @@ -269,6 +279,11 @@ class Dispatcher : public DispatcherBase { * Updates approximate monotonic time to current value. */ virtual void updateApproximateMonotonicTime() PURE; + + /** + * Shutdown the dispatcher by clear dispatcher thread deletable. + */ + virtual void shutdown() PURE; }; using DispatcherPtr = std::unique_ptr; diff --git a/include/envoy/event/dispatcher_thread_deletable.h b/include/envoy/event/dispatcher_thread_deletable.h new file mode 100644 index 000000000000..bf5b1808e0e3 --- /dev/null +++ b/include/envoy/event/dispatcher_thread_deletable.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +namespace Envoy { +namespace Event { + +/** + * If an object derives from this class, it can be passed to the destination dispatcher who + * guarantees to delete it in that dispatcher thread. The common use case is to ensure config + * related objects are deleted in the main thread. + */ +class DispatcherThreadDeletable { +public: + virtual ~DispatcherThreadDeletable() = default; +}; + +using DispatcherThreadDeletableConstPtr = std::unique_ptr; + +} // namespace Event +} // namespace Envoy diff --git a/include/envoy/server/fatal_action_config.h b/include/envoy/server/fatal_action_config.h index c8768dced40a..1e5914ac2592 100644 --- a/include/envoy/server/fatal_action_config.h +++ b/include/envoy/server/fatal_action_config.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "envoy/common/pure.h" #include "envoy/config/bootstrap/v3/bootstrap.pb.h" @@ -17,11 +18,10 @@ class FatalAction { public: virtual ~FatalAction() = default; /** - * Callback function to run when we are crashing. - * @param current_object the object we were working on when we started - * crashing. + * Callback function to run when Envoy is crashing. + * @param tracked_objects a span of objects Envoy was working on when Envoy started crashing. */ - virtual void run(const ScopeTrackedObject* current_object) PURE; + virtual void run(absl::Span tracked_objects) PURE; /** * @return whether the action is async-signal-safe. diff --git a/source/common/common/scope_tracker.h b/source/common/common/scope_tracker.h index bed58c3fa8c0..4426bbaca5cc 100644 --- a/source/common/common/scope_tracker.h +++ b/source/common/common/scope_tracker.h @@ -3,24 +3,35 @@ #include "envoy/common/scope_tracker.h" #include "envoy/event/dispatcher.h" +#include "common/common/assert.h" + namespace Envoy { -// A small class for tracking the scope of the object which is currently having +// A small class for managing the scope of a tracked object which is currently having // work done in this thread. // -// When created, it sets the tracked object in the dispatcher, and when destroyed it points the -// dispatcher at the previously tracked object. +// When created, it appends the tracked object to the dispatcher's stack of tracked objects, and +// when destroyed it pops the dispatcher's stack of tracked object, which should be the object it +// registered. class ScopeTrackerScopeState { public: ScopeTrackerScopeState(const ScopeTrackedObject* object, Event::Dispatcher& dispatcher) - : dispatcher_(dispatcher) { - latched_object_ = dispatcher_.setTrackedObject(object); + : registered_object_(object), dispatcher_(dispatcher) { + dispatcher_.pushTrackedObject(registered_object_); + } + + ~ScopeTrackerScopeState() { + // If ScopeTrackerScopeState is always used for managing tracked objects, + // then the object popped off should be the object we registered. + dispatcher_.popTrackedObject(registered_object_); } - ~ScopeTrackerScopeState() { dispatcher_.setTrackedObject(latched_object_); } + // Make this object stack-only, it doesn't make sense for it + // to be on the heap since it's tracking a stack of active operations. + void* operator new(std::size_t) = delete; private: - const ScopeTrackedObject* latched_object_; + const ScopeTrackedObject* registered_object_; Event::Dispatcher& dispatcher_; }; diff --git a/source/common/event/BUILD b/source/common/event/BUILD index c14a6ee08e99..09d640b6b817 100644 --- a/source/common/event/BUILD +++ b/source/common/event/BUILD @@ -112,6 +112,9 @@ envoy_cc_library( "file_event_impl.h", "schedulable_cb_impl.h", ], + external_deps = [ + "abseil_inlined_vector", + ], deps = [ ":libevent_lib", ":libevent_scheduler_lib", diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index 558d82b9230d..281caa0dd8c5 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -7,10 +7,12 @@ #include #include "envoy/api/api.h" +#include "envoy/common/scope_tracker.h" #include "envoy/network/listen_socket.h" #include "envoy/network/listener.h" #include "common/buffer/buffer_impl.h" +#include "common/common/assert.h" #include "common/common/lock_guard.h" #include "common/common/thread.h" #include "common/event/file_event_impl.h" @@ -44,6 +46,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, buffer_factory_(factory != nullptr ? factory : std::make_shared()), scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)), + thread_local_delete_cb_( + base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })), deferred_delete_cb_(base_scheduler_.createSchedulableCallback( [this]() -> void { clearDeferredDeleteList(); })), post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })), @@ -55,7 +59,12 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this)); } -DispatcherImpl::~DispatcherImpl() { FatalErrorHandler::removeFatalErrorHandler(*this); } +DispatcherImpl::~DispatcherImpl() { + ENVOY_LOG(debug, "destroying dispatcher {}", name_); + FatalErrorHandler::removeFatalErrorHandler(*this); + // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable + // ASSERT(deletable_in_dispatcher_thread_.empty()) +} void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog, std::chrono::milliseconds min_touch_interval) { @@ -236,9 +245,23 @@ void DispatcherImpl::post(std::function callback) { } } +void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) { + bool need_schedule; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + need_schedule = deletables_in_dispatcher_thread_.empty(); + deletables_in_dispatcher_thread_.emplace_back(std::move(deletable)); + // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072 + // ASSERT(!shutdown_called_, "inserted after shutdown"); + } + + if (need_schedule) { + thread_local_delete_cb_->scheduleCallbackCurrentIteration(); + } +} + void DispatcherImpl::run(RunType type) { run_tid_ = api_.threadFactory().currentThreadId(); - // Flush all post callbacks before we run the event loop. We do this because there are post // callbacks that have to get run before the initial event loop starts running. libevent does // not guarantee that events are run in any particular order. So even if we post() and call @@ -251,12 +274,56 @@ MonotonicTime DispatcherImpl::approximateMonotonicTime() const { return approximate_monotonic_time_; } +void DispatcherImpl::shutdown() { + // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below + // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post + // callbacks and dispatcher thread deletable objects. + ASSERT(isThreadSafe()); + auto deferred_deletables_size = current_to_delete_->size(); + std::list>::size_type post_callbacks_size; + { + Thread::LockGuard lock(post_lock_); + post_callbacks_size = post_callbacks_.size(); + } + + std::list local_deletables; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + local_deletables = std::move(deletables_in_dispatcher_thread_); + } + auto thread_local_deletables_size = local_deletables.size(); + while (!local_deletables.empty()) { + local_deletables.pop_front(); + } + ASSERT(!shutdown_called_); + shutdown_called_ = true; + ENVOY_LOG( + trace, + "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ", + __FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size); +} + void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); } void DispatcherImpl::updateApproximateMonotonicTimeInternal() { approximate_monotonic_time_ = api_.timeSource().monotonicTime(); } +void DispatcherImpl::runThreadLocalDelete() { + std::list to_be_delete; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + to_be_delete = std::move(deletables_in_dispatcher_thread_); + ASSERT(deletables_in_dispatcher_thread_.empty()); + } + while (!to_be_delete.empty()) { + // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when + // executing complicated destruction. + touchWatchdog(); + // Delete in FIFO order. + to_be_delete.pop_front(); + } +} void DispatcherImpl::runPostCallbacks() { // Clear the deferred delete list before running post callbacks to reduce non-determinism in // callback processing, and more easily detect if a scheduled post callback refers to one of the @@ -287,6 +354,16 @@ void DispatcherImpl::runPostCallbacks() { } } +void DispatcherImpl::onFatalError(std::ostream& os) const { + // Dump the state of the tracked objects in the dispatcher if thread safe. This generally + // results in dumping the active state only for the thread which caused the fatal error. + if (isThreadSafe()) { + for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) { + (*iter)->dumpState(os); + } + } +} + void DispatcherImpl::runFatalActionsOnTrackedObject( const FatalAction::FatalActionPtrList& actions) const { // Only run if this is the dispatcher of the current thread and @@ -296,7 +373,7 @@ void DispatcherImpl::runFatalActionsOnTrackedObject( } for (const auto& action : actions) { - action->run(current_object_); + action->run(tracked_object_stack_); } } @@ -306,5 +383,23 @@ void DispatcherImpl::touchWatchdog() { } } +void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) { + ASSERT(isThreadSafe()); + ASSERT(object != nullptr); + tracked_object_stack_.push_back(object); + ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth); +} + +void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) { + ASSERT(isThreadSafe()); + ASSERT(expected_object != nullptr); + RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!"); + + const ScopeTrackedObject* top = tracked_object_stack_.back(); + tracked_object_stack_.pop_back(); + ASSERT(top == expected_object, + "Popped the top of the tracked object stack, but it wasn't the expected object!"); +} + } // namespace Event } // namespace Envoy diff --git a/source/common/event/dispatcher_impl.h b/source/common/event/dispatcher_impl.h index bd3b698af11f..1d61f3f8a2fd 100644 --- a/source/common/event/dispatcher_impl.h +++ b/source/common/event/dispatcher_impl.h @@ -20,9 +20,16 @@ #include "common/event/libevent_scheduler.h" #include "common/signal/fatal_error_handler.h" +#include "absl/container/inlined_vector.h" + namespace Envoy { namespace Event { +// The tracked object stack likely won't grow larger than this initial +// reservation; this should make appends constant time since the stack +// shouldn't have to grow larger. +inline constexpr size_t ExpectedMaxTrackedObjectStackDepth = 10; + /** * libevent implementation of Event::Dispatcher. */ @@ -72,27 +79,17 @@ class DispatcherImpl : Logger::Loggable, void exit() override; SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override; void post(std::function callback) override; + void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override; void run(RunType type) override; Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; } - const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override { - const ScopeTrackedObject* return_object = current_object_; - current_object_ = object; - return return_object; - } + void pushTrackedObject(const ScopeTrackedObject* object) override; + void popTrackedObject(const ScopeTrackedObject* expected_object) override; MonotonicTime approximateMonotonicTime() const override; void updateApproximateMonotonicTime() override; + void shutdown() override; // FatalErrorInterface - void onFatalError(std::ostream& os) const override { - // Dump the state of the tracked object if it is in the current thread. This generally results - // in dumping the active state only for the thread which caused the fatal error. - if (isThreadSafe()) { - if (current_object_) { - current_object_->dumpState(os); - } - } - } - + void onFatalError(std::ostream& os) const override; void runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override; @@ -125,6 +122,8 @@ class DispatcherImpl : Logger::Loggable, TimerPtr createTimerInternal(TimerCb cb); void updateApproximateMonotonicTimeInternal(); void runPostCallbacks(); + void runThreadLocalDelete(); + // Helper used to touch the watchdog after most schedulable, fd, and timer callbacks. void touchWatchdog(); @@ -143,14 +142,26 @@ class DispatcherImpl : Logger::Loggable, Buffer::WatermarkFactorySharedPtr buffer_factory_; LibeventScheduler base_scheduler_; SchedulerPtr scheduler_; + + SchedulableCallbackPtr thread_local_delete_cb_; + Thread::MutexBasicLockable thread_local_deletable_lock_; + // `deletables_in_dispatcher_thread` must be destroyed last to allow other callbacks populate. + std::list + deletables_in_dispatcher_thread_ ABSL_GUARDED_BY(thread_local_deletable_lock_); + bool shutdown_called_{false}; + SchedulableCallbackPtr deferred_delete_cb_; + SchedulableCallbackPtr post_cb_; + Thread::MutexBasicLockable post_lock_; + std::list> post_callbacks_ ABSL_GUARDED_BY(post_lock_); + std::vector to_delete_1_; std::vector to_delete_2_; std::vector* current_to_delete_; - Thread::MutexBasicLockable post_lock_; - std::list> post_callbacks_ ABSL_GUARDED_BY(post_lock_); - const ScopeTrackedObject* current_object_{}; + + absl::InlinedVector + tracked_object_stack_; bool deferred_deleting_{}; MonotonicTime approximate_monotonic_time_; WatchdogRegistrationPtr watchdog_registration_; diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 9128b296ccb7..6d1152ef9f45 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -210,6 +210,7 @@ void AsyncStreamImpl::cleanup() { // This will destroy us, but only do so if we are actually in a list. This does not happen in // the immediate failure case. if (LinkedObject::inserted()) { + ASSERT(dispatcher_->isThreadSafe()); dispatcher_->deferredDelete( LinkedObject::removeFromList(parent_.active_streams_)); } diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 361fdfb173ac..ffe65b5ed62f 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -149,6 +149,7 @@ void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) { } void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { + ASSERT(dispatcher().isThreadSafe()); // Map send calls after local closure to no-ops. The send call could have been queued prior to // remote reset or closure, and/or closure could have occurred synchronously in response to a // previous send. In these cases the router will have already cleaned up stream state. This @@ -169,6 +170,7 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { } void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) { + ASSERT(dispatcher().isThreadSafe()); // See explanation in sendData. if (local_closed_) { return; @@ -226,6 +228,7 @@ void AsyncStreamImpl::reset() { } void AsyncStreamImpl::cleanup() { + ASSERT(dispatcher().isThreadSafe()); local_closed_ = remote_closed_ = true; // This will destroy us, but only do so if we are actually in a list. This does not happen in // the immediate failure case. diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 4ce8ec63dd38..4a7a9260bbac 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -210,6 +210,7 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } void ConnectionImpl::setTransportSocketIsReadable() { + ASSERT(dispatcher_.isThreadSafe()); // Remember that the transport requested read resumption, in case the resumption event is not // scheduled immediately or is "lost" because read was disabled. transport_wants_read_ = true; @@ -300,6 +301,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { + ASSERT(dispatcher_.isThreadSafe()); if (inDelayedClose() || !filterChainWantsData()) { return; } @@ -419,6 +421,7 @@ void ConnectionImpl::raiseEvent(ConnectionEvent event) { bool ConnectionImpl::readEnabled() const { // Calls to readEnabled on a closed socket are considered to be an error. ASSERT(state() == State::Open); + ASSERT(dispatcher_.isThreadSafe()); return read_disable_count_ == 0; } @@ -436,6 +439,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) { void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) { ASSERT(!end_stream || enable_half_close_); + ASSERT(dispatcher_.isThreadSafe()); if (write_end_stream_) { // It is an API violation to write more data after writing end_stream, but a duplicate diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 18025dfca211..4d44e88846fa 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -919,9 +919,16 @@ ClusterImplBase::ClusterImplBase( auto socket_matcher = std::make_unique( cluster.transport_socket_matches(), factory_context, socket_factory, *stats_scope); - info_ = std::make_unique(cluster, factory_context.clusterManager().bindConfig(), - runtime, std::move(socket_matcher), - std::move(stats_scope), added_via_api, factory_context); + auto& dispatcher = factory_context.dispatcher(); + info_ = std::shared_ptr( + new ClusterInfoImpl(cluster, factory_context.clusterManager().bindConfig(), runtime, + std::move(socket_matcher), std::move(stats_scope), added_via_api, + factory_context), + [&dispatcher](const ClusterInfoImpl* self) { + ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name()); + dispatcher.deleteInDispatcherThread( + std::unique_ptr(self)); + }); if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN) && !raw_factory_pointer->supportsAlpn()) { @@ -1098,7 +1105,7 @@ void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) { for (size_t priority = 0; priority < host_sets.size(); ++priority) { const auto& host_set = host_sets[priority]; // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet? - HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts())); + HostVectorConstSharedPtr hosts_copy = std::make_shared(host_set->hosts()); HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone(); prioritySet().updateHosts(priority, @@ -1289,10 +1296,10 @@ void PriorityStateManager::registerHostForPriority( auto metadata = lb_endpoint.has_metadata() ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata()) : nullptr; - const HostSharedPtr host(new HostImpl( + const auto host = std::make_shared( parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(), locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(), - locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source)); + locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source); registerHostForPriority(host, locality_lb_endpoint); } diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 3ff38c4b770d..7a2401425161 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -517,7 +517,9 @@ class PrioritySetImpl : public PrioritySet { /** * Implementation of ClusterInfo that reads from JSON. */ -class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable { +class ClusterInfoImpl : public ClusterInfo, + public Event::DispatcherThreadDeletable, + protected Logger::Loggable { public: using HttpProtocolOptionsConfigImpl = Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl; diff --git a/source/server/config_validation/server.cc b/source/server/config_validation/server.cc index e4130e383ff2..a2bfd270dbb2 100644 --- a/source/server/config_validation/server.cc +++ b/source/server/config_validation/server.cc @@ -116,6 +116,7 @@ void ValidationInstance::shutdown() { config_.clusterManager()->shutdown(); } thread_local_.shutdownThread(); + dispatcher_->shutdown(); } } // namespace Server diff --git a/source/server/server.cc b/source/server/server.cc index 26795a8b2aed..e4757d638654 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -139,6 +139,7 @@ InstanceImpl::~InstanceImpl() { ENVOY_LOG(debug, "destroying listener manager"); listener_manager_.reset(); ENVOY_LOG(debug, "destroyed listener manager"); + dispatcher_->shutdown(); } Upstream::ClusterManager& InstanceImpl::clusterManager() { return *config_.clusterManager(); } diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index 760b7ca630bc..30c519c50109 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -134,6 +134,7 @@ void WorkerImpl::threadRoutine(GuardDog& guard_dog) { dispatcher_->run(Event::Dispatcher::RunType::Block); ENVOY_LOG(debug, "worker exited dispatch loop"); guard_dog.stopWatching(watch_dog_); + dispatcher_->shutdown(); // We must close all active connections before we actually exit the thread. This prevents any // destructors from running on the main thread which might reference thread locals. Destroying diff --git a/test/common/common/BUILD b/test/common/common/BUILD index f0e0b68986ca..2a451dd88615 100644 --- a/test/common/common/BUILD +++ b/test/common/common/BUILD @@ -361,3 +361,15 @@ envoy_cc_test( srcs = ["interval_value_test.cc"], deps = ["//source/common/common:interval_value"], ) + +envoy_cc_test( + name = "scope_tracker_test", + srcs = ["scope_tracker_test.cc"], + deps = [ + "//source/common/api:api_lib", + "//source/common/common:scope_tracker", + "//source/common/event:dispatcher_lib", + "//test/mocks:common_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/common/common/scope_tracker_test.cc b/test/common/common/scope_tracker_test.cc new file mode 100644 index 000000000000..1c5451660f04 --- /dev/null +++ b/test/common/common/scope_tracker_test.cc @@ -0,0 +1,37 @@ +#include + +#include "common/api/api_impl.h" +#include "common/common/scope_tracker.h" +#include "common/event/dispatcher_impl.h" + +#include "test/mocks/common.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +using testing::_; + +TEST(ScopeTrackerScopeStateTest, ShouldManageTrackedObjectOnDispatcherStack) { + Api::ApiPtr api(Api::createApiForTest()); + Event::DispatcherPtr dispatcher(api->allocateDispatcher("test_thread")); + MockScopedTrackedObject tracked_object; + { + ScopeTrackerScopeState scope(&tracked_object, *dispatcher); + // Check that the tracked_object is on the tracked object stack + dispatcher->popTrackedObject(&tracked_object); + + // Restore it to the top, it should be removed in the dtor of scope. + dispatcher->pushTrackedObject(&tracked_object); + } + + // Check nothing is tracked now. + EXPECT_CALL(tracked_object, dumpState(_, _)).Times(0); + static_cast(dispatcher.get())->onFatalError(std::cerr); +} + +} // namespace +} // namespace Envoy diff --git a/test/common/event/dispatcher_impl_test.cc b/test/common/event/dispatcher_impl_test.cc index 8c612144db50..2bca020fb4b4 100644 --- a/test/common/event/dispatcher_impl_test.cc +++ b/test/common/event/dispatcher_impl_test.cc @@ -1,10 +1,13 @@ #include +#include "envoy/common/scope_tracker.h" #include "envoy/thread/thread.h" #include "common/api/api_impl.h" #include "common/api/os_sys_calls_impl.h" #include "common/common/lock_guard.h" +#include "common/common/scope_tracker.h" +#include "common/common/utility.h" #include "common/event/deferred_task.h" #include "common/event/dispatcher_impl.h" #include "common/event/timer_impl.h" @@ -234,6 +237,15 @@ class TestDeferredDeletable : public DeferredDeletable { std::function on_destroy_; }; +class TestDispatcherThreadDeletable : public DispatcherThreadDeletable { +public: + TestDispatcherThreadDeletable(std::function on_destroy) : on_destroy_(on_destroy) {} + ~TestDispatcherThreadDeletable() override { on_destroy_(); } + +private: + std::function on_destroy_; +}; + TEST(DeferredDeleteTest, DeferredDelete) { InSequence s; Api::ApiPtr api = Api::createApiForTest(); @@ -476,6 +488,100 @@ TEST_F(DispatcherImplTest, RunPostCallbacksLocking) { } } +TEST_F(DispatcherImplTest, DispatcherThreadDeleted) { + dispatcher_->deleteInDispatcherThread(std::make_unique( + [this, id = api_->threadFactory().currentThreadId()]() { + ASSERT(id != api_->threadFactory().currentThreadId()); + { + Thread::LockGuard lock(mu_); + ASSERT(!work_finished_); + work_finished_ = true; + } + cv_.notifyOne(); + })); + + Thread::LockGuard lock(mu_); + while (!work_finished_) { + cv_.wait(mu_); + } +} + +TEST(DispatcherThreadDeletedImplTest, DispatcherThreadDeletedAtNextCycle) { + Api::ApiPtr api_(Api::createApiForTest()); + DispatcherPtr dispatcher(api_->allocateDispatcher("test_thread")); + std::vector> watchers; + watchers.reserve(3); + for (int i = 0; i < 3; ++i) { + watchers.push_back(std::make_unique()); + } + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[0]->ready(); })); + EXPECT_CALL(*watchers[0], ready()); + dispatcher->run(Event::Dispatcher::RunType::NonBlock); + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[1]->ready(); })); + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[2]->ready(); })); + EXPECT_CALL(*watchers[1], ready()); + EXPECT_CALL(*watchers[2], ready()); + dispatcher->run(Event::Dispatcher::RunType::NonBlock); +} + +class DispatcherShutdownTest : public testing::Test { +protected: + DispatcherShutdownTest() + : api_(Api::createApiForTest()), dispatcher_(api_->allocateDispatcher("test_thread")) {} + + Api::ApiPtr api_; + DispatcherPtr dispatcher_; +}; + +TEST_F(DispatcherShutdownTest, ShutdownClearThreadLocalDeletables) { + ReadyWatcher watcher; + + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + EXPECT_CALL(watcher, ready()); + dispatcher_->shutdown(); +} + +TEST_F(DispatcherShutdownTest, ShutdownDoesnotClearDeferredListOrPostCallback) { + ReadyWatcher watcher; + ReadyWatcher deferred_watcher; + ReadyWatcher post_watcher; + + { + InSequence s; + + dispatcher_->deferredDelete(std::make_unique( + [&deferred_watcher]() { deferred_watcher.ready(); })); + dispatcher_->post([&post_watcher]() { post_watcher.ready(); }); + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + EXPECT_CALL(watcher, ready()); + dispatcher_->shutdown(); + + ::testing::Mock::VerifyAndClearExpectations(&watcher); + EXPECT_CALL(deferred_watcher, ready()); + dispatcher_.reset(); + } +} + +TEST_F(DispatcherShutdownTest, DestroyClearAllList) { + ReadyWatcher watcher; + ReadyWatcher deferred_watcher; + dispatcher_->deferredDelete( + std::make_unique([&deferred_watcher]() { deferred_watcher.ready(); })); + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + { + InSequence s; + EXPECT_CALL(deferred_watcher, ready()); + EXPECT_CALL(watcher, ready()); + dispatcher_.reset(); + } +} + TEST_F(DispatcherImplTest, Timer) { timerTest([](Timer& timer) { timer.enableTimer(std::chrono::milliseconds(0)); }); timerTest([](Timer& timer) { timer.enableTimer(std::chrono::milliseconds(50)); }); @@ -533,9 +639,71 @@ TEST_F(DispatcherImplTest, IsThreadSafe) { EXPECT_FALSE(dispatcher_->isThreadSafe()); } +TEST_F(DispatcherImplTest, ShouldDumpNothingIfNoTrackedObjects) { + std::array buffer; + OutputBufferStream ostream{buffer.data(), buffer.size()}; + + // Call on FatalError to trigger dumps of tracked objects. + dispatcher_->post([this, &ostream]() { + Thread::LockGuard lock(mu_); + static_cast(dispatcher_.get())->onFatalError(ostream); + work_finished_ = true; + cv_.notifyOne(); + }); + + Thread::LockGuard lock(mu_); + while (!work_finished_) { + cv_.wait(mu_); + } + + // Check ostream still empty. + EXPECT_EQ(ostream.contents(), ""); +} + +class MessageTrackedObject : public ScopeTrackedObject { +public: + MessageTrackedObject(absl::string_view sv) : sv_(sv) {} + void dumpState(std::ostream& os, int /*indent_level*/) const override { os << sv_; } + +private: + absl::string_view sv_; +}; + +TEST_F(DispatcherImplTest, ShouldDumpTrackedObjectsInFILO) { + std::array buffer; + OutputBufferStream ostream{buffer.data(), buffer.size()}; + + // Call on FatalError to trigger dumps of tracked objects. + dispatcher_->post([this, &ostream]() { + Thread::LockGuard lock(mu_); + + // Add several tracked objects to the dispatcher + MessageTrackedObject first{"first"}; + ScopeTrackerScopeState first_state{&first, *dispatcher_}; + MessageTrackedObject second{"second"}; + ScopeTrackerScopeState second_state{&second, *dispatcher_}; + MessageTrackedObject third{"third"}; + ScopeTrackerScopeState third_state{&third, *dispatcher_}; + + static_cast(dispatcher_.get())->onFatalError(ostream); + work_finished_ = true; + cv_.notifyOne(); + }); + + Thread::LockGuard lock(mu_); + while (!work_finished_) { + cv_.wait(mu_); + } + + // Check the dump includes and registered objects in a FILO order. + EXPECT_EQ(ostream.contents(), "thirdsecondfirst"); +} + class TestFatalAction : public Server::Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { ++times_ran_; } + void run(absl::Span /*tracked_objects*/) override { + ++times_ran_; + } bool isAsyncSignalSafe() const override { return true; } int getNumTimesRan() { return times_ran_; } diff --git a/test/common/event/scaled_range_timer_manager_impl_test.cc b/test/common/event/scaled_range_timer_manager_impl_test.cc index c6f7476c8af5..29e6a19aa529 100644 --- a/test/common/event/scaled_range_timer_manager_impl_test.cc +++ b/test/common/event/scaled_range_timer_manager_impl_test.cc @@ -1,5 +1,6 @@ #include +#include "envoy/common/scope_tracker.h" #include "envoy/event/timer.h" #include "common/event/dispatcher_impl.h" @@ -24,9 +25,14 @@ class ScopeTrackingDispatcher : public WrappedDispatcher { ScopeTrackingDispatcher(DispatcherPtr dispatcher) : WrappedDispatcher(*dispatcher), dispatcher_(std::move(dispatcher)) {} - const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override { + void pushTrackedObject(const ScopeTrackedObject* object) override { scope_ = object; - return impl_.setTrackedObject(object); + return impl_.pushTrackedObject(object); + } + + void popTrackedObject(const ScopeTrackedObject* expected_object) override { + scope_ = nullptr; + return impl_.popTrackedObject(expected_object); } const ScopeTrackedObject* scope_{nullptr}; diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index cd9944e0d7cc..04df5b2070e6 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -2656,7 +2656,8 @@ TEST_F(HttpConnectionManagerImplTest, RequestTimeoutCallbackDisarmsAndReturns408 EXPECT_CALL(response_encoder_, encodeData(_, true)).WillOnce(AddBufferToString(&response_body)); conn_manager_->newStream(response_encoder_); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); request_timer->invokeCallback(); return Http::okStatus(); })); @@ -2886,7 +2887,8 @@ TEST_F(HttpConnectionManagerImplTest, RequestHeaderTimeoutCallbackDisarmsAndRetu EXPECT_CALL(*request_header_timer, enableTimer(request_headers_timeout_, _)); conn_manager_->newStream(response_encoder_); - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); return Http::okStatus(); })); diff --git a/test/common/http/conn_manager_impl_test_2.cc b/test/common/http/conn_manager_impl_test_2.cc index 76495244f508..b7dfd0298ab9 100644 --- a/test/common/http/conn_manager_impl_test_2.cc +++ b/test/common/http/conn_manager_impl_test_2.cc @@ -2348,9 +2348,10 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { { RequestHeaderMapPtr headers{ new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "POST"}}}; - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)) - .Times(2) - .WillOnce(Invoke([](const ScopeTrackedObject* object) -> const ScopeTrackedObject* { + + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)) + .Times(1) + .WillOnce(Invoke([](const ScopeTrackedObject* object) -> void { ASSERT(object != nullptr); // On the first call, this should be the active stream. std::stringstream out; object->dumpState(out); @@ -2358,9 +2359,8 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { EXPECT_THAT(state, testing::HasSubstr("filter_manager_callbacks_.requestHeaders(): null")); EXPECT_THAT(state, testing::HasSubstr("protocol_: 1")); - return nullptr; - })) - .WillRepeatedly(Return(nullptr)); + })); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false)) .WillOnce(Invoke([](HeaderMap&, bool) -> FilterHeadersStatus { return FilterHeadersStatus::StopIteration; @@ -2371,9 +2371,9 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { // Send trailers to that stream, and verify by this point headers are in logged state. { RequestTrailerMapPtr trailers{new TestRequestTrailerMapImpl{{"foo", "bar"}}}; - EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, setTrackedObject(_)) - .Times(2) - .WillOnce(Invoke([](const ScopeTrackedObject* object) -> const ScopeTrackedObject* { + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, pushTrackedObject(_)) + .Times(1) + .WillOnce(Invoke([](const ScopeTrackedObject* object) -> void { ASSERT(object != nullptr); // On the first call, this should be the active stream. std::stringstream out; object->dumpState(out); @@ -2381,9 +2381,8 @@ TEST_F(HttpConnectionManagerImplTest, TestSessionTrace) { EXPECT_THAT(state, testing::HasSubstr("filter_manager_callbacks_.requestHeaders(): \n")); EXPECT_THAT(state, testing::HasSubstr("':authority', 'host'\n")); EXPECT_THAT(state, testing::HasSubstr("protocol_: 1")); - return nullptr; - })) - .WillRepeatedly(Return(nullptr)); + })); + EXPECT_CALL(filter_callbacks_.connection_.dispatcher_, popTrackedObject(_)); EXPECT_CALL(*decoder_filters_[0], decodeComplete()); EXPECT_CALL(*decoder_filters_[0], decodeTrailers(_)) .WillOnce(Return(FilterTrailersStatus::StopIteration)); diff --git a/test/common/http/filter_manager_test.cc b/test/common/http/filter_manager_test.cc index 85d755e864cb..0328cd4a0425 100644 --- a/test/common/http/filter_manager_test.cc +++ b/test/common/http/filter_manager_test.cc @@ -198,7 +198,8 @@ TEST_F(FilterManagerTest, MatchTreeSkipActionDecodingHeaders) { TEST_F(FilterManagerTest, MatchTreeSkipActionRequestAndResponseHeaders) { initialize(); - EXPECT_CALL(dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(dispatcher_, popTrackedObject(_)); // This stream filter will skip further callbacks once it sees both the request and response // header. As such, it should see the decoding callbacks but none of the encoding callbacks. @@ -251,4 +252,4 @@ TEST_F(FilterManagerTest, MatchTreeSkipActionRequestAndResponseHeaders) { } } // namespace } // namespace Http -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/test/common/network/connection_impl_test.cc b/test/common/network/connection_impl_test.cc index 63361228acbe..cfbccc6c4b8d 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1958,6 +1958,7 @@ class FakeReadFilter : public Network::ReadFilter { class MockTransportConnectionImplTest : public testing::Test { public: MockTransportConnectionImplTest() : stream_info_(dispatcher_.timeSource(), nullptr) { + EXPECT_CALL(dispatcher_, isThreadSafe()).WillRepeatedly(Return(true)); EXPECT_CALL(dispatcher_.buffer_factory_, create_(_, _, _)) .WillRepeatedly(Invoke([](std::function below_low, std::function above_high, std::function above_overflow) -> Buffer::Instance* { @@ -1978,7 +1979,8 @@ class MockTransportConnectionImplTest : public testing::Test { TransportSocketPtr(transport_socket_), stream_info_, true); connection_->addConnectionCallbacks(callbacks_); // File events will trigger setTrackedObject on the dispatcher. - EXPECT_CALL(dispatcher_, setTrackedObject(_)).WillRepeatedly(Return(nullptr)); + EXPECT_CALL(dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(dispatcher_, popTrackedObject(_)).Times(AnyNumber()); } ~MockTransportConnectionImplTest() override { connection_->close(ConnectionCloseType::NoFlush); } diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index a540901ec284..26525ebf3536 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -116,8 +116,9 @@ class RouterTestBase : public testing::Test { // Make the "system time" non-zero, because 0 is considered invalid by DateUtil. test_time_.setMonotonicTime(std::chrono::milliseconds(50)); - // Allow any number of setTrackedObject calls for the dispatcher strict mock. - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + // Allow any number of (append|pop)TrackedObject calls for the dispatcher strict mock. + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); } void expectResponseTimerCreate() { @@ -294,7 +295,8 @@ class RouterTestBase : public testing::Test { [&](Http::ResponseDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { response_decoder_ = &decoder; - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(testing::AtLeast(2)); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(testing::AtLeast(1)); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(testing::AtLeast(1)); callbacks.onPoolReady(original_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, upstream_stream_info_, Http::Protocol::Http10); return nullptr; @@ -2262,7 +2264,8 @@ TEST_F(RouterTest, GrpcOk) { EXPECT_EQ(1U, callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value()); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)); Http::ResponseHeaderMapPtr response_headers( new Http::TestResponseHeaderMapImpl{{":status", "200"}}); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_.host_->outlier_detector_, @@ -2270,7 +2273,8 @@ TEST_F(RouterTest, GrpcOk) { response_decoder->decodeHeaders(std::move(response_headers), false); EXPECT_TRUE(verifyHostUpstreamStats(0, 0)); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)); Http::ResponseTrailerMapPtr response_trailers( new Http::TestResponseTrailerMapImpl{{"grpc-status", "0"}}); response_decoder->decodeTrailers(std::move(response_trailers)); diff --git a/test/common/router/router_upstream_log_test.cc b/test/common/router/router_upstream_log_test.cc index 7821291c80a5..8b66f739140a 100644 --- a/test/common/router/router_upstream_log_test.cc +++ b/test/common/router/router_upstream_log_test.cc @@ -99,7 +99,8 @@ class RouterUpstreamLogTest : public testing::Test { ShadowWriterPtr(new MockShadowWriter()), router_proto); router_ = std::make_shared(*config_); router_->setDecoderFilterCallbacks(callbacks_); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(testing::AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(testing::AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(testing::AnyNumber()); upstream_locality_.set_zone("to_az"); context_.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"}); diff --git a/test/common/signal/fatal_action_test.cc b/test/common/signal/fatal_action_test.cc index 9a286c9f9fb1..1e276f016e4a 100644 --- a/test/common/signal/fatal_action_test.cc +++ b/test/common/signal/fatal_action_test.cc @@ -1,3 +1,6 @@ +#include + +#include "envoy/common/scope_tracker.h" #include "envoy/server/fatal_action_config.h" #include "common/signal/fatal_action.h" @@ -23,9 +26,10 @@ class TestFatalErrorHandler : public FatalErrorHandlerInterface { void onFatalError(std::ostream& /*os*/) const override {} void runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override { - // Call the Fatal Actions with nullptr + // Call the Fatal Actions with a non-empty vector so it runs the action. + std::vector tracked_objects{nullptr}; for (const Server::Configuration::FatalActionPtr& action : actions) { - action->run(nullptr); + action->run(tracked_objects); } } }; @@ -33,7 +37,9 @@ class TestFatalErrorHandler : public FatalErrorHandlerInterface { class TestFatalAction : public Server::Configuration::FatalAction { public: TestFatalAction(bool is_safe, int* const counter) : is_safe_(is_safe), counter_(counter) {} - void run(const ScopeTrackedObject* /*current_object*/) override { ++(*counter_); } + void run(absl::Span /*tracked_objects*/) override { + ++(*counter_); + } bool isAsyncSignalSafe() const override { return is_safe_; } private: diff --git a/test/common/signal/signals_test.cc b/test/common/signal/signals_test.cc index 3ecf49f6695b..f2e5ddde8c8d 100644 --- a/test/common/signal/signals_test.cc +++ b/test/common/signal/signals_test.cc @@ -1,6 +1,9 @@ #include #include +#include + +#include "envoy/common/scope_tracker.h" #include "common/signal/fatal_error_handler.h" #include "common/signal/signal_action.h" @@ -28,21 +31,27 @@ extern void resetFatalActionStateForTest(); // Use this test handler instead of a mock, because fatal error handlers must be // signal-safe and a mock might allocate memory. class TestFatalErrorHandler : public FatalErrorHandlerInterface { +public: void onFatalError(std::ostream& os) const override { os << "HERE!"; } void runFatalActionsOnTrackedObject(const FatalAction::FatalActionPtrList& actions) const override { // Run the actions for (const auto& action : actions) { - action->run(nullptr); + action->run(tracked_objects_); } } + +private: + std::vector tracked_objects_{nullptr}; }; // Use this to test fatal actions get called, as well as the order they run. class EchoFatalAction : public Server::Configuration::FatalAction { public: EchoFatalAction(absl::string_view echo_msg) : echo_msg_(echo_msg) {} - void run(const ScopeTrackedObject* /*current_object*/) override { std::cerr << echo_msg_; } + void run(absl::Span /*tracked_objects*/) override { + std::cerr << echo_msg_; + } bool isAsyncSignalSafe() const override { return true; } private: @@ -52,7 +61,9 @@ class EchoFatalAction : public Server::Configuration::FatalAction { // Use this to test failing while in a signal handler. class SegfaultFatalAction : public Server::Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { raise(SIGSEGV); } + void run(absl::Span /*tracked_objects*/) override { + raise(SIGSEGV); + } bool isAsyncSignalSafe() const override { return false; } }; diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index df0cfd1aba77..194d41180f59 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -869,8 +869,10 @@ TEST_F(ClusterManagerImplTest, HttpHealthChecker) { createClientConnection_( PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:11001")), _, _, _)) .WillOnce(Return(connection)); + EXPECT_CALL(factory_.dispatcher_, deleteInDispatcherThread(_)); create(parseBootstrapFromV3Yaml(yaml)); factory_.tls_.shutdownThread(); + factory_.dispatcher_.to_delete_.clear(); } TEST_F(ClusterManagerImplTest, UnknownCluster) { diff --git a/test/common/upstream/logical_dns_cluster_test.cc b/test/common/upstream/logical_dns_cluster_test.cc index db1511775067..5e0e1b107aaf 100644 --- a/test/common/upstream/logical_dns_cluster_test.cc +++ b/test/common/upstream/logical_dns_cluster_test.cc @@ -202,11 +202,11 @@ class LogicalDnsClusterTest : public Event::TestUsingSimulatedTime, public testi Network::DnsResolver::ResolveCb dns_callback_; NiceMock tls_; Event::MockTimer* resolve_timer_; - std::shared_ptr cluster_; ReadyWatcher membership_updated_; ReadyWatcher initialized_; NiceMock runtime_; NiceMock dispatcher_; + std::shared_ptr cluster_; NiceMock local_info_; NiceMock admin_; Singleton::ManagerImpl singleton_manager_{Thread::threadFactoryForTest()}; diff --git a/test/common/upstream/original_dst_cluster_test.cc b/test/common/upstream/original_dst_cluster_test.cc index 26f15b226453..9ecd87ede145 100644 --- a/test/common/upstream/original_dst_cluster_test.cc +++ b/test/common/upstream/original_dst_cluster_test.cc @@ -94,11 +94,11 @@ class OriginalDstClusterTest : public Event::TestUsingSimulatedTime, public test Stats::TestUtil::TestStore stats_store_; Ssl::MockContextManager ssl_context_manager_; + NiceMock dispatcher_; OriginalDstClusterSharedPtr cluster_; ReadyWatcher membership_updated_; ReadyWatcher initialized_; NiceMock runtime_; - NiceMock dispatcher_; Event::MockTimer* cleanup_timer_; NiceMock random_; NiceMock local_info_; diff --git a/test/extensions/filters/http/fault/fault_filter_test.cc b/test/extensions/filters/http/fault/fault_filter_test.cc index 63707e4d96e9..570ef954e94d 100644 --- a/test/extensions/filters/http/fault/fault_filter_test.cc +++ b/test/extensions/filters/http/fault/fault_filter_test.cc @@ -122,15 +122,16 @@ class FaultFilterTest : public testing::Test { const std::string v2_empty_fault_config_yaml = "{}"; - void SetUpTest(const envoy::extensions::filters::http::fault::v3::HTTPFault fault) { + void setUpTest(const envoy::extensions::filters::http::fault::v3::HTTPFault fault) { config_ = std::make_shared(fault, runtime_, "prefix.", stats_, time_system_); filter_ = std::make_unique(config_); filter_->setDecoderFilterCallbacks(decoder_filter_callbacks_); filter_->setEncoderFilterCallbacks(encoder_filter_callbacks_); - EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); } - void SetUpTest(const std::string& yaml) { SetUpTest(convertYamlStrToProtoConfig(yaml)); } + void setUpTest(const std::string& yaml) { setUpTest(convertYamlStrToProtoConfig(yaml)); } void expectDelayTimer(uint64_t duration_ms) { timer_ = new Event::MockTimer(&decoder_filter_callbacks_.dispatcher_); @@ -228,7 +229,7 @@ TEST_F(FaultFilterTest, AbortWithHttpStatus) { fault.mutable_abort()->mutable_percentage()->set_denominator( envoy::type::v3::FractionalPercent::HUNDRED); fault.mutable_abort()->set_http_status(429); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -274,7 +275,7 @@ TEST_F(FaultFilterTest, AbortWithHttpStatus) { } TEST_F(FaultFilterTest, HeaderAbortWithHttpStatus) { - SetUpTest(header_abort_only_yaml); + setUpTest(header_abort_only_yaml); request_headers_.addCopy("x-envoy-fault-abort-request", "429"); @@ -329,7 +330,7 @@ TEST_F(FaultFilterTest, AbortWithGrpcStatus) { fault.mutable_abort()->mutable_percentage()->set_denominator( envoy::type::v3::FractionalPercent::HUNDRED); fault.mutable_abort()->set_grpc_status(5); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -377,7 +378,7 @@ TEST_F(FaultFilterTest, AbortWithGrpcStatus) { TEST_F(FaultFilterTest, HeaderAbortWithGrpcStatus) { decoder_filter_callbacks_.is_grpc_request_ = true; - SetUpTest(header_abort_only_yaml); + setUpTest(header_abort_only_yaml); request_headers_.addCopy("x-envoy-fault-abort-grpc-request", "5"); @@ -427,7 +428,7 @@ TEST_F(FaultFilterTest, HeaderAbortWithGrpcStatus) { } TEST_F(FaultFilterTest, HeaderAbortWithHttpAndGrpcStatus) { - SetUpTest(header_abort_only_yaml); + setUpTest(header_abort_only_yaml); request_headers_.addCopy("x-envoy-fault-abort-request", "429"); request_headers_.addCopy("x-envoy-fault-abort-grpc-request", "5"); @@ -478,7 +479,7 @@ TEST_F(FaultFilterTest, HeaderAbortWithHttpAndGrpcStatus) { } TEST_F(FaultFilterTest, FixedDelayZeroDuration) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); // Delay related calls EXPECT_CALL( @@ -506,7 +507,7 @@ TEST_F(FaultFilterTest, FixedDelayZeroDuration) { } TEST_F(FaultFilterTest, Overflow) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); // Delay related calls EXPECT_CALL( @@ -532,7 +533,7 @@ TEST_F(FaultFilterTest, Overflow) { // Verifies that we don't increment the active_faults gauge when not applying a fault. TEST_F(FaultFilterTest, Passthrough) { envoy::extensions::filters::http::fault::v3::HTTPFault fault; - SetUpTest(fault); + setUpTest(fault); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); @@ -545,7 +546,7 @@ TEST_F(FaultFilterTest, FixedDelayDeprecatedPercentAndNonZeroDuration) { fault.mutable_delay()->mutable_percentage()->set_denominator( envoy::type::v3::FractionalPercent::HUNDRED); fault.mutable_delay()->mutable_fixed_delay()->set_seconds(5); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -588,7 +589,7 @@ TEST_F(FaultFilterTest, FixedDelayDeprecatedPercentAndNonZeroDuration) { } TEST_F(FaultFilterTest, DelayForDownstreamCluster) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -624,7 +625,8 @@ TEST_F(FaultFilterTest, DelayForDownstreamCluster) { EXPECT_CALL(decoder_filter_callbacks_, continueDecoding()); EXPECT_EQ(Http::FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(data_, false)); - EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(decoder_filter_callbacks_.dispatcher_, popTrackedObject(_)); timer_->invokeCallback(); EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers_)); @@ -636,7 +638,7 @@ TEST_F(FaultFilterTest, DelayForDownstreamCluster) { } TEST_F(FaultFilterTest, FixedDelayAndAbortDownstream) { - SetUpTest(fixed_delay_and_abort_yaml); + setUpTest(fixed_delay_and_abort_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -702,7 +704,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortDownstream) { } TEST_F(FaultFilterTest, FixedDelayAndAbort) { - SetUpTest(fixed_delay_and_abort_yaml); + setUpTest(fixed_delay_and_abort_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -758,7 +760,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbort) { } TEST_F(FaultFilterTest, FixedDelayAndAbortDownstreamNodes) { - SetUpTest(fixed_delay_and_abort_nodes_yaml); + setUpTest(fixed_delay_and_abort_nodes_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -812,13 +814,13 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortDownstreamNodes) { } TEST_F(FaultFilterTest, NoDownstreamMatch) { - SetUpTest(fixed_delay_and_abort_nodes_yaml); + setUpTest(fixed_delay_and_abort_nodes_yaml); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers_, true)); } TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchSuccess) { - SetUpTest(fixed_delay_and_abort_match_headers_yaml); + setUpTest(fixed_delay_and_abort_match_headers_yaml); request_headers_.addCopy("x-foo1", "Bar"); request_headers_.addCopy("x-foo2", "RandomValue"); @@ -875,7 +877,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchSuccess) { } TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchFail) { - SetUpTest(fixed_delay_and_abort_match_headers_yaml); + setUpTest(fixed_delay_and_abort_match_headers_yaml); request_headers_.addCopy("x-foo1", "Bar"); request_headers_.addCopy("x-foo3", "Baz"); @@ -903,7 +905,7 @@ TEST_F(FaultFilterTest, FixedDelayAndAbortHeaderMatchFail) { } TEST_F(FaultFilterTest, TimerResetAfterStreamReset) { - SetUpTest(fixed_delay_only_yaml); + setUpTest(fixed_delay_only_yaml); EXPECT_CALL(runtime_.snapshot_, getInteger("fault.http.max_active_faults", std::numeric_limits::max())) @@ -954,7 +956,7 @@ TEST_F(FaultFilterTest, TimerResetAfterStreamReset) { } TEST_F(FaultFilterTest, FaultWithTargetClusterMatchSuccess) { - SetUpTest(delay_with_upstream_cluster_yaml); + setUpTest(delay_with_upstream_cluster_yaml); const std::string upstream_cluster("www1"); EXPECT_CALL(decoder_filter_callbacks_.route_->route_entry_, clusterName()) @@ -999,7 +1001,7 @@ TEST_F(FaultFilterTest, FaultWithTargetClusterMatchSuccess) { } TEST_F(FaultFilterTest, FaultWithTargetClusterMatchFail) { - SetUpTest(delay_with_upstream_cluster_yaml); + setUpTest(delay_with_upstream_cluster_yaml); const std::string upstream_cluster("mismatch"); EXPECT_CALL(decoder_filter_callbacks_.route_->route_entry_, clusterName()) @@ -1027,7 +1029,7 @@ TEST_F(FaultFilterTest, FaultWithTargetClusterMatchFail) { } TEST_F(FaultFilterTest, FaultWithTargetClusterNullRoute) { - SetUpTest(delay_with_upstream_cluster_yaml); + setUpTest(delay_with_upstream_cluster_yaml); const std::string upstream_cluster("www1"); EXPECT_CALL(*decoder_filter_callbacks_.route_, routeEntry()).WillRepeatedly(Return(nullptr)); @@ -1108,7 +1110,7 @@ TEST_F(FaultFilterTest, RouteFaultOverridesListenerFault) { // route-level fault overrides listener-level fault { - SetUpTest(v2_empty_fault_config_yaml); // This is a valid listener level fault + setUpTest(v2_empty_fault_config_yaml); // This is a valid listener level fault TestPerFilterConfigFault(&delay_fault, nullptr); } @@ -1116,7 +1118,7 @@ TEST_F(FaultFilterTest, RouteFaultOverridesListenerFault) { { config_->stats().aborts_injected_.reset(); config_->stats().delays_injected_.reset(); - SetUpTest(v2_empty_fault_config_yaml); + setUpTest(v2_empty_fault_config_yaml); TestPerFilterConfigFault(nullptr, &delay_fault); } @@ -1124,7 +1126,7 @@ TEST_F(FaultFilterTest, RouteFaultOverridesListenerFault) { { config_->stats().aborts_injected_.reset(); config_->stats().delays_injected_.reset(); - SetUpTest(v2_empty_fault_config_yaml); + setUpTest(v2_empty_fault_config_yaml); TestPerFilterConfigFault(&delay_fault, &abort_fault); } } @@ -1135,7 +1137,7 @@ class FaultFilterRateLimitTest : public FaultFilterTest { envoy::extensions::filters::http::fault::v3::HTTPFault fault; fault.mutable_response_rate_limit()->mutable_fixed_limit()->set_limit_kbps(1); fault.mutable_response_rate_limit()->mutable_percentage()->set_numerator(100); - SetUpTest(fault); + setUpTest(fault); EXPECT_CALL( runtime_.snapshot_, diff --git a/test/extensions/filters/http/squash/squash_filter_test.cc b/test/extensions/filters/http/squash/squash_filter_test.cc index 5cb81564c4be..260ba53ca2a1 100644 --- a/test/extensions/filters/http/squash/squash_filter_test.cc +++ b/test/extensions/filters/http/squash/squash_filter_test.cc @@ -2,6 +2,7 @@ #include #include +#include "envoy/common/scope_tracker.h" #include "envoy/extensions/filters/http/squash/v3/squash.pb.h" #include "common/http/message_impl.h" @@ -328,7 +329,8 @@ TEST_F(SquashFilterTest, Timeout) { EXPECT_CALL(request_, cancel()); EXPECT_CALL(filter_callbacks_, continueDecoding()); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); attachmentTimeout_timer_->invokeCallback(); EXPECT_EQ(Envoy::Http::FilterDataStatus::Continue, filter_->decodeData(buffer, false)); @@ -357,7 +359,9 @@ TEST_F(SquashFilterTest, CheckRetryPollingAttachment) { // Expect the second get attachment request expectAsyncClientSend(); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); + retry_timer->invokeCallback(); EXPECT_CALL(filter_callbacks_, continueDecoding()); completeGetStatusRequest("attached"); @@ -377,7 +381,8 @@ TEST_F(SquashFilterTest, PollingAttachmentNoCluster) { // Expect the second get attachment request ON_CALL(factory_context_.cluster_manager_, getThreadLocalCluster("squash")) .WillByDefault(Return(nullptr)); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); EXPECT_CALL(*retry_timer, enableTimer(config_->attachmentPollPeriod(), _)); retry_timer->invokeCallback(); } @@ -395,7 +400,8 @@ TEST_F(SquashFilterTest, CheckRetryPollingAttachmentOnFailure) { // Expect the second get attachment request expectAsyncClientSend(); - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); retry_timer->invokeCallback(); EXPECT_CALL(filter_callbacks_, continueDecoding()); @@ -466,7 +472,8 @@ TEST_F(SquashFilterTest, TimerExpiresInline) { attachmentTimeout_timer_->scope_ = scope; attachmentTimeout_timer_->enabled_ = true; // timer expires inline - EXPECT_CALL(filter_callbacks_.dispatcher_, setTrackedObject(_)).Times(2); + EXPECT_CALL(filter_callbacks_.dispatcher_, pushTrackedObject(_)); + EXPECT_CALL(filter_callbacks_.dispatcher_, popTrackedObject(_)); attachmentTimeout_timer_->invokeCallback(); })); diff --git a/test/integration/sds_dynamic_integration_test.cc b/test/integration/sds_dynamic_integration_test.cc index 6023882ab9d6..c1dcb3501a8b 100644 --- a/test/integration/sds_dynamic_integration_test.cc +++ b/test/integration/sds_dynamic_integration_test.cc @@ -673,5 +673,134 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) { EXPECT_EQ(1, test_server_->counter("sds.client_cert.update_rejected")->value()); } +// Test CDS with SDS. A cluster provided by CDS raises new SDS request for upstream cert. +class SdsCdsIntegrationTest : public SdsDynamicIntegrationBaseTest { +public: + void initialize() override { + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Create the dynamic cluster. This cluster will be using sds. + dynamic_cluster_ = bootstrap.mutable_static_resources()->clusters(0); + dynamic_cluster_.set_name("dynamic"); + dynamic_cluster_.mutable_connect_timeout()->MergeFrom( + ProtobufUtil::TimeUtil::MillisecondsToDuration(500000)); + auto* transport_socket = dynamic_cluster_.mutable_transport_socket(); + envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext tls_context; + auto* secret_config = + tls_context.mutable_common_tls_context()->add_tls_certificate_sds_secret_configs(); + setUpSdsConfig(secret_config, "client_cert"); + + transport_socket->set_name("envoy.transport_sockets.tls"); + transport_socket->mutable_typed_config()->PackFrom(tls_context); + + // Add cds cluster first. + auto* cds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + cds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + cds_cluster->set_name("cds_cluster"); + ConfigHelper::setHttp2(*cds_cluster); + // Then add sds cluster. + auto* sds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + sds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + sds_cluster->set_name("sds_cluster"); + ConfigHelper::setHttp2(*sds_cluster); + + const std::string cds_yaml = R"EOF( + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + envoy_grpc: + cluster_name: cds_cluster + set_node_on_first_message_only: true +)EOF"; + auto* cds = bootstrap.mutable_dynamic_resources()->mutable_cds_config(); + TestUtility::loadFromYaml(cds_yaml, *cds); + }); + + HttpIntegrationTest::initialize(); + } + + void TearDown() override { + { + AssertionResult result = sds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = sds_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + sds_connection_.reset(); + } + cleanUpXdsConnection(); + cleanupUpstreamAndDownstream(); + codec_client_.reset(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + void createUpstreams() override { + // Static cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP1); + // Cds Cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + // Sds Cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + } + + void sendCdsResponse() { + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {dynamic_cluster_}, {dynamic_cluster_}, {}, "55"); + } + + void sendSdsResponse2(const envoy::extensions::transport_sockets::tls::v3::Secret& secret, + FakeStream& sds_stream) { + API_NO_BOOST(envoy::api::v2::DiscoveryResponse) discovery_response; + discovery_response.set_version_info("1"); + discovery_response.set_type_url(Config::TypeUrl::get().Secret); + discovery_response.add_resources()->PackFrom(secret); + sds_stream.sendGrpcMessage(discovery_response); + } + envoy::config::cluster::v3::Cluster dynamic_cluster_; + FakeHttpConnectionPtr sds_connection_; + FakeStreamPtr sds_stream_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, SdsCdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(SdsCdsIntegrationTest, BasicSuccess) { + on_server_init_function_ = [this]() { + { + // CDS. + AssertionResult result = + fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + sendCdsResponse(); + } + { + // SDS. + AssertionResult result = + fake_upstreams_[2]->waitForHttpConnection(*dispatcher_, sds_connection_); + RELEASE_ASSERT(result, result.message()); + + result = sds_connection_->waitForNewStream(*dispatcher_, sds_stream_); + RELEASE_ASSERT(result, result.message()); + sds_stream_->startGrpcStream(); + sendSdsResponse2(getClientSecret(), *sds_stream_); + } + }; + initialize(); + + test_server_->waitForCounterGe( + "cluster.dynamic.client_ssl_socket_factory.ssl_context_update_by_sds", 1); + // The 4 clusters are CDS,SDS,static and dynamic cluster. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4); + + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, {}, {}, + {}, "42"); + // Successfully removed the dynamic cluster. + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); +} + } // namespace Ssl } // namespace Envoy diff --git a/test/mocks/event/mocks.cc b/test/mocks/event/mocks.cc index a8db4995abb3..06d39c6e020a 100644 --- a/test/mocks/event/mocks.cc +++ b/test/mocks/event/mocks.cc @@ -31,6 +31,7 @@ MockDispatcher::MockDispatcher(const std::string& name) : name_(name) { std::function above_overflow) -> Buffer::Instance* { return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); })); + ON_CALL(*this, isThreadSafe()).WillByDefault(Return(true)); } MockDispatcher::~MockDispatcher() = default; diff --git a/test/mocks/event/mocks.h b/test/mocks/event/mocks.h index 8e29e84c3b32..87627c6d5ce9 100644 --- a/test/mocks/event/mocks.h +++ b/test/mocks/event/mocks.h @@ -5,6 +5,7 @@ #include #include +#include "envoy/common/scope_tracker.h" #include "envoy/common/time.h" #include "envoy/event/deferred_deletable.h" #include "envoy/event/dispatcher.h" @@ -129,13 +130,16 @@ class MockDispatcher : public Dispatcher { MOCK_METHOD(void, exit, ()); MOCK_METHOD(SignalEvent*, listenForSignal_, (signal_t signal_num, SignalCb cb)); MOCK_METHOD(void, post, (std::function callback)); + MOCK_METHOD(void, deleteInDispatcherThread, (DispatcherThreadDeletableConstPtr deletable)); MOCK_METHOD(void, run, (RunType type)); - MOCK_METHOD(const ScopeTrackedObject*, setTrackedObject, (const ScopeTrackedObject* object)); + MOCK_METHOD(void, pushTrackedObject, (const ScopeTrackedObject* object)); + MOCK_METHOD(void, popTrackedObject, (const ScopeTrackedObject* expected_object)); MOCK_METHOD(bool, isThreadSafe, (), (const)); Buffer::WatermarkFactory& getWatermarkFactory() override { return buffer_factory_; } MOCK_METHOD(Thread::ThreadId, getCurrentThreadId, ()); MOCK_METHOD(MonotonicTime, approximateMonotonicTime, (), (const)); MOCK_METHOD(void, updateApproximateMonotonicTime, ()); + MOCK_METHOD(void, shutdown, ()); GlobalTimeSystem time_system_; std::list to_delete_; diff --git a/test/mocks/event/wrapped_dispatcher.h b/test/mocks/event/wrapped_dispatcher.h index 974d61b39be2..5036a5514351 100644 --- a/test/mocks/event/wrapped_dispatcher.h +++ b/test/mocks/event/wrapped_dispatcher.h @@ -94,11 +94,19 @@ class WrappedDispatcher : public Dispatcher { void post(std::function callback) override { impl_.post(std::move(callback)); } + void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override { + impl_.deleteInDispatcherThread(std::move(deletable)); + } + void run(RunType type) override { impl_.run(type); } Buffer::WatermarkFactory& getWatermarkFactory() override { return impl_.getWatermarkFactory(); } - const ScopeTrackedObject* setTrackedObject(const ScopeTrackedObject* object) override { - return impl_.setTrackedObject(object); + void pushTrackedObject(const ScopeTrackedObject* object) override { + return impl_.pushTrackedObject(object); + } + + void popTrackedObject(const ScopeTrackedObject* expected_object) override { + return impl_.popTrackedObject(expected_object); } MonotonicTime approximateMonotonicTime() const override { @@ -109,6 +117,8 @@ class WrappedDispatcher : public Dispatcher { bool isThreadSafe() const override { return impl_.isThreadSafe(); } + void shutdown() override { impl_.shutdown(); } + protected: Dispatcher& impl_; }; diff --git a/test/mocks/router/router_filter_interface.cc b/test/mocks/router/router_filter_interface.cc index f1fb2ab0518c..9d175ee6cb7a 100644 --- a/test/mocks/router/router_filter_interface.cc +++ b/test/mocks/router/router_filter_interface.cc @@ -18,7 +18,8 @@ MockRouterFilterInterface::MockRouterFilterInterface() ON_CALL(*this, config()).WillByDefault(ReturnRef(config_)); ON_CALL(*this, cluster()).WillByDefault(Return(cluster_info_)); ON_CALL(*this, upstreamRequests()).WillByDefault(ReturnRef(requests_)); - EXPECT_CALL(callbacks_.dispatcher_, setTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber()); + EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber()); ON_CALL(*this, routeEntry()).WillByDefault(Return(&route_entry_)); ON_CALL(callbacks_, connection()).WillByDefault(Return(&client_connection_)); route_entry_.connect_config_.emplace(RouteEntry::ConnectConfig()); diff --git a/test/server/server_test.cc b/test/server/server_test.cc index 10cc3a019291..087f6d0b7848 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -1363,7 +1363,7 @@ TEST_P(ServerInstanceImplTest, WithUnknownBootstrapExtensions) { #ifndef WIN32 class SafeFatalAction : public Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { + void run(absl::Span /*tracked_objects*/) override { std::cerr << "Called SafeFatalAction" << std::endl; } @@ -1372,7 +1372,7 @@ class SafeFatalAction : public Configuration::FatalAction { class UnsafeFatalAction : public Configuration::FatalAction { public: - void run(const ScopeTrackedObject* /*current_object*/) override { + void run(absl::Span /*tracked_objects*/) override { std::cerr << "Called UnsafeFatalAction" << std::endl; }