Skip to content

Commit

Permalink
Merge pull request #6 from VEuPathDB/client-close-handling
Browse files Browse the repository at this point in the history
Client close handling
  • Loading branch information
Foxcapades authored Aug 14, 2024
2 parents cd7b24b + 69444f9 commit a4dca47
Show file tree
Hide file tree
Showing 30 changed files with 928 additions and 273 deletions.
6 changes: 3 additions & 3 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
`java-library`
`maven-publish`
kotlin("jvm") version "1.9.20"
id("org.jetbrains.dokka") version "1.9.10"
kotlin("jvm") version "1.9.23"
id("org.jetbrains.dokka") version "1.9.20"
}

group = "org.veupathdb.lib"
version = "1.3.0"
version = "2.0.0"

repositories {
mavenCentral()
Expand Down
8 changes: 6 additions & 2 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ nothing:

.PHONY: end-to-end
end-to-end:
@docker-compose -f test/docker-compose.yml build \
@docker compose -f test/docker-compose.yml build \
--build-arg=GITHUB_USERNAME=$(shell grep 'gpr.user' ~/.gradle/gradle.properties | cut -d= -f2) \
--build-arg=GITHUB_TOKEN=$(shell grep 'gpr.key' ~/.gradle/gradle.properties | cut -d= -f2)
@docker-compose -f test/docker-compose.yml up | grep --color=always -v rabbit_1
@docker compose -f test/docker-compose.yml up

.PHONY: kill-tests
kill-tests:
@docker compose -f test/docker-compose.yml down --rmi local -v

.PHONY: docs
docs:
Expand Down
2 changes: 2 additions & 0 deletions rabbitmq.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# 5 second timeout message ack timeout for error recovery testing.
consumer_timeout = 5000
2 changes: 1 addition & 1 deletion readme.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Client/server library for utilizing RabbitMQ as a job queue.
.Gradle
[source, kotlin]
----
implementation("org.veupathdb.lib:rabbit-job-queue:1.0.0")
implementation("org.veupathdb.lib:rabbit-job-queue:2.0.0")
----

=== Worker / Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.veupathdb.lib.rabbit.jobs
import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
import org.slf4j.LoggerFactory
import org.veupathdb.lib.rabbit.jobs.config.QueueConfig
import org.veupathdb.lib.rabbit.jobs.fn.ErrorHandler
import org.veupathdb.lib.rabbit.jobs.fn.SuccessHandler
import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification
Expand All @@ -11,21 +12,31 @@ import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification
import org.veupathdb.lib.rabbit.jobs.pools.ErrorHandlers
import org.veupathdb.lib.rabbit.jobs.pools.SuccessHandlers
import org.veupathdb.lib.rabbit.jobs.serialization.Json
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

/**
* Job dispatcher.
*/
class QueueDispatcher : QueueWrapper {
class JobQueueDispatcher : QueueWrapper {

private val Log = LoggerFactory.getLogger(javaClass)

private val errorHandlers = ErrorHandlers()

private val successHandlers = SuccessHandlers()

constructor(config: QueueConfig): super(config)
private val workers: ExecutorService

constructor(action: QueueConfig.() -> Unit): super(action)
constructor(config: QueueConfig): super(config) {
workers = Executors.newFixedThreadPool(config.executor.workers)
initCallbacks()
}

constructor(action: QueueConfig.() -> Unit): super(action) {
workers = Executors.newFixedThreadPool(config.executor.workers)
initCallbacks()
}

/**
* Registers a callback to be executed on job success notification.
Expand Down Expand Up @@ -57,7 +68,7 @@ class QueueDispatcher : QueueWrapper {
withDispatchQueue { publish(dispatchQueueName, job) }
}

override fun initCallbacks() {
private fun initCallbacks() {
withErrorQueue {
basicConsume(
errorQueueName,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,67 @@
package org.veupathdb.lib.rabbit.jobs

import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
import org.slf4j.LoggerFactory
import org.veupathdb.lib.rabbit.jobs.config.QueueConfig
import org.veupathdb.lib.rabbit.jobs.fn.JobHandler
import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification
import org.veupathdb.lib.rabbit.jobs.model.JobDispatch
import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification
import org.veupathdb.lib.rabbit.jobs.pools.exec.ExecutorPoolConfig
import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers
import org.veupathdb.lib.rabbit.jobs.serialization.Json
import org.veupathdb.lib.rabbit.jobs.pools.exec.JobQueueExecutorPool

/**
* Job executor end of the job queue.
* Job execution end of the job queue.
*/
class QueueWorker : QueueWrapper {
class JobQueueExecutor : QueueWrapper {

private val Log = LoggerFactory.getLogger(javaClass)

private val handlers = JobHandlers()

private val executorPool: JobQueueExecutorPool

/**
* Instantiates a new QueueWorker based on the given configuration.
*
* @param config Configuration for the RabbitMQ connections.
*/
constructor(config: QueueConfig): super(config)
constructor(config: QueueConfig): super(config) {
executorPool = JobQueueExecutorPool(ExecutorPoolConfig(
channelProvider = ::dispatchQueue,
queueName = config.jobQueueName,
handlers = handlers,
poolSize = config.executor.workers,
maxJobTime = config.executor.maxJobExecutionTime,
threadFactory = null,
failureChecker = config.executor.getOrCreateFailureEnforcer(),
shutdownCB = ::abort,
timeoutCB = config.executor.jobTimeoutCallback,
))

executorPool.start()
}

/**
* Instantiates a new QueueWorker using the given action to configure the
* RabbitMQ connections.
*
* @param action Action used to configure the RabbitMQ connections.
*/
constructor(action: QueueConfig.() -> Unit): super(action)
constructor(action: QueueConfig.() -> Unit): super(action) {
executorPool = JobQueueExecutorPool(ExecutorPoolConfig(
channelProvider = ::dispatchQueue,
queueName = config.jobQueueName,
handlers = handlers,
poolSize = config.executor.workers,
maxJobTime = config.executor.maxJobExecutionTime,
threadFactory = null,
failureChecker = config.executor.getOrCreateFailureEnforcer(),
shutdownCB = ::abort,
timeoutCB = config.executor.jobTimeoutCallback,
))

executorPool.start()
}

/**
* Registers a callback to be executed when a new job is submitted to the
Expand Down Expand Up @@ -67,27 +96,12 @@ class QueueWorker : QueueWrapper {
withSuccessQueue { publish(successQueueName, msg) }
}

/**
* Initializes the job queue callback.
*/
override fun initCallbacks() {
withDispatchQueue {
basicConsume(
dispatchQueueName,
false,
DeliverCallback { _, msg ->
Log.debug("handling job message {}", msg.envelope.deliveryTag)
workers.execute {
try {
handlers.execute(JobDispatch.fromJson(Json.from(msg.body)))
} finally {
Log.debug("acknowledging job message {}", msg.envelope.deliveryTag)
basicAck(msg.envelope.deliveryTag, false)
}
}
},
CancelCallback { }
)
}
fun shutdown(blocking: Boolean = true) {
executorPool.stop(blocking)
}

private fun abort() {
Log.info("closing connection to RabbitMQ")
connection.abort()
}
}
149 changes: 0 additions & 149 deletions src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt

This file was deleted.

Loading

0 comments on commit a4dca47

Please sign in to comment.