1.6.0 (#177)
* Updated Audit API, added experimental EtlJob API
* Updated JDBC module, added support for JDBC H2 database
* Added JSON support in core module
* Updated BQ and DB Audit defaults
tharwaninitin authored Jan 15, 2023
1 parent f033d30 commit a6242f2
Showing 53 changed files with 544 additions and 265 deletions.
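The most visible change in the diffs below is the rename of the silent audit layer from `audit.test` to `audit.noop`. A minimal migration sketch, assuming the 1.6.0 artifacts are on the classpath (the task name and body are illustrative, not from this commit):

```scala
import etlflow.audit
import etlflow.task.GenericTask
import zio.Task

object NoopAuditExample extends zio.ZIOAppDefault {
  // Illustrative task; GenericTask wraps a plain function as an auditable task
  private val task = GenericTask(name = "HelloWorld", function = println("Hello 1.6.0"))

  // 1.5.0 wrote: task.execute.provideLayer(etlflow.audit.test)
  // 1.6.0 renames the silent layer to `noop`
  override def run: Task[Unit] = task.execute.provideLayer(audit.noop)
}
```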
17 changes: 17 additions & 0 deletions .github/workflows/ci.yml
@@ -81,6 +81,23 @@ jobs:
          DB_USER: etlflow
          DB_PWD: etlflow
          DB_DRIVER: com.mysql.cj.jdbc.Driver
+  dbh2:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK
+        uses: actions/setup-java@v3
+        with:
+          java-version: 11
+          distribution: 'adopt'
+          cache: 'sbt'
+      - name: Run JDBC Tests
+        run: sbt ";project jdbc; test; Test/runMain etlflow.SampleJobWithDbLogging"
+        env:
+          DB_URL: jdbc:h2:file:~/h2db/coordinator
+          DB_USER: etlflow
+          DB_PWD: etlflow
+          DB_DRIVER: org.h2.Driver
  spark:
    runs-on: ubuntu-latest
    strategy:
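The new `dbh2` job runs the JDBC test suite against an embedded H2 file database, reusing the env-var contract of the existing database jobs. A sketch of how those variables feed a connection config, following the `JDBC(url, user, pwd, driver)` pattern the Spark examples in this commit use (the `etlflow.model` package path for `JDBC` is an assumption):

```scala
import etlflow.model.JDBC // package path assumed; the constructor shape matches this commit's Spark examples

// Assemble the JDBC config from the same variables the CI job exports.
val h2Config: JDBC = JDBC(
  sys.env("DB_URL"),   // jdbc:h2:file:~/h2db/coordinator in the dbh2 job
  sys.env("DB_USER"),  // etlflow
  sys.env("DB_PWD"),   // etlflow
  sys.env("DB_DRIVER") // org.h2.Driver
)
```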
24 changes: 12 additions & 12 deletions README.md
@@ -48,22 +48,22 @@ Add the below latest release as a dependency to your project

__SBT__
```scala
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.5.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.6.0"
```
__Maven__
```
<dependency>
    <groupId>com.github.tharwaninitin</groupId>
    <artifactId>etlflow-core_2.12</artifactId>
-   <version>1.5.0</version>
+   <version>1.6.0</version>
</dependency>
```

@@ -124,7 +124,7 @@ val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)

-programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.test)
+programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop)
```
Check [this](examples/examplegcp/src/main/scala/examples/Job1GCP.scala) for complete example.

@@ -151,7 +151,7 @@ val programK8S: RIO[K8S with Audit, Unit] = for {
_ <- DeleteKubeJobTask("DeleteKubeJobTask", jobName).execute
} yield ()

-programK8S.provide(K8S.live() ++ audit.test)
+programK8S.provide(K8S.live() ++ audit.noop)
```
Check [this](examples/examplek8s/src/main/scala/examples/Job1K8S.scala) for complete example.
## JDBC
2 changes: 0 additions & 2 deletions build.sbt
@@ -4,8 +4,6 @@ import Versions._

Global / onChangedBuildSource := ReloadOnSourceChanges

-ThisBuild / version := EtlFlowVersion
-
lazy val commonSettings = Seq(
scalaVersion := Scala212,
dependencyUpdatesFailBuild := true,
4 changes: 2 additions & 2 deletions docs/readme.template.md
@@ -124,7 +124,7 @@ val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)

-programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.test)
+programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop)
```
Check [this](examples/examplegcp/src/main/scala/examples/Job1GCP.scala) for complete example.

@@ -151,7 +151,7 @@ val programK8S: RIO[K8S with Audit, Unit] = for {
_ <- DeleteKubeJobTask("DeleteKubeJobTask", jobName).execute
} yield ()

-programK8S.provide(K8S.live() ++ audit.test)
+programK8S.provide(K8S.live() ++ audit.noop)
```
Check [this](examples/examplek8s/src/main/scala/examples/Job1K8S.scala) for complete example.
## JDBC
8 changes: 4 additions & 4 deletions examples.sbt
@@ -21,7 +21,7 @@ lazy val examplecore = (project in file("examples/examplecore"))
name := "examplecore",
crossScalaVersions := AllScalaVersions,
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "etlflow-jdbc" % EtlFlowVersion,
"com.github.tharwaninitin" %% "etlflow-jdbc" % version.value,
"ch.qos.logback" % "logback-classic" % LogbackVersion,
"org.postgresql" % "postgresql" % PgVersion
)
@@ -32,7 +32,7 @@ lazy val examplek8s = (project in file("examples/examplek8s"))
name := "examplek8s",
crossScalaVersions := AllScalaVersions,
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "etlflow-k8s" % EtlFlowVersion,
"com.github.tharwaninitin" %% "etlflow-k8s" % version.value,
"ch.qos.logback" % "logback-classic" % LogbackVersion
)
)
@@ -42,7 +42,7 @@ lazy val examplegcp = (project in file("examples/examplegcp"))
name := "examplegcp",
crossScalaVersions := AllScalaVersions,
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "etlflow-gcp" % EtlFlowVersion,
"com.github.tharwaninitin" %% "etlflow-gcp" % version.value,
"ch.qos.logback" % "logback-classic" % LogbackVersion
)
)
@@ -52,7 +52,7 @@ lazy val examplespark = (project in file("examples/examplespark"))
name := "examplespark",
crossScalaVersions := Scala2Versions,
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "etlflow-spark" % EtlFlowVersion,
"com.github.tharwaninitin" %% "etlflow-spark" % version.value,
("org.apache.spark" %% "spark-sql" % SparkVersion).excludeAll(ExclusionRule(organization = "org.scala-lang.modules")),
"ch.qos.logback" % "logback-classic" % LogbackVersion,
"com.google.cloud.bigdataoss" % "gcs-connector" % HadoopGCSVersion
2 changes: 1 addition & 1 deletion examples/examplecore/src/main/scala/examples/Job1.scala
@@ -15,5 +15,5 @@ object Job1 extends zio.ZIOAppDefault with ApplicationLogger {
function = executeTask()
)

-override def run: Task[Unit] = task1.execute.provideLayer(etlflow.audit.test)
+override def run: Task[Unit] = task1.execute.provideLayer(etlflow.audit.noop)
}
2 changes: 1 addition & 1 deletion examples/examplecore/src/main/scala/examples/Job2.scala
@@ -42,5 +42,5 @@ object Job2 extends zio.ZIOAppDefault with ApplicationLogger {
_ <- task3.execute
} yield ()

-override def run: Task[Unit] = job.provideLayer(etlflow.audit.test)
+override def run: Task[Unit] = job.provideLayer(etlflow.audit.noop)
}
6 changes: 3 additions & 3 deletions examples/examplecore/src/main/scala/examples/Job3.scala
@@ -19,12 +19,12 @@ object Job3 extends zio.ZIOAppDefault with ApplicationLogger {
query = "SELECT job_name,job_run_id,state FROM jobrun LIMIT 10"
)(rs => EtlJobRun(rs.string("job_name"), rs.string("job_run_id"), rs.string("state")))

-private def processData(ip: List[EtlJobRun]): Unit = {
+private def processData(ip: Iterable[EtlJobRun]): Unit = {
logger.info("Processing Data")
ip.foreach(jr => logger.info(s"$jr"))
}

-private def task2(ip: List[EtlJobRun]): GenericTask[Unit] = GenericTask(
+private def task2(ip: Iterable[EtlJobRun]): GenericTask[Unit] = GenericTask(
name = "ProcessData",
function = processData(ip)
)
@@ -34,5 +34,5 @@
_ <- task2(op).execute
} yield ()

-override def run: Task[Unit] = job.provideLayer(etlflow.audit.test)
+override def run: Task[Unit] = job.provideLayer(etlflow.audit.noop)
}
@@ -7,7 +7,7 @@ import etlflow.task.{DBReadTask, GenericTask}
import zio.{Chunk, RIO, ZLayer}

@SuppressWarnings(Array("org.wartremover.warts.ToString"))
-object Job4EtlFlow extends JobApp {
+object Job3EtlFlow extends JobApp {

case class EtlJobRun(job_name: String, job_run_id: String, state: String)

@@ -20,12 +20,12 @@ object Job4EtlFlow extends JobApp {
query = "SELECT job_name,job_run_id,status FROM jobrun LIMIT 10"
)(rs => EtlJobRun(rs.string("job_name"), rs.string("job_run_id"), rs.string("status")))

-private def processData(ip: List[EtlJobRun]): Unit = {
+private def processData(ip: Iterable[EtlJobRun]): Unit = {
logger.info("Processing Data")
ip.foreach(jr => logger.info(s"$jr"))
}

-private def task2(ip: List[EtlJobRun]): GenericTask[Unit] = GenericTask(
+private def task2(ip: Iterable[EtlJobRun]): GenericTask[Unit] = GenericTask(
name = "ProcessData",
function = processData(ip)
)
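`JobApp`, which this example extends, is the experimental EtlJob API called out in the commit message: extend it instead of `ZIOAppDefault` and it wires job- and task-level auditing around the composed tasks. A hypothetical minimal job, sketched from the diff above (the package path and abstract-method signature are assumptions; the `zio.Chunk` import in this file suggests args arrive as `Chunk[String]`):

```scala
import etlflow.JobApp // package path assumed
import etlflow.audit.Audit
import etlflow.task.GenericTask
import zio.{Chunk, RIO}

object HelloJob extends JobApp {
  // Signature inferred from the example above; treat this as a sketch
  def job(args: Chunk[String]): RIO[Audit, Unit] =
    GenericTask(name = "SayHello", function = println("hello")).execute
}
```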
3 changes: 2 additions & 1 deletion examples/examplegcp/src/main/scala/examples/Job1GCP.scala
@@ -1,5 +1,6 @@
package examples

+import etlflow.audit
import etlflow.log.ApplicationLogger
import etlflow.task._
import gcp4zio.dp._
@@ -42,5 +43,5 @@ object Job1GCP extends ZIOAppDefault with ApplicationLogger {

private val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)

-override def run: Task[Unit] = program.provide(dpJobLayer ++ dpClusterLayer ++ etlflow.audit.test)
+override def run: Task[Unit] = program.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop)
}
4 changes: 2 additions & 2 deletions examples/examplek8s/src/main/scala/examples/Job1K8S.scala
@@ -1,7 +1,7 @@
package examples

-import etlflow.audit.Audit
import etlflow.audit
+import etlflow.audit.Audit
import etlflow.k8s._
import etlflow.log.ApplicationLogger
import etlflow.task._
@@ -28,5 +28,5 @@ object Job1K8S extends ZIOAppDefault with ApplicationLogger {
_ <- DeleteKubeJobTask("DeleteKubeJobTask", jobName).execute
} yield ()

-override def run: Task[Unit] = ZIO.logInfo("Starting Job1K8S") *> program.provide(K8S.live() ++ audit.test)
+override def run: Task[Unit] = ZIO.logInfo("Starting Job1K8S") *> program.provide(K8S.live() ++ audit.noop)
}
@@ -20,7 +20,7 @@ object EtlJobBqToJdbc extends zio.ZIOAppDefault with ApplicationLogger {
outputType = RDB(JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))),
outputLocation = "ratings",
outputSaveMode = SaveMode.Overwrite
-).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test)
+).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop)

override def run: Task[Unit] = task1
}
@@ -58,7 +58,7 @@ object EtlJobCsvToCsvGcs extends zio.ZIOAppDefault with ApplicationLogger {
outputLocation = gcsOutputPath,
outputPartitionCol = Seq(f"$tempDateCol"),
outputSaveMode = SaveMode.Overwrite
-).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test)
+).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop)

override def run: Task[Unit] = task1
}
@@ -73,5 +73,5 @@ object EtlJobCsvToParquetGcs extends zio.ZIOAppDefault with ApplicationLogger {
_ <- task2.execute
} yield ()

-override def run: Task[Unit] = job.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test)
+override def run: Task[Unit] = job.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop)
}
@@ -23,7 +23,7 @@ object EtlJobParquetToJdbc extends zio.ZIOAppDefault with ApplicationLogger {
outputSaveMode = SaveMode.Overwrite
)

-private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test)
+private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop)

override def run: Task[Unit] = job
}
@@ -25,7 +25,7 @@ object EtlJobParquetToOrc extends zio.ZIOAppDefault with ApplicationLogger {
outputFilename = Some("ratings.orc")
)

-private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test)
+private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop)

override def run: Task[Unit] = job
}
2 changes: 1 addition & 1 deletion modules/aws/src/test/scala/etlflow/S3TestSuite.scala
@@ -75,5 +75,5 @@ object S3TestSuite extends ZIOSpecDefault with TestHelper with ApplicationLogger
.runCollect
assertZIO(task.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
}
-) @@ TestAspect.sequential).provideLayerShared(env ++ audit.test)
+) @@ TestAspect.sequential).provideLayerShared(env ++ audit.noop)
}
23 changes: 14 additions & 9 deletions modules/core/src/main/scala/etlflow/audit/Audit.scala
@@ -1,9 +1,10 @@
package etlflow.audit

import etlflow.model._
-import zio.{UIO, URIO, ZIO}
+import zio.{RIO, Task, UIO, URIO, ZIO}

// format: off
+@SuppressWarnings(Array("org.wartremover.warts.ToString"))
trait Audit {
val jobRunId: String

@@ -13,22 +14,26 @@ trait Audit {
def logTaskStart(taskRunId: String, taskName: String, props: Map[String,String], taskType: String): UIO[Unit]
def logTaskEnd(taskRunId: String, taskName: String, props: Map[String,String], taskType: String, error: Option[Throwable]): UIO[Unit]

-def getJobRuns(query: String): UIO[Iterable[JobRun]]
-def getTaskRuns(query: String): UIO[Iterable[TaskRun]]
+def getJobRuns(query: String): Task[Iterable[JobRun]] = ZIO.logInfo(query) *> ZIO.succeed(List.empty[JobRun])
+def getTaskRuns(query: String): Task[Iterable[TaskRun]] = ZIO.logInfo(query) *> ZIO.succeed(List.empty[TaskRun])
+
+type RS
+def fetchResults[T](query: String)(fn: RS => T): Task[Iterable[T]] = ZIO.logInfo(query + fn.toString) *> ZIO.succeed(Iterable.empty)
}

object Audit {
def logJobStart(jobName: String, props: Map[String,String]): URIO[Audit, Unit] =
-ZIO.environmentWithZIO(_.get.logJobStart(jobName, props))
+ZIO.serviceWithZIO(_.logJobStart(jobName, props))
def logJobEnd(jobName: String, props: Map[String,String], error: Option[Throwable] = None): URIO[Audit, Unit] =
-ZIO.environmentWithZIO(_.get.logJobEnd(jobName, props, error))
+ZIO.serviceWithZIO(_.logJobEnd(jobName, props, error))

def logTaskStart(taskRunId: String, taskName: String, props: Map[String,String], taskType: String): URIO[Audit, Unit] =
-ZIO.environmentWithZIO(_.get.logTaskStart(taskRunId, taskName, props, taskType))
+ZIO.serviceWithZIO(_.logTaskStart(taskRunId, taskName, props, taskType))
def logTaskEnd(taskRunId: String, taskName: String, props: Map[String,String], taskType: String, error: Option[Throwable] = None): URIO[Audit, Unit] =
-ZIO.environmentWithZIO(_.get.logTaskEnd(taskRunId, taskName, props, taskType, error))
+ZIO.serviceWithZIO(_.logTaskEnd(taskRunId, taskName, props, taskType, error))

-def getJobRuns(query: String): URIO[Audit, Iterable[JobRun]] = ZIO.environmentWithZIO(_.get.getJobRuns(query))
-def getTaskRuns(query: String): URIO[Audit, Iterable[TaskRun]] = ZIO.environmentWithZIO(_.get.getTaskRuns(query))
+def getJobRuns(query: String): RIO[Audit, Iterable[JobRun]] = ZIO.serviceWithZIO(_.getJobRuns(query))
+def getTaskRuns(query: String): RIO[Audit, Iterable[TaskRun]] = ZIO.serviceWithZIO(_.getTaskRuns(query))
+def fetchResults[T](query: String)(fn: Audit#RS => T): RIO[Audit, Iterable[T]] = ZIO.serviceWithZIO[Audit](_.fetchResults(query)(fn))
}
// format: on
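With this change `getJobRuns` and `getTaskRuns` are no longer abstract: they gain logging defaults that return empty results, which is why the Console, Memory, and Slack backends below drop their stub overrides, and the new `fetchResults` plus the abstract `RS` type let a backend expose raw result-set rows. A hedged usage sketch via the companion accessors (the query text is illustrative; against `audit.noop` this just logs the query and returns no rows):

```scala
import etlflow.audit.Audit
import etlflow.model.JobRun
import zio.RIO

// Works with any Audit backend in the environment; DB-backed audits run the
// query, while the default implementation logs it and returns an empty Iterable.
val recentRuns: RIO[Audit, Iterable[JobRun]] =
  Audit.getJobRuns("SELECT * FROM jobrun LIMIT 10")
```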
11 changes: 3 additions & 8 deletions modules/core/src/main/scala/etlflow/audit/Console.scala
@@ -1,6 +1,5 @@
package etlflow.audit

-import etlflow.model.{JobRun, TaskRun}
import zio.{UIO, ZIO}

object Console extends Audit {
@@ -27,19 +26,15 @@ object Console extends Audit {
}

override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] =
ZIO.logInfo(s"Job started")
ZIO.logInfo(s"Job $jobName started")

override def logJobEnd(
jobName: String,
props: Map[String, String],
error: Option[Throwable]
): UIO[Unit] = error.fold {
ZIO.logInfo(s"Job completed with success")
ZIO.logInfo(s"Job $jobName completed with success")
} { ex =>
ZIO.logError(s"Job completed with failure ${ex.getMessage}")
ZIO.logError(s"Job $jobName completed with failure ${ex.getMessage}")
}

-override def getJobRuns(query: String): UIO[Iterable[JobRun]] = ZIO.succeed(List.empty[JobRun])
-
-override def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = ZIO.succeed(List.empty[TaskRun])
}
6 changes: 0 additions & 6 deletions modules/core/src/main/scala/etlflow/audit/Memory.scala
@@ -1,7 +1,6 @@
package etlflow.audit

import etlflow.log.ApplicationLogger
-import etlflow.model.{JobRun, TaskRun}
import etlflow.utils.DateTimeApi
import zio.{Ref, UIO, ZIO}
import scala.collection.mutable
@@ -64,11 +63,6 @@ case class Memory(jobRunId: String) extends Audit with ApplicationLogger {
value.values.toList.sortBy(_.start_time).foreach(x => logger.info(x.toString()))
}
} yield ()

-override def getJobRuns(query: String): UIO[Iterable[JobRun]] = ZIO.succeed(List.empty[JobRun])
-
-override def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = ZIO.succeed(List.empty[TaskRun])

}

object Memory {
5 changes: 0 additions & 5 deletions modules/core/src/main/scala/etlflow/audit/Slack.scala
@@ -1,7 +1,6 @@
package etlflow.audit

import etlflow.log.ApplicationLogger
-import etlflow.model.{JobRun, TaskRun}
import zio.{UIO, ZIO}
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.{HttpURLConnection, URL}
@@ -104,8 +103,4 @@ case class Slack(jobRunId: String, slackUrl: String) extends Audit with ApplicationLogger {

sendSlackNotification(data)
}.orDie

-override def getJobRuns(query: String): UIO[Iterable[JobRun]] = ZIO.succeed(List.empty[JobRun])
-
-override def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = ZIO.succeed(List.empty[TaskRun])
}