diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf index 8474a4f34..18d856f7e 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf @@ -45,7 +45,7 @@ pramen.sources.1 = [ has.information.date.column = ${has.information.date.column} information.date.column = "info_date" - information.date.type = "string" + information.date.type = "date" information.date.format = "yyyy-MM-dd" offset.column { @@ -56,6 +56,8 @@ pramen.sources.1 = [ format = "csv" + schema = ${csv.schema} + option { header = true inferSchema = ${infer.schema} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index 63b3dbd26..244805765 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -85,6 +85,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val csv1WithInfoDateStr = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(1)}\n" val csv2WithInfoDateStr = s"id,name,info_date\n1,John,${infoDate.minusDays(1)}\n4,Mary,$infoDate\n5,Jane,$infoDate\n6,Kate,$infoDate\n999,New2,${infoDate.plusDays(1)}\n" + val csvWithInfoDateSchema = "id int,name string,info_date date" val expectedWithInfoDate1: String = """{"id":1,"name":"John"} @@ -641,7 +642,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - val conf = getConfig(tempDir, metastoreFormat, inferSchema = false) + val conf = getConfig(tempDir, metastoreFormat, inferSchema = false, csvSchema = "id string,name string") val exitCode1 = AppRunner.runPipeline(conf) assert(exitCode1 == 2) @@ -690,9 +691,10 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + println(csv1WithInfoDateStr) fsUtils.writeFile(path1, csv1WithInfoDateStr) - val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema) val exitCode1 = AppRunner.runPipeline(conf) assert(exitCode1 == 0) @@ -737,7 +739,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) fsUtils.writeFile(path1, csv1WithInfoDateStr) - val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isTransformerIncremental = false) + val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isTransformerIncremental = false, inferSchema = false, csvSchema = csvWithInfoDateSchema) val exitCode1 = AppRunner.runPipeline(conf) assert(exitCode1 == 0) @@ -782,7 +784,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) fsUtils.writeFile(path1, csv1WithInfoDateStr) - val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema) val exitCode1 = AppRunner.runPipeline(conf) assert(exitCode1 == 0) @@ -800,7 +802,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec fsUtils.deleteFile(path1) fsUtils.writeFile(path2, csv2WithInfoDateStr) - val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true, hasInfoDate = true) + val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema) val exitCode2 = AppRunner.runPipeline(conf2) assert(exitCode2 == 0) @@ -834,7 +836,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec fsUtils.writeFile(path1, csv1Str) fsUtils.writeFile(path2, csv2Str) - val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema) val exitCode1 = AppRunner.runPipeline(conf1) assert(exitCode1 == 0) @@ -849,7 +851,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec compareText(actualTable1Before, expectedWithInfoDate1) compareText(actualTable2Before, expectedWithInfoDate1) - val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) + val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1), inferSchema = false, csvSchema = csvWithInfoDateSchema) val exitCode2 = AppRunner.runPipeline(conf2) assert(exitCode2 == 0) @@ -971,7 +973,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec fsUtils.writeFile(path1, csv1Str) - val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = "id int,name string,info_date date") val exitCode1 = AppRunner.runPipeline(conf1) assert(exitCode1 == 0) @@ -1018,6 +1020,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec isHistoricalRun: Boolean = false, historyRunMode: String = "force", inferSchema: Boolean = true, + csvSchema: String = "id int,name string", hasInfoDate: Boolean = false, useInfoDate: LocalDate = infoDate, resource: String = "/test/config/incremental_pipeline.conf"): Config = { @@ -1043,6 +1046,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec |$historicalConfigStr |has.information.date.column = $hasInfoDate |metastore.format = $metastoreFormat + |csv.schema = "$csvSchema" | |pramen.bookkeeping.jdbc { | driver = "$driver"