From 7da2627cac8ed1546bb8ee3ab182000da38ce276 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 27 Sep 2024 13:10:12 +0200 Subject: [PATCH] #374 Calculate throughput based on appended records for incremental jobs. --- .../pipeline/PipelineNotificationBuilderHtml.scala | 6 +++++- .../PipelineNotificationBuilderHtmlSuite.scala | 11 +++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index 571e15c99..14ff23435 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -500,7 +500,11 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot private[core] def getThroughputRps(task: TaskResult): TextElement = { val recordCount = task.runStatus match { - case s: Succeeded => s.recordCount + case s: Succeeded => + s.recordsAppended match { + case Some(appended) => appended + case None => s.recordCount + } case _ => 0 } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala index b755d2e2a..98a2444bf 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala @@ -421,6 +421,17 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis assert(actual.text == "225 r/s") } + "work for an incremental successful task" in { + val builder = getBuilder() + + val runStatus = RunStatusFactory.getDummySuccess(None, 1000000, recordsAppended = Some(500000), reason = TaskRunReason.New) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getThroughputRps(task) + + assert(actual.text == "112 r/s") + } + "work for a raw file task" in { val builder = getBuilder()