From a1736e4e484264aa8c2a88f1f0eff159b2ffb71f Mon Sep 17 00:00:00 2001 From: Rei Shimizu Date: Fri, 16 Oct 2020 10:11:52 +0900 Subject: [PATCH] cluster manager: avoid immediate activation for dynamic inserted cluster when initialize (#12783) Signed-off-by: Shikugawa --- .../common/upstream/cluster_manager_impl.cc | 68 +++++++++---------- source/common/upstream/cluster_manager_impl.h | 1 + .../upstream/cluster_manager_impl_test.cc | 4 +- test/integration/ads_integration_test.cc | 20 ++++++ 4 files changed, 57 insertions(+), 36 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 15df928be990..80a9ef9ac070 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -417,7 +417,16 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { // been setup for cross-thread updates to avoid needless updates during initialization. The order // of operations here is important. We start by initializing the thread aware load balancer if // needed. This must happen first so cluster updates are heard first by the load balancer. - auto cluster_data = active_clusters_.find(cluster.info()->name()); + // Also, it assures that all of clusters which this function is called should be always active. + auto cluster_data = warming_clusters_.find(cluster.info()->name()); + // We have a situation that clusters will be immediately active, such as static and primary + // cluster. So we must have this prevention logic here. + if (cluster_data != warming_clusters_.end()) { + clusterWarmingToActive(cluster.info()->name()); + updateClusterCounts(); + } + cluster_data = active_clusters_.find(cluster.info()->name()); + if (cluster_data->second->thread_aware_lb_ != nullptr) { cluster_data->second->thread_aware_lb_->initialize(); } @@ -587,17 +596,6 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl // The following init manager remove call is a NOP in the case we are already initialized. // It's just kept here to avoid additional logic. init_helper_.removeCluster(*existing_active_cluster->second->cluster_); - } else { - // Validate that warming clusters are not added to the init_helper_. - // NOTE: This loop is compiled out in optimized builds. - for (const std::list& cluster_list : - {std::cref(init_helper_.primary_init_clusters_), - std::cref(init_helper_.secondary_init_clusters_)}) { - ASSERT(!std::any_of(cluster_list.begin(), cluster_list.end(), - [&existing_warming_cluster](Cluster* cluster) { - return existing_warming_cluster->second->cluster_.get() == cluster; - })); - } } cm_stats_.cluster_modified_.inc(); } else { @@ -614,40 +612,39 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl // the future we may decide to undergo a refactor to unify the logic but the effort/risk to // do that right now does not seem worth it given that the logic is generally pretty clean // and easy to understand. - const bool use_active_map = - init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized; - loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_); - - if (use_active_map) { + const bool all_clusters_initialized = + init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; + loadCluster(cluster, version_info, true, warming_clusters_); + auto& cluster_entry = warming_clusters_.at(cluster_name); + if (!all_clusters_initialized) { ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name); - auto& cluster_entry = active_clusters_.at(cluster_name); createOrUpdateThreadLocalCluster(*cluster_entry); init_helper_.addCluster(*cluster_entry->cluster_); } else { - auto& cluster_entry = warming_clusters_.at(cluster_name); ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name); cluster_entry->cluster_->initialize([this, cluster_name] { - auto warming_it = warming_clusters_.find(cluster_name); - auto& cluster_entry = *warming_it->second; - - // If the cluster is being updated, we need to cancel any pending merged updates. - // Otherwise, applyUpdates() will fire with a dangling cluster reference. - updates_map_.erase(cluster_name); - - active_clusters_[cluster_name] = std::move(warming_it->second); - warming_clusters_.erase(warming_it); - ENVOY_LOG(debug, "warming cluster {} complete", cluster_name); - createOrUpdateThreadLocalCluster(cluster_entry); - onClusterInit(*cluster_entry.cluster_); - updateClusterCounts(); + auto state_changed_cluster_entry = warming_clusters_.find(cluster_name); + createOrUpdateThreadLocalCluster(*state_changed_cluster_entry->second); + onClusterInit(*state_changed_cluster_entry->second->cluster_); }); } - updateClusterCounts(); return true; } +void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) { + auto warming_it = warming_clusters_.find(cluster_name); + ASSERT(warming_it != warming_clusters_.end()); + + // If the cluster is being updated, we need to cancel any pending merged updates. + // Otherwise, applyUpdates() will fire with a dangling cluster reference. + updates_map_.erase(cluster_name); + + active_clusters_[cluster_name] = std::move(warming_it->second); + warming_clusters_.erase(warming_it); +} + void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) { tls_->runOnAllThreads([new_cluster = cluster.cluster_->info(), thread_aware_lb_factory = cluster.loadBalancerFactory()]( @@ -702,6 +699,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { if (existing_warming_cluster != warming_clusters_.end() && existing_warming_cluster->second->added_via_api_) { removed = true; + init_helper_.removeCluster(*existing_warming_cluster->second->cluster_); warming_clusters_.erase(existing_warming_cluster); ENVOY_LOG(info, "removing warming cluster {}", cluster_name); } @@ -804,7 +802,9 @@ void ClusterManagerImpl::updateClusterCounts() { // Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a // signal to ADS to proceed with RDS updates. // If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant. - if (ads_mux_) { + const bool all_clusters_initialized = + init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; + if (all_clusters_initialized && ads_mux_) { const auto type_urls = Config::getAllVersionTypeUrls(); const uint64_t previous_warming = cm_stats_.warming_clusters_.value(); if (previous_warming == 0 && !warming_clusters_.empty()) { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 920681bff0ef..1aa14c4be78c 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -482,6 +482,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable prefetch_pool); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index b24c45330de5..34c31a7f4f03 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -1055,7 +1055,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { last_updated: seconds: 1234567891 nanos: 234000000 - dynamic_active_clusters: + dynamic_warming_clusters: - version_info: "version1" cluster: "@type": type.googleapis.com/envoy.config.cluster.v3.Cluster @@ -1107,7 +1107,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { last_updated: seconds: 1234567891 nanos: 234000000 - dynamic_warming_clusters: + dynamic_active_clusters: )EOF"); EXPECT_CALL(*cluster3, initialize(_)); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index b49b217464bf..01aae9dc9f73 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1132,6 +1132,26 @@ class AdsClusterV3Test : public AdsIntegrationTest { INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDelta, AdsClusterV3Test, DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS); +TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {})); + sendDiscoveryResponse( + eds_type_url, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1", false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + // Verify CDS is paused during cluster warming. TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) { initialize();