diff --git a/src/main/java/com/orbitz/consul/cache/CacheConfig.java b/src/main/java/com/orbitz/consul/cache/CacheConfig.java index 0677f8e5..8fa3c1d4 100644 --- a/src/main/java/com/orbitz/consul/cache/CacheConfig.java +++ b/src/main/java/com/orbitz/consul/cache/CacheConfig.java @@ -19,6 +19,7 @@ class CacheConfig { private static String TIMEOUT_AUTO_ENABLED = "timeout.autoAdjustment.enable"; private static String TIMEOUT_AUTO_MARGIN = "timeout.autoAdjustment.margin"; + private static String REQUEST_RATE_LIMITER = "minTimeBetweenRequests"; private static final Supplier INSTANCE = Suppliers.memoize(CacheConfig::new); @@ -99,4 +100,16 @@ Duration getWatchDuration() { return duration; } + + /** + * Gets the minimum time between two requests for caches. + * @throws RuntimeException if an error occurs while retrieving the configuration property. + */ + Duration getMinimumDurationBetweenRequests() { + try { + return config.getDuration(REQUEST_RATE_LIMITER); + } catch (Exception ex) { + throw new RuntimeException(String.format("Error extracting config variable %s", REQUEST_RATE_LIMITER), ex); + } + } } diff --git a/src/main/java/com/orbitz/consul/cache/ConsulCache.java b/src/main/java/com/orbitz/consul/cache/ConsulCache.java index ab8fbb42..0e43ee26 100644 --- a/src/main/java/com/orbitz/consul/cache/ConsulCache.java +++ b/src/main/java/com/orbitz/consul/cache/ConsulCache.java @@ -1,6 +1,7 @@ package com.orbitz.consul.cache; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.orbitz.consul.ConsulException; @@ -12,6 +13,7 @@ import org.slf4j.LoggerFactory; import java.math.BigInteger; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -52,6 +54,7 @@ enum State {latent, starting, started, stopped } new ThreadFactoryBuilder().setDaemon(true).build()); private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final ReentrantLock listenersStartingLock = new ReentrantLock(); + private final Stopwatch stopWatch = Stopwatch.createUnstarted(); private final Function keyConversion; private final CallbackConsumer callBackConsumer; @@ -72,8 +75,10 @@ public void onComplete(ConsulResponse> consulResponse) { if (!isRunning()) { return; } + Duration elapsedTime = stopWatch.elapsed(); updateIndex(consulResponse); - LOGGER.debug("Consul cache updated (index={})", latestIndex); + LOGGER.debug("Consul cache updated (index={}), request duration: {} ms", + latestIndex, elapsedTime.toMillis()); ImmutableMap full = convertToMap(consulResponse); @@ -107,7 +112,15 @@ public void onComplete(ConsulResponse> consulResponse) { if (state.compareAndSet(State.starting, State.started)) { initLatch.countDown(); } - runCallback(); + + Duration timeToWait = CacheConfig.get().getMinimumDurationBetweenRequests().minus(elapsedTime); + if (timeToWait.isNegative() || timeToWait.isZero()) { + runCallback(); + } else { + executorService.schedule(ConsulCache.this::runCallback, + timeToWait.toMillis(), TimeUnit.MILLISECONDS); + } + } else { onFailure(new ConsulException("Consul cluster has no elected leader")); } @@ -133,6 +146,9 @@ public void start() { public void stop() { State previous = state.getAndSet(State.stopped); + if (stopWatch.isRunning()) { + stopWatch.stop(); + } if (previous != State.stopped) { executorService.shutdownNow(); } @@ -145,6 +161,7 @@ public void close() throws Exception { private void runCallback() { if (isRunning()) { + stopWatch.reset().start(); callBackConsumer.consume(latestIndex.get(), responseCallback); } } diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 42388bba..e1a13bce 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -1,7 +1,10 @@ com.orbitz.consul { - cache.backOffDelay: 10 seconds - cache.timeout.autoAdjustment { - enable: true - margin: 2 seconds + cache { + backOffDelay: 10 seconds + minTimeBetweenRequests: 0 seconds + timeout.autoAdjustment { + enable: true + margin: 2 seconds + } } }