
Commit

#520 Add unit test suite for the new incremental methods of the raw files source.
yruslan committed Dec 12, 2024
1 parent 24fc1de commit bc5019e
Showing 2 changed files with 79 additions and 0 deletions.
@@ -122,6 +122,10 @@ class RawFileSource(val sourceConfig: Config,
  }

  override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult = {
    if (onlyForInfoDate.isEmpty) {
      throw new IllegalArgumentException("Incremental ingestion of raw files requires an info date to be part of the filename pattern.")
    }

    val filePaths = getPaths(query, onlyForInfoDate.get, onlyForInfoDate.get)
    val list = filePaths.map { path =>
      (path.getPath.toString, path.getPath.getName)
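The offset handling itself sits further down in getDataIncremental, outside the lines shown in this hunk. Judging from the tests added below, file names act as plain string offsets: a lone "from" bound is exclusive (only files after the last committed one are returned), while a "from"/"to" pair is treated as an inclusive interval, e.g. for reruns. A minimal sketch of that behaviour under those assumptions, using a hypothetical helper filterByOffsets and plain lexicographic comparison:

// Hypothetical helper, not part of this commit: illustrates the offset
// semantics implied by the tests added in this commit.
def filterByOffsets(fileNames: Seq[String],
                    offsetFromOpt: Option[String],
                    offsetToOpt: Option[String]): Seq[String] = {
  (offsetFromOpt, offsetToOpt) match {
    // Both bounds given: an inclusive [from, to] interval (rerun case).
    case (Some(from), Some(to)) => fileNames.filter(name => name >= from && name <= to)
    // Only 'from': strictly after the last committed file name.
    case (Some(from), None) => fileNames.filter(_ > from)
    // Only 'to': everything up to and including the given file name.
    case (None, Some(to)) => fileNames.filter(_ <= to)
    // No bounds: the full listing for the info date.
    case (None, None) => fileNames
  }
}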
@@ -20,10 +20,12 @@ import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
import za.co.absa.pramen.api.{Query, Source}
import za.co.absa.pramen.core.ExternalChannelFactoryReflect
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.TempDirFixture
import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceRaw.RAW_OFFSET_FIELD_KEY
import za.co.absa.pramen.core.source.RawFileSource.FILE_PATTERN_CASE_SENSITIVE_KEY
import za.co.absa.pramen.core.utils.{FsUtils, LocalFsUtils}

@@ -292,6 +294,79 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir
    }
  }

"getOffsetInfo" should {
"always return the same column name and type" in {
val source = new RawFileSource(emptyConfig, null)(spark)

assert(source.getOffsetInfo.contains(OffsetInfo(RAW_OFFSET_FIELD_KEY, OffsetType.StringType)))
}
}

"getDataIncremental" should {
val conf = ConfigFactory.parseString(
s"$FILE_PATTERN_CASE_SENSITIVE_KEY = false"
)
val source = new RawFileSource(conf, null)(spark)

"work with from offset" in {
val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), Some(OffsetValue.StringValue("FILE_TEST_2022-02-18_A.dat")), None, Seq.empty)

val fileNames = files.filesRead

assert(fileNames.length == 2)
assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
assert(fileNames.contains("FILE_TEST_2022-02-18_C.DAT"))
}

"work with to offset" in {
val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), None, Some(OffsetValue.StringValue("FILE_TEST_2022-02-18_B.dat")), Seq.empty)

val fileNames = files.filesRead

assert(fileNames.length == 2)
assert(fileNames.contains("FILE_TEST_2022-02-18_A.dat"))
assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
}

"work from and to offset" in {
val fileB = Some(OffsetValue.StringValue("FILE_TEST_2022-02-18_B.dat"))
val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), fileB, fileB, Seq.empty)

val fileNames = files.filesRead

assert(fileNames.length == 1)
assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
}

"work from and to offset and an empty data frame" in {
val fileB = Some(OffsetValue.StringValue("FILE_TEST_2022-02-18_D.dat"))
val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), fileB, fileB, Seq.empty)

val fileNames = files.filesRead

assert(fileNames.isEmpty)
}

"work when no offsets are specified" in {
val files = source.getDataIncremental(Query.Path(filesPattern.toString), Some(infoDate), None, None, Seq.empty)

val fileNames = files.filesRead

assert(fileNames.length == 3)
assert(fileNames.contains("FILE_TEST_2022-02-18_A.dat"))
assert(fileNames.contains("FILE_TEST_2022-02-18_B.dat"))
assert(fileNames.contains("FILE_TEST_2022-02-18_C.DAT"))
}

"throw an exception when info date is not passed" in {
val ex = intercept[IllegalArgumentException] {
source.getDataIncremental(Query.Path(filesPattern.toString), None, None, None, Seq.empty)
}

assert(ex.getMessage == "Incremental ingestion of raw files requires an info date to be part of filename pattern.")
}
}

"getPaths" should {
"work for a directory" in {
val source = new RawFileSource(emptyConfig, null)(spark)
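For context, a hypothetical caller-side snippet (not part of this commit) showing how the pieces fit together: the offset column advertised by getOffsetInfo is a string, and the tests above treat file names as the offset values, so a pipeline can resume by passing the last committed file name as the "from" offset. The path, info date, and last committed file name below are illustrative values only; spark is an available SparkSession, as in the test suite.

import java.time.LocalDate

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.core.source.RawFileSource
import za.co.absa.pramen.core.source.RawFileSource.FILE_PATTERN_CASE_SENSITIVE_KEY

// Treat file name matching as case-insensitive, as in the test suite above.
val conf = ConfigFactory.parseString(s"$FILE_PATTERN_CASE_SENSITIVE_KEY = false")
val source = new RawFileSource(conf, null)(spark)

// Illustrative values; in a real pipeline the pattern comes from the job
// configuration (and must embed the info date), while the last committed file
// name comes from the offsets stored by previous incremental runs.
val filesPattern = new Path("/data/raw/FILE_TEST_*.dat")
val infoDate = LocalDate.of(2022, 2, 18)
val lastCommittedFile = "FILE_TEST_2022-02-18_A.dat"

// Passing only the 'from' offset requests the files that come strictly after
// the last committed file name for the given info date.
val result = source.getDataIncremental(
  Query.Path(filesPattern.toString),
  Some(infoDate),
  Some(OffsetValue.StringValue(lastCommittedFile)),
  None,
  Seq.empty
)

result.filesRead.foreach(name => println(s"New file to ingest: $name"))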
