Skip to content

Commit

Permalink
cluster manager: avoid immediate activation for dynamic inserted clus…
Browse files Browse the repository at this point in the history
…ter when initialize (envoyproxy#12783)

Signed-off-by: Shikugawa <rei@tetrate.io>
Signed-off-by: Yuchen Dai <silentdai@gmail.com>
  • Loading branch information
Shikugawa authored and lambdai committed Nov 10, 2020
1 parent a490de8 commit 89d0724
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 36 deletions.
68 changes: 34 additions & 34 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,16 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
// been setup for cross-thread updates to avoid needless updates during initialization. The order
// of operations here is important. We start by initializing the thread aware load balancer if
// needed. This must happen first so cluster updates are heard first by the load balancer.
auto cluster_data = active_clusters_.find(cluster.info()->name());
// Also, it assures that all of clusters which this function is called should be always active.
auto cluster_data = warming_clusters_.find(cluster.info()->name());
// We have a situation that clusters will be immediately active, such as static and primary
// cluster. So we must have this prevention logic here.
if (cluster_data != warming_clusters_.end()) {
clusterWarmingToActive(cluster.info()->name());
updateClusterCounts();
}
cluster_data = active_clusters_.find(cluster.info()->name());

if (cluster_data->second->thread_aware_lb_ != nullptr) {
cluster_data->second->thread_aware_lb_->initialize();
}
Expand Down Expand Up @@ -587,17 +596,6 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// The following init manager remove call is a NOP in the case we are already initialized.
// It's just kept here to avoid additional logic.
init_helper_.removeCluster(*existing_active_cluster->second->cluster_);
} else {
// Validate that warming clusters are not added to the init_helper_.
// NOTE: This loop is compiled out in optimized builds.
for (const std::list<Cluster*>& cluster_list :
{std::cref(init_helper_.primary_init_clusters_),
std::cref(init_helper_.secondary_init_clusters_)}) {
ASSERT(!std::any_of(cluster_list.begin(), cluster_list.end(),
[&existing_warming_cluster](Cluster* cluster) {
return existing_warming_cluster->second->cluster_.get() == cluster;
}));
}
}
cm_stats_.cluster_modified_.inc();
} else {
Expand All @@ -614,40 +612,39 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// the future we may decide to undergo a refactor to unify the logic but the effort/risk to
// do that right now does not seem worth it given that the logic is generally pretty clean
// and easy to understand.
const bool use_active_map =
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, warming_clusters_);
auto& cluster_entry = warming_clusters_.at(cluster_name);
if (!all_clusters_initialized) {
ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
} else {
auto& cluster_entry = warming_clusters_.at(cluster_name);
ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
cluster_entry->cluster_->initialize([this, cluster_name] {
auto warming_it = warming_clusters_.find(cluster_name);
auto& cluster_entry = *warming_it->second;

// If the cluster is being updated, we need to cancel any pending merged updates.
// Otherwise, applyUpdates() will fire with a dangling cluster reference.
updates_map_.erase(cluster_name);

active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);

ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
createOrUpdateThreadLocalCluster(cluster_entry);
onClusterInit(*cluster_entry.cluster_);
updateClusterCounts();
auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
createOrUpdateThreadLocalCluster(*state_changed_cluster_entry->second);
onClusterInit(*state_changed_cluster_entry->second->cluster_);
});
}

updateClusterCounts();
return true;
}

void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
auto warming_it = warming_clusters_.find(cluster_name);
ASSERT(warming_it != warming_clusters_.end());

// If the cluster is being updated, we need to cancel any pending merged updates.
// Otherwise, applyUpdates() will fire with a dangling cluster reference.
updates_map_.erase(cluster_name);

active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);
}

void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) {
tls_->runOnAllThreads([new_cluster = cluster.cluster_->info(),
thread_aware_lb_factory = cluster.loadBalancerFactory()](
Expand Down Expand Up @@ -702,6 +699,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
if (existing_warming_cluster != warming_clusters_.end() &&
existing_warming_cluster->second->added_via_api_) {
removed = true;
init_helper_.removeCluster(*existing_warming_cluster->second->cluster_);
warming_clusters_.erase(existing_warming_cluster);
ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
}
Expand Down Expand Up @@ -804,7 +802,9 @@ void ClusterManagerImpl::updateClusterCounts() {
// Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
// signal to ADS to proceed with RDS updates.
// If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
if (ads_mux_) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
if (all_clusters_initialized && ads_mux_) {
const auto type_urls = Config::getAllVersionTypeUrls<envoy::config::cluster::v3::Cluster>();
const uint64_t previous_warming = cm_stats_.warming_clusters_.value();
if (previous_warming == 0 && !warming_clusters_.empty()) {
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
void onClusterInit(Cluster& cluster);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateClusterCounts();
void clusterWarmingToActive(const std::string& cluster_name);
void maybePrefetch(ThreadLocalClusterManagerImpl::ClusterEntryPtr& cluster_entry,
std::function<ConnectionPool::Instance*()> prefetch_pool);

Expand Down
4 changes: 2 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_active_clusters:
dynamic_warming_clusters:
- version_info: "version1"
cluster:
"@type": type.googleapis.com/envoy.config.cluster.v3.Cluster
Expand Down Expand Up @@ -1107,7 +1107,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_warming_clusters:
dynamic_active_clusters:
)EOF");

EXPECT_CALL(*cluster3, initialize(_));
Expand Down
20 changes: 20 additions & 0 deletions test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,26 @@ class AdsClusterV3Test : public AdsIntegrationTest {
INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDelta, AdsClusterV3Test,
DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS);

TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) {
initialize();
const auto cds_type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>(
envoy::config::core::v3::ApiVersion::V3);
const auto eds_type_url = Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(
envoy::config::core::v3::ApiVersion::V3);

EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false);
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);
EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {}));
sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
eds_type_url, {buildClusterLoadAssignment("cluster_0")},
{buildClusterLoadAssignment("cluster_0")}, {}, "1", false);

test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2);
}

// Verify CDS is paused during cluster warming.
TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) {
initialize();
Expand Down

0 comments on commit 89d0724

Please sign in to comment.