Skip to content

Commit

Permalink
#374 Improve offset management DB operations and add test suites.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 10, 2024
1 parent cb6a02f commit a5f69d8
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ import java.time.LocalDate
* with data.
*/
trait OffsetManager {
/**
* Returns offsets for an information date.
*
* If there are uncommitted offsets, they should be handled this way:
* - maximum offset should be derived from data,
* - the latest uncommitted offset should be committed to the latest offset
* - previous uncommitted offsets should be rolled back.
*/
def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset]

/**
* Returns the maximum information date the bookkeeping has offsets for.
*
Expand All @@ -41,31 +51,24 @@ trait OffsetManager {
* if onlyForInfoDate is empty
* offset must be a monotonically increasing field.
*/
def getMaximumDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = ???

/** Returns all uncommitted offsets. If there are any:
* - maximum offset should be derived from data,
* - the latest uncommitted offset should be committed to the latest offset
* - previous uncommitted offsets should be rolled back.
*/
def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Seq[DataOffset] = ???
def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated]

/**
* Starts an uncommitted offset for an incremental ingestion for a day.
* This can only be done for the latest information date.
*/
def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest = ???
def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest

/**
* Commits changes to the table. If maxOffset is
* - the same as minOffset, the effect is similar to rollbackOffsets(),
* - greater than minOffset, a new entry is created,
* - less than minOffset, an exception is thrown.
*/
def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit = ???
def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit

/**
* Rolls back an offset request
*/
def rollbackOffsets(request: DataOffsetRequest): Unit = ???
def rollbackOffsets(request: DataOffsetRequest): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,81 +18,40 @@ package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile.api._
import slick.jdbc.JdbcBackend.Database
import za.co.absa.pramen.core.bookkeeper.model._
import za.co.absa.pramen.core.utils.{SlickUtils, TimeUtils}
import za.co.absa.pramen.core.utils.SlickUtils

import java.time.{Duration, Instant, LocalDate}
import java.time.{Instant, LocalDate}
import scala.util.control.NonFatal

class OffsetManagerJdbc(db: Database) {
class OffsetManagerJdbc(db: Database) extends OffsetManager {
import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)

def getMaximumDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table))
override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
val offsets = getOffsetRecords(table, infoDate)

try {
maxInfoDateOpt.flatMap { infoDate =>
val infoDateStr = infoDate.toString

val query = OffsetRecords.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDateStr && r.committedAt.nonEmpty)
.groupBy { _ => true }
.map {
case (_, group) => (group.map(_.dataType).max, group.map(_.minOffset).min, group.map(_.maxOffset).max)
}

val action = query.result
val sql = action.statements.mkString("; ")

val start = Instant.now
val result = db.run(action).execute()
val finish = Instant.now

val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(finish.toEpochMilli - start.toEpochMilli)
if (Duration.between(start, finish).toMillis > 1000L) {
log.warn(s"Query execution time: $elapsedTime. SQL: $sql")
} else {
log.debug(s"Query execution time: $elapsedTime. SQL: $sql")
}

if (result.nonEmpty && result.head._1.nonEmpty && result.head._2.nonEmpty && result.head._3.nonEmpty) {
val minOffset = OffsetValue.fromString(result.head._1.get, result.head._2.get)
val maxOffset = OffsetValue.fromString(result.head._1.get, result.head._3.get)
Some(DataOffsetAggregated(
table, infoDate, minOffset, maxOffset
))
} else {
None
}
}
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex)
if (offsets.isEmpty) {
return Array.empty
}

offsets.map(DataOffset.fromOffsetRecord)
}

def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Seq[DataOffset] = {
val query = onlyForInfoDate match {
case Some(infoDate) =>
val dateSte = infoDate.toString
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.committedAt.isEmpty && r.infoDate === dateSte)
case None =>
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.committedAt.isEmpty)
}
override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table))

try {
SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, query)
.map(DataOffset.fromOffsetRecord)
maxInfoDateOpt.flatMap { infoDate =>
getMinMaxOffsets(table, infoDate)
}
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex)
}
}

