Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add locking in cleanup #23

Merged
merged 5 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Transactional Outbox is a library that provides a simple way to implement
the [Transactional Outbox Pattern](https://microservices.io/patterns/data/transactional-outbox.html) in your
application, developed by Blueground.

Api Docs: https://bluegroundltd.github.io/transactional-outbox/
API Docs: https://bluegroundltd.github.io/transactional-outbox/

## Table of Contents

Expand All @@ -27,7 +27,7 @@ Transactional Outbox is published on `mavenCentral`. In order to use it just add

```gradle

implementation("io.github.bluegroundltd:transactional-outbox-core:1.0.0")
implementation("io.github.bluegroundltd:transactional-outbox-core:2.0.0")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add some notes on how users should upgrade from 1.0.0 to 2.0.0?
Or we should add this to the release tag after the release?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's a nice idea!

In this case though, the upgrade only requires the addition of the locks provider call and the rename of the current function. The TransactionalBuilder API is guiding the user via the fluent interface. I've also kinda updated the docs here.

Do you possibly mean adding a migration guide? Like

Upgrading from 1.x to 2.0.0

?

Copy link
Collaborator Author

@chris-asl chris-asl Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

9d4d65e 👀


```

Expand All @@ -49,18 +49,20 @@ class OutboxConfiguration(

@Bean
fun transactionalOutbox(): TransactionalOutbox {
val locksProvider = OutboxLocksProvider(postgresLockDao, APPLICATION_SPECIFIC_ID)
val monitorLocksProvider = OutboxLocksProvider(postgresLockDao, MONITOR_APPLICATION_SPECIFIC_ID)
val cleanupLocksProvider = OutboxLocksProvider(postgresLockDao, CLEANUP_APPLICATION_SPECIFIC_ID)

return TransactionalOutboxBuilder
.make(clock)
.withHandlers(outboxHandlers)
.withLocksProvider(locksProvider)
.withMonitorLocksProvider(monitorLocksProvider)
.withCleanupLocksProvider(cleanupLocksProvider)
.withStore(outboxStore)
.build()
}
}

private class OutboxLocksProvider(
private class OutboxLocksProviderImpl(
private val postgresLockDao: PostgresLockDao,
private val id: Long
) : OutboxLocksProvider {
Expand All @@ -76,7 +78,10 @@ private class OutboxLocksProvider(

### Creating a new Outbox Handler

Then you can create a new `OutboxHandler` that will be responsible for processing the `Outbox` entries:
Then you can create a new `OutboxHandler` that will be responsible for processing the `Outbox` entries.
Below you can see a barebones handler, but there's also a utility handler, which uses JSON (de)serialization and
reduces the outbox handlers boilerplate code. Refer to [SimpleOutboxHandler](https://bluegroundltd.github.io/transactional-outbox/core/io.github.bluegroundltd.outbox/-simple-outbox-handler/index.html) in our [docs page](
https://bluegroundltd.github.io/transactional-outbox/index.html).

```kotlin
enum class MyOutboxType: OutboxType {
Expand Down
55 changes: 55 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Upgrade Guide

## v.2.x.x

### v.2.0.0 - Completed outbox items cleanup coordination

Release 2.0.0 introduces coordination in the cleanup process across instances using locks.

#### `TransactionalOutboxBuilder#withLocksProvider` has been removed in favor of `withMonitorLocksProvider`
The reason for this breaking change was the need to introduce a different locks provider for the completed outbox items cleanup process.
We could reuse the same locks provider for both the monitor and cleanup process, but this would entail serialized execution of the two processes, which was not desirable.
The requirement for the new locks provider is solely to be independent of the locks provider used in the monitor process.
For example, if the locks provider implementation is using Postgres advisory locks, the monitor and the cleanup locks should use a different lock identifier.

**Required changes**
The `TransactionalOutboxBuilder` call needs to be updated from
```kotlin
TransactionalOutboxBuilder
.make(clock)
.withHandlers(outboxHandlers)
.withLocksProvider(locksProvider)
.withStore(outboxStore)
.build()
```
to
```kotlin
TransactionalOutboxBuilder
.make(clock)
.withHandlers(outboxHandlers)
.withMonitorLocksProvider(PostgresOutboxLocksProvider(LOCKS_MONITOR_ID))
.withCleanupLocksProvider(PostgresOutboxLocksProvider(LOCKS_CLEANUP_ID))
.withStore(outboxStore)
.build()
```
N.B.: The above assumes that the locks provider implementation is using Postgres advisory locks.

## v.1.x.x

### v.1.0.0 - Completed outbox items cleanup

Release 1.0 introduces a cleanup process for the outbox items that have been successfully processed, thus reducing the size of the outbox table, which can grow quite large.
When the outbox items are processes successfully, in addition to be marked as completed, their `OutboxItem.deleteAfter` field is set to `now() + retentionPeriod`.
The cleanup process, like monitor, should be run periodically, depending on your needs. Once run, it deletes the completed
outbox items whose `deleteAfter` is earlier than the current time.

It is advisable to manually delete the already completed outbox items before upgrading to 1.0.0, as the cleanup process
will issue a deletion, which may be quite heavy in terms of I/O operations, hence timeouts may occur on the first run.

**Required changes**
In the `OutboxStore` implementing class, the `deleteCompletedItems(now: Instant)` method needs to be implemented.
The method should simply delete the outbox items with status `COMPLETED` with a `deleteAfter` earlier than the provided `now` parameter.

Finally, the retention duration period can be defined per outbox handler for flexibility.
A new `OutboxHandler` method has been added `getRetentionDuration(): Duration` which should return the retention period for the outbox items of the handler.
Feel free to look the [SimpleOutboxHandler](./core/src/main/kotlin/io/github/bluegroundltd/outbox/SimpleOutboxHandler.kt) for an example.
2 changes: 1 addition & 1 deletion core/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GROUP=io.github.bluegroundltd
POM_ARTIFACT_ID=transactional-outbox-core
VERSION_NAME=1.0.0
VERSION_NAME=2.0.0

POM_NAME=Transactional Outbox Core
POM_DESCRIPTION=Easily implement the transactional outbox pattern in your JVM application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import kotlin.properties.Delegates
* return TransactionalOutboxBuilder
* .make(clock)
* .withHandlers(outboxHandlers)
* .withLocksProvider(locksProvider)
* .withMonitorLocksProvider(monitorLocksProvider)
* .withCleanupLocksProvider(cleanupLocksProvider)
* .withStore(outboxStore)
* .withInstantOutboxPublisher(instantOutboxPublisher)
* .build()
Expand All @@ -28,12 +29,18 @@ import kotlin.properties.Delegates
class TransactionalOutboxBuilder(
private val clock: Clock,
private val rerunAfterDuration: Duration = DEFAULT_RERUN_AFTER_DURATION
) : OutboxHandlersStep, LocksProviderStep, StoreStep, InstantOutboxPublisherStep, BuildStep {
) : OutboxHandlersStep,
MonitorLocksProviderStep,
CleanupLocksProviderStep,
StoreStep,
InstantOutboxPublisherStep,
BuildStep {
private val handlers: MutableMap<OutboxType, OutboxHandler> = mutableMapOf()
private var threadPoolSize by Delegates.notNull<Int>()
private var threadPoolTimeOut: Duration = DEFAULT_THREAD_POOL_TIMEOUT
private var decorators: MutableList<OutboxItemProcessorDecorator> = mutableListOf()
private lateinit var locksProvider: OutboxLocksProvider
private lateinit var monitorLocksProvider: OutboxLocksProvider
private lateinit var cleanupLocksProvider: OutboxLocksProvider
private lateinit var store: OutboxStore
private lateinit var instantOutboxPublisher: InstantOutboxPublisher

Expand All @@ -53,7 +60,7 @@ class TransactionalOutboxBuilder(
/**
* Sets the handlers for the outbox.
*/
override fun withHandlers(handlers: Set<OutboxHandler>): LocksProviderStep {
override fun withHandlers(handlers: Set<OutboxHandler>): MonitorLocksProviderStep {
validateNoDuplicateHandlerSupportedTypes(handlers)
handlers.associateByTo(this.handlers) { it.getSupportedType() }
return this
Expand Down Expand Up @@ -87,10 +94,18 @@ class TransactionalOutboxBuilder(
}

/**
* Sets the locks provider for the outbox.
* Sets the locks provider for the outbox monitor runs.
*/
override fun withLocksProvider(locksProvider: OutboxLocksProvider): StoreStep {
this.locksProvider = locksProvider
override fun withMonitorLocksProvider(locksProvider: OutboxLocksProvider): CleanupLocksProviderStep {
this.monitorLocksProvider = locksProvider
return this
}

/**
* Sets the locks provider for the outbox cleanup runs.
*/
override fun withCleanupLocksProvider(locksProvider: OutboxLocksProvider): StoreStep {
this.cleanupLocksProvider = locksProvider
return this
}

Expand Down Expand Up @@ -148,7 +163,8 @@ class TransactionalOutboxBuilder(
return TransactionalOutboxImpl(
clock,
handlers.toMap(),
locksProvider,
monitorLocksProvider,
cleanupLocksProvider,
store,
instantOutboxPublisher,
outboxItemFactory,
Expand All @@ -161,11 +177,15 @@ class TransactionalOutboxBuilder(
}

interface OutboxHandlersStep {
fun withHandlers(handlers: Set<OutboxHandler>): LocksProviderStep
fun withHandlers(handlers: Set<OutboxHandler>): MonitorLocksProviderStep
}

interface MonitorLocksProviderStep {
fun withMonitorLocksProvider(locksProvider: OutboxLocksProvider): CleanupLocksProviderStep
}

interface LocksProviderStep {
fun withLocksProvider(locksProvider: OutboxLocksProvider): StoreStep
interface CleanupLocksProviderStep {
fun withCleanupLocksProvider(locksProvider: OutboxLocksProvider): StoreStep
}

interface StoreStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

@SuppressWarnings("LongParameterList")
@SuppressWarnings("LongParameterList", "TooGenericExceptionCaught")
internal class TransactionalOutboxImpl(
private val clock: Clock,
private val outboxHandlers: Map<OutboxType, OutboxHandler>,
private val locksProvider: OutboxLocksProvider,
private val monitorLocksProvider: OutboxLocksProvider,
private val cleanupLocksProvider: OutboxLocksProvider,
private val outboxStore: OutboxStore,
private val instantOutboxPublisher: InstantOutboxPublisher,
private val outboxItemFactory: OutboxItemFactory,
Expand Down Expand Up @@ -66,12 +67,12 @@ internal class TransactionalOutboxImpl(

override fun monitor() {
if (inShutdownMode.get()) {
logger.info("$LOGGER_PREFIX Shutdown in process - no longer accepting items for processing")
logger.info("$LOGGER_PREFIX Shutdown in process, no longer accepting items for processing")
return
}

runCatching {
locksProvider.acquire()
monitorLocksProvider.acquire()

val items = fetchEligibleItems()
if (items.isEmpty()) {
Expand All @@ -87,8 +88,8 @@ internal class TransactionalOutboxImpl(
logger.error("$LOGGER_PREFIX Failure in monitor", it)
}

kotlin.runCatching { locksProvider.release() }.onFailure {
logger.error("$LOGGER_PREFIX Failed to release lock of $locksProvider", it)
runCatching { monitorLocksProvider.release() }.onFailure {
logger.error("$LOGGER_PREFIX Failed to release lock of $monitorLocksProvider", it)
}
}

Expand Down Expand Up @@ -172,7 +173,29 @@ internal class TransactionalOutboxImpl(
}

override fun cleanup() {
logger.info("$LOGGER_PREFIX Cleaning up outbox items")
outboxStore.deleteCompletedItems(Instant.now(clock))
if (inShutdownMode.get()) {
logger.info("$LOGGER_PREFIX Shutdown in process, deferring cleanup")
return
}

var wasLockingAcquired = false
try {
cleanupLocksProvider.acquire()
chris-asl marked this conversation as resolved.
Show resolved Hide resolved
wasLockingAcquired = true

val now = Instant.now(clock)
logger.info("$LOGGER_PREFIX Cleaning up completed outbox items, with deleteAfter <= $now")
outboxStore.deleteCompletedItems(now)
} catch (exception: Exception) {
logger.error("$LOGGER_PREFIX Failure in cleanup", exception)
} finally {
if (wasLockingAcquired) {
try {
cleanupLocksProvider.release()
} catch (exception: Exception) {
logger.error("$LOGGER_PREFIX Failed to release cleanup lock ($cleanupLocksProvider)", exception)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class TransactionalOutboxImplSpec extends Specification {
OutboxType type = handler.getSupportedType()
Map<OutboxType, OutboxHandler> handlers = Map.of(type, handler)

OutboxLocksProvider locksProvider = Mock()
OutboxLocksProvider monitorLocksProvider = Mock()
OutboxLocksProvider cleanupLocksProvider = Mock()
OutboxStore store = Mock()
InstantOutboxPublisher instantOutboxPublisher = Mock()
OutboxItemFactory outboxItemFactory = Mock()
Expand All @@ -39,7 +40,8 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox = new TransactionalOutboxImpl(
clock,
handlers,
locksProvider,
monitorLocksProvider,
cleanupLocksProvider,
store,
instantOutboxPublisher,
outboxItemFactory,
Expand Down Expand Up @@ -70,7 +72,7 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox.monitor()

then:
1 * locksProvider.acquire()
1 * monitorLocksProvider.acquire()
1 * store.fetch(_) >> { OutboxFilter filter ->
with(filter) {
outboxPendingFilter.nextRunLessThan == now
Expand All @@ -86,7 +88,7 @@ class TransactionalOutboxImplSpec extends Specification {
}
return item
}
1 * locksProvider.release()
1 * monitorLocksProvider.release()
0 * _

when:
Expand All @@ -106,7 +108,7 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox.monitor()

then:
1 * locksProvider.acquire()
1 * monitorLocksProvider.acquire()
1 * store.fetch(_) >> { OutboxFilter filter ->
with(filter) {
outboxPendingFilter.nextRunLessThan == now
Expand All @@ -121,7 +123,7 @@ class TransactionalOutboxImplSpec extends Specification {
}
return item
}
1 * locksProvider.release()
1 * monitorLocksProvider.release()
0 * _

when:
Expand All @@ -147,7 +149,58 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire()
1 * store.deleteCompletedItems(now)
1 * cleanupLocksProvider.release()
0 * _
}

def "Should early return from cleanup if in shutdown mode"() {
when:
transactionalOutbox.shutdown()
transactionalOutbox.cleanup()

then:
0 * _
}

def "Should handle an exception thrown from the cleanup store method"() {
when:
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire()
1 * store.deleteCompletedItems(_) >> {
throw new InterruptedException()
}
1 * cleanupLocksProvider.release()
0 * _
noExceptionThrown()
}

def "Should handle an exception thrown during the cleanup release locks"() {
when:
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire()
1 * store.deleteCompletedItems(_)
1 * cleanupLocksProvider.release() >> {
throw new InterruptedException()
}
0 * _
noExceptionThrown()
}

def "Should not release the lock in cleanup, after a failure in acquire"() {
when:
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire() >> {
throw new InterruptedException()
}
0 * _
noExceptionThrown()
}
}
Loading
Loading