Skip to content

Commit

Permalink
upstream: Introducing close_connections_on_host_set_change property (#…
Browse files Browse the repository at this point in the history
…7675)

Signed-off-by: Kateryna Nezdolii <nezdolik@spotify.com>
  • Loading branch information
Kateryna Nezdolii authored and snowp committed Aug 23, 2019
1 parent 73c2b64 commit 07e3e28
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 10 deletions.
4 changes: 4 additions & 0 deletions api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ message Cluster {
// If panic mode is triggered, new hosts are still eligible for traffic; they simply do not
// contribute to the calculation when deciding whether panic mode is enabled or not.
bool ignore_new_hosts_until_first_hc = 5;

// If set to `true`, the cluster manager will drain all existing
// connections to upstream hosts whenever hosts are added or removed from the cluster.
bool close_connections_on_host_set_change = 6;
}

// Common configuration for all load balancer implementations.
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Version history
certificate validation context.
* upstream: added network filter chains to upstream connections, see :ref:`filters<envoy_api_field_Cluster.filters>`.
* upstream: use p2c to select hosts for least-requests load balancers if all host weights are the same, even in cases where weights are not equal to 1.
* upstream: added :ref:`an option <envoy_api_field_Cluster.CommonLbConfig.close_connections_on_host_set_change>` that allows draining HTTP, TCP connection pools on cluster membership change.
* zookeeper: parse responses and emit latency stats.

1.11.1 (August 13, 2019)
Expand Down
25 changes: 17 additions & 8 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,21 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
// Now setup for cross-thread updates.
cluster.prioritySet().addMemberUpdateCb(
[&cluster, this](const HostVector&, const HostVector& hosts_removed) -> void {
// TODO(snowp): Should this be subject to merge windows?

// Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
// connection pools for the removed hosts.
if (!hosts_removed.empty()) {
postThreadLocalHostRemoval(cluster, hosts_removed);
if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
// This will drain all tcp and http connection pools.
postThreadLocalDrainConnections(cluster, host_set->hosts());
}
} else {
// TODO(snowp): Should this be subject to merge windows?

// Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
// connection pools for the removed hosts. If `close_connections_on_host_set_change` is
// enabled, this case will be covered by first `if` statement, where all
// connection pools are drained.
if (!hosts_removed.empty()) {
postThreadLocalDrainConnections(cluster, hosts_removed);
}
}
});

Expand Down Expand Up @@ -712,8 +721,8 @@ Tcp::ConnectionPool::Instance* ClusterManagerImpl::tcpConnPoolForCluster(
return entry->second->tcpConnPool(priority, context, transport_socket_options);
}

void ClusterManagerImpl::postThreadLocalHostRemoval(const Cluster& cluster,
const HostVector& hosts_removed) {
void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed) {
tls_->runOnAllThreads([this, name = cluster.info()->name(), hosts_removed]() {
ThreadLocalClusterManagerImpl::removeHosts(name, hosts_removed, *tls_);
});
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
std::size_t warmingClusterCount() const override { return warming_clusters_.size(); }

protected:
virtual void postThreadLocalHostRemoval(const Cluster& cluster, const HostVector& hosts_removed);
virtual void postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed);
virtual void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const HostVector& hosts_added,
const HostVector& hosts_removed);
Expand Down
191 changes: 190 additions & 1 deletion test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class MockedUpdatedClusterManagerImpl : public TestClusterManagerImpl {
local_cluster_update_.post(priority, hosts_added, hosts_removed);
}

void postThreadLocalHostRemoval(const Cluster&, const HostVector& hosts_removed) override {
void postThreadLocalDrainConnections(const Cluster&, const HostVector& hosts_removed) override {
local_hosts_removed_.post(hosts_removed);
}

Expand Down Expand Up @@ -3324,6 +3324,195 @@ TEST_F(TcpKeepaliveTest, TcpKeepaliveWithAllOptions) {
expectSetsockoptSoKeepalive(7, 4, 1);
}

TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) {
const std::string yaml = R"EOF(
static_resources:
clusters:
- name: cluster_1
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
common_lb_config:
close_connections_on_host_set_change: true
)EOF";

ReadyWatcher initialized;
EXPECT_CALL(initialized, ready());

create(parseBootstrapFromV2Yaml(yaml));

// Set up for an initialize callback.
cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });

