diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt index a36d0044c950e..34d13916f30fd 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/command/DestinationConfiguration.kt @@ -81,11 +81,13 @@ abstract class DestinationConfiguration : Configuration { /** * The amount of time given to implementor tasks (e.g. open, processBatch) to complete their - * current work after a failure. + * current work after a failure. Input consuming will stop right away, so this will give the + * tasks time to persist the messages already read. */ - open val gracefulCancellationTimeoutMs: Long = 60 * 1000L // 1 minutes + open val gracefulCancellationTimeoutMs: Long = 10 * 60 * 1000L // 10 minutes open val numProcessRecordsWorkers: Int = 2 + open val processRecordsIsIO: Boolean = false open val numProcessBatchWorkers: Int = 5 open val numProcessBatchWorkersForFileTransfer: Int = 3 open val batchQueueDepth: Int = 10 diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index de43ff2302dfd..e5ebcbd91e860 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -141,9 +141,13 @@ class DefaultDestinationTaskLauncher( private val closeStreamHasRun = ConcurrentHashMap() - inner class TaskWrapper( - override val innerTask: ScopedTask, - ) : WrappedTask { + inner class WrappedTask( + private val innerTask: Task, + ) : Task { + override val isIO = innerTask.isIO + override val cancelAtEndOfSync = innerTask.cancelAtEndOfSync + override val killOnSyncFailure = innerTask.killOnSyncFailure + override suspend fun execute() { try { innerTask.execute() @@ -161,16 +165,8 @@ class DefaultDestinationTaskLauncher( } } - inner class NoopWrapper( - override val innerTask: ScopedTask, - ) : WrappedTask { - override suspend fun execute() { - innerTask.execute() - } - } - - private suspend fun enqueue(task: ScopedTask, withExceptionHandling: Boolean = true) { - val wrapped = if (withExceptionHandling) TaskWrapper(task) else NoopWrapper(task) + private suspend fun launch(task: Task, withExceptionHandling: Boolean = true) { + val wrapped = if (withExceptionHandling) WrappedTask(task) else task taskScopeProvider.launch(wrapped) } @@ -186,12 +182,12 @@ class DefaultDestinationTaskLauncher( fileTransferQueue = fileTransferQueue, destinationTaskLauncher = this, ) - enqueue(inputConsumerTask) + launch(inputConsumerTask) // Launch the client interface setup task log.info { "Starting startup task" } val setupTask = setupTaskFactory.make(this) - enqueue(setupTask) + launch(setupTask) // TODO: pluggable file transfer if (!fileTransferEnabled) { @@ -199,43 +195,43 @@ class DefaultDestinationTaskLauncher( catalog.streams.forEach { stream -> log.info { "Starting spill-to-disk task for $stream" } val spillTask = spillToDiskTaskFactory.make(this, stream.descriptor) - enqueue(spillTask) + launch(spillTask) } repeat(config.numProcessRecordsWorkers) { log.info { "Launching process records task $it" } val task = processRecordsTaskFactory.make(this) - enqueue(task) + launch(task) } repeat(config.numProcessBatchWorkers) { log.info { "Launching process batch task $it" } val task = processBatchTaskFactory.make(this) - enqueue(task) + launch(task) } } else { repeat(config.numProcessRecordsWorkers) { log.info { "Launching process file task $it" } - enqueue(processFileTaskFactory.make(this)) + launch(processFileTaskFactory.make(this)) } repeat(config.numProcessBatchWorkersForFileTransfer) { log.info { "Launching process batch task $it" } val task = processBatchTaskFactory.make(this) - enqueue(task) + launch(task) } } // Start flush task log.info { "Starting timed file aggregate flush task " } - enqueue(flushTickTask) + launch(flushTickTask) // Start the checkpoint management tasks log.info { "Starting timed checkpoint flush task" } - enqueue(timedCheckpointFlushTask) + launch(timedCheckpointFlushTask) log.info { "Starting checkpoint update task" } - enqueue(updateCheckpointsTask) + launch(updateCheckpointsTask) // Await completion if (succeeded.receive()) { @@ -250,7 +246,7 @@ class DefaultDestinationTaskLauncher( catalog.streams.forEach { log.info { "Starting open stream task for $it" } val task = openStreamTaskFactory.make(this, it) - enqueue(task) + launch(task) } } @@ -276,14 +272,14 @@ class DefaultDestinationTaskLauncher( log.info { "Batch $wrapped is persisted: Starting flush checkpoints task for $stream" } - enqueue(flushCheckpointsTaskFactory.make()) + launch(flushCheckpointsTaskFactory.make()) } if (streamManager.isBatchProcessingComplete()) { if (closeStreamHasRun.getOrPut(stream) { AtomicBoolean(false) }.setOnce()) { log.info { "Batch processing complete: Starting close stream task for $stream" } val task = closeStreamTaskFactory.make(this, stream) - enqueue(task) + launch(task) } else { log.info { "Close stream task has already run, skipping." } } @@ -296,7 +292,7 @@ class DefaultDestinationTaskLauncher( /** Called when a stream is closed. */ override suspend fun handleStreamClosed(stream: DestinationStream.Descriptor) { if (teardownIsEnqueued.setOnce()) { - enqueue(teardownTaskFactory.make(this)) + launch(teardownTaskFactory.make(this)) } else { log.info { "Teardown task already enqueued, not enqueuing another one" } } @@ -305,7 +301,7 @@ class DefaultDestinationTaskLauncher( override suspend fun handleException(e: Exception) { catalog.streams .map { failStreamTaskFactory.make(this, e, it.descriptor) } - .forEach { enqueue(it, withExceptionHandling = false) } + .forEach { launch(it, withExceptionHandling = false) } } override suspend fun handleFailStreamComplete( @@ -313,7 +309,7 @@ class DefaultDestinationTaskLauncher( e: Exception ) { if (failSyncIsEnqueued.setOnce()) { - enqueue(failSyncTaskFactory.make(this, e)) + launch(failSyncTaskFactory.make(this, e)) } else { log.info { "Teardown task already enqueued, not enqueuing another one" } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt index 453f9be8e103b..2843ae08607a1 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/Task.kt @@ -5,6 +5,18 @@ package io.airbyte.cdk.load.task interface Task { + /** + * If the task performs any blocking io, even writing to local disk, it should set + * [isIO] = true. [cancelAtEndOfSync] is for long-running tasks that will otherwise + * not close. [killOnSyncFailure] is for tasks that close normally under success conditions + * but should be halted immediately on failure to permit shutdown (like input consuming). + * + * TODO: simplify this further. + */ + val isIO: Boolean + val cancelAtEndOfSync: Boolean + val killOnSyncFailure: Boolean + suspend fun execute() } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/TaskScopeProvider.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/TaskScopeProvider.kt index 409a1fbd0d52a..8232f0148811f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/TaskScopeProvider.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/TaskScopeProvider.kt @@ -6,145 +6,82 @@ package io.airbyte.cdk.load.task import io.airbyte.cdk.load.command.DestinationConfiguration import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -import java.util.concurrent.Executors -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference -import kotlin.system.measureTimeMillis -import kotlinx.coroutines.CompletableJob -import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job -import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull +import org.apache.mina.util.ConcurrentHashSet -/** - * The scope in which a task should run - * - [InternalScope]: - * ``` - * - internal to the task launcher - * - should not be blockable by implementor errors - * - killable w/o side effects - * ``` - * - [ImplementorScope]: implemented by the destination - * ``` - * - calls implementor interface - * - should not block internal tasks (esp reading from stdin) - * - should complete if possible even when failing the sync - * ``` - */ -sealed interface ScopedTask : Task - -interface InternalScope : ScopedTask - -interface ImplementorScope : ScopedTask - -/** - * Some tasks should be immediately cancelled upon any failure (for example, reading from stdin, the - * every-15-minutes flush). Those tasks should be placed into the fail-fast scope. - */ -interface KillableScope : ScopedTask - -interface WrappedTask : Task { - val innerTask: T +interface WrappedTask : Task { + val innerTask: Task } @Singleton -@Secondary class TaskScopeProvider(config: DestinationConfiguration) { private val log = KotlinLogging.logger {} private val timeoutMs = config.gracefulCancellationTimeoutMs - data class ControlScope( - val name: String, - val job: CompletableJob, - val dispatcher: CoroutineDispatcher - ) { - val scope: CoroutineScope = CoroutineScope(dispatcher + job) - val runningJobs: AtomicLong = AtomicLong(0) - } - - private val internalScope = ControlScope("internal", Job(), Dispatchers.IO) - - private val implementorScope = - ControlScope( - "implementor", - Job(), - Executors.newFixedThreadPool(config.maxNumImplementorTaskThreads) - .asCoroutineDispatcher() - ) - - private val failFastScope = ControlScope("input", Job(), Dispatchers.IO) - - suspend fun launch(task: WrappedTask) { - val scope = - when (task.innerTask) { - is InternalScope -> internalScope - is ImplementorScope -> implementorScope - is KillableScope -> failFastScope + private val supervisor = Job() + private val ioScope = CoroutineScope(Dispatchers.IO + supervisor) + private val defaultScope = CoroutineScope(Dispatchers.Default + supervisor) + private val killOnSyncFailure = ConcurrentHashSet() + private val cancelAtEndOfSync = ConcurrentHashSet() + + suspend fun launch(task: Task) { + val scope = if (task.isIO) ioScope else defaultScope + val job = + scope.launch { + log.info { "Launching $task" } + task.execute() + log.info { "Task $task completed" } } - scope.scope.launch { - var nJobs = scope.runningJobs.incrementAndGet() - log.info { "Launching task $task in scope ${scope.name} ($nJobs now running)" } - val elapsed = measureTimeMillis { task.execute() } - nJobs = scope.runningJobs.decrementAndGet() - log.info { "Task $task completed in $elapsed ms ($nJobs now running)" } + if (task.cancelAtEndOfSync) { + cancelAtEndOfSync.add(job) + } + if (task.killOnSyncFailure) { + killOnSyncFailure.add(job) } } suspend fun close() { - // Under normal operation, all tasks should be complete - // (except things like force flush, which loop). So - // - it's safe to force cancel the internal tasks - // - implementor scope should join immediately - log.info { "Closing task scopes (${implementorScope.runningJobs.get()} remaining)" } + log.info { "Closing normally, canceling long-running tasks" } + cancelAtEndOfSync.forEach { it.cancel() } + val uncaughtExceptions = AtomicReference() - implementorScope.job.children.forEach { + log.info { "Verifying task completion" } + supervisor.children.forEach { it.invokeOnCompletion { cause -> if (cause != null) { - log.error { "Uncaught exception in implementor task: $cause" } + log.error { "Uncaught exception in task: $cause" } uncaughtExceptions.set(cause) } } } - implementorScope.job.complete() - implementorScope.job.join() if (uncaughtExceptions.get() != null) { - throw IllegalStateException( - "Uncaught exceptions in implementor tasks", - uncaughtExceptions.get() - ) - } - log.info { - "Implementor tasks completed, cancelling internal tasks (${internalScope.runningJobs.get()} remaining)." + throw uncaughtExceptions.get() } - internalScope.job.cancel() } suspend fun kill() { - log.info { "Killing task scopes" } - // Terminate tasks which should be immediately terminated - failFastScope.job.cancel() + log.info { "Failing, killing input tasks and canceling long-running tasks" } + killOnSyncFailure.forEach { it.cancel() } + cancelAtEndOfSync.forEach { it.cancel() } // Give the implementor tasks a chance to fail gracefully withTimeoutOrNull(timeoutMs) { log.info { "Cancelled internal tasks, waiting ${timeoutMs}ms for implementor tasks to complete" } - implementorScope.job.complete() - implementorScope.job.join() + supervisor.complete() log.info { "Implementor tasks completed" } } ?: run { log.error { "Implementor tasks did not complete within ${timeoutMs}ms, cancelling" } - implementorScope.job.cancel() + supervisor.cancel() } - - log.info { "Cancelling internal tasks" } - internalScope.job.cancel() } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt index 14a1688e7ad9b..7c6e4298117fb 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/CloseStreamTask.kt @@ -7,12 +7,12 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.StreamLoader import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface CloseStreamTask : ImplementorScope +interface CloseStreamTask : Task /** * Wraps @[StreamLoader.close] and marks the stream as closed in the stream manager. Also starts the @@ -24,6 +24,9 @@ class DefaultCloseStreamTask( val streamDescriptor: DestinationStream.Descriptor, private val taskLauncher: DestinationTaskLauncher ) : CloseStreamTask { + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false override suspend fun execute() { val streamLoader = syncManager.getOrAwaitStreamLoader(streamDescriptor) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt index 9959a3286ab0f..aa2cc39bce590 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailStreamTask.kt @@ -9,12 +9,12 @@ import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.state.StreamProcessingSucceeded import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface FailStreamTask : ImplementorScope +interface FailStreamTask : Task /** * FailStreamTask is a task that is executed when the processing of a stream fails in the @@ -28,6 +28,10 @@ class DefaultFailStreamTask( ) : FailStreamTask { val log = KotlinLogging.logger {} + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + override suspend fun execute() { val streamManager = syncManager.getStreamManager(stream) streamManager.markProcessingFailed(exception) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt index 10f64ab9de0f2..5af6119ce7625 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/FailSyncTask.kt @@ -7,13 +7,13 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.state.CheckpointManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface FailSyncTask : ImplementorScope +interface FailSyncTask : Task /** * FailSyncTask is a task that is executed only when the destination itself fails during a sync. If @@ -29,6 +29,10 @@ class DefaultFailSyncTask( ) : FailSyncTask { private val log = KotlinLogging.logger {} + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + override suspend fun execute() { // Ensure any remaining ready state gets captured: don't waste work! checkpointManager.flushReadyCheckpointMessages() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt index 80f8e99024c7e..6580529111118 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/OpenStreamTask.kt @@ -7,13 +7,13 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface OpenStreamTask : ImplementorScope +interface OpenStreamTask : Task /** * Wraps @[StreamLoader.start] and starts the spill-to-disk tasks. @@ -27,6 +27,10 @@ class DefaultOpenStreamTask( private val taskLauncher: DestinationTaskLauncher, private val stream: DestinationStream, ) : OpenStreamTask { + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + override suspend fun execute() { val streamLoader = destinationWriter.createStreamLoader(stream) val result = runCatching { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt index 1d0e43d86242c..c34b4f766b35f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessBatchTask.kt @@ -8,14 +8,14 @@ import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.StreamLoader import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Named import jakarta.inject.Singleton -interface ProcessBatchTask : KillableScope +interface ProcessBatchTask : Task /** Wraps @[StreamLoader.processBatch] and handles the resulting batch. */ class DefaultProcessBatchTask( @@ -23,6 +23,10 @@ class DefaultProcessBatchTask( private val batchQueue: MultiProducerChannel>, private val taskLauncher: DestinationTaskLauncher ) : ProcessBatchTask { + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + val log = KotlinLogging.logger {} override suspend fun execute() { batchQueue.consume().collect { batchEnvelope -> diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt index 0f2dcb0c3cf79..006fdf96ded41 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessFileTask.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MultiProducerChannel import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.util.use import io.airbyte.cdk.load.write.FileBatchAccumulator import io.github.oshai.kotlinlogging.KotlinLogging @@ -20,7 +20,7 @@ import jakarta.inject.Named import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap -interface ProcessFileTask : ImplementorScope +interface ProcessFileTask : Task class DefaultProcessFileTask( private val syncManager: SyncManager, @@ -28,6 +28,10 @@ class DefaultProcessFileTask( private val inputQueue: MessageQueue, private val outputQueue: MultiProducerChannel>, ) : ProcessFileTask { + override val isIO = false + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + val log = KotlinLogging.logger {} private val accumulators = ConcurrentHashMap() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt index 7cc62d5c57a69..34bb426ebb9e4 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt @@ -20,7 +20,7 @@ import io.airbyte.cdk.load.message.ProtocolMessageDeserializer import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.task.internal.SpilledRawMessagesLocalFile import io.airbyte.cdk.load.util.lineSequence import io.airbyte.cdk.load.util.use @@ -34,7 +34,7 @@ import java.io.InputStream import java.util.concurrent.ConcurrentHashMap import kotlin.io.path.inputStream -interface ProcessRecordsTask : KillableScope +interface ProcessRecordsTask : Task /** * Wraps @[StreamLoader.processRecords] and feeds it a lazy iterator over the last batch of spooled @@ -54,6 +54,11 @@ class DefaultProcessRecordsTask( private val outputQueue: MultiProducerChannel>, ) : ProcessRecordsTask { private val log = KotlinLogging.logger {} + + override val isIO = config.processRecordsIsIO + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + private val accumulators = ConcurrentHashMap() override suspend fun execute() { outputQueue.use { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt index 1bf807973d130..2712207ea5731 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/SetupTask.kt @@ -5,12 +5,12 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface SetupTask : ImplementorScope +interface SetupTask : Task /** * Wraps @[DestinationWriter.setup] and starts the open stream tasks. @@ -22,6 +22,10 @@ class DefaultSetupTask( private val destination: DestinationWriter, private val taskLauncher: DestinationTaskLauncher ) : SetupTask { + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + override suspend fun execute() { destination.setup() taskLauncher.handleSetupComplete() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt index 64dada897c6b4..49dd04c713217 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt @@ -7,13 +7,13 @@ package io.airbyte.cdk.load.task.implementor import io.airbyte.cdk.load.state.CheckpointManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.ImplementorScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.write.DestinationWriter import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface TeardownTask : ImplementorScope +interface TeardownTask : Task /** * Wraps @[DestinationWriter.teardown] and stops the task launcher. @@ -28,6 +28,10 @@ class DefaultTeardownTask( ) : TeardownTask { val log = KotlinLogging.logger {} + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + override suspend fun execute() { syncManager.awaitInputProcessingComplete() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt index 37901ecb9fe61..a3322b661a7e8 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushCheckpointsTask.kt @@ -5,15 +5,19 @@ package io.airbyte.cdk.load.task.internal import io.airbyte.cdk.load.state.CheckpointManager -import io.airbyte.cdk.load.task.InternalScope +import io.airbyte.cdk.load.task.Task import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface FlushCheckpointsTask : InternalScope +interface FlushCheckpointsTask : Task class DefaultFlushCheckpointsTask( private val checkpointManager: CheckpointManager<*, *>, ) : FlushCheckpointsTask { + override val isIO: Boolean = false + override val cancelAtEndOfSync: Boolean = false + override val killOnSyncFailure: Boolean = false + override suspend fun execute() { checkpointManager.flushReadyCheckpointMessages() } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt index 0e69940b4c174..5729d30b2a22a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/FlushTickTask.kt @@ -12,7 +12,7 @@ import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.StreamFlushEvent import io.airbyte.cdk.load.state.Reserved -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import io.micronaut.context.annotation.Value @@ -29,9 +29,13 @@ class FlushTickTask( private val catalog: DestinationCatalog, private val recordQueueSupplier: MessageQueueSupplier>, -) : KillableScope { +) : Task { private val log = KotlinLogging.logger {} + override val isIO = false + override val cancelAtEndOfSync = true + override val killOnSyncFailure = true + override suspend fun execute() { while (true) { waitAndPublishFlushTick() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index e084bcc4fe416..ef630725e0a77 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -33,7 +33,7 @@ import io.airbyte.cdk.load.message.Undefined import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.util.use import io.github.oshai.kotlinlogging.KotlinLogging @@ -41,7 +41,7 @@ import io.micronaut.context.annotation.Secondary import jakarta.inject.Named import jakarta.inject.Singleton -interface InputConsumerTask : KillableScope +interface InputConsumerTask : Task /** * Routes @[DestinationStreamAffinedMessage]s by stream to the appropriate channel and @ @@ -68,6 +68,10 @@ class DefaultInputConsumerTask( ) : InputConsumerTask { private val log = KotlinLogging.logger {} + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = true // ie, stop consuming and let everything else finish + private suspend fun handleRecord( reserved: Reserved, sizeBytes: Long diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt index 182bbe3d9fba6..46899ca6f528d 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt @@ -24,7 +24,7 @@ import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.TimeWindowTrigger import io.airbyte.cdk.load.task.DestinationTaskLauncher -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.task.implementor.FileAggregateMessage import io.airbyte.cdk.load.util.use import io.airbyte.cdk.load.util.withNextAdjacentValue @@ -40,7 +40,7 @@ import kotlin.io.path.deleteExisting import kotlin.io.path.outputStream import kotlinx.coroutines.flow.fold -interface SpillToDiskTask : KillableScope +interface SpillToDiskTask : Task /** * Reads records from the message queue and writes them to disk. Completes once the upstream @@ -60,6 +60,10 @@ class DefaultSpillToDiskTask( ) : SpillToDiskTask { private val log = KotlinLogging.logger {} + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + override suspend fun execute() { val initialAccumulator = fileAccFactory.make() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt index 92aaf8b12beb1..9ae7225e866d3 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/TimedForcedCheckpointFlushTask.kt @@ -10,13 +10,13 @@ import io.airbyte.cdk.load.file.TimeProvider import io.airbyte.cdk.load.message.ChannelMessageQueue import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.state.CheckpointManager -import io.airbyte.cdk.load.task.KillableScope +import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.util.use import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface TimedForcedCheckpointFlushTask : KillableScope +interface TimedForcedCheckpointFlushTask : Task @Singleton @Secondary @@ -28,6 +28,10 @@ class DefaultTimedForcedCheckpointFlushTask( ) : TimedForcedCheckpointFlushTask { private val log = KotlinLogging.logger {} + override val isIO = true + override val cancelAtEndOfSync = true // won't terminate otherwise + override val killOnSyncFailure = false + override suspend fun execute() { val cadenceMs = config.maxCheckpointFlushTimeMs // Wait for the configured time diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt index c73a8ffd63767..09ec7cdf6ede5 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/UpdateCheckpointsTask.kt @@ -13,12 +13,12 @@ import io.airbyte.cdk.load.message.StreamCheckpointWrapped import io.airbyte.cdk.load.state.CheckpointManager import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager -import io.airbyte.cdk.load.task.InternalScope +import io.airbyte.cdk.load.task.Task import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton -interface UpdateCheckpointsTask : InternalScope +interface UpdateCheckpointsTask : Task @Singleton @Secondary @@ -29,6 +29,11 @@ class DefaultUpdateCheckpointsTask( private val checkpointMessageQueue: MessageQueue> ) : UpdateCheckpointsTask { val log = KotlinLogging.logger {} + + override val isIO = true + override val cancelAtEndOfSync = false + override val killOnSyncFailure = false + override suspend fun execute() { log.info { "Starting to consume checkpoint messages (state) for updating" } checkpointMessageQueue.consume().collect { diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index 78ca3d796e674..48c47cfb73b14 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -73,7 +73,6 @@ import org.junit.jupiter.api.Test "DestinationTaskLauncherTest", "MockDestinationConfiguration", "MockDestinationCatalog", - "MockScopeProvider", ] ) class DestinationTaskLauncherTest { diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index 3c4a978cf1bc4..18c179bb9353e 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -158,7 +158,7 @@ class DestinationTaskLauncherUTest { destinationTaskLauncher.handleTeardownComplete() coVerify { failStreamTaskFactory.make(any(), e, any()) } - coVerify { taskScopeProvider.launch(match { it.innerTask is FailStreamTask }) } + coVerify { taskScopeProvider.launch(match { it.innerTask is FailStreamTask }, isIO = true) } } @Test @@ -208,7 +208,7 @@ class DestinationTaskLauncherUTest { coEvery { task.execute() } throws Exception("spill to disk task failed") task } - coEvery { taskScopeProvider.launch(any()) } coAnswers + coEvery { taskScopeProvider.launch(any(), any(), any(), any()) } coAnswers { val task = firstArg() task.execute()