Skip to content

Commit

Permalink
#374 Add number of appended records to journal.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 6f13121 commit 451f556
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class JournalHadoopCsv(journalPath: String)
inputRecordCountOld = v.inputRecordCountOld,
outputRecordCount = v.outputRecordCount,
outputRecordCountOld = v.outputRecordCountOld,
appendedRecordCount = v.appendedRecordCount,
outputSize = v.outputSize,
startedAt = v.startedAt,
finishedAt = v.finishedAt,
Expand All @@ -97,6 +98,7 @@ class JournalHadoopCsv(journalPath: String)

val outputRecordCount = t.outputRecordCount.map(_.toString).getOrElse("")
val outputRecordCountOld = t.outputRecordCountOld.map(_.toString).getOrElse("")
val appendedRecordCount = t.appendedRecordCount.map(_.toString).getOrElse("")
val outputSize = t.outputSize.map(_.toString).getOrElse("")

val record = removeSeparators(t.jobName) ::
Expand All @@ -108,6 +110,7 @@ class JournalHadoopCsv(journalPath: String)
t.inputRecordCountOld ::
outputRecordCount ::
outputRecordCountOld ::
appendedRecordCount ::
outputSize ::
t.startedAt ::
t.finishedAt ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class JournalJdbc(db: Database) extends Journal {
entry.inputRecordCountOld,
entry.outputRecordCount,
entry.outputRecordCountOld,
entry.appendedRecordCount,
entry.outputSize,
entry.startedAt,
entry.finishedAt,
Expand Down Expand Up @@ -82,6 +83,7 @@ class JournalJdbc(db: Database) extends Journal {
inputRecordCountOld = v.inputRecordCountOld,
outputRecordCount = v.outputRecordCount,
outputRecordCountOld = v.outputRecordCountOld,
appendedRecordCount = v.appendedRecordCount,
outputSize = v.outputSize,
startedAt = v.startedAt,
finishedAt = v.finishedAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ case class JournalTask(
inputRecordCountOld: Long,
outputRecordCount: Option[Long],
outputRecordCountOld: Option[Long],
appendedRecordCount: Option[Long],
outputSize: Option[Long],
startedAt: Long,
finishedAt: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class JournalTasks(tag: Tag) extends Table[JournalTask](tag, "journal") {
def inputRecordCountOld = column[Long]("input_record_count_old")
def outputRecordCount = column[Option[Long]]("output_record_count")
def outputRecordCountOld = column[Option[Long]]("output_record_count_old")
def appendedRecordCount = column[Option[Long]]("appended_record_count")
def outputSize = column[Option[Long]]("output_size")
def startedAt = column[Long]("started_at")
def finishedAt = column[Long]("finished_at")
Expand All @@ -41,7 +42,7 @@ class JournalTasks(tag: Tag) extends Table[JournalTask](tag, "journal") {
def tenant = column[Option[String]]("tenant", O.Length(200))
def * = (jobName, pramenTableName, periodBegin, periodEnd,
informationDate, inputRecordCount, inputRecordCountOld, outputRecordCount,
outputRecordCountOld, outputSize, startedAt, finishedAt, status, failureReason, sparkApplicationId, pipelineId, pipelineName, environmentName, tenant) <> (JournalTask.tupled, JournalTask.unapply)
outputRecordCountOld, appendedRecordCount, outputSize, startedAt, finishedAt, status, failureReason, sparkApplicationId, pipelineId, pipelineName, environmentName, tenant) <> (JournalTask.tupled, JournalTask.unapply)
def idx1 = index("idx_started_at", startedAt, unique = false)
def idx2 = index("idx_finished_at", finishedAt, unique = false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ case class TaskCompleted(
inputRecordCountOld: Long,
outputRecordCount: Option[Long],
outputRecordCountOld: Option[Long],
appendedRecordCount: Option[Long],
outputSize: Option[Long],
startedAt: Long,
finishedAt: Long,
Expand All @@ -58,9 +59,9 @@ object TaskCompleted {
val failureReason = taskResult.runStatus.getReason
val sparkApplicationId = Option(taskResult.applicationId)

val (recordCountOld, inputRecordCount, outputRecordCount, sizeBytes) = taskResult.runStatus match {
case s: Succeeded => (s.recordCountOld, s.recordCount, Some(s.recordCount), s.sizeBytes)
case _ => (None, 0L, None, None)
val (recordCountOld, inputRecordCount, outputRecordCount, sizeBytes, appendedRecords) = taskResult.runStatus match {
case s: Succeeded => (s.recordCountOld, s.recordCount, Some(s.recordCount), s.sizeBytes, s.recordsAppended)
case _ => (None, 0L, None, None, None)
}

TaskCompleted(
Expand All @@ -73,6 +74,7 @@ object TaskCompleted {
recordCountOld.getOrElse(0L),
outputRecordCount,
recordCountOld,
appendedRecords,
sizeBytes,
taskStarted,
taskFinished,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ case class TaskCompletedCsv(
outputRecordCount: Option[Long],
outputRecordCountOld: Option[Long],
outputSize: Option[Long],
appendedRecordCount: Option[Long],
startedAt: Long,
finishedAt: Long,
status: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class PramenDb(val jdbcConfig: JdbcConfig,
if (dbVersion < 5) {
initTable(OffsetRecords.records.schema)
}

if (0 < dbVersion && dbVersion < 6) {
addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "appended_record_count", "bigint")
}
}

def initTable(schema: H2Profile.SchemaDescription): Unit = {
Expand Down Expand Up @@ -110,7 +114,7 @@ class PramenDb(val jdbcConfig: JdbcConfig,
}

object PramenDb {
val MODEL_VERSION = 5
val MODEL_VERSION = 6
val DEFAULT_RETRIES = 3

def apply(jdbcConfig: JdbcConfig): PramenDb = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ abstract class TaskRunnerBase(conf: Config,
log.warn("Skipping the interrupted exception of the killed task.")
} else {
pipelineState.addTaskCompletion(Seq(updatedResult))
addJournalEntry(task, updatedResult, pipelineState.getState.pipelineInfo)
if (taskResult.runStatus != RunStatus.NotRan)
addJournalEntry(task, updatedResult, pipelineState.getState.pipelineInfo)
}

updatedResult.runStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ object TaskCompletedFactory {
inputRecordCountOld: Long = 1000L,
outputRecordCount: Option[Long] = Some(1000L),
outputRecordCountOld: Option[Long] = Some(1000L),
appendedRecordCount: Option[Long] = None,
outputSize: Option[Long] = None,
startedAt: Long = 1234567L,
finishedAt: Long = 1234568L,
Expand All @@ -52,6 +53,7 @@ object TaskCompletedFactory {
inputRecordCountOld,
outputRecordCount,
outputRecordCountOld,
appendedRecordCount,
outputSize,
startedAt,
finishedAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object TestCases {
val instant2: Instant = Instant.ofEpochSecond(1597318835)
val instant3: Instant = Instant.ofEpochSecond(1597318839)

val task1: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate1, infoDate1, infoDate1, 100, 0, Some(100), None, None, 597318830, 1597318830, "New", Some("Test1"), Some("abc123"), Some("p_id_1"), Some("p_1"), Some("TEST"), Some("T1"))
val task2: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate2, infoDate2, infoDate2, 100, 0, Some(100), None, None, 1597318835, 1597318835, "Late", Some("Test2"), Some("abc123"), Some("p_id_2"), Some("p_2"), Some("TEST"), Some("T2"))
val task3: TaskCompleted = model.TaskCompleted("job2", "table2", infoDate3, infoDate3, infoDate3, 100, 0, Some(100), None, None, 1597318839, 1597318839, "Fail", Some("Test3"), Some("abc123"), Some("p_id_3"), Some("p_2"), Some("TEST"), Some("T2"))
val task1: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate1, infoDate1, infoDate1, 100, 0, Some(100), None, None, None, 597318830, 1597318830, "New", Some("Test1"), Some("abc123"), Some("p_id_1"), Some("p_1"), Some("TEST"), Some("T1"))
val task2: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate2, infoDate2, infoDate2, 100, 0, Some(100), None, None, None, 1597318835, 1597318835, "Late", Some("Test2"), Some("abc123"), Some("p_id_2"), Some("p_2"), Some("TEST"), Some("T2"))
val task3: TaskCompleted = model.TaskCompleted("job2", "table2", infoDate3, infoDate3, infoDate3, 100, 0, Some(100), None, None, None, 1597318839, 1597318839, "Fail", Some("Test3"), Some("abc123"), Some("p_id_3"), Some("p_2"), Some("TEST"), Some("T2"))
}

0 comments on commit 451f556

Please sign in to comment.