#520 Move some common offset management code to OffsetManagerUtils.
yruslan committed Nov 28, 2024
1 parent 254dafa commit c381cf5
Showing 3 changed files with 63 additions and 47 deletions.
OffsetManagerUtils.scala (new file)
@@ -0,0 +1,43 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, min}
import org.apache.spark.sql.types.StringType
import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue}
import za.co.absa.pramen.api.sql.SqlGeneratorBase

object OffsetManagerUtils {
  /** Returns the minimum and maximum values of the offset column of a data frame,
    * or None if the data frame is empty. */
  def getMinMaxValueFromData(df: DataFrame, offsetColumn: String, offsetType: OffsetType): Option[(OffsetValue, OffsetValue)] = {
    if (df.isEmpty) {
      None
    } else {
      // Aggregate on the typed offset column and cast the results to string afterwards,
      // so min/max are computed on the actual data type rather than lexicographically.
      val row = df.agg(
          min(offsetType.getSparkCol(col(offsetColumn))).cast(StringType),
          max(offsetType.getSparkCol(col(offsetColumn))).cast(StringType)
        ).collect()(0)

      val minValue = OffsetValue.fromString(offsetType.dataTypeString, row(0).asInstanceOf[String])
        .getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
      val maxValue = OffsetValue.fromString(offsetType.dataTypeString, row(1).asInstanceOf[String])
        .getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))

      SqlGeneratorBase.validateOffsetValue(minValue)
      SqlGeneratorBase.validateOffsetValue(maxValue)

      Some((minValue, maxValue))
    }
  }
}
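
For orientation, a minimal usage sketch of the new helper follows (not part of the commit). The local SparkSession, the sample data, and the column name batch_id are assumptions for illustration; OffsetType.IntegralType and OffsetValue.valueString come from the code above.

// Hypothetical usage sketch, not part of the commit: the SparkSession setup,
// sample data, and column name "batch_id" are assumptions for illustration.
import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.offset.OffsetType
import za.co.absa.pramen.core.bookkeeper.OffsetManagerUtils

object OffsetManagerUtilsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("batch_id", "payload")

    // Returns None for an empty DataFrame, otherwise the parsed and
    // validated (min, max) pair of the offset column.
    OffsetManagerUtils.getMinMaxValueFromData(df, "batch_id", OffsetType.IntegralType) match {
      case Some((minOffset, maxOffset)) =>
        println(s"min=${minOffset.valueString}, max=${maxOffset.valueString}")
      case None =>
        println("Empty batch - nothing to commit")
    }
  }
}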
MetastoreImpl.scala
@@ -18,7 +18,7 @@ package za.co.absa.pramen.core.metastore

import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.functions.{col, lit, max, min}
+import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
@@ -27,7 +27,7 @@ import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig}
-import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetCommitRequest}
+import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetCommitRequest, OffsetManagerUtils}
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.metastore.model.{MetaTable, TrackingTable}
import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager}
@@ -219,9 +219,9 @@ class MetastoreImpl(appConfig: Config,

  override def getCurrentBatch(tableName: String): DataFrame = {
    validateTable(tableName)
-    if (isPostProcessing && isIncremental) {
+    if (isPostProcessing && isIncremental && !isRerun) {
      metastore.getBatch(tableName, infoDate, None)
-    } else if (isIncremental && !isRerun && !isPostProcessing) {
+    } else if (isIncremental && !isRerun) {
      getIncremental(tableName, outputTable, infoDate)
    } else
      metastore.getTable(tableName, Option(infoDate), Option(infoDate))
@@ -355,7 +355,7 @@ class MetastoreImpl(appConfig: Config,
    val commitRequests = trackingTables.flatMap { trackingTable =>
      val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate))

-      getMinMaxOffsetFromDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match {
+      getMinMaxOffsetFromMetastoreDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match {
        case Some((minOffset, maxOffset)) =>
          log.info(s"Committed offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.")
          Some(OffsetCommitRequest(
@@ -433,26 +433,15 @@ object MetastoreImpl {
    }
  }

-  private[core] def getMinMaxOffsetFromDf(dfIn: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = {
+  private[core] def getMinMaxOffsetFromMetastoreDf(dfIn: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = {
    val df = currentMax match {
      case Some(currentMax) =>
        dfIn.filter(col(batchIdColumn) > currentMax.getSparkLit)
      case None =>
        dfIn
    }

-    if (df.isEmpty) {
-      None
-    } else {
-      val offsetType = if (df.schema.fields.find(_.name == batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType
-      val row = df.agg(min(offsetType.getSparkCol(col(batchIdColumn)).cast(StringType)),
-        max(offsetType.getSparkCol(col(batchIdColumn))).cast(StringType))
-        .collect()(0)
-
-      val minValue = OffsetValue.fromString(offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
-      val maxValue = OffsetValue.fromString(offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))
-      Some(minValue, maxValue)
-    }
+    val offsetType = if (df.schema.fields.find(_.name == batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType
+    OffsetManagerUtils.getMinMaxValueFromData(df, batchIdColumn, offsetType)
  }
}
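
To make the delegation concrete, here is a hedged sketch (hypothetical, not part of the commit; it compiles only inside the za.co.absa.pramen.core package because the method is private[core]). Rows at or below the current maximum offset are filtered out first, so a table with no new batches yields None. The sample data and column name batch_id are assumptions.

// Hypothetical sketch, not part of the commit; placed in the
// za.co.absa.pramen.core package since the method is private[core].
package za.co.absa.pramen.core

import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue}
import za.co.absa.pramen.core.metastore.MetastoreImpl

object MetastoreOffsetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("batch_id", "payload")

    // Suppose offset 2 was already committed; only rows with batch_id > 2 count.
    val lastMax = OffsetValue.fromString(OffsetType.IntegralType.dataTypeString, "2")

    // The offset type is inferred from the schema (integral here), then the
    // computation is delegated to OffsetManagerUtils.getMinMaxValueFromData.
    // Yields Some((3, 3)) here; None when nothing is newer than lastMax.
    val range = MetastoreImpl.getMinMaxOffsetFromMetastoreDf(df, "batch_id", lastMax)
    println(range.map { case (lo, hi) => s"${lo.valueString}..${hi.valueString}" })
  }
}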

IncrementalIngestionJob.scala
@@ -22,12 +22,11 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.jobdef.SourceTable
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
-import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
-import za.co.absa.pramen.api.sql.SqlGeneratorBase
+import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.{DataFormat, Reason, Source}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
-import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
+import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager, OffsetManagerUtils}
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental}
@@ -156,17 +155,16 @@ class IncrementalIngestionJob(operationDef: OperationDef,
      else
        metastore.getBatch(outputTable.name, infoDate, None)

-      if (updatedDf.isEmpty) {
-        om.rollbackOffsets(req)
-      } else {
-        val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo)
-
-        if (isRerun) {
-          om.commitRerun(req, minOffset, maxOffset)
-        } else {
-          om.commitOffsets(req, minOffset, maxOffset)
-        }
-      }
+      OffsetManagerUtils.getMinMaxValueFromData(updatedDf, offsetInfo.offsetColumn, offsetInfo.offsetType) match {
+        case Some((minOffset, maxOffset)) =>
+          if (isRerun) {
+            om.commitRerun(req, minOffset, maxOffset)
+          } else {
+            om.commitOffsets(req, minOffset, maxOffset)
+          }
+        case _ => om.rollbackOffsets(req)
+      }

      statsToReturn
    } catch {
      case ex: Throwable =>
@@ -267,7 +265,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
      throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. Cannot update uncommitted offsets.")
    }

-    val (newMinOffset, newMaxOffset) = getMinMaxOffsetFromDf(df, offsetInfo)
+    val (newMinOffset, newMaxOffset) = OffsetManagerUtils.getMinMaxValueFromData(df, offsetInfo.offsetColumn, offsetInfo.offsetType).get

    log.warn(s"Fixing uncommitted offsets. New offset to commit for ${outputTable.name} at $infoDate: " +
      s"min offset: ${newMinOffset.valueString}, max offset: ${newMaxOffset.valueString}.")
@@ -290,20 +288,6 @@ class IncrementalIngestionJob(operationDef: OperationDef,
    }
  }

-  private[core] def getMinMaxOffsetFromDf(df: DataFrame, offsetInfo: OffsetInfo): (OffsetValue, OffsetValue) = {
-    val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)),
-      max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType))
-      .collect()(0)
-
-    val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
-    val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))
-
-    SqlGeneratorBase.validateOffsetValue(minValue)
-    SqlGeneratorBase.validateOffsetValue(maxValue)
-
-    (minValue, maxValue)
-  }
-
  private[core] def validateOffsetColumn(df: DataFrame, offsetInfo: OffsetInfo): Unit = {
    if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) {
      throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'.")