Skip to content

Commit

Permalink
Avoid filtering by lastSeenFile where a post process action is config…
Browse files Browse the repository at this point in the history
…ured (#192)

Avoid using lastModified time for filtering in cases where a post process action is configured.

In the CloudSourceFileQueue in cases where a post process action is configured, we no configure the lastSeenFile when retrieving the new list of files. This means the files in the BatchLister will no longer be filtered, and consequently no files will be skipped by the source.
  • Loading branch information
davidsloan authored Jan 9, 2025
1 parent 9689a37 commit 771ed92
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.source.files

import cats.implicits.catsSyntaxEitherId
import cats.implicits.catsSyntaxOptionId
import cats.implicits.toShow
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction
import io.lenses.streamreactor.connect.cloud.common.storage.FileListError
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
Expand All @@ -32,15 +32,17 @@ trait SourceFileQueue {
def next(): Either[FileListError, Option[CloudLocation]]
}

case class LastSeenFileTracker[SM <: FileMetadata](var lastSeenFile: Option[SM])

/**
* Blocking processor for queues of operations. Used to ensure consistency.
* Will block any further writes by the current file until the remote has caught up.
*/
class CloudSourceFileQueue[SM <: FileMetadata] private (
private val taskId: ConnectorTaskId,
private val batchListerFn: Option[SM] => Either[FileListError, Option[ListResponse[String, SM]]],
private var files: Seq[CloudLocation],
private var lastSeenFile: Option[SM],
private val taskId: ConnectorTaskId,
private val batchListerFn: Option[SM] => Either[FileListError, Option[ListResponse[String, SM]]],
private var files: Seq[CloudLocation],
private var lastSeenFileTracker: Option[LastSeenFileTracker[SM]],
)(
implicit
cloudLocationValidator: CloudLocationValidator,
Expand All @@ -53,7 +55,7 @@ class CloudSourceFileQueue[SM <: FileMetadata] private (
)(
implicit
cloudLocationValidator: CloudLocationValidator,
) = this(taskId, batchListerFn, Seq.empty, None)
) = this(taskId, batchListerFn, Seq.empty, Some(LastSeenFileTracker[SM](None)))

override def next(): Either[FileListError, Option[CloudLocation]] =
files match {
Expand All @@ -73,10 +75,11 @@ class CloudSourceFileQueue[SM <: FileMetadata] private (

private def retrieveNextFile(
): Either[FileListError, Option[CloudLocation]] = {
val nextBatch: Either[FileListError, Option[ListResponse[String, SM]]] = batchListerFn(lastSeenFile)
val nextBatch: Either[FileListError, Option[ListResponse[String, SM]]] =
batchListerFn(lastSeenFileTracker.flatMap(_.lastSeenFile))
nextBatch.flatMap {
case Some(ListOfKeysResponse(bucket, prefix, value, meta)) =>
lastSeenFile = meta.some
lastSeenFileTracker = lastSeenFileTracker.map(_.copy(lastSeenFile = meta.some))
files = value.map(path =>
CloudLocation(
bucket,
Expand All @@ -98,10 +101,11 @@ class CloudSourceFileQueue[SM <: FileMetadata] private (

object CloudSourceFileQueue {
def from[SM <: FileMetadata](
batchListerFn: Option[SM] => Either[FileListError, Option[ListOfKeysResponse[SM]]],
storageInterface: StorageInterface[SM],
startingFile: CloudLocation,
taskId: ConnectorTaskId,
batchListerFn: Option[SM] => Either[FileListError, Option[ListOfKeysResponse[SM]]],
storageInterface: StorageInterface[SM],
startingFile: CloudLocation,
taskId: ConnectorTaskId,
maybePostProcessAction: Option[PostProcessAction],
)(
implicit
cloudLocationValidator: CloudLocationValidator,
Expand All @@ -114,7 +118,12 @@ object CloudSourceFileQueue {
case _ =>
Option.empty[SM]
}
new CloudSourceFileQueue(taskId, batchListerFn, Seq(startingFile), lastSeen)
new CloudSourceFileQueue[SM](
taskId,
batchListerFn,
Seq(startingFile),
Option.when(maybePostProcessAction.isEmpty)(LastSeenFileTracker[SM](lastSeen)),
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ object ReaderManagerBuilder extends LazyLogging {
storageInterface,
location,
connectorTaskId,
adaptedSbo.postProcessAction,
)
}
sourceFileQueue <- IO.fromEither(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.sink.seek.TestFileMetadata
import io.lenses.streamreactor.connect.cloud.common.source.config.DeletePostProcessAction
import io.lenses.streamreactor.connect.cloud.common.storage.FileListError
import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData
import org.mockito.ArgumentMatchers._
import org.mockito.InOrder
import org.mockito.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
Expand Down Expand Up @@ -83,12 +85,7 @@ class CloudSourceFileQueueTest extends AnyFlatSpec with Matchers with MockitoSug
any[Option[TestFileMetadata]],
)
// file 0 = 0.json
sourceFileQueue.next() should be(Right(Some(fileLocs(0).atLine(-1).withTimestamp(lastModified))))
order.verify(batchListerFn)(none)

// file 1 = 1.json
sourceFileQueue.next() should be(Right(Some(fileLocs(1).atLine(-1).withTimestamp(lastModified))))
order.verifyNoMoreInteractions()
verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)

// file 2 = 2.json
sourceFileQueue.next() should be(Right(Some(fileLocs(2).atLine(-1).withTimestamp(lastModified))))
Expand Down Expand Up @@ -127,6 +124,7 @@ class CloudSourceFileQueueTest extends AnyFlatSpec with Matchers with MockitoSug
mockStorageIface,
fileLocs(2).atLine(1000).withTimestamp(lastModified),
taskId,
Option.empty,
)

val order = inOrder(batchListerFn)
Expand Down Expand Up @@ -175,4 +173,54 @@ class CloudSourceFileQueueTest extends AnyFlatSpec with Matchers with MockitoSug
sourceFileQueue.next() shouldBe expected
}

"list" should "not pass lastSeenFile to batchListerFn when PostProcessAction is set" in {

val batchListerFn =
mock[Option[TestFileMetadata] => Either[FileListError, Option[ListOfKeysResponse[TestFileMetadata]]]]

val mockStorageIface = mock[StorageInterface[TestFileMetadata]]
when(
mockStorageIface.seekToFile(
anyString(),
anyString(),
any[Option[Instant]],
),
).thenAnswer((_: String, file: String, _: Option[Instant]) => TestFileMetadata(file, lastModified).some)

val sourceFileQueue =
CloudSourceFileQueue.from(
batchListerFn,
mockStorageIface,
fileLocs(1).atLine(1000).withTimestamp(lastModified),
taskId,
Some(new DeletePostProcessAction()),
)

doAnswer(x => listBatch(x)).when(batchListerFn)(
any[Option[TestFileMetadata]],
)

val order = inOrder(batchListerFn)

// we are starting from a previously read file, so we use fileLocs(1) - this doesn't cause a read to the storage layer to discover next file
sourceFileQueue.next() should be(Right(Some(fileLocs(1).atLine(1000).withTimestamp(lastModified))))
order.verifyNoMoreInteractions()

verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)
verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)
verifyRefreshAndRollover(batchListerFn, sourceFileQueue, order)

}

private def verifyRefreshAndRollover(
batchListerFn: Option[TestFileMetadata] => Either[FileListError, Option[ListOfKeysResponse[TestFileMetadata]]],
sourceFileQueue: CloudSourceFileQueue[TestFileMetadata],
order: InOrder,
): Unit = {
// REFRESH
sourceFileQueue.next() should be(Right(Some(fileLocs(0).atLine(-1).withTimestamp(lastModified))))
order.verify(batchListerFn)(none)
sourceFileQueue.next() should be(Right(Some(fileLocs(1).atLine(-1).withTimestamp(lastModified))))
order.verifyNoMoreInteractions()
}
}

0 comments on commit 771ed92

Please sign in to comment.