Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose per zone hosts as part of host set #169

Merged
merged 3 commits into from
Oct 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,20 @@ class HostSet {
* set on the command line and to use a cluster type that supports population such as
* the SDS cluster type.
*/
virtual const std::vector<HostPtr>& localZoneHosts() const PURE;

/**
* @return all healthy hosts that are in the zone local to this node. See healthyHosts() and
* localZoneHosts() for more information.
* @return hosts per zone, index 0 is dedicated to local zone hosts.
* If there are no hosts in the local zone for the upstream cluster, hostsPerZone() will return
* an empty vector.
*
* Note that zones are sorted in alphabetical order starting from index 1.
*/
virtual const std::vector<HostPtr>& localZoneHealthyHosts() const PURE;
virtual const std::vector<std::vector<HostPtr>>& hostsPerZone() const PURE;

/**
* @return same as hostsPerZone but only contains healthy hosts.
*/
virtual const std::vector<std::vector<HostPtr>>& healthyHostsPerZone() const PURE;
};

/**
Expand Down Expand Up @@ -188,8 +195,7 @@ class HostSet {
COUNTER(zone_over_percentage) \
COUNTER(zone_routing_sampled) \
COUNTER(zone_routing_no_sampled) \
GAUGE (max_host_weight) \
GAUGE (upstream_zone_count)
GAUGE (max_host_weight)
// clang-format on

/**
Expand Down
15 changes: 8 additions & 7 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,17 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(const ClusterImplBase& pri
const std::string& name = primary_cluster.name();
ConstHostVectorPtr hosts_copy = primary_cluster.rawHosts();
ConstHostVectorPtr healthy_hosts_copy = primary_cluster.rawHealthyHosts();
ConstHostVectorPtr local_zone_hosts_copy = primary_cluster.rawLocalZoneHosts();
ConstHostVectorPtr local_zone_healthy_hosts_copy = primary_cluster.rawLocalZoneHealthyHosts();
ConstHostListsPtr hosts_per_zone_copy = primary_cluster.rawHostsPerZone();
ConstHostListsPtr healthy_hosts_per_zone_copy = primary_cluster.rawHealthyHostsPerZone();
ThreadLocal::Instance& tls = tls_;
uint32_t thead_local_slot = thread_local_slot_;

tls_.runOnAllThreads(
[name, hosts_copy, healthy_hosts_copy, local_zone_hosts_copy, local_zone_healthy_hosts_copy,
[name, hosts_copy, healthy_hosts_copy, hosts_per_zone_copy, healthy_hosts_per_zone_copy,
hosts_added, hosts_removed, &tls, thead_local_slot]() mutable -> void {
ThreadLocalClusterManagerImpl::updateClusterMembership(
name, hosts_copy, healthy_hosts_copy, local_zone_hosts_copy,
local_zone_healthy_hosts_copy, hosts_added, hosts_removed, tls, thead_local_slot);
name, hosts_copy, healthy_hosts_copy, hosts_per_zone_copy, healthy_hosts_per_zone_copy,
hosts_added, hosts_removed, tls, thead_local_slot);
});
}

Expand Down Expand Up @@ -289,7 +290,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
const std::string& name, ConstHostVectorPtr hosts, ConstHostVectorPtr healthy_hosts,
ConstHostVectorPtr local_zone_hosts, ConstHostVectorPtr local_zone_healthy_hosts,
ConstHostListsPtr hosts_per_zone, ConstHostListsPtr healthy_hosts_per_zone,
const std::vector<HostPtr>& hosts_added, const std::vector<HostPtr>& hosts_removed,
ThreadLocal::Instance& tls, uint32_t thead_local_slot) {

Expand All @@ -298,7 +299,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(

ASSERT(config.thread_local_clusters_.find(name) != config.thread_local_clusters_.end());
config.thread_local_clusters_[name]->host_set_.updateHosts(
hosts, healthy_hosts, local_zone_hosts, local_zone_healthy_hosts, hosts_added, hosts_removed);
hosts, healthy_hosts, hosts_per_zone, healthy_hosts_per_zone, hosts_added, hosts_removed);
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::shutdown() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ class ClusterManagerImpl : public ClusterManager {
void drainConnPools(HostPtr old_host, ConnPoolsContainer& container);
static void updateClusterMembership(const std::string& name, ConstHostVectorPtr hosts,
ConstHostVectorPtr healthy_hosts,
ConstHostVectorPtr local_zone_hosts,
ConstHostVectorPtr local_zone_healthy_hosts,
ConstHostListsPtr hosts_per_zone,
ConstHostListsPtr healthy_hosts_per_zone,
const std::vector<HostPtr>& hosts_added,
const std::vector<HostPtr>& hosts_removed,
ThreadLocal::Instance& tls, uint32_t thread_local_slot);
Expand Down
66 changes: 43 additions & 23 deletions source/common/upstream/load_balancer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,69 @@

namespace Upstream {

const std::vector<HostPtr>& LoadBalancerBase::hostsToUse() {
ASSERT(host_set_.healthyHosts().size() <= host_set_.hosts().size());
if (host_set_.hosts().empty()) {
return host_set_.hosts();
/**
 * Decide whether zone aware routing has to be skipped for the current host set.
 *
 * @return true when any precondition for zone aware routing fails (fewer than two zones, the
 *         "upstream.zone_routing.enabled" runtime feature is off, no healthy hosts at index 0
 *         of the per zone list, or too few healthy hosts overall); false otherwise.
 */
