Skip to content

Commit

Permalink
[#22930, #22311] DocDB: Update DNS cache in background
Browse files Browse the repository at this point in the history
Summary:
We have DNS cache controlled by gflag dns_cache_expiration_ms (60000 by default).
When new RPC call to node with hostname is started and existing record has expired, we start new DNS resolution.
And RPC call waits until this DNS resolution completes.

But, actually, we don't have to wait until it completes.
The address from previous resolution could be used for this RPC call, while cache update could happen in background.
Once response to a new DNS request is received, we could update cached entry and use new address for all new RPC calls.

This diff implements such behaviour.

Also added flag dns_cache_failure_expiration_ms (2s by default) to control the time before DNS resolution retry in case of failure.

Added metric dns_resolve_latency that reflects time spend by DNS resolution.
Jira: DB-11847, DB-11222

Test Plan: Jenkins

Reviewers: qhu, rthallam, slingam

Reviewed By: qhu, slingam

Subscribers: slingam, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D35993
  • Loading branch information
spolitov committed Jun 21, 2024
1 parent 4b6986c commit bf0fb4b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/yb/rpc/messenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ Messenger::Messenger(const MessengerBuilder &bld)
.name = name_,
.max_workers = bld.workers_limit_,
})),
resolver_(new DnsResolver(&io_thread_pool_.io_service())),
resolver_(new DnsResolver(&io_thread_pool_.io_service(), metric_entity_)),
rpc_metrics_(std::make_shared<RpcMetrics>(bld.metric_entity_)),
num_connections_to_server_(bld.num_connections_to_server_) {
#ifndef NDEBUG
Expand Down
4 changes: 3 additions & 1 deletion src/yb/util/net/dns_resolver-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
#include <gtest/gtest.h>

#include "yb/gutil/strings/util.h"

#include "yb/util/countdown_latch.h"
#include "yb/util/metrics.h"
#include "yb/util/net/sockaddr.h"
#include "yb/util/status_log.h"
#include "yb/util/test_macros.h"
Expand Down Expand Up @@ -83,7 +85,7 @@ class DnsResolverTest : public YBTest {
IoService io_service_;
boost::optional<IoService::work> work_{io_service_};

DnsResolver resolver_{&io_service_};
DnsResolver resolver_{&io_service_, nullptr};
};

TEST_F(DnsResolverTest, TestResolution) {
Expand Down
62 changes: 52 additions & 10 deletions src/yb/util/net/dns_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,18 @@

using namespace std::literals;

DEFINE_UNKNOWN_int64(dns_cache_expiration_ms, 60000,
DEFINE_RUNTIME_int64(dns_cache_expiration_ms, 60000,
"Time to store DNS resolution results in cache.");

DEFINE_RUNTIME_int64(dns_cache_failure_expiration_ms, 2000,
"Time before DNS resolution retry in case of failure.");

METRIC_DEFINE_event_stats(
server, dns_resolve_latency,
"DNS Resolve latency",
yb::MetricUnit::kMicroseconds,
"Microseconds spent resolving DNS requests");

namespace yb {

namespace {
Expand Down Expand Up @@ -92,14 +101,20 @@ Result<IpAddress> PickResolvedAddress(

class DnsResolver::Impl {
public:
explicit Impl(IoService* io_service) : io_service_(*io_service), resolver_(*io_service) {}
Impl(IoService* io_service, const scoped_refptr<MetricEntity>& metric_entity)
: io_service_(*io_service), resolver_(*io_service) {
if (metric_entity) {
metric_ = METRIC_dns_resolve_latency.Instantiate(metric_entity);
}
}

std::shared_future<Result<IpAddress>> ResolveFuture(const std::string& host) {
return ObtainEntry(host)->DoResolve(host, /* callback= */ nullptr, &io_service_, &resolver_);
return ObtainEntry(host)->DoResolve(
host, /* callback= */ nullptr, &io_service_, &resolver_, metric_);
}

void AsyncResolve(const std::string& host, const AsyncResolveCallback& callback) {
ObtainEntry(host)->DoResolve(host, &callback, &io_service_, &resolver_);
ObtainEntry(host)->DoResolve(host, &callback, &io_service_, &resolver_, metric_);
}

private:
Expand All @@ -110,6 +125,7 @@ class DnsResolver::Impl {
CoarseTimePoint expiration GUARDED_BY(mutex) = CoarseTimePoint::min();
std::shared_future<Result<IpAddress>> future GUARDED_BY(mutex);
std::vector<AsyncResolveCallback> waiters GUARDED_BY(mutex);
bool has_resolved_address GUARDED_BY(mutex) = false;

void SetResult(
const Result<IpAddress>& result,
Expand All @@ -123,7 +139,20 @@ class DnsResolver::Impl {
decltype(waiters) to_notify;
{
std::lock_guard lock(mutex);
expiration = CoarseMonoClock::now() + FLAGS_dns_cache_expiration_ms * 1ms;
uint64_t expiration_ms;
if (result.ok()) {
// If address was already resolved we did not fetch future from promise on resolution
// start so do it now to update existing resolved address.
if (has_resolved_address) {
future = promise->get_future().share();
} else {
has_resolved_address = true;
}
expiration_ms = FLAGS_dns_cache_expiration_ms;
} else {
expiration_ms = FLAGS_dns_cache_failure_expiration_ms;
}
expiration = CoarseMonoClock::now() + expiration_ms * 1ms;
waiters.swap(to_notify);
}
for (const auto& waiter : to_notify) {
Expand All @@ -133,14 +162,14 @@ class DnsResolver::Impl {

std::shared_future<Result<IpAddress>> DoResolve(
const std::string& host, const AsyncResolveCallback* callback, IoService* io_service,
Resolver* resolver) {
Resolver* resolver, const scoped_refptr<EventStats>& metric) {
std::shared_ptr<std::promise<Result<IpAddress>>> promise;
std::shared_future<Result<IpAddress>> result;
{
std::lock_guard lock(mutex);
promise = StartResolve(host);
result = future;
if (callback && expiration == CoarseTimePoint::max()) {
if (callback && expiration == CoarseTimePoint::max() && !has_resolved_address) {
// Resolve is in progress by a different caller.
waiters.push_back(*callback);
callback = nullptr;
Expand All @@ -155,11 +184,20 @@ class DnsResolver::Impl {
static const std::string kService = "";
resolver->async_resolve(
Resolver::query(host, kService),
[this, host, promise](
[this, host, promise, metric, start_time = CoarseMonoClock::Now()](
const boost::system::error_code& error,
const Resolver::results_type& entries) mutable {
// Unfortunately there is no safe way to set promise value from 2 different threads, w/o
// catching exception in case of concurrency.
const auto now = CoarseMonoClock::Now();
if (metric) {
metric->Increment(ToMicroseconds(now - start_time));
}
if (start_time + 250ms < now) {
YB_LOG_EVERY_N_SECS(WARNING, 5)
<< "Long time to resolse DNS for " << host << ": " << MonoDelta(now - start_time)
<< THROTTLE_MSG;
}
SetResult(PickResolvedAddress(host, error, entries), promise.get());
});

Expand All @@ -178,7 +216,9 @@ class DnsResolver::Impl {
}

auto promise = std::make_shared<std::promise<Result<IpAddress>>>();
future = promise->get_future().share();
if (!has_resolved_address) {
future = promise->get_future().share();
}

auto address = TryFastResolve(host);
if (address) {
Expand Down Expand Up @@ -208,11 +248,13 @@ class DnsResolver::Impl {

IoService& io_service_;
Resolver resolver_;
scoped_refptr<EventStats> metric_;
std::shared_timed_mutex mutex_;
std::unordered_map<std::string, CacheEntry> cache_;
};

DnsResolver::DnsResolver(IoService* io_service) : impl_(new Impl(io_service)) {
DnsResolver::DnsResolver(IoService* io_service, const scoped_refptr<MetricEntity>& metric_entity)
: impl_(new Impl(io_service, metric_entity)) {
}

DnsResolver::~DnsResolver() {
Expand Down
3 changes: 2 additions & 1 deletion src/yb/util/net/dns_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
namespace yb {

class EventStats;
class MetricEntity;
class HostPort;
class ThreadPool;

Expand All @@ -53,7 +54,7 @@ class DnsResolver {
DnsResolver(const DnsResolver&) = delete;
void operator=(const DnsResolver&) = delete;

explicit DnsResolver(IoService* io_service);
explicit DnsResolver(IoService* io_service, const scoped_refptr<MetricEntity>& metric_entity);
~DnsResolver();

std::shared_future<Result<IpAddress>> ResolveFuture(const std::string& host);
Expand Down

0 comments on commit bf0fb4b

Please sign in to comment.