diff --git a/include/envoy/event/BUILD b/include/envoy/event/BUILD index 91d13f4d113c..c79ed56eb85e 100644 --- a/include/envoy/event/BUILD +++ b/include/envoy/event/BUILD @@ -13,11 +13,17 @@ envoy_cc_library( hdrs = ["deferred_deletable.h"], ) +envoy_cc_library( + name = "dispatcher_thread_deletable", + hdrs = ["dispatcher_thread_deletable.h"], +) + envoy_cc_library( name = "dispatcher_interface", hdrs = ["dispatcher.h"], deps = [ ":deferred_deletable", + ":dispatcher_thread_deletable", ":file_event_interface", ":scaled_timer", ":schedulable_cb_interface", diff --git a/include/envoy/event/dispatcher.h b/include/envoy/event/dispatcher.h index 36e34f78bd1a..276de18fdd3c 100644 --- a/include/envoy/event/dispatcher.h +++ b/include/envoy/event/dispatcher.h @@ -8,6 +8,7 @@ #include "envoy/common/scope_tracker.h" #include "envoy/common/time.h" +#include "envoy/event/dispatcher_thread_deletable.h" #include "envoy/event/file_event.h" #include "envoy/event/scaled_timer.h" #include "envoy/event/schedulable_cb.h" @@ -260,6 +261,12 @@ class Dispatcher : public DispatcherBase { */ virtual void post(PostCb callback) PURE; + /** + * Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on + * the dispatcher's thread before the dispatcher is destroyed. Safe to call from any thread. + */ + virtual void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE; + /** * Runs the event loop. This will not return until exit() is called either from within a callback * or from a different thread. @@ -287,6 +294,11 @@ class Dispatcher : public DispatcherBase { * Updates approximate monotonic time to current value. */ virtual void updateApproximateMonotonicTime() PURE; + + /** + * Shut down the dispatcher by clearing the dispatcher thread deletables. 
+ */ + virtual void shutdown() PURE; }; using DispatcherPtr = std::unique_ptr; diff --git a/include/envoy/event/dispatcher_thread_deletable.h b/include/envoy/event/dispatcher_thread_deletable.h new file mode 100644 index 000000000000..bf5b1808e0e3 --- /dev/null +++ b/include/envoy/event/dispatcher_thread_deletable.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +namespace Envoy { +namespace Event { + +/** + * If an object derives from this class, it can be passed to the destination dispatcher who + * guarantees to delete it in that dispatcher thread. The common use case is to ensure config + * related objects are deleted in the main thread. + */ +class DispatcherThreadDeletable { +public: + virtual ~DispatcherThreadDeletable() = default; +}; + +using DispatcherThreadDeletableConstPtr = std::unique_ptr; + +} // namespace Event +} // namespace Envoy diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index f67d7787abc9..ef68c020b190 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -63,6 +63,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, ? 
watermark_factory : std::make_shared()), scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)), + thread_local_delete_cb_( + base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })), deferred_delete_cb_(base_scheduler_.createSchedulableCallback( [this]() -> void { clearDeferredDeleteList(); })), post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })), @@ -74,7 +76,12 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api, std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this)); } -DispatcherImpl::~DispatcherImpl() { FatalErrorHandler::removeFatalErrorHandler(*this); } +DispatcherImpl::~DispatcherImpl() { + ENVOY_LOG(debug, "destroying dispatcher {}", name_); + FatalErrorHandler::removeFatalErrorHandler(*this); + // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable + // ASSERT(deletable_in_dispatcher_thread_.empty()) +} void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog, std::chrono::milliseconds min_touch_interval) { @@ -265,9 +272,23 @@ void DispatcherImpl::post(std::function callback) { } } +void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) { + bool need_schedule; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + need_schedule = deletables_in_dispatcher_thread_.empty(); + deletables_in_dispatcher_thread_.emplace_back(std::move(deletable)); + // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072 + // ASSERT(!shutdown_called_, "inserted after shutdown"); + } + + if (need_schedule) { + thread_local_delete_cb_->scheduleCallbackCurrentIteration(); + } +} + void DispatcherImpl::run(RunType type) { run_tid_ = api_.threadFactory().currentThreadId(); - // Flush all post callbacks before we run the event loop. 
We do this because there are post // callbacks that have to get run before the initial event loop starts running. libevent does // not guarantee that events are run in any particular order. So even if we post() and call @@ -280,12 +301,56 @@ MonotonicTime DispatcherImpl::approximateMonotonicTime() const { return approximate_monotonic_time_; } +void DispatcherImpl::shutdown() { + // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete the + // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post + // callbacks and dispatcher thread deletable objects. + ASSERT(isThreadSafe()); + auto deferred_deletables_size = current_to_delete_->size(); + std::list>::size_type post_callbacks_size; + { + Thread::LockGuard lock(post_lock_); + post_callbacks_size = post_callbacks_.size(); + } + + std::list local_deletables; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + local_deletables = std::move(deletables_in_dispatcher_thread_); + } + auto thread_local_deletables_size = local_deletables.size(); + while (!local_deletables.empty()) { + local_deletables.pop_front(); + } + ASSERT(!shutdown_called_); + shutdown_called_ = true; + ENVOY_LOG( + trace, + "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. 
", + __FUNCTION__, thread_local_deletables_size, deferred_deletables_size, post_callbacks_size); +} + void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); } void DispatcherImpl::updateApproximateMonotonicTimeInternal() { approximate_monotonic_time_ = api_.timeSource().monotonicTime(); } +void DispatcherImpl::runThreadLocalDelete() { + std::list to_be_delete; + { + Thread::LockGuard lock(thread_local_deletable_lock_); + to_be_delete = std::move(deletables_in_dispatcher_thread_); + ASSERT(deletables_in_dispatcher_thread_.empty()); + } + while (!to_be_delete.empty()) { + // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when + // executing complicated destruction. + touchWatchdog(); + // Delete in FIFO order. + to_be_delete.pop_front(); + } +} void DispatcherImpl::runPostCallbacks() { // Clear the deferred delete list before running post callbacks to reduce non-determinism in // callback processing, and more easily detect if a scheduled post callback refers to one of the diff --git a/source/common/event/dispatcher_impl.h b/source/common/event/dispatcher_impl.h index f94107ec79bb..a4eec3790b25 100644 --- a/source/common/event/dispatcher_impl.h +++ b/source/common/event/dispatcher_impl.h @@ -86,12 +86,14 @@ class DispatcherImpl : Logger::Loggable, void exit() override; SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override; void post(std::function callback) override; + void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override; void run(RunType type) override; Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; } void pushTrackedObject(const ScopeTrackedObject* object) override; void popTrackedObject(const ScopeTrackedObject* expected_object) override; MonotonicTime approximateMonotonicTime() const override; void updateApproximateMonotonicTime() override; + void shutdown() override; // FatalErrorInterface void 
onFatalError(std::ostream& os) const override; @@ -127,6 +129,8 @@ class DispatcherImpl : Logger::Loggable, TimerPtr createTimerInternal(TimerCb cb); void updateApproximateMonotonicTimeInternal(); void runPostCallbacks(); + void runThreadLocalDelete(); + // Helper used to touch the watchdog after most schedulable, fd, and timer callbacks. void touchWatchdog(); @@ -145,13 +149,24 @@ class DispatcherImpl : Logger::Loggable, Buffer::WatermarkFactorySharedPtr buffer_factory_; LibeventScheduler base_scheduler_; SchedulerPtr scheduler_; + + SchedulableCallbackPtr thread_local_delete_cb_; + Thread::MutexBasicLockable thread_local_deletable_lock_; + // `deletables_in_dispatcher_thread_` must be destroyed last so other callbacks can populate it. + std::list + deletables_in_dispatcher_thread_ ABSL_GUARDED_BY(thread_local_deletable_lock_); + bool shutdown_called_{false}; + SchedulableCallbackPtr deferred_delete_cb_; + SchedulableCallbackPtr post_cb_; + Thread::MutexBasicLockable post_lock_; + std::list> post_callbacks_ ABSL_GUARDED_BY(post_lock_); + std::vector to_delete_1_; std::vector to_delete_2_; std::vector* current_to_delete_; - Thread::MutexBasicLockable post_lock_; - std::list> post_callbacks_ ABSL_GUARDED_BY(post_lock_); + absl::InlinedVector tracked_object_stack_; bool deferred_deleting_{}; diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 9128b296ccb7..6d1152ef9f45 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -210,6 +210,7 @@ void AsyncStreamImpl::cleanup() { // This will destroy us, but only do so if we are actually in a list. This does not happen in // the immediate failure case. 
if (LinkedObject::inserted()) { + ASSERT(dispatcher_->isThreadSafe()); dispatcher_->deferredDelete( LinkedObject::removeFromList(parent_.active_streams_)); } diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 361fdfb173ac..ffe65b5ed62f 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -149,6 +149,7 @@ void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) { } void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { + ASSERT(dispatcher().isThreadSafe()); // Map send calls after local closure to no-ops. The send call could have been queued prior to // remote reset or closure, and/or closure could have occurred synchronously in response to a // previous send. In these cases the router will have already cleaned up stream state. This @@ -169,6 +170,7 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) { } void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) { + ASSERT(dispatcher().isThreadSafe()); // See explanation in sendData. if (local_closed_) { return; @@ -226,6 +228,7 @@ void AsyncStreamImpl::reset() { } void AsyncStreamImpl::cleanup() { + ASSERT(dispatcher().isThreadSafe()); local_closed_ = remote_closed_ = true; // This will destroy us, but only do so if we are actually in a list. This does not happen in // the immediate failure case. 
diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index eb70612bebe5..8f783dee2d08 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -211,6 +211,7 @@ Connection::State ConnectionImpl::state() const { void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); } void ConnectionImpl::setTransportSocketIsReadable() { + ASSERT(dispatcher_.isThreadSafe()); // Remember that the transport requested read resumption, in case the resumption event is not // scheduled immediately or is "lost" because read was disabled. transport_wants_read_ = true; @@ -301,6 +302,7 @@ void ConnectionImpl::noDelay(bool enable) { } void ConnectionImpl::onRead(uint64_t read_buffer_size) { + ASSERT(dispatcher_.isThreadSafe()); if (inDelayedClose() || !filterChainWantsData()) { return; } @@ -420,6 +422,7 @@ void ConnectionImpl::raiseEvent(ConnectionEvent event) { bool ConnectionImpl::readEnabled() const { // Calls to readEnabled on a closed socket are considered to be an error. 
ASSERT(state() == State::Open); + ASSERT(dispatcher_.isThreadSafe()); return read_disable_count_ == 0; } @@ -437,6 +440,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) { void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) { ASSERT(!end_stream || enable_half_close_); + ASSERT(dispatcher_.isThreadSafe()); if (write_end_stream_) { // It is an API violation to write more data after writing end_stream, but a duplicate diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 411f49ae7f2b..f33849a8bfb3 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -926,9 +926,16 @@ ClusterImplBase::ClusterImplBase( auto socket_matcher = std::make_unique( cluster.transport_socket_matches(), factory_context, socket_factory, *stats_scope); - info_ = std::make_unique(cluster, factory_context.clusterManager().bindConfig(), - runtime, std::move(socket_matcher), - std::move(stats_scope), added_via_api, factory_context); + auto& dispatcher = factory_context.dispatcher(); + info_ = std::shared_ptr( + new ClusterInfoImpl(cluster, factory_context.clusterManager().bindConfig(), runtime, + std::move(socket_matcher), std::move(stats_scope), added_via_api, + factory_context), + [&dispatcher](const ClusterInfoImpl* self) { + ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name()); + dispatcher.deleteInDispatcherThread( + std::unique_ptr(self)); + }); if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN) && !raw_factory_pointer->supportsAlpn()) { @@ -1120,7 +1127,7 @@ void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) { for (size_t priority = 0; priority < host_sets.size(); ++priority) { const auto& host_set = host_sets[priority]; // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet? 
- HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts())); + HostVectorConstSharedPtr hosts_copy = std::make_shared(host_set->hosts()); HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone(); prioritySet().updateHosts(priority, @@ -1311,10 +1318,10 @@ void PriorityStateManager::registerHostForPriority( auto metadata = lb_endpoint.has_metadata() ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata()) : nullptr; - const HostSharedPtr host(new HostImpl( + const auto host = std::make_shared( parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(), locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(), - locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source)); + locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source); registerHostForPriority(host, locality_lb_endpoint); } diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 6addfd626b99..2bc595cd5130 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -509,7 +509,9 @@ class PrioritySetImpl : public PrioritySet { /** * Implementation of ClusterInfo that reads from JSON. 
*/ -class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable { +class ClusterInfoImpl : public ClusterInfo, + public Event::DispatcherThreadDeletable, + protected Logger::Loggable { public: using HttpProtocolOptionsConfigImpl = Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl; diff --git a/source/server/config_validation/server.cc b/source/server/config_validation/server.cc index 30f125685e02..d6369e83495f 100644 --- a/source/server/config_validation/server.cc +++ b/source/server/config_validation/server.cc @@ -116,6 +116,7 @@ void ValidationInstance::shutdown() { config_.clusterManager()->shutdown(); } thread_local_.shutdownThread(); + dispatcher_->shutdown(); } } // namespace Server diff --git a/source/server/server.cc b/source/server/server.cc index 61e4c366f83f..674c84393fe1 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -139,6 +139,7 @@ InstanceImpl::~InstanceImpl() { ENVOY_LOG(debug, "destroying listener manager"); listener_manager_.reset(); ENVOY_LOG(debug, "destroyed listener manager"); + dispatcher_->shutdown(); } Upstream::ClusterManager& InstanceImpl::clusterManager() { diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index 95c24175321c..aaece18609ce 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -135,6 +135,7 @@ void WorkerImpl::threadRoutine(GuardDog& guard_dog) { dispatcher_->run(Event::Dispatcher::RunType::Block); ENVOY_LOG(debug, "worker exited dispatch loop"); guard_dog.stopWatching(watch_dog_); + dispatcher_->shutdown(); // We must close all active connections before we actually exit the thread. This prevents any // destructors from running on the main thread which might reference thread locals. 
Destroying diff --git a/test/common/event/dispatcher_impl_test.cc b/test/common/event/dispatcher_impl_test.cc index 198a0ef4fd8b..345305a35b63 100644 --- a/test/common/event/dispatcher_impl_test.cc +++ b/test/common/event/dispatcher_impl_test.cc @@ -240,6 +240,15 @@ class TestDeferredDeletable : public DeferredDeletable { std::function on_destroy_; }; +class TestDispatcherThreadDeletable : public DispatcherThreadDeletable { +public: + TestDispatcherThreadDeletable(std::function on_destroy) : on_destroy_(on_destroy) {} + ~TestDispatcherThreadDeletable() override { on_destroy_(); } + +private: + std::function on_destroy_; +}; + TEST(DeferredDeleteTest, DeferredDelete) { InSequence s; Api::ApiPtr api = Api::createApiForTest(); @@ -482,6 +491,100 @@ TEST_F(DispatcherImplTest, RunPostCallbacksLocking) { } } +TEST_F(DispatcherImplTest, DispatcherThreadDeleted) { + dispatcher_->deleteInDispatcherThread(std::make_unique( + [this, id = api_->threadFactory().currentThreadId()]() { + ASSERT(id != api_->threadFactory().currentThreadId()); + { + Thread::LockGuard lock(mu_); + ASSERT(!work_finished_); + work_finished_ = true; + } + cv_.notifyOne(); + })); + + Thread::LockGuard lock(mu_); + while (!work_finished_) { + cv_.wait(mu_); + } +} + +TEST(DispatcherThreadDeletedImplTest, DispatcherThreadDeletedAtNextCycle) { + Api::ApiPtr api_(Api::createApiForTest()); + DispatcherPtr dispatcher(api_->allocateDispatcher("test_thread")); + std::vector> watchers; + watchers.reserve(3); + for (int i = 0; i < 3; ++i) { + watchers.push_back(std::make_unique()); + } + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[0]->ready(); })); + EXPECT_CALL(*watchers[0], ready()); + dispatcher->run(Event::Dispatcher::RunType::NonBlock); + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[1]->ready(); })); + dispatcher->deleteInDispatcherThread( + std::make_unique([&watchers]() { watchers[2]->ready(); })); + EXPECT_CALL(*watchers[1], 
ready()); + EXPECT_CALL(*watchers[2], ready()); + dispatcher->run(Event::Dispatcher::RunType::NonBlock); +} + +class DispatcherShutdownTest : public testing::Test { +protected: + DispatcherShutdownTest() + : api_(Api::createApiForTest()), dispatcher_(api_->allocateDispatcher("test_thread")) {} + + Api::ApiPtr api_; + DispatcherPtr dispatcher_; +}; + +TEST_F(DispatcherShutdownTest, ShutdownClearThreadLocalDeletables) { + ReadyWatcher watcher; + + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + EXPECT_CALL(watcher, ready()); + dispatcher_->shutdown(); +} + +TEST_F(DispatcherShutdownTest, ShutdownDoesnotClearDeferredListOrPostCallback) { + ReadyWatcher watcher; + ReadyWatcher deferred_watcher; + ReadyWatcher post_watcher; + + { + InSequence s; + + dispatcher_->deferredDelete(std::make_unique( + [&deferred_watcher]() { deferred_watcher.ready(); })); + dispatcher_->post([&post_watcher]() { post_watcher.ready(); }); + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + EXPECT_CALL(watcher, ready()); + dispatcher_->shutdown(); + + ::testing::Mock::VerifyAndClearExpectations(&watcher); + EXPECT_CALL(deferred_watcher, ready()); + dispatcher_.reset(); + } +} + +TEST_F(DispatcherShutdownTest, DestroyClearAllList) { + ReadyWatcher watcher; + ReadyWatcher deferred_watcher; + dispatcher_->deferredDelete( + std::make_unique([&deferred_watcher]() { deferred_watcher.ready(); })); + dispatcher_->deleteInDispatcherThread( + std::make_unique([&watcher]() { watcher.ready(); })); + { + InSequence s; + EXPECT_CALL(deferred_watcher, ready()); + EXPECT_CALL(watcher, ready()); + dispatcher_.reset(); + } +} + TEST_F(DispatcherImplTest, Timer) { timerTest([](Timer& timer) { timer.enableTimer(std::chrono::milliseconds(0)); }); timerTest([](Timer& timer) { timer.enableTimer(std::chrono::milliseconds(50)); }); diff --git a/test/common/network/connection_impl_test.cc 
b/test/common/network/connection_impl_test.cc index 0ff55ecaf946..dc437bd5d06e 100644 --- a/test/common/network/connection_impl_test.cc +++ b/test/common/network/connection_impl_test.cc @@ -1984,6 +1984,7 @@ class FakeReadFilter : public Network::ReadFilter { class MockTransportConnectionImplTest : public testing::Test { public: MockTransportConnectionImplTest() : stream_info_(dispatcher_.timeSource(), nullptr) { + EXPECT_CALL(dispatcher_, isThreadSafe()).WillRepeatedly(Return(true)); EXPECT_CALL(dispatcher_.buffer_factory_, create_(_, _, _)) .WillRepeatedly(Invoke([](std::function below_low, std::function above_high, std::function above_overflow) -> Buffer::Instance* { diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 51d94c8165b6..b6746dd2c8e3 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -874,8 +874,10 @@ TEST_F(ClusterManagerImplTest, HttpHealthChecker) { createClientConnection_( PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:11001")), _, _, _)) .WillOnce(Return(connection)); + EXPECT_CALL(factory_.dispatcher_, deleteInDispatcherThread(_)); create(parseBootstrapFromV3Yaml(yaml)); factory_.tls_.shutdownThread(); + factory_.dispatcher_.to_delete_.clear(); } TEST_F(ClusterManagerImplTest, UnknownCluster) { diff --git a/test/common/upstream/logical_dns_cluster_test.cc b/test/common/upstream/logical_dns_cluster_test.cc index bc46c5771a79..140d1dc19277 100644 --- a/test/common/upstream/logical_dns_cluster_test.cc +++ b/test/common/upstream/logical_dns_cluster_test.cc @@ -204,11 +204,11 @@ class LogicalDnsClusterTest : public Event::TestUsingSimulatedTime, public testi Network::DnsResolver::ResolveCb dns_callback_; NiceMock tls_; Event::MockTimer* resolve_timer_; - std::shared_ptr cluster_; ReadyWatcher membership_updated_; ReadyWatcher initialized_; NiceMock runtime_; NiceMock dispatcher_; + std::shared_ptr 
cluster_; NiceMock local_info_; NiceMock admin_; Singleton::ManagerImpl singleton_manager_{Thread::threadFactoryForTest()}; diff --git a/test/common/upstream/original_dst_cluster_test.cc b/test/common/upstream/original_dst_cluster_test.cc index 3db904641719..10723a07de1b 100644 --- a/test/common/upstream/original_dst_cluster_test.cc +++ b/test/common/upstream/original_dst_cluster_test.cc @@ -94,11 +94,11 @@ class OriginalDstClusterTest : public Event::TestUsingSimulatedTime, public test Stats::TestUtil::TestStore stats_store_; Ssl::MockContextManager ssl_context_manager_; + NiceMock dispatcher_; OriginalDstClusterSharedPtr cluster_; ReadyWatcher membership_updated_; ReadyWatcher initialized_; NiceMock runtime_; - NiceMock dispatcher_; Event::MockTimer* cleanup_timer_; NiceMock random_; NiceMock local_info_; diff --git a/test/integration/sds_dynamic_integration_test.cc b/test/integration/sds_dynamic_integration_test.cc index 6023882ab9d6..c1dcb3501a8b 100644 --- a/test/integration/sds_dynamic_integration_test.cc +++ b/test/integration/sds_dynamic_integration_test.cc @@ -673,5 +673,134 @@ TEST_P(SdsDynamicUpstreamIntegrationTest, WrongSecretFirst) { EXPECT_EQ(1, test_server_->counter("sds.client_cert.update_rejected")->value()); } +// Test CDS with SDS. A cluster provided by CDS raises new SDS request for upstream cert. +class SdsCdsIntegrationTest : public SdsDynamicIntegrationBaseTest { +public: + void initialize() override { + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Create the dynamic cluster. This cluster will be using sds. 
+ dynamic_cluster_ = bootstrap.mutable_static_resources()->clusters(0); + dynamic_cluster_.set_name("dynamic"); + dynamic_cluster_.mutable_connect_timeout()->MergeFrom( + ProtobufUtil::TimeUtil::MillisecondsToDuration(500000)); + auto* transport_socket = dynamic_cluster_.mutable_transport_socket(); + envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext tls_context; + auto* secret_config = + tls_context.mutable_common_tls_context()->add_tls_certificate_sds_secret_configs(); + setUpSdsConfig(secret_config, "client_cert"); + + transport_socket->set_name("envoy.transport_sockets.tls"); + transport_socket->mutable_typed_config()->PackFrom(tls_context); + + // Add cds cluster first. + auto* cds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + cds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + cds_cluster->set_name("cds_cluster"); + ConfigHelper::setHttp2(*cds_cluster); + // Then add sds cluster. + auto* sds_cluster = bootstrap.mutable_static_resources()->add_clusters(); + sds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + sds_cluster->set_name("sds_cluster"); + ConfigHelper::setHttp2(*sds_cluster); + + const std::string cds_yaml = R"EOF( + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + envoy_grpc: + cluster_name: cds_cluster + set_node_on_first_message_only: true +)EOF"; + auto* cds = bootstrap.mutable_dynamic_resources()->mutable_cds_config(); + TestUtility::loadFromYaml(cds_yaml, *cds); + }); + + HttpIntegrationTest::initialize(); + } + + void TearDown() override { + { + AssertionResult result = sds_connection_->close(); + RELEASE_ASSERT(result, result.message()); + result = sds_connection_->waitForDisconnect(); + RELEASE_ASSERT(result, result.message()); + sds_connection_.reset(); + } + cleanUpXdsConnection(); + cleanupUpstreamAndDownstream(); + codec_client_.reset(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + void 
createUpstreams() override { + // Static cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP1); + // Cds Cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + // Sds Cluster. + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + } + + void sendCdsResponse() { + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {dynamic_cluster_}, {dynamic_cluster_}, {}, "55"); + } + + void sendSdsResponse2(const envoy::extensions::transport_sockets::tls::v3::Secret& secret, + FakeStream& sds_stream) { + API_NO_BOOST(envoy::api::v2::DiscoveryResponse) discovery_response; + discovery_response.set_version_info("1"); + discovery_response.set_type_url(Config::TypeUrl::get().Secret); + discovery_response.add_resources()->PackFrom(secret); + sds_stream.sendGrpcMessage(discovery_response); + } + envoy::config::cluster::v3::Cluster dynamic_cluster_; + FakeHttpConnectionPtr sds_connection_; + FakeStreamPtr sds_stream_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, SdsCdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(SdsCdsIntegrationTest, BasicSuccess) { + on_server_init_function_ = [this]() { + { + // CDS. + AssertionResult result = + fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + sendCdsResponse(); + } + { + // SDS. 
+ AssertionResult result = + fake_upstreams_[2]->waitForHttpConnection(*dispatcher_, sds_connection_); + RELEASE_ASSERT(result, result.message()); + + result = sds_connection_->waitForNewStream(*dispatcher_, sds_stream_); + RELEASE_ASSERT(result, result.message()); + sds_stream_->startGrpcStream(); + sendSdsResponse2(getClientSecret(), *sds_stream_); + } + }; + initialize(); + + test_server_->waitForCounterGe( + "cluster.dynamic.client_ssl_socket_factory.ssl_context_update_by_sds", 1); + // The 4 clusters are CDS,SDS,static and dynamic cluster. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4); + + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, {}, {}, + {}, "42"); + // Successfully removed the dynamic cluster. + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); +} + } // namespace Ssl } // namespace Envoy diff --git a/test/mocks/event/mocks.cc b/test/mocks/event/mocks.cc index 75b60c1cbccb..c3f2c766e202 100644 --- a/test/mocks/event/mocks.cc +++ b/test/mocks/event/mocks.cc @@ -34,6 +34,7 @@ MockDispatcher::MockDispatcher(const std::string& name) : name_(name) { std::function above_overflow) -> Buffer::Instance* { return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); })); + ON_CALL(*this, isThreadSafe()).WillByDefault(Return(true)); } MockDispatcher::~MockDispatcher() = default; diff --git a/test/mocks/event/mocks.h b/test/mocks/event/mocks.h index 066eb68d6747..0c107fa8a00b 100644 --- a/test/mocks/event/mocks.h +++ b/test/mocks/event/mocks.h @@ -146,6 +146,7 @@ class MockDispatcher : public Dispatcher { MOCK_METHOD(void, exit, ()); MOCK_METHOD(SignalEvent*, listenForSignal_, (signal_t signal_num, SignalCb cb)); MOCK_METHOD(void, post, (std::function callback)); + MOCK_METHOD(void, deleteInDispatcherThread, (DispatcherThreadDeletableConstPtr deletable)); MOCK_METHOD(void, run, (RunType type)); MOCK_METHOD(void, pushTrackedObject, (const ScopeTrackedObject* object)); MOCK_METHOD(void, popTrackedObject, 
(const ScopeTrackedObject* expected_object)); @@ -154,6 +155,7 @@ class MockDispatcher : public Dispatcher { MOCK_METHOD(Thread::ThreadId, getCurrentThreadId, ()); MOCK_METHOD(MonotonicTime, approximateMonotonicTime, (), (const)); MOCK_METHOD(void, updateApproximateMonotonicTime, ()); + MOCK_METHOD(void, shutdown, ()); GlobalTimeSystem time_system_; std::list to_delete_; diff --git a/test/mocks/event/wrapped_dispatcher.h b/test/mocks/event/wrapped_dispatcher.h index a6dc7be22716..780480902035 100644 --- a/test/mocks/event/wrapped_dispatcher.h +++ b/test/mocks/event/wrapped_dispatcher.h @@ -101,6 +101,10 @@ class WrappedDispatcher : public Dispatcher { void post(std::function callback) override { impl_.post(std::move(callback)); } + void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override { + impl_.deleteInDispatcherThread(std::move(deletable)); + } + void run(RunType type) override { impl_.run(type); } Buffer::WatermarkFactory& getWatermarkFactory() override { return impl_.getWatermarkFactory(); } @@ -120,6 +124,8 @@ class WrappedDispatcher : public Dispatcher { bool isThreadSafe() const override { return impl_.isThreadSafe(); } + void shutdown() override { impl_.shutdown(); } + protected: Dispatcher& impl_; };