From 61bcaff1c51de48ba19561816da57eba302b70c3 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 6 Sep 2024 11:48:36 +0200 Subject: [PATCH] #374 Add a table in bookeeping for storing offsets. --- .../core/bookkeeper/model/OffsetRecord.scala | 26 ++++++++++++++ .../core/bookkeeper/model/OffsetRecords.scala | 35 +++++++++++++++++++ .../za/co/absa/pramen/core/rdb/PramenDb.scala | 8 +++-- .../bookkeeper/BookkeeperJdbcSuite.scala | 1 + 4 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecord.scala create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecord.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecord.scala new file mode 100644 index 000000000..925e915e5 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecord.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.bookkeeper.model + +case class OffsetRecord(pramenTableName: String, + infoDate: String, /* Use String to workaround serialization issues */ + dataType: String, + minOffset: String, + maxOffset: String, + committed: Boolean, + lastUpdated: Long /* Use epoch seconds */ + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala new file mode 100644 index 000000000..d3954d591 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/OffsetRecords.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.bookkeeper.model + +import slick.jdbc.H2Profile.api._ + +class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") { + def pramenTableName = column[String]("table_name", O.Length(128)) + def infoDate = column[String]("info_date", O.Length(20)) + def dataType = column[String]("data_type", O.Length(20)) + def minOffset = column[String]("min_offset", O.Length(128)) + def maxOffset = column[String]("min_offset", O.Length(128)) + def committed = column[Boolean]("committed") + def lastUpdated = column[Long]("last_updated") + def * = (pramenTableName, infoDate, dataType, minOffset, maxOffset, committed, lastUpdated) <> (OffsetRecord.tupled, OffsetRecord.unapply) + def idx1 = index("offset_idx_1", (pramenTableName, infoDate), unique = false) +} + +object OffsetRecords { + lazy val records = TableQuery[OffsetRecords] +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 914a2a997..30e68834e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.rdb import org.slf4j.LoggerFactory import slick.jdbc.H2Profile import slick.jdbc.H2Profile.api._ -import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingRecords, MetadataRecords, SchemaRecords} +import za.co.absa.pramen.core.bookkeeper.model.{BookkeepingRecords, MetadataRecords, OffsetRecords, SchemaRecords} import za.co.absa.pramen.core.journal.model.JournalTasks import za.co.absa.pramen.core.lock.model.LockTickets import za.co.absa.pramen.core.rdb.PramenDb.MODEL_VERSION @@ -71,6 +71,10 @@ class PramenDb(val jdbcConfig: JdbcConfig, addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "environmentName", "varchar(128)") addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "tenant", "varchar(200)") } + + if (dbVersion < 5) { + initTable(OffsetRecords.records.schema) + } } def initTable(schema: H2Profile.SchemaDescription): Unit = { @@ -106,7 +110,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, } object PramenDb { - val MODEL_VERSION = 4 + val MODEL_VERSION = 5 val DEFAULT_RETRIES = 3 def apply(jdbcConfig: JdbcConfig): PramenDb = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index 2a39ec42a..cecd56a6a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -49,6 +49,7 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture assert(getTables.exists(_.equalsIgnoreCase("bookkeeping"))) assert(getTables.exists(_.equalsIgnoreCase("schemas"))) assert(getTables.exists(_.equalsIgnoreCase("metadata"))) + assert(getTables.exists(_.equalsIgnoreCase("offsets"))) } }