#374 Add an end to end test for incremental processing.
yruslan committed Sep 12, 2024
1 parent d5e2fc0 commit 86b70e8
Showing 3 changed files with 262 additions and 7 deletions.
@@ -118,15 +118,22 @@ class IncrementalIngestionJob(operationDef: OperationDef,

       val req = om.startWriteOffsets(outputTable.name, infoDate, minimumOffset)

-      val stats = metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
+      val stats = try {
+        val statsToReturn = metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))

-      val updatedDf = metastore.getTable(outputTable.name, Option(infoDate), Option(infoDate))
+        val updatedDf = metastore.getCurrentBatch(outputTable.name, infoDate)

-      if (updatedDf.isEmpty) {
-        om.rollbackOffsets(req)
-      } else {
-        val maxOffset = updatedDf.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0)(0).asInstanceOf[String]
-        om.commitOffsets(req, OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, maxOffset))
+        if (updatedDf.isEmpty) {
+          om.rollbackOffsets(req)
+        } else {
+          val maxOffset = updatedDf.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0)(0).asInstanceOf[String]
+          om.commitOffsets(req, OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, maxOffset))
+        }
+        statsToReturn
+      } catch {
+        case ex: Throwable =>
+          om.rollbackOffsets(req)
+          throw ex
       }

       try {
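
The hunk above wraps the metastore write in a try/catch so that an offset interval opened with startWriteOffsets is always either committed or rolled back, even when the save itself throws. A minimal sketch of that commit-or-rollback pattern in isolation, using hypothetical OffsetTracker and OffsetRequest names rather than the real Pramen classes:

// Minimal sketch (not the actual Pramen API) of the pattern introduced above:
// start an offset interval, write, verify, then commit the new maximum offset
// or roll the interval back on any failure.
object OffsetCommitSketch {
  case class OffsetRequest(table: String)

  trait OffsetTracker {
    def start(table: String): OffsetRequest
    def commit(req: OffsetRequest, maxOffset: String): Unit
    def rollback(req: OffsetRequest): Unit
  }

  def writeWithOffsets(tracker: OffsetTracker, table: String)(writeData: => Option[String]): Unit = {
    val req = tracker.start(table)
    try {
      writeData match {
        case Some(maxOffset) => tracker.commit(req, maxOffset) // data landed; record the new maximum offset
        case None            => tracker.rollback(req)          // nothing was written; release the pending interval
      }
    } catch {
      case ex: Throwable =>
        tracker.rollback(req) // any failure leaves the offset bookkeeping unchanged
        throw ex
    }
  }
}
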
@@ -0,0 +1,98 @@
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This variable is expected to be set by the test suite
#base.path = "/tmp"

pramen {
  pipeline.name = "Integration test with a file-based source"

  bookkeeping.enabled = true
  stop.spark.session = false
}

pramen.metastore {
  tables = [
    {
      name = "table1"
      format = "parquet"
      path = ${base.path}/table1
    },
    {
      name = "table2"
      format = "parquet"
      path = ${base.path}/table2
    }
  ]
}

pramen.sources.1 = [
  {
    name = "spark_source"
    factory.class = "za.co.absa.pramen.core.source.SparkSource"

    has.information.date.column = false

    offset.column {
      enabled = true
      name = "id"
      type = "long"
    }

    format = "csv"

    option {
      header = true
    }
  }
]

pramen.operations = [
  {
    name = "Sourcing from a folder"
    type = "ingestion"
    schedule.type = "incremental"

    source = "spark_source"

    info.date.expr = "@runDate"

    tables = [
      {
        input.path = ${base.path}/landing
        output.metastore.table = table1
      }
    ]
  },
  {
    name = "Running a transformer"
    type = "transformation"

    class = "za.co.absa.pramen.core.transformers.IdentityTransformer"
    schedule.type = "incremental"

    output.table = "table2"

    dependencies = [
      {
        tables = [ table1 ]
        date.from = "@infoDate"
        optional = true # Since no bookkeeping is available, the table will be seen as empty by the dependency manager
      }
    ]

    option {
      input.table = "table1"
    }
  }
]
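
The ${base.path} substitutions above are intentionally left unresolved; the test suite injects the value before the configuration is resolved. A minimal sketch of that injection, assuming the file is available on the test classpath (it mirrors the getConfig helper in the suite below; the basePath value would normally be a fresh temp directory created by the test):

import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.pramen.core.utils.ResourceUtils

object PipelineConfSketch {
  def loadPipelineConf(basePath: String): Config = {
    val contents = ResourceUtils.getResourceString("/test/config/incremental_pipeline.conf")

    ConfigFactory.parseString(
      s"""base.path = "$basePath"
         |$contents
         |""".stripMargin
    ).withFallback(ConfigFactory.load()) // pick up application/reference defaults
      .resolve()                         // resolves ${base.path} in the metastore and ingestion paths
  }
}
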
@@ -0,0 +1,150 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.integration

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.fs.Path
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.core.rdb.PramenDb
import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.runner.AppRunner
import za.co.absa.pramen.core.utils.{FsUtils, JdbcNativeUtils, ResourceUtils}

import java.time.LocalDate

class IncrementalPipelineSuite extends AnyWordSpec
  with SparkTestBase
  with RelationalDbFixture
  with BeforeAndAfter
  with BeforeAndAfterAll
  with TempDirFixture
  with TextComparisonFixture {

  val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password))
  val pramenDb: PramenDb = PramenDb(jdbcConfig)

  before {
    pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;")
    pramenDb.setupDatabase()
  }

  override def afterAll(): Unit = {
    pramenDb.close()
    super.afterAll()
  }

  private val infoDate = LocalDate.of(2021, 2, 18)

  "File-based sourcing" should {
    val expected1 =
      """{"id":"1","name":"John"}
        |{"id":"2","name":"Jack"}
        |{"id":"3","name":"Jill"}
        |""".stripMargin

    val expected2 =
      """{"id":"1","name":"John"}
        |{"id":"2","name":"Jack"}
        |{"id":"3","name":"Jill"}
        |{"id":"4","name":"Mary"}
        |{"id":"5","name":"Jane"}
        |{"id":"6","name":"Kate"}
        |""".stripMargin

    "work end to end as a normal run" in {
      withTempDirectory("integration_file_based") { tempDir =>
        val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)

        val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
        val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
        fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n")

        val conf = getConfig(tempDir)

        val exitCode1 = AppRunner.runPipeline(conf)
        assert(exitCode1 == 0)

        val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate")
        val df1 = spark.read.parquet(table1Path.toString)
        val actual1 = df1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")

        compareText(actual1, expected1)

        fsUtils.deleteFile(path1)
        fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n")

        val exitCode2 = AppRunner.runPipeline(conf)
        assert(exitCode2 == 0)

        val df2 = spark.read.parquet(table1Path.toString)

        val batchIds = df2.select("pramen_batchid").distinct().collect()

        assert(batchIds.length == 2)

        val actual2 = df2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")

        compareText(actual2, expected2)

        JdbcNativeUtils.withResultSet(new JdbcUrlSelectorImpl(jdbcConfig), "SELECT * FROM \"offsets\"", 1) { rs =>
          val mt = rs.getMetaData

          for (i <- 1 to mt.getColumnCount) {
            print(mt.getColumnName(i) + "\t")
          }
          println("")

          while (rs.next()) {
            for (i <- 1 to mt.getColumnCount) {
              print(rs.getString(i) + "\t")
            }
            println("")
          }
        }
      }
    }
  }

  def getConfig(basePath: String, isRerun: Boolean = false, useDataFrame: Boolean = false): Config = {
    val configContents = ResourceUtils.getResourceString("/test/config/incremental_pipeline.conf")
    val basePathEscaped = basePath.replace("\\", "\\\\")

    val conf = ConfigFactory.parseString(
      s"""base.path = "$basePathEscaped"
         |use.dataframe = $useDataFrame
         |pramen.runtime.is.rerun = $isRerun
         |pramen.current.date = "$infoDate"
         |
         |pramen.bookkeeping.jdbc {
         | driver = "$driver"
         | url = "$url"
         | user = "$user"
         | password = "$password"
         |}
         |$configContents
         |""".stripMargin
    ).withFallback(ConfigFactory.load())
      .resolve()

    conf
  }
}
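
The test above prints the contents of the bookkeeping "offsets" table for manual inspection. If an assertion is preferred, the same JdbcNativeUtils.withResultSet call could count the committed rows instead of printing them. A minimal sketch, meant to sit inside the test body where jdbcConfig is in scope, and deliberately not relying on the offset table's column names, which this commit does not show:

        // Count the rows in the "offsets" bookkeeping table instead of printing them.
        // At least one offset interval should have been committed across the two runs.
        var offsetRowCount = 0
        JdbcNativeUtils.withResultSet(new JdbcUrlSelectorImpl(jdbcConfig), "SELECT * FROM \"offsets\"", 1) { rs =>
          while (rs.next()) {
            offsetRowCount += 1
          }
        }
        assert(offsetRowCount > 0)
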
