1.7.3 (#183)
* Updated Audit API
* Updated JSON API
* Updated documentation
* Updated Http Module
tharwaninitin authored Jul 19, 2023
1 parent 513eaf8 commit be0acff
Showing 91 changed files with 1,040 additions and 552 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
@@ -94,7 +94,7 @@ jobs:
- name: Run JDBC Tests
run: sbt ";project jdbc; test; Test/runMain etlflow.SampleJobWithDbLogging"
env:
DB_URL: jdbc:h2:file:~/h2db/coordinator
DB_URL: jdbc:h2:file:~/h2db/etlflow
DB_USER: etlflow
DB_PWD: etlflow
DB_DRIVER: org.h2.Driver
@@ -133,6 +133,7 @@ jobs:
run: sbt ";project spark; +test"
http:
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
matrix:
java-version: [ 11, 17 ]
11 changes: 11 additions & 0 deletions .github/workflows/ghdeps.yml
@@ -0,0 +1,11 @@
name: SBT Github Update Dependency Graph
on: [push]
jobs:
dependency-graph:
name: Update Dependency Graph
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: scalacenter/sbt-dependency-submission@v2
with:
modules-ignore: etlflow_2.12 etlflow-docs_2.12 examples_2.12 etlflow-redis_2.12 etlflow-redis_2.13 examplespark_2.12 examplespark_2.13 etlflow-spark_2.12 etlflow-spark_2.13
2 changes: 2 additions & 0 deletions .github/workflows/updates.yml
@@ -27,3 +27,5 @@ jobs:
run: sbt redis/dependencyUpdates
- name: Check Dependency Updates for spark
run: sbt spark/dependencyUpdates
- name: Check Dependency Updates for FTP
run: sbt ftp/dependencyUpdates
185 changes: 137 additions & 48 deletions README.md
@@ -5,21 +5,21 @@
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-core_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-core)
[![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-core_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-core_2.12)

**EtlFlow** is an ecosystem of functional libraries in Scala based on ZIO for running complex **_Auditable_** workflows which can interact with Google Cloud Platform, AWS, Kubernetes, Databases and more.
**EtlFlow** is an ecosystem of functional libraries in Scala based on ZIO for running complex **_Auditable_** jobs/workflows which can interact with Google Cloud Platform, AWS, Kubernetes, Databases and more.

Below are some salient features of this library.
Below are some important features of this library, some of which come from ZIO.

- **Functional**. Rapidly compose complex **workflows** from simple tasks.
- **Universal**. It provides a consistent way to interact with different services/products across cloud platforms like GCP, AWS, Azure, and On-Premises systems.
- **Functional**. Rapidly compose complex **jobs/workflows** from simple tasks.
- **Auditable**. Build jobs/workflows that provide auditability by default across multiple backends.
- **Resource-safe**. Build jobs/workflows that never leak resources (including threads!), even when they fail.
- **Compile Time DI**. Build jobs/workflows that allow resolving dependencies at compile time.
- **Fibers**. Built on non-blocking fibers that never waste or leak resources, which lets you build scalable, resilient, and reactive applications.
- **Resource-safe**. Build workflows that never leak resources (including threads!), even when they fail.
- **Concurrent**. Easily build concurrent workflows without deadlocks, race conditions, or complexity.
- **Asynchronous**. Write sequential code that looks the same whether it's asynchronous or synchronous.
- **Concurrent and Asynchronous**. Easily build concurrent jobs/workflows, whether asynchronous or synchronous, without deadlocks, race conditions, or complexity.
- **Type-safe**. Use the full power of the Scala compiler to catch bugs at compile time.
- **Testable**. Inject test services into your workflow for fast, deterministic, and type-safe testing.
- **Resilient**. Build workflows that never lose errors, and which respond to failure locally and flexibly.
- **Testable**. Inject test services into your job/workflow for fast, deterministic, and type-safe testing.
- **Resilient**. Build jobs/workflows that never lose errors, and which respond to failure locally and flexibly.

[//]: # (## Documentation)
[//]: # ()
[//]: # (__Library Documentation__ https://tharwaninitin.github.io/etlflow/site/docs)
[//]: <> (__Scala Test Coverage Report__ https://tharwaninitin.github.io/etlflow/testcovrep/)

@@ -51,7 +51,7 @@ Below are some salient features of this library.
| Spark | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-spark_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-spark) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-spark_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-spark_2.12) | [![etlflow-spark Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-spark/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-spark) | Java 8 + |

## Requirements and Installation
This project is compiled with Scala versions 2.12.17, 2.13.10, 3.2.1
This project is compiled with Scala versions 2.12.17, 2.13.10, 3.3.0

Available via [maven central](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-core).
Add the latest release below as a dependency to your project
@@ -60,38 +60,37 @@ Add the below latest release as a dependency to your project

__SBT__
```scala
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-ftp" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-ftp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.3"
```
__Maven__
```
<dependency>
<groupId>com.github.tharwaninitin</groupId>
<artifactId>etlflow-core_2.12</artifactId>
<version>1.7.2</version>
<version>1.7.3</version>
</dependency>
```

# Etlflow Modules
<!-- TOC -->
- [Etlflow Modules](#etlflow-modules)
- [Core](#core)
- [Task](#task)
- [Audit](#audit)
- [Config](#config)
- [Json](#json)
- [Task API](#task-api)
- [Audit API](#audit-api)
- [Job API](#job-api)
- [GCP](#gcp)
- [Dataproc](#dataproc)
- [K8S](#k8s)
- [JDBC](#jdbc)
- [K8S](#k8s)
- [Http](#http)
- [Email](#email)
- [Aws](#aws)
@@ -100,25 +99,113 @@ __Maven__
<!-- /TOC -->

## Core
The core module provides **Task** and **Audit** APIs, which are used by all tasks in different modules. It also provides a **Job** API that facilitates grouping multiple tasks together to leverage **auditing** and **logging** capabilities at the job/workflow level.
### Task API
Below is the simplest example of **creating** a **Task** and **running** it using EtlFlow.
This example uses the noop audit backend, which does nothing. This is useful when you want to test a task that requires an audit backend to be passed in.

```scala
// Todo
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
```
### Task

```scala
// Todo
import etlflow.audit.Audit
import etlflow.task.GenericTask
import zio._

object Job1 extends ZIOAppDefault {

def executeTask(): Task[Unit] = ZIO.logInfo(s"Hello EtlFlow Task")

val genericTask1: GenericTask[Any, Unit] = GenericTask(
name = "Generic Task",
task = executeTask()
)

val task1: RIO[Audit, Unit] = genericTask1.toZIO

override def run: Task[Unit] = task1.provide(etlflow.audit.noop)
}
```
### Audit
### Audit API
EtlFlow provides an auditing interface that can be used to track the execution of tasks and jobs (collections of tasks).
The auditing interface is integrated with the Task API:
each task uses it to record its state in the backend of choice, providing end-to-end auditability across the job/workflow.
Currently, there are audit backend implementations available for BigQuery, MySQL, and Postgres.
Audit has a simple and concise interface, which makes it quite easy to add any new backend.
```scala
// Todo
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
```
### Config
```scala
// Todo
import etlflow.task.{GenericTask, DBQueryTask}
import etlflow.model.Credential.JDBC
import zio._

object Job2 extends ZIOAppDefault {

private val task1 = GenericTask(
name = "Generic Task 1",
task = ZIO.logInfo(s"Task 1")
)

private val task2 = GenericTask(
name = "Generic Task 2",
task = ZIO.logInfo(s"Task 2")
)

val job = for {
_ <- task1.toZIO
_ <- task2.toZIO
} yield ()

private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))

override def run: Task[Unit] = job.provide(etlflow.audit.DB(cred))
}
```
### Json
Here's a snapshot of data for the `task_run` table after this job has run:

| task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
|-------------|------------|-----------|-------------|----------|---------|-------------------------|-------------------------|
| 1 | 100 | Task 1 | GenericTask | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |
| 2 | 100 | Task 2 | GenericTask | {} | RUNNING | 2023-07-13 12:00:00 UTC | 2023-07-13 13:00:00 UTC |

### Job API
The Job API enables grouping multiple tasks together for auditing capabilities at the job level. Below is an example of creating a JobApp and running it using EtlFlow.
By default it uses the noop audit layer, but here we use the JDBC audit layer to persist auditing information in the database.
```scala
// Todo
import etlflow._
import etlflow.audit.Audit
import etlflow.model.Credential.JDBC
import etlflow.task._
import zio._

object MyJobApp extends JobApp {

private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))

override val auditLayer: Layer[Throwable, Audit] = etlflow.audit.DB(cred)

private val task1 = GenericTask(
name = "Task_1",
task = ZIO.logInfo(s"Hello EtlFlow Task")
)

def job(args: Chunk[String]): RIO[audit.Audit, Unit] = task1.toZIO
}
```

Here's a snapshot of data for the `job_run` and `task_run` tables after this job has run:

| job_run_id | job_name | metadata | status | created_at | modified_at |
|------------|----------|----------|---------|-------------------------|-------------------------|
| 1 | MyJobApp | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |


| task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
|-------------|------------|-----------|-------------|----------|---------|-------------------------|-------------------------|
| 1           | 1          | Task_1    | GenericTask | {}       | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |

## GCP
```shell
# To run all the GCP examples below, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of the service account JSON key.
@@ -138,30 +225,36 @@ val dpCluster: String = "DP_CLUSTER"
val dpEndpoint: String = "DP_ENDPOINT"
val dpBucket: String = "DP_BUCKET"

val createCluster = DPCreateTask("DPCreateTask", dpCluster, ClusterProps(dpBucket)).toZIO
val deleteCluster = DPDeleteTask("DPDeleteTask", dpCluster).toZIO
val createCluster = DPCreateTask("DPCreateTask", dpCluster, ClusterProps(dpBucket))
val deleteCluster = DPDeleteTask("DPDeleteTask", dpCluster)

val args = List("1000")
val mainClass = "org.apache.spark.examples.SparkPi"
val libs = List("file:///usr/lib/spark/examples/jars/spark-examples.jar")
val conf = Map("spark.executor.memory" -> "1g", "spark.driver.memory" -> "1g")

val sparkJob = DPSparkJobTask("DPSparkJobTask", args, mainClass, libs, conf).toZIO
val sparkJob = DPSparkJobTask("DPSparkJobTask", args, mainClass, libs, conf)

val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
_ <- createCluster
_ <- sparkJob
_ <- deleteCluster
_ <- createCluster.toZIO
_ <- sparkJob.toZIO
_ <- deleteCluster.toZIO
} yield ()

val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)

programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop)
programGCP.provide(dpJobLayer, dpClusterLayer, audit.noop)
```
Check [this](examples/examplegcp/src/main/scala/examples/Job1GCP.scala) for the complete example.

## JDBC
```scala
// Todo
```
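
The JDBC task example above is still marked as a Todo. As an illustration only, here is a minimal, hypothetical sketch of running a SQL statement with `DBQueryTask` (imported in the Audit example above) together with the JDBC audit backend. The constructor parameters and the provided layers are assumptions, not the confirmed `etlflow-jdbc` API — check the module's documentation for the actual signatures.
```scala
// Hypothetical sketch only: DBQueryTask's parameters and the provided layers are
// assumptions inferred from the other examples in this README, not the confirmed API.
import etlflow.task.DBQueryTask
import etlflow.model.Credential.JDBC
import zio._

object Job3 extends ZIOAppDefault {

  // Reuses the same environment variables as the CI JDBC tests
  private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))

  // Assumed shape: a named task that runs a single SQL statement against the database
  private val createTable = DBQueryTask(
    name = "CreateTableTask",
    query = "CREATE TABLE IF NOT EXISTS demo (id INT, name VARCHAR(100))",
    credentials = cred
  )

  // etlflow.audit.DB(cred) persists task state in the same database; etlflow.audit.noop also works
  override def run = createTable.toZIO.provide(etlflow.audit.DB(cred))
}
```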

## K8S
This module depends on the official Kubernetes Java client library, version 18.0.1.
```scala
import etlflow.task._
import etlflow.k8s._
@@ -183,13 +276,9 @@ val programK8S: RIO[K8S with Audit, Unit] = for {
_ <- K8SDeleteJobTask("DeleteKubeJobTask", jobName).toZIO
} yield ()

programK8S.provide(K8S.live() ++ audit.noop)
programK8S.provide(K8S.live(), audit.noop)
```
Check [this](examples/examplek8s/src/main/scala/examples/Job1K8S.scala) for the complete example.
## JDBC
```scala
// Todo
```
## Http
```scala
// Todo
69 changes: 65 additions & 4 deletions build.sbt
@@ -86,7 +86,7 @@ lazy val k8sSettings = Seq(

lazy val ftpSettings = Seq(
name := "etlflow-ftp",
crossScalaVersions := Scala2Versions,
crossScalaVersions := AllScalaVersions,
libraryDependencies ++= ftpLibs ++ coreTestLibs
)

@@ -153,8 +153,69 @@ lazy val docs = project
.settings(
name := "etlflow-docs",
publish / skip := true,
mdocVariables := Map("VERSION" -> version.value, "Scala212" -> Scala212, "Scala213" -> Scala213, "Scala3" -> Scala3),
mdocIn := new File("docs/readme.template.md"),
mdocOut := new File("README.md")
mdocVariables := Map(
"VERSION" -> version.value,
"Scala212" -> Scala212,
"Scala213" -> Scala213,
"Scala3" -> Scala3,
"k8s" -> K8SVersion
),
mdocIn := new File("docs/readme.template.md"),
mdocOut := new File("README.md")
)
.enablePlugins(MdocPlugin)

lazy val examples = (project in file("examples"))
.settings(
crossScalaVersions := Nil, // crossScalaVersions must be set to Nil on the aggregating project
publish / skip := true,
scalacOptions ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, 12)) => s2copts ++ s212copts
case Some((2, 13)) => s2copts
case Some((3, _)) => s3copts
case _ => Seq()
}
}
)
.aggregate(examplecore, examplek8s, examplegcp, examplespark)

lazy val examplecore = (project in file("examples/examplecore"))
.settings(
name := "examplecore",
crossScalaVersions := AllScalaVersions,
libraryDependencies ++= List(
"ch.qos.logback" % "logback-classic" % LogbackVersion,
"org.postgresql" % "postgresql" % PgVersion
)
)
.dependsOn(jdbc)

lazy val examplek8s = (project in file("examples/examplek8s"))
.settings(
name := "examplek8s",
crossScalaVersions := AllScalaVersions,
libraryDependencies ++= List("ch.qos.logback" % "logback-classic" % LogbackVersion)
)
.dependsOn(k8s)

lazy val examplegcp = (project in file("examples/examplegcp"))
.settings(
name := "examplegcp",
crossScalaVersions := AllScalaVersions,
libraryDependencies ++= List("ch.qos.logback" % "logback-classic" % LogbackVersion)
)
.dependsOn(gcp)

lazy val examplespark = (project in file("examples/examplespark"))
.settings(
name := "examplespark",
crossScalaVersions := Scala2Versions,
libraryDependencies ++= List(
"org.apache.spark" %% "spark-sql" % SparkVersion,
"ch.qos.logback" % "logback-classic" % LogbackVersion,
"com.google.cloud.bigdataoss" % "gcs-connector" % HadoopGCSVersion
// "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % SparkBQVersion
)
)
.dependsOn(spark)
