Skip to content

Commit

Permalink
Issue ReactiveX#12 AtomicRateLimiter tests and additional documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
storozhukBM committed Dec 2, 2016
1 parent 15bcef0 commit e9c67b6
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 300 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ dependencies {
testCompile "ch.qos.logback:logback-classic:0.9.26"
testCompile "io.dropwizard.metrics:metrics-healthchecks:3.1.2"
testCompile "org.mockito:mockito-core:1.10.19"
testCompile "org.powermock:powermock:1.6.6"
testCompile "org.powermock:powermock-api-mockito:1.6.6"
testCompile "org.powermock:powermock-module-junit4:1.6.6"
testCompile "io.projectreactor:reactor-core:2.5.0.M2"
testCompile "com.jayway.awaitility:awaitility:1.7.0"

Expand Down

This file was deleted.

14 changes: 0 additions & 14 deletions src/jmh/java/javaslang/circuitbreaker/RateLimiterBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import javaslang.ratelimiter.RateLimiterConfig;
import javaslang.ratelimiter.internal.AtomicRateLimiter;
import javaslang.ratelimiter.internal.SemaphoreBasedRateLimiter;
import javaslang.ratelimiter.internal.TimeBasedRateLimiter;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -33,11 +32,9 @@ public class RateLimiterBenchmark {
private static final int THREAD_COUNT = 2;

private RateLimiter semaphoreBasedRateLimiter;
private RateLimiter timeBasedRateLimiter;
private RateLimiter atomicRateLimiter;

private Supplier<String> semaphoreGuardedSupplier;
private Supplier<String> timeGuardedSupplier;
private Supplier<String> atomicGuardedSupplier;

@Setup
Expand All @@ -48,15 +45,13 @@ public void setUp() {
.timeoutDuration(Duration.ofSeconds(5))
.build();
semaphoreBasedRateLimiter = new SemaphoreBasedRateLimiter("semaphoreBased", rateLimiterConfig);
timeBasedRateLimiter = new TimeBasedRateLimiter("timeBased", rateLimiterConfig);
atomicRateLimiter = new AtomicRateLimiter("atomicBased", rateLimiterConfig);

Supplier<String> stringSupplier = () -> {
Blackhole.consumeCPU(1);
return "Hello Benchmark";
};
semaphoreGuardedSupplier = RateLimiter.decorateSupplier(semaphoreBasedRateLimiter, stringSupplier);
timeGuardedSupplier = RateLimiter.decorateSupplier(timeBasedRateLimiter, stringSupplier);
atomicGuardedSupplier = RateLimiter.decorateSupplier(atomicRateLimiter, stringSupplier);
}

Expand All @@ -69,15 +64,6 @@ public String semaphoreBasedPermission() {
return semaphoreGuardedSupplier.get();
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
@Fork(value = FORK_COUNT)
@Measurement(iterations = ITERATION_COUNT)
public String timeBasedPermission() {
return timeGuardedSupplier.get();
}

@Benchmark
@Threads(value = THREAD_COUNT)
@Warmup(iterations = WARMUP_COUNT)
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/javaslang/ratelimiter/RateLimiterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
public class RateLimiterConfig {
private static final String TIMEOUT_DURATION_MUST_NOT_BE_NULL = "TimeoutDuration must not be null";
private static final String LIMIT_REFRESH_PERIOD_MUST_NOT_BE_NULL = "LimitRefreshPeriod must not be null";

private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L); // TODO: use jmh to find real one
private static final Duration ACCEPTABLE_REFRESH_PERIOD = Duration.ofNanos(1L);

private final Duration timeoutDuration;
private final Duration limitRefreshPeriod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
permissionsPerCycle = rateLimiterConfig.getLimitForPeriod();

waitingThreads = new AtomicInteger(0);
long activeCycle = nanoTime() / cyclePeriodInNanos;
int activePermissions = permissionsPerCycle;
state = new AtomicReference<>(new State(activeCycle, activePermissions, 0));
state = new AtomicReference<>(new State(0, 0, 0));
}

/**
Expand All @@ -69,7 +67,7 @@ public boolean getPermission(final Duration timeoutDuration) {
* @return next {@link State}
*/
private State calculateNextState(final long timeoutInNanos, final State activeState) {
long currentNanos = nanoTime();
long currentNanos = currentNanoTime();
long currentCycle = currentNanos / cyclePeriodInNanos;

long nextCycle = activeState.activeCycle;
Expand All @@ -85,6 +83,7 @@ private State calculateNextState(final long timeoutInNanos, final State activeSt
return nextState;
}


/**
* Calculates time to wait for next permission as
* [time to the next cycle] + [duration of full cycles until reserved permissions expire]
Expand Down Expand Up @@ -147,19 +146,27 @@ private boolean waitForPermissionIfNecessary(final long timeoutInNanos, final lo

/**
* Parks {@link Thread} for nanosToWait.
* <p>If the current thread is {@linkplain Thread#interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
* but its interrupt status will be set.
*
* @param nanosToWait nanoseconds caller need to wait
* @return true if caller was not {@link Thread#interrupted} while waiting
*/
private boolean waitForPermission(final long nanosToWait) {
waitingThreads.incrementAndGet();
long deadline = nanoTime() + nanosToWait;
while (nanoTime() < deadline || currentThread().isInterrupted()) {
long sleepBlockDuration = deadline - nanoTime();
long deadline = currentNanoTime() + nanosToWait;
boolean wasInterrupted = false;
while (currentNanoTime() < deadline && !wasInterrupted) {
long sleepBlockDuration = deadline - currentNanoTime();
parkNanos(sleepBlockDuration);
wasInterrupted = Thread.interrupted();
}
waitingThreads.decrementAndGet();
return !currentThread().isInterrupted();
if (wasInterrupted) {
currentThread().interrupt();
}
return !wasInterrupted;
}

/**
Expand All @@ -182,7 +189,7 @@ public RateLimiterConfig getRateLimiterConfig() {
* {@inheritDoc}
*/
@Override
public Metrics getMetrics() {
public AtomicRateLimiterMetrics getMetrics() {
return new AtomicRateLimiterMetrics();
}

Expand All @@ -201,6 +208,7 @@ public Metrics getMetrics() {
* </ul>
*/
private static class State {

private final long activeCycle;
private final int activePermissions;
private final long nanosToWait;
Expand All @@ -210,12 +218,14 @@ public State(final long activeCycle, final int activePermissions, final long nan
this.activePermissions = activePermissions;
this.nanosToWait = nanosToWait;
}

}

/**
* Enhanced {@link Metrics} with some implementation specific details
*/
public final class AtomicRateLimiterMetrics implements Metrics {

private AtomicRateLimiterMetrics() {
}

Expand All @@ -228,5 +238,33 @@ private AtomicRateLimiterMetrics() {
public int getNumberOfWaitingThreads() {
return waitingThreads.get();
}

/**
* @return estimated time duration in nanos to wait for the next permission
*/
public long getNanosToWait() {
State currentState = state.get();
State estimatedState = calculateNextState(-1, currentState);
return estimatedState.nanosToWait;
}

/**
* Estimates count of permissions available permissions.
* Can be negative if some permissions where reserved.
*
* @return estimated count of permissions
*/
public long getAvailablePermissions() {
State currentState = state.get();
State estimatedState = calculateNextState(-1, currentState);
return estimatedState.activePermissions;
}
}

/**
* Created only for test purposes. Simply calls {@link System#nanoTime()}
*/
private long currentNanoTime() {
return nanoTime();
}
}
Loading

0 comments on commit e9c67b6

Please sign in to comment.