Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle DNS SERVFAIL differently than NXDOMAIN #2776

Merged
merged 14 commits into from
Apr 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

import static io.netty.handler.codec.dns.DefaultDnsRecordDecoder.decodeName;
import static io.netty.handler.codec.dns.DnsRecordType.SRV;
import static io.netty.handler.codec.dns.DnsResponseCode.NXDOMAIN;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
Expand Down Expand Up @@ -132,18 +133,18 @@ final class DefaultDnsClient implements DnsClient {
@Nullable
private final DnsServiceDiscovererObserver observer;
private final ServiceDiscovererEvent.Status missingRecordStatus;
private final boolean nxInvalidation;
private final IntFunction<? extends Completable> srvHostNameRepeater;
private final int srvConcurrency;
private final boolean srvFilterDuplicateEvents;
private final boolean inactiveEventsOnError;
private final DnsResolverAddressTypes addressTypes;
private final String id;
private boolean closed;

DefaultDnsClient(final String id, final IoExecutor ioExecutor, final int consolidateCacheSize,
final int minTTL, final int maxTTL, final int minCacheTTL, final int maxCacheTTL,
final int negativeTTLCacheSeconds, final long ttlJitterNanos,
final int srvConcurrency, final boolean inactiveEventsOnError,
final int srvConcurrency,
final boolean completeOncePreferredResolved, final boolean srvFilterDuplicateEvents,
Duration srvHostNameRepeatInitialDelay, Duration srvHostNameRepeatJitter,
@Nullable Integer maxUdpPayloadSize, @Nullable final Integer ndots,
Expand All @@ -152,10 +153,10 @@ final class DefaultDnsClient implements DnsClient {
@Nullable final SocketAddress localAddress,
@Nullable final DnsServerAddressStreamProvider dnsServerAddressStreamProvider,
@Nullable final DnsServiceDiscovererObserver observer,
final ServiceDiscovererEvent.Status missingRecordStatus) {
final ServiceDiscovererEvent.Status missingRecordStatus,
final boolean nxInvalidation) {
this.srvConcurrency = srvConcurrency;
this.srvFilterDuplicateEvents = srvFilterDuplicateEvents;
this.inactiveEventsOnError = inactiveEventsOnError;
// Implementation of this class expects to use only single EventLoop from IoExecutor
this.nettyIoExecutor = toEventLoopAwareNettyIoExecutor(ioExecutor).next();
// We must use nettyIoExecutor for the repeater for thread safety!
Expand All @@ -170,6 +171,7 @@ final class DefaultDnsClient implements DnsClient {
this.addressTypes = dnsResolverAddressTypes;
this.observer = observer;
this.missingRecordStatus = missingRecordStatus;
this.nxInvalidation = nxInvalidation;
this.id = id + '@' + toHexString(identityHashCode(this));
asyncCloseable = toAsyncCloseable(graceful -> {
if (nettyIoExecutor.isCurrentThreadEventLoop()) {
Expand Down Expand Up @@ -252,9 +254,8 @@ public Publisher<Collection<ServiceDiscovererEvent<InetAddress>>> dnsQuery(final
return defer(() -> {
final DnsDiscoveryObserver discoveryObserver = newDiscoveryObserver(address);
ARecordPublisher pub = new ARecordPublisher(address, discoveryObserver);
Publisher<? extends Collection<ServiceDiscovererEvent<InetAddress>>> events = inactiveEventsOnError ?
recoverWithInactiveEvents(pub, false) :
pub;
Publisher<? extends Collection<ServiceDiscovererEvent<InetAddress>>> events =
recoverWithInactiveEvents(pub, false, nxInvalidation);
return discoveryObserver == null ? events : events.beforeFinally(new TerminalSignalConsumer() {
@Override
public void onComplete() {
Expand Down Expand Up @@ -298,7 +299,7 @@ public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> dnsSrvQu
// any pending scheduled tasks. SrvInactiveCombinerOperator is used to filter the aggregated collection of
// inactive events if necessary.
Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> events =
recoverWithInactiveEvents(new SrvRecordPublisher(serviceName, discoveryObserver), true)
recoverWithInactiveEvents(new SrvRecordPublisher(serviceName, discoveryObserver), true, nxInvalidation)
.flatMapConcatIterable(identity())
.flatMapMerge(srvEvent -> {
assertInEventloop();
Expand All @@ -312,8 +313,10 @@ public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> dnsSrvQu
return newDuplicateSrv(serviceName, srvEvent.address().hostName());
}

// NXDOMAIN = invalidation for A queries part of SRV lookups for backwards compatibility.
// This is a behavior difference between plain A lookups and SRV rooted A lookups.
Publisher<? extends Collection<ServiceDiscovererEvent<InetAddress>>> returnPub =
recoverWithInactiveEvents(aPublisher, false);
recoverWithInactiveEvents(aPublisher, false, true);
return srvFilterDuplicateEvents ?
srvFilterDups(returnPub, availableAddresses, srvEvent.address().port()) :
returnPub.map(ev -> mapEventList(ev, inetAddress ->
Expand All @@ -338,7 +341,7 @@ public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> dnsSrvQu
return empty();
}
}, srvConcurrency)
.liftSync(inactiveEventsOnError ? SrvInactiveCombinerOperator.EMIT : SrvInactiveCombinerOperator.NO_EMIT);
.liftSync(SrvInactiveCombinerOperator.EMIT);

return discoveryObserver == null ? events : events.beforeFinally(new TerminalSignalConsumer() {
@Override
Expand Down Expand Up @@ -952,10 +955,10 @@ private static Publisher<? extends Collection<ServiceDiscovererEvent<InetSocketA
}

private static <T, A> Publisher<? extends Collection<ServiceDiscovererEvent<T>>> recoverWithInactiveEvents(
AbstractDnsPublisher<T> pub, boolean generateAggregateEvent) {
AbstractDnsPublisher<T> pub, boolean generateAggregateEvent, boolean nxInvalidation) {
return pub.onErrorResume(cause -> {
AbstractDnsPublisher<T>.AbstractDnsSubscription subscription = pub.subscription;
if (subscription != null) {
if (subscription != null && shouldRevokeState(cause, nxInvalidation)) {
List<ServiceDiscovererEvent<T>> events = subscription.generateInactiveEvent();
if (!events.isEmpty()) {
return (generateAggregateEvent ? Publisher.<List<ServiceDiscovererEvent<T>>>from(
Expand All @@ -968,6 +971,16 @@ private static <T, A> Publisher<? extends Collection<ServiceDiscovererEvent<T>>>
});
}

private static boolean shouldRevokeState(final Throwable t, final boolean nxInvalidation) {
// ISE => Subscriber exceptions (downstream of retry)
return t instanceof SrvAddressRemovedException || t instanceof IllegalStateException ||
t instanceof ClosedDnsServiceDiscovererException || (nxInvalidation &&
// string matching is done on purpose to avoid the hard Netty dependency
(t.getCause() != null && t.getCause().getClass().getName()
.equals("io.netty.resolver.dns.DnsErrorCauseException")) &&
Comment on lines +978 to +980
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting work around.

NXDOMAIN.equals(((io.netty.resolver.dns.DnsErrorCauseException) t.getCause()).getCode()));
}

private static <T> Publisher<T> newDuplicateSrv(String serviceName, String resolvedAddress) {
return failed(new IllegalStateException("Duplicate SRV entry for SRV name " + serviceName + " for address " +
resolvedAddress));
Expand All @@ -982,7 +995,6 @@ private static final class SrvInactiveCombinerOperator implements
PublisherOperator<Collection<ServiceDiscovererEvent<InetSocketAddress>>,
Collection<ServiceDiscovererEvent<InetSocketAddress>>> {
static final SrvInactiveCombinerOperator EMIT = new SrvInactiveCombinerOperator(true);
static final SrvInactiveCombinerOperator NO_EMIT = new SrvInactiveCombinerOperator(false);
private final boolean emitAggregatedEvents;

private SrvInactiveCombinerOperator(boolean emitAggregatedEvents) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public final class DefaultDnsServiceDiscovererBuilder implements DnsServiceDisco
*/
@Deprecated // FIXME: 0.43 - consider removing this system property
private static final String SKIP_BINDING_PROPERTY = "io.servicetalk.dns.discovery.netty.skipBinding";
private static final String NX_DOMAIN_INVALIDATES_PROPERTY = "io.servicetalk.dns.discovery.nxdomain.invalidation";
static final boolean DEFAULT_NX_DOMAIN_INVALIDATES = parseProperty(NX_DOMAIN_INVALIDATES_PROPERTY, false);
@Nullable
private static final SocketAddress DEFAULT_LOCAL_ADDRESS =
getBoolean(SKIP_BINDING_PROPERTY) ? null : new InetSocketAddress(0);
Expand Down Expand Up @@ -101,6 +103,7 @@ public final class DefaultDnsServiceDiscovererBuilder implements DnsServiceDisco
LOGGER.debug("-D{}={}", NEGATIVE_TTL_CACHE_SECONDS_PROPERTY, negativeCacheTtlValue);
LOGGER.debug("Default negative TTL cache in seconds: {}", DEFAULT_NEGATIVE_TTL_CACHE_SECONDS);
LOGGER.debug("Default missing records status: {}", DEFAULT_MISSING_RECOREDS_STATUS);
LOGGER.debug("-D{}: {}", NX_DOMAIN_INVALIDATES_PROPERTY, DEFAULT_NX_DOMAIN_INVALIDATES);
}
}

Expand Down Expand Up @@ -128,7 +131,6 @@ public final class DefaultDnsServiceDiscovererBuilder implements DnsServiceDisco
private int negativeTTLCacheSeconds = DEFAULT_NEGATIVE_TTL_CACHE_SECONDS;
private Duration ttlJitter = ofSeconds(DEFAULT_TTL_POLL_JITTER_SECONDS);
private int srvConcurrency = 2048;
private boolean inactiveEventsOnError;
private boolean completeOncePreferredResolved = true;
private boolean srvFilterDuplicateEvents;
private Duration srvHostNameRepeatInitialDelay = ofSeconds(10);
Expand All @@ -138,6 +140,7 @@ public final class DefaultDnsServiceDiscovererBuilder implements DnsServiceDisco
@Nullable
private DnsServiceDiscovererObserver observer;
private ServiceDiscovererEvent.Status missingRecordStatus = DEFAULT_MISSING_RECOREDS_STATUS;
private boolean nxInvalidation = DEFAULT_NX_DOMAIN_INVALIDATES;

/**
* Creates a new {@link DefaultDnsServiceDiscovererBuilder}.
Expand Down Expand Up @@ -288,6 +291,18 @@ public DefaultDnsServiceDiscovererBuilder missingRecordStatus(ServiceDiscovererE
return this;
}

/**
* Modify the behavior of the system flag about invalidating DNS state when NXDOMAIN is seen.
* Default behavior is controlled through {@link #NX_DOMAIN_INVALIDATES_PROPERTY}.
*
* @param nxInvalidation Flag to enable/disable behavior.
* @return {@code this} builder.
*/
DefaultDnsServiceDiscovererBuilder nxInvalidates(final boolean nxInvalidation) {
this.nxInvalidation = nxInvalidation;
return this;
}

@Override
public ServiceDiscoverer<String, InetSocketAddress, ServiceDiscovererEvent<InetSocketAddress>>
buildSrvDiscoverer() {
Expand All @@ -300,11 +315,6 @@ public DefaultDnsServiceDiscovererBuilder missingRecordStatus(ServiceDiscovererE
return asHostAndPortDiscoverer(build());
}

DefaultDnsServiceDiscovererBuilder inactiveEventsOnError(boolean inactiveEventsOnError) {
this.inactiveEventsOnError = inactiveEventsOnError;
return this;
}

DefaultDnsServiceDiscovererBuilder srvConcurrency(int srvConcurrency) {
this.srvConcurrency = ensurePositive(srvConcurrency, "srvConcurrency");
return this;
Expand All @@ -317,8 +327,11 @@ DefaultDnsServiceDiscovererBuilder completeOncePreferredResolved(boolean complet

DefaultDnsServiceDiscovererBuilder srvHostNameRepeatDelay(
Duration initialDelay, Duration jitter) {
this.srvHostNameRepeatInitialDelay = requireNonNull(initialDelay);
this.srvHostNameRepeatJitter = requireNonNull(jitter);
this.srvHostNameRepeatInitialDelay = ensurePositive(initialDelay, "srvHostNameRepeatInitialDelay");
this.srvHostNameRepeatJitter = ensurePositive(jitter, "srvHostNameRepeatJitter");
if (srvHostNameRepeatJitter.toNanos() >= srvHostNameRepeatInitialDelay.toNanos()) {
throw new IllegalArgumentException("The jitter value should be less than the initial delay.");
}
return this;
}

Expand Down Expand Up @@ -369,10 +382,10 @@ DnsClient build() {
ioExecutor == null ? globalExecutionContext().ioExecutor() : ioExecutor, consolidateCacheSize,
minTTLSeconds, maxTTLSeconds, minTTLCacheSeconds, maxTTLCacheSeconds, negativeTTLCacheSeconds,
ttlJitter.toNanos(),
srvConcurrency, inactiveEventsOnError, completeOncePreferredResolved, srvFilterDuplicateEvents,
srvConcurrency, completeOncePreferredResolved, srvFilterDuplicateEvents,
srvHostNameRepeatInitialDelay, srvHostNameRepeatJitter, maxUdpPayloadSize, ndots, optResourceEnabled,
queryTimeout, dnsResolverAddressTypes, localAddress, dnsServerAddressStreamProvider, observer,
missingRecordStatus);
missingRecordStatus, nxInvalidation);
return filterFactory == null ? rawClient : filterFactory.create(rawClient);
}

Expand All @@ -389,4 +402,12 @@ private static int parseProperty(final String name, final int defaultValue) {
return defaultValue;
}
}

private static boolean parseProperty(final String name, final boolean defaultValue) {
final String value = getProperty(name);
if (value == null) {
return defaultValue;
}
return Boolean.parseBoolean(value);
}
}
Loading
Loading