Skip to content

Commit

Permalink
Revert SuspendingRatelimit changes
Browse files Browse the repository at this point in the history
This ratelimit class is not needed anymore with the new vanilla ratelimit implementation
  • Loading branch information
wasdennnoch committed Nov 28, 2024
1 parent b1acc04 commit e2e47ce
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,15 @@ import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlin.time.Duration

class Ratelimit(val burst: Int, val duration: Duration) {
class SuspendingRatelimit(private val burst: Int, private val duration: Duration) {
@Volatile
private var remainingQuota: Int = burst
val remaining: Int get() = remainingQuota

@Volatile
private var resetTimestamp: Long = 0
val resetAt: Long get() = resetTimestamp

private val quotaRequestSem = Semaphore(1)

suspend fun requestQuota() {
quotaRequestSem.withPermit {
if (remainingQuota <= 0) {
tryResetQuota()
val waitTime = calculateWaitTime()
delay(waitTime)
}
tryResetQuota()
check(remainingQuota > 0)
remainingQuota--
}
}

fun tryRequestQuota(): Pair<Boolean, Long> {
tryResetQuota()
if (remainingQuota <= 0) {
return false to calculateWaitTime()
}
check(remainingQuota > 0)
remainingQuota--
return true to calculateWaitTime()
}

fun addQuota(amount: Int) {
remainingQuota += amount
}

fun overrideRatelimit(
remainingQuota: Int,
resetTimestamp: Long,
Expand All @@ -61,4 +32,29 @@ class Ratelimit(val burst: Int, val duration: Duration) {
resetTimestamp = System.currentTimeMillis() + duration.inWholeMilliseconds
}
}

suspend fun requestQuota() {
quotaRequestSem.withPermit {
if (remainingQuota <= 0) {
val waitTime = calculateWaitTime()
delay(waitTime)
}
tryResetQuota()

check(remainingQuota > 0)
remainingQuota--
}
}

fun tryRequestQuota(): Pair<Boolean, Long?> {
if (remainingQuota <= 0) {
val waitTime = calculateWaitTime()
return false to waitTime
}
tryResetQuota()

check(remainingQuota > 0)
remainingQuota--
return true to null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import gg.beemo.latte.broker.IgnoreRpcRequest
import gg.beemo.latte.broker.rpc.RpcStatus
import gg.beemo.latte.logging.Log
import gg.beemo.latte.ratelimit.SharedRatelimitData
import gg.beemo.latte.util.Ratelimit
import gg.beemo.latte.util.SuspendingRatelimit
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -56,10 +56,10 @@ class BrokerRpcRatelimitClient(connection: BrokerConnection) : BrokerClient(conn

private class KafkaRatelimitProvider(private val burst: Int, private val duration: Duration) {

private val limiters = ConcurrentHashMap<String, Ratelimit>()
private val limiters = ConcurrentHashMap<String, SuspendingRatelimit>()

fun getClientRatelimit(clientId: String): Ratelimit = limiters.computeIfAbsent(clientId) {
Ratelimit(burst, duration)
fun getClientRatelimit(clientId: String): SuspendingRatelimit = limiters.computeIfAbsent(clientId) {
SuspendingRatelimit(burst, duration)
}

}

0 comments on commit e2e47ce

Please sign in to comment.