diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt index ee20f281d0b5..6085392bd506 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt @@ -147,6 +147,10 @@ abstract class StreamsCheckpointManager : CheckpointManager : CheckpointManager() for (stream in catalog.streams) { + val manager = syncManager.getStreamManager(stream.descriptor) - val streamCheckpoints = streamCheckpoints[stream.descriptor] ?: return + val streamCheckpoints = streamCheckpoints[stream.descriptor] + if (streamCheckpoints == null) { + noCheckpointStreams.add(stream.descriptor) + + continue + } while (true) { val (nextIndex, nextMessage) = streamCheckpoints.peek() ?: break if (manager.areRecordsPersistedUntil(nextIndex)) { + log.info { "Flushing checkpoint for stream: ${stream.descriptor} at index: $nextIndex" } @@ -184,6 +196,9 @@ abstract class StreamsCheckpointManager : CheckpointManager) -> Unit = + mockk) -> Unit>(relaxed = true) + @MockK(relaxed = true) lateinit var timeProvider: TimeProvider + + @Test + fun `test checkpoint manager does not ignore ready checkpoint after empty one`() = runTest { + // Populate the mock catalog with two streams in order + val stream1 = + DestinationStream( + DestinationStream.Descriptor("test", "stream1"), + importType = Append, + schema = ObjectTypeWithEmptySchema, + generationId = 10L, + minimumGenerationId = 10L, + syncId = 101L + ) + val stream2 = + DestinationStream( + DestinationStream.Descriptor("test", "stream2"), + importType = Append, + schema = ObjectTypeWithEmptySchema, + generationId = 10L, + minimumGenerationId = 10L, + syncId = 101L + ) + + coEvery { catalog.streams } returns listOf(stream1, stream2) + + val checkpointManager = + DefaultCheckpointManager(catalog, syncManager, outputConsumer, timeProvider) + + // Add a checkpoint for only the second stream + val message = mockk>(relaxed = true) + checkpointManager.addStreamCheckpoint(stream2.descriptor, 1, message) + + // Ensure second stream is data sufficient + val stream2Manager = mockk(relaxed = true) + coEvery { stream2Manager.areRecordsPersistedUntil(any()) } returns true + coEvery { syncManager.getStreamManager(stream2.descriptor) } returns stream2Manager + + // Ensure { first stream is empty } doesn't block sending the second stream + coEvery { outputConsumer.invoke(any()) } returns Unit + checkpointManager.flushReadyCheckpointMessages() + coVerify { outputConsumer.invoke(message) } + } +}