diff --git a/build.gradle b/build.gradle index d022610ffa..ca4369842c 100644 --- a/build.gradle +++ b/build.gradle @@ -67,6 +67,9 @@ dependencies { testCompile "ch.qos.logback:logback-classic:0.9.26" testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2" testCompile "org.mockito:mockito-core:1.10.19" + testCompile "org.powermock:powermock:1.6.6" + testCompile "org.powermock:powermock-api-mockito:1.6.6" + testCompile "org.powermock:powermock-module-junit4:1.6.6" testCompile "io.projectreactor:reactor-core:2.5.0.M2" testCompile "com.jayway.awaitility:awaitility:1.7.0" diff --git a/src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt b/src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt deleted file mode 100644 index c9c091354e..0000000000 --- a/src/jmh/java/javaslang/circuitbreaker/30-Nov-2016-i7-I7-5557U-OSX-Java-1.8.0_112-x64.txt +++ /dev/null @@ -1,41 +0,0 @@ -Benchmark Mode Cnt Score Error Units - -RateLimiterBenchmark.atomicPermission thrpt 10 7.274 ± 0.132 ops/us -RateLimiterBenchmark.semaphoreBasedPermission thrpt 10 17.335 ± 3.441 ops/us -RateLimiterBenchmark.timeBasedPermission thrpt 10 3.522 ± 0.495 ops/us - -RateLimiterBenchmark.atomicPermission avgt 10 0.294 ± 0.038 us/op -RateLimiterBenchmark.semaphoreBasedPermission avgt 10 0.120 ± 0.018 us/op -RateLimiterBenchmark.timeBasedPermission avgt 10 0.562 ± 0.045 us/op - -RateLimiterBenchmark.atomicPermission sample 535765 1.480 ± 0.036 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p0.00 sample 0.040 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p0.50 sample 0.383 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p0.90 sample 4.288 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p0.95 sample 7.368 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p0.99 sample 14.080 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p0.999 sample 18.048 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p0.9999 sample 58.449 us/op -RateLimiterBenchmark.atomicPermission:atomicPermission·p1.00 sample 1654.784 us/op -RateLimiterBenchmark.semaphoreBasedPermission sample 635614 0.166 ± 0.010 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.00 sample 0.001 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.50 sample 0.135 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.90 sample 0.219 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.95 sample 0.236 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.99 sample 0.333 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.999 sample 2.468 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p0.9999 sample 15.519 us/op -RateLimiterBenchmark.semaphoreBasedPermission:semaphoreBasedPermission·p1.00 sample 1372.160 us/op -RateLimiterBenchmark.timeBasedPermission sample 553560 0.800 ± 0.053 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.00 sample 0.054 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.50 sample 0.550 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.90 sample 0.749 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.95 sample 0.826 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.99 sample 8.256 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.999 sample 33.920 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p0.9999 sample 160.221 us/op -RateLimiterBenchmark.timeBasedPermission:timeBasedPermission·p1.00 sample 5742.592 us/op - -RateLimiterBenchmark.atomicPermission ss 10 17.140 ± 5.640 us/op -RateLimiterBenchmark.semaphoreBasedPermission ss 10 9.724 ± 4.602 us/op -RateLimiterBenchmark.timeBasedPermission ss 10 26.875 ± 10.869 us/op diff --git a/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java b/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java index 80cdcc75a8..fbdc3c13b3 100644 --- a/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java +++ b/src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java @@ -4,7 +4,6 @@ import javaslang.ratelimiter.RateLimiterConfig; import javaslang.ratelimiter.internal.AtomicRateLimiter; import javaslang.ratelimiter.internal.SemaphoreBasedRateLimiter; -import javaslang.ratelimiter.internal.TimeBasedRateLimiter; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -33,11 +32,9 @@ public class RateLimiterBenchmark { private static final int THREAD_COUNT = 2; private RateLimiter semaphoreBasedRateLimiter; - private RateLimiter timeBasedRateLimiter; private RateLimiter atomicRateLimiter; private Supplier semaphoreGuardedSupplier; - private Supplier timeGuardedSupplier; private Supplier atomicGuardedSupplier; @Setup @@ -48,7 +45,6 @@ public void setUp() { .timeoutDuration(Duration.ofSeconds(5)) .build(); semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig); - timeBasedRateLimiter = new TimeBasedRateLimiter("timeBased", rateLimiterConfig); atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig); Supplier stringSupplier = () -> { @@ -56,7 +52,6 @@ public void setUp() { return "Hello Benchmark"; }; semaphoreGuardedSupplier = RateLimiter.decorateSupplier(semaphoreBasedRateLimiter, stringSupplier); - timeGuardedSupplier = RateLimiter.decorateSupplier(timeBasedRateLimiter, stringSupplier); atomicGuardedSupplier = RateLimiter.decorateSupplier(atomicRateLimiter, stringSupplier); } @@ -69,15 +64,6 @@ public String semaphoreBasedPermission() { return semaphoreGuardedSupplier.get(); } - @Benchmark - @Threads(value = THREAD_COUNT) - @Warmup(iterations = WARMUP_COUNT) - @Fork(value = FORK_COUNT) - @Measurement(iterations = ITERATION_COUNT) - public String timeBasedPermission() { - return timeGuardedSupplier.get(); - } - @Benchmark @Threads(value = THREAD_COUNT) @Warmup(iterations = WARMUP_COUNT) diff --git a/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java b/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java index 21fa3d93b8..f725e97703 100644 --- a/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java +++ b/src/main/java/javaslang/ratelimiter/RateLimiterConfig.java @@ -7,8 +7,7 @@ public class RateLimiterConfig { private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null"; private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null"; - - private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L); // TODO: use jmh to find real one + private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L); private final Duration timeoutDuration; private final Duration limitRefreshPeriod; diff --git a/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java b/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java index d0cb61ec56..430d7ba841 100644 --- a/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java +++ b/src/main/java/javaslang/ratelimiter/internal/AtomicRateLimiter.java @@ -42,9 +42,7 @@ public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) { permissionsPerCycle = rateLimiterConfig.getLimitForPeriod(); waitingThreads = new AtomicInteger(0); - long activeCycle = nanoTime() / cyclePeriodInNanos; - int activePermissions = permissionsPerCycle; - state = new AtomicReference<>(new State(activeCycle, activePermissions, 0)); + state = new AtomicReference<>(new State(0, 0, 0)); } /** @@ -69,7 +67,7 @@ public boolean getPermission(final Duration timeoutDuration) { * @return next {@link State} */ private State calculateNextState(final long timeoutInNanos, final State activeState) { - long currentNanos = nanoTime(); + long currentNanos = currentNanoTime(); long currentCycle = currentNanos / cyclePeriodInNanos; long nextCycle = activeState.activeCycle; @@ -85,6 +83,7 @@ private State calculateNextState(final long timeoutInNanos, final State activeSt return nextState; } + /** * Calculates time to wait for next permission as * [time to the next cycle] + [duration of full cycles until reserved permissions expire] @@ -147,19 +146,27 @@ private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final lo /** * Parks {@link Thread} for nanosToWait. + *

If the current thread is {@linkplain Thread#interrupted} + * while waiting for a permit then it won't throw {@linkplain InterruptedException}, + * but its interrupt status will be set. * * @param nanosToWait nanoseconds caller need to wait * @return true if caller was not {@link Thread#interrupted} while waiting */ private boolean waitForPermission(final long nanosToWait) { waitingThreads.incrementAndGet(); - long deadline = nanoTime() + nanosToWait; - while (nanoTime() < deadline || currentThread().isInterrupted()) { - long sleepBlockDuration = deadline - nanoTime(); + long deadline = currentNanoTime() + nanosToWait; + boolean wasInterrupted = false; + while (currentNanoTime() < deadline && !wasInterrupted) { + long sleepBlockDuration = deadline - currentNanoTime(); parkNanos(sleepBlockDuration); + wasInterrupted = Thread.interrupted(); } waitingThreads.decrementAndGet(); - return !currentThread().isInterrupted(); + if (wasInterrupted) { + currentThread().interrupt(); + } + return !wasInterrupted; } /** @@ -182,7 +189,7 @@ public RateLimiterConfig getRateLimiterConfig() { * {@inheritDoc} */ @Override - public Metrics getMetrics() { + public AtomicRateLimiterMetrics getMetrics() { return new AtomicRateLimiterMetrics(); } @@ -201,6 +208,7 @@ public Metrics getMetrics() { * */ private static class State { + private final long activeCycle; private final int activePermissions; private final long nanosToWait; @@ -210,12 +218,14 @@ public State(final long activeCycle, final int activePermissions, final long nan this.activePermissions = activePermissions; this.nanosToWait = nanosToWait; } + } /** * Enhanced {@link Metrics} with some implementation specific details */ public final class AtomicRateLimiterMetrics implements Metrics { + private AtomicRateLimiterMetrics() { } @@ -228,5 +238,33 @@ private AtomicRateLimiterMetrics() { public int getNumberOfWaitingThreads() { return waitingThreads.get(); } + + /** + * @return estimated time duration in nanos to wait for the next permission + */ + public long getNanosToWait() { + State currentState = state.get(); + State estimatedState = calculateNextState(-1, currentState); + return estimatedState.nanosToWait; + } + + /** + * Estimates count of permissions available permissions. + * Can be negative if some permissions where reserved. + * + * @return estimated count of permissions + */ + public long getAvailablePermissions() { + State currentState = state.get(); + State estimatedState = calculateNextState(-1, currentState); + return estimatedState.activePermissions; + } + } + + /** + * Created only for test purposes. Simply calls {@link System#nanoTime()} + */ + private long currentNanoTime() { + return nanoTime(); } } diff --git a/src/main/java/javaslang/ratelimiter/internal/TimeBasedRateLimiter.java b/src/main/java/javaslang/ratelimiter/internal/TimeBasedRateLimiter.java deleted file mode 100644 index cca265114f..0000000000 --- a/src/main/java/javaslang/ratelimiter/internal/TimeBasedRateLimiter.java +++ /dev/null @@ -1,178 +0,0 @@ -package javaslang.ratelimiter.internal; - -import static java.lang.System.nanoTime; -import static java.lang.Thread.currentThread; -import static java.util.concurrent.locks.LockSupport.parkNanos; - -import javaslang.ratelimiter.RateLimiter; -import javaslang.ratelimiter.RateLimiterConfig; - -import java.time.Duration; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; - -/** - * @author bstorozhuk - */ -public class TimeBasedRateLimiter implements RateLimiter { - - private final String name; - private final RateLimiterConfig rateLimiterConfig; - private final long cyclePeriodInNanos; - private final int permissionsPerCycle; - private final ReentrantLock lock; - private final AtomicInteger waitingThreads; - private long activeCycle; - private volatile int activePermissions; - - public TimeBasedRateLimiter(String name, RateLimiterConfig rateLimiterConfig) { - this.name = name; - this.rateLimiterConfig = rateLimiterConfig; - - cyclePeriodInNanos = rateLimiterConfig.getLimitRefreshPeriod().toNanos(); - permissionsPerCycle = rateLimiterConfig.getLimitForPeriod(); - - activeCycle = nanoTime() / cyclePeriodInNanos; - waitingThreads = new AtomicInteger(0); - lock = new ReentrantLock(false); - - activePermissions = permissionsPerCycle; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean getPermission(final Duration timeoutDuration) { - Supplier permissionSupplier = () -> { - long currentNanos = nanoTime(); - long currentCycle = currentNanos / cyclePeriodInNanos; -// System.out.println(MessageFormat.format( -// "Thread {0}: START activeCycle={1}; permissions={2}; currentNanos={3}; currentCycle={4};", -// currentThread().getId(), activeCycle, activePermissions, currentNanos, currentCycle) -// ); - if (activeCycle != currentCycle) { - refreshLimiterState(currentCycle); - } - return acquirePermission(currentNanos, timeoutDuration); - }; - return executeConcurrently(permissionSupplier); - } - - private void refreshLimiterState(final long currentCycle) { - assert lock.isHeldByCurrentThread(); - activeCycle = currentCycle; - activePermissions = Integer.min(activePermissions + permissionsPerCycle, permissionsPerCycle); -// System.out.println(MessageFormat.format( -// "Thread {0}: AFTER REFRESH activeCycle={1}; permissions={2}; currentCycle={3};", -// currentThread().getId(), activeCycle, activePermissions, currentCycle) -// ); - } - - private boolean acquirePermission(final long currentNanos, final Duration timeoutDuration) { - assert lock.isHeldByCurrentThread(); - long currentCycle = currentNanos / cyclePeriodInNanos; - long timeoutInNanos = timeoutDuration.toNanos(); - long nanosToWait = nanosToWaitForPermission(currentNanos, currentCycle); - if (timeoutInNanos < nanosToWait) { - waitForPermission(timeoutInNanos); - return false; - } - activePermissions--; - if (nanosToWait <= 0) { -// System.out.println(MessageFormat.format( -// "Thread {0}: ACQUIRE IMMEDIATELY activeCycle={1}; permissions={2}; currentCycle={3}; nanosToWait={4};", -// currentThread().getId(), activeCycle, activePermissions, currentCycle, nanosToWait) -// ); - return true; - } - lock.unlock(); - return waitForPermission(nanosToWait); - } - - private long nanosToWaitForPermission(final long currentNanos, final long currentCycle) { - if (activePermissions > 0) { - return 0L; - } - long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos; - long nanosToNextCycle = nextCycleTimeInNanos - currentNanos; - int fullCyclesToWait = (-activePermissions) / permissionsPerCycle; - return (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle; - } - - private boolean waitForPermission(final long nanosToWait) { -// System.out.println(MessageFormat.format( -// "Thread {0}: WAIT activeCycle={1}; permissions={2}; nanosToWait={3};", -// currentThread().getId(), activeCycle, activePermissions, nanosToWait) -// ); - waitingThreads.incrementAndGet(); - long deadline = nanoTime() + nanosToWait; - while (nanoTime() < deadline || currentThread().isInterrupted()) { - long sleepBlockDuration = deadline - nanoTime(); - parkNanos(sleepBlockDuration); - } - waitingThreads.decrementAndGet(); - return !currentThread().isInterrupted(); - } - - private T executeConcurrently(final Supplier permissionSupplier) { - lock.lock(); - try { - return permissionSupplier.get(); - } finally { - if (lock.isHeldByCurrentThread()) { - lock.unlock(); - } - } - } - - @Override - public String getName() { - return name; - } - - @Override - public RateLimiterConfig getRateLimiterConfig() { - return rateLimiterConfig; - } - - @Override - public Metrics getMetrics() { - return null; - } - - /** - * Enhanced {@link Metrics} with some implementation specific details - */ - public final class TimeBasedRateLimiterMetrics implements Metrics { - private TimeBasedRateLimiterMetrics() { - } - - /** - * {@inheritDoc} - * - * @return - */ - @Override - public int getNumberOfWaitingThreads() { - return waitingThreads.get(); - } - - /** - * Returns the estimated time in nanos to wait for permission. - *

- *

This method is typically used for debugging and testing purposes. - * - * @return the estimated time in nanos to wait for permission. - */ - public long nanosToWait() { - long currentNanos = nanoTime(); - long currentCycle = currentNanos / cyclePeriodInNanos; - if (currentCycle == activeCycle) { - return 0; - } - return nanosToWaitForPermission(currentNanos, currentCycle); - } - } -} diff --git a/src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java b/src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java index d0b0532855..b2e93a3d39 100644 --- a/src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java +++ b/src/test/java/javaslang/ratelimiter/RateLimiterConfigTest.java @@ -76,7 +76,7 @@ public void builderRefreshPeriodTooShort() throws Exception { exception.expectMessage("RefreshPeriod is too short"); RateLimiterConfig.builder() .timeoutDuration(TIMEOUT) - .limitRefreshPeriod(Duration.ofNanos(499L)) + .limitRefreshPeriod(Duration.ZERO) .limitForPeriod(LIMIT) .build(); } diff --git a/src/test/java/javaslang/ratelimiter/internal/AtomicRateLimiterTest.java b/src/test/java/javaslang/ratelimiter/internal/AtomicRateLimiterTest.java new file mode 100644 index 0000000000..55e9dc2779 --- /dev/null +++ b/src/test/java/javaslang/ratelimiter/internal/AtomicRateLimiterTest.java @@ -0,0 +1,233 @@ +package javaslang.ratelimiter.internal; + +import static com.jayway.awaitility.Awaitility.await; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.assertj.core.api.BDDAssertions.then; +import static org.hamcrest.CoreMatchers.equalTo; + +import com.jayway.awaitility.core.ConditionFactory; +import javaslang.ratelimiter.RateLimiterConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(AtomicRateLimiter.class) +public class AtomicRateLimiterTest { + + public static final String LIMITER_NAME = "test"; + public static final long CYCLE_IN_NANOS = 500_000_000L; + public static final long POLL_INTERVAL_IN_NANOS = 2_000_000L; + private RateLimiterConfig rateLimiterConfig; + private AtomicRateLimiter rateLimiter; + private AtomicRateLimiter.AtomicRateLimiterMetrics metrics; + + private static ConditionFactory awaitImpatiently() { + return await() + .pollDelay(1, TimeUnit.MICROSECONDS) + .pollInterval(POLL_INTERVAL_IN_NANOS, TimeUnit.NANOSECONDS); + } + + private void setTimeOnNanos(long nanoTime) throws Exception { + PowerMockito.doReturn(nanoTime) + .when(rateLimiter, "currentNanoTime"); + } + + @Before + public void setup() { + rateLimiterConfig = RateLimiterConfig.builder() + .limitForPeriod(1) + .limitRefreshPeriod(Duration.ofNanos(CYCLE_IN_NANOS)) + .timeoutDuration(Duration.ZERO) + .build(); + AtomicRateLimiter testLimiter = new AtomicRateLimiter(LIMITER_NAME, rateLimiterConfig); + rateLimiter = PowerMockito.spy(testLimiter); + metrics = rateLimiter.getMetrics(); + } + + @Test + public void acquireAndRefresh() throws Exception { + setTimeOnNanos(CYCLE_IN_NANOS); + boolean permission = rateLimiter.getPermission(Duration.ZERO); + then(permission).isTrue(); + then(metrics.getAvailablePermissions()).isEqualTo(0); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + boolean secondPermission = rateLimiter.getPermission(Duration.ZERO); + then(secondPermission).isFalse(); + then(metrics.getAvailablePermissions()).isEqualTo(0); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + + setTimeOnNanos(CYCLE_IN_NANOS * 2); + boolean thirdPermission = rateLimiter.getPermission(Duration.ZERO); + then(thirdPermission).isTrue(); + then(metrics.getAvailablePermissions()).isEqualTo(0); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + boolean fourthPermission = rateLimiter.getPermission(Duration.ZERO); + then(fourthPermission).isFalse(); + then(metrics.getAvailablePermissions()).isEqualTo(0); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + } + + @Test + public void reserveAndRefresh() throws Exception { + setTimeOnNanos(CYCLE_IN_NANOS); + boolean permission = rateLimiter.getPermission(Duration.ZERO); + then(permission).isTrue(); + then(metrics.getAvailablePermissions()).isEqualTo(0); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + + AtomicReference reservedPermission = new AtomicReference<>(null); + Thread caller = new Thread( + () -> reservedPermission.set(rateLimiter.getPermission(Duration.ofNanos(CYCLE_IN_NANOS)))); + caller.setDaemon(true); + caller.start(); + awaitImpatiently() + .atMost(10, MILLISECONDS) + .until(caller::getState, equalTo(Thread.State.TIMED_WAITING)); + then(metrics.getAvailablePermissions()).isEqualTo(-1); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS + CYCLE_IN_NANOS); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(1); + + setTimeOnNanos(CYCLE_IN_NANOS * 2 + 10); + awaitImpatiently() + .atMost(CYCLE_IN_NANOS, NANOSECONDS) + .until(reservedPermission::get, equalTo(true)); + + then(metrics.getAvailablePermissions()).isEqualTo(0); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS - 10); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + } + + @Test + public void reserveFewThenSkipCyclesBeforeRefresh() throws Exception { + setTimeOnNanos(CYCLE_IN_NANOS); + boolean permission = rateLimiter.getPermission(Duration.ZERO); + then(permission).isTrue(); + then(metrics.getAvailablePermissions()).isEqualTo(0); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + + AtomicReference firstReservedPermission = new AtomicReference<>(null); + Thread firstCaller = new Thread( + () -> firstReservedPermission.set(rateLimiter.getPermission(Duration.ofNanos(CYCLE_IN_NANOS)))); + firstCaller.setDaemon(true); + firstCaller.start(); + awaitImpatiently() + .atMost(50, MILLISECONDS) + .until(firstCaller::getState, equalTo(Thread.State.TIMED_WAITING)); + then(metrics.getAvailablePermissions()).isEqualTo(-1); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS * 2); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(1); + + + AtomicReference secondReservedPermission = new AtomicReference<>(null); + Thread secondCaller = new Thread( + () -> secondReservedPermission.set(rateLimiter.getPermission(Duration.ofNanos(CYCLE_IN_NANOS * 2)))); + secondCaller.setDaemon(true); + secondCaller.start(); + awaitImpatiently() + .atMost(50, MILLISECONDS) + .until(secondCaller::getState, equalTo(Thread.State.TIMED_WAITING)); + then(metrics.getAvailablePermissions()).isEqualTo(-2); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS * 3); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(2); + + setTimeOnNanos(CYCLE_IN_NANOS * 6 + 10); + awaitImpatiently() + .atMost(CYCLE_IN_NANOS + POLL_INTERVAL_IN_NANOS, NANOSECONDS) + .until(firstReservedPermission::get, equalTo(true)); + awaitImpatiently() + .atMost(CYCLE_IN_NANOS * 2 + POLL_INTERVAL_IN_NANOS, NANOSECONDS) + .until(secondReservedPermission::get, equalTo(true)); + then(metrics.getAvailablePermissions()).isEqualTo(1L); + then(metrics.getNanosToWait()).isEqualTo(0L); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + } + + @Test + public void rejectedByTimeout() throws Exception { + setTimeOnNanos(CYCLE_IN_NANOS); + boolean permission = rateLimiter.getPermission(Duration.ZERO); + then(permission).isTrue(); + then(metrics.getAvailablePermissions()).isEqualTo(0L); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + + AtomicReference declinedPermission = new AtomicReference<>(null); + Thread caller = new Thread( + () -> declinedPermission.set(rateLimiter.getPermission(Duration.ofNanos(CYCLE_IN_NANOS - 1)))); + caller.setDaemon(true); + caller.start(); + + awaitImpatiently() + .atMost(100, MILLISECONDS) + .until(caller::getState, equalTo(Thread.State.TIMED_WAITING)); + then(metrics.getAvailablePermissions()).isEqualTo(0L); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(1); + + setTimeOnNanos(CYCLE_IN_NANOS * 2 - 1); + awaitImpatiently() + .atMost(CYCLE_IN_NANOS + POLL_INTERVAL_IN_NANOS, NANOSECONDS) + .until(declinedPermission::get, equalTo(false)); + then(metrics.getAvailablePermissions()).isEqualTo(0L); + then(metrics.getNanosToWait()).isEqualTo(1L); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + } + + @Test + public void waitingThreadIsInterrupted() throws Exception { + setTimeOnNanos(CYCLE_IN_NANOS); + boolean permission = rateLimiter.getPermission(Duration.ZERO); + then(permission).isTrue(); + then(metrics.getAvailablePermissions()).isEqualTo(0L); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + + AtomicReference declinedPermission = new AtomicReference<>(null); + AtomicBoolean wasInterrupted = new AtomicBoolean(false); + Thread caller = new Thread( + () -> { + declinedPermission.set(rateLimiter.getPermission(Duration.ofNanos(CYCLE_IN_NANOS - 1))); + wasInterrupted.set(Thread.currentThread().isInterrupted()); + } + ); + caller.isDaemon(); + caller.start(); + + awaitImpatiently() + .atMost(100, MILLISECONDS) + .until(caller::getState, equalTo(Thread.State.TIMED_WAITING)); + then(metrics.getAvailablePermissions()).isEqualTo(0L); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(1); + + caller.interrupt(); + awaitImpatiently() + .atMost(CYCLE_IN_NANOS + POLL_INTERVAL_IN_NANOS, NANOSECONDS) + .until(declinedPermission::get, equalTo(false)); + then(wasInterrupted.get()).isTrue(); + then(metrics.getAvailablePermissions()).isEqualTo(0L); + then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS); + then(metrics.getNumberOfWaitingThreads()).isEqualTo(0); + } + + @Test + public void namePropagation() { + then(rateLimiter.getName()).isEqualTo(LIMITER_NAME); + } + + @Test + public void configPropagation() { + then(rateLimiter.getRateLimiterConfig()).isEqualTo(rateLimiterConfig); + } +} \ No newline at end of file diff --git a/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java b/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java index ce0aa4621f..3df57b5799 100644 --- a/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java +++ b/src/test/java/javaslang/ratelimiter/internal/InMemoryRateLimiterRegistryTest.java @@ -17,8 +17,6 @@ import java.time.Duration; import java.util.function.Supplier; - -@SuppressWarnings("unchecked") public class InMemoryRateLimiterRegistryTest { private static final int LIMIT = 50; @@ -51,6 +49,7 @@ public void rateLimiterPositive() throws Exception { } @Test + @SuppressWarnings("unchecked") public void rateLimiterPositiveWithSupplier() throws Exception { RateLimiterRegistry registry = new InMemoryRateLimiterRegistry(config); Supplier rateLimiterConfigSupplier = mock(Supplier.class); diff --git a/src/test/java/javaslang/ratelimiter/internal/TimeBasedRateLimiterTest.java b/src/test/java/javaslang/ratelimiter/internal/TimeBasedRateLimiterTest.java deleted file mode 100644 index f45a2b5dbb..0000000000 --- a/src/test/java/javaslang/ratelimiter/internal/TimeBasedRateLimiterTest.java +++ /dev/null @@ -1,53 +0,0 @@ -package javaslang.ratelimiter.internal; - -import javaslang.ratelimiter.RateLimiter; -import javaslang.ratelimiter.RateLimiterConfig; -import org.junit.Test; - -import java.time.Duration; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -/** - * @author bstorozhuk - */ -public class TimeBasedRateLimiterTest { - - public static final int N_THREADS = 4; - public static final AtomicLong counter = new AtomicLong(0); - public static final AtomicBoolean required = new AtomicBoolean(false); - - @Test - public void test() throws Exception { - RateLimiterConfig config = RateLimiterConfig.builder() - .limitForPeriod(10) - .limitRefreshPeriod(Duration.ofMillis(500)) - .timeoutDuration(Duration.ZERO) - .build(); - RateLimiter limiter = new AtomicRateLimiter("test", config); - - Runnable guarded = () -> { - if (limiter.getPermission(Duration.ofSeconds(10))) { - counter.incrementAndGet(); - } - }; - - ExecutorService pool = Executors.newFixedThreadPool(N_THREADS); - for (int i = 0; i < N_THREADS; i++) { - pool.execute(() -> { - while (true) { - if (required.get()) { - guarded.run(); - } - } - }); - } - required.set(true); - Thread.sleep(2200); - required.set(false); - System.out.println("COUNTER: " + counter); - pool.shutdownNow(); - } -} \ No newline at end of file