Skip to content

Commit

Permalink
#374 Implement offset management DB operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 9, 2024
1 parent e187ea0 commit cb6a02f
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ import java.time.LocalDate
* with data.
*/
trait OffsetManager {
/** Returns the maximum information date the bookkeeping has offsets for. */
def getMaximumDateAndOffset(table: String): Option[DataOffsetAggregated] = ???
/**
* Returns the maximum information date the bookkeeping has offsets for.
*
* If onlyForInfoDate is not empty, offset management is being processed for each info date individually, e.g.
* info_date + offset column is monotonic.
*
* if onlyForInfoDate is empty
* offset must be a monotonically increasing field.
*/
def getMaximumDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = ???

/** Returns all uncommitted offsets. If there are any:
* - maximum offset should be derived from data,
* - the latest uncommitted offset should be committed to the latest offset
* - previous uncommitted offsets should be rolled back.
*/
def getUncommittedOffsets(table: String): Seq[DataOffset] = ???
def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Seq[DataOffset] = ???

/**
* Starts an uncommitted offset for an incremental ingestion for a day.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile.api._
import slick.jdbc.JdbcBackend.Database
import za.co.absa.pramen.core.bookkeeper.model._
import za.co.absa.pramen.core.utils.{SlickUtils, TimeUtils}

import java.time.{Duration, Instant, LocalDate}
import scala.util.control.NonFatal

class OffsetManagerJdbc(db: Database) {
import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)

def getMaximumDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table))

try {
maxInfoDateOpt.flatMap { infoDate =>
val infoDateStr = infoDate.toString

val query = OffsetRecords.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDateStr && r.committedAt.nonEmpty)
.groupBy { _ => true }
.map {
case (_, group) => (group.map(_.dataType).max, group.map(_.minOffset).min, group.map(_.maxOffset).max)
}

val action = query.result
val sql = action.statements.mkString("; ")

val start = Instant.now
val result = db.run(action).execute()
val finish = Instant.now

val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(finish.toEpochMilli - start.toEpochMilli)
if (Duration.between(start, finish).toMillis > 1000L) {
log.warn(s"Query execution time: $elapsedTime. SQL: $sql")
} else {
log.debug(s"Query execution time: $elapsedTime. SQL: $sql")
}

if (result.nonEmpty && result.head._1.nonEmpty && result.head._2.nonEmpty && result.head._3.nonEmpty) {
val minOffset = OffsetValue.fromString(result.head._1.get, result.head._2.get)
val maxOffset = OffsetValue.fromString(result.head._1.get, result.head._3.get)
Some(DataOffsetAggregated(
table, infoDate, minOffset, maxOffset
))
} else {
None
}
}
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex)
}
}

def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Seq[DataOffset] = {
val query = onlyForInfoDate match {
case Some(infoDate) =>
val dateSte = infoDate.toString
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.committedAt.isEmpty && r.infoDate === dateSte)
case None =>
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.committedAt.isEmpty)
}

try {
SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, query)
.map(DataOffset.fromOffsetRecord)
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex)
}
}

def startWriteOffsets(table: String, infoDate: LocalDate, minOffset: OffsetValue): DataOffsetRequest = {
val createdAt = Instant.now().toEpochMilli

val record = OffsetRecord(table, infoDate.toString, minOffset.dataTypeString, minOffset.valueString, "", createdAt, None)

db.run(
OffsetRecords.records += record
).execute()

DataOffsetRequest(table, infoDate, minOffset, createdAt)
}

def commitOffsets(request: DataOffsetRequest, maxOffset: OffsetValue): Unit = {
val committedAt = Instant.now().toEpochMilli

db.run(
OffsetRecords.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt)
.map(r => (r.maxOffset, r.committedAt))
.update((maxOffset.valueString, Some(committedAt)))
).execute()
}

def rollbackOffsets(request: DataOffsetRequest): Unit = {
db.run(
OffsetRecords.records
.filter(r => r.pramenTableName === request.tableName && r.infoDate === request.infoDate.toString && r.createdAt === request.createdAt)
.delete
).execute()
}

def getMaximumInfoDate(table: String): Option[LocalDate] = {
val query = OffsetRecords.records
.map(_.infoDate).max

try {
SlickUtils.executeMaxString(db, query)
.map(LocalDate.parse)
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the offset table.", ex)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,22 @@ case class DataOffset(tableName: String,
createdAt: Long,
committedAt: Option[Long]
)

object DataOffset {
def fromOffsetRecord(r: OffsetRecord): DataOffset = {
val maxOffsetOpt = if (r.maxOffset.nonEmpty) {
Option(OffsetValue.fromString(r.dataType, r.maxOffset))
} else {
None
}

DataOffset(
r.pramenTableName,
LocalDate.parse(r.infoDate),
OffsetValue.fromString(r.dataType, r.minOffset),
maxOffsetOpt,
r.createdAtMilli,
r.committedAtMilli
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ import java.time.LocalDate

case class DataOffsetAggregated(tableName: String,
maximumInfoDate: LocalDate,
minimumOffset: OffsetValue,
maximumOffset: OffsetValue
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ object SlickUtils {

private val log = LoggerFactory.getLogger(this.getClass)

private val WARN_IF_LONGER_MS = 1000L

/**
* Synchronously executes a query against a JDBC connection.
* - Handles exceptions
Expand All @@ -47,7 +49,7 @@ object SlickUtils {
val finish = Instant.now

val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(finish.toEpochMilli - start.toEpochMilli)
if (Duration.between(start, finish).toMillis > 1000L) {
if (Duration.between(start, finish).toMillis > WARN_IF_LONGER_MS) {
log.warn(s"Query execution time: $elapsedTime. SQL: $sql")
} else {
log.debug(s"Query execution time: $elapsedTime. SQL: $sql")
Expand Down Expand Up @@ -79,7 +81,39 @@ object SlickUtils {
val finish = Instant.now

val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(finish.toEpochMilli - start.toEpochMilli)
if (Duration.between(start, finish).toMillis > 1000L) {
if (Duration.between(start, finish).toMillis > WARN_IF_LONGER_MS) {
log.warn(s"Count execution time: $elapsedTime. SQL: $sql")
} else {
log.debug(s"Count execution time: $elapsedTime. SQL: $sql")
}

result
} catch {
case NonFatal(ex) => throw new RuntimeException(s"Error executing an SQL query: $sql", ex)
}
}

/**
* Synchronously executes a MAX query against a string column and JDBC connection.
* - Handles exceptions
* - Logs SQL statements generated by the framework
* - Measures execution time
*
* @param db A database
* @param rep A result of query
* @return The result of the query.
*/
def executeMaxString(db: Database, rep: Rep[Option[String]]): Option[String] = {
val action = rep.result
val sql = action.statements.mkString("; ")

try {
val start = Instant.now
val result = db.run(action).execute()
val finish = Instant.now

val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(finish.toEpochMilli - start.toEpochMilli)
if (Duration.between(start, finish).toMillis > WARN_IF_LONGER_MS) {
log.warn(s"Count execution time: $elapsedTime. SQL: $sql")
} else {
log.debug(s"Count execution time: $elapsedTime. SQL: $sql")
Expand Down

0 comments on commit cb6a02f

Please sign in to comment.