From 9d9ee04e7973bfc371d166bbd12eb4307ba6dd63 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 18 Sep 2024 16:50:01 +0200
Subject: [PATCH] #374 Fixed Spark 2.4.8 support in integration tests.

Apparently, Spark 2.4.8 infers '2021-02-18' as timestamp :O
---
 .../pramen/core/reader/TableReaderSpark.scala | 10 ++++++++--
 .../test/config/incremental_pipeline.conf     |  4 +++-
 .../IncrementalPipelineLongFixture.scala      | 20 +++++++++++--------
 3 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
index d18d25ca9..e928fb347 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala
@@ -60,7 +60,10 @@ class TableReaderSpark(formatOpt: Option[String],
     if (hasInfoDateColumn) {
       if (infoDateBegin.equals(infoDateEnd)) {
         log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateEnd)}'")
-        getDailyDataFrame(query, infoDateEnd)
+        val df = getDailyDataFrame(query, infoDateEnd)
+        df.printSchema()
+        df.show(false)
+        df
       } else {
         log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'")
         getFilteredDataFrame(query, infoDateBegin, infoDateEnd)
@@ -90,7 +93,10 @@ class TableReaderSpark(formatOpt: Option[String],
     infoDateOpt match {
       case Some(infoDate) if hasInfoDateColumn =>
         log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate'")
-        getData(query, infoDate, infoDate, columns)
+        val df = getData(query, infoDate, infoDate, columns)
+        df.printSchema()
+        df.show(false)
+        df
       case _ =>
         val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn))
         log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString} AND ${offsetInfo.offsetColumn} <= ${maxOffset.valueString}")
diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf
index 8474a4f34..18d856f7e 100644
--- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf
+++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf
@@ -45,7 +45,7 @@ pramen.sources.1 = [
 
     has.information.date.column = ${has.information.date.column}
     information.date.column = "info_date"
-    information.date.type = "string"
+    information.date.type = "date"
     information.date.format = "yyyy-MM-dd"
 
     offset.column {
@@ -56,6 +56,8 @@ pramen.sources.1 = [
 
     format = "csv"
 
+    schema = ${csv.schema}
+
     option {
       header = true
       inferSchema = ${infer.schema}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
index 63b3dbd26..244805765 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala
@@ -85,6 +85,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
 
   val csv1WithInfoDateStr = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(1)}\n"
   val csv2WithInfoDateStr = s"id,name,info_date\n1,John,${infoDate.minusDays(1)}\n4,Mary,$infoDate\n5,Jane,$infoDate\n6,Kate,$infoDate\n999,New2,${infoDate.plusDays(1)}\n"
+  val csvWithInfoDateSchema = "id int,name string,info_date date"
 
   val expectedWithInfoDate1: String =
     """{"id":1,"name":"John"}
@@ -641,7 +642,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
     fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n")
 
-    val conf = getConfig(tempDir, metastoreFormat, inferSchema = false)
+    val conf = getConfig(tempDir, metastoreFormat, inferSchema = false, csvSchema = "id string,name string")
 
     val exitCode1 = AppRunner.runPipeline(conf)
     assert(exitCode1 == 2)
@@ -690,9 +691,10 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
     val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
 
+    println(csv1WithInfoDateStr)
     fsUtils.writeFile(path1, csv1WithInfoDateStr)
 
-    val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
+    val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
 
     val exitCode1 = AppRunner.runPipeline(conf)
     assert(exitCode1 == 0)
@@ -737,7 +739,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
 
     fsUtils.writeFile(path1, csv1WithInfoDateStr)
 
-    val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isTransformerIncremental = false)
+    val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isTransformerIncremental = false, inferSchema = false, csvSchema = csvWithInfoDateSchema)
 
     val exitCode1 = AppRunner.runPipeline(conf)
     assert(exitCode1 == 0)
@@ -782,7 +784,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
 
     fsUtils.writeFile(path1, csv1WithInfoDateStr)
 
-    val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
+    val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
 
     val exitCode1 = AppRunner.runPipeline(conf)
     assert(exitCode1 == 0)
@@ -800,7 +802,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     fsUtils.deleteFile(path1)
     fsUtils.writeFile(path2, csv2WithInfoDateStr)
 
-    val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true, hasInfoDate = true)
+    val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
 
     val exitCode2 = AppRunner.runPipeline(conf2)
     assert(exitCode2 == 0)
@@ -834,7 +836,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     fsUtils.writeFile(path1, csv1Str)
     fsUtils.writeFile(path2, csv2Str)
 
-    val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
+    val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
 
     val exitCode1 = AppRunner.runPipeline(conf1)
     assert(exitCode1 == 0)
@@ -849,7 +851,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     compareText(actualTable1Before, expectedWithInfoDate1)
     compareText(actualTable2Before, expectedWithInfoDate1)
 
-    val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1))
+    val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1), inferSchema = false, csvSchema = csvWithInfoDateSchema)
 
     val exitCode2 = AppRunner.runPipeline(conf2)
     assert(exitCode2 == 0)
@@ -971,7 +973,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
 
     fsUtils.writeFile(path1, csv1Str)
 
-    val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
+    val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = "id int,name string,info_date date")
 
     val exitCode1 = AppRunner.runPipeline(conf1)
     assert(exitCode1 == 0)
@@ -1018,6 +1020,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
                 isHistoricalRun: Boolean = false,
                 historyRunMode: String = "force",
                 inferSchema: Boolean = true,
+                csvSchema: String = "id int,name string",
                 hasInfoDate: Boolean = false,
                 useInfoDate: LocalDate = infoDate,
                 resource: String = "/test/config/incremental_pipeline.conf"): Config = {
@@ -1043,6 +1046,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
          |$historicalConfigStr
          |has.information.date.column = $hasInfoDate
          |metastore.format = $metastoreFormat
+         |csv.schema = "$csvSchema"
          |
          |pramen.bookkeeping.jdbc {
          |  driver = "$driver"
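
Note on the underlying issue: the commit message reports that Spark 2.4.8 infers '2021-02-18' as a timestamp when reading CSV with inferSchema enabled, which is why the tests above switch to an explicit DDL schema (the new csvSchema parameter and csv.schema substitution). The following is a minimal standalone sketch of that difference, not part of the patch; the object name, local session, and temp file are illustrative assumptions.

// Sketch: why explicit schemas make the CSV tests version-independent.
// Assumes Spark is on the classpath; names here are illustrative only.
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object CsvInferenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("csv-inference-sketch")
      .getOrCreate()

    // A CSV with a date-only column, same shape as the test fixtures above.
    val path = Files.createTempFile("sketch", ".csv")
    Files.write(path, "id,name,info_date\n1,John,2021-02-18\n".getBytes("UTF-8"))

    // With inferSchema=true the type of info_date depends on the Spark
    // version: per the commit message, 2.4.8 infers 'timestamp' for
    // '2021-02-18', so schema-sensitive assertions become flaky.
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(path.toString)
      .printSchema()

    // Pinning an explicit DDL schema, as the patch does via csv.schema,
    // yields the same column types on every Spark version.
    spark.read
      .option("header", "true")
      .schema("id int,name string,info_date date")
      .csv(path.toString)
      .printSchema()

    spark.stop()
  }
}

Pinning the schema also lets the affected tests pass inferSchema = false throughout, so their expected outputs no longer depend on which Spark version runs the integration suite.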