Skip to content

Commit

Permalink
Switch to non-blocking quota reserving using ratelimit queue
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Nov 28, 2024
1 parent 47c1927 commit b1acc04
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 28 deletions.
11 changes: 5 additions & 6 deletions proto/ratelimit.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ option java_outer_classname = "RatelimitProto";
package ratelimit;

service Ratelimit {
rpc RequestQuota (RatelimitRequest) returns (RatelimitResponse) {}
rpc ReserveQuota (RatelimitRequest) returns (RatelimitQuota) {}
}

message RatelimitRequest {
RatelimitType type = 1;
fixed64 clientId = 2;
optional bool probeOnly = 3;
optional uint32 maxDelay = 4;
}

message RatelimitResponse {
message RatelimitQuota {
bool granted = 1;
uint32 limit = 2;
uint32 remaining = 3;
uint64 reset = 4;
uint32 resetAfter = 5;
uint64 at = 2;
}

enum RatelimitType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.time.Duration.Companion.seconds
// Give request expiry a bit of leeway in case of clock drift
private val EXPIRY_GRACE_PERIOD = 5.seconds.inWholeMilliseconds

class KafkaRatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
class BrokerRpcRatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {

private val log by Log
private val globalRatelimitProvider = KafkaRatelimitProvider(50, 1.seconds)
Expand Down
104 changes: 86 additions & 18 deletions vanilla/src/main/java/gg/beemo/vanilla/GrpcRatelimitService.kt
Original file line number Diff line number Diff line change
@@ -1,47 +1,115 @@
package gg.beemo.vanilla

import gg.beemo.latte.logging.Log
import gg.beemo.latte.util.Ratelimit
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// TODO https://github.com/grpc/grpc-kotlin/blob/master/examples/server/src/main/kotlin/io/grpc/examples/animals/AnimalsServer.kt

class GrpcRatelimitService : RatelimitGrpcKt.RatelimitCoroutineImplBase() {

private val log by Log
private val globalRatelimits = RatelimitMap(50, 1.seconds)
private val identifyRatelimits = RatelimitMap(1, 5.seconds)
private val globalRatelimits = ClientRatelimits(50, 1.seconds)
private val identifyRatelimits = ClientRatelimits(1, 5.seconds)

override suspend fun requestQuota(request: RatelimitRequest): RatelimitResponse {
override suspend fun reserveQuota(request: RatelimitRequest): RatelimitQuota {
val (ratelimitMap, typeString) = when (request.type) {
RatelimitType.GLOBAL -> globalRatelimits to "global"
RatelimitType.IDENTIFY -> identifyRatelimits to "identify"
else -> throw IllegalArgumentException("Unknown ratelimit type ${request.type}")
}

val ratelimit = ratelimitMap.getClientRatelimit(request.clientId)
// TODO Do we want to make a blocking version?
val (granted, resetAfter) = ratelimit.tryRequestQuota()
log.debug("Got '{}' quota request from clientId {}, was tranted: {}", typeString, request.clientId, granted)
val maxDelay = if (request.hasMaxDelay()) request.maxDelay.toLong() else null
val (granted, at) = ratelimit.reserveQuota(request.probeOnly, maxDelay)

if (request.probeOnly) {
log.debug("Probed {} quota slot for clientId {} is at {}", typeString, request.clientId, at)
} else if (granted) {
log.debug("Reserved {} quota slot for clientId {} at {}", typeString, request.clientId, at)
} else {
val maxTimestamp = if (maxDelay != null) System.currentTimeMillis() + maxDelay else null
log.debug(
"Failed to reserve {} quota slot for clientId {}, next slot would be at {}, requested max delay was {} (-> {})",
typeString,
request.clientId,
at,
maxDelay,
maxTimestamp
)
}

return ratelimitResponse {
return ratelimitQuota {
this.granted = granted
this.limit = ratelimit.burst
this.remaining = ratelimit.remaining
this.reset = ratelimit.resetAt
this.resetAfter = resetAfter.toInt()
this.at = at
}
}

}

private class RatelimitMap(private val burst: Int, private val duration: Duration) {
private class ClientRatelimits(private val burst: Int, private val duration: Duration) {

private val limiters = ConcurrentHashMap<Long, Ratelimit>()
private val limiters = ConcurrentHashMap<Long, RatelimitQueue>()

fun getClientRatelimit(clientId: Long): Ratelimit = limiters.computeIfAbsent(clientId) {
Ratelimit(burst, duration)
fun getClientRatelimit(clientId: Long): RatelimitQueue = limiters.computeIfAbsent(clientId) {
RatelimitQueue(burst, duration)
}

}

data class RatelimitSlot(
var usedQuota: Int,
val startsAt: Long,
val endsAt: Long,
)

private class RatelimitQueue(private val burst: Int, private val duration: Duration) {

private val queue = LinkedList<RatelimitSlot>()
private val lock = Mutex()

suspend fun reserveQuota(probeOnly: Boolean = false, maxDelay: Long? = null): Pair<Boolean, Long> =
lock.withLock {
val now = System.currentTimeMillis()

// Clean up expired slots
while (queue.isNotEmpty() && now > queue.first.endsAt) {
queue.removeFirst()
}

// Find free slot at the end of the queue
val lastSlot = queue.lastOrNull()
// No slots are used, so ratelimit is immediately available
if (lastSlot == null) {
// No timeout to check if we can immediately grant quota
if (probeOnly) {
return@withLock false to 0
}
queue.add(RatelimitSlot(1, now, now + duration.inWholeMilliseconds))
return@withLock true to 0
}

// Check if slot still has quota available
if (lastSlot.usedQuota < burst) {
val exceedsDelay = maxDelay != null && lastSlot.startsAt > now + maxDelay
if (exceedsDelay || probeOnly) {
return@withLock false to lastSlot.startsAt
}
lastSlot.usedQuota++
return@withLock true to lastSlot.startsAt
}

// Slot is full, create new slot
val exceedsDelay = maxDelay != null && lastSlot.endsAt > now + maxDelay
if (exceedsDelay || probeOnly) {
return@withLock false to lastSlot.endsAt
}
val nextStart = lastSlot.endsAt
val nextEnd = nextStart + duration.inWholeMilliseconds
queue.add(RatelimitSlot(1, nextStart, nextEnd))
return@withLock true to nextStart
}

}
6 changes: 3 additions & 3 deletions vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import gg.beemo.latte.CommonConfig
import gg.beemo.latte.broker.rabbitmq.RabbitConnection
import gg.beemo.latte.config.Configurator
import gg.beemo.latte.logging.Log
import gg.beemo.latte.logging.log
import io.grpc.Server
import io.grpc.ServerBuilder
import kotlinx.coroutines.runBlocking
Expand All @@ -30,9 +29,10 @@ object Vanilla {
password = Config.RABBIT_PASSWORD,
)

log.debug("Initializing Kafka Ratelimit client")
val ratelimitClient = KafkaRatelimitClient(brokerConnection)
log.debug("Initializing Broker Ratelimit client")
val ratelimitClient = BrokerRpcRatelimitClient(brokerConnection)

log.debug("Initializing gRPC Ratelimit client")
val grpcServer: Server = ServerBuilder.forPort(Config.GRPC_PORT)
.addService(GrpcRatelimitService())
.build()
Expand Down

0 comments on commit b1acc04

Please sign in to comment.