Skip to content

Commit

Permalink
Destination-S3-V2: Bug: Do not hang on stream missing state (#51043)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Jan 10, 2025
1 parent 05b6d0b commit 64f591f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}

private suspend fun flushGlobalCheckpoints() {
if (globalCheckpoints.isEmpty()) {
log.info { "No global checkpoints to flush" }
return
}
while (!globalCheckpoints.isEmpty()) {
val head = globalCheckpoints.peek()
val allStreamsPersisted =
Expand All @@ -167,12 +171,20 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}

private suspend fun flushStreamCheckpoints() {
val noCheckpointStreams = mutableSetOf<DestinationStream.Descriptor>()
for (stream in catalog.streams) {

val manager = syncManager.getStreamManager(stream.descriptor)
val streamCheckpoints = streamCheckpoints[stream.descriptor] ?: return
val streamCheckpoints = streamCheckpoints[stream.descriptor]
if (streamCheckpoints == null) {
noCheckpointStreams.add(stream.descriptor)

continue
}
while (true) {
val (nextIndex, nextMessage) = streamCheckpoints.peek() ?: break
if (manager.areRecordsPersistedUntil(nextIndex)) {

log.info {
"Flushing checkpoint for stream: ${stream.descriptor} at index: $nextIndex"
}
Expand All @@ -184,6 +196,9 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}
}
}
if (noCheckpointStreams.isNotEmpty()) {
log.info { "No checkpoints for streams: $noCheckpointStreams" }
}
}

private suspend fun validateAndSendMessage(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.state

import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.load.message.CheckpointMessage
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test

class CheckpointManagerUTest {
@MockK(relaxed = true) lateinit var catalog: DestinationCatalog
@MockK(relaxed = true) lateinit var syncManager: SyncManager
private val outputConsumer: suspend (Reserved<CheckpointMessage>) -> Unit =
mockk<suspend (Reserved<CheckpointMessage>) -> Unit>(relaxed = true)
@MockK(relaxed = true) lateinit var timeProvider: TimeProvider

@Test
fun `test checkpoint manager does not ignore ready checkpoint after empty one`() = runTest {
// Populate the mock catalog with two streams in order
val stream1 =
DestinationStream(
DestinationStream.Descriptor("test", "stream1"),
importType = Append,
schema = ObjectTypeWithEmptySchema,
generationId = 10L,
minimumGenerationId = 10L,
syncId = 101L
)
val stream2 =
DestinationStream(
DestinationStream.Descriptor("test", "stream2"),
importType = Append,
schema = ObjectTypeWithEmptySchema,
generationId = 10L,
minimumGenerationId = 10L,
syncId = 101L
)

coEvery { catalog.streams } returns listOf(stream1, stream2)

val checkpointManager =
DefaultCheckpointManager(catalog, syncManager, outputConsumer, timeProvider)

// Add a checkpoint for only the second stream
val message = mockk<Reserved<CheckpointMessage>>(relaxed = true)
checkpointManager.addStreamCheckpoint(stream2.descriptor, 1, message)

// Ensure second stream is data sufficient
val stream2Manager = mockk<StreamManager>(relaxed = true)
coEvery { stream2Manager.areRecordsPersistedUntil(any()) } returns true
coEvery { syncManager.getStreamManager(stream2.descriptor) } returns stream2Manager

// Ensure { first stream is empty } doesn't block sending the second stream
coEvery { outputConsumer.invoke(any()) } returns Unit
checkpointManager.flushReadyCheckpointMessages()
coVerify { outputConsumer.invoke(message) }
}
}

0 comments on commit 64f591f

Please sign in to comment.