From 236ec2edf2a3514f52591dd6512148f51d55ea21 Mon Sep 17 00:00:00 2001 From: Adrian Paschkowski Date: Sun, 18 Feb 2024 11:54:27 +0100 Subject: [PATCH] Fix BrokerClient topic handling --- .../gg/beemo/latte/broker/BrokerClient.kt | 191 ++++++++++++------ .../gg/beemo/latte/broker/BrokerConnection.kt | 1 + .../java/gg/beemo/latte/broker/Subclients.kt | 20 +- .../latte/broker/kafka/KafkaConnection.kt | 1 + .../gg/beemo/latte/broker/rpc/RpcClient.kt | 2 +- latte/src/main/resources/log4j2.xml | 2 +- .../src/main/java/gg/beemo/vanilla/Vanilla.kt | 11 +- 7 files changed, 161 insertions(+), 67 deletions(-) diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index 95470f6..43e6660 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -6,18 +6,8 @@ import gg.beemo.latte.broker.rpc.RpcResponse import gg.beemo.latte.logging.Log import kotlinx.coroutines.* import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean -private class TopicMetadata( - val topic: String, - val keys: MutableMap, - var connectionListener: TopicListener? = null, -) - -private class KeyMetadata( - val topic: TopicMetadata, - val producers: MutableSet>, - val consumers: MutableSet>, -) abstract class BrokerClient( @PublishedApi @@ -48,7 +38,7 @@ abstract class BrokerClient( ): ConsumerSubclient { log.debug("Creating consumer for key '{}' in topic '{}' with type {}", key, topic, type.name) return ConsumerSubclient(connection, this, topic, key, options, type, isNullable, callback).also { - registerConsumer(it) + registerSubclient(it) } } @@ -70,7 +60,7 @@ abstract class BrokerClient( ): ProducerSubclient { log.debug("Creating producer for key '{}' in topic '{}' with type {}", key, topic, type.name) return ProducerSubclient(connection, this, topic, key, options, type, isNullable).also { - registerProducer(it) + registerSubclient(it) } } @@ -93,74 +83,141 @@ abstract class BrokerClient( ) } - fun destroy(cancelScope: Boolean = true) { - val producers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.producers } } - val consumers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.consumers } } - producers.forEach { - it.destroy() + private fun registerSubclient(subclient: BaseSubclient) { + val topic = subclient.topic + val metadata = topics.computeIfAbsent(topic) { + TopicMetadata(connection, consumerScope, topic) + } + metadata.registerSubclient(subclient) + } + + internal fun deregisterSubclient(subclient: BaseSubclient) { + val topic = subclient.topic + topics[topic]?.let { + it.deregisterSubclient(subclient) + if (it.isEmpty) { + it.destroy() + topics.remove(topic) + } } - consumers.forEach { - it.destroy() + } + + fun destroy(cancelScope: Boolean = true) { + log.debug("Destroying BrokerClient of type {} with active topics {}", javaClass.simpleName, topics.keys) + while (topics.isNotEmpty()) { + val topic = topics.keys.first() + topics[topic]?.destroy() + topics.remove(topic) } - topics.clear() if (cancelScope) { consumerScope.cancel() } } - private fun registerProducer(producer: ProducerSubclient<*>) { - val metadata = getOrCreateKeyMetadata(producer.topic, producer.key) - metadata.producers.add(producer) - } + internal fun toResponseTopic(topic: String): String = + if (connection.supportsTopicHotSwap) "$topic.responses" else topic - private fun registerConsumer(consumer: ConsumerSubclient<*>) { - val metadata = getOrCreateKeyMetadata(consumer.topic, consumer.key) - if (metadata.consumers.isEmpty() && metadata.topic.connectionListener == null) { - // New consumer - create a new connection listener for this topic - val listener = TopicListener { topic, key, value, headers -> - onTopicMessage(topic, key, value, headers) - } - connection.on(consumer.topic, listener) - metadata.topic.connectionListener = listener + internal fun toResponseKey(key: String): String = "$key.response" + +} + +private class TopicMetadata( + private val connection: BrokerConnection, + private val consumerScope: CoroutineScope, + private val topic: String, +) { + + private class KeyMetadata(val key: String) { + val producers: MutableSet> = Collections.synchronizedSet(HashSet()) + val consumers: MutableSet> = Collections.synchronizedSet(HashSet()) + + val isEmpty: Boolean + get() = producers.isEmpty() && consumers.isEmpty() + + fun destroy() { + producers.forEach(ProducerSubclient<*>::destroy) + consumers.forEach(ConsumerSubclient<*>::destroy) + producers.clear() + consumers.clear() } - metadata.consumers.add(consumer) } - internal fun deregisterProducer(producer: ProducerSubclient<*>) { - log.debug("Removing producer for key '{}' in topic '{}'", producer.key, producer.topic) - val metadata = getExistingKeyMetadata(producer.topic, producer.key) - metadata?.producers?.remove(producer) + private val log by Log + private val _keys: MutableMap = Collections.synchronizedMap(HashMap()) + private val isBeingDestroyed = AtomicBoolean(false) + val isEmpty: Boolean + get() = _keys.isEmpty() + + @Volatile + private var connectionListener: TopicListener? = null + + fun registerSubclient(subclient: BaseSubclient) { + log.debug( + "Adding {} for key '{}' in topic '{}'", + subclient.javaClass.simpleName, + subclient.key, + subclient.topic + ) + val metadata = getOrCreateKeyMetadata(subclient.key) + when (subclient) { + is ConsumerSubclient<*> -> { + if (metadata.consumers.isEmpty() && connectionListener == null) { + log.debug("Creating new connection listener for topic '{}'", subclient.topic) + // New consumer - create a new connection listener for this topic + val listener = TopicListener { topic, key, value, headers -> + onTopicMessage(topic, key, value, headers) + } + connection.on(subclient.topic, listener) + connectionListener = listener + } + metadata.consumers.add(subclient) + } + + is ProducerSubclient<*> -> { + metadata.producers.add(subclient) + } + } } - internal fun deregisterConsumer(consumer: ConsumerSubclient<*>) { - log.debug("Removing consumer for key '{}' in topic '{}'", consumer.key, consumer.topic) - val metadata = getExistingKeyMetadata(consumer.topic, consumer.key) - if (metadata?.consumers?.remove(consumer) == true && metadata.consumers.isEmpty()) { - metadata.topic.connectionListener?.let { - connection.off(metadata.topic.topic, it) - metadata.topic.connectionListener = null + fun deregisterSubclient(subclient: BaseSubclient) { + log.debug( + "Removing {} for key '{}' in topic '{}'", + subclient.javaClass.simpleName, + subclient.key, + subclient.topic + ) + val metadata = getExistingKeyMetadata(subclient.key) + metadata?.let { + when (subclient) { + is ConsumerSubclient<*> -> it.consumers.remove(subclient) + is ProducerSubclient<*> -> it.producers.remove(subclient) } + maybeCleanupKeyMetadata(it) } } - private fun getOrCreateKeyMetadata(topic: String, key: String): KeyMetadata { - val topicData = topics.computeIfAbsent(topic) { - TopicMetadata(topic, Collections.synchronizedMap(HashMap())) + private fun maybeCleanupKeyMetadata(keyMetadata: KeyMetadata) { + if (keyMetadata.isEmpty) { + _keys.remove(keyMetadata.key) } - val keyData = topicData.keys.computeIfAbsent(key) { - KeyMetadata(topicData, Collections.synchronizedSet(HashSet()), Collections.synchronizedSet(HashSet())) + if (this.isEmpty) { + connectionListener?.let { + log.debug("Removing connection listener for topic '{}' after key cleanup", topic) + connection.off(topic, it) + connectionListener = null + } } - return keyData } - private fun getExistingKeyMetadata(topic: String, key: String): KeyMetadata? { - return topics[topic]?.keys?.get(key) + private fun getOrCreateKeyMetadata(key: String): KeyMetadata { + return _keys.computeIfAbsent(key) { + KeyMetadata(key) + } } - internal fun toResponseTopic(topic: String): String = - if (connection.supportsTopicHotSwap) "$topic.responses" else topic - - internal fun toResponseKey(key: String): String = "$key.response" + private fun getExistingKeyMetadata(key: String): KeyMetadata? { + return _keys[key] + } private fun onTopicMessage( topic: String, @@ -168,7 +225,7 @@ abstract class BrokerClient( value: String, headers: BrokerMessageHeaders, ) { - val metadata = getExistingKeyMetadata(topic, key) ?: return + val metadata = getExistingKeyMetadata(key) ?: return for (consumer in metadata.consumers) { consumerScope.launch { try { @@ -180,6 +237,22 @@ abstract class BrokerClient( } } + fun destroy() { + if (!isBeingDestroyed.compareAndSet(false, true)) { + return + } + while (_keys.isNotEmpty()) { + val key = _keys.keys.first() + _keys[key]?.destroy() + _keys.remove(key) + } + connectionListener?.let { + log.debug("Removing connection listener for topic '{}' in destroy()", topic) + connection.off(topic, it) + connectionListener = null + } + } + } @PublishedApi diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt index 81a915f..a35ac2b 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt @@ -21,6 +21,7 @@ abstract class BrokerConnection { abstract suspend fun start() open fun destroy() { + log.debug("Destroying BrokerConnection") topicListeners.clear() } diff --git a/latte/src/main/java/gg/beemo/latte/broker/Subclients.kt b/latte/src/main/java/gg/beemo/latte/broker/Subclients.kt index c14cd2e..32f5397 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/Subclients.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/Subclients.kt @@ -10,6 +10,7 @@ import gg.beemo.latte.util.MoshiUnitAdapter import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.coroutineScope import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean data class BrokerClientOptions( val useSafeJsLongs: Boolean = false, @@ -23,7 +24,16 @@ abstract class BaseSubclient( protected val options: BrokerClientOptions, ) { - internal abstract fun destroy() + private val log by Log + private val isBeingDestroyed = AtomicBoolean(false) + + internal fun destroy() { + if (isBeingDestroyed.compareAndSet(false, true)) { + doDestroy() + } + } + + protected abstract fun doDestroy() protected fun createMoshiAdapter(type: Class): JsonAdapter { val mochi = if (options.useSafeJsLongs) safeJsMoshi else baseMoshi @@ -62,8 +72,8 @@ class ProducerSubclient( private val log by Log private val adapter: JsonAdapter = createMoshiAdapter(requestType) - override fun destroy() { - client.deregisterProducer(this) + override fun doDestroy() { + client.deregisterSubclient(this) } suspend fun send( @@ -128,8 +138,8 @@ class ConsumerSubclient( private val log by Log private val adapter: JsonAdapter = createMoshiAdapter(incomingType) - override fun destroy() { - client.deregisterConsumer(this) + override fun doDestroy() { + client.deregisterSubclient(this) } internal suspend fun onIncomingMessage( diff --git a/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt index 53f644b..ffa64ae 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/kafka/KafkaConnection.kt @@ -91,6 +91,7 @@ class KafkaConnection( } override fun destroy() { + log.debug("Destroying KafkaConnection") consumer?.close() consumer = null producer?.close() diff --git a/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt index 845888d..6156060 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt @@ -158,7 +158,7 @@ class RpcClient( } - override fun destroy() { + override fun doDestroy() { requestProducer.destroy() requestConsumer.destroy() } diff --git a/latte/src/main/resources/log4j2.xml b/latte/src/main/resources/log4j2.xml index 281459c..5270fb3 100644 --- a/latte/src/main/resources/log4j2.xml +++ b/latte/src/main/resources/log4j2.xml @@ -1,5 +1,5 @@ - + %d{yyyy-MM-dd HH:mm:ss.SSS} %highlight{%-5level}{FATAL=bg_bright_red, ERROR=bright_red, WARN=bright_yellow, INFO=bright_green, DEBUG=bright_cyan, TRACE=bright_white} [%style{%t}{bright_white}] %style{%logger{36}}{white}: %msg%n%ex diff --git a/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt b/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt index 1801eac..1db0ed9 100644 --- a/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt +++ b/vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt @@ -4,7 +4,9 @@ import gg.beemo.latte.CommonConfig import gg.beemo.latte.broker.kafka.KafkaConnection import gg.beemo.latte.config.Configurator import gg.beemo.latte.logging.Log +import gg.beemo.latte.logging.log import kotlinx.coroutines.runBlocking +import org.apache.logging.log4j.LogManager object Vanilla { @@ -25,7 +27,14 @@ object Vanilla { ) log.debug("Initializing Kafka Ratelimit client") - RatelimitClient(brokerConnection) + val ratelimitClient = RatelimitClient(brokerConnection) + + Runtime.getRuntime().addShutdownHook(Thread({ + log.info("Destroying everything") + ratelimitClient.destroy() + brokerConnection.destroy() + LogManager.shutdown(true, true) + }, "Vanilla Shutdown Hook")) log.debug("Starting Kafka connection") brokerConnection.start()