std::unique_ptr<MockClusterUpdateCallbacks> callbacks(new NiceMock<MockClusterUpdateCallbacks>());
ClusterUpdateCallbacksHandlePtr cb =
cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks);

EXPECT_FALSE(cluster_manager_->get("cluster_1")->info()->addedViaApi());

// Verify that we get no hosts when the HostSet is empty.
EXPECT_EQ(nullptr, cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
EXPECT_EQ(nullptr, cluster_manager_->tcpConnPoolForCluster("cluster_1", ResourcePriority::Default,
nullptr, nullptr));
EXPECT_EQ(nullptr,
cluster_manager_->tcpConnForCluster("cluster_1", nullptr, nullptr).connection_);

Cluster& cluster = cluster_manager_->activeClusters().begin()->second;

// Set up the HostSet.
HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80");
HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:81");

HostVector hosts{host1, host2};
auto hosts_ptr = std::make_shared<HostVector>(hosts);

// Sending non-mergeable updates.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {},
100);

EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value());
EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value());
EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value());

EXPECT_CALL(factory_, allocateConnPool_(_, _))
.Times(3)
.WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());

EXPECT_CALL(factory_, allocateTcpConnPool_(_))
.Times(3)
.WillRepeatedly(ReturnNew<Tcp::ConnectionPool::MockInstance>());

// This should provide us a CP for each of the above hosts.
Http::ConnectionPool::MockInstance* cp1 =
dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
// Create persistent connection for host2.
Http::ConnectionPool::MockInstance* cp2 =
dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http2, nullptr));

Tcp::ConnectionPool::MockInstance* tcp1 =
dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

Tcp::ConnectionPool::MockInstance* tcp2 =
dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

EXPECT_NE(cp1, cp2);
EXPECT_NE(tcp1, tcp2);

EXPECT_CALL(*cp2, addDrainedCallback(_))
.WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*cp1, addDrainedCallback(_))
.WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*tcp1, addDrainedCallback(_))
.WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*tcp2, addDrainedCallback(_))
.WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

HostVector hosts_removed;
hosts_removed.push_back(host2);

// This update should drain all connection pools (host1, host2).
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, {},
hosts_removed, 100);

// Recreate connection pool for host1.
cp1 = dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));

tcp1 = dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

HostSharedPtr host3 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82");

HostVector hosts_added;
hosts_added.push_back(host3);

EXPECT_CALL(*cp1, addDrainedCallback(_))
.WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*tcp1, addDrainedCallback(_))
.WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

// Adding host3 should drain connection pool for host1.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr,
hosts_added, {}, 100);
}

TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) {
const std::string yaml = R"EOF(
static_resources:
clusters:
- name: cluster_1
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
)EOF";

ReadyWatcher initialized;
EXPECT_CALL(initialized, ready());
create(parseBootstrapFromV2Yaml(yaml));

// Set up for an initialize callback.
cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });

std::unique_ptr<MockClusterUpdateCallbacks> callbacks(new NiceMock<MockClusterUpdateCallbacks>());
ClusterUpdateCallbacksHandlePtr cb =
cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks);

Cluster& cluster = cluster_manager_->activeClusters().begin()->second;

// Set up the HostSet.
HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80");

HostVector hosts{host1};
auto hosts_ptr = std::make_shared<HostVector>(hosts);

// Sending non-mergeable updates.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {},
100);

EXPECT_CALL(factory_, allocateConnPool_(_, _))
.Times(1)
.WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());

EXPECT_CALL(factory_, allocateTcpConnPool_(_))
.Times(1)
.WillRepeatedly(ReturnNew<Tcp::ConnectionPool::MockInstance>());

// This should provide us a CP for each of the above hosts.
Http::ConnectionPool::MockInstance* cp1 =
dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));

Tcp::ConnectionPool::MockInstance* tcp1 =
dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82");
HostVector hosts_added;
hosts_added.push_back(host2);

// No connection pools should be drained.
EXPECT_CALL(*cp1, drainConnections()).Times(0);
EXPECT_CALL(*tcp1, drainConnections()).Times(0);

// No connection pools should be drained.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr,
hosts_added, {}, 100);
}

} // namespace
} // namespace Upstream
} // namespace Envoy

0 comments on commit 07e3e28

Please sign in to comment.