Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 17, 2023
1 parent 03323c1 commit b7954a3
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
5 changes: 2 additions & 3 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ private class TopicMetadata(

private class KeyMetadata(
val topic: TopicMetadata,
val key: String,
val producers: MutableSet<ProducerSubclient<*>>,
val consumers: MutableSet<ConsumerSubclient<*>>,
)
Expand Down Expand Up @@ -76,7 +75,7 @@ abstract class BrokerClient(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
noinline callback: suspend CoroutineScope.(BaseRpcRequestMessage<RequestT, ResponseT>) -> Pair<RpcStatus, ResponseT>,
noinline callback: suspend CoroutineScope.(BaseRpcRequestMessage<RequestT, ResponseT>) -> RpcResponse<ResponseT>,
): RpcClient<RequestT, ResponseT> {
return RpcClient(
this,
Expand Down Expand Up @@ -146,7 +145,7 @@ abstract class BrokerClient(
TopicMetadata(topic, Collections.synchronizedMap(HashMap()))
}
val keyData = topicData.keys.computeIfAbsent(key) {
KeyMetadata(topicData, key, Collections.synchronizedSet(HashSet()), Collections.synchronizedSet(HashSet()))
KeyMetadata(topicData, Collections.synchronizedSet(HashSet()), Collections.synchronizedSet(HashSet()))
}
return keyData
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class ConsumerSubclient<T>(

}

typealias RpcResponse<ResponseT> = Pair<RpcStatus, ResponseT>

class RpcClient<RequestT, ResponseT>(
client: BrokerClient,
topic: String,
Expand All @@ -178,7 +180,7 @@ class RpcClient<RequestT, ResponseT>(
requestIsNullable: Boolean,
private val responseType: Class<ResponseT>,
private val responseIsNullable: Boolean,
private val callback: suspend CoroutineScope.(BaseRpcRequestMessage<RequestT, ResponseT>) -> Pair<RpcStatus, ResponseT>,
private val callback: suspend CoroutineScope.(BaseRpcRequestMessage<RequestT, ResponseT>) -> RpcResponse<ResponseT>,
) : BaseSubclient(
client.connection,
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gg.beemo.vanilla
import gg.beemo.latte.broker.BrokerClient
import gg.beemo.latte.broker.BrokerConnection
import gg.beemo.latte.broker.IgnoreRpcRequest
import gg.beemo.latte.broker.RpcStatus
import gg.beemo.latte.logging.Log
import gg.beemo.latte.ratelimit.SharedRatelimitData
import gg.beemo.latte.util.SuspendingRatelimit
Expand Down Expand Up @@ -46,6 +47,8 @@ class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
provider.getClientRatelimit(clientId).requestQuota()

log.debug("Granted {} quota request for service {}", type, service)

return@rpc RpcStatus.OK to Unit
}
}

Expand Down

0 comments on commit b7954a3

Please sign in to comment.