From da5fb621a7e3974e4792272d2bf44155b24740fd Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Wed, 22 May 2024 16:47:05 -0400 Subject: [PATCH] Rate limiting should be ineffective when RateLimitInfo is not present Built a new ConditionalRateLimiter that can be disabled but keep the rate. Also moved setRate() into the ConditionalRateLimiter. Update and disable is throttled based on period from RateLimitInfo. By default the rate limiter is disabled when initiated. Enable, disable and update rate will be logged as INFO, as they're critical to the run of the worker and only logged per 10s by default for each worker thread. --- .../RateLimitingServerStreamingCallable.java | 202 ++++++++++++++---- .../v2/stub/RateLimitingCallableTest.java | 192 +++++++++++++++-- 2 files changed, 342 insertions(+), 52 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java index 97cc2f73ec..141c5916f0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java @@ -31,8 +31,8 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nonnull; import org.threeten.bp.Duration; @@ -40,6 +40,7 @@ class RateLimitingServerStreamingCallable extends ServerStreamingCallable { + private static final Logger logger = Logger.getLogger(RateLimitingServerStreamingCallable.class.getName()); @@ -64,16 +65,14 @@ class RateLimitingServerStreamingCallable // as the server side cap private static final double MAX_FACTOR = 1.3; - private final RateLimiter limiter; + private final ConditionalRateLimiter limiter; - private final AtomicReference lastQpsChangeTime = new AtomicReference<>(Instant.now()); private final ServerStreamingCallable innerCallable; RateLimitingServerStreamingCallable( @Nonnull ServerStreamingCallable innerCallable) { - this.limiter = RateLimiter.create(DEFAULT_QPS); + this.limiter = new ConditionalRateLimiter(DEFAULT_QPS); this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); - logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate()); } @Override @@ -88,25 +87,126 @@ public void call( ((BigtableTracer) context.getTracer()) .batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS)); } - RateLimitingResponseObserver innerObserver = - new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver); + RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver); innerCallable.call(request, innerObserver, context); } + /** A rate limiter wrapper class that can be disabled. */ + static class ConditionalRateLimiter { + + private final AtomicBoolean enabled = new AtomicBoolean(false); + + private final RateLimiter limiter; + + // This is the next time allowed to change QPS or disable rate limiting. + private final AtomicReference nextRateUpdateTime = + new AtomicReference<>(Instant.now()); + + public ConditionalRateLimiter(long defaultQps) { + limiter = RateLimiter.create(defaultQps); + logger.info("Rate limiting is initiated (but disabled) with rate of " + defaultQps + " QPS."); + } + + /** + * Works the same way with {@link RateLimiter#acquire()} except that when the rate limiter is + * disabled, {@link ConditionalRateLimiter#acquire()} always returns immediately. + */ + public void acquire() { + if (enabled.get()) { + limiter.acquire(); + } + } + + // Enable rate limiting immediately or disable after the QPS update period. Otherwise, no-op. + + /** + * Disables the rate limier if the current time exceeded the next rate update time. When + * disabled, the rate is retained and will be re-used if re-enabled later. + */ + public void tryDisable() { + // Only disable after the rate update time. + Instant nextTime = nextRateUpdateTime.get(); + Instant now = Instant.now(); + if (now.isAfter(nextTime)) { + boolean wasEnabled = this.enabled.getAndSet(false); + if (wasEnabled) { + logger.info("Rate limiter is disabled."); + } + // No need to update nextRateUpdateTime, any new RateLimitInfo can enable rate limiting and + // update the rate again. + } + } + + /** Enables the rate limiter immediately. */ + public void enable() { + boolean wasEnabled = this.enabled.getAndSet(true); + if (!wasEnabled) { + logger.info("Rate limiter is enabled."); + } + } + + public boolean isEnabled() { + return this.enabled.get(); + } + + public double getRate() { + return limiter.getRate(); + } + + // Update the rate after the QPS update period. Otherwise, no-op. + + /** + * Sets the rate and the next rate update time based on period, if the current time exceeds the + * next rate update time. Otherwise, no-op. + * + * @param rate The new rate of the rate limiter. + * @param period The period during which rate should not be updated again and the rate limiter + * should not be disabled. + */ + public void trySetRate(double rate, Duration period) { + Instant nextTime = nextRateUpdateTime.get(); + Instant now = Instant.now(); + + if (now.isBefore(nextTime)) { + return; + } + + Instant newNextTime = now.plusSeconds(period.getSeconds()); + + if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) { + // Someone else updated it already. + return; + } + final double oldRate = limiter.getRate(); + limiter.setRate(rate); + logger.info( + "Updated max rate from " + + oldRate + + " to " + + rate + + " with period " + + period.getSeconds() + + " seconds."); + } + + @VisibleForTesting + void setEnabled(boolean enabled) { + this.enabled.set(enabled); + } + + @VisibleForTesting + void setRate(double rate) { + limiter.setRate(rate); + } + } + class RateLimitingResponseObserver extends SafeResponseObserver { - private final ResponseObserver outerObserver; - private final RateLimiter rateLimiter; - private final AtomicReference lastQpsChangeTime; + private final ResponseObserver outerObserver; - RateLimitingResponseObserver( - RateLimiter rateLimiter, - AtomicReference lastQpsChangeTime, - ResponseObserver observer) { + RateLimitingResponseObserver(ResponseObserver observer) { super(observer); this.outerObserver = observer; - this.rateLimiter = rateLimiter; - this.lastQpsChangeTime = lastQpsChangeTime; } @Override @@ -114,18 +214,35 @@ protected void onStartImpl(StreamController controller) { outerObserver.onStart(controller); } + private boolean hasValidRateLimitInfo(MutateRowsResponse response) { + // RateLimitInfo is an optional field. However, proto3 sub-message field always + // have presence even thought it's marked as "optional". Check the factor and + // period to make sure they're not 0. + if (!response.hasRateLimitInfo()) { + logger.finest("Response carries no RateLimitInfo"); + return false; + } + + if (response.getRateLimitInfo().getFactor() <= 0 + || response.getRateLimitInfo().getPeriod().getSeconds() <= 0) { + logger.finest("Response carries invalid RateLimitInfo=" + response.getRateLimitInfo()); + return false; + } + + logger.finest("Response carries valid RateLimitInfo=" + response.getRateLimitInfo()); + return true; + } + @Override protected void onResponseImpl(MutateRowsResponse response) { - if (response.hasRateLimitInfo()) { + if (hasValidRateLimitInfo(response)) { + limiter.enable(); RateLimitInfo info = response.getRateLimitInfo(); - // RateLimitInfo is an optional field. However, proto3 sub-message field always - // have presence even thought it's marked as "optional". Check the factor and - // period to make sure they're not 0. - if (info.getFactor() != 0 && info.getPeriod().getSeconds() != 0) { - updateQps( - info.getFactor(), - Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); - } + updateQps( + info.getFactor(), + Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod()))); + } else { + limiter.tryDisable(); } outerObserver.onResponse(response); } @@ -148,28 +265,35 @@ protected void onCompleteImpl() { } private void updateQps(double factor, Duration period) { - Instant lastTime = lastQpsChangeTime.get(); - Instant now = Instant.now(); - - if (now.minus(period).isAfter(lastTime) && lastQpsChangeTime.compareAndSet(lastTime, now)) { - double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR); - double currentRate = limiter.getRate(); - limiter.setRate(Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS)); - logger.log( - Level.FINE, - "Updated QPS from {0} to {1}, server returned factor is {2}, capped factor is {3}", - new Object[] {currentRate, limiter.getRate(), factor, cappedFactor}); - } + double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR); + double currentRate = limiter.getRate(); + double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS); + limiter.trySetRate(cappedRate, period); } } @VisibleForTesting - AtomicReference getLastQpsChangeTime() { - return lastQpsChangeTime; + AtomicReference getNextRateUpdateTime() { + return limiter.nextRateUpdateTime; } @VisibleForTesting double getCurrentRate() { return limiter.getRate(); } + + @VisibleForTesting + void setRate(double rate) { + limiter.setRate(rate); + } + + @VisibleForTesting + boolean getLimiterEnabled() { + return limiter.isEnabled(); + } + + @VisibleForTesting + void setLimiterEnabled(boolean enabled) { + limiter.setEnabled(enabled); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java index 92b93cfafe..f2fe77725d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingCallableTest.java @@ -17,6 +17,8 @@ package com.google.cloud.bigtable.data.v2.stub; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiCallContext; @@ -59,21 +61,90 @@ public void setup() throws Exception { } @Test - public void testWithRateLimitInfo() throws Exception { + public void testDefaultSettingOnInitiate() throws Exception { callableToTest.call(request, responseObserver, context); + assertFalse(callableToTest.getLimiterEnabled()); + assertThat(callableToTest.getCurrentRate()).isEqualTo(10); + } + + @Test + public void testUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); - // make sure QPS will be updated - callableToTest.getLastQpsChangeTime().set(earlier); + // Make sure rate will be updated. + callableToTest.getNextRateUpdateTime().set(earlier); double oldQps = callableToTest.getCurrentRate(); double factor = 0.8; + int periodSeconds = 10; + + RateLimitInfo info = + RateLimitInfo.newBuilder() + .setFactor(factor) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) + .build(); + + MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isWithin(0.01).of(oldQps * factor); + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testNoRateLimitInfoDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); + + // Make sure rate will be updated. + callableToTest.getNextRateUpdateTime().set(earlier); + double oldQps = callableToTest.getCurrentRate(); + + // A response without RateLimitInfo. + MutateRowsResponse response = MutateRowsResponse.newBuilder().build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change + assertFalse(callableToTest.getLimiterEnabled()); // Rate limiter is also disabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testInvalidRateLimitInfoDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); + + // make sure QPS will be updated + callableToTest.getNextRateUpdateTime().set(earlier); + double oldQps = callableToTest.getCurrentRate(); + + // A response with invalid RateLimitInfo. + double factor = 0; // Invalid factor + int periodSeconds = 10; RateLimitInfo info = RateLimitInfo.newBuilder() .setFactor(factor) - .setPeriod(Duration.newBuilder().setSeconds(10).build()) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) .build(); MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); @@ -84,37 +155,132 @@ public void testWithRateLimitInfo() throws Exception { Thread.sleep(100); double newQps = callableToTest.getCurrentRate(); - assertThat(newQps).isWithin(0.1).of(oldQps * factor); + assertThat(newQps).isEqualTo(oldQps); // No change + assertFalse(callableToTest.getLimiterEnabled()); // Rate limiter is also disabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testMissingRateLimitInfoFactorDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); + + // Make sure rate can be updated. + callableToTest.getNextRateUpdateTime().set(earlier); + double oldQps = callableToTest.getCurrentRate(); + + // A response with invalid RateLimitInfo. + // Missing factor is equivalent to 0. + int periodSeconds = 10; + RateLimitInfo info = + RateLimitInfo.newBuilder() + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) + .build(); + + MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change + assertFalse(callableToTest.getLimiterEnabled()); // Rate limiter is also disabled. innerCallable.getObserver().onComplete(); } @Test - public void testNoUpdateWithinPeriod() throws Exception { + public void testNoUpdateBeforeAllowedTime() throws Exception { callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); - Instant now = Instant.now(); - // make sure QPS will not be updated - callableToTest.getLastQpsChangeTime().set(now); + Instant later = Instant.now().plus(org.threeten.bp.Duration.ofHours(1)); + // Make sure rate will not be updated. + callableToTest.getNextRateUpdateTime().set(later); double oldQps = callableToTest.getCurrentRate(); double factor = 0.3; + int periodSeconds = 10; RateLimitInfo info = RateLimitInfo.newBuilder() .setFactor(factor) - .setPeriod(Duration.newBuilder().setSeconds(600).build()) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) .build(); MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); innerCallable.getObserver().onResponse(response); - // Give the thread sometime to update the QPS + // Give the thread some time to update the rate. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change. + assertTrue(callableToTest.getLimiterEnabled()); // Still enabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testDoesNotDisableBeforeAllowedTime() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setLimiterEnabled(true); + + Instant later = Instant.now().plus(org.threeten.bp.Duration.ofHours(1)); + // Make sure limiter will not be disabled. + callableToTest.getNextRateUpdateTime().set(later); + double oldQps = callableToTest.getCurrentRate(); + + // Missing RateLimitInfo disables rate limiting. + MutateRowsResponse response = MutateRowsResponse.newBuilder().build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread sometime to disable the rate limiter. + Thread.sleep(100); + double newQps = callableToTest.getCurrentRate(); + + assertThat(newQps).isEqualTo(oldQps); // No change on QPS. + assertTrue(callableToTest.getLimiterEnabled()); // Still enabled. + + innerCallable.getObserver().onComplete(); + } + + @Test + public void testEnableWithinPeriodDoesNotUpdateRate() throws Exception { + callableToTest.call(request, responseObserver, context); + callableToTest.setRate(1.5); + + Instant later = Instant.now().plus(org.threeten.bp.Duration.ofHours(1)); + // Even though the rate update time is far in the future, enable is always allowed. + callableToTest.getNextRateUpdateTime().set(later); + double oldQps = callableToTest.getCurrentRate(); + + double factor = 0.3; + int periodSeconds = 600; + + RateLimitInfo info = + RateLimitInfo.newBuilder() + .setFactor(factor) + .setPeriod(Duration.newBuilder().setSeconds(periodSeconds).build()) + .build(); + + MutateRowsResponse response = MutateRowsResponse.newBuilder().setRateLimitInfo(info).build(); + + innerCallable.getObserver().onResponse(response); + + // Give the thread some time to enable the rate limiter. Thread.sleep(100); double newQps = callableToTest.getCurrentRate(); - assertThat(newQps).isEqualTo(oldQps); + assertThat(newQps).isEqualTo(oldQps); // No change on QPS due to QPS update time. + assertTrue(callableToTest.getLimiterEnabled()); // Rate limiting is enabled. innerCallable.getObserver().onComplete(); } @@ -126,7 +292,7 @@ public void testErrorInfoLowerQPS() throws Exception { Instant earlier = Instant.now().minus(org.threeten.bp.Duration.ofHours(1)); // make sure QPS will be updated - callableToTest.getLastQpsChangeTime().set(earlier); + callableToTest.getNextRateUpdateTime().set(earlier); double oldQps = callableToTest.getCurrentRate(); innerCallable