diff --git a/latte/src/main/java/gg/beemo/latte/util/Ratelimit.kt b/latte/src/main/java/gg/beemo/latte/util/SuspendingRatelimit.kt similarity index 77% rename from latte/src/main/java/gg/beemo/latte/util/Ratelimit.kt rename to latte/src/main/java/gg/beemo/latte/util/SuspendingRatelimit.kt index 1e4c166..c521fe4 100644 --- a/latte/src/main/java/gg/beemo/latte/util/Ratelimit.kt +++ b/latte/src/main/java/gg/beemo/latte/util/SuspendingRatelimit.kt @@ -5,44 +5,15 @@ import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withPermit import kotlin.time.Duration -class Ratelimit(val burst: Int, val duration: Duration) { +class SuspendingRatelimit(private val burst: Int, private val duration: Duration) { @Volatile private var remainingQuota: Int = burst - val remaining: Int get() = remainingQuota @Volatile private var resetTimestamp: Long = 0 - val resetAt: Long get() = resetTimestamp private val quotaRequestSem = Semaphore(1) - suspend fun requestQuota() { - quotaRequestSem.withPermit { - if (remainingQuota <= 0) { - tryResetQuota() - val waitTime = calculateWaitTime() - delay(waitTime) - } - tryResetQuota() - check(remainingQuota > 0) - remainingQuota-- - } - } - - fun tryRequestQuota(): Pair { - tryResetQuota() - if (remainingQuota <= 0) { - return false to calculateWaitTime() - } - check(remainingQuota > 0) - remainingQuota-- - return true to calculateWaitTime() - } - - fun addQuota(amount: Int) { - remainingQuota += amount - } - fun overrideRatelimit( remainingQuota: Int, resetTimestamp: Long, @@ -61,4 +32,29 @@ class Ratelimit(val burst: Int, val duration: Duration) { resetTimestamp = System.currentTimeMillis() + duration.inWholeMilliseconds } } + + suspend fun requestQuota() { + quotaRequestSem.withPermit { + if (remainingQuota <= 0) { + val waitTime = calculateWaitTime() + delay(waitTime) + } + tryResetQuota() + + check(remainingQuota > 0) + remainingQuota-- + } + } + + fun tryRequestQuota(): Pair { + if (remainingQuota <= 0) { + val waitTime = calculateWaitTime() + return false to waitTime + } + tryResetQuota() + + check(remainingQuota > 0) + remainingQuota-- + return true to null + } } diff --git a/vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt b/vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt index 3374fb0..1cc7ee6 100644 --- a/vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt +++ b/vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt @@ -6,7 +6,7 @@ import gg.beemo.latte.broker.IgnoreRpcRequest import gg.beemo.latte.broker.rpc.RpcStatus import gg.beemo.latte.logging.Log import gg.beemo.latte.ratelimit.SharedRatelimitData -import gg.beemo.latte.util.Ratelimit +import gg.beemo.latte.util.SuspendingRatelimit import java.util.concurrent.ConcurrentHashMap import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -56,10 +56,10 @@ class BrokerRpcRatelimitClient(connection: BrokerConnection) : BrokerClient(conn private class KafkaRatelimitProvider(private val burst: Int, private val duration: Duration) { - private val limiters = ConcurrentHashMap() + private val limiters = ConcurrentHashMap() - fun getClientRatelimit(clientId: String): Ratelimit = limiters.computeIfAbsent(clientId) { - Ratelimit(burst, duration) + fun getClientRatelimit(clientId: String): SuspendingRatelimit = limiters.computeIfAbsent(clientId) { + SuspendingRatelimit(burst, duration) } }