Skip to content

Commit

Permalink
Start implementing RabbitConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Mar 8, 2024
1 parent 852c121 commit 9b88197
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
4 changes: 4 additions & 0 deletions latte/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ dependencies {
implementation("org.apache.kafka:kafka-clients:$kafkaVersion")
implementation("org.apache.kafka:kafka-streams:$kafkaVersion")

// RabbitMQ
val rabbitVersion = "5.20.0"
implementation("com.rabbitmq:amqp-client:$rabbitVersion")

// JSON
val moshiVersion = "1.14.0"
implementation("com.squareup.moshi:moshi:$moshiVersion")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package gg.beemo.latte.broker.rabbitmq

import com.rabbitmq.client.Address
import com.rabbitmq.client.Channel
import com.rabbitmq.client.Connection
import com.rabbitmq.client.ConnectionFactory
import gg.beemo.latte.broker.BrokerConnection
import gg.beemo.latte.broker.BrokerMessageHeaders
import gg.beemo.latte.broker.MessageId
import gg.beemo.latte.logging.Log
import java.util.Collections

// TODO Implementation considerations:
// - Channels are not thread-safe (for sending); need to create a coroutine-wrapper around them
// - "Consuming in one thread and publishing in another thread on a shared channel can be safe."

class RabbitConnection(
rabbitHosts: Array<String>,
override val serviceName: String,
override val instanceId: String,
private val useTls: Boolean = false,
) : BrokerConnection() {

override val supportsTopicHotSwap = true
private val rabbitAddresses = rabbitHosts.map(Address::parseAddress)
private val log by Log

private var connection: Connection? = null
private val channels = Collections.synchronizedMap(HashMap<String, Channel>())

override suspend fun start() {
connection = ConnectionFactory().apply {
if (useTls) {
// TODO This will trust every cert, even self-signed ones
useSslProtocol()
}
}.newConnection(rabbitAddresses, instanceId)
// TODO Create exchange
}

override suspend fun abstractSend(
topic: String,
key: String,
value: String,
headers: BrokerMessageHeaders
): MessageId {
TODO()
}

override fun destroy() {
log.debug("Destroying RabbitConnection")
connection?.close()
connection = null
super.destroy()
}

override fun createTopic(topic: String) {
val channel = channels.computeIfAbsent(topic) {
val connection = checkNotNull(connection) { "Connection not open" }
connection.createChannel()
}
// TODO Consume
}

override fun removeTopic(topic: String) {
val channel = channels.remove(topic)
channel?.close()
}

}

0 comments on commit 9b88197

Please sign in to comment.