Skip to content

Commit

Permalink
#374 Calculate throughput based on appended records for incremental j…
Browse files Browse the repository at this point in the history
…obs.
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 858138f commit 7da2627
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,11 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

private[core] def getThroughputRps(task: TaskResult): TextElement = {
val recordCount = task.runStatus match {
case s: Succeeded => s.recordCount
case s: Succeeded =>
s.recordsAppended match {
case Some(appended) => appended
case None => s.recordCount
}
case _ => 0
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,17 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis
assert(actual.text == "225 r/s")
}

"work for an incremental successful task" in {
val builder = getBuilder()

val runStatus = RunStatusFactory.getDummySuccess(None, 1000000, recordsAppended = Some(500000), reason = TaskRunReason.New)
val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus)

val actual = builder.getThroughputRps(task)

assert(actual.text == "112 r/s")
}

"work for a raw file task" in {
val builder = getBuilder()

Expand Down

0 comments on commit 7da2627

Please sign in to comment.