Skip to content

Commit

Permalink
Refactoring rate limiter code to make rate limiting more consistent w…
Browse files Browse the repository at this point in the history
…hen multiple rate limiters are applied to a single log statement.

This rewrites the two currently support rate limiters and will cause a change in behaviour if both are used together in a single log statement, such as:

logger.atWarning().every(N).atMostEvery(T, TimeUnit).log(...);

The JavaDoc for the rate limiting methods has been updated to reflect the new behaviour.

RELNOTES=Improving rate limiter behaviour.
PiperOrigin-RevId: 513565040
  • Loading branch information
hagbard authored and Flogger Team committed Mar 2, 2023
1 parent 5036701 commit 90f01f1
Show file tree
Hide file tree
Showing 10 changed files with 835 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@
package com.google.common.flogger;

import static com.google.common.flogger.LogContext.Key.LOG_EVERY_N;
import static com.google.common.flogger.RateLimitStatus.DISALLOW;

import com.google.common.flogger.backend.Metadata;
import java.util.concurrent.atomic.AtomicLong;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;

/**
* Rate limiter to support {@code every(N)} functionality. This class is mutable, but thread safe.
* Rate limiter to support {@code every(N)} functionality.
*
* <p>Instances of this class are created for each unique {@link LogSiteKey} for which rate limiting
* via the {@code LOG_EVERY_N} metadata key is required. This class implements {@code
* RateLimitStatus} as a mechanism for resetting the rate limiter state.
*
* <p>Instances of this class are thread safe.
*/
final class CountingRateLimiter {
final class CountingRateLimiter extends RateLimitStatus {
private static final LogSiteMap<CountingRateLimiter> map =
new LogSiteMap<CountingRateLimiter>() {
@Override
Expand All @@ -33,25 +41,46 @@ protected CountingRateLimiter initialValue() {
}
};

static boolean shouldLog(Metadata metadata, LogSiteKey logSiteKey) {
// Fast path is "there's no metadata so return true" and this must not allocate.
/**
* Returns the status of the rate limiter, or {@code null} if the {@code LOG_EVERY_N} metadata was
* not present.
*
* <p>The rate limiter status is {@code DISALLOW} until the log count exceeds the specified limit,
* and then the limiter switches to its pending state and returns an allow status until it is
* reset.
*/
@NullableDecl
static RateLimitStatus check(Metadata metadata, LogSiteKey logSiteKey) {
Integer rateLimitCount = metadata.findValue(LOG_EVERY_N);
if (rateLimitCount == null) {
return true;
// Without rate limiter specific metadata, this limiter has no effect.
return null;
}
return map.get(logSiteKey, metadata).incrementAndCheckInvocationCount(rateLimitCount);
return map.get(logSiteKey, metadata).incrementAndCheckLogCount(rateLimitCount);
}

private final AtomicLong invocationCount = new AtomicLong();
// By setting the initial value as Integer#MAX_VALUE we ensure that the first time rate limiting
// is checked, the rate limit count (which is only an Integer) must be reached, placing the
// limiter into its pending state immediately. If this is the only limiter used, this corresponds
// to the first log statement always being emitted.
private final AtomicLong invocationCount = new AtomicLong(Integer.MAX_VALUE);

// Visible for testing.
CountingRateLimiter() {}

/**
* Increments the invocation count and returns true if it was a multiple of the specified rate
* limit count; implying that the log statement should be emitted. This is invoked during
* post-processing if a rate limiting count was set via {@link LoggingApi#every(int)}.
* Increments the invocation count and returns true if it reached the specified rate limit count.
* This is invoked during post-processing if a rate limiting count was set via {@link
* LoggingApi#every(int)}.
*/
// Visible for testing.
boolean incrementAndCheckInvocationCount(int rateLimitCount) {
// Assume overflow cannot happen for a Long counter.
return (invocationCount.getAndIncrement() % rateLimitCount) == 0;
RateLimitStatus incrementAndCheckLogCount(int rateLimitCount) {
return invocationCount.incrementAndGet() >= rateLimitCount ? this : DISALLOW;
}

// Reset function called to move the limiter out of the "pending" state after a log occurs.
@Override
public void reset() {
invocationCount.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@
package com.google.common.flogger;

import static com.google.common.flogger.LogContext.Key.LOG_AT_MOST_EVERY;
import static com.google.common.flogger.RateLimitStatus.DISALLOW;
import static com.google.common.flogger.util.Checks.checkArgument;
import static com.google.common.flogger.util.Checks.checkNotNull;
import static java.lang.Math.max;

import com.google.common.flogger.backend.LogData;
import com.google.common.flogger.backend.Metadata;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;

/**
* Rate limiter to support {@code atMostEvery(N, units)} functionality. This class is mutable, but
* thread safe.
* Rate limiter to support {@code atMostEvery(N, units)} functionality.
*
* <p>Instances of this class are created for each unique {@link LogSiteKey} for which rate limiting
* via the {@code LOG_AT_MOST_EVERY} metadata key is required. This class implements {@code
* RateLimitStatus} as a mechanism for resetting the rate limiter state.
*
* <p>Instances of this class are thread safe.
*/
final class DurationRateLimiter {
final class DurationRateLimiter extends RateLimitStatus {
private static final LogSiteMap<DurationRateLimiter> map =
new LogSiteMap<DurationRateLimiter>() {
@Override
Expand All @@ -38,21 +46,25 @@ protected DurationRateLimiter initialValue() {
}
};

/** Creates a period for rate limiting for the specified duration. */
/**
* Creates a period for rate limiting for the specified duration. This is invoked by the {@link
* LogContext#atMostEvery(int, TimeUnit)} method to create a metadata value.
*/
static RateLimitPeriod newRateLimitPeriod(int n, TimeUnit unit) {
// We could cache commonly used values here if we wanted.
return new RateLimitPeriod(n, unit);
}

/**
* Returns whether the log site should log based on the value of the {@code LOG_AT_MOST_EVERY}
* metadata value and the current log site timestamp.
*/
static boolean shouldLogForTimestamp(
Metadata metadata, LogSiteKey logSiteKey, long timestampNanos) {
// Fast path is "there's no metadata so return true" and this must not allocate.
@NullableDecl
static RateLimitStatus check(Metadata metadata, LogSiteKey logSiteKey, long timestampNanos) {
RateLimitPeriod rateLimitPeriod = metadata.findValue(LOG_AT_MOST_EVERY);
if (rateLimitPeriod == null) {
return true;
// Without rate limiter specific metadata, this limiter has no effect.
return null;
}
return map.get(logSiteKey, metadata).checkLastTimestamp(timestampNanos, rateLimitPeriod);
}
Expand All @@ -65,8 +77,6 @@ static boolean shouldLogForTimestamp(
static final class RateLimitPeriod {
private final int n;
private final TimeUnit unit;
// Count of the number of log statements skipped in the last period. Set during post processing.
private int skipCount = -1;

private RateLimitPeriod(int n, TimeUnit unit) {
// This code will work with a zero length time period, but it's nonsensical to try.
Expand All @@ -85,23 +95,14 @@ private long toNanos() {
return unit.toNanos(n);
}

private void setSkipCount(int skipCount) {
this.skipCount = skipCount;
}

@Override
public String toString() {
// TODO: Make this less ugly and internationalization friendly.
StringBuilder out = new StringBuilder().append(n).append(' ').append(unit);
if (skipCount > 0) {
out.append(" [skipped: ").append(skipCount).append(']');
}
return out.toString();
return n + " " + unit;
}

@Override
public int hashCode() {
// Rough and ready. We don't expected this be be needed much at all.
// Rough and ready. We don't expect this be be needed much at all.
return (n * 37) ^ unit.hashCode();
}

Expand All @@ -115,30 +116,46 @@ public boolean equals(Object obj) {
}
}

private final AtomicLong lastTimestampNanos = new AtomicLong();
private final AtomicInteger skippedLogStatements = new AtomicInteger();
private final AtomicLong lastTimestampNanos = new AtomicLong(-1L);

// Visible for testing.
DurationRateLimiter() {}

/**
* Checks whether the current time stamp is after the rate limiting period and if so, updates the
* time stamp and returns true. This is invoked during post-processing if a rate limiting duration
* was set via {@link LoggingApi#atMostEvery(int, TimeUnit)}.
*/
// Visible for testing.
boolean checkLastTimestamp(long timestampNanos, RateLimitPeriod period) {
RateLimitStatus checkLastTimestamp(long timestampNanos, RateLimitPeriod period) {
checkArgument(timestampNanos >= 0, "timestamp cannot be negative");
// If this is negative, we are in the pending state and will return "allow" until we are reset.
// The value held here is updated to be the most recent negated timestamp, and is negated again
// (making it positive and setting us into the rate limiting state) when we are reset.
long lastNanos = lastTimestampNanos.get();
// Avoid a race condition where two threads log at the same time. This is safe as lastNanos
// can never be equal to timestampNanos (because the period is never zero), so if multiple
// threads read the same value for lastNanos, only one thread can succeed in setting a new
// value. For ludicrous durations which overflow the deadline we ensure it never triggers.
long deadlineNanos = lastNanos + period.toNanos();
if ((deadlineNanos >= 0)
&& (timestampNanos >= deadlineNanos || lastNanos == 0)
&& lastTimestampNanos.compareAndSet(lastNanos, timestampNanos)) {
period.setSkipCount(skippedLogStatements.getAndSet(0));
return true;
} else {
skippedLogStatements.incrementAndGet();
return false;
if (lastNanos >= 0) {
long deadlineNanos = lastNanos + period.toNanos();
// Check for negative deadline to avoid overflow for ridiculous durations. Assume overflow
// always means "no logging".
if (deadlineNanos < 0 || timestampNanos < deadlineNanos) {
return DISALLOW;
}
}
// When logging is triggered, negate the timestamp to move us into the "pending" state and
// return our reset status.
// We don't want to race with the reset function (which may have already set a new timestamp).
lastTimestampNanos.compareAndSet(lastNanos, -timestampNanos);
return this;
}

// Reset function called to move the limiter out of the "pending" state. We do this by negating
// the timestamp (which was already negated when we entered the pending state, so we restore it
// to a positive value which moves us back into the "limiting" state).
@Override
public void reset() {
// Only one thread at a time can reset a rate limiter, so this can be unconditional. We should
// only be able to get here if the timestamp was set to a negative value above. However use
// max() to make sure we always move out of the pending state.
lastTimestampNanos.set(max(-lastTimestampNanos.get(), 0));
}
}
Loading

0 comments on commit 90f01f1

Please sign in to comment.