From 365461d3b9814455d8fefde7bc5c2293deb2b63f Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 12 Dec 2024 08:51:51 +0100 Subject: [PATCH] #520 Add unit test suite for the offset manager utils. --- .../bookkeeper/OffsetManagerUtilsSuite.scala | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala new file mode 100644 index 00000000..0c37d541 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerUtilsSuite.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.tests.bookkeeper + +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.TimestampType +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue} +import za.co.absa.pramen.core.base.SparkTestBase +import za.co.absa.pramen.core.bookkeeper.OffsetManagerUtils + +import java.time.Instant + +class OffsetManagerUtilsSuite extends AnyWordSpec with SparkTestBase { + + import spark.implicits._ + + "getMinMaxValueFromData" should { + "work for an integral data type" in { + val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("a", "offset") + + val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "offset", OffsetType.IntegralType).get + + assert(minValue == OffsetValue.IntegralValue(1)) + assert(maxValue == OffsetValue.IntegralValue(3)) + } + + "work for an string data type" in { + val df = List(("A", 1), ("B", 2), ("C", 3)).toDF("offset", "b") + + val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "offset", OffsetType.StringType).get + + assert(minValue == OffsetValue.StringValue("A")) + assert(maxValue == OffsetValue.StringValue("C")) + } + + "work for an datetime data type" in { + val baseTime = 1733989092000L + val df = List(("A", baseTime), ("B", baseTime + 1000), ("C", baseTime + 1500)).toDF("a", "offset") + .withColumn("offset", (col("offset") / 1000).cast(TimestampType)) + + val (minValue, maxValue) = OffsetManagerUtils.getMinMaxValueFromData(df, "offset", OffsetType.DateTimeType).get + + assert(minValue == OffsetValue.DateTimeValue(Instant.ofEpochMilli(baseTime))) + assert(maxValue == OffsetValue.DateTimeValue(Instant.ofEpochMilli(baseTime + 1500))) + } + } +}