diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
index 05df34a22..a4f6b3e45 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
@@ -45,11 +45,11 @@ trait Source extends ExternalChannel {
   def hasInfoDateColumn(query: Query): Boolean = true
 
   /**
-    * If non-empty, the source + query is configured for incremental ingestion, returns minimum value with type
+    * If non-empty, the source is configured for incremental ingestion, and returns the minimum offset value with its type.
     *
-    * If empty, the source + query can't be used for incremental ingestion.
+    * If empty, the source can't be used for incremental ingestion.
     */
-  def getOffsetInfo(query: Query): Option[OffsetInfo] = None
+  def getOffsetInfo: Option[OffsetInfo] = None
 
   /**
     * Validates if the source is okay and the ingestion can proceed.
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlColumnType.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlColumnType.scala
index b89d73f3d..8852b3b16 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlColumnType.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlColumnType.scala
@@ -44,4 +44,17 @@ object SqlColumnType {
       case _ => None
     }
   }
+
+  def fromStringStrict(s: String, configPath: String = ""): SqlColumnType = {
+    fromString(s) match {
+      case Some(columnType) => columnType
+      case None =>
+        val allowedTypes = Seq(DATE.toString, DATETIME.toString, STRING.toString, NUMBER.toString).mkString(", ")
+        if (configPath.nonEmpty) {
+          throw new IllegalArgumentException(s"Unknown information type '$s' configured at $configPath. Allowed values: $allowedTypes.")
+        } else {
+          throw new IllegalArgumentException(s"Unknown information type '$s'. Allowed values: $allowedTypes.")
+        }
+    }
+  }
 }
diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlConfig.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlConfig.scala
index eba62e797..6ef6f564b 100644
--- a/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlConfig.scala
+++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/sql/SqlConfig.scala
@@ -17,11 +17,13 @@
 package za.co.absa.pramen.api.sql
 
 import com.typesafe.config.Config
+import za.co.absa.pramen.api.offset.OffsetInfo
 
 case class SqlConfig(
                       infoDateColumn: String,
                       infoDateType: SqlColumnType,
                       dateFormatApp: String,
+                      offsetInfo: Option[OffsetInfo],
                       identifierQuotingPolicy: QuotingPolicy,
                       sqlGeneratorClass: Option[String],
                       extraConfig: Config
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
index 3ebbfe0cd..32fd5425a 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
@@ -74,7 +74,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
   private def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, infoDate: LocalDate, uncommittedOffsets: Array[DataOffset]): Unit = {
     val minOffset = uncommittedOffsets.map(_.minOffset).min
 
-    val offsetInfo = source.getOffsetInfo(sourceTable.query).getOrElse(throw new IllegalArgumentException(s"Offset column not defined for the ingestion job '${operationDef.name}', " +
+    val offsetInfo = source.getOffsetInfo.getOrElse(throw new IllegalArgumentException(s"Offset column not defined for the ingestion job '${operationDef.name}', " +
       s"query: '${sourceTable.query.query}''"))
 
     val df = try {
@@ -126,7 +126,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
   }
 
   override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
-    if (source.getOffsetInfo(sourceTable.query).nonEmpty) {
+    if (source.getOffsetInfo.nonEmpty) {
       Reason.Ready
     } else {
       Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'")
@@ -169,7 +169,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
 
     val om = bookkeeper.getOffsetManager
 
-    val offsetInfo = source.getOffsetInfo(sourceTable.query).getOrElse(
+    val offsetInfo = source.getOffsetInfo.getOrElse(
       throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}''")
     )
 
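Note on the API change above: incremental eligibility is now a property of the source alone, and `IncrementalIngestionJob.validate` returns `Ready` only when `getOffsetInfo` is non-empty. A minimal sketch of the new contract, assuming the pramen-api artifact is on the classpath (the column name and type here are illustrative, not from the patch):

```scala
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}

// A source advertises incremental support by returning Some(...) from
// getOffsetInfo; returning None makes validate() report NotReady.
val offsetInfo: Option[OffsetInfo] =
  Some(OffsetInfo("id", OffsetValue.getMinimumForType("integral")))
```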
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala
index 95131a76f..edbf72043 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbc.scala
@@ -20,9 +20,9 @@ import com.typesafe.config.Config
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.api.Query
-import za.co.absa.pramen.api.offset.OffsetValue
+import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
 import za.co.absa.pramen.core.config.Keys
-import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
+import za.co.absa.pramen.core.reader.model.{OffsetInfoParser, TableReaderJdbcConfig}
 import za.co.absa.pramen.core.utils.{ConfigUtils, JdbcNativeUtils, JdbcSparkUtils, TimeUtils}
 
 import java.time.format.DateTimeFormatter
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
index 06550fd0e..b82599f30 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcBase.scala
@@ -19,6 +19,7 @@ package za.co.absa.pramen.core.reader
 import com.typesafe.config.Config
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.api.TableReader
+import za.co.absa.pramen.api.offset.OffsetInfo
 import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig}
 import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
 import za.co.absa.pramen.core.sql.SqlGeneratorLoader
@@ -44,18 +45,13 @@ abstract class TableReaderJdbcBase(jdbcReaderConfig: TableReaderJdbcConfig,
   }
 
   private[core] def getSqlConfig: SqlConfig = {
-    val dateFieldType = SqlColumnType.fromString(jdbcReaderConfig.infoDateType)
-    dateFieldType match {
-      case Some(infoDateType) =>
-        SqlConfig(jdbcReaderConfig.infoDateColumn,
-          infoDateType,
-          jdbcReaderConfig.infoDateFormat,
-          jdbcReaderConfig.identifierQuotingPolicy,
-          jdbcReaderConfig.sqlGeneratorClass,
-          ConfigUtils.getExtraConfig(conf, "sql"))
-      case None => throw new IllegalArgumentException(s"Unknown info date type specified (${jdbcReaderConfig.infoDateType}). " +
-        s"It should be one of: date, string, number")
-    }
+    SqlConfig(jdbcReaderConfig.infoDateColumn,
+      jdbcReaderConfig.infoDateType,
+      jdbcReaderConfig.infoDateFormat,
+      jdbcReaderConfig.offsetInfoOpt,
+      jdbcReaderConfig.identifierQuotingPolicy,
+      jdbcReaderConfig.sqlGeneratorClass,
+      ConfigUtils.getExtraConfig(conf, "sql"))
   }
 
   protected def logConfiguration(): Unit = {
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala
index aa17e3cd1..6d300a5c2 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderJdbcNative.scala
@@ -20,8 +20,8 @@ import com.typesafe.config.Config
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.api.Query
-import za.co.absa.pramen.api.offset.OffsetValue
-import za.co.absa.pramen.core.reader.model.{JdbcConfig, TableReaderJdbcConfig}
+import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
+import za.co.absa.pramen.core.reader.model.{JdbcConfig, OffsetInfoParser, TableReaderJdbcConfig}
 import za.co.absa.pramen.core.utils.{JdbcNativeUtils, JdbcSparkUtils, StringUtils, TimeUtils}
 
 import java.time.{Instant, LocalDate}
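With the information date type validated up front, `getSqlConfig` above reduces to a plain positional construction. A sketch of the resulting `SqlConfig` shape, assuming pramen-api on the classpath (values are illustrative; note the new `offsetInfo` field between `dateFormatApp` and `identifierQuotingPolicy`):

```scala
import com.typesafe.config.ConfigFactory
import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType, SqlConfig}

val sqlConfig = SqlConfig(
  "info_date",         // infoDateColumn
  SqlColumnType.DATE,  // infoDateType (already parsed and validated)
  "yyyy-MM-dd",        // dateFormatApp
  None,                // offsetInfo (the new field)
  QuotingPolicy.Auto,  // identifierQuotingPolicy
  None,                // sqlGeneratorClass
  ConfigFactory.empty())
```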
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/OffsetInfoParser.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/OffsetInfoParser.scala
new file mode 100644
index 000000000..384036500
--- /dev/null
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/OffsetInfoParser.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.reader.model
+
+import com.typesafe.config.Config
+import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
+import za.co.absa.pramen.core.utils.ConfigUtils
+
+object OffsetInfoParser {
+  val OFFSET_COLUMN_NAME_KEY = "offset.column.name"
+  val OFFSET_COLUMN_TYPE_KEY = "offset.column.type"
+
+  def fromConfig(conf: Config): Option[OffsetInfo] = {
+    for {
+      columnName <- ConfigUtils.getOptionString(conf, OFFSET_COLUMN_NAME_KEY)
+      columnType <- ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY)
+    } yield OffsetInfo(columnName, OffsetValue.getMinimumForType(columnType))
+  }
+
+}
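The for-comprehension in `fromConfig` means both keys must be present for a source to be treated as incremental; the separate `offset.column.enabled` flag is gone (see the .conf changes later in this patch). A quick sketch of the behavior, with illustrative values:

```scala
import com.typesafe.config.ConfigFactory
import za.co.absa.pramen.core.reader.model.OffsetInfoParser

val conf = ConfigFactory.parseString(
  """offset.column {
    |  name = "id"
    |  type = "integral"
    |}
    |""".stripMargin)

// Both keys present => Some(OffsetInfo(...)).
assert(OffsetInfoParser.fromConfig(conf).exists(_.offsetColumn == "id"))

// Either key missing => None, i.e. the source is not incremental.
assert(OffsetInfoParser.fromConfig(ConfigFactory.empty()).isEmpty)
```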
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala
index db49337d9..1485ca2d4 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala
@@ -18,18 +18,17 @@ package za.co.absa.pramen.core.reader.model
 
 import com.typesafe.config.Config
 import org.slf4j.LoggerFactory
-import za.co.absa.pramen.api.sql.QuotingPolicy
+import za.co.absa.pramen.api.offset.OffsetInfo
+import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType}
 import za.co.absa.pramen.core.utils.ConfigUtils
 
 case class TableReaderJdbcConfig(
                                   jdbcConfig: JdbcConfig,
                                   hasInfoDate: Boolean,
                                   infoDateColumn: String,
-                                  infoDateType: String,
+                                  infoDateType: SqlColumnType,
                                   infoDateFormat: String = "yyyy-MM-dd",
-                                  hasOffsetColumn: Boolean,
-                                  offsetColumn: String,
-                                  offsetColumnType: String,
+                                  offsetInfoOpt: Option[OffsetInfo],
                                   limitRecords: Option[Int] = None,
                                   saveTimestampsAsDates: Boolean = false,
                                   correctDecimalsInSchema: Boolean = false,
@@ -49,10 +48,6 @@ object TableReaderJdbcConfig {
   val INFORMATION_DATE_FORMAT = "information.date.format"
   val INFORMATION_DATE_APP_FORMAT = "information.date.app.format"
 
-  val OFFSET_COLUMN_ENABLED_KEY = "offset.column.enabled"
-  val OFFSET_COLUMN_NAME_KEY = "offset.column.name"
-  val OFFSET_COLUMN_TYPE_KEY = "offset.column.type"
-
   val JDBC_SYNC_LIMIT_RECORDS = "limit.records"
   val JDBC_TIMESTAMPS_AS_DATES = "save.timestamps.as.dates"
   val CORRECT_DECIMALS_IN_SCHEMA = "correct.decimals.in.schema"
@@ -73,13 +68,9 @@ object TableReaderJdbcConfig {
         INFORMATION_DATE_COLUMN :: INFORMATION_DATE_TYPE :: Nil)
     }
 
-    val hasOffsetColumn = ConfigUtils.getOptionBoolean(conf, OFFSET_COLUMN_ENABLED_KEY).getOrElse(false)
+    val infoDateTypeStr = ConfigUtils.getOptionString(conf, INFORMATION_DATE_TYPE).getOrElse("date")
 
-    if (hasOffsetColumn) {
-      ConfigUtils.validatePathsExistence(conf,
-        parent,
-        OFFSET_COLUMN_NAME_KEY :: OFFSET_COLUMN_TYPE_KEY :: Nil)
-    }
+    val infoDateType = SqlColumnType.fromStringStrict(infoDateTypeStr, parent)
 
     val saveTimestampsAsDates = ConfigUtils.getOptionBoolean(conf, JDBC_TIMESTAMPS_AS_DATES).getOrElse(false)
 
@@ -89,6 +80,8 @@ object TableReaderJdbcConfig {
 
     val infoDateFormat = getInfoDateFormat(conf)
 
+    val offsetInfoOpt = OffsetInfoParser.fromConfig(conf)
+
     val identifierQuotingPolicy = ConfigUtils.getOptionString(conf, IDENTIFIER_QUOTING_POLICY)
       .map(s => QuotingPolicy.fromString(s))
       .getOrElse(QuotingPolicy.Auto)
@@ -97,11 +90,9 @@ object TableReaderJdbcConfig {
       jdbcConfig = JdbcConfig.load(conf, parent),
       hasInfoDate = conf.getBoolean(HAS_INFO_DATE),
       infoDateColumn = ConfigUtils.getOptionString(conf, INFORMATION_DATE_COLUMN).getOrElse(""),
-      infoDateType = ConfigUtils.getOptionString(conf, INFORMATION_DATE_TYPE).getOrElse("date"),
+      infoDateType = infoDateType,
       infoDateFormat,
-      hasOffsetColumn,
-      ConfigUtils.getOptionString(conf, OFFSET_COLUMN_NAME_KEY).getOrElse(""),
-      ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY).getOrElse("integral"),
+      offsetInfoOpt,
      limitRecords = ConfigUtils.getOptionInt(conf, JDBC_SYNC_LIMIT_RECORDS),
      saveTimestampsAsDates,
      correctDecimalsInSchema = ConfigUtils.getOptionBoolean(conf, CORRECT_DECIMALS_IN_SCHEMA).getOrElse(false),
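A behavioral consequence of the change above: `information.date.type` is now parsed with `fromStringStrict` while the reader config is loaded, so an invalid value fails when the reader is constructed rather than on the first `getSqlConfig` call (the updated test near the end of this patch reflects that). A sketch, using a hypothetical config path:

```scala
import za.co.absa.pramen.api.sql.SqlColumnType

// Valid values parse case-insensitively.
assert(SqlColumnType.fromStringStrict("Number", "pramen.sources.0") == SqlColumnType.NUMBER)

// An invalid value throws immediately, naming the offending config path:
//   IllegalArgumentException: Unknown information type 'not_exist' configured
//   at pramen.sources.0. Allowed values: date, datetime, string, number.
```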
log.info(s"Using TableReaderSpark to read '$format' from: $path") - new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options) + new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, getOffsetInfo, options) case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the Spark source. Use 'path', 'table' or 'sql' instead.") } @@ -144,16 +138,10 @@ object SparkSource extends ExternalChannelFactory[SparkSource] { ("", SqlColumnType.DATE, "") } - val hasOffsetColumn = ConfigUtils.getOptionBoolean(conf, OFFSET_COLUMN_ENABLED_KEY).getOrElse(false) - - val (offsetColumn, offsetDataType) = if (hasOffsetColumn) { - (conf.getString(OFFSET_COLUMN_NAME_KEY), conf.getString(OFFSET_COLUMN_TYPE_KEY)) - } else { - ("", "long") - } + val offsetInfoOpt = OffsetInfoParser.fromConfig(conf) val options = ConfigUtils.getExtraOptions(conf, "option") - new SparkSource(format, schema, hasInfoDate, infoDateColumn, infoDateType, infoDateFormat, hasOffsetColumn, offsetColumn, offsetDataType, conf, options)(spark) + new SparkSource(format, schema, hasInfoDate, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, conf, options)(spark) } } diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf index 18d856f7e..ff8d13d48 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf @@ -49,7 +49,6 @@ pramen.sources.1 = [ information.date.format = "yyyy-MM-dd" offset.column { - enabled = true name = "id" type = "integral" } diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf index eee4f9601..b04682d8e 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf @@ -48,7 +48,6 @@ pramen.sources.1 = [ information.date.type = "datetime" offset.column { - enabled = true name = "ts" type = "datetime" } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/DisableCountQueryLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/DisableCountQueryLongSuite.scala index dd53059ff..db94a0501 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/DisableCountQueryLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/DisableCountQueryLongSuite.scala @@ -36,7 +36,7 @@ class DisableCountQueryLongSuite extends AnyWordSpec with SparkTestBase with Tem |{"id":"3","name":"Jill"} |""".stripMargin - "be able to access inner source configuration" in { + "be able to access inner source configintegruration" in { withTempDirectory("integration_disable_count_query") { tempDir => val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala index 42d8bfdf0..f042a6023 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala @@ -17,18 +17,21 @@ package za.co.absa.pramen.core.mocks import 
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala
index 42d8bfdf0..f042a6023 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DummySqlConfigFactory.scala
@@ -17,18 +17,21 @@
 package za.co.absa.pramen.core.mocks
 
 import com.typesafe.config.ConfigFactory
+import za.co.absa.pramen.api.offset.OffsetInfo
 import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType, SqlConfig}
 
 object DummySqlConfigFactory {
   def getDummyConfig(infoDateColumn: String = "col",
                      infoDateType: SqlColumnType = SqlColumnType.DATE,
                      dateFormatApp: String = "yyyy-MM-dd",
+                     offsetInfo: Option[OffsetInfo] = None,
                      identifierQuotingPolicy: QuotingPolicy = QuotingPolicy.Auto,
                      sqlGeneratorClass: Option[String] = None
                     ): SqlConfig = SqlConfig(
     infoDateColumn = infoDateColumn,
     infoDateType = infoDateType,
     dateFormatApp = dateFormatApp,
+    offsetInfo = offsetInfo,
     identifierQuotingPolicy = identifierQuotingPolicy,
     sqlGeneratorClass = sqlGeneratorClass,
     ConfigFactory.empty())
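Because `offsetInfo` defaults to `None` in the factory above, existing call sites keep compiling unchanged; a test that needs an incremental `SqlConfig` opts in explicitly. A sketch (column name and type are illustrative):

```scala
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.core.mocks.DummySqlConfigFactory

val sqlConfig = DummySqlConfigFactory.getDummyConfig(
  infoDateColumn = "info_date",
  offsetInfo = Some(OffsetInfo("id", OffsetValue.getMinimumForType("integral"))))

assert(sqlConfig.offsetInfo.nonEmpty)
```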
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala
index 77aad6f17..20f1f0c50 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderJdbcSuite.scala
@@ -21,7 +21,7 @@ import org.mockito.Mockito.{mock, when => whenMock}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.wordspec.AnyWordSpec
 import za.co.absa.pramen.api.Query
-import za.co.absa.pramen.api.sql.QuotingPolicy
+import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType}
 import za.co.absa.pramen.core.base.SparkTestBase
 import za.co.absa.pramen.core.fixtures.RelationalDbFixture
 import za.co.absa.pramen.core.mocks.SqlGeneratorDummy
@@ -62,6 +62,11 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
       |  information.date.type = "number"
       |  information.date.format = "yyyy-MM-DD"
       |
+      |  offset.column {
+      |    name = "ts"
+      |    type = "datetime"
+      |  }
+      |
       |  identifier.quoting.policy = "never"
       |}
       |reader_legacy {
@@ -104,8 +109,11 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
       assert(jdbc.jdbcConfig.user.contains(user))
       assert(jdbc.jdbcConfig.password.contains(password))
       assert(jdbc.infoDateColumn == "INFO_DATE")
-      assert(jdbc.infoDateType == "number")
+      assert(jdbc.infoDateType == SqlColumnType.NUMBER)
       assert(jdbc.infoDateFormat == "yyyy-MM-DD")
+      assert(jdbc.offsetInfoOpt.nonEmpty)
+      assert(jdbc.offsetInfoOpt.get.offsetColumn == "ts")
+      assert(jdbc.offsetInfoOpt.get.minimalOffset.dataTypeString == "datetime")
       assert(jdbc.identifierQuotingPolicy == QuotingPolicy.Never)
       assert(jdbc.sqlGeneratorClass.isEmpty)
       assert(!jdbc.hasInfoDate)
@@ -123,7 +131,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
       assert(jdbc.jdbcConfig.user.contains(user))
       assert(jdbc.jdbcConfig.password.contains(password))
       assert(jdbc.infoDateColumn == "INFO_DATE")
-      assert(jdbc.infoDateType == "date")
+      assert(jdbc.infoDateType == SqlColumnType.DATE)
       assert(jdbc.infoDateFormat == "YYYY-MM-dd")
       assert(jdbc.identifierQuotingPolicy == QuotingPolicy.Auto)
       assert(jdbc.sqlGeneratorClass.contains("za.co.absa.pramen.core.mocks.SqlGeneratorDummy"))
@@ -142,8 +150,9 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
       assert(jdbc.jdbcConfig.user.contains(user))
       assert(jdbc.jdbcConfig.password.contains(password))
       assert(jdbc.infoDateColumn == "INFO_DATE")
-      assert(jdbc.infoDateType == "date")
+      assert(jdbc.infoDateType == SqlColumnType.DATE)
       assert(jdbc.infoDateFormat == "yyyy-MM-dd")
+      assert(jdbc.offsetInfoOpt.isEmpty)
       assert(jdbc.hasInfoDate)
       assert(!jdbc.saveTimestampsAsDates)
       assert(jdbc.identifierQuotingPolicy == QuotingPolicy.Auto)
@@ -213,7 +222,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
 
       assert(jdbc.hasInfoDate)
       assert(jdbc.infoDateColumn == "sync_date")
-      assert(jdbc.infoDateType == "date")
+      assert(jdbc.infoDateType == SqlColumnType.DATE)
     }
 
     "getWithRetry" should {
@@ -456,10 +465,8 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
         .withValue("information.date.column", ConfigValueFactory.fromAnyRef("info_date"))
         .withValue("information.date.type", ConfigValueFactory.fromAnyRef("not_exist"))
 
-      val reader = TableReaderJdbc(testConfig, "reader")
-
       assertThrows[IllegalArgumentException] {
-        reader.getSqlConfig
+        TableReaderJdbc(testConfig, "reader")
       }
     }
   }
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlColumnTypeSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlColumnTypeSuite.scala
index c689fa684..fb6a29b0d 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlColumnTypeSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/sql/SqlColumnTypeSuite.scala
@@ -41,4 +41,37 @@ class SqlColumnTypeSuite extends AnyWordSpec {
     }
   }
 
+  "fromStringStrict" should {
+    "return corresponding type" in {
+      assert(fromStringStrict("date") == DATE)
+      assert(fromStringStrict("datetime") == DATETIME)
+      assert(fromStringStrict("string") == STRING)
+      assert(fromStringStrict("number") == NUMBER)
+    }
+
+    "support mixed case values" in {
+      assert(fromStringStrict("Date") == DATE)
+      assert(fromStringStrict("DateTimE") == DATETIME)
+      assert(fromStringStrict("STRING") == STRING)
+      assert(fromStringStrict("nUmbeR") == NUMBER)
+    }
+
+    "throw IllegalArgumentException for unknown type with parent" in {
+      val ex = intercept[IllegalArgumentException] {
+        fromStringStrict("Hello", "parent")
+      }
+
+      assert(ex.getMessage.contains("Unknown information type 'Hello' configured at parent. Allowed values: date, datetime, string, number."))
+    }
+
+    "throw IllegalArgumentException for unknown type without parent" in {
+      val ex = intercept[IllegalArgumentException] {
+        fromStringStrict("Hello")
+      }
+
+      assert(ex.getMessage.contains("Unknown information type 'Hello'. Allowed values: date, datetime, string, number."))
+    }
+
+  }
+
 }