def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest = {
override def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest = {
val createdAt = Instant.now().toEpochMilli

val record = OffsetRecord(table, infoDate.toString, minOffset.dataTypeString, minOffset.valueString, "", createdAt, None)
Expand All @@ -104,7 +63,7 @@ class OffsetManagerJdbc(db: Database) {
DataOffsetRequest(table, infoDate, minOffset, createdAt)
}

def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit = {
override def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit = {
val committedAt = Instant.now().toEpochMilli

db.run(
Expand All @@ -115,16 +74,17 @@ class OffsetManagerJdbc(db: Database) {
).execute()
}

def rollbackOffsets(request: DataOffsetRequest): Unit = {
/**
 * Rolls back an offset request by deleting its record.
 *
 * The record is identified by the table name, the information date and the creation
 * timestamp carried by the request.
 *
 * @param request the offset request to roll back.
 */
override def rollbackOffsets(request: DataOffsetRequest): Unit = {
  val infoDateStr = request.infoDate.toString

  val requestRecords = OffsetRecords.records.filter { r =>
    r.pramenTableName === request.tableName &&
      r.infoDate === infoDateStr &&
      r.createdAt === request.createdAt
  }

  db.run(requestRecords.delete).execute()
}

def getMaximumInfoDate(table: String): Option[LocalDate] = {
private[core] def getMaximumInfoDate(table: String): Option[LocalDate] = {
val query = OffsetRecords.records
.filter(r => r.pramenTableName === table)
.map(_.infoDate).max

try {
Expand All @@ -135,4 +95,42 @@ class OffsetManagerJdbc(db: Database) {
}
}

/**
 * Loads the raw offset records stored for a table and an information date.
 *
 * The query filters only by table name and information date; it does not look at
 * whether a record has been committed.
 *
 * @param table    the table name the offsets are stored under.
 * @param infoDate the information date to load records for.
 * @return the matching records as an array (empty when nothing matches).
 */
private[core] def getOffsetRecords(table: String, infoDate: LocalDate): Array[OffsetRecord] = {
  // Information dates are stored as strings, so compare against the string form.
  val infoDateStr = infoDate.toString

  val recordsForDay = OffsetRecords.records.filter { r =>
    r.pramenTableName === table && r.infoDate === infoDateStr
  }

  val fetched = SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, recordsForDay)

  fetched.toArray[OffsetRecord]
}

/**
 * Aggregates the offset records of a table for an information date into a single min/max range.
 *
 * Only records that already have a max offset take part in the min/max calculation. Records
 * created by startWriteOffsets() carry an empty max offset, which cannot be parsed as a long.
 *
 * Long offsets are compared numerically. Comparing their string forms would be lexicographic
 * and would, for example, order "9" after "10".
 *
 * @param table    the table name the offsets are stored under.
 * @param infoDate the information date to aggregate.
 * @return the aggregated offsets, or None when no record for the date has a max offset yet.
 *         `offsetsForTheDay` contains every record found for the date, including the ones
 *         excluded from the min/max calculation.
 * @throws RuntimeException if the records for the date use more than one offset value type.
 */
private[core] def getMinMaxOffsets(table: String, infoDate: LocalDate): Option[DataOffsetAggregated] = {
  val offsets = getOffsetRecords(table, infoDate)

  // Safe on an empty array: it only throws when more than one data type is present.
  validateOffsets(table, infoDate, offsets)

  // Skip records whose max offset has not been written yet (empty string).
  val completed = offsets.filter(_.maxOffset.nonEmpty)

  completed.headOption.map { first =>
    val offsetDataType = first.dataType

    val (minOffsetStr, maxOffsetStr) = offsetDataType match {
      case OffsetValue.LONG_TYPE_STR =>
        // Numeric comparison; the stored strings are kept so that fromString() does the final parsing.
        (completed.minBy(_.minOffset.toLong).minOffset, completed.maxBy(_.maxOffset.toLong).maxOffset)
      case _ =>
        // String offsets are ordered lexicographically; unknown types are rejected by fromString() below.
        (completed.map(_.minOffset).min, completed.map(_.maxOffset).max)
    }

    DataOffsetAggregated(
      table,
      infoDate,
      OffsetValue.fromString(offsetDataType, minOffsetStr),
      OffsetValue.fromString(offsetDataType, maxOffsetStr),
      offsets.map(DataOffset.fromOffsetRecord)
    )
  }
}

/**
 * Verifies that a set of offset records is internally consistent.
 *
 * The only check performed is that all records share the same offset value type.
 *
 * @param table    the table name, used in the error message.
 * @param infoDate the information date, used in the error message.
 * @param offsets  an array of offset records to check.
 * @throws RuntimeException if more than one offset value type is present.
 */
private[core] def validateOffsets(table: String, infoDate: LocalDate, offsets: Array[OffsetRecord]): Unit = {
  // Sorted so that the error message lists the types in a stable order.
  val distinctDataTypes = offsets.map(_.dataType).distinct.sorted

  val hasMixedTypes = distinctDataTypes.length > 1
  if (hasMixedTypes) {
    throw new RuntimeException(s"Inconsistent offset value types found for $table at $infoDate: ${distinctDataTypes.mkString(", ")}")
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package za.co.absa.pramen.core.bookkeeper.model

import java.time.LocalDate

case class DataOffsetAggregated(tableName: String,
maximumInfoDate: LocalDate,
minimumOffset: OffsetValue,
maximumOffset: OffsetValue
/**
 * Aggregated offsets of a table for a single information date.
 *
 * The field descriptions follow how OffsetManagerJdbc.getMinMaxOffsets() fills the class in.
 *
 * Note: `offsetsForTheDay` is an Array, so the generated equals()/hashCode() compare that field
 * by reference. Two instances built from the same data are not equal unless they share the array.
 *
 * @param tableName        the table name the offsets were looked up for.
 * @param maximumInfoDate  the information date the offsets were aggregated for.
 *                         NOTE(review): the name suggests this is the latest date that has
 *                         offsets - confirm against callers.
 * @param minimumOffset    the minimum offset derived from the records of that date.
 * @param maximumOffset    the maximum offset derived from the records of that date.
 * @param offsetsForTheDay the individual offsets of that date, converted from the stored records.
 */
case class DataOffsetAggregated(
tableName: String,
maximumInfoDate: LocalDate,
minimumOffset: OffsetValue,
maximumOffset: OffsetValue,
offsetsForTheDay: Array[DataOffset]
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") {
def infoDate = column[String]("info_date", O.Length(20))
def dataType = column[String]("data_type", O.Length(20))
def minOffset = column[String]("min_offset", O.Length(64))
def maxOffset = column[String]("min_offset", O.Length(64))
def maxOffset = column[String]("max_offset", O.Length(64))
def createdAt = column[Long]("created_at")
def committedAt = column[Option[Long]]("committed_at")
def * = (pramenTableName, infoDate, dataType, minOffset, maxOffset, createdAt, committedAt) <> (OffsetRecord.tupled, OffsetRecord.unapply)
def idx2 = index("offset_idx_1", (pramenTableName, infoDate, createdAt), unique = true)
def idx1 = index("offset_idx_1", (pramenTableName, infoDate, createdAt), unique = true)
}

object OffsetRecords {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,31 @@ sealed trait OffsetValue {

/**
 * Offset value implementations and the helpers that build them from their stored
 * (data type name, string value) representation.
 */
object OffsetValue {
  /** Data type name used for offsets backed by a Long. */
  val LONG_TYPE_STR = "long"

  /** Data type name used for offsets backed by a String. */
  val STRING_TYPE_STR = "string"

  /** An offset backed by a Long; its string form is the decimal representation of the value. */
  case class LongType(value: Long) extends OffsetValue {
    override val dataTypeString: String = LONG_TYPE_STR

    override def valueString: String = value.toString
  }

  /** An offset backed by a String; its string form is the string itself. */
  case class StringType(s: String) extends OffsetValue {
    override val dataTypeString: String = STRING_TYPE_STR

    override def valueString: String = s
  }

  /**
   * Returns the minimum offset value of a data type: Long.MinValue for long offsets and
   * the empty string for string offsets.
   *
   * @param dataType the data type name (LONG_TYPE_STR or STRING_TYPE_STR).
   * @throws IllegalArgumentException if the data type name is not known.
   */
  def getMinimumForType(dataType: String): OffsetValue =
    if (dataType == LONG_TYPE_STR) {
      LongType(Long.MinValue)
    } else if (dataType == STRING_TYPE_STR) {
      StringType("")
    } else {
      throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
    }

  /**
   * Builds an offset value from a data type name and the string form of the value.
   *
   * @param dataType the data type name (LONG_TYPE_STR or STRING_TYPE_STR).
   * @param value    the string form of the offset; it must be a valid long for long offsets.
   * @throws IllegalArgumentException if the data type name is not known.
   * @throws NumberFormatException    if a long offset cannot be parsed from `value`.
   */
  def fromString(dataType: String, value: String): OffsetValue = {
    if (dataType == LONG_TYPE_STR) {
      LongType(value.toLong)
    } else if (dataType == STRING_TYPE_STR) {
      StringType(value)
    } else {
      throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
    }
  }
}
Loading

0 comments on commit a5f69d8

Please sign in to comment.