Avoid duplicate instantiation of sink #527

Closed
kevinwallimann opened this issue Dec 18, 2024 · 0 comments · Fixed by #528

Describe the bug

Currently, when a table-specific configuration is specified, the sink is created twice: once without the table-specific config and once with it. Only the second instance is actually used. If the sink's constructor reads the configuration, e.g. for validation, the first instantiation fails because the table-specific options are not present yet.
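The failure mode can be reproduced in isolation with a minimal sketch. This is not Pramen code: `Conf` is a plain `Map` standing in for a HOCON config, and `ValidatingSink`, `sinkConf` and `tableOverrides` are hypothetical names chosen for illustration. It only shows why a constructor that validates eagerly breaks when it is first called without the table-specific options.

```scala
import scala.util.Try

object EagerSinkCreation {
  // Assumption: a flat key -> value map stands in for the real HOCON config.
  type Conf = Map[String, String]

  val mandatoryKeys: Seq[String] =
    Seq("my.awesome.sink.item1", "my.awesome.sink.item2")

  // Hypothetical sink that validates mandatory options in its constructor.
  final class ValidatingSink(val conf: Conf) {
    private val missing = mandatoryKeys.filterNot(conf.contains)
    if (missing.nonEmpty)
      throw new IllegalArgumentException(
        s"Mandatory configuration options are missing: ${missing.mkString(",")}")
  }

  // Sink-level config only; the mandatory options live in the table section.
  val sinkConf: Conf = Map("name" -> "my_awesome_sink")

  // Table-specific options.
  val tableOverrides: Conf = Map(
    "my.awesome.sink.item1" -> "value1",
    "my.awesome.sink.item2" -> "value2")

  // First instantiation: without the table-specific options, validation throws.
  val first: Try[ValidatingSink] = Try(new ValidatingSink(sinkConf))

  // Second instantiation: table-specific options merged in, validation passes.
  val second: Try[ValidatingSink] = Try(new ValidatingSink(sinkConf ++ tableOverrides))

  def main(args: Array[String]): Unit = {
    println(s"first instantiation failed: ${first.isFailure}")
    println(s"second instantiation succeeded: ${second.isSuccess}")
  }
}
```

Wrapping both calls in `Try` is only there to make the two outcomes observable side by side; in the reported scenario the first exception aborts the pipeline before the second instantiation is reached.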

Code and/or configuration snippet that caused the issue

pramen {
sinks = [
  {
    name = "my_awesome_sink"
    factory.class = "za.co.absa.myproject.MyAwesomeSink"
  }
],
operations = [
  {
    name = "Execute my awesome sink"
    type = "sink"
    schedule.type = "daily"
    sink = "my_awesome_sink"

    dependencies = [
      {
        tables = ["my_table"]
        date.from = "@infoDate"
      }
    ]

    tables = [
      {
        input.metastore.table = "my_table"
        output {
          raw.table = "my_table_out"
        }
        sink.my.awesome.sink {
          item1 = "value1"
          item2 = "value2"
        }
      }
    ]
  }
]
}

The implementation of the sink looks like this:

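import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.{ExternalChannelFactory, Sink}
import za.co.absa.pramen.core.utils.ConfigUtils

class MyAwesomeSink(val config: Config)(implicit spark: SparkSession) extends Sink {
  // Validation runs in the constructor, so it fails if the table-specific options are absent.
  ConfigUtils.validatePathsExistence(config, "", Seq("my.awesome.sink.item1", "my.awesome.sink.item2"))

  // etc.
}

object MyAwesomeSink extends ExternalChannelFactory[MyAwesomeSink] {
  override def apply(conf: Config, parentPath: String, spark: SparkSession): MyAwesomeSink =
    new MyAwesomeSink(conf)(spark)
}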

The error message looks similar to this:

java.lang.IllegalArgumentException: Mandatory configuration options are missing: my.awesome.sink.item1,my.awesome.sink.item2
  za.co.absa.pramen.core.utils.ConfigUtils$.validatePathsExistence(ConfigUtils.scala:499)
  za.co.absa.myproject.MyAwesomeSink.<init>(MyAwesomeSink.scala:71)
  za.co.absa.myproject.MyAwesomeSink$.apply(MyAwesomeSink.scala:133)
  za.co.absa.myproject.MyAwesomeSink$.apply(MyAwesomeSink.scala:131)
  za.co.absa.pramen.core.ExternalChannelFactoryReflect$.fromConfig(ExternalChannelFactoryReflect.scala:55)
  za.co.absa.pramen.core.ExternalChannelFactoryReflect$.fromConfigByName(ExternalChannelFactoryReflect.scala:110)
  za.co.absa.pramen.core.sink.SinkManager$.getSinkByName(SinkManager.scala:36)
  za.co.absa.pramen.core.pipeline.OperationSplitter.createSink(OperationSplitter.scala:149)
  za.co.absa.pramen.core.pipeline.OperationSplitter.createJobs(OperationSplitter.scala:49)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$splitJobs$2(AppRunner.scala:200)
  scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
  scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
  scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
  scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$splitJobs$1(AppRunner.scala:193)
  scala.util.Try$.apply(Try.scala:213)
  za.co.absa.pramen.core.runner.AppRunner$.splitJobs(AppRunner.scala:186)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$runPipeline$8(AppRunner.scala:64)
  scala.util.Success.flatMap(Try.scala:251)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$runPipeline$7(AppRunner.scala:63)
  scala.util.Success.flatMap(Try.scala:251)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$runPipeline$6(AppRunner.scala:62)
  scala.util.Success.flatMap(Try.scala:251)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$runPipeline$5(AppRunner.scala:61)
  scala.util.Success.flatMap(Try.scala:251)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$runPipeline$4(AppRunner.scala:60)
  scala.util.Success.flatMap(Try.scala:251)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$runPipeline$3(AppRunner.scala:59)
  scala.util.Success.flatMap(Try.scala:251)
  za.co.absa.pramen.core.runner.AppRunner$.$anonfun$runPipeline$1(AppRunner.scala:58)
  scala.util.Success.flatMap(Try.scala:251)
  za.co.absa.pramen.core.runner.AppRunner$.runPipeline(AppRunner.scala:57)
  za.co.absa.pramen.runner.PipelineRunner$.$anonfun$main$1(PipelineRunner.scala:51)
  za.co.absa.pramen.runner.PipelineRunner$.$anonfun$main$1$adapted(PipelineRunner.scala:43)
  scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
  za.co.absa.pramen.runner.PipelineRunner$.main(PipelineRunner.scala:43)

Expected behavior

The sink should only be created once and the validation logic in the constructor should pass without errors.
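One way to picture the expected behavior is to merge the table-specific options into the sink config first and call the factory only once with the merged result. The sketch below is an illustration under assumptions, not the actual fix: `Conf` is a plain `Map` standing in for a HOCON config, and `createSinkOnce` is a hypothetical helper. With Typesafe Config, the analogous merge would be `tableConf.withFallback(sinkConf)`.

```scala
object SingleSinkCreation {
  // Assumption: a flat key -> value map stands in for the real HOCON config.
  type Conf = Map[String, String]

  // Hypothetical helper: merge first, then invoke the factory exactly once.
  // On key collisions the table-specific value wins (right operand of ++).
  def createSinkOnce[S](factory: Conf => S)(sinkConf: Conf, tableOverrides: Conf): S =
    factory(sinkConf ++ tableOverrides)

  def main(args: Array[String]): Unit = {
    var calls = 0

    // Stand-in factory that counts invocations and returns the config it saw.
    val countingFactory: Conf => Conf = conf => { calls += 1; conf }

    val seen = createSinkOnce(countingFactory)(
      Map("name" -> "my_awesome_sink"),
      Map("my.awesome.sink.item1" -> "value1", "my.awesome.sink.item2" -> "value2"))

    println(s"factory calls: $calls")
    println(s"keys seen by the factory: ${seen.keySet.toSeq.sorted.mkString(", ")}")
  }
}
```

Because the factory only ever sees the merged config, constructor-level validation has all table-specific options available on its single invocation.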

Context

  • Pramen version: 1.10.1
  • Spark version: 3.3.4
  • Scala version: 2.12.12
@kevinwallimann added the bug label on Dec 18, 2024
@kevinwallimann self-assigned this on Dec 18, 2024