diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index 43e6660..5c53c59 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -114,11 +114,6 @@ abstract class BrokerClient( } } - internal fun toResponseTopic(topic: String): String = - if (connection.supportsTopicHotSwap) "$topic.responses" else topic - - internal fun toResponseKey(key: String): String = "$key.response" - } private class TopicMetadata( @@ -127,21 +122,6 @@ private class TopicMetadata( private val topic: String, ) { - private class KeyMetadata(val key: String) { - val producers: MutableSet> = Collections.synchronizedSet(HashSet()) - val consumers: MutableSet> = Collections.synchronizedSet(HashSet()) - - val isEmpty: Boolean - get() = producers.isEmpty() && consumers.isEmpty() - - fun destroy() { - producers.forEach(ProducerSubclient<*>::destroy) - consumers.forEach(ConsumerSubclient<*>::destroy) - producers.clear() - consumers.clear() - } - } - private val log by Log private val _keys: MutableMap = Collections.synchronizedMap(HashMap()) private val isBeingDestroyed = AtomicBoolean(false) @@ -158,6 +138,9 @@ private class TopicMetadata( subclient.key, subclient.topic ) + check(subclient.topic == topic) { + "Attempting to register subclient with topic '${subclient.topic}' in TopicMetadata of '$topic'" + } val metadata = getOrCreateKeyMetadata(subclient.key) when (subclient) { is ConsumerSubclient<*> -> { @@ -255,6 +238,21 @@ private class TopicMetadata( } +private class KeyMetadata(val key: String) { + val producers: MutableSet> = Collections.synchronizedSet(HashSet()) + val consumers: MutableSet> = Collections.synchronizedSet(HashSet()) + + val isEmpty: Boolean + get() = producers.isEmpty() && consumers.isEmpty() + + fun destroy() { + producers.forEach(ProducerSubclient<*>::destroy) + consumers.forEach(ConsumerSubclient<*>::destroy) + producers.clear() + consumers.clear() + } +} + @PublishedApi internal inline fun isTypeNullable(): Boolean { return null is T || T::class.java == Unit::class.java || T::class.java == Void::class.java diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt index 2651652..91271ed 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt @@ -85,6 +85,7 @@ abstract class BrokerConnection { listeners.remove(cb) if (listeners.size == 0) { log.debug("Removing topic '{}'", topic) + deferredTopicsToCreate.remove(topic) removeTopic(topic) null } else { diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt index f9937ac..559da34 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt @@ -21,29 +21,17 @@ open class BrokerMessageHeaders(val headers: Map) { } constructor( - sourceService: String, - sourceInstance: String, + connection: BrokerConnection, targetServices: Set, targetInstances: Set, ) : this( createHeadersMap( - sourceService, - sourceInstance, + connection.serviceName, + connection.instanceId, targetServices, targetInstances, null, - ) - ) - - constructor( - connection: BrokerConnection, - targetServices: Set, - targetInstances: Set, - ) : this( - connection.serviceName, - connection.instanceId, - targetServices, - targetInstances, + ), ) companion object { @@ -54,6 +42,7 @@ open class BrokerMessageHeaders(val headers: Map) { private const val HEADER_TARGET_INSTANCES = "target-instances" private const val HEADER_MESSAGE_ID = "message-id" + // Needs to be JvmStatic to be used in subclasses @JvmStatic protected fun createHeadersMap( sourceService: String, @@ -66,23 +55,17 @@ open class BrokerMessageHeaders(val headers: Map) { val headers = HashMap() headers[HEADER_SOURCE_SERVICE] = sourceService headers[HEADER_SOURCE_INSTANCE] = sourceInstance - headers[HEADER_TARGET_SERVICES] = joinToString(targetServices) - headers[HEADER_TARGET_INSTANCES] = joinToString(targetInstances) + headers[HEADER_TARGET_SERVICES] = targetServices.joinToString(",") + headers[HEADER_TARGET_INSTANCES] = targetInstances.joinToString(",") headers[HEADER_MESSAGE_ID] = messageId ?: UUID.randomUUID().toString() headers.putAll(extra) return headers } - @JvmStatic protected fun splitToSet(value: String): Set { return value.split(",").filter { it.isNotEmpty() }.toSet() } - @JvmStatic - protected fun joinToString(value: Set): String { - return value.joinToString(",") - } - } } diff --git a/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt index 4ca0b28..0f5a2c7 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt @@ -41,8 +41,8 @@ class RpcClient( private val requestConsumer = client.consumer(topic, key, options, requestType, requestIsNullable) { msg -> suspend fun sendResponse(response: ResponseT?, status: RpcStatus, isException: Boolean, isUpdate: Boolean) { val responseMsg = RpcResponseMessage( - client.toResponseTopic(topic), - client.toResponseKey(key), + toResponseTopic(topic), + toResponseKey(key), response, RpcMessageHeaders( connection, @@ -71,7 +71,7 @@ class RpcClient( return@consumer } catch (ex: Exception) { log.error( - "Uncaught RPC callbac#k error while processing message ${msg.headers.messageId} " + + "Uncaught RPC callback error while processing message ${msg.headers.messageId} " + "with key '$key' in topic '$topic'", ex, ) @@ -79,16 +79,16 @@ class RpcClient( } } private val responseProducer = client.producer( - client.toResponseTopic(topic), - client.toResponseKey(key), + toResponseTopic(topic), + toResponseKey(key), options, responseType, responseIsNullable, ) private val responseFlow = MutableSharedFlow>() private val responseConsumer = client.consumer( - client.toResponseTopic(topic), - client.toResponseKey(key), + toResponseTopic(topic), + toResponseKey(key), options, responseType, responseIsNullable, @@ -160,6 +160,11 @@ class RpcClient( } + private fun toResponseTopic(topic: String): String = + if (connection.supportsTopicHotSwap) "$topic.responses" else topic + + private fun toResponseKey(key: String): String = "$key.response" + override fun doDestroy() { requestProducer.destroy() requestConsumer.destroy() diff --git a/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt index 4545143..de4c112 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt @@ -23,8 +23,7 @@ class RpcMessageHeaders(headers: Map) : BrokerMessageHeaders(hea constructor(base: BrokerMessageHeaders) : this(base.headers) constructor( - sourceService: String, - sourceInstance: String, + connection: BrokerConnection, targetServices: Set, targetInstances: Set, inReplyTo: MessageId, @@ -33,46 +32,25 @@ class RpcMessageHeaders(headers: Map) : BrokerMessageHeaders(hea isUpdate: Boolean, ) : this( createHeadersMap( - sourceService, - sourceInstance, + connection.serviceName, + connection.instanceId, targetServices, targetInstances, null, - extra = mapOf( - HEADER_IN_REPLY_TO to inReplyTo, - HEADER_STATUS to status.code.toString(), - HEADER_IS_EXCEPTION to isException.toString(), - HEADER_IS_UPDATE to isUpdate.toString(), - ) - ) - ) - - constructor( - connection: BrokerConnection, - targetServices: Set, - targetInstances: Set, - inReplyTo: MessageId, - status: RpcStatus, - isException: Boolean, - isUpdate: Boolean, - ) : this( - connection.serviceName, - connection.instanceId, - targetServices, - targetInstances, - inReplyTo, - status, - isException, - isUpdate, + extra = + mapOf( + HEADER_IN_REPLY_TO to inReplyTo, + HEADER_STATUS to status.code.toString(), + HEADER_IS_EXCEPTION to isException.toString(), + HEADER_IS_UPDATE to isUpdate.toString(), + ), + ), ) companion object { - private const val HEADER_IN_REPLY_TO = "rpc-in-reply-to" private const val HEADER_STATUS = "rpc-response-status" private const val HEADER_IS_EXCEPTION = "rpc-is-exception" private const val HEADER_IS_UPDATE = "rpc-is-update" - } - }