From e58c3eb5c531e991a0473666cce0d4592fb3e6ac Mon Sep 17 00:00:00 2001 From: Elizabeth Paige Harper Date: Mon, 17 Jun 2024 15:32:57 -0400 Subject: [PATCH 1/4] handle rabbitmq client closing --- build.gradle.kts | 6 +- makefile | 8 +- rabbitmq.conf | 2 + readme.adoc | 2 +- ...eueDispatcher.kt => JobQueueDispatcher.kt} | 19 +- .../{QueueWorker.kt => JobQueueExecutor.kt} | 72 +++--- .../veupathdb/lib/rabbit/jobs/QueueConfig.kt | 149 ----------- .../veupathdb/lib/rabbit/jobs/QueueWrapper.kt | 84 +++---- .../rabbit/jobs/config/AnyFailureEnforcer.kt | 9 + .../rabbit/jobs/config/ConnectionConfig.kt | 47 ++++ .../lib/rabbit/jobs/config/ExecutorConfig.kt | 35 +++ .../jobs/config/ExecutorFailureEnforcer.kt | 34 +++ .../jobs/config/ExecutorFailurePolicy.kt | 70 ++++++ .../rabbit/jobs/config/MaxFailureEnforcer.kt | 9 + .../lib/rabbit/jobs/config/QueueConfig.kt | 230 +++++++++++++++++ .../jobs/config/WindowedFailureEnforcer.kt | 15 ++ .../rabbit/jobs/model/ExecutorPoolState.kt | 4 + .../lib/rabbit/jobs/model/PoolState.kt | 6 + .../rabbit/jobs/pools/exec/ChannelProvider.kt | 6 + .../lib/rabbit/jobs/pools/exec/Executor.kt | 97 ++++++++ .../jobs/pools/exec/ExecutorPoolConfig.kt | 63 +++++ .../jobs/pools/exec/JobQueueExecutorPool.kt | 231 ++++++++++++++++++ .../lib/rabbit/jobs/utils/ScrollingCounter.kt | 34 +++ test/client/.dockerignore | 2 + test/client/build.gradle.kts | 8 +- test/client/src/main/kotlin/main.kt | 23 +- test/client/src/main/resources/log4j2.xml | 13 + test/docker-compose.yml | 8 +- test/server/.dockerignore | 2 + test/server/build.gradle.kts | 7 +- test/server/src/main/kotlin/main.kt | 20 +- test/server/src/main/resources/log4j2.xml | 13 + 32 files changed, 1059 insertions(+), 269 deletions(-) create mode 100644 rabbitmq.conf rename src/main/kotlin/org/veupathdb/lib/rabbit/jobs/{QueueDispatcher.kt => JobQueueDispatcher.kt} (83%) rename src/main/kotlin/org/veupathdb/lib/rabbit/jobs/{QueueWorker.kt => JobQueueExecutor.kt} (53%) delete mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/AnyFailureEnforcer.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ConnectionConfig.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailureEnforcer.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/MaxFailureEnforcer.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/WindowedFailureEnforcer.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ChannelProvider.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt create mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/utils/ScrollingCounter.kt create mode 100644 test/client/.dockerignore create mode 100644 test/client/src/main/resources/log4j2.xml create mode 100644 test/server/.dockerignore create mode 100644 test/server/src/main/resources/log4j2.xml diff --git a/build.gradle.kts b/build.gradle.kts index 4ce3a37..6b3519a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,12 +4,12 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { `java-library` `maven-publish` - kotlin("jvm") version "1.9.20" - id("org.jetbrains.dokka") version "1.9.10" + kotlin("jvm") version "1.9.23" + id("org.jetbrains.dokka") version "1.9.20" } group = "org.veupathdb.lib" -version = "1.3.0" +version = "2.0.0" repositories { mavenCentral() diff --git a/makefile b/makefile index 414b9d3..744d808 100644 --- a/makefile +++ b/makefile @@ -4,10 +4,14 @@ nothing: .PHONY: end-to-end end-to-end: - @docker-compose -f test/docker-compose.yml build \ + @docker compose -f test/docker-compose.yml build \ --build-arg=GITHUB_USERNAME=$(shell grep 'gpr.user' ~/.gradle/gradle.properties | cut -d= -f2) \ --build-arg=GITHUB_TOKEN=$(shell grep 'gpr.key' ~/.gradle/gradle.properties | cut -d= -f2) - @docker-compose -f test/docker-compose.yml up | grep --color=always -v rabbit_1 + @docker compose -f test/docker-compose.yml up --no-attach rabbit + +.PHONY: kill-tests +kill-tests: + @docker compose -f test/docker-compose.yml down --rmi local -v .PHONY: docs docs: diff --git a/rabbitmq.conf b/rabbitmq.conf new file mode 100644 index 0000000..3b81b25 --- /dev/null +++ b/rabbitmq.conf @@ -0,0 +1,2 @@ +# 5 second timeout message ack timeout for error recovery testing. +consumer_timeout = 5000 \ No newline at end of file diff --git a/readme.adoc b/readme.adoc index 74c1db0..505c661 100644 --- a/readme.adoc +++ b/readme.adoc @@ -10,7 +10,7 @@ Client/server library for utilizing RabbitMQ as a job queue. .Gradle [source, kotlin] ---- - implementation("org.veupathdb.lib:rabbit-job-queue:1.0.0") + implementation("org.veupathdb.lib:rabbit-job-queue:2.0.0") ---- === Worker / Client diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueDispatcher.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt similarity index 83% rename from src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueDispatcher.kt rename to src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt index 4084131..bc2e3c4 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueDispatcher.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt @@ -3,6 +3,7 @@ package org.veupathdb.lib.rabbit.jobs import com.rabbitmq.client.CancelCallback import com.rabbitmq.client.DeliverCallback import org.slf4j.LoggerFactory +import org.veupathdb.lib.rabbit.jobs.config.QueueConfig import org.veupathdb.lib.rabbit.jobs.fn.ErrorHandler import org.veupathdb.lib.rabbit.jobs.fn.SuccessHandler import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification @@ -11,11 +12,13 @@ import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification import org.veupathdb.lib.rabbit.jobs.pools.ErrorHandlers import org.veupathdb.lib.rabbit.jobs.pools.SuccessHandlers import org.veupathdb.lib.rabbit.jobs.serialization.Json +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors /** * Job dispatcher. */ -class QueueDispatcher : QueueWrapper { +class JobQueueDispatcher : QueueWrapper { private val Log = LoggerFactory.getLogger(javaClass) @@ -23,9 +26,17 @@ class QueueDispatcher : QueueWrapper { private val successHandlers = SuccessHandlers() - constructor(config: QueueConfig): super(config) + private val workers: ExecutorService - constructor(action: QueueConfig.() -> Unit): super(action) + constructor(config: QueueConfig): super(config) { + workers = Executors.newFixedThreadPool(config.workers) + initCallbacks() + } + + constructor(action: QueueConfig.() -> Unit): super(action) { + workers = Executors.newFixedThreadPool(config.workers) + initCallbacks() + } /** * Registers a callback to be executed on job success notification. @@ -57,7 +68,7 @@ class QueueDispatcher : QueueWrapper { withDispatchQueue { publish(dispatchQueueName, job) } } - override fun initCallbacks() { + private fun initCallbacks() { withErrorQueue { basicConsume( errorQueueName, diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWorker.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt similarity index 53% rename from src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWorker.kt rename to src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt index 7a9195f..872dcf0 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWorker.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt @@ -1,30 +1,44 @@ package org.veupathdb.lib.rabbit.jobs -import com.rabbitmq.client.CancelCallback -import com.rabbitmq.client.DeliverCallback import org.slf4j.LoggerFactory +import org.veupathdb.lib.rabbit.jobs.config.QueueConfig import org.veupathdb.lib.rabbit.jobs.fn.JobHandler import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification -import org.veupathdb.lib.rabbit.jobs.model.JobDispatch import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification +import org.veupathdb.lib.rabbit.jobs.pools.exec.ExecutorPoolConfig import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers -import org.veupathdb.lib.rabbit.jobs.serialization.Json +import org.veupathdb.lib.rabbit.jobs.pools.exec.JobQueueExecutorPool /** - * Job executor end of the job queue. + * Job execution end of the job queue. */ -class QueueWorker : QueueWrapper { +class JobQueueExecutor : QueueWrapper { private val Log = LoggerFactory.getLogger(javaClass) private val handlers = JobHandlers() + private val executorPool: JobQueueExecutorPool + /** * Instantiates a new QueueWorker based on the given configuration. * * @param config Configuration for the RabbitMQ connections. */ - constructor(config: QueueConfig): super(config) + constructor(config: QueueConfig): super(config) { + executorPool = JobQueueExecutorPool(ExecutorPoolConfig( + channelProvider = ::dispatchQueue, + queueName = config.jobQueueName, + handlers = handlers, + poolSize = config.workers, + maxJobTime = config.maxJobExecutionTime, + threadFactory = null, + failureChecker = config.executorConfig.getOrCreateFailureEnforcer(), + shutdownCB = ::abort, + )) + + executorPool.start() + } /** * Instantiates a new QueueWorker using the given action to configure the @@ -32,7 +46,20 @@ class QueueWorker : QueueWrapper { * * @param action Action used to configure the RabbitMQ connections. */ - constructor(action: QueueConfig.() -> Unit): super(action) + constructor(action: QueueConfig.() -> Unit): super(action) { + executorPool = JobQueueExecutorPool(ExecutorPoolConfig( + channelProvider = ::dispatchQueue, + queueName = config.jobQueueName, + handlers = handlers, + poolSize = config.workers, + maxJobTime = config.maxJobExecutionTime, + threadFactory = null, + failureChecker = config.executorConfig.getOrCreateFailureEnforcer(), + shutdownCB = ::abort, + )) + + executorPool.start() + } /** * Registers a callback to be executed when a new job is submitted to the @@ -67,27 +94,12 @@ class QueueWorker : QueueWrapper { withSuccessQueue { publish(successQueueName, msg) } } - /** - * Initializes the job queue callback. - */ - override fun initCallbacks() { - withDispatchQueue { - basicConsume( - dispatchQueueName, - false, - DeliverCallback { _, msg -> - Log.debug("handling job message {}", msg.envelope.deliveryTag) - workers.execute { - try { - handlers.execute(JobDispatch.fromJson(Json.from(msg.body))) - } finally { - Log.debug("acknowledging job message {}", msg.envelope.deliveryTag) - basicAck(msg.envelope.deliveryTag, false) - } - } - }, - CancelCallback { } - ) - } + fun shutdown(blocking: Boolean = true) { + executorPool.stop(blocking) + } + + private fun abort() { + Log.info("closing connection to RabbitMQ") + connection.abort() } } \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt deleted file mode 100644 index 2a58306..0000000 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueConfig.kt +++ /dev/null @@ -1,149 +0,0 @@ -package org.veupathdb.lib.rabbit.jobs - -/** - * RabbitMQ Queue Configuration - */ -class QueueConfig { - /** - * RabbitMQ Connection Hostname - */ - var hostname = "rabbit" - - /** - * RabbitMQ Authentication Username - */ - var username = "guest" - - /** - * RabbitMQ Authentication Password - */ - var password = "guest" - - /** - * RabbitMQ Connection Port - */ - var hostPort = 5672 - - /** - * RabbitMQ Connection Timeout - */ - var timeout = 5_000 - - /** - * Job Dispatch Queue Name - */ - var jobQueueName = "jobs" - - /** - * Job Failure Notification Queue Name - */ - var errorQueueName = "errors" - - /** - * Job Success Notification Queue Name - */ - var successQueueName = "successes" - - /** - * Callback Worker Count - * - * Number of worker threads used to handle incoming messages. - */ - var workers = 5 - - /** - * Configures the RabbitMQ hostname. - * - * @param host RabbitMQ hostname. - * - * @return This configuration. - */ - fun hostname(host: String): QueueConfig { - hostname = host - return this - } - - /** - * Configures the RabbitMQ authentication username. - * - * @param user RabbitMQ username. - * - * @return This configuration. - */ - fun username(user: String): QueueConfig { - username = user - return this - } - - /** - * Configures the RabbitMQ authentication password. - * - * @param pass RabbitMQ password. - * - * @return This configuration. - */ - fun password(pass: String): QueueConfig { - password = pass - return this - } - - /** - * Configures the RabbitMQ host port. - * - * @param port RabbitMQ port. - * - * @return This configuration. - */ - fun hostPort(port: Int): QueueConfig { - hostPort = port - return this - } - - /** - * Configures the RabbitMQ connection timeout. - * - * @param time Connection timeout. - * - * @return This configuration. - */ - fun timeout(time: Int): QueueConfig { - timeout = time - return this - } - - /** - * Configures the name of the job dispatch queue. - * - * @param name Job dispatch queue name. - * - * @return This configuration. - */ - fun jobQueueName(name: String): QueueConfig { - jobQueueName = name - return this - } - - /** - * Configures the name of the job error notification queue. - * - * @param name Error notification queue name. - * - * @return This configuration. - */ - fun errorQueueName(name: String): QueueConfig { - errorQueueName = name - return this - } - - /** - * Configures the name of the job success notification queue. - * - * @param name Success notification queue name. - * - * @return This configuration. - */ - fun successQueueName(name: String): QueueConfig { - successQueueName = name - return this - } -} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt index 75e5d4e..d4c5606 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt @@ -3,10 +3,9 @@ package org.veupathdb.lib.rabbit.jobs import com.rabbitmq.client.Channel import com.rabbitmq.client.Connection import com.rabbitmq.client.ConnectionFactory +import org.veupathdb.lib.rabbit.jobs.config.QueueConfig import org.veupathdb.lib.rabbit.jobs.serialization.JsonSerializable import java.nio.charset.StandardCharsets -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors /** * Base implementation of a queue worker or dispatcher. @@ -17,22 +16,12 @@ sealed class QueueWrapper { */ private val factory = ConnectionFactory() - protected val workers: ExecutorService + protected val config: QueueConfig /** * Open RabbitMQ Connection - * - * Connection will be refreshed if it has closed since last access. */ - protected var connection: Connection - get() { - if (!field.isOpen) { - field = factory.newConnection() - } - - return field - } - private set + protected val connection: Connection /** * Job Dispatch Queue Name @@ -55,16 +44,14 @@ sealed class QueueWrapper { * @param config Queue configuration used to configure this [QueueWrapper]. */ constructor(config: QueueConfig) { + this.config = config configure(config) + connection = factory.newConnection() - workers = Executors.newFixedThreadPool(config.workers) dispatchQueueName = config.jobQueueName errorQueueName = config.errorQueueName successQueueName = config.successQueueName - - @Suppress("LeakingThis") - initCallbacks() } /** @@ -74,18 +61,14 @@ sealed class QueueWrapper { * configure this [QueueWrapper]. */ constructor(action: QueueConfig.() -> Unit) { - val tmp = QueueConfig() - tmp.action() - configure(tmp) + config = QueueConfig() + config.action() + configure(config) connection = factory.newConnection() - workers = Executors.newFixedThreadPool(tmp.workers) - dispatchQueueName = tmp.jobQueueName - errorQueueName = tmp.errorQueueName - successQueueName = tmp.successQueueName - - @Suppress("LeakingThis") - initCallbacks() + dispatchQueueName = config.jobQueueName + errorQueueName = config.errorQueueName + successQueueName = config.successQueueName } /** @@ -107,14 +90,18 @@ sealed class QueueWrapper { * Initializes the job dispatch queue if it is not already initialized. */ protected open fun Channel.initDispatchQueue() { - queueDeclare(dispatchQueueName, true, false, false, emptyMap()) + queueDeclare( + /* queue = */ dispatchQueueName, + /* durable = */ true, + /* exclusive = */ false, + /* autoDelete = */ false, + /* arguments = */ mapOf( + "x-consumer-timeout" to (config.executorConfig.maxJobExecutionTime * 1.5).inWholeMilliseconds + ), + ) } - /** - * Internal inline channel usage. - */ - protected inline fun withChannel(action: Channel.() -> Unit) = - with(connection.createChannel()) { action() } + protected fun dispatchQueue(): Channel = connection.createChannel().also { it.initDispatchQueue() } /** * Executes the given action against the job dispatch queue in a thread safe @@ -122,11 +109,9 @@ sealed class QueueWrapper { * * @param action Action to execute against the job dispatch queue. */ - protected inline fun withDispatchQueue(action: Channel.() -> Unit) = - withChannel { - initDispatchQueue() - action() - } + protected inline fun withDispatchQueue(action: Channel.() -> Unit) = dispatchQueue().let(action) + + protected fun errorQueue(): Channel = connection.createChannel().also { it.initErrorQueue() } /** * Executes the give action against the error notification queue in a thread @@ -134,11 +119,9 @@ sealed class QueueWrapper { * * @param action Action to execute against the error notification queue. */ - protected inline fun withErrorQueue(action: Channel.() -> Unit) = - withChannel { - initErrorQueue() - action() - } + protected inline fun withErrorQueue(action: Channel.() -> Unit) = errorQueue().let(action) + + protected fun successQueue(): Channel = connection.createChannel().also { it.initSuccessQueue() } /** * Executes the given action against the success notification queue in a @@ -146,11 +129,7 @@ sealed class QueueWrapper { * * @param action Action to execute */ - protected inline fun withSuccessQueue(action: Channel.() -> Unit) = - withChannel { - initSuccessQueue() - action() - } + protected inline fun withSuccessQueue(action: Channel.() -> Unit) = successQueue().let(action) /** * Publishes a message to the given [Channel]. @@ -167,11 +146,6 @@ sealed class QueueWrapper { ) } - /** - * Initialize queue callbacks. - */ - protected abstract fun initCallbacks() - /** * Configures the RabbitMQ [ConnectionFactory] based on the settings in the * given [QueueConfig]. @@ -183,6 +157,6 @@ sealed class QueueWrapper { factory.username = config.username factory.password = config.password factory.port = config.hostPort - factory.connectionTimeout = config.timeout + factory.connectionTimeout = config.timeout.inWholeMilliseconds.toInt() } } diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/AnyFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/AnyFailureEnforcer.kt new file mode 100644 index 0000000..0149ed1 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/AnyFailureEnforcer.kt @@ -0,0 +1,9 @@ +package org.veupathdb.lib.rabbit.jobs.config + +internal class AnyFailureEnforcer(policies: Array) : ExecutorFailureEnforcer { + private val enforcers = policies.map(ExecutorFailurePolicy::newEnforcer) + + override fun markFailure() = enforcers.forEach(ExecutorFailureEnforcer::markFailure) + override fun shouldHalt() = enforcers.any(ExecutorFailureEnforcer::shouldHalt) + override fun reason() = enforcers.first(ExecutorFailureEnforcer::shouldHalt).reason() +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ConnectionConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ConnectionConfig.kt new file mode 100644 index 0000000..774eaa3 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ConnectionConfig.kt @@ -0,0 +1,47 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import com.rabbitmq.client.ConnectionFactory +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class ConnectionConfig { + /** + * RabbitMQ Connection Hostname + */ + var hostname: String = "rabbit" + set(value) { + if (hostname.isBlank()) + throw IllegalArgumentException("RabbitMQ hostname cannot be blank") + field = value + } + + /** + * RabbitMQ Authentication Username + */ + var username: String = "guest" + + /** + * RabbitMQ Authentication Password + */ + var password: String = "guest" + + /** + * RabbitMQ Connection Port + */ + var hostPort: Int = 5672 + set(value) { + if (value !in 1 .. 65535) + throw IllegalArgumentException("invalid port number $value") + field = value + } + + /** + * RabbitMQ Connection Timeout + */ + var timeout: Duration = 5.seconds + + /** + * Connection Factory + */ + var connectionFactory: ConnectionFactory = ConnectionFactory() +} diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt new file mode 100644 index 0000000..b04e2ba --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt @@ -0,0 +1,35 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds + +class ExecutorConfig { + /** + * Configures the number of job execution worker threads. + */ + var workers: Int = 5 + set(value) { + if (value < 1) + throw IllegalArgumentException("cannot set worker count to a value less than 1") + + field = value + } + + /** + * Max allowed execution time for a single job. + * + * If a job exceeds this time it will be forceably terminated. + */ + var maxJobExecutionTime: Duration = 15.minutes + + /** + * Failure policy defining the circumstances in which the target + * [JobQueueExecutor][org.veupathdb.lib.rabbit.jobs.JobQueueExecutor] should + * shut down without attempting recovery. + */ + var failurePolicy: ExecutorFailurePolicy? = null + + internal fun getOrCreateFailureEnforcer() = + failurePolicy?.newEnforcer() ?: WindowedFailureEnforcer((workers * 2).toUInt(), 2.seconds) +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailureEnforcer.kt new file mode 100644 index 0000000..eb25374 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailureEnforcer.kt @@ -0,0 +1,34 @@ +package org.veupathdb.lib.rabbit.jobs.config + +/** + * Executor Failure State Shutdown Enforcer + * + * Tracks failures and determines when an executor should be considered as + * "failed" and shut down. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +interface ExecutorFailureEnforcer { + /** + * Called when a channel or consumer is killed due to an unhandled exception. + */ + fun markFailure() + + /** + * Called to test whether the containing executor should be halted according + * to the rules defined in this enforcer. + * + * @return `true` if the executor should be halted, otherwise `false`. + */ + fun shouldHalt(): Boolean + + /** + * Returns the failure reason for this enforcer. + * + * This method will only be called after [shouldHalt] has returned `true`. + * + * @return A message describing the reason the executor is being halted. + */ + fun reason(): String +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt new file mode 100644 index 0000000..a27d48d --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt @@ -0,0 +1,70 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import kotlin.time.Duration + +enum class ExecutorFailureResponse { + GracefulStop, + ImmediateStop, +} + +/** + * Defines a policy that itself defines the state or point at which an executor + * should be considered "failed" and shut down. + * + * This type is effectively a factory for [ExecutorFailureEnforcer] instances + * which will individually be used to enforce the rules of the policy. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +fun interface ExecutorFailurePolicy { + /** + * Creates a new [ExecutorFailureEnforcer] instance. + */ + fun newEnforcer(): ExecutorFailureEnforcer + + companion object { + /** + * Defines a new failure policy that considers an executor as failed when + * channels or consumers are unexpectedly killed more than [max] times + * within the time window defined by [within]. + * + * This may be used to catch instances where consumers are spinning up and + * failing immediately, possibly indicating a persistent issue that will not + * be resolved automatically. + * + * @param max Max permissible number of times consumers may be unexpectedly + * killed within the defined time window. + * + * @param within Defines the time window within which at most [max] + * unexpected consumer deaths may occur. + * + * @return A new [ExecutorFailurePolicy] instance. + */ + fun maxFailuresWithin(max: Int, within: Duration) = ExecutorFailurePolicy { WindowedFailureEnforcer(max.toUInt(), within) } + + /** + * Defines a new failure policy that considers an executor as failed when + * channels or consumers are unexpectedly killed more than [max] times total + * within the lifespan of the executor. + * + * @param max Max permissible number of times consumers may be unexpectedly + * killed. + * + * @return A new [ExecutorFailurePolicy] instance. + */ + fun maxTotalFailures(max: Int) = ExecutorFailurePolicy { MaxFailureEnforcer(max.toUInt()) } + + /** + * Defines a new failure policy that wraps other policies and considers an + * executor as failed when any of the sub policies consider the executor as + * failed. + * + * @param others Policies to apply. + * + * @return A new [ExecutorFailurePolicy] instance. + */ + fun ofAny(vararg others: ExecutorFailurePolicy) = ExecutorFailurePolicy { AnyFailureEnforcer(others) } + } +} + diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/MaxFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/MaxFailureEnforcer.kt new file mode 100644 index 0000000..c62ce2a --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/MaxFailureEnforcer.kt @@ -0,0 +1,9 @@ +package org.veupathdb.lib.rabbit.jobs.config + +internal class MaxFailureEnforcer(private val maxFailures: UInt) : ExecutorFailureEnforcer { + private var failureCount = 0u + + override fun markFailure() { failureCount++ } + override fun shouldHalt() = failureCount > maxFailures + override fun reason() = "process exceeded $maxFailures total failures" +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt new file mode 100644 index 0000000..f813c01 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt @@ -0,0 +1,230 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import kotlin.time.Duration + +/** + * RabbitMQ Queue Configuration + */ +class QueueConfig { + + // region Connection + + /** + * RabbitMQ connection configuration. + */ + var connectionConfig: ConnectionConfig = ConnectionConfig() + + /** + * RabbitMQ connection configuration. + * + * @param connectionConfig New connection configuration object. + * + * @return This configuration. + */ + fun connectionConfig(connectionConfig: ConnectionConfig) = also { it.connectionConfig = connectionConfig } + + /** + * Executes the given function on the current [connectionConfig] value. + * + * @param fn Function to execute on the current [connectionConfig] value. + * + * @return This configuration. + */ + inline fun connectionConfig(fn: ConnectionConfig.() -> Unit) = also { it.connectionConfig.fn() } + + + /** + * See [ConnectionConfig.hostname]. + */ + inline var hostname + get() = connectionConfig.hostname + set(value) { connectionConfig.hostname = value } + + /** + * See [ConnectionConfig.hostname]. + * + * @param host RabbitMQ hostname. + * + * @return This configuration. + */ + fun hostname(host: String) = apply { connectionConfig.hostname = host } + + + /** + * See [ConnectionConfig.username]. + */ + inline var username + get() = connectionConfig.username + set(value) { connectionConfig.username = value } + + /** + * See [ConnectionConfig.username]. + * + * @param user RabbitMQ username. + * + * @return This configuration. + */ + fun username(user: String) = apply { connectionConfig.username = user } + + + /** + * See [ConnectionConfig.password]. + */ + inline var password + get() = connectionConfig.password + set(value) { connectionConfig.password = value } + + /** + * See [ConnectionConfig.password]. + * + * @param pass RabbitMQ password. + * + * @return This configuration. + */ + fun password(pass: String) = apply { connectionConfig.password = pass } + + + /** + * See [ConnectionConfig.hostPort]. + */ + inline var hostPort + get() = connectionConfig.hostPort + set(value) { connectionConfig.hostPort = value } + + /** + * See [ConnectionConfig.hostPort]. + * + * See [ConnectionConfig.hostPort]. + * + * @return This configuration. + */ + fun hostPort(port: Int) = apply { connectionConfig.hostPort = port } + + + /** + * See [ConnectionConfig.timeout]. + */ + inline var timeout + get() = connectionConfig.timeout + set(value) { connectionConfig.timeout = value } + + /** + * See [ConnectionConfig.timeout]. + * + * @param time Connection timeout. + * + * @return This configuration. + */ + fun timeout(time: Duration) = apply { connectionConfig.timeout = time } + + // endregion Connection + + // region Queue Names + + /** + * Job Dispatch Queue Name + */ + var jobQueueName = "jobs" + + /** + * Configures the name of the job dispatch queue. + * + * @param name Job dispatch queue name. + * + * @return This configuration. + */ + fun jobQueueName(name: String) = apply { jobQueueName = name } + + + /** + * Job Failure Notification Queue Name + */ + var errorQueueName = "errors" + + /** + * Configures the name of the job error notification queue. + * + * @param name Error notification queue name. + * + * @return This configuration. + */ + fun errorQueueName(name: String) = apply { errorQueueName = name } + + + /** + * Job Success Notification Queue Name + */ + var successQueueName = "successes" + + /** + * Configures the name of the job success notification queue. + * + * @param name Success notification queue name. + * + * @return This configuration. + */ + fun successQueueName(name: String) = apply { successQueueName = name } + + // endregion Queue Names + + // region Executor + + var executorConfig: ExecutorConfig = ExecutorConfig() + + fun executorConfig(executorConfig: ExecutorConfig) = also { it.executorConfig = executorConfig } + + inline fun executorConfig(fn: ExecutorConfig.() -> Unit) = also { it.executorConfig.fn() } + + + /** + * See [ExecutorConfig.workers]. + */ + inline var workers + get() = executorConfig.workers + set(value) { executorConfig.workers = value } + + /** + * See [ExecutorConfig.workers]. + * + * @param value Number of workers + * + * @return This configuration. + */ + fun workers(value: Int) = apply { executorConfig.workers = value } + + + /** + * See [ExecutorConfig.maxJobExecutionTime]. + */ + inline var maxJobExecutionTime + get() = executorConfig.maxJobExecutionTime + set(value) { executorConfig.maxJobExecutionTime = value } + + /** + * See [ExecutorConfig.maxJobExecutionTime]. + * + * @param duration Max duration a job will be permitted to run for. + * + * @return This configuration. + */ + fun maxJobExecutionTime(duration: Duration) = apply { executorConfig.maxJobExecutionTime = duration } + + + /** + * See [ExecutorConfig.failurePolicy]. + */ + inline var executorFailurePolicy + get() = executorConfig.failurePolicy + set(value) { executorConfig.failurePolicy = value } + + /** + * See [ExecutorConfig.failurePolicy]. + * + * @param policy Failure policy to use. + * + * @return This configuration. + */ + fun executorFailurePolicy(policy: ExecutorFailurePolicy) = apply { executorConfig.failurePolicy = policy } + + // endregion Executor +} diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/WindowedFailureEnforcer.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/WindowedFailureEnforcer.kt new file mode 100644 index 0000000..bf271a3 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/WindowedFailureEnforcer.kt @@ -0,0 +1,15 @@ +package org.veupathdb.lib.rabbit.jobs.config + +import org.veupathdb.lib.rabbit.jobs.utils.ScrollingCounter +import kotlin.time.Duration + +internal class WindowedFailureEnforcer( + private val maxFailures: UInt, + private val within: Duration, +) : ExecutorFailureEnforcer { + private val counter = ScrollingCounter(within) + + override fun markFailure() = counter.inc() + override fun shouldHalt() = counter > maxFailures + override fun reason() = "process exceeded $maxFailures within $within" +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt new file mode 100644 index 0000000..c247fec --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt @@ -0,0 +1,4 @@ +package org.veupathdb.lib.rabbit.jobs.model + +sealed interface ExecutorPoolState + diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt new file mode 100644 index 0000000..bec79cd --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt @@ -0,0 +1,6 @@ +package org.veupathdb.lib.rabbit.jobs.model + +internal data class PoolState( + var totalFailures: UInt, + var windowedFailures: UInt, +) : ExecutorPoolState \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ChannelProvider.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ChannelProvider.kt new file mode 100644 index 0000000..c48084c --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ChannelProvider.kt @@ -0,0 +1,6 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import com.rabbitmq.client.Channel + +internal typealias ChannelProvider = () -> Channel + diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt new file mode 100644 index 0000000..93bf447 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt @@ -0,0 +1,97 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import com.rabbitmq.client.Channel +import com.rabbitmq.client.ConsumerShutdownSignalCallback +import org.slf4j.LoggerFactory +import org.veupathdb.lib.rabbit.jobs.model.JobDispatch +import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers +import org.veupathdb.lib.rabbit.jobs.serialization.Json +import java.util.concurrent.* +import kotlin.time.Duration + +/** + * Single 'worker' wrapping a RabbitMQ consumer client whose purpose is to parse + * incoming job config messages and fire jobs onto an external thread pool for + * each message. + * + * Each executor runs a single job at a time and awaits the result of that job + * before acknowledging the job config message and moving on to the next + * available message. + * + * Additionally, via the [next] and [prev] values, `Executor` instances act as + * nodes in a linked list, allowing dead nodes to be pruned from the executor + * pool efficiently. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +internal class Executor( + /** + * Identifier string for this executor. + * + * This value is used for logging. + */ + val id: String, + + /** + * Channel this executor should subscribe to. + */ + private val channel: Channel, + + /** + * Job execution handlers. + */ + private val handlers: JobHandlers, + + /** + * Max allowed job execution time. + */ + private val jobTimeout: Duration, + + /** + * Callback used to submit jobs to a worker pool. + */ + private val submitJobFn: (Runnable) -> Future<*> +) { + private val log = LoggerFactory.getLogger(javaClass) + + var prev: Executor? = null + + var next: Executor? = null + + fun init(queue: String, shutdown: ConsumerShutdownSignalCallback) { + channel.basicConsume( + queue, + false, + { _, msg -> + if (!channel.isOpen) { + log.error("consumer '{}' cannot execute job for message {}, channel is closed!", id, msg.envelope.deliveryTag) + return@basicConsume + } + + log.debug("consumer '{}' executing job for message {}", id, msg.envelope.deliveryTag) + + val fut = submitJobFn { handlers.execute(JobDispatch.fromJson(Json.from(msg.body))) } + + // Wait for {jobTimeout} at most before killing the job and + // acknowledging the message + try { + fut.get(jobTimeout.inWholeSeconds, TimeUnit.SECONDS) + log.debug("acknowledging job message {}", msg.envelope.deliveryTag) + channel.basicAck(msg.envelope.deliveryTag, false) + } catch (e: TimeoutException) { + log.warn("consumer '{}' killing job for message {} for taking longer than {}", id, msg.envelope.deliveryTag, jobTimeout) + fut.cancel(true) + channel.basicAck(msg.envelope.deliveryTag, false) + } + }, + { }, + shutdown, + ) + } + + fun stop() { + log.debug("closing channel for consumer {}", id) + try { channel.close() } catch (e: Throwable) { /* do nothing */ } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt new file mode 100644 index 0000000..890ef5b --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt @@ -0,0 +1,63 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import org.veupathdb.lib.rabbit.jobs.config.ExecutorFailureEnforcer +import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers +import java.util.concurrent.ThreadFactory +import kotlin.time.Duration + +/** + * Configuration values for a [JobQueueExecutorPool] instance. + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +internal data class ExecutorPoolConfig( + /** + * Function used to get new RabbitMQ `Channel` instances to bind newly created + * [Executor]s to. + */ + val channelProvider: ChannelProvider, + + /** + * Name of the queue [Executor]s in the target pool should subscribe to. + */ + val queueName: String, + + /** + * Job execution handlers. + */ + val handlers: JobHandlers, + + /** + * Number of [Executor]s that should be kept in the target pool. + */ + val poolSize: Int, + + /** + * Max allowed job execution time. + * + * Jobs will be killed if they exceed this time limit. + */ + val maxJobTime: Duration, + + /** + * Thread factory to use in the [JobQueueExecutorPool]'s internal thread pool. + * + * If this value is set to null, a default thread factory will be used. + */ + val threadFactory: ThreadFactory?, + + /** + * Executor failure checker. + * + * Used to determine when the target executor pool has reached a 'failed' + * state. + */ + val failureChecker: ExecutorFailureEnforcer, + + /** + * Shutdown callback. Used to trigger a connection close on critical worker + * pool failure. + */ + val shutdownCB: () -> Unit, +) \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt new file mode 100644 index 0000000..1881639 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt @@ -0,0 +1,231 @@ +package org.veupathdb.lib.rabbit.jobs.pools.exec + +import com.rabbitmq.client.Consumer +import com.rabbitmq.client.ConsumerShutdownSignalCallback +import org.slf4j.LoggerFactory +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.time.Duration +import kotlin.time.Duration.Companion.days + +/** + * Job Queue Executor Pool + * + * Contains a pool of job execution workers subscribed to a RabbitMQ queue + * defined by the given configuration params. + * + * Each executor in the pool operates with its own [Consumer] instance on its + * own channel provided by the given [ChannelProvider]. + * + * If an executor's consumer dies, the executor will be replaced. + * + * ```kotlin + * // Create a new executor pool + * val executor = JobQueueExecutorPool(config) + * + * // Start the executor pool's workers + * executor.start() + * + * // Gracefully stop the executor pool + * executor.stop() + * + * // Immediately stop the executor pool (interrupting any running jobs) + * executor.stopNow() + * ``` + * + * @author Elizabeth Paige Harper [foxcapades.io@gmail.com] + * @since 2.0.0 + */ +internal class JobQueueExecutorPool(private val config: ExecutorPoolConfig) { + private val logger = LoggerFactory.getLogger(javaClass) + + private val closing = AtomicBoolean(false) + private val name = "${config.queueName}-consumer-pool" + private val threads = Executors.newFixedThreadPool(config.poolSize) + + private var head: Executor? = null + private var tail: Executor? = null + private var counter = 0u + + init { + if (config.poolSize !in 1..64) + throw IllegalArgumentException("invalid size value ${config.poolSize}, must be in the range [1, 64]") + } + + /** + * Starts the executor pool's workers. + * + * Until this method is called, the worker pool will not receive or react to + * any job queue messages. + */ + fun start() { + for (i in 0 ..< config.poolSize) + add(createNew().also { it.init(config.queueName, onConsumerShutdown(it)) }) + } + + /** + * Attempts to gracefully shut down the executor pool. + * + * If [block] is `true`, this method will block the current thread until the + * [killAfter] duration has been reached, at which point the executor pool + * will request a force-shutdown of the underlying thread pool, aborting any + * jobs still in progress at that point. If [blockAfterKill] is also `true` + * at this point, the method will continue to block after the force-shutdown + * request until the last job has ended. + * + * If [block] is `false`, this method will not block. It will request the + * underlying thread pool shut down, then return immediately. In this case + * the values of [killAfter] and [blockAfterKill] are ignored. + * + * @param block Whether this method should block the current thread until the + * underlying thread pool has gracefully shut down. + * + * @param killAfter Max duration to wait for a graceful shutdown before + * attempting to abort remaining in-progress job executions. + * + * @param blockAfterKill Whether this method should continue to block after + * the [killAfter] duration has passed. + */ + fun stop( + block: Boolean = false, + killAfter: Duration = 10_000.days, + blockAfterKill: Boolean = true, + ): StopCode { + stopExecutors() + threads.shutdown() + + if (!block) + return if (threads.isTerminated) StopCode.Graceful else StopCode.Unknown + + if (threads.awaitTermination(killAfter.inWholeMilliseconds, TimeUnit.MILLISECONDS)) + return StopCode.Graceful + + threads.shutdownNow() + silently(config.shutdownCB) + if (blockAfterKill) + threads.awaitTermination(243256, TimeUnit.DAYS) + + return StopCode.Forced + } + + @JvmInline value class StopCode private constructor(private val value: Int) { + companion object { + val Unknown = StopCode(0) + val Graceful = StopCode(1) + val Forced = StopCode(2) + } + } + + /** + * Shuts down the executor pool by force, aborting any currently running jobs. + * + * This method may optionally block until the shutdown has completed. + * + * @param block Whether this method should block until the shutdown has + * completed. + */ + fun stopNow(block: Boolean = true): StopCode { + stopExecutors() + threads.shutdownNow() + silently(config.shutdownCB) + + if (block) + threads.awaitTermination(243256, TimeUnit.DAYS) + + return StopCode.Forced + } + + private fun stopExecutors() { + synchronized(this) { + if (closing.get()) + return + + closing.set(true) + + var next = head + head = null + tail = null + + while (next != null) { + val tn = next.next + + next.prev = null + next.next = null + next.stop() + + next = tn + } + } + } + + private fun createNew(): Executor { + counter++ + val name = "$name-$counter" + + logger.debug("creating new consumer: {}", name) + + return Executor( + id = name, + channel = config.channelProvider(), + handlers = config.handlers, + jobTimeout = config.maxJobTime, + submitJobFn = threads::submit + ) + } + + private fun onConsumerShutdown(ex: Executor): ConsumerShutdownSignalCallback { + return ConsumerShutdownSignalCallback { _, sig -> + if (sig.isInitiatedByApplication) { + logger.debug("received consumer {} shutdown signal", ex.id) + } else { + logger.warn("caught unexpected shutdown on consumer {}", ex.id) + config.failureChecker.markFailure() + if (config.failureChecker.shouldHalt()) { + val reason = config.failureChecker.reason() + logger.error("shutting down job queue executor: {}", reason) + + stopNow() + return@ConsumerShutdownSignalCallback + } + } + + remove(ex) + ex.stop() + + if (!closing.get()) + add(createNew().also { it.init(config.queueName, onConsumerShutdown(it)) }) + } + } + + private fun add(ex: Executor) { + if (head == null) { + head = ex + tail = ex + } else { + tail!!.next = ex + tail = ex + } + } + + private fun remove(ex: Executor) { + if (head === ex) { + head = ex.next + head?.prev = null + } else if (tail === ex) { + tail = ex.prev + tail?.next = null + } else { + ex.next?.prev = ex.prev + ex.prev?.next = ex.next + } + } + + private fun silently(fn: () -> Unit) { + try { + fn() + } catch (e: Throwable) { + // do nothing + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/utils/ScrollingCounter.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/utils/ScrollingCounter.kt new file mode 100644 index 0000000..189dce3 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/utils/ScrollingCounter.kt @@ -0,0 +1,34 @@ +package org.veupathdb.lib.rabbit.jobs.utils + +import java.util.LinkedList +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class ScrollingCounter(windowDuration: Duration = 5.seconds) { + private val entries = LinkedList() + private val maxDelta = windowDuration.inWholeMilliseconds + + fun inc() { entries.offer(System.currentTimeMillis()) } + + operator fun compareTo(value: Int) = compareTo(value.toUInt()) + operator fun compareTo(value: UInt) = count().compareTo(value) + + fun count(): UInt { + val cutoff = System.currentTimeMillis() - maxDelta + + while (entries.isNotEmpty() && entries.first < cutoff) + entries.pop() + + return entries.size.toUInt() + } + + override fun equals(other: Any?) = + when (other) { + is Int -> count() == other + is UInt -> count() == other + is ScrollingCounter -> other === this || count() == other.count() + else -> false + } + + override fun hashCode() = entries.hashCode() * 31 + maxDelta.hashCode() +} \ No newline at end of file diff --git a/test/client/.dockerignore b/test/client/.dockerignore new file mode 100644 index 0000000..bb88a77 --- /dev/null +++ b/test/client/.dockerignore @@ -0,0 +1,2 @@ +/test/server/ +/test/docker-compose.yml \ No newline at end of file diff --git a/test/client/build.gradle.kts b/test/client/build.gradle.kts index 02bdf3b..7c25afc 100644 --- a/test/client/build.gradle.kts +++ b/test/client/build.gradle.kts @@ -27,12 +27,16 @@ dependencies { implementation(kotlin("stdlib-jdk8")) implementation(rootProject) implementation("org.veupathdb.lib:hash-id:1.0.2") - implementation("com.fasterxml.jackson.core:jackson-databind:2.13.0") + + implementation("com.fasterxml.jackson.core:jackson-databind:2.17.1") + implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.23.1") + implementation("org.apache.logging.log4j:log4j-api:2.23.1") + implementation("org.apache.logging.log4j:log4j-core:2.23.1") } kotlin { jvmToolchain { - (this as JavaToolchainSpec).languageVersion.set(JavaLanguageVersion.of(17)) + languageVersion.set(JavaLanguageVersion.of(17)) } } diff --git a/test/client/src/main/kotlin/main.kt b/test/client/src/main/kotlin/main.kt index 5c57a61..acef7aa 100644 --- a/test/client/src/main/kotlin/main.kt +++ b/test/client/src/main/kotlin/main.kt @@ -1,22 +1,25 @@ @file:JvmName("Main") -import com.fasterxml.jackson.databind.node.TextNode -import org.veupathdb.lib.hash_id.HashID -import org.veupathdb.lib.rabbit.jobs.QueueWorker -import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification +import org.veupathdb.lib.rabbit.jobs.JobQueueExecutor +import org.veupathdb.lib.rabbit.jobs.config.ExecutorFailurePolicy import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification +import kotlin.time.Duration.Companion.seconds fun main() { println("Sleeping for 10 seconds...") Thread.sleep(10_000) - val conFac = QueueWorker {} + val conFac = JobQueueExecutor { + executorFailurePolicy = ExecutorFailurePolicy.maxTotalFailures(1) + workers = 1 + maxJobExecutionTime = 15.seconds + } conFac.onJob { - print("Server: ") - println(it) - } + println("Server said: $it") - conFac.sendSuccess(SuccessNotification(HashID("0102030405060708090A0B0C0D0E0F10"))) - conFac.sendError(ErrorNotification(HashID("0102030405060708090A0B0C0D0E0F10"), 123, 0,"butts", TextNode("body"))) + Thread.sleep(20.seconds.inWholeMilliseconds) + + conFac.sendSuccess(SuccessNotification(it.jobID)) + } } \ No newline at end of file diff --git a/test/client/src/main/resources/log4j2.xml b/test/client/src/main/resources/log4j2.xml new file mode 100644 index 0000000..473e169 --- /dev/null +++ b/test/client/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 22efe5d..a72b148 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -10,6 +10,10 @@ services: context: ../ dockerfile: test/server/Dockerfile rabbit: - image: rabbitmq:3.9.11-management-alpine + image: rabbitmq:3.13.3-management-alpine ports: - - "15672:15672" \ No newline at end of file + - "15672:15672" + volumes: + - type: bind + source: ../rabbitmq.conf + target: /etc/rabbitmq/rabbitmq.conf diff --git a/test/server/.dockerignore b/test/server/.dockerignore new file mode 100644 index 0000000..d3849d4 --- /dev/null +++ b/test/server/.dockerignore @@ -0,0 +1,2 @@ +/test/client/ +/test/docker-compose.yml diff --git a/test/server/build.gradle.kts b/test/server/build.gradle.kts index 653f9ff..5fc85eb 100644 --- a/test/server/build.gradle.kts +++ b/test/server/build.gradle.kts @@ -27,12 +27,15 @@ dependencies { implementation(kotlin("stdlib-jdk8")) implementation(rootProject) implementation("org.veupathdb.lib:hash-id:1.1.0") - implementation("com.fasterxml.jackson.core:jackson-databind:2.13.3") + implementation("com.fasterxml.jackson.core:jackson-databind:2.17.1") + implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.23.1") + implementation("org.apache.logging.log4j:log4j-api:2.23.1") + implementation("org.apache.logging.log4j:log4j-core:2.23.1") } kotlin { jvmToolchain { - (this as JavaToolchainSpec).languageVersion.set(JavaLanguageVersion.of(17)) + languageVersion.set(JavaLanguageVersion.of(17)) } } diff --git a/test/server/src/main/kotlin/main.kt b/test/server/src/main/kotlin/main.kt index e305dcc..09cc0ee 100644 --- a/test/server/src/main/kotlin/main.kt +++ b/test/server/src/main/kotlin/main.kt @@ -2,26 +2,28 @@ import com.fasterxml.jackson.databind.node.TextNode import org.veupathdb.lib.hash_id.HashID -import org.veupathdb.lib.rabbit.jobs.QueueDispatcher +import org.veupathdb.lib.rabbit.jobs.JobQueueDispatcher import org.veupathdb.lib.rabbit.jobs.model.JobDispatch fun main() { println("Sleeping for 10 seconds...") Thread.sleep(10_000) - val conFac = QueueDispatcher {} + val conFac = JobQueueDispatcher {} conFac.onSuccess { - println("Client Success: $it") + println("Success from client: $it") } conFac.onError { - print("Client Error: $it") + print("Error from client: $it") } - conFac.dispatch(JobDispatch( - HashID("01020304050607080102030405060708"), - TextNode("foo"), - "something", - )) + for (i in 1 .. 15) { + conFac.dispatch(JobDispatch( + HashID("01020304050607080102030405060701"), + TextNode("foo $i"), + "something", + )) + } } \ No newline at end of file diff --git a/test/server/src/main/resources/log4j2.xml b/test/server/src/main/resources/log4j2.xml new file mode 100644 index 0000000..473e169 --- /dev/null +++ b/test/server/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file From d5fdffdd604eb200b23c09e7d50f6e24ec2425fb Mon Sep 17 00:00:00 2001 From: Elizabeth Paige Harper Date: Mon, 17 Jun 2024 16:15:19 -0400 Subject: [PATCH 2/4] updates --- .../lib/rabbit/jobs/JobQueueDispatcher.kt | 4 +- .../lib/rabbit/jobs/JobQueueExecutor.kt | 14 +- .../veupathdb/lib/rabbit/jobs/QueueWrapper.kt | 12 +- .../lib/rabbit/jobs/config/ExecutorConfig.kt | 7 + .../jobs/config/ExecutorFailurePolicy.kt | 5 - .../lib/rabbit/jobs/config/QueueConfig.kt | 164 +----------------- .../rabbit/jobs/model/ExecutorPoolState.kt | 4 - .../lib/rabbit/jobs/model/PoolState.kt | 6 - .../lib/rabbit/jobs/pools/exec/Executor.kt | 23 ++- .../jobs/pools/exec/ExecutorPoolConfig.kt | 6 + .../jobs/pools/exec/JobQueueExecutorPool.kt | 3 +- test/client/src/main/kotlin/main.kt | 9 +- 12 files changed, 63 insertions(+), 194 deletions(-) delete mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt delete mode 100644 src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt index bc2e3c4..8757492 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueDispatcher.kt @@ -29,12 +29,12 @@ class JobQueueDispatcher : QueueWrapper { private val workers: ExecutorService constructor(config: QueueConfig): super(config) { - workers = Executors.newFixedThreadPool(config.workers) + workers = Executors.newFixedThreadPool(config.executor.workers) initCallbacks() } constructor(action: QueueConfig.() -> Unit): super(action) { - workers = Executors.newFixedThreadPool(config.workers) + workers = Executors.newFixedThreadPool(config.executor.workers) initCallbacks() } diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt index 872dcf0..acc8af7 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/JobQueueExecutor.kt @@ -30,11 +30,12 @@ class JobQueueExecutor : QueueWrapper { channelProvider = ::dispatchQueue, queueName = config.jobQueueName, handlers = handlers, - poolSize = config.workers, - maxJobTime = config.maxJobExecutionTime, + poolSize = config.executor.workers, + maxJobTime = config.executor.maxJobExecutionTime, threadFactory = null, - failureChecker = config.executorConfig.getOrCreateFailureEnforcer(), + failureChecker = config.executor.getOrCreateFailureEnforcer(), shutdownCB = ::abort, + timeoutCB = config.executor.jobTimeoutCallback, )) executorPool.start() @@ -51,11 +52,12 @@ class JobQueueExecutor : QueueWrapper { channelProvider = ::dispatchQueue, queueName = config.jobQueueName, handlers = handlers, - poolSize = config.workers, - maxJobTime = config.maxJobExecutionTime, + poolSize = config.executor.workers, + maxJobTime = config.executor.maxJobExecutionTime, threadFactory = null, - failureChecker = config.executorConfig.getOrCreateFailureEnforcer(), + failureChecker = config.executor.getOrCreateFailureEnforcer(), shutdownCB = ::abort, + timeoutCB = config.executor.jobTimeoutCallback, )) executorPool.start() diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt index d4c5606..1f8bee8 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWrapper.kt @@ -96,7 +96,7 @@ sealed class QueueWrapper { /* exclusive = */ false, /* autoDelete = */ false, /* arguments = */ mapOf( - "x-consumer-timeout" to (config.executorConfig.maxJobExecutionTime * 1.5).inWholeMilliseconds + "x-consumer-timeout" to (config.executor.maxJobExecutionTime * 1.5).inWholeMilliseconds ), ) } @@ -153,10 +153,10 @@ sealed class QueueWrapper { * @param config Caller initialized RabbitMQ configuration properties. */ private fun configure(config: QueueConfig) { - factory.host = config.hostname - factory.username = config.username - factory.password = config.password - factory.port = config.hostPort - factory.connectionTimeout = config.timeout.inWholeMilliseconds.toInt() + factory.host = config.connection.hostname + factory.username = config.connection.username + factory.password = config.connection.password + factory.port = config.connection.hostPort + factory.connectionTimeout = config.connection.timeout.inWholeMilliseconds.toInt() } } diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt index b04e2ba..6e962fa 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorConfig.kt @@ -1,5 +1,6 @@ package org.veupathdb.lib.rabbit.jobs.config +import org.veupathdb.lib.rabbit.jobs.model.JobDispatch import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @@ -23,6 +24,12 @@ class ExecutorConfig { */ var maxJobExecutionTime: Duration = 15.minutes + /** + * Callback that will be executed for every job that is killed for exceeding + * the configured [maxJobExecutionTime] value. + */ + var jobTimeoutCallback: (JobDispatch) -> Unit = {} + /** * Failure policy defining the circumstances in which the target * [JobQueueExecutor][org.veupathdb.lib.rabbit.jobs.JobQueueExecutor] should diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt index a27d48d..edc12c0 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/ExecutorFailurePolicy.kt @@ -2,11 +2,6 @@ package org.veupathdb.lib.rabbit.jobs.config import kotlin.time.Duration -enum class ExecutorFailureResponse { - GracefulStop, - ImmediateStop, -} - /** * Defines a policy that itself defines the state or point at which an executor * should be considered "failed" and shut down. diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt index f813c01..f2663ba 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/config/QueueConfig.kt @@ -1,18 +1,14 @@ package org.veupathdb.lib.rabbit.jobs.config -import kotlin.time.Duration - /** * RabbitMQ Queue Configuration */ class QueueConfig { - // region Connection - /** * RabbitMQ connection configuration. */ - var connectionConfig: ConnectionConfig = ConnectionConfig() + var connection: ConnectionConfig = ConnectionConfig() /** * RabbitMQ connection configuration. @@ -21,106 +17,18 @@ class QueueConfig { * * @return This configuration. */ - fun connectionConfig(connectionConfig: ConnectionConfig) = also { it.connectionConfig = connectionConfig } - - /** - * Executes the given function on the current [connectionConfig] value. - * - * @param fn Function to execute on the current [connectionConfig] value. - * - * @return This configuration. - */ - inline fun connectionConfig(fn: ConnectionConfig.() -> Unit) = also { it.connectionConfig.fn() } - - - /** - * See [ConnectionConfig.hostname]. - */ - inline var hostname - get() = connectionConfig.hostname - set(value) { connectionConfig.hostname = value } - - /** - * See [ConnectionConfig.hostname]. - * - * @param host RabbitMQ hostname. - * - * @return This configuration. - */ - fun hostname(host: String) = apply { connectionConfig.hostname = host } - - - /** - * See [ConnectionConfig.username]. - */ - inline var username - get() = connectionConfig.username - set(value) { connectionConfig.username = value } + fun connection(connectionConfig: ConnectionConfig) = also { it.connection = connectionConfig } /** - * See [ConnectionConfig.username]. + * Executes the given function on the current [connection] value. * - * @param user RabbitMQ username. + * @param fn Function to execute on the current [connection] value. * * @return This configuration. */ - fun username(user: String) = apply { connectionConfig.username = user } + inline fun connection(fn: ConnectionConfig.() -> Unit) = also { it.connection.fn() } - /** - * See [ConnectionConfig.password]. - */ - inline var password - get() = connectionConfig.password - set(value) { connectionConfig.password = value } - - /** - * See [ConnectionConfig.password]. - * - * @param pass RabbitMQ password. - * - * @return This configuration. - */ - fun password(pass: String) = apply { connectionConfig.password = pass } - - - /** - * See [ConnectionConfig.hostPort]. - */ - inline var hostPort - get() = connectionConfig.hostPort - set(value) { connectionConfig.hostPort = value } - - /** - * See [ConnectionConfig.hostPort]. - * - * See [ConnectionConfig.hostPort]. - * - * @return This configuration. - */ - fun hostPort(port: Int) = apply { connectionConfig.hostPort = port } - - - /** - * See [ConnectionConfig.timeout]. - */ - inline var timeout - get() = connectionConfig.timeout - set(value) { connectionConfig.timeout = value } - - /** - * See [ConnectionConfig.timeout]. - * - * @param time Connection timeout. - * - * @return This configuration. - */ - fun timeout(time: Duration) = apply { connectionConfig.timeout = time } - - // endregion Connection - - // region Queue Names - /** * Job Dispatch Queue Name */ @@ -165,66 +73,10 @@ class QueueConfig { */ fun successQueueName(name: String) = apply { successQueueName = name } - // endregion Queue Names - - // region Executor - - var executorConfig: ExecutorConfig = ExecutorConfig() - - fun executorConfig(executorConfig: ExecutorConfig) = also { it.executorConfig = executorConfig } - - inline fun executorConfig(fn: ExecutorConfig.() -> Unit) = also { it.executorConfig.fn() } - - - /** - * See [ExecutorConfig.workers]. - */ - inline var workers - get() = executorConfig.workers - set(value) { executorConfig.workers = value } - - /** - * See [ExecutorConfig.workers]. - * - * @param value Number of workers - * - * @return This configuration. - */ - fun workers(value: Int) = apply { executorConfig.workers = value } - - /** - * See [ExecutorConfig.maxJobExecutionTime]. - */ - inline var maxJobExecutionTime - get() = executorConfig.maxJobExecutionTime - set(value) { executorConfig.maxJobExecutionTime = value } + var executor: ExecutorConfig = ExecutorConfig() - /** - * See [ExecutorConfig.maxJobExecutionTime]. - * - * @param duration Max duration a job will be permitted to run for. - * - * @return This configuration. - */ - fun maxJobExecutionTime(duration: Duration) = apply { executorConfig.maxJobExecutionTime = duration } - - - /** - * See [ExecutorConfig.failurePolicy]. - */ - inline var executorFailurePolicy - get() = executorConfig.failurePolicy - set(value) { executorConfig.failurePolicy = value } - - /** - * See [ExecutorConfig.failurePolicy]. - * - * @param policy Failure policy to use. - * - * @return This configuration. - */ - fun executorFailurePolicy(policy: ExecutorFailurePolicy) = apply { executorConfig.failurePolicy = policy } + fun executor(executorConfig: ExecutorConfig) = also { it.executor = executorConfig } - // endregion Executor + inline fun executor(fn: ExecutorConfig.() -> Unit) = also { it.executor.fn() } } diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt deleted file mode 100644 index c247fec..0000000 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/ExecutorPoolState.kt +++ /dev/null @@ -1,4 +0,0 @@ -package org.veupathdb.lib.rabbit.jobs.model - -sealed interface ExecutorPoolState - diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt deleted file mode 100644 index bec79cd..0000000 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/model/PoolState.kt +++ /dev/null @@ -1,6 +0,0 @@ -package org.veupathdb.lib.rabbit.jobs.model - -internal data class PoolState( - var totalFailures: UInt, - var windowedFailures: UInt, -) : ExecutorPoolState \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt index 93bf447..052fdc9 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/Executor.kt @@ -51,7 +51,9 @@ internal class Executor( /** * Callback used to submit jobs to a worker pool. */ - private val submitJobFn: (Runnable) -> Future<*> + private val submitJobFn: (Runnable) -> Future<*>, + + private val timeoutFn: (JobDispatch) -> Unit, ) { private val log = LoggerFactory.getLogger(javaClass) @@ -71,18 +73,20 @@ internal class Executor( log.debug("consumer '{}' executing job for message {}", id, msg.envelope.deliveryTag) - val fut = submitJobFn { handlers.execute(JobDispatch.fromJson(Json.from(msg.body))) } + val dispatch = JobDispatch.fromJson(Json.from(msg.body)) + val future = submitJobFn { handlers.execute(dispatch) } // Wait for {jobTimeout} at most before killing the job and // acknowledging the message try { - fut.get(jobTimeout.inWholeSeconds, TimeUnit.SECONDS) + future.get(jobTimeout.inWholeSeconds, TimeUnit.SECONDS) log.debug("acknowledging job message {}", msg.envelope.deliveryTag) channel.basicAck(msg.envelope.deliveryTag, false) } catch (e: TimeoutException) { log.warn("consumer '{}' killing job for message {} for taking longer than {}", id, msg.envelope.deliveryTag, jobTimeout) - fut.cancel(true) - channel.basicAck(msg.envelope.deliveryTag, false) + swallow { future.cancel(true) } + swallow { channel.basicAck(msg.envelope.deliveryTag, false) } + swallow { timeoutFn(dispatch) } } }, { }, @@ -94,4 +98,13 @@ internal class Executor( log.debug("closing channel for consumer {}", id) try { channel.close() } catch (e: Throwable) { /* do nothing */ } } + + private fun swallow(log: Boolean = true, fn: () -> Unit) { + try { + fn() + } catch (e: Throwable) { + if (log) + this.log.warn("caught exception:", e) + } + } } \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt index 890ef5b..71dcd2a 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/ExecutorPoolConfig.kt @@ -1,6 +1,7 @@ package org.veupathdb.lib.rabbit.jobs.pools.exec import org.veupathdb.lib.rabbit.jobs.config.ExecutorFailureEnforcer +import org.veupathdb.lib.rabbit.jobs.model.JobDispatch import org.veupathdb.lib.rabbit.jobs.pools.JobHandlers import java.util.concurrent.ThreadFactory import kotlin.time.Duration @@ -60,4 +61,9 @@ internal data class ExecutorPoolConfig( * pool failure. */ val shutdownCB: () -> Unit, + + /** + * On job timeout callback. + */ + val timeoutCB: (JobDispatch) -> Unit, ) \ No newline at end of file diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt index 1881639..42550f8 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt @@ -170,7 +170,8 @@ internal class JobQueueExecutorPool(private val config: ExecutorPoolConfig) { channel = config.channelProvider(), handlers = config.handlers, jobTimeout = config.maxJobTime, - submitJobFn = threads::submit + submitJobFn = threads::submit, + timeoutFn = config.timeoutCB, ) } diff --git a/test/client/src/main/kotlin/main.kt b/test/client/src/main/kotlin/main.kt index acef7aa..1f511be 100644 --- a/test/client/src/main/kotlin/main.kt +++ b/test/client/src/main/kotlin/main.kt @@ -10,9 +10,12 @@ fun main() { Thread.sleep(10_000) val conFac = JobQueueExecutor { - executorFailurePolicy = ExecutorFailurePolicy.maxTotalFailures(1) - workers = 1 - maxJobExecutionTime = 15.seconds + executor { + failurePolicy = ExecutorFailurePolicy.maxTotalFailures(1) + workers = 1 + maxJobExecutionTime = 15.seconds + jobTimeoutCallback = { println(it.body) } + } } conFac.onJob { From a708be9b05c9929b650e72a1226ea08db8b41774 Mon Sep 17 00:00:00 2001 From: Elizabeth Paige Harper Date: Wed, 14 Aug 2024 12:30:57 -0400 Subject: [PATCH 3/4] test updates --- makefile | 2 +- test/client/src/main/kotlin/main.kt | 6 +++--- test/docker-compose.yml | 4 ---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/makefile b/makefile index 744d808..f496a47 100644 --- a/makefile +++ b/makefile @@ -7,7 +7,7 @@ end-to-end: @docker compose -f test/docker-compose.yml build \ --build-arg=GITHUB_USERNAME=$(shell grep 'gpr.user' ~/.gradle/gradle.properties | cut -d= -f2) \ --build-arg=GITHUB_TOKEN=$(shell grep 'gpr.key' ~/.gradle/gradle.properties | cut -d= -f2) - @docker compose -f test/docker-compose.yml up --no-attach rabbit + @docker compose -f test/docker-compose.yml up .PHONY: kill-tests kill-tests: diff --git a/test/client/src/main/kotlin/main.kt b/test/client/src/main/kotlin/main.kt index 1f511be..7e98d59 100644 --- a/test/client/src/main/kotlin/main.kt +++ b/test/client/src/main/kotlin/main.kt @@ -3,7 +3,7 @@ import org.veupathdb.lib.rabbit.jobs.JobQueueExecutor import org.veupathdb.lib.rabbit.jobs.config.ExecutorFailurePolicy import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification -import kotlin.time.Duration.Companion.seconds +import kotlin.time.Duration.Companion.minutes fun main() { println("Sleeping for 10 seconds...") @@ -13,7 +13,7 @@ fun main() { executor { failurePolicy = ExecutorFailurePolicy.maxTotalFailures(1) workers = 1 - maxJobExecutionTime = 15.seconds + maxJobExecutionTime = 35.minutes jobTimeoutCallback = { println(it.body) } } } @@ -21,7 +21,7 @@ fun main() { conFac.onJob { println("Server said: $it") - Thread.sleep(20.seconds.inWholeMilliseconds) + Thread.sleep(40.minutes.inWholeMilliseconds) conFac.sendSuccess(SuccessNotification(it.jobID)) } diff --git a/test/docker-compose.yml b/test/docker-compose.yml index a72b148..532e474 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -13,7 +13,3 @@ services: image: rabbitmq:3.13.3-management-alpine ports: - "15672:15672" - volumes: - - type: bind - source: ../rabbitmq.conf - target: /etc/rabbitmq/rabbitmq.conf From 69444f9442529d3ede6d0e116375c86b25abfc0e Mon Sep 17 00:00:00 2001 From: Elizabeth Paige Harper Date: Wed, 14 Aug 2024 12:37:52 -0400 Subject: [PATCH 4/4] add comment --- .../lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt index 42550f8..86745ff 100644 --- a/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt +++ b/src/main/kotlin/org/veupathdb/lib/rabbit/jobs/pools/exec/JobQueueExecutorPool.kt @@ -131,7 +131,7 @@ internal class JobQueueExecutorPool(private val config: ExecutorPoolConfig) { silently(config.shutdownCB) if (block) - threads.awaitTermination(243256, TimeUnit.DAYS) + threads.awaitTermination(243256, TimeUnit.DAYS) // basically just wait indefinitely return StopCode.Forced }