bool LoadBalancerBase::earlyExitNonZoneRouting() {
  // With fewer than two zones there is nothing to choose between.
  const uint32_t zone_count = host_set_.healthyHostsPerZone().size();
  if (zone_count < 2) {
    return true;
  }

  // The runtime feature flag is consulted only once the zone count check has passed.
  if (!runtime_.snapshot().featureEnabled("upstream.zone_routing.enabled", 100)) {
    return true;
  }

  // Index 0 of the per zone list is the local zone; without healthy local hosts we bail out.
  if (host_set_.healthyHostsPerZone()[0].empty()) {
    return true;
  }

  // Do not perform zone routing for small clusters; record when that is the reason.
  const uint64_t required_healthy_hosts =
      runtime_.snapshot().getInteger("upstream.zone_routing.min_cluster_size", 6U);
  const bool cluster_too_small = host_set_.healthyHosts().size() < required_healthy_hosts;
  if (cluster_too_small) {
    stats_.zone_cluster_too_small_.inc();
  }

  return cluster_too_small;
}

bool LoadBalancerBase::isGlobalPanic() {
uint64_t global_panic_threshold =
std::min(100UL, runtime_.snapshot().getInteger("upstream.healthy_panic_threshold", 50));
double healthy_percent = 100.0 * host_set_.healthyHosts().size() / host_set_.hosts().size();

// If the % of healthy hosts in the cluster is less than our panic threshold, we use all hosts.
if (healthy_percent < global_panic_threshold) {
stats_.upstream_rq_lb_healthy_panic_.inc();
return host_set_.hosts();
return true;
}

uint32_t number_of_zones = stats_.upstream_zone_count_.value();
// Early exit if we cannot perform zone aware routing.
if (number_of_zones < 2 || host_set_.localZoneHealthyHosts().empty() ||
!runtime_.snapshot().featureEnabled("upstream.zone_routing.enabled", 100)) {
return host_set_.healthyHosts();
}
return false;
}

