EtlFlow is an ecosystem of functional libraries in Scala, built on ZIO, for running complex, auditable jobs/workflows that can interact with Google Cloud Platform, AWS, Kubernetes, databases, and more.
Below are some important features of this library, some of which come from ZIO.
- Universal. It provides a consistent way to interact with different services/products across cloud platforms like GCP, AWS, and Azure, as well as on-premises systems.
- Functional. Rapidly compose complex jobs/workflows from simple tasks.
- Auditable. Build jobs/workflows that provide auditability by default across multiple backends.
- Resource-safe. Build jobs/workflows that never leak resources (including threads!), even when they fail.
- Compile Time DI. Build jobs/workflows whose dependencies are resolved at compile time.
- Fibers. Built on non-blocking fibers that never waste or leak resources, which lets you build scalable, resilient, and reactive applications.
- Concurrent and Asynchronous. Easily build concurrent, asynchronous, or synchronous jobs/workflows without deadlocks, race conditions, or complexity (see the sketch after this list).
- Type-safe. Use the full power of the Scala compiler to catch bugs at compile time.
- Testable. Inject test services into your job/workflow for fast, deterministic, and type-safe testing.
- Resilient. Build jobs/workflows that never lose errors, and which respond to failure locally and flexibly.
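For instance, here is a minimal sketch of composing two tasks into a small concurrent workflow. It assumes the GenericTask, toZIO, and etlflow.audit.noop APIs shown in the core-module example further below; the task names and effects are illustrative only.

import etlflow.task.GenericTask
import zio._

object ParallelSketch extends ZIOAppDefault {

  // Two independent tasks (illustrative names and effects)
  private val taskA = GenericTask(name = "Task A", task = ZIO.logInfo("Hello from Task A"))
  private val taskB = GenericTask(name = "Task B", task = ZIO.logInfo("Hello from Task B"))

  // zipPar runs both audited tasks concurrently on ZIO fibers;
  // the noop audit backend is provided since this is just a sketch
  override def run: Task[Unit] =
    (taskA.toZIO zipPar taskB.toZIO).unit.provide(etlflow.audit.noop)
}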
- Core Module: In this example project, you can explore the core features of EtlFlow: the Task and Audit APIs.
- GCP Module (GCS, DataProc, BigQuery tasks): In this example project, you can explore GCP tasks.
- K8S Module (K8S tasks): In this example project, you can explore different Kubernetes tasks.
- Spark Module (Spark tasks): In this example project, you can explore different Apache Spark tasks.
This project is compiled with Scala versions 2.12.17, 2.13.10, and 3.3.0.
Available on Maven Central. Add the latest release below as a dependency to your project.
SBT
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-ftp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.3"
Maven
<dependency>
    <groupId>com.github.tharwaninitin</groupId>
    <artifactId>etlflow-core_2.12</artifactId>
    <version>1.7.3</version>
</dependency>
The core module provides Task and Audit APIs, which are used by all tasks in different modules. It also provides a Job API that facilitates grouping multiple tasks together to leverage auditing and logging capabilities at the job/workflow level.
Below is the simplest example of creating a Task and running it using EtlFlow. This example uses the noop audit backend, which does nothing. This is useful when you want to test a task that requires an audit backend to be passed in.
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
import etlflow.audit.Audit
import etlflow.task.GenericTask
import zio._
object Job1 extends ZIOAppDefault {

  def executeTask(): Task[Unit] = ZIO.logInfo(s"Hello EtlFlow Task")

  val genericTask1: GenericTask[Any, Unit] = GenericTask(
    name = "Generic Task",
    task = executeTask()
  )

  val task1: RIO[Audit, Unit] = genericTask1.toZIO

  override def run: Task[Unit] = task1.provide(etlflow.audit.noop)
}
EtlFlow provides an auditing interface that can be used to track the execution of tasks and jobs (collections of tasks). The auditing interface is integrated with the Task Interface. Each task uses this interface to maintain the state of all tasks in the job/workflow in the backend of choice for end-to-end auditability. Currently, there are audit backend implementations available for BigQuery, MySQL, and Postgres. Audit has a simple and concise interface, which makes it quite easy to add any new backend.
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
import etlflow.task.{GenericTask, DBQueryTask}
import etlflow.model.Credential.JDBC
import zio._
object Job2 extends ZIOAppDefault {

  private val task1 = GenericTask(
    name = "Generic Task 1",
    task = ZIO.logInfo(s"Task 1")
  )

  private val task2 = GenericTask(
    name = "Generic Task 2",
    task = ZIO.logInfo(s"Task 2")
  )

  val job = for {
    _ <- task1.toZIO
    _ <- task2.toZIO
  } yield ()

  private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))

  override def run: Task[Unit] = job.provide(etlflow.audit.DB(cred))
}
Here's a snapshot of the data in the `task_run` table after this job has run:

| task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
|---|---|---|---|---|---|---|---|
| 1 | 100 | Task 1 | GenericTask | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |
| 2 | 100 | Task 2 | GenericTask | {} | RUNNING | 2023-07-13 12:00:00 UTC | 2023-07-13 13:00:00 UTC |
The Job API enables grouping multiple tasks together for auditing capabilities at the job level. Below is an example of creating a JobApp and running it using EtlFlow. By default, a JobApp uses the noop audit layer, but here we use the JDBC audit layer to persist auditing information in the database.
import etlflow._
import etlflow.audit.Audit
import etlflow.model.Credential.JDBC
import etlflow.task._
import zio._

object MyJobApp extends JobApp {

  private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))

  override val auditLayer: Layer[Throwable, Audit] = etlflow.audit.DB(cred)

  private val task1 = GenericTask(
    name = "Task_1",
    task = ZIO.logInfo(s"Hello EtlFlow Task")
  )

  def job(args: Chunk[String]): RIO[audit.Audit, Unit] = task1.toZIO
}
Here's a snapshot of the data in the `job_run` and `task_run` tables after this job has run:
| job_run_id | job_name | metadata | status | created_at | modified_at |
|---|---|---|---|---|---|
| 1 | MyJobApp | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |
| task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
|---|---|---|---|---|---|---|---|
| 1 | 1 | Task_1 | GenericTask | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |
# To run all the GCP examples below, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of the service account JSON key.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json
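This Dataproc example uses the GCP module; if you are following along, add the etlflow-gcp dependency (same coordinates as in the dependency list above) to your build:

libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.3"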
import etlflow.task._
import gcp4zio.dp._
import etlflow.audit
import etlflow.audit.Audit
import zio._
val gcpProject: String = "GCP_PROJECT"
val gcpRegion: String = "GCP_REGION"
val dpCluster: String = "DP_CLUSTER"
val dpEndpoint: String = "DP_ENDPOINT"
val dpBucket: String = "DP_BUCKET"
val createCluster = DPCreateTask("DPCreateTask", dpCluster, ClusterProps(dpBucket))
val deleteCluster = DPDeleteTask("DPDeleteTask", dpCluster)
val args = List("1000")
val mainClass = "org.apache.spark.examples.SparkPi"
val libs = List("file:///usr/lib/spark/examples/jars/spark-examples.jar")
val conf = Map("spark.executor.memory" -> "1g", "spark.driver.memory" -> "1g")
val sparkJob = DPSparkJobTask("DPSparkJobTask", args, mainClass, libs, conf)
val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
  _ <- createCluster.toZIO
  _ <- sparkJob.toZIO
  _ <- deleteCluster.toZIO
} yield ()

val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)

programGCP.provide(dpJobLayer, dpClusterLayer, audit.noop)
Check this for a complete example.
// Todo
This module depends on the official Kubernetes Java client library, version 18.0.1.
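As with the other modules, the Kubernetes tasks below require the etlflow-k8s dependency (same coordinates as in the dependency list above):

libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.3"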
import etlflow.task._
import etlflow.k8s._
import etlflow.audit
import etlflow.audit.Audit
import zio._
val jobName: String = "hello"
val programK8S: RIO[K8S with Audit, Unit] = for {
  _ <- K8SJobTask(
         name = "CreateKubeJobTask",
         jobName = jobName,
         image = "busybox:1.28",
         command = Some(List("/bin/sh", "-c", "sleep 5; ls /etc/key; date; echo Hello from the Kubernetes cluster"))
       ).toZIO
  _ <- K8STrackJobTask("TrackKubeJobTask", jobName).toZIO
  _ <- K8SJobLogTask("GetKubeJobLogTask", jobName).toZIO
  _ <- K8SDeleteJobTask("DeleteKubeJobTask", jobName).toZIO
} yield ()

programK8S.provide(K8S.live(), audit.noop)
Check this for a complete example.
// Todo
// Todo
// Todo
// Todo
// Todo
Please feel free to open issues to report bugs or propose new features.