Skip to content

Commit

Permalink
#374 Fixed Spark 2.4.8 support in integration tests.
Browse files Browse the repository at this point in the history
Apparently, Spark 2.4.8 infers '2021-02-18' as a timestamp rather than a date, so the tests now pass an explicit CSV schema instead of relying on schema inference.
  • Loading branch information
yruslan committed Sep 18, 2024
1 parent 35bb852 commit 9d9ee04
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ class TableReaderSpark(formatOpt: Option[String],
if (hasInfoDateColumn) {
if (infoDateBegin.equals(infoDateEnd)) {
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateEnd)}'")
getDailyDataFrame(query, infoDateEnd)
val df = getDailyDataFrame(query, infoDateEnd)
df.printSchema()
df.show(false)
df
} else {
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'")
getFilteredDataFrame(query, infoDateBegin, infoDateEnd)
Expand Down Expand Up @@ -90,7 +93,10 @@ class TableReaderSpark(formatOpt: Option[String],
infoDateOpt match {
case Some(infoDate) if hasInfoDateColumn =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate'")
getData(query, infoDate, infoDate, columns)
val df = getData(query, infoDate, infoDate, columns)
df.printSchema()
df.show(false)
df
case _ =>
val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn))
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString} AND ${offsetInfo.offsetColumn} <= ${maxOffset.valueString}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pramen.sources.1 = [

has.information.date.column = ${has.information.date.column}
information.date.column = "info_date"
information.date.type = "string"
information.date.type = "date"
information.date.format = "yyyy-MM-dd"

offset.column {
Expand All @@ -56,6 +56,8 @@ pramen.sources.1 = [

format = "csv"

schema = ${csv.schema}

option {
header = true
inferSchema = ${infer.schema}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec

val csv1WithInfoDateStr = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(1)}\n"
val csv2WithInfoDateStr = s"id,name,info_date\n1,John,${infoDate.minusDays(1)}\n4,Mary,$infoDate\n5,Jane,$infoDate\n6,Kate,$infoDate\n999,New2,${infoDate.plusDays(1)}\n"
val csvWithInfoDateSchema = "id int,name string,info_date date"

val expectedWithInfoDate1: String =
"""{"id":1,"name":"John"}
Expand Down Expand Up @@ -641,7 +642,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n")

val conf = getConfig(tempDir, metastoreFormat, inferSchema = false)
val conf = getConfig(tempDir, metastoreFormat, inferSchema = false, csvSchema = "id string,name string")

val exitCode1 = AppRunner.runPipeline(conf)
assert(exitCode1 == 2)
Expand Down Expand Up @@ -690,9 +691,10 @@ class IncrementalPipelineLongFixture extends AnyWordSpec

val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
println(csv1WithInfoDateStr)
fsUtils.writeFile(path1, csv1WithInfoDateStr)

val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)

val exitCode1 = AppRunner.runPipeline(conf)
assert(exitCode1 == 0)
Expand Down Expand Up @@ -737,7 +739,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
fsUtils.writeFile(path1, csv1WithInfoDateStr)

val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isTransformerIncremental = false)
val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isTransformerIncremental = false, inferSchema = false, csvSchema = csvWithInfoDateSchema)

val exitCode1 = AppRunner.runPipeline(conf)
assert(exitCode1 == 0)
Expand Down Expand Up @@ -782,7 +784,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
fsUtils.writeFile(path1, csv1WithInfoDateStr)

val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)

val exitCode1 = AppRunner.runPipeline(conf)
assert(exitCode1 == 0)
Expand All @@ -800,7 +802,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
fsUtils.deleteFile(path1)
fsUtils.writeFile(path2, csv2WithInfoDateStr)

val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true, hasInfoDate = true)
val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
val exitCode2 = AppRunner.runPipeline(conf2)
assert(exitCode2 == 0)

Expand Down Expand Up @@ -834,7 +836,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
fsUtils.writeFile(path1, csv1Str)
fsUtils.writeFile(path2, csv2Str)

val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)

val exitCode1 = AppRunner.runPipeline(conf1)
assert(exitCode1 == 0)
Expand All @@ -849,7 +851,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
compareText(actualTable1Before, expectedWithInfoDate1)
compareText(actualTable2Before, expectedWithInfoDate1)

val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1))
val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1), inferSchema = false, csvSchema = csvWithInfoDateSchema)
val exitCode2 = AppRunner.runPipeline(conf2)
assert(exitCode2 == 0)

Expand Down Expand Up @@ -971,7 +973,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec

fsUtils.writeFile(path1, csv1Str)

val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true)
val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = "id int,name string,info_date date")

val exitCode1 = AppRunner.runPipeline(conf1)
assert(exitCode1 == 0)
Expand Down Expand Up @@ -1018,6 +1020,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
isHistoricalRun: Boolean = false,
historyRunMode: String = "force",
inferSchema: Boolean = true,
csvSchema: String = "id int,name string",
hasInfoDate: Boolean = false,
useInfoDate: LocalDate = infoDate,
resource: String = "/test/config/incremental_pipeline.conf"): Config = {
Expand All @@ -1043,6 +1046,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
|$historicalConfigStr
|has.information.date.column = $hasInfoDate
|metastore.format = $metastoreFormat
|csv.schema = "$csvSchema"
|
|pramen.bookkeeping.jdbc {
| driver = "$driver"
Expand Down

0 comments on commit 9d9ee04

Please sign in to comment.