// Do not perform zone routing for small clusters.
uint64_t min_cluster_size =
runtime_.snapshot().getInteger("upstream.zone_routing.min_cluster_size", 6U);
const std::vector<HostPtr>& LoadBalancerBase::hostsToUse() {
ASSERT(host_set_.healthyHosts().size() <= host_set_.hosts().size());

if (host_set_.healthyHosts().size() < min_cluster_size) {
stats_.zone_cluster_too_small_.inc();
if (host_set_.hosts().empty() || isGlobalPanic()) {
return host_set_.hosts();
}

if (earlyExitNonZoneRouting()) {
return host_set_.healthyHosts();
}

// If number of hosts in a local zone big enough route all requests to the same zone.
if (host_set_.localZoneHealthyHosts().size() * number_of_zones >=
host_set_.healthyHosts().size()) {
// At this point there are guaranteed to be at least 2 zones.
uint32_t number_of_zones = host_set_.healthyHostsPerZone().size();
ASSERT(number_of_zones >= 2U);
const std::vector<HostPtr>& local_zone_healthy_hosts = host_set_.healthyHostsPerZone()[0];

// If the number of hosts in the local zone is big enough, route all requests to the same zone.
if (local_zone_healthy_hosts.size() * number_of_zones >= host_set_.healthyHosts().size()) {
stats_.zone_over_percentage_.inc();
return host_set_.localZoneHealthyHosts();
return local_zone_healthy_hosts;
}

// If local zone ratio is lower than expected we should only partially route requests from the
// same zone.
double zone_host_ratio =
1.0 * host_set_.localZoneHealthyHosts().size() / host_set_.healthyHosts().size();
double zone_host_ratio = 1.0 * local_zone_healthy_hosts.size() / host_set_.healthyHosts().size();
double ratio_to_route = zone_host_ratio * number_of_zones;

// Not zone routed requests will be distributed between all hosts and hence
Expand All @@ -63,7 +83,7 @@ const std::vector<HostPtr>& LoadBalancerBase::hostsToUse() {

if (random_.random() % 10000 < zone_routing_threshold) {
stats_.zone_routing_sampled_.inc();
return host_set_.localZoneHealthyHosts();
return local_zone_healthy_hosts;
} else {
stats_.zone_routing_no_sampled_.inc();
return host_set_.healthyHosts();
Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class LoadBalancerBase {
Runtime::RandomGenerator& random_;

private:
bool earlyExitNonZoneRouting();
bool isGlobalPanic();

const HostSet& host_set_;
const HostSet* local_host_set_;
};
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/logical_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ void LogicalDnsCluster::startResolve() {
logical_host_.reset(new LogicalHost(*this, dns_url_, *this));
HostVectorPtr new_hosts(new std::vector<HostPtr>());
new_hosts->emplace_back(logical_host_);
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_list_,
empty_host_list_, *new_hosts, {});
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_lists_,
empty_host_lists_, *new_hosts, {});
}
}

Expand Down
23 changes: 18 additions & 5 deletions source/common/upstream/sds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,30 @@ void SdsClusterImpl::parseSdsResponse(Http::Message& response) {
if (updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed,
health_checker_ != nullptr)) {
log_debug("sds hosts changed for cluster: {} ({})", name_, hosts().size());
HostVectorPtr local_zone_hosts(new std::vector<HostPtr>());
HostListsPtr per_zone(new std::vector<std::vector<HostPtr>>());

// If local zone name is not defined then skip populating per zone hosts.
if (!sds_config_.local_zone_name_.empty()) {
std::map<std::string, std::vector<HostPtr>> hosts_per_zone;

for (HostPtr host : *current_hosts_copy) {
if (host->zone() == sds_config_.local_zone_name_) {
local_zone_hosts->push_back(host);
hosts_per_zone[host->zone()].push_back(host);
}

// Populate per_zone hosts only if upstream cluster has hosts in the same zone.
if (hosts_per_zone.find(sds_config_.local_zone_name_) != hosts_per_zone.end()) {
per_zone->push_back(hosts_per_zone[sds_config_.local_zone_name_]);

for (auto& entry : hosts_per_zone) {
if (sds_config_.local_zone_name_ != entry.first) {
per_zone->push_back(entry.second);
}
}
}
}

updateHosts(current_hosts_copy, createHealthyHostList(*current_hosts_copy), local_zone_hosts,
createHealthyHostList(*local_zone_hosts), hosts_added, hosts_removed);
updateHosts(current_hosts_copy, createHealthyHostList(*current_hosts_copy), per_zone,
createHealthyHostLists(*per_zone), hosts_added, hosts_removed);

if (initialize_callback_ && health_checker_ && pending_health_checks_ == 0) {
pending_health_checks_ = hosts().size();
Expand Down
40 changes: 26 additions & 14 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void HostSetImpl::runUpdateCallbacks(const std::vector<HostPtr>& hosts_added,
}
}

const ConstHostVectorPtr ClusterImplBase::empty_host_list_{new std::vector<HostPtr>{}};
const ConstHostListsPtr ClusterImplBase::empty_host_lists_{new std::vector<std::vector<HostPtr>>()};

ClusterImplBase::ClusterImplBase(const Json::Object& config, Runtime::Loader& runtime,
Stats::Store& stats, Ssl::ContextManager& ssl_context_manager)
Expand Down Expand Up @@ -85,7 +85,7 @@ ClusterImplBase::ClusterImplBase(const Json::Object& config, Runtime::Loader& ru

ConstHostVectorPtr ClusterImplBase::createHealthyHostList(const std::vector<HostPtr>& hosts) {
HostVectorPtr healthy_list(new std::vector<HostPtr>());
for (auto host : hosts) {
for (const auto& host : hosts) {
if (host->healthy()) {
healthy_list->emplace_back(host);
}
Expand All @@ -94,6 +94,23 @@ ConstHostVectorPtr ClusterImplBase::createHealthyHostList(const std::vector<Host
return healthy_list;
}

/**
 * Build the per zone lists of healthy hosts from the per zone lists of all hosts.
 *
 * @param hosts supplies one host list per zone.
 * @return a list with one entry per input zone, in the same order, where each entry holds only
 *         the hosts of that zone for which healthy() returns true. A zone without healthy hosts
 *         yields an empty entry, so indices stay aligned with the input.
 */
ConstHostListsPtr
ClusterImplBase::createHealthyHostLists(const std::vector<std::vector<HostPtr>>& hosts) {
  HostListsPtr healthy_per_zone(new std::vector<std::vector<HostPtr>>());

  for (const std::vector<HostPtr>& zone : hosts) {
    // Append the (initially empty) bucket for this zone first, then fill it in place.
    healthy_per_zone->emplace_back();
    std::vector<HostPtr>& bucket = healthy_per_zone->back();

    for (const HostPtr& candidate : zone) {
      if (!candidate->healthy()) {
        continue;
      }
      bucket.emplace_back(candidate);
    }
  }

  return healthy_per_zone;
}

uint64_t ClusterImplBase::parseFeatures(const Json::Object& config) {
uint64_t features = 0;
for (const std::string& feature : StringUtil::split(config.getString("features", ""), ',')) {
Expand Down Expand Up @@ -130,8 +147,8 @@ void ClusterImplBase::setHealthChecker(HealthCheckerPtr&& health_checker) {
// If we get a health check completion that resulted in a state change, signal to
// update the host sets on all threads.
if (changed_state) {
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawLocalZoneHosts(),
createHealthyHostList(*rawLocalZoneHosts()), {}, {});
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawHostsPerZone(),
createHealthyHostLists(*rawHostsPerZone()), {}, {});
}
});
}
Expand All @@ -143,8 +160,8 @@ void ClusterImplBase::setOutlierDetector(OutlierDetectorPtr&& outlier_detector)

outlier_detector_ = std::move(outlier_detector);
outlier_detector_->addChangedStateCb([this](HostPtr) -> void {
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawLocalZoneHosts(),
createHealthyHostList(*rawLocalZoneHosts()), {}, {});
updateHosts(rawHosts(), createHealthyHostList(*rawHosts()), rawHostsPerZone(),
createHealthyHostLists(*rawHostsPerZone()), {}, {});
});
}

Expand Down Expand Up @@ -187,8 +204,8 @@ StaticClusterImpl::StaticClusterImpl(const Json::Object& config, Runtime::Loader
new_hosts->emplace_back(HostPtr{new HostImpl(*this, url, false, 1, "")});
}

updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_list_, empty_host_list_, {},
{});
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_lists_, empty_host_lists_,
{}, {});
}

bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& new_hosts,
Expand All @@ -197,7 +214,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
std::vector<HostPtr>& hosts_removed,
bool depend_on_hc) {
uint64_t max_host_weight = 1;
std::unordered_set<std::string> zones;

// Go through and see if the list we have is different from what we just got. If it is, we
// make a new host list and raise a change notification. This uses an N^2 search given that
Expand All @@ -209,7 +225,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
// If we find a host matched based on URL, we keep it. However we do change weight inline so
// do that here.
if ((*i)->url() == host->url()) {
zones.insert((*i)->zone());
if (host->weight() > max_host_weight) {
max_host_weight = host->weight();
}
Expand All @@ -227,7 +242,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
if (host->weight() > max_host_weight) {
max_host_weight = host->weight();
}
zones.insert(host->zone());

final_hosts.push_back(host);
hosts_added.push_back(host);
Expand All @@ -246,7 +260,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
if ((*i)->weight() > max_host_weight) {
max_host_weight = (*i)->weight();
}
zones.insert((*i)->zone());

final_hosts.push_back(*i);
i = current_hosts.erase(i);
Expand All @@ -257,7 +270,6 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const std::vector<HostPtr>& n
}

stats_.max_host_weight_.set(max_host_weight);
stats_.upstream_zone_count_.set(zones.size());

if (!hosts_added.empty() || !current_hosts.empty()) {
hosts_removed = std::move(current_hosts);
Expand Down Expand Up @@ -295,7 +307,7 @@ void StrictDnsClusterImpl::updateAllHosts(const std::vector<HostPtr>& hosts_adde
}
}

updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_list_, empty_host_list_,
updateHosts(new_hosts, createHealthyHostList(*new_hosts), empty_host_lists_, empty_host_lists_,
hosts_added, hosts_removed);
}

Expand Down
Loading