diff --git a/proto/ratelimit.proto b/proto/ratelimit.proto index a98216f..eac2c05 100644 --- a/proto/ratelimit.proto +++ b/proto/ratelimit.proto @@ -7,20 +7,19 @@ option java_outer_classname = "RatelimitProto"; package ratelimit; service Ratelimit { - rpc RequestQuota (RatelimitRequest) returns (RatelimitResponse) {} + rpc ReserveQuota (RatelimitRequest) returns (RatelimitQuota) {} } message RatelimitRequest { RatelimitType type = 1; fixed64 clientId = 2; + optional bool probeOnly = 3; + optional uint32 maxDelay = 4; } -message RatelimitResponse { +message RatelimitQuota { bool granted = 1; - uint32 limit = 2; - uint32 remaining = 3; - uint64 reset = 4; - uint32 resetAfter = 5; + uint64 at = 2; } enum RatelimitType { diff --git a/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt b/vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt similarity index 96% rename from vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt rename to vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt index d3183fa..3374fb0 100644 --- a/vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt +++ b/vanilla/src/main/java/gg/beemo/vanilla/BrokerRpcRatelimitClient.kt @@ -14,7 +14,7 @@ import kotlin.time.Duration.Companion.seconds // Give request expiry a bit of leeway in case of clock drift private val EXPIRY_GRACE_PERIOD = 5.seconds.inWholeMilliseconds -class KafkaRatelimitClient(connection: BrokerConnection) : BrokerClient(connection) { +class BrokerRpcRatelimitClient(connection: BrokerConnection) : BrokerClient(connection) { private val log by Log private val globalRatelimitProvider = KafkaRatelimitProvider(50, 1.seconds) diff --git a/vanilla/src/main/java/gg/beemo/vanilla/GrpcRatelimitService.kt b/vanilla/src/main/java/gg/beemo/vanilla/GrpcRatelimitService.kt index f906f09..8d9c119 100644 --- a/vanilla/src/main/java/gg/beemo/vanilla/GrpcRatelimitService.kt +++ b/vanilla/src/main/java/gg/beemo/vanilla/GrpcRatelimitService.kt @@ -1,47 +1,115 @@ package gg.beemo.vanilla import gg.beemo.latte.logging.Log -import gg.beemo.latte.util.Ratelimit +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import java.util.* import java.util.concurrent.ConcurrentHashMap import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -// TODO https://github.com/grpc/grpc-kotlin/blob/master/examples/server/src/main/kotlin/io/grpc/examples/animals/AnimalsServer.kt - class GrpcRatelimitService : RatelimitGrpcKt.RatelimitCoroutineImplBase() { private val log by Log - private val globalRatelimits = RatelimitMap(50, 1.seconds) - private val identifyRatelimits = RatelimitMap(1, 5.seconds) + private val globalRatelimits = ClientRatelimits(50, 1.seconds) + private val identifyRatelimits = ClientRatelimits(1, 5.seconds) - override suspend fun requestQuota(request: RatelimitRequest): RatelimitResponse { + override suspend fun reserveQuota(request: RatelimitRequest): RatelimitQuota { val (ratelimitMap, typeString) = when (request.type) { RatelimitType.GLOBAL -> globalRatelimits to "global" RatelimitType.IDENTIFY -> identifyRatelimits to "identify" else -> throw IllegalArgumentException("Unknown ratelimit type ${request.type}") } + val ratelimit = ratelimitMap.getClientRatelimit(request.clientId) - // TODO Do we want to make a blocking version? - val (granted, resetAfter) = ratelimit.tryRequestQuota() - log.debug("Got '{}' quota request from clientId {}, was tranted: {}", typeString, request.clientId, granted) + val maxDelay = if (request.hasMaxDelay()) request.maxDelay.toLong() else null + val (granted, at) = ratelimit.reserveQuota(request.probeOnly, maxDelay) + + if (request.probeOnly) { + log.debug("Probed {} quota slot for clientId {} is at {}", typeString, request.clientId, at) + } else if (granted) { + log.debug("Reserved {} quota slot for clientId {} at {}", typeString, request.clientId, at) + } else { + val maxTimestamp = if (maxDelay != null) System.currentTimeMillis() + maxDelay else null + log.debug( + "Failed to reserve {} quota slot for clientId {}, next slot would be at {}, requested max delay was {} (-> {})", + typeString, + request.clientId, + at, + maxDelay, + maxTimestamp + ) + } - return ratelimitResponse { + return ratelimitQuota { this.granted = granted - this.limit = ratelimit.burst - this.remaining = ratelimit.remaining - this.reset = ratelimit.resetAt - this.resetAfter = resetAfter.toInt() + this.at = at } } } -private class RatelimitMap(private val burst: Int, private val duration: Duration) { +private class ClientRatelimits(private val burst: Int, private val duration: Duration) { - private val limiters = ConcurrentHashMap() + private val limiters = ConcurrentHashMap() - fun getClientRatelimit(clientId: Long): Ratelimit = limiters.computeIfAbsent(clientId) { - Ratelimit(burst, duration) + fun getClientRatelimit(clientId: Long): RatelimitQueue = limiters.computeIfAbsent(clientId) { + RatelimitQueue(burst, duration) } } + +data class RatelimitSlot( + var usedQuota: Int, + val startsAt: Long, + val endsAt: Long, +) + +private class RatelimitQueue(private val burst: Int, private val duration: Duration) { + + private val queue = LinkedList() + private val lock = Mutex() + + suspend fun reserveQuota(probeOnly: Boolean = false, maxDelay: Long? = null): Pair = + lock.withLock { + val now = System.currentTimeMillis() + + // Clean up expired slots + while (queue.isNotEmpty() && now > queue.first.endsAt) { + queue.removeFirst() + } + + // Find free slot at the end of the queue + val lastSlot = queue.lastOrNull() + // No slots are used, so ratelimit is immediately available + if (lastSlot == null) { + // No timeout to check if we can immediately grant quota + if (probeOnly) { + return@withLock false to 0 + } + queue.add(RatelimitSlot(1, now, now + duration.inWholeMilliseconds)) + return@withLock true to 0 + } + + // Check if slot still has quota available + if (lastSlot.usedQuota < burst) { + val exceedsDelay = maxDelay != null && lastSlot.startsAt > now + maxDelay + if (exceedsDelay || probeOnly) { + return@withLock false to lastSlot.startsAt + } + lastSlot.usedQuota++ + return@withLock true to lastSlot.startsAt + } + + // Slot is full, create new slot + val exceedsDelay = maxDelay != null && lastSlot.endsAt > now + maxDelay + if (exceedsDelay || probeOnly) { + return@withLock false to lastSlot.endsAt + } + val nextStart = lastSlot.endsAt + val nextEnd = nextStart + duration.inWholeMilliseconds + queue.add(RatelimitSlot(1, nextStart, nextEnd)) + return@withLock true to nextStart + } + +} diff --git a/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt b/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt index e847a91..bcf6baf 100644 --- a/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt +++ b/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt @@ -4,7 +4,6 @@ import gg.beemo.latte.CommonConfig import gg.beemo.latte.broker.rabbitmq.RabbitConnection import gg.beemo.latte.config.Configurator import gg.beemo.latte.logging.Log -import gg.beemo.latte.logging.log import io.grpc.Server import io.grpc.ServerBuilder import kotlinx.coroutines.runBlocking @@ -30,9 +29,10 @@ object Vanilla { password = Config.RABBIT_PASSWORD, ) - log.debug("Initializing Kafka Ratelimit client") - val ratelimitClient = KafkaRatelimitClient(brokerConnection) + log.debug("Initializing Broker Ratelimit client") + val ratelimitClient = BrokerRpcRatelimitClient(brokerConnection) + log.debug("Initializing gRPC Ratelimit client") val grpcServer: Server = ServerBuilder.forPort(Config.GRPC_PORT) .addService(GrpcRatelimitService()) .build()