From c24359727d2178b63453d6a14d7472079551fdca Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 11 Mar 2024 09:54:17 +0100 Subject: [PATCH] #369 Fix lazy created lazy if they depend on transient tables. --- .../za/co/absa/pramen/core/pipeline/OperationSplitter.scala | 4 ++-- .../co/absa/pramen/core/pipeline/OperationSplitterSuite.scala | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala index 70d9dd18a..dc839cd4f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.Transformer +import za.co.absa.pramen.api.{DataFormat, Transformer} import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.config.Keys.SPECIAL_CHARACTERS_IN_COLUMN_NAMES @@ -151,7 +151,7 @@ class OperationSplitter(conf: Config, val outputTableName = sinkTable.outputTableName.getOrElse(s"${sinkTable.metaTableName}->$sinkName") - val outputTable = inputTable.copy(name = outputTableName, hiveTable = None) + val outputTable = inputTable.copy(name = outputTableName, format = DataFormat.Null(), hiveTable = None) val notificationTargets = operationDef.notificationTargets .map(targetName => getNotificationTarget(conf, targetName, sinkTable.conf)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationSplitterSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationSplitterSuite.scala index 3986d117f..baffc32be 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationSplitterSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationSplitterSuite.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.DataFormat import za.co.absa.pramen.core.OperationDefFactory import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.databricks.DatabricksClient @@ -221,7 +222,9 @@ class OperationSplitterSuite extends AnyWordSpec with SparkTestBase { assert(job.length == 2) assert(job.head.isInstanceOf[SinkJob]) + assert(job.head.asInstanceOf[SinkJob].outputTable.format.isInstanceOf[DataFormat.Null]) assert(job(1).isInstanceOf[SinkJob]) + assert(job(1).asInstanceOf[SinkJob].outputTable.format.isInstanceOf[DataFormat.Null]) } "create transfer jobs" in {