diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/StandardizationSink.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/StandardizationSink.scala
index 1b19106d4..99b3f4a62 100644
--- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/StandardizationSink.scala
+++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/sink/StandardizationSink.scala
@@ -219,13 +219,10 @@ class StandardizationSink(sinkConfig: Config,
   private[extras] def writeToRawFolder(df: DataFrame,
                                        recordCount: Long,
-                                       outputPartitionPath: Path)(implicit spark: SparkSession): Unit = {
+                                       outputPartitionPath: Path): Unit = {
     val outputPathStr = outputPartitionPath.toUri.toString
 
     log.info(s"Saving $recordCount records to the Enceladus raw folder: $outputPathStr")
 
-    val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, outputPathStr)
-    fsUtils.createDirectoryRecursiveButLast(outputPartitionPath)
-
     df.write
       .mode(SaveMode.Overwrite)
       .json(outputPathStr)
@@ -233,13 +230,10 @@ class StandardizationSink(sinkConfig: Config,
   private[extras] def writeToPublishFolder(df: DataFrame,
                                            recordCount: Long,
-                                           outputPartitionPath: Path)(implicit spark: SparkSession): Unit = {
+                                           outputPartitionPath: Path): Unit = {
     val outputPathStr = outputPartitionPath.toUri.toString
 
     log.info(s"Saving $recordCount records to the Enceladus publish folder: $outputPathStr")
 
-    val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, outputPathStr)
-    fsUtils.createDirectoryRecursiveButLast(outputPartitionPath)
-
     df.write
       .mode(SaveMode.Overwrite)
       .parquet(outputPathStr)
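
Note (editorial addition, not part of the patch): a plausible rationale for dropping the FsUtils pre-creation step, and with it the now-unused implicit SparkSession parameter, is that Spark's file-based DataFrameWriter already creates any missing parent directories when it commits output, so the explicit createDirectoryRecursiveButLast call was redundant. A minimal, self-contained sketch of that behaviour, using a hypothetical local path (the output directory chain does not exist before the write):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object WriterCreatesDirsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("writer-creates-dirs-sketch")
          .getOrCreate()

        val df = spark.range(10).toDF("id")

        // /tmp/enceladus-demo/raw/2024-01-01 does not exist yet; the writer
        // creates the whole directory chain itself, so no explicit mkdirs
        // (and hence no FsUtils helper) is needed before this call.
        df.write
          .mode(SaveMode.Overwrite)
          .json("/tmp/enceladus-demo/raw/2024-01-01")

        spark.stop()
      }
    }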