Skip to content

Commit

Permalink
#374 Add a table in bookeeping for storing offsets.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 6, 2024
1 parent f8b7f5f commit 61bcaff
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper.model

case class OffsetRecord(pramenTableName: String,
infoDate: String, /* Use String to workaround serialization issues */
dataType: String,
minOffset: String,
maxOffset: String,
committed: Boolean,
lastUpdated: Long /* Use epoch seconds */
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper.model

import slick.jdbc.H2Profile.api._

class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") {
def pramenTableName = column[String]("table_name", O.Length(128))
def infoDate = column[String]("info_date", O.Length(20))
def dataType = column[String]("data_type", O.Length(20))
def minOffset = column[String]("min_offset", O.Length(128))
def maxOffset = column[String]("min_offset", O.Length(128))
def committed = column[Boolean]("committed")
def lastUpdated = column[Long]("last_updated")
def * = (pramenTableName, infoDate, dataType, minOffset, maxOffset, committed, lastUpdated) <> (OffsetRecord.tupled, OffsetRecord.unapply)
def idx1 = index("offset_idx_1", (pramenTableName, infoDate), unique = false)
}

object OffsetRecords {
lazy val records = TableQuery[OffsetRecords]
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.rdb
import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile
import slick.jdbc.H2Profile.api._
import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingRecords, MetadataRecords, SchemaRecords}
import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingRecords, MetadataRecords, OffsetRecords, SchemaRecords}
import za.co.absa.pramen.core.journal.model.JournalTasks
import za.co.absa.pramen.core.lock.model.LockTickets
import za.co.absa.pramen.core.rdb.PramenDb.MODEL_VERSION
Expand Down Expand Up @@ -71,6 +71,10 @@ class PramenDb(val jdbcConfig: JdbcConfig,
addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "environmentName", "varchar(128)")
addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "tenant", "varchar(200)")
}

if (dbVersion < 5) {
initTable(OffsetRecords.records.schema)
}
}

def initTable(schema: H2Profile.SchemaDescription): Unit = {
Expand Down Expand Up @@ -106,7 +110,7 @@ class PramenDb(val jdbcConfig: JdbcConfig,
}

object PramenDb {
val MODEL_VERSION = 4
val MODEL_VERSION = 5
val DEFAULT_RETRIES = 3

def apply(jdbcConfig: JdbcConfig): PramenDb = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture
assert(getTables.exists(_.equalsIgnoreCase("bookkeeping")))
assert(getTables.exists(_.equalsIgnoreCase("schemas")))
assert(getTables.exists(_.equalsIgnoreCase("metadata")))
assert(getTables.exists(_.equalsIgnoreCase("offsets")))
}
}

Expand Down

0 comments on commit 61bcaff

Please sign in to comment.