Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination-S3-V2: Bug: Do not hang on stream missing state #51043

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}

private suspend fun flushGlobalCheckpoints() {
if (globalCheckpoints.isEmpty()) {
log.info { "No global checkpoints to flush" }
return
}
while (!globalCheckpoints.isEmpty()) {
val head = globalCheckpoints.peek()
val allStreamsPersisted =
Expand All @@ -167,12 +171,20 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}

private suspend fun flushStreamCheckpoints() {
val noCheckpointStreams = mutableSetOf<DestinationStream.Descriptor>()
for (stream in catalog.streams) {

val manager = syncManager.getStreamManager(stream.descriptor)
val streamCheckpoints = streamCheckpoints[stream.descriptor] ?: return
val streamCheckpoints = streamCheckpoints[stream.descriptor]
if (streamCheckpoints == null) {
noCheckpointStreams.add(stream.descriptor)

continue
}
while (true) {
val (nextIndex, nextMessage) = streamCheckpoints.peek() ?: break
if (manager.areRecordsPersistedUntil(nextIndex)) {

log.info {
"Flushing checkpoint for stream: ${stream.descriptor} at index: $nextIndex"
}
Expand All @@ -184,6 +196,9 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}
}
}
if (noCheckpointStreams.isNotEmpty()) {
log.info { "No checkpoints for streams: $noCheckpointStreams" }
}
}

private suspend fun validateAndSendMessage(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.state

import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.file.TimeProvider
import io.airbyte.cdk.load.message.CheckpointMessage
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test

class CheckpointManagerUTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need to extend this via @ExtendWith(MockKExtension::class) to get the mock annotations working?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess not? I stepped through with the debugger and the catalog was populated with a mock that was behaving as expected.

@MockK(relaxed = true) lateinit var catalog: DestinationCatalog
@MockK(relaxed = true) lateinit var syncManager: SyncManager
private val outputConsumer: suspend (Reserved<CheckpointMessage>) -> Unit =
mockk<suspend (Reserved<CheckpointMessage>) -> Unit>(relaxed = true)
@MockK(relaxed = true) lateinit var timeProvider: TimeProvider

@Test
fun `test checkpoint manager does not ignore ready checkpoint after empty one`() = runTest {
// Populate the mock catalog with two streams in order
val stream1 =
DestinationStream(
DestinationStream.Descriptor("test", "stream1"),
importType = Append,
schema = ObjectTypeWithEmptySchema,
generationId = 10L,
minimumGenerationId = 10L,
syncId = 101L
)
val stream2 =
DestinationStream(
DestinationStream.Descriptor("test", "stream2"),
importType = Append,
schema = ObjectTypeWithEmptySchema,
generationId = 10L,
minimumGenerationId = 10L,
syncId = 101L
)

coEvery { catalog.streams } returns listOf(stream1, stream2)

val checkpointManager =
DefaultCheckpointManager(catalog, syncManager, outputConsumer, timeProvider)

// Add a checkpoint for only the second stream
val message = mockk<Reserved<CheckpointMessage>>(relaxed = true)
checkpointManager.addStreamCheckpoint(stream2.descriptor, 1, message)

// Ensure second stream is data sufficient
val stream2Manager = mockk<StreamManager>(relaxed = true)
coEvery { stream2Manager.areRecordsPersistedUntil(any()) } returns true
coEvery { syncManager.getStreamManager(stream2.descriptor) } returns stream2Manager

// Ensure { first stream is empty } doesn't block sending the second stream
coEvery { outputConsumer.invoke(any()) } returns Unit
checkpointManager.flushReadyCheckpointMessages()
coVerify { outputConsumer.invoke(message) }
}
}
Loading