diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala index 76a33ac7c..7b8de7ce9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala @@ -32,6 +32,16 @@ import java.time.LocalDate * with data. */ trait OffsetManager { + /** + * Returns offsets for an information date. + * + * If there are uncommitted offsets, they should be handled this way: + * - maximum offset should be derived from data, + * - the latest uncommitted offset should be committed to the latest offset + * - previous uncommitted offsets should be rolled back. + */ + def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] + /** * Returns the maximum information date the bookkeeping has offsets for. * @@ -41,20 +51,13 @@ trait OffsetManager { * if onlyForInfoDate is empty * offset must be a monotonically increasing field. */ - def getMaximumDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = ??? - - /** Returns all uncommitted offsets. If there are any: - * - maximum offset should be derived from data, - * - the latest uncommitted offset should be committed to the latest offset - * - previous uncommitted offsets should be rolled back. - */ - def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Seq[DataOffset] = ??? + def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] /** * Starts an uncommitted offset for an incremental ingestion for a day. * This can only be done for the latest information date. */ - def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest = ??? + def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest /** * Commits changes to the table. If maxOffset is @@ -62,10 +65,10 @@ trait OffsetManager { * - greater than minOffset, a new entry is created. * - less than minOffset - an exception will be thrown */ - def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit = ??? + def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit /** * Rolls back an offset request */ - def rollbackOffsets(request: DataOffsetRequest): Unit = ??? + def rollbackOffsets(request: DataOffsetRequest): Unit } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala index cdd4f75c5..f6fdea7b7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala @@ -18,81 +18,40 @@ package za.co.absa.pramen.core.bookkeeper import org.slf4j.LoggerFactory import slick.jdbc.H2Profile.api._ -import slick.jdbc.JdbcBackend.Database import za.co.absa.pramen.core.bookkeeper.model._ -import za.co.absa.pramen.core.utils.{SlickUtils, TimeUtils} +import za.co.absa.pramen.core.utils.SlickUtils -import java.time.{Duration, Instant, LocalDate} +import java.time.{Instant, LocalDate} import scala.util.control.NonFatal -class OffsetManagerJdbc(db: Database) { +class OffsetManagerJdbc(db: Database) extends OffsetManager { import za.co.absa.pramen.core.utils.FutureImplicits._ private val log = LoggerFactory.getLogger(this.getClass) - def getMaximumDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = { - val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table)) + override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = { + val offsets = getOffsetRecords(table, infoDate) - try { - maxInfoDateOpt.flatMap { infoDate => - val infoDateStr = infoDate.toString - - val query = OffsetRecords.records - .filter(r => r.pramenTableName === table && r.infoDate === infoDateStr && r.committedAt.nonEmpty) - .groupBy { _ => true } - .map { - case (_, group) => (group.map(_.dataType).max, group.map(_.minOffset).min, group.map(_.maxOffset).max) - } - - val action = query.result - val sql = action.statements.mkString("; ") - - val start = Instant.now - val result = db.run(action).execute() - val finish = Instant.now - - val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(finish.toEpochMilli - start.toEpochMilli) - if (Duration.between(start, finish).toMillis > 1000L) { - log.warn(s"Query execution time: $elapsedTime. SQL: $sql") - } else { - log.debug(s"Query execution time: $elapsedTime. SQL: $sql") - } - - if (result.nonEmpty && result.head._1.nonEmpty && result.head._2.nonEmpty && result.head._3.nonEmpty) { - val minOffset = OffsetValue.fromString(result.head._1.get, result.head._2.get) - val maxOffset = OffsetValue.fromString(result.head._1.get, result.head._3.get) - Some(DataOffsetAggregated( - table, infoDate, minOffset, maxOffset - )) - } else { - None - } - } - } catch { - case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex) + if (offsets.isEmpty) { + return Array.empty } + + offsets.map(DataOffset.fromOffsetRecord) } - def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Seq[DataOffset] = { - val query = onlyForInfoDate match { - case Some(infoDate) => - val dateSte = infoDate.toString - OffsetRecords.records - .filter(r => r.pramenTableName === table && r.committedAt.isEmpty && r.infoDate === dateSte) - case None => - OffsetRecords.records - .filter(r => r.pramenTableName === table && r.committedAt.isEmpty) - } + override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = { + val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table)) try { - SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, query) - .map(DataOffset.fromOffsetRecord) + maxInfoDateOpt.flatMap { infoDate => + getMinMaxOffsets(table, infoDate) + } } catch { case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex) } } - def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest = { + override def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest = { val createdAt = Instant.now().toEpochMilli val record = OffsetRecord(table, infoDate.toString, minOffset.dataTypeString, minOffset.valueString, "", createdAt, None) @@ -104,7 +63,7 @@ class OffsetManagerJdbc(db: Database) { DataOffsetRequest(table, infoDate, minOffset, createdAt) } - def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit = { + override def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit = { val committedAt = Instant.now().toEpochMilli db.run( @@ -115,7 +74,7 @@ class OffsetManagerJdbc(db: Database) { ).execute() } - def rollbackOffsets(request: DataOffsetRequest): Unit = { + override def rollbackOffsets(request: DataOffsetRequest): Unit = { db.run( OffsetRecords.records .filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt) @@ -123,8 +82,9 @@ class OffsetManagerJdbc(db: Database) { ).execute() } - def getMaximumInfoDate(table: String): Option[LocalDate] = { + private[core] def getMaximumInfoDate(table: String): Option[LocalDate] = { val query = OffsetRecords.records + .filter(r => r.pramenTableName === table) .map(_.infoDate).max try { @@ -135,4 +95,42 @@ class OffsetManagerJdbc(db: Database) { } } + private[core] def getOffsetRecords(table: String, infoDate: LocalDate): Array[OffsetRecord] = { + val infoDateStr = infoDate.toString + val query = OffsetRecords.records + .filter(r => r.pramenTableName === table && r.infoDate === infoDateStr) + + SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, query) + .toArray[OffsetRecord] + } + + private[core] def getMinMaxOffsets(table: String, infoDate: LocalDate): Option[DataOffsetAggregated] = { + val offsets = getOffsetRecords(table, infoDate) + + if (offsets.isEmpty) { + return None + } + + validateOffsets(table, infoDate, offsets) + + val offsetDataType = offsets.head.dataType + val minOffset = OffsetValue.fromString(offsetDataType, offsets.map(_.minOffset).min) + val maxOffset = OffsetValue.fromString(offsetDataType, offsets.map(_.maxOffset).max) + + Some(DataOffsetAggregated(table, infoDate, minOffset, maxOffset, offsets.map(DataOffset.fromOffsetRecord))) + } + + /** + * Checks offsets for inconsistencies. They include: + * - inconsistent offset value types + * + * @param offsets An array of offset records + */ + private[core] def validateOffsets(table: String, infoDate: LocalDate, offsets: Array[OffsetRecord]): Unit = { + val inconsistentOffsets = offsets.groupBy(_.dataType).keys.toArray.sorted + if (inconsistentOffsets.length > 1) { + throw new RuntimeException(s"Inconsistent offset value types found for $table at $infoDate: ${inconsistentOffsets.mkString(", ")}") + } + } + } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala index e4e0c4d71..ef993b884 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/DataOffsetAggregated.scala @@ -18,8 +18,10 @@ package za.co.absa.pramen.core.bookkeeper.model import java.time.LocalDate -case class DataOffsetAggregated(tableName: String, - maximumInfoDate: LocalDate, - minimumOffset: OffsetValue, - maximumOffset: OffsetValue +case class DataOffsetAggregated( + tableName: String, + maximumInfoDate: LocalDate, + minimumOffset: OffsetValue, + maximumOffset: OffsetValue, + offsetsForTheDay: Array[DataOffset] ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala index 74d45e1b3..659a05ff2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala @@ -23,11 +23,11 @@ class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") { def infoDate = column[String]("info_date", O.Length(20)) def dataType = column[String]("data_type", O.Length(20)) def minOffset = column[String]("min_offset", O.Length(64)) - def maxOffset = column[String]("min_offset", O.Length(64)) + def maxOffset = column[String]("max_offset", O.Length(64)) def createdAt = column[Long]("created_at") def committedAt = column[Option[Long]]("committed_at") def * = (pramenTableName, infoDate, dataType, minOffset, maxOffset, createdAt, committedAt) <> (OffsetRecord.tupled, OffsetRecord.unapply) - def idx2 = index("offset_idx_1", (pramenTableName, infoDate, createdAt), unique = true) + def idx1 = index("offset_idx_1", (pramenTableName, infoDate, createdAt), unique = true) } object OffsetRecords { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetValue.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetValue.scala index 37910c95e..b3fba03ca 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetValue.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetValue.scala @@ -24,6 +24,7 @@ sealed trait OffsetValue { object OffsetValue { val LONG_TYPE_STR = "long" + val STRING_TYPE_STR = "string" case class LongType(value: Long) extends OffsetValue { override val dataTypeString: String = LONG_TYPE_STR @@ -31,15 +32,23 @@ object OffsetValue { override def valueString: String = value.toString } + case class StringType(s: String) extends OffsetValue { + override val dataTypeString: String = STRING_TYPE_STR + + override def valueString: String = s + } + def getMinimumForType(dataType: String): OffsetValue = { dataType match { case LONG_TYPE_STR => LongType(Long.MinValue) + case STRING_TYPE_STR => StringType("") case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType") } } def fromString(dataType: String, value: String): OffsetValue = dataType match { case LONG_TYPE_STR => LongType(value.toLong) + case STRING_TYPE_STR => StringType(value) case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType") } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala new file mode 100644 index 000000000..c653e9d4e --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala @@ -0,0 +1,249 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.tests.bookkeeper + +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import za.co.absa.pramen.core.bookkeeper.model.OffsetValue +import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerJdbc} +import za.co.absa.pramen.core.fixtures.RelationalDbFixture +import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.reader.model.JdbcConfig + +import java.time.{Instant, LocalDate} + +class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with BeforeAndAfter with BeforeAndAfterAll { + val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) + val pramenDb: PramenDb = PramenDb(jdbcConfig) + + private val infoDate = LocalDate.of(2023, 8, 25) + + before { + pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase() + } + + override def afterAll(): Unit = { + pramenDb.close() + super.afterAll() + } + + def getOffsetManager: OffsetManager = { + new OffsetManagerJdbc(pramenDb.slickDb) + } + + "getOffsets" should { + "should return an empty array if nothing to return " in { + val om = getOffsetManager + + val actual = om.getOffsets("table1", infoDate) + + assert(actual.isEmpty) + } + + "return uncommitted offsets" in { + val now = Instant.now().toEpochMilli + val nextHour = now + 3600000 + val om = getOffsetManager + + om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType("long")) + + val actualNonEmpty = om.getOffsets("table1", infoDate) + val actualEmpty1 = om.getOffsets("table1", infoDate.plusDays(1)) + val actualEmpty2 = om.getOffsets("table1", infoDate.minusDays(1)) + + assert(actualEmpty1.isEmpty) + assert(actualEmpty2.isEmpty) + + assert(actualNonEmpty.nonEmpty) + assert(actualNonEmpty.length == 1) + + val offset = actualNonEmpty.head + + assert(offset.infoDate == infoDate) + assert(offset.createdAt >= now) + assert(offset.createdAt < nextHour) + assert(offset.committedAt.isEmpty) + assert(offset.minOffset == OffsetValue.getMinimumForType("long")) + assert(offset.maxOffset.isEmpty) + } + + "return committed offsets" in { + val now = Instant.now().toEpochMilli + val nextHour = now + 3600000 + val om = getOffsetManager + + val transactionReference = om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType("long")) + Thread.sleep(10) + om.commitOffsets(transactionReference, OffsetValue.LongType(100)) + + val actualNonEmpty = om.getOffsets("table1", infoDate) + val actualEmpty1 = om.getOffsets("table1", infoDate.plusDays(1)) + val actualEmpty2 = om.getOffsets("table1", infoDate.minusDays(1)) + + assert(actualEmpty1.isEmpty) + assert(actualEmpty2.isEmpty) + + assert(actualNonEmpty.nonEmpty) + assert(actualNonEmpty.length == 1) + + val offset = actualNonEmpty.head + + assert(offset.infoDate == infoDate) + assert(offset.createdAt >= now) + assert(offset.createdAt < nextHour) + assert(offset.committedAt.get >= now) + assert(offset.committedAt.get < nextHour) + assert(offset.committedAt.get > actualNonEmpty.head.createdAt) + assert(offset.minOffset == OffsetValue.getMinimumForType("long")) + assert(offset.maxOffset.get == OffsetValue.fromString("long", "100")) + } + } + + "getMaximumDateAndOffset" should { + "return None when no offsets have loaded yet" in { + val om = getOffsetManager + + val actual = om.getMaxInfoDateAndOffset("table1", None) + + assert(actual.isEmpty) + } + + "return None when no offsets have loaded yet for the info date" in { + val om = getOffsetManager + + val actual = om.getMaxInfoDateAndOffset("table1", Some(infoDate)) + + assert(actual.isEmpty) + } + + "return the maximum info date and offsets " in { + val om = getOffsetManager + + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType("long")) + om.commitOffsets(t1, OffsetValue.LongType(100)) + + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) + om.commitOffsets(t2, OffsetValue.LongType(200)) + + val t3 = om.startWriteOffsets("table1", infoDate.plusDays(1), OffsetValue.LongType(200)) + om.commitOffsets(t3, OffsetValue.LongType(300)) + + om.getOffsets("table1", infoDate.plusDays(1)).foreach(println) + + val summaryMultiDay = om.getMaxInfoDateAndOffset("table1", None) + val summarySingleDay = om.getMaxInfoDateAndOffset("table1", Some(infoDate)) + + assert(summaryMultiDay.isDefined) + assert(summarySingleDay.isDefined) + + val actualM = summaryMultiDay.get + val actual1 = summarySingleDay.get + + assert(actualM.tableName == "table1") + assert(actualM.maximumInfoDate == infoDate.plusDays(1)) + assert(actualM.minimumOffset == OffsetValue.LongType(200)) + assert(actualM.maximumOffset == OffsetValue.LongType(300)) + assert(actualM.offsetsForTheDay.length == 1) + + assert(actual1.tableName == "table1") + assert(actual1.maximumInfoDate == infoDate) + assert(actual1.minimumOffset == OffsetValue.getMinimumForType("long")) + assert(actual1.maximumOffset == OffsetValue.LongType(200)) + assert(actual1.offsetsForTheDay.length == 2) + } + } + + "committing offsets" should { + "work for empty offsets" in { + val om = getOffsetManager + + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) + om.commitOffsets(t1, OffsetValue.LongType(100)) + + val offsets = om.getOffsets("table1", infoDate) + + assert(offsets.length == 1) + + val offset = offsets.head + assert(offset.tableName == "table1") + assert(offset.infoDate == infoDate) + assert(offset.minOffset == OffsetValue.LongType(0)) + assert(offset.maxOffset.get == OffsetValue.LongType(100)) + } + + "work for non-empty offsets" in { + val om = getOffsetManager + + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) + om.commitOffsets(t1, OffsetValue.LongType(100)) + + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) + om.commitOffsets(t2, OffsetValue.LongType(200)) + + val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) + + assert(offsets.length == 2) + + val offset1 = offsets.head + assert(offset1.tableName == "table1") + assert(offset1.infoDate == infoDate) + assert(offset1.minOffset == OffsetValue.LongType(0)) + assert(offset1.maxOffset.get == OffsetValue.LongType(100)) + + val offset2 = offsets(1) + assert(offset2.tableName == "table1") + assert(offset2.infoDate == infoDate) + assert(offset2.minOffset == OffsetValue.LongType(100)) + assert(offset2.maxOffset.get == OffsetValue.LongType(200)) + } + } + + "rollback offsets" should { + "work for empty offsets" in { + val om = getOffsetManager + + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) + om.rollbackOffsets(t1) + + val offsets = om.getOffsets("table1", infoDate) + + assert(offsets.isEmpty) + } + + "work for non-empty offsets" in { + val om = getOffsetManager + + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) + om.commitOffsets(t1, OffsetValue.LongType(100)) + + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) + om.rollbackOffsets(t2) + + val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) + + assert(offsets.length == 1) + + val offset1 = offsets.head + assert(offset1.tableName == "table1") + assert(offset1.infoDate == infoDate) + assert(offset1.minOffset == OffsetValue.LongType(0)) + assert(offset1.maxOffset.get == OffsetValue.LongType(100)) + } + } + +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetValueSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetValueSuite.scala new file mode 100644 index 000000000..2d859a49e --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetValueSuite.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.tests.bookkeeper + +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.core.bookkeeper.model.OffsetValue + +class OffsetValueSuite extends AnyWordSpec { + "OffsetValue" should { + "be able to create a LongType instance" in { + val offsetValue = OffsetValue.LongType(42) + assert(offsetValue.dataTypeString == "long") + assert(offsetValue.valueString == "42") + } + + "be able to create a StringType instance" in { + val offsetValue = OffsetValue.StringType("foo") + assert(offsetValue.dataTypeString == "string") + assert(offsetValue.valueString == "foo") + } + } + + "getMinimumForType" should { + "be able to get minimum value for long type" in { + val offsetValue = OffsetValue.getMinimumForType("long") + assert(offsetValue.dataTypeString == "long") + assert(offsetValue.valueString == Long.MinValue.toString) + } + + "be able to get minimum value for string type" in { + val offsetValue = OffsetValue.getMinimumForType("string") + assert(offsetValue.dataTypeString == "string") + assert(offsetValue.valueString == "") + } + + "throw an exception when trying to get minimum value for an unknown type" in { + assertThrows[IllegalArgumentException] { + OffsetValue.getMinimumForType("unknown") + } + } + } + + "fromString" should { + "be able to create a LongType instance from a string" in { + val offsetValue = OffsetValue.fromString("long", "42") + assert(offsetValue.dataTypeString == "long") + assert(offsetValue.valueString == "42") + } + + "be able to create a StringType instance from a string" in { + val offsetValue = OffsetValue.fromString("string", "foo") + assert(offsetValue.dataTypeString == "string") + assert(offsetValue.valueString == "foo") + } + + "throw an exception when trying to create an instance from a string with an unknown type" in { + assertThrows[IllegalArgumentException] { + OffsetValue.fromString("unknown", "42") + } + } + } + +}