Skip to content

Commit

Permalink
Expose per zone hosts as part of host set (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
RomanDzhabarov authored Oct 24, 2016
1 parent eb21f2e commit c241ccd
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 128 deletions.
18 changes: 12 additions & 6 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,20 @@ class HostSet {
* set on the command line and to use a cluster type that supports population such as
* the SDS cluster type.
*/
virtual const std::vector<HostPtr>& localZoneHosts() const PURE;

/**
* @return all healthy hosts that are in the zone local to this node. See healthyHosts() and
* localZoneHosts() for more information.
* @return hosts per zone, index 0 is dedicated to local zone hosts.
* If there are no hosts in local zone for upstream cluster hostPerZone() will @return
* empty vector.
*
* Note, that we sort zones in alphabetical order starting from index 1.
*/
virtual const std::vector<HostPtr>& localZoneHealthyHosts() const PURE;
virtual const std::vector<std::vector<HostPtr>>& hostsPerZone() const PURE;

/**
* @return same as hostsPerZone but only contains healthy hosts.
*/
virtual const std::vector<std::vector<HostPtr>>& healthyHostsPerZone() const PURE;
};

/**
Expand Down Expand Up @@ -188,8 +195,7 @@ class HostSet {
COUNTER(zone_over_percentage) \
COUNTER(zone_routing_sampled) \
COUNTER(zone_routing_no_sampled) \
GAUGE (max_host_weight) \
GAUGE (upstream_zone_count)
GAUGE (max_host_weight)
// clang-format on

/**
Expand Down
15 changes: 8 additions & 7 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,17 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(const ClusterImplBase& pri
const std::string& name = primary_cluster.name();
ConstHostVectorPtr hosts_copy = primary_cluster.rawHosts();
ConstHostVectorPtr healthy_hosts_copy = primary_cluster.rawHealthyHosts();
ConstHostVectorPtr local_zone_hosts_copy = primary_cluster.rawLocalZoneHosts();
ConstHostVectorPtr local_zone_healthy_hosts_copy = primary_cluster.rawLocalZoneHealthyHosts();
ConstHostListsPtr hosts_per_zone_copy = primary_cluster.rawHostsPerZone();
ConstHostListsPtr healthy_hosts_per_zone_copy = primary_cluster.rawHealthyHostsPerZone();
ThreadLocal::Instance& tls = tls_;
uint32_t thead_local_slot = thread_local_slot_;

tls_.runOnAllThreads(
[name, hosts_copy, healthy_hosts_copy, local_zone_hosts_copy, local_zone_healthy_hosts_copy,
[name, hosts_copy, healthy_hosts_copy, hosts_per_zone_copy, healthy_hosts_per_zone_copy,
hosts_added, hosts_removed, &tls, thead_local_slot]() mutable -> void {
ThreadLocalClusterManagerImpl::updateClusterMembership(
name, hosts_copy, healthy_hosts_copy, local_zone_hosts_copy,
local_zone_healthy_hosts_copy, hosts_added, hosts_removed, tls, thead_local_slot);
name, hosts_copy, healthy_hosts_copy, hosts_per_zone_copy, healthy_hosts_per_zone_copy,
hosts_added, hosts_removed, tls, thead_local_slot);
});
}

Expand Down Expand Up @@ -289,7 +290,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
const std::string& name, ConstHostVectorPtr hosts, ConstHostVectorPtr healthy_hosts,
ConstHostVectorPtr local_zone_hosts, ConstHostVectorPtr local_zone_healthy_hosts,
ConstHostListsPtr hosts_per_zone, ConstHostListsPtr healthy_hosts_per_zone,
const std::vector<HostPtr>& hosts_added, const std::vector<HostPtr>& hosts_removed,
ThreadLocal::Instance& tls, uint32_t thead_local_slot) {

Expand All @@ -298,7 +299,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(

ASSERT(config.thread_local_clusters_.find(name) != config.thread_local_clusters_.end());
config.thread_local_clusters_[name]->host_set_.updateHosts(
hosts, healthy_hosts, local_zone_hosts, local_zone_healthy_hosts, hosts_added, hosts_removed);
hosts, healthy_hosts, hosts_per_zone, healthy_hosts_per_zone, hosts_added, hosts_removed);
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::shutdown() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ class ClusterManagerImpl : public ClusterManager {
void drainConnPools(HostPtr old_host, ConnPoolsContainer& container);
static void updateClusterMembership(const std::string& name, ConstHostVectorPtr hosts,
ConstHostVectorPtr healthy_hosts,
ConstHostVectorPtr local_zone_hosts,
ConstHostVectorPtr local_zone_healthy_hosts,
ConstHostListsPtr hosts_per_zone,
ConstHostListsPtr healthy_hosts_per_zone,
const std::vector<HostPtr>& hosts_added,
const std::vector<HostPtr>& hosts_removed,
ThreadLocal::Instance& tls, uint32_t thread_local_slot);
Expand Down
66 changes: 43 additions & 23 deletions source/common/upstream/load_balancer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,69 @@

namespace Upstream {

const std::vector<HostPtr>& LoadBalancerBase::hostsToUse() {
ASSERT(host_set_.healthyHosts().size() <= host_set_.hosts().size());
if (host_set_.hosts().empty()) {
return host_set_.hosts();
bool LoadBalancerBase::earlyExitNonZoneRouting() {
uint32_t number_of_zones = host_set_.healthyHostsPerZone().size();
if (number_of_zones < 2 ||
!runtime_.snapshot().featureEnabled("upstream.zone_routing.enabled", 100)) {
return true;
}

const std::vector<HostPtr>& local_zone_healthy_hosts = host_set_.healthyHostsPerZone()[0];
if (local_zone_healthy_hosts.empty()) {
return true;
}

// Do not perform zone routing for small clusters.
uint64_t min_cluster_size =
runtime_.snapshot().getInteger("upstream.zone_routing.min_cluster_size", 6U);

if (host_set_.healthyHosts().size() < min_cluster_size) {
stats_.zone_cluster_too_small_.inc();
return true;
}

return false;
}

bool LoadBalancerBase::isGlobalPanic() {
uint64_t global_panic_threshold =
std::min(100UL, runtime_.snapshot().getInteger("upstream.healthy_panic_threshold", 50));
double healthy_percent = 100.0 * host_set_.healthyHosts().size() / host_set_.hosts().size();

// If the % of healthy hosts in the cluster is less than our panic threshold, we use all hosts.
if (healthy_percent < global_panic_threshold) {
stats_.upstream_rq_lb_healthy_panic_.inc();
return host_set_.hosts();
return true;
}

uint32_t number_of_zones = stats_.upstream_zone_count_.value();
// Early exit if we cannot perform zone aware routing.
if (number_of_zones < 2 || host_set_.localZoneHealthyHosts().empty() ||
!runtime_.snapshot().featureEnabled("upstream.zone_routing.enabled", 100)) {
return host_set_.healthyHosts();
}
return false;
}

// Do not perform zone routing for small clusters.
uint64_t min_cluster_size =
runtime_.snapshot().getInteger("upstream.zone_routing.min_cluster_size", 6U);
const std::vector<HostPtr>& LoadBalancerBase::hostsToUse() {
ASSERT(host_set_.healthyHosts().size() <= host_set_.hosts().size());

if (host_set_.healthyHosts().size() < min_cluster_size) {
stats_.zone_cluster_too_small_.inc();
if (host_set_.hosts().empty() || isGlobalPanic()) {
return host_set_.hosts();
}

if (earlyExitNonZoneRouting()) {
return host_set_.healthyHosts();
}

// If number of hosts in a local zone big enough route all requests to the same zone.
if (host_set_.localZoneHealthyHosts().size() * number_of_zones >=
host_set_.healthyHosts().size()) {
// At this point it's guaranteed to be at least 2 zones.
uint32_t number_of_zones = host_set_.healthyHostsPerZone().size();
ASSERT(number_of_zones >= 2U);
const std::vector<HostPtr>& local_zone_healthy_hosts = host_set_.healthyHostsPerZone()[0];

// If number of hosts in a local zone big enough then route all requests to the same zone.
if (local_zone_healthy_hosts.size() * number_of_zones >= host_set_.healthyHosts().size()) {
stats_.zone_over_percentage_.inc();
return host_set_.localZoneHealthyHosts();
return local_zone_healthy_hosts;
}

// If local zone ratio is lower than expected we should only partially route requests from the
// same zone.
double zone_host_ratio =
1.0 * host_set_.localZoneHealthyHosts().size() / host_set_.healthyHosts().size();
double zone_host_ratio = 1.0 * local_zone_healthy_hosts.size() / host_set_.healthyHosts().size();
double ratio_to_route = zone_host_ratio * number_of_zones;

// Not zone routed requests will be distributed between all hosts and hence
Expand All @@ -63,7 +83,7 @@ const std::vector<HostPtr>& LoadBalancerBase::hostsToUse() {

if (random_.random() % 10000 < zone_routing_threshold) {
stats_.zone_routing_sampled_.inc();
return host_set_.localZoneHealthyHosts();
return local_zone_healthy_hosts;
} else {
stats_.zone_routing_no_sampled_.inc();
return host_set_.healthyHosts();
Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class LoadBalancerBase {
Runtime::RandomGenerator& random_;

private:
bool earlyExitNonZoneRouting();
bool isGlobalPanic();

const HostSet& host_set_;
const HostSet* local_host_set_;
};
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/logical_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ void LogicalDnsCluster::startResolve() {
logical_host_.reset(new LogicalHost(*this, dns_url_, *this));
HostVectorPtr new_hosts(new std::vector<HostPtr>());
new_hosts->emplace_back(logical_host_);
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_list_,
empty_host_list_, *new_hosts, {});
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_lists_,
empty_host_lists_, *new_hosts, {});
}
}

Expand Down
23 changes: 18 additions & 5 deletions source/common/upstream/sds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,30 @@ void SdsClusterImpl::parseSdsResponse(Http::Message& response) {
if (updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed,
health_checker_ != nullptr)) {
log_debug("sds hosts changed for cluster: {} ({})", name_, hosts().size());
HostVectorPtr local_zone_hosts(new std::vector<HostPtr>());
HostListsPtr per_zone(new std::vector<std::vector<HostPtr>>());

// If local zone name is not defined then skip populating per zone hosts.
if (!sds_config_.local_zone_name_.empty()) {
std::map<std::string, std::vector<HostPtr>> hosts_per_zone;

for (HostPtr host : *current_hosts_copy) {
if (host->zone() == sds_config_.local_zone_name_) {
local_zone_hosts->push_back(host);
hosts_per_zone[host->zone()].push_back(host);
}

// Populate per_zone hosts only if upstream cluster has hosts in the same zone.
if (hosts_per_zone.find(sds_config_.local_zone_name_) != hosts_per_zone.end()) {
per_zone->push_back(hosts_per_zone[sds_config_.local_zone_name_]);

for (auto& entry : hosts_per_zone) {
if (sds_config_.local_zone_name_ != entry.first) {
per_zone->push_back(entry.second);
}
}
}
}

updateHosts(current_hosts_copy, createHealthyHostList(*current_hosts_copy), local_zone_hosts,
createHealthyHostList(*local_zone_hosts), hosts_added, hosts_removed);
updateHosts(current_hosts_copy, createHealthyHostList(*current_hosts_copy), per_zone,
createHealthyHostLists(*per_zone), hosts_added, hosts_removed);

if (initialize_callback_ && health_checker_ && pending_health_checks_ == 0) {
pending_health_checks_ = hosts().size();
Expand Down
40 changes: 26 additions & 14 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void HostSetImpl::runUpdateCallbacks(const std::vector<HostPtr>& hosts_added,
}
}

const ConstHostVectorPtr ClusterImplBase::empty_host_list_{new std::vector<HostPtr>{}};
const ConstHostListsPtr ClusterImplBase::empty_host_lists_{new std::vector<std::vector<HostPtr>>()};

ClusterImplBase::ClusterImplBase(const Json::Object& config, Runtime::Loader& runtime,
Stats::Store& stats, Ssl::ContextManager& ssl_context_manager)
Expand Down Expand Up @@ -85,7 +85,7 @@ ClusterImplBase::ClusterImplBase(const Json::Object& config, Runtime::Loader& ru

ConstHostVectorPtr ClusterImplBase::createHealthyHostList(const std::vector<HostPtr>& hosts) {
HostVectorPtr healthy_list(new std::vector<HostPtr>());
for (auto host : hosts) {
for (const auto& host : hosts) {
if (host->healthy()) {
healthy_list->emplace_back(host);
}
Expand All @@ -94,6 +94,23 @@ ConstHostVectorPtr ClusterImplBase::createHealthyHostList(const std::vector<Host
return healthy_list;
}

ConstHostListsPtr
ClusterImplBase::createHealthyHostLists(const std::vector<std::vector<HostPtr>>& hosts) {
HostListsPtr healthy_list(new std::vector<std::vector<HostPtr>>());

for (const auto& hosts_zone : hosts) {
std::vector<HostPtr> current_zone_hosts;
for (const auto& host : hosts_zone) {
if (host->healthy()) {
current_zone_hosts.emplace_back(host);
}
}
healthy_list->push_back(std::move(current_zone_hosts));
}

return healthy_list;
}

uint64_t ClusterImplBase::parseFeatures(const Json::Object& config) {
uint64_t features = 0;
for (const std::string& feature : StringUtil::split(config.getString("features", ""), ',')) {
Expand Down Expand Up @@ -130,8 +147,8 @@ void ClusterImplBase::setHealthChecker(HealthCheckerPtr&& health_checker) {
// If we get a health check completion that resulted in a state change, signal to
// update the host sets on all threads.
if (changed_state) {
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawLocalZoneHosts(),
createHealthyHostList(*rawLocalZoneHosts()), {}, {});
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawHostsPerZone(),
createHealthyHostLists(*rawHostsPerZone()), {}, {});
}
});
}
Expand All @@ -143,8 +160,8 @@ void ClusterImplBase::setOutlierDetector(OutlierDetectorPtr&& outlier_detector)

outlier_detector_ = std::move(outlier_detector);
outlier_detector_->addChangedStateCb([this](HostPtr) -> void {
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawLocalZoneHosts(),
createHealthyHostList(*rawLocalZoneHosts()), {}, {});
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawHostsPerZone(),
createHealthyHostLists(*rawHostsPerZone()), {}, {});
});
}

Expand Down Expand Up @@ -187,8 +204,8 @@ StaticClusterImpl::StaticClusterImpl(const Json::Object& config, Runtime::Loader
new_hosts->emplace_back(HostPtr{new HostImpl(*this, url, false, 1, "")});
}

updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_list_, empty_host_list_, {},
{});
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_lists_, empty_host_lists_,
{}, {});
}

bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& new_hosts,
Expand All @@ -197,7 +214,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
std::vector<HostPtr>& hosts_removed,
bool depend_on_hc) {
uint64_t max_host_weight = 1;
std::unordered_set<std::string> zones;

// Go through and see if the list we have is different from what we just got. If it is, we
// make a new host list and raise a change notification. This uses an N^2 search given that
Expand All @@ -209,7 +225,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
// If we find a host matched based on URL, we keep it. However we do change weight inline so
// do that here.
if ((*i)->url() == host->url()) {
zones.insert((*i)->zone());
if (host->weight() > max_host_weight) {
max_host_weight = host->weight();
}
Expand All @@ -227,7 +242,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
if (host->weight() > max_host_weight) {
max_host_weight = host->weight();
}
zones.insert(host->zone());

final_hosts.push_back(host);
hosts_added.push_back(host);
Expand All @@ -246,7 +260,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
if ((*i)->weight() > max_host_weight) {
max_host_weight = (*i)->weight();
}
zones.insert((*i)->zone());

final_hosts.push_back(*i);
i = current_hosts.erase(i);
Expand All @@ -257,7 +270,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
}

stats_.max_host_weight_.set(max_host_weight);
stats_.upstream_zone_count_.set(zones.size());

if (!hosts_added.empty() || !current_hosts.empty()) {
hosts_removed = std::move(current_hosts);
Expand Down Expand Up @@ -295,7 +307,7 @@ void StrictDnsClusterImpl::updateAllHosts(const std::vector<HostPtr>& hosts_adde
}
}

updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_list_, empty_host_list_,
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_lists_, empty_host_lists_,
hosts_added, hosts_removed);
}

Expand Down
Loading

0 comments on commit c241ccd

Please sign in to comment.