Add "Spark sink" #115

Closed
yruslan opened this issue Jan 4, 2023 · 0 comments
Labels: enhancement (New feature or request), Pramen-Scala

yruslan commented Jan 4, 2023

Background

Currently, we have a "Spark source" (za.co.absa.pramen.core.source.SparkSource) that allows reading data located in Hadoop-compatible storage (HDFS, S3, etc.) as an external source.

But sometimes it is useful to have a "Spark sink" so that data from the metastore can be written out as part of a pipeline.

Specifically, such a sink could be used to run a pipeline on premises and write the data to S3.

Feature

Add a new sink, SparkSink, that simply invokes 'df.write(...)' with the options provided by the sink's configuration.
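
A minimal sketch of what the sink's write logic could look like (the object and method names, parameter list, and option handling here are assumptions for illustration, not the final design):

import org.apache.spark.sql.DataFrame

object SparkSinkSketch {
  // Hypothetical core logic: repartition if configured, then write with the
  // configured format, mode, and extra writer options.
  def write(df: DataFrame,
            outputPath: String,              // 'output.path'
            format: String,                  // 'format', e.g. "csv"
            mode: String,                    // 'mode', e.g. "append"
            numberOfPartitions: Option[Int], // 'number.of.partitions'
            options: Map[String, String]): Unit = {
    val repartitioned = numberOfPartitions.fold(df)(df.repartition(_))

    repartitioned.write
      .format(format)
      .mode(mode)
      .options(options)
      .save(outputPath)
  }
}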

Example

This is how it might look (the configuration format is not final).

sinks.conf:

pramen.sinks = [
  {
    name = "my_sink"
    factory.class = "za.co.absa.pramen.core.sink.SparkSink"

    # This is the format passed to 'df.write.format(...)'
    format = "csv"
    # This is the mode passed to 'df.write.format(...).mode(...)'
    mode = "append"

    ## Use at most one of these for 'df.repartition(...).write(...)':
    # Number of partitions
    number.of.partitions = 10
    # Records per partition
    records.per.partition = 1000000

    # These are additional options passed to the writer as 'df.write(...).options(...)'
    option {
      sep = "|"
      quoteAll = "false"
      header = "true"
    }
  }
]
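
For reference, the sink defined above would translate into roughly the following Spark calls (a sketch; 'outputPath' comes from the operation's 'output.path', and this assumes 'number.of.partitions' is the one repartitioning option set):

df.repartition(10)
  .write
  .format("csv")
  .mode("append")
  .options(Map("sep" -> "|", "quoteAll" -> "false", "header" -> "true"))
  .save(outputPath)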

pipeline.conf:

pramen.operations = [
  {
    name = "CSV sink"
    type = "sink"
    sink = "my_sink"

    schedule.type = "daily"

    dependencies = [
      {
        tables = [ users2 ]
        date.from = "@infoDate + 100"
        date.until = "@infoDate + 101"
        optional = true
      }
    ]

    tables = [
      {
        input.metastore.table = my_table
        output.path = "s3://bucket/prefix"

        date {
          from = "@infoDate - 5"
          to = "@infoDate"
        }

        # This overrides the options of the sink
        sink {
           mode = "overwrite"
          
           # These are additional option passed to the writer as 'df.write(...).options(...)'
           option {
              escape = ""
              emptyValue = ""
           }
        }

      }
    ]
  },
]
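
With the table-level override above, the effective write for 'my_table' might look like this (a sketch; whether table-level options are merged with or replace the sink-level options is an assumption here):

df.repartition(10)
  .write
  .format("csv")
  .mode("overwrite")          // overridden at the table level
  .options(Map(
    "sep"        -> "|",      // from the sink definition
    "quoteAll"   -> "false",
    "header"     -> "true",
    "escape"     -> "",       // added by the table-level override
    "emptyValue" -> ""
  ))
  .save("s3://bucket/prefix")
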
yruslan added the 'enhancement' and 'Pramen-Scala' labels on Jan 4, 2023
jirifilip added commits that referenced this issue on Jan 12, 2023
yruslan closed this as completed on Jan 12, 2023