From a6242f2644b7d8fd4094dcb13c89c0faa7731c92 Mon Sep 17 00:00:00 2001
From: Nitin Tharwani <52317230+tharwaninitin@users.noreply.github.com>
Date: Mon, 16 Jan 2023 01:19:16 +0530
Subject: [PATCH] 1.6.0 (#177)

* Updated Audit API, added experimental EtlJob API
* Updated JDBC module, added support for JDBC H2 database
* Added JSON support in core module
* Updated BQ and DB Audit defaults
---
 .github/workflows/ci.yml | 17 ++
 README.md | 24 +--
 build.sbt | 2 -
 docs/readme.template.md | 4 +-
 examples.sbt | 8 +-
 .../src/main/scala/examples/Job1.scala | 2 +-
 .../src/main/scala/examples/Job2.scala | 2 +-
 .../src/main/scala/examples/Job3.scala | 6 +-
 .../{Job4EtlFlow.scala => Job3EtlFlow.scala} | 6 +-
 .../src/main/scala/examples/Job1GCP.scala | 3 +-
 .../src/main/scala/examples/Job1K8S.scala | 4 +-
 .../main/scala/examples/EtlJobBqToJdbc.scala | 2 +-
 .../scala/examples/EtlJobCsvToCsvGcs.scala | 2 +-
 .../examples/EtlJobCsvToParquetGcs.scala | 2 +-
 .../scala/examples/EtlJobParquetToJdbc.scala | 2 +-
 .../scala/examples/EtlJobParquetToOrc.scala | 2 +-
 .../src/test/scala/etlflow/S3TestSuite.scala | 2 +-
 .../src/main/scala/etlflow/audit/Audit.scala | 23 ++-
 .../main/scala/etlflow/audit/Console.scala | 11 +-
 .../src/main/scala/etlflow/audit/Memory.scala | 6 -
 .../src/main/scala/etlflow/audit/Slack.scala | 5 -
 .../main/scala/etlflow/audit/package.scala | 2 +-
 .../src/main/scala/etlflow/job/EtlJob.scala | 36 ++++
 .../src/main/scala/etlflow/json/JSON.scala | 19 +++
 .../etlflow/model/EtlFlowException.scala | 2 +-
 .../src/main/scala/etlflow/model/JobRun.scala | 5 +
 .../main/scala/etlflow/model/TaskRun.scala | 5 +
 .../src/main/scala/etlflow/task/EtlTask.scala | 40 ++++-
 .../test/scala-2/etlflow/json/Implicits.scala | 21 +++
 .../test/scala-3/etlflow.json/Implicits.scala | 21 +++
 .../test/scala/etlflow/EtlJobTestSuite.scala | 67 ++++++++
 .../test/scala/etlflow/RunTestSuites.scala | 7 +-
 .../scala/etlflow/json/JsonTestSuite.scala | 66 ++++++++
 .../src/test/scala/etlflow/json/Schema.scala | 20 +++
 .../etlflow/task/SendMailTestSuite.scala | 5 +-
 .../gcp/src/main/scala/etlflow/audit/BQ.scala | 51 ++----
 .../gcp/src/test/scala/etlflow/RunTests.scala | 2 +-
 .../scala/etlflow/audit/BQTestSuite.scala | 9 +
 .../etlflow/task/HttpTaskTestSuite.scala | 2 +-
 .../src/main/scala/etlflow/audit/DB.scala | 157 +++++++-----------
 .../jdbc/src/main/scala/etlflow/db/DB.scala | 28 ++--
 .../src/main/scala/etlflow/db/DBImpl.scala | 37 ++---
 .../main/scala/etlflow/task/DBReadTask.scala | 6 +-
 modules/jdbc/src/test/Readme.md | 25 ++-
 .../etlflow/SampleJobWithDbLogging.scala | 4 +-
 .../scala/etlflow/audit/DbTestSuite.scala | 7 +
 .../test/scala/etlflow/db/DbTestSuite.scala | 8 +-
 .../k8s/src/test/scala/etlflow/RunTests.scala | 2 +-
 .../scala/etlflow/task/RedisTaskSuite.scala | 7 +-
 .../src/test/scala/etlflow/RunAllTests.scala | 5 +-
 project/Dependencies.scala | 6 +-
 project/Versions.scala | 1 -
 version.sbt | 1 +
 53 files changed, 544 insertions(+), 265 deletions(-)
 rename examples/examplecore/src/main/scala/examples/{Job4EtlFlow.scala => Job3EtlFlow.scala} (86%)
 create mode 100644 modules/core/src/main/scala/etlflow/job/EtlJob.scala
 create mode 100644 modules/core/src/main/scala/etlflow/json/JSON.scala
 create mode 100644 modules/core/src/test/scala-2/etlflow/json/Implicits.scala
 create mode 100644 modules/core/src/test/scala-3/etlflow.json/Implicits.scala
 create mode 100644 modules/core/src/test/scala/etlflow/EtlJobTestSuite.scala
 create mode 100644 modules/core/src/test/scala/etlflow/json/JsonTestSuite.scala
 create mode 100644 modules/core/src/test/scala/etlflow/json/Schema.scala
 create mode 100644 version.sbt

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 57d5d065..2288f46b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -81,6 +81,23 @@ jobs:
       DB_USER: etlflow
       DB_PWD: etlflow
       DB_DRIVER: com.mysql.cj.jdbc.Driver
+  dbh2:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK
+        uses: actions/setup-java@v3
+        with:
+          java-version: 11
+          distribution: 'adopt'
+          cache: 'sbt'
+      - name: Run JDBC Tests
+        run: sbt ";project jdbc; test; Test/runMain etlflow.SampleJobWithDbLogging"
+        env:
+          DB_URL: jdbc:h2:file:~/h2db/coordinator
+          DB_USER: etlflow
+          DB_PWD: etlflow
+          DB_DRIVER: org.h2.Driver
   spark:
     runs-on: ubuntu-latest
     strategy:
diff --git a/README.md b/README.md
index 7d06eaae..9423f638 100644
--- a/README.md
+++ b/README.md
@@ -48,22 +48,22 @@ Add the below latest release as a dependency to your project
 
 __SBT__
 ```scala
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.5.0"
-libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.5.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.6.0"
+libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.6.0"
 ```
 __Maven__
 ```
 <dependency>
     <groupId>com.github.tharwaninitin</groupId>
     <artifactId>etlflow-core_2.12</artifactId>
-    <version>1.5.0</version>
+    <version>1.6.0</version>
 </dependency>
 ```
@@ -124,7 +124,7 @@ val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
 val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
 val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)
 
-programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.test)
+programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop)
 ```
 Check [this](examples/examplegcp/src/main/scala/examples/Job1GCP.scala) for complete example.
@@ -151,7 +151,7 @@ val programK8S: RIO[K8S with Audit, Unit] = for {
   _ <- DeleteKubeJobTask("DeleteKubeJobTask", jobName).execute
 } yield ()
 
-programK8S.provide(K8S.live() ++ audit.test)
+programK8S.provide(K8S.live() ++ audit.noop)
 ```
 Check [this](examples/examplek8s/src/main/scala/examples/Job1K8S.scala) for complete example.
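For quick reference, a minimal end-to-end sketch of the renamed no-op audit layer in use (the `HelloJob` object and its task are illustrative, not part of this patch; the `GenericTask` signature follows the example jobs changed below):

```scala
import etlflow.log.ApplicationLogger
import etlflow.task.GenericTask
import zio.Task

object HelloJob extends zio.ZIOAppDefault with ApplicationLogger {

  // GenericTask wraps a by-name function; execute surrounds it with task start/end audit events
  val task1: GenericTask[Unit] = GenericTask(
    name = "SayHello",
    function = logger.info("Hello from task1")
  )

  // audit.noop (formerly audit.test) satisfies the Audit requirement without persisting anything
  override def run: Task[Unit] = task1.execute.provideLayer(etlflow.audit.noop)
}
```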
## JDBC diff --git a/build.sbt b/build.sbt index 10f9c391..01a14a0d 100644 --- a/build.sbt +++ b/build.sbt @@ -4,8 +4,6 @@ import Versions._ Global / onChangedBuildSource := ReloadOnSourceChanges -ThisBuild / version := EtlFlowVersion - lazy val commonSettings = Seq( scalaVersion := Scala212, dependencyUpdatesFailBuild := true, diff --git a/docs/readme.template.md b/docs/readme.template.md index 90303b51..ace38c06 100644 --- a/docs/readme.template.md +++ b/docs/readme.template.md @@ -124,7 +124,7 @@ val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for { val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint) val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint) -programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.test) +programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop) ``` Check [this](examples/examplegcp/src/main/scala/examples/Job1GCP.scala) for complete example. @@ -151,7 +151,7 @@ val programK8S: RIO[K8S with Audit, Unit] = for { _ <- DeleteKubeJobTask("DeleteKubeJobTask", jobName).execute } yield () -programK8S.provide(K8S.live() ++ audit.test) +programK8S.provide(K8S.live() ++ audit.noop) ``` Check [this](examples/examplek8s/src/main/scala/examples/Job1K8S.scala) for complete example. ## JDBC diff --git a/examples.sbt b/examples.sbt index ac4fddb6..95673dc0 100644 --- a/examples.sbt +++ b/examples.sbt @@ -21,7 +21,7 @@ lazy val examplecore = (project in file("examples/examplecore")) name := "examplecore", crossScalaVersions := AllScalaVersions, libraryDependencies ++= List( - "com.github.tharwaninitin" %% "etlflow-jdbc" % EtlFlowVersion, + "com.github.tharwaninitin" %% "etlflow-jdbc" % version.value, "ch.qos.logback" % "logback-classic" % LogbackVersion, "org.postgresql" % "postgresql" % PgVersion ) @@ -32,7 +32,7 @@ lazy val examplek8s = (project in file("examples/examplek8s")) name := "examplek8s", crossScalaVersions := AllScalaVersions, libraryDependencies ++= List( - "com.github.tharwaninitin" %% "etlflow-k8s" % EtlFlowVersion, + "com.github.tharwaninitin" %% "etlflow-k8s" % version.value, "ch.qos.logback" % "logback-classic" % LogbackVersion ) ) @@ -42,7 +42,7 @@ lazy val examplegcp = (project in file("examples/examplegcp")) name := "examplegcp", crossScalaVersions := AllScalaVersions, libraryDependencies ++= List( - "com.github.tharwaninitin" %% "etlflow-gcp" % EtlFlowVersion, + "com.github.tharwaninitin" %% "etlflow-gcp" % version.value, "ch.qos.logback" % "logback-classic" % LogbackVersion ) ) @@ -52,7 +52,7 @@ lazy val examplespark = (project in file("examples/examplespark")) name := "examplespark", crossScalaVersions := Scala2Versions, libraryDependencies ++= List( - "com.github.tharwaninitin" %% "etlflow-spark" % EtlFlowVersion, + "com.github.tharwaninitin" %% "etlflow-spark" % version.value, ("org.apache.spark" %% "spark-sql" % SparkVersion).excludeAll(ExclusionRule(organization = "org.scala-lang.modules")), "ch.qos.logback" % "logback-classic" % LogbackVersion, "com.google.cloud.bigdataoss" % "gcs-connector" % HadoopGCSVersion diff --git a/examples/examplecore/src/main/scala/examples/Job1.scala b/examples/examplecore/src/main/scala/examples/Job1.scala index 52d865d2..5a980f1e 100644 --- a/examples/examplecore/src/main/scala/examples/Job1.scala +++ b/examples/examplecore/src/main/scala/examples/Job1.scala @@ -15,5 +15,5 @@ object Job1 extends zio.ZIOAppDefault with ApplicationLogger { function = executeTask() ) - override def run: Task[Unit] = task1.execute.provideLayer(etlflow.audit.test) + override def 
run: Task[Unit] = task1.execute.provideLayer(etlflow.audit.noop) } diff --git a/examples/examplecore/src/main/scala/examples/Job2.scala b/examples/examplecore/src/main/scala/examples/Job2.scala index 53093d95..dc36eaa9 100644 --- a/examples/examplecore/src/main/scala/examples/Job2.scala +++ b/examples/examplecore/src/main/scala/examples/Job2.scala @@ -42,5 +42,5 @@ object Job2 extends zio.ZIOAppDefault with ApplicationLogger { _ <- task3.execute } yield () - override def run: Task[Unit] = job.provideLayer(etlflow.audit.test) + override def run: Task[Unit] = job.provideLayer(etlflow.audit.noop) } diff --git a/examples/examplecore/src/main/scala/examples/Job3.scala b/examples/examplecore/src/main/scala/examples/Job3.scala index ca35aded..9fe530f2 100644 --- a/examples/examplecore/src/main/scala/examples/Job3.scala +++ b/examples/examplecore/src/main/scala/examples/Job3.scala @@ -19,12 +19,12 @@ object Job3 extends zio.ZIOAppDefault with ApplicationLogger { query = "SELECT job_name,job_run_id,state FROM jobrun LIMIT 10" )(rs => EtlJobRun(rs.string("job_name"), rs.string("job_run_id"), rs.string("state"))) - private def processData(ip: List[EtlJobRun]): Unit = { + private def processData(ip: Iterable[EtlJobRun]): Unit = { logger.info("Processing Data") ip.foreach(jr => logger.info(s"$jr")) } - private def task2(ip: List[EtlJobRun]): GenericTask[Unit] = GenericTask( + private def task2(ip: Iterable[EtlJobRun]): GenericTask[Unit] = GenericTask( name = "ProcessData", function = processData(ip) ) @@ -34,5 +34,5 @@ object Job3 extends zio.ZIOAppDefault with ApplicationLogger { _ <- task2(op).execute } yield () - override def run: Task[Unit] = job.provideLayer(etlflow.audit.test) + override def run: Task[Unit] = job.provideLayer(etlflow.audit.noop) } diff --git a/examples/examplecore/src/main/scala/examples/Job4EtlFlow.scala b/examples/examplecore/src/main/scala/examples/Job3EtlFlow.scala similarity index 86% rename from examples/examplecore/src/main/scala/examples/Job4EtlFlow.scala rename to examples/examplecore/src/main/scala/examples/Job3EtlFlow.scala index 5b363964..1fab6482 100644 --- a/examples/examplecore/src/main/scala/examples/Job4EtlFlow.scala +++ b/examples/examplecore/src/main/scala/examples/Job3EtlFlow.scala @@ -7,7 +7,7 @@ import etlflow.task.{DBReadTask, GenericTask} import zio.{Chunk, RIO, ZLayer} @SuppressWarnings(Array("org.wartremover.warts.ToString")) -object Job4EtlFlow extends JobApp { +object Job3EtlFlow extends JobApp { case class EtlJobRun(job_name: String, job_run_id: String, state: String) @@ -20,12 +20,12 @@ object Job4EtlFlow extends JobApp { query = "SELECT job_name,job_run_id,status FROM jobrun LIMIT 10" )(rs => EtlJobRun(rs.string("job_name"), rs.string("job_run_id"), rs.string("status"))) - private def processData(ip: List[EtlJobRun]): Unit = { + private def processData(ip: Iterable[EtlJobRun]): Unit = { logger.info("Processing Data") ip.foreach(jr => logger.info(s"$jr")) } - private def task2(ip: List[EtlJobRun]): GenericTask[Unit] = GenericTask( + private def task2(ip: Iterable[EtlJobRun]): GenericTask[Unit] = GenericTask( name = "ProcessData", function = processData(ip) ) diff --git a/examples/examplegcp/src/main/scala/examples/Job1GCP.scala b/examples/examplegcp/src/main/scala/examples/Job1GCP.scala index d09612a4..d7e75ce0 100644 --- a/examples/examplegcp/src/main/scala/examples/Job1GCP.scala +++ b/examples/examplegcp/src/main/scala/examples/Job1GCP.scala @@ -1,5 +1,6 @@ package examples +import etlflow.audit import etlflow.log.ApplicationLogger import 
etlflow.task._ import gcp4zio.dp._ @@ -42,5 +43,5 @@ object Job1GCP extends ZIOAppDefault with ApplicationLogger { private val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint) - override def run: Task[Unit] = program.provide(dpJobLayer ++ dpClusterLayer ++ etlflow.audit.test) + override def run: Task[Unit] = program.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop) } diff --git a/examples/examplek8s/src/main/scala/examples/Job1K8S.scala b/examples/examplek8s/src/main/scala/examples/Job1K8S.scala index 835392aa..3192fd5b 100644 --- a/examples/examplek8s/src/main/scala/examples/Job1K8S.scala +++ b/examples/examplek8s/src/main/scala/examples/Job1K8S.scala @@ -1,7 +1,7 @@ package examples -import etlflow.audit.Audit import etlflow.audit +import etlflow.audit.Audit import etlflow.k8s._ import etlflow.log.ApplicationLogger import etlflow.task._ @@ -28,5 +28,5 @@ object Job1K8S extends ZIOAppDefault with ApplicationLogger { _ <- DeleteKubeJobTask("DeleteKubeJobTask", jobName).execute } yield () - override def run: Task[Unit] = ZIO.logInfo("Starting Job1K8S") *> program.provide(K8S.live() ++ audit.test) + override def run: Task[Unit] = ZIO.logInfo("Starting Job1K8S") *> program.provide(K8S.live() ++ audit.noop) } diff --git a/examples/examplespark/src/main/scala/examples/EtlJobBqToJdbc.scala b/examples/examplespark/src/main/scala/examples/EtlJobBqToJdbc.scala index 8e8191c9..8cc699e0 100644 --- a/examples/examplespark/src/main/scala/examples/EtlJobBqToJdbc.scala +++ b/examples/examplespark/src/main/scala/examples/EtlJobBqToJdbc.scala @@ -20,7 +20,7 @@ object EtlJobBqToJdbc extends zio.ZIOAppDefault with ApplicationLogger { outputType = RDB(JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))), outputLocation = "ratings", outputSaveMode = SaveMode.Overwrite - ).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test) + ).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop) override def run: Task[Unit] = task1 } diff --git a/examples/examplespark/src/main/scala/examples/EtlJobCsvToCsvGcs.scala b/examples/examplespark/src/main/scala/examples/EtlJobCsvToCsvGcs.scala index 96f9a4e5..06bbaa5d 100644 --- a/examples/examplespark/src/main/scala/examples/EtlJobCsvToCsvGcs.scala +++ b/examples/examplespark/src/main/scala/examples/EtlJobCsvToCsvGcs.scala @@ -58,7 +58,7 @@ object EtlJobCsvToCsvGcs extends zio.ZIOAppDefault with ApplicationLogger { outputLocation = gcsOutputPath, outputPartitionCol = Seq(f"$tempDateCol"), outputSaveMode = SaveMode.Overwrite - ).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test) + ).execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop) override def run: Task[Unit] = task1 } diff --git a/examples/examplespark/src/main/scala/examples/EtlJobCsvToParquetGcs.scala b/examples/examplespark/src/main/scala/examples/EtlJobCsvToParquetGcs.scala index 8918cc8b..7c0c8b8d 100644 --- a/examples/examplespark/src/main/scala/examples/EtlJobCsvToParquetGcs.scala +++ b/examples/examplespark/src/main/scala/examples/EtlJobCsvToParquetGcs.scala @@ -73,5 +73,5 @@ object EtlJobCsvToParquetGcs extends zio.ZIOAppDefault with ApplicationLogger { _ <- task2.execute } yield () - override def run: Task[Unit] = job.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test) + override def run: Task[Unit] = job.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop) } diff --git a/examples/examplespark/src/main/scala/examples/EtlJobParquetToJdbc.scala 
b/examples/examplespark/src/main/scala/examples/EtlJobParquetToJdbc.scala index 64fd0674..ea3179f2 100644 --- a/examples/examplespark/src/main/scala/examples/EtlJobParquetToJdbc.scala +++ b/examples/examplespark/src/main/scala/examples/EtlJobParquetToJdbc.scala @@ -23,7 +23,7 @@ object EtlJobParquetToJdbc extends zio.ZIOAppDefault with ApplicationLogger { outputSaveMode = SaveMode.Overwrite ) - private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test) + private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop) override def run: Task[Unit] = job } diff --git a/examples/examplespark/src/main/scala/examples/EtlJobParquetToOrc.scala b/examples/examplespark/src/main/scala/examples/EtlJobParquetToOrc.scala index b6394d4a..f2c6a393 100644 --- a/examples/examplespark/src/main/scala/examples/EtlJobParquetToOrc.scala +++ b/examples/examplespark/src/main/scala/examples/EtlJobParquetToOrc.scala @@ -25,7 +25,7 @@ object EtlJobParquetToOrc extends zio.ZIOAppDefault with ApplicationLogger { outputFilename = Some("ratings.orc") ) - private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.test) + private val job = task1.execute.provideLayer(SparkLive.live(spark) ++ etlflow.audit.noop) override def run: Task[Unit] = job } diff --git a/modules/aws/src/test/scala/etlflow/S3TestSuite.scala b/modules/aws/src/test/scala/etlflow/S3TestSuite.scala index b3b84a85..4e34dded 100644 --- a/modules/aws/src/test/scala/etlflow/S3TestSuite.scala +++ b/modules/aws/src/test/scala/etlflow/S3TestSuite.scala @@ -75,5 +75,5 @@ object S3TestSuite extends ZIOSpecDefault with TestHelper with ApplicationLogger .runCollect assertZIO(task.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok")) } - ) @@ TestAspect.sequential).provideLayerShared(env ++ audit.test) + ) @@ TestAspect.sequential).provideLayerShared(env ++ audit.noop) } diff --git a/modules/core/src/main/scala/etlflow/audit/Audit.scala b/modules/core/src/main/scala/etlflow/audit/Audit.scala index 2d36ad4e..258ba84b 100644 --- a/modules/core/src/main/scala/etlflow/audit/Audit.scala +++ b/modules/core/src/main/scala/etlflow/audit/Audit.scala @@ -1,9 +1,10 @@ package etlflow.audit import etlflow.model._ -import zio.{UIO, URIO, ZIO} +import zio.{RIO, Task, UIO, URIO, ZIO} // format: off +@SuppressWarnings(Array("org.wartremover.warts.ToString")) trait Audit { val jobRunId: String @@ -13,22 +14,26 @@ trait Audit { def logTaskStart(taskRunId: String, taskName: String, props: Map[String,String], taskType: String): UIO[Unit] def logTaskEnd(taskRunId: String, taskName: String, props: Map[String,String], taskType: String, error: Option[Throwable]): UIO[Unit] - def getJobRuns(query: String): UIO[Iterable[JobRun]] - def getTaskRuns(query: String): UIO[Iterable[TaskRun]] + def getJobRuns(query: String): Task[Iterable[JobRun]] = ZIO.logInfo(query) *> ZIO.succeed(List.empty[JobRun]) + def getTaskRuns(query: String): Task[Iterable[TaskRun]] = ZIO.logInfo(query) *> ZIO.succeed(List.empty[TaskRun]) + + type RS + def fetchResults[T](query: String)(fn: RS => T): Task[Iterable[T]] = ZIO.logInfo(query + fn.toString) *> ZIO.succeed(Iterable.empty) } object Audit { def logJobStart(jobName: String, props: Map[String,String]): URIO[Audit, Unit] = - ZIO.environmentWithZIO(_.get.logJobStart(jobName, props)) + ZIO.serviceWithZIO(_.logJobStart(jobName, props)) def logJobEnd(jobName: String, props: Map[String,String], error: Option[Throwable] = None): URIO[Audit, Unit] = - 
ZIO.environmentWithZIO(_.get.logJobEnd(jobName, props, error)) + ZIO.serviceWithZIO(_.logJobEnd(jobName, props, error)) def logTaskStart(taskRunId: String, taskName: String, props: Map[String,String], taskType: String): URIO[Audit, Unit] = - ZIO.environmentWithZIO(_.get.logTaskStart(taskRunId, taskName, props, taskType)) + ZIO.serviceWithZIO(_.logTaskStart(taskRunId, taskName, props, taskType)) def logTaskEnd(taskRunId: String, taskName: String, props: Map[String,String], taskType: String, error: Option[Throwable] = None): URIO[Audit, Unit] = - ZIO.environmentWithZIO(_.get.logTaskEnd(taskRunId, taskName, props, taskType, error)) + ZIO.serviceWithZIO(_.logTaskEnd(taskRunId, taskName, props, taskType, error)) - def getJobRuns(query: String): URIO[Audit ,Iterable[JobRun]] = ZIO.environmentWithZIO(_.get.getJobRuns(query)) - def getTaskRuns(query: String): URIO[Audit, Iterable[TaskRun]] = ZIO.environmentWithZIO(_.get.getTaskRuns(query)) + def getJobRuns(query: String): RIO[Audit ,Iterable[JobRun]] = ZIO.serviceWithZIO(_.getJobRuns(query)) + def getTaskRuns(query: String): RIO[Audit, Iterable[TaskRun]] = ZIO.serviceWithZIO(_.getTaskRuns(query)) + def fetchResults[T](query: String)(fn: Audit#RS => T): RIO[Audit, Iterable[T]] = ZIO.serviceWithZIO[Audit](_.fetchResults(query)(fn)) } // format: on diff --git a/modules/core/src/main/scala/etlflow/audit/Console.scala b/modules/core/src/main/scala/etlflow/audit/Console.scala index 2de8fbcf..79817788 100644 --- a/modules/core/src/main/scala/etlflow/audit/Console.scala +++ b/modules/core/src/main/scala/etlflow/audit/Console.scala @@ -1,6 +1,5 @@ package etlflow.audit -import etlflow.model.{JobRun, TaskRun} import zio.{UIO, ZIO} object Console extends Audit { @@ -27,19 +26,15 @@ object Console extends Audit { } override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] = - ZIO.logInfo(s"Job started") + ZIO.logInfo(s"Job $jobName started") override def logJobEnd( jobName: String, props: Map[String, String], error: Option[Throwable] ): UIO[Unit] = error.fold { - ZIO.logInfo(s"Job completed with success") + ZIO.logInfo(s"Job $jobName completed with success") } { ex => - ZIO.logError(s"Job completed with failure ${ex.getMessage}") + ZIO.logError(s"Job $jobName completed with failure ${ex.getMessage}") } - - override def getJobRuns(query: String): UIO[Iterable[JobRun]] = ZIO.succeed(List.empty[JobRun]) - - override def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = ZIO.succeed(List.empty[TaskRun]) } diff --git a/modules/core/src/main/scala/etlflow/audit/Memory.scala b/modules/core/src/main/scala/etlflow/audit/Memory.scala index e38baa5d..f4941c39 100644 --- a/modules/core/src/main/scala/etlflow/audit/Memory.scala +++ b/modules/core/src/main/scala/etlflow/audit/Memory.scala @@ -1,7 +1,6 @@ package etlflow.audit import etlflow.log.ApplicationLogger -import etlflow.model.{JobRun, TaskRun} import etlflow.utils.DateTimeApi import zio.{Ref, UIO, ZIO} import scala.collection.mutable @@ -64,11 +63,6 @@ case class Memory(jobRunId: String) extends Audit with ApplicationLogger { value.values.toList.sortBy(_.start_time).foreach(x => logger.info(x.toString())) } } yield () - - override def getJobRuns(query: String): UIO[Iterable[JobRun]] = ZIO.succeed(List.empty[JobRun]) - - override def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = ZIO.succeed(List.empty[TaskRun]) - } object Memory { diff --git a/modules/core/src/main/scala/etlflow/audit/Slack.scala b/modules/core/src/main/scala/etlflow/audit/Slack.scala index bafdaa52..876ca428 100644 
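Since `getJobRuns`, `getTaskRuns` and `fetchResults` now carry defaults in the trait (per the Audit.scala hunk above), a custom backend only has to supply the four log methods. A hypothetical minimal implementation, as a sketch:

```scala
import etlflow.audit.Audit
import zio.{UIO, ZIO}

// Illustrative backend (not from this patch) that only forwards audit events to the
// ZIO logger; the query methods fall back to the new trait defaults.
case class LogAudit(jobRunId: String) extends Audit {
  def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] =
    ZIO.logInfo(s"Job $jobName started")
  def logJobEnd(jobName: String, props: Map[String, String], error: Option[Throwable]): UIO[Unit] =
    ZIO.logInfo(s"Job $jobName ended, error: $error")
  def logTaskStart(taskRunId: String, taskName: String, props: Map[String, String], taskType: String): UIO[Unit] =
    ZIO.logInfo(s"Task $taskName started")
  def logTaskEnd(taskRunId: String, taskName: String, props: Map[String, String], taskType: String, error: Option[Throwable]): UIO[Unit] =
    ZIO.logInfo(s"Task $taskName ended, error: $error")
}
```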
--- a/modules/core/src/main/scala/etlflow/audit/Slack.scala +++ b/modules/core/src/main/scala/etlflow/audit/Slack.scala @@ -1,7 +1,6 @@ package etlflow.audit import etlflow.log.ApplicationLogger -import etlflow.model.{JobRun, TaskRun} import zio.{UIO, ZIO} import java.io.{BufferedWriter, OutputStreamWriter} import java.net.{HttpURLConnection, URL} @@ -104,8 +103,4 @@ case class Slack(jobRunId: String, slackUrl: String) extends Audit with Applicat sendSlackNotification(data) }.orDie - - override def getJobRuns(query: String): UIO[Iterable[JobRun]] = ZIO.succeed(List.empty[JobRun]) - - override def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = ZIO.succeed(List.empty[TaskRun]) } diff --git a/modules/core/src/main/scala/etlflow/audit/package.scala b/modules/core/src/main/scala/etlflow/audit/package.scala index 042a292e..e527f180 100644 --- a/modules/core/src/main/scala/etlflow/audit/package.scala +++ b/modules/core/src/main/scala/etlflow/audit/package.scala @@ -4,7 +4,7 @@ import etlflow.model.{JobRun, TaskRun} import zio.{UIO, ULayer, ZIO, ZLayer} package object audit { - val test: ULayer[Audit] = ZLayer.succeed( + val noop: ULayer[Audit] = ZLayer.succeed( new Audit { override val jobRunId: String = "" override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] = ZIO.unit diff --git a/modules/core/src/main/scala/etlflow/job/EtlJob.scala b/modules/core/src/main/scala/etlflow/job/EtlJob.scala new file mode 100644 index 00000000..25789666 --- /dev/null +++ b/modules/core/src/main/scala/etlflow/job/EtlJob.scala @@ -0,0 +1,36 @@ +package etlflow.job + +import etlflow.audit.Audit +import etlflow.task.EtlTask +import zio.RIO + +/** Experimental EtlJob API, don't use in production + */ +trait EtlJob[R, OP] { + + protected def process: RIO[R with Audit, OP] + + final def execute(name: String, props: Map[String, String]): RIO[R with Audit, OP] = for { + _ <- Audit.logJobStart(name, props) + op <- process.tapError(ex => Audit.logJobEnd(name, props, Some(ex))) + _ <- Audit.logJobEnd(name, props, None) + } yield op + + /** Experimental flatMap API for EtlJob, don't use in production + */ + def flatMap[R1, OP1](fn: OP => EtlTask[R1, OP1]): EtlJob[R with R1, OP1] = EtlJob.flatMap[R, OP, R1, OP1](this, fn) + + /** Experimental *>(variant of flatMap that ignores the value produced by this effect) API for EtlJob, don't use in production + */ + def *>[R1, OP1](that: EtlTask[R1, OP1]): EtlJob[R with R1, OP1] = EtlJob.flatMap[R, OP, R1, OP1](this, _ => that) +} + +object EtlJob { + + /** Experimental flatMap API for EtlJob, don't use in production + */ + def flatMap[R1, OP1, R2, OP2](currentJob: EtlJob[R1, OP1], fn: OP1 => EtlTask[R2, OP2]): EtlJob[R1 with R2, OP2] = + new EtlJob[R1 with R2, OP2] { + override protected def process: RIO[R1 with R2 with Audit, OP2] = currentJob.process.flatMap(op => fn(op).execute) + } +} diff --git a/modules/core/src/main/scala/etlflow/json/JSON.scala b/modules/core/src/main/scala/etlflow/json/JSON.scala new file mode 100644 index 00000000..654be3b1 --- /dev/null +++ b/modules/core/src/main/scala/etlflow/json/JSON.scala @@ -0,0 +1,19 @@ +package etlflow.json + +import etlflow.model.EtlFlowException.JsonDecodeException +import zio._ +import zio.json._ + +object JSON { + + def convertToObject[T](str: String)(implicit decoder: JsonDecoder[T]): Either[String, T] = str.fromJson[T] + + def convertToString[T](obj: T)(implicit decoder: JsonEncoder[T]): String = obj.toJson + + def convertToStringPretty[T](obj: T)(implicit decoder: JsonEncoder[T]): String = 
obj.toJsonPretty + + def convertToObjectZIO[T](str: String)(implicit decoder: JsonDecoder[T]): IO[JsonDecodeException, T] = + ZIO.fromEither(convertToObject[T](str)).mapError(str => JsonDecodeException(str)) + + def convertToStringZIO[T](obj: T)(implicit encoder: JsonEncoder[T]): UIO[String] = ZIO.succeed(convertToString[T](obj)) +} diff --git a/modules/core/src/main/scala/etlflow/model/EtlFlowException.scala b/modules/core/src/main/scala/etlflow/model/EtlFlowException.scala index f250ae46..13d16657 100644 --- a/modules/core/src/main/scala/etlflow/model/EtlFlowException.scala +++ b/modules/core/src/main/scala/etlflow/model/EtlFlowException.scala @@ -16,7 +16,7 @@ object EtlFlowException { override def toString: String = s"$msg" } - final case class EtlJobNotFoundException(msg: String) extends EtlFlowException(msg) { + final case class JsonDecodeException(msg: String) extends EtlFlowException(msg) { override def toString: String = s"$msg" } } diff --git a/modules/core/src/main/scala/etlflow/model/JobRun.scala b/modules/core/src/main/scala/etlflow/model/JobRun.scala index 59b5bda3..934429c1 100644 --- a/modules/core/src/main/scala/etlflow/model/JobRun.scala +++ b/modules/core/src/main/scala/etlflow/model/JobRun.scala @@ -1,5 +1,6 @@ package etlflow.model +import zio.json.{DeriveJsonCodec, JsonCodec} import java.time.ZonedDateTime case class JobRun( @@ -10,3 +11,7 @@ case class JobRun( createdAt: ZonedDateTime, modifiedAt: ZonedDateTime ) + +object JobRun { + implicit val codecJR: JsonCodec[JobRun] = DeriveJsonCodec.gen[JobRun] +} diff --git a/modules/core/src/main/scala/etlflow/model/TaskRun.scala b/modules/core/src/main/scala/etlflow/model/TaskRun.scala index 369a2820..855f808a 100644 --- a/modules/core/src/main/scala/etlflow/model/TaskRun.scala +++ b/modules/core/src/main/scala/etlflow/model/TaskRun.scala @@ -1,5 +1,6 @@ package etlflow.model +import zio.json.{DeriveJsonCodec, JsonCodec} import java.time.ZonedDateTime case class TaskRun( @@ -12,3 +13,7 @@ case class TaskRun( createdAt: ZonedDateTime, modifiedAt: ZonedDateTime ) + +object TaskRun { + implicit val codecTR: JsonCodec[TaskRun] = DeriveJsonCodec.gen[TaskRun] +} diff --git a/modules/core/src/main/scala/etlflow/task/EtlTask.scala b/modules/core/src/main/scala/etlflow/task/EtlTask.scala index a8d355f9..cdf3e57c 100644 --- a/modules/core/src/main/scala/etlflow/task/EtlTask.scala +++ b/modules/core/src/main/scala/etlflow/task/EtlTask.scala @@ -1,6 +1,7 @@ package etlflow.task import etlflow.audit.Audit +import etlflow.job.EtlJob import etlflow.log.ApplicationLogger import zio.{RIO, ZIO} @@ -16,9 +17,40 @@ trait EtlTask[R, OP] extends ApplicationLogger { final def execute: RIO[R with Audit, OP] = for { tri <- ZIO.succeed(java.util.UUID.randomUUID.toString) _ <- Audit.logTaskStart(tri, name, getTaskProperties, taskType) - op <- process.tapError { ex => - Audit.logTaskEnd(tri, name, getTaskProperties, taskType, Some(ex)) - } - _ <- Audit.logTaskEnd(tri, name, getTaskProperties, taskType, None) + op <- process.tapError(ex => Audit.logTaskEnd(tri, name, getTaskProperties, taskType, Some(ex))) + _ <- Audit.logTaskEnd(tri, name, getTaskProperties, taskType, None) } yield op + + /** Experimental map API for EtlTask, don't use in production + */ + def map[B](f: OP => B): EtlTask[R, B] = EtlTask.map(this, f) + + /** Experimental flatMap API for EtlTask to convert to EtlJob, don't use in production + */ + def flatMap[R1, OP1](fn: OP => EtlTask[R1, OP1]): EtlJob[R with R1, OP1] = EtlTask.flatMap[R, OP, R1, OP1](this, fn) + + /** Experimental *> 
API (variant of flatMap that ignores the value produced by this effect) for EtlTask to convert to EtlJob,
+    * don't use in production
+    */
+  def *>[R1, OP1](that: EtlTask[R1, OP1]): EtlJob[R with R1, OP1] = EtlTask.flatMap[R, OP, R1, OP1](this, _ => that)
+}
+
+@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements", "org.wartremover.warts.AsInstanceOf"))
+object EtlTask {
+
+  /** Experimental flatMap API for EtlTask to convert to EtlJob, don't use in production
+    */
+  def flatMap[R1, OP1, R2, OP2](currentTask: EtlTask[R1, OP1], fn: OP1 => EtlTask[R2, OP2]): EtlJob[R1 with R2, OP2] =
+    new EtlJob[R1 with R2, OP2] {
+      override protected def process: RIO[R1 with R2 with Audit, OP2] = currentTask.execute.flatMap(op => fn(op).execute)
+    }
+
+  /** Experimental map API for EtlTask, don't use in production
+    */
+  def map[R, A, OP](currentTask: EtlTask[R, A], fn: A => OP): EtlTask[R, OP] =
+    new EtlTask[R, OP] {
+      override protected def process: RIO[R, OP] = currentTask.process.map(fn)
+      override val name: String = currentTask.name
+      override val taskType: String = currentTask.taskType
+    }
+}
diff --git a/modules/core/src/test/scala-2/etlflow/json/Implicits.scala
new file mode 100644
index 00000000..0c2bc1a9
--- /dev/null
+++ b/modules/core/src/test/scala-2/etlflow/json/Implicits.scala
@@ -0,0 +1,21 @@
+package etlflow.json
+
+import etlflow.json.Schema._
+import zio.json._
+
+trait Implicits {
+  implicit val loggingLevelEncoder: JsonEncoder[LoggingLevel] = JsonEncoder[String].contramap {
+    case LoggingLevel.INFO => "info"
+    case LoggingLevel.JOB => "job"
+    case LoggingLevel.DEBUG => "debug"
+  }
+  implicit val loggingLevelDecoder: JsonDecoder[LoggingLevel] = JsonDecoder[String].map {
+    case "info" => LoggingLevel.INFO
+    case "job" => LoggingLevel.JOB
+    case "debug" => LoggingLevel.DEBUG
+  }
+
+  implicit val etlJob23PropsEncoder: JsonEncoder[EtlJob23Props] = DeriveJsonEncoder.gen
+  implicit val etlJob23PropsDecoder: JsonDecoder[EtlJob23Props] = DeriveJsonDecoder.gen
+  implicit val studentDecoder: JsonDecoder[Student] = DeriveJsonDecoder.gen
+}
diff --git a/modules/core/src/test/scala-3/etlflow.json/Implicits.scala
new file mode 100644
index 00000000..10fbab6e
--- /dev/null
+++ b/modules/core/src/test/scala-3/etlflow.json/Implicits.scala
@@ -0,0 +1,21 @@
+package etlflow.json
+
+import etlflow.json.Schema._
+import zio.json._
+
+trait Implicits {
+  given JsonEncoder[LoggingLevel] = JsonEncoder[String].contramap {
+    case LoggingLevel.INFO => "info"
+    case LoggingLevel.JOB => "job"
+    case LoggingLevel.DEBUG => "debug"
+  }
+  given JsonDecoder[LoggingLevel] = JsonDecoder[String].map {
+    case "info" => LoggingLevel.INFO
+    case "job" => LoggingLevel.JOB
+    case "debug" => LoggingLevel.DEBUG
+  }
+
+  given JsonDecoder[Student] = DeriveJsonDecoder.gen
+  given JsonEncoder[EtlJob23Props] = DeriveJsonEncoder.gen
+  given JsonDecoder[EtlJob23Props] = DeriveJsonDecoder.gen
+}
diff --git a/modules/core/src/test/scala/etlflow/EtlJobTestSuite.scala
new file mode 100644
index 00000000..4347c999
--- /dev/null
+++ b/modules/core/src/test/scala/etlflow/EtlJobTestSuite.scala
@@ -0,0 +1,67 @@
+package etlflow
+
+import etlflow.audit.Audit
+import etlflow.log.ApplicationLogger
+import etlflow.task.GenericTask
+import zio.ZIO
+import zio.test.Assertion.equalTo
+import zio.test._
+
+@SuppressWarnings(Array("org.wartremover.warts.Throw"))
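+// Note (illustrative commentary, not from the original file): the suite below drives the
+// experimental composition operators end to end. `task1 *> task2` chains tasks while
+// discarding each intermediate value; `flatMap` would thread it through instead, e.g.
+//   task3.flatMap(s => GenericTask(name = "Print", function = println(s)))
+// Either form yields an EtlJob whose execute(name, props) brackets the whole chain with
+// job-level audit events in addition to the per-task ones.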
+object EtlJobTestSuite extends ApplicationLogger { + def processData(msg: String): Unit = logger.info(s"Hello World from $msg") + + def processDataFailure(msg: String): Unit = { + logger.error(s"Hello World from $msg") + throw new RuntimeException("!!! Failure") + } + + def sendString(msg: String): String = { + logger.info(s"Sent $msg") + msg + } + + def sendInt(msg: Int): Int = { + logger.info(s"Sent $msg") + msg + } + + val task1: GenericTask[Unit] = GenericTask( + name = "ProcessData1", + function = processData("ProcessData1") + ) + + val task2: GenericTask[Unit] = GenericTask( + name = "ProcessData2", + function = processData("ProcessData2") + ) + + val task2Failure: GenericTask[Unit] = GenericTask( + name = "ProcessData2", + function = processDataFailure("ProcessData2") + ) + + val task3: GenericTask[String] = GenericTask( + name = "ProcessData3", + function = sendString("ProcessData3") + ) + + val task4: GenericTask[Int] = GenericTask( + name = "ProcessData4", + function = sendInt(100) + ) + + val spec: Spec[Audit, Any] = + suite("EtlJob Test Suite")( + test("Test Job") { + val pipeline = task1 *> task2 *> task3 *> task4 + val pipelineZIO = pipeline.execute("Hello World 1", Map.empty).provide(audit.console) + assertZIO(pipelineZIO.foldZIO(ex => ZIO.succeed(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok")) + }, + test("Test Job (Failure)") { + val pipeline = task1 *> task2Failure *> task3 *> task4 + val pipelineZIO = pipeline.execute("Hello World 2", Map.empty).provide(audit.console) + assertZIO(pipelineZIO.foldZIO(ex => ZIO.succeed(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("!!! Failure")) + } + ) +} diff --git a/modules/core/src/test/scala/etlflow/RunTestSuites.scala b/modules/core/src/test/scala/etlflow/RunTestSuites.scala index 4dbeed88..f13151b9 100644 --- a/modules/core/src/test/scala/etlflow/RunTestSuites.scala +++ b/modules/core/src/test/scala/etlflow/RunTestSuites.scala @@ -1,5 +1,6 @@ package etlflow +import etlflow.json.JsonTestSuite import etlflow.log.ApplicationLogger import etlflow.utils._ import zio.test._ @@ -13,6 +14,8 @@ object RunTestSuites extends ZIOSpecDefault with ApplicationLogger { DateTimeAPITestSuite.spec, RetryTaskTestSuite.spec, GenericTaskTestSuite.spec, - ErrorHandlingTestSuite.spec - ) @@ TestAspect.sequential).provideShared(audit.test ++ Runtime.removeDefaultLoggers) + ErrorHandlingTestSuite.spec, + JsonTestSuite.spec, + EtlJobTestSuite.spec + ) @@ TestAspect.sequential).provideShared(audit.noop ++ Runtime.removeDefaultLoggers) } diff --git a/modules/core/src/test/scala/etlflow/json/JsonTestSuite.scala b/modules/core/src/test/scala/etlflow/json/JsonTestSuite.scala new file mode 100644 index 00000000..ffe698fa --- /dev/null +++ b/modules/core/src/test/scala/etlflow/json/JsonTestSuite.scala @@ -0,0 +1,66 @@ +package etlflow.json + +import etlflow.json.Schema._ +import zio.test.Assertion.equalTo +import zio.test._ +import scala.collection.immutable.ListMap + +object JsonTestSuite extends Implicits { + val student1Json: String = """{ + |"name":"John", + |"id":"63", + |"class": "101" + |}""".stripMargin + + val student2Json: String = """{ + |"name":"John", + |"id":"63" + |}""".stripMargin + + val ip1: EtlJob23Props = EtlJob23Props("data/movies/ratings/*", "test", "ratings_par", true, true, LoggingLevel.DEBUG) + val op1: String = + 
"""{"ratings_input_path":"data/movies/ratings/*","ratings_output_dataset":"test","ratings_output_table_name":"ratings_par","job_send_slack_notification":true,"job_enable_db_logging":true,"job_notification_level":"debug"}""".stripMargin + + val ip2: Map[String, String] = ListMap( + Map( + "job_send_slack_notification" -> "false", + "job_enable_db_logging" -> "true", + "job_notification_level" -> "info", + "job_max_active_runs" -> "10", + "job_name" -> "etlflow.coretests.jobs.Job3DBSteps", + "job_description" -> "", + "job_props_name" -> "etlflow.coretests.Schema$EtlJob4Props", + "job_deploy_mode" -> "dataproc", + "job_retry_delay_in_minutes" -> "0", + "job_schedule" -> "0 30 7 ? * *", + "job_retries" -> "0" + ).toSeq.sortBy(_._2): _* + ) + + val op2 = + """{"job_description":"","job_retry_delay_in_minutes":"0","job_retries":"0","job_schedule":"0 30 7 ? * *","job_max_active_runs":"10","job_deploy_mode":"dataproc","job_props_name":"etlflow.coretests.Schema$EtlJob4Props","job_name":"etlflow.coretests.jobs.Job3DBSteps","job_send_slack_notification":"false","job_notification_level":"info","job_enable_db_logging":"true"}""".stripMargin + + val spec: Spec[Any, Any] = + suite("Json Test")( + test("convertToObject: String to Student") { + val ip = JSON.convertToObjectZIO[Student](student1Json) + assertZIO(ip)(equalTo(Student("63", "John", Some("101")))) + }, + test("convertToObject: String to Student") { + val ip = JSON.convertToObjectZIO[Student](student2Json) + assertZIO(ip)(equalTo(Student("63", "John", None))) + }, + test("convertToObject: String to Map") { + val ip = JSON.convertToObjectZIO[Map[String, String]](student2Json) + assertZIO(ip)(equalTo(Map("name" -> "John", "id" -> "63"))) + }, + test("convertToString: Case class to String") { + val ip = JSON.convertToStringZIO(ip1) + assertZIO(ip)(equalTo(op1)) + }, + test("convertToString: Map to String") { + val ip = JSON.convertToStringZIO[Map[String, String]](ip2) + assertZIO(ip)(equalTo(op2)) + } + ) +} diff --git a/modules/core/src/test/scala/etlflow/json/Schema.scala b/modules/core/src/test/scala/etlflow/json/Schema.scala new file mode 100644 index 00000000..89037052 --- /dev/null +++ b/modules/core/src/test/scala/etlflow/json/Schema.scala @@ -0,0 +1,20 @@ +package etlflow.json + +object Schema { + sealed trait LoggingLevel + object LoggingLevel { + case object JOB extends LoggingLevel + case object DEBUG extends LoggingLevel + case object INFO extends LoggingLevel + } + case class EtlJob23Props( + ratings_input_path: String = "", + ratings_output_dataset: String = "test", + ratings_output_table_name: String = "ratings", + job_send_slack_notification: Boolean = true, + job_enable_db_logging: Boolean = false, + job_notification_level: LoggingLevel = LoggingLevel.INFO + ) + case class Student(id: String, name: String, `class`: Option[String]) + case class HttpBinResponse(args: Map[String, String], headers: Map[String, String], origin: String, url: String) +} diff --git a/modules/email/src/test/scala/etlflow/task/SendMailTestSuite.scala b/modules/email/src/test/scala/etlflow/task/SendMailTestSuite.scala index 0e16bddc..ef0e552b 100644 --- a/modules/email/src/test/scala/etlflow/task/SendMailTestSuite.scala +++ b/modules/email/src/test/scala/etlflow/task/SendMailTestSuite.scala @@ -4,10 +4,9 @@ import etlflow.audit import etlflow.model.Credential.SMTP import zio.ZIO import zio.test.Assertion._ -import zio.test._ +import zio.test.{ZIOSpecDefault, _} import java.time.LocalDateTime import java.time.format.DateTimeFormatter -import 
zio.test.ZIOSpecDefault object SendMailTestSuite extends ZIOSpecDefault { @@ -57,5 +56,5 @@ object SendMailTestSuite extends ZIOSpecDefault { val props = task.getTaskProperties assertTrue(props == Map("subject" -> "SendMailTask Test Ran Successfully", "recipient_list" -> "abcd@abcd.com")) } - ).provideShared(audit.test) + ).provideShared(audit.noop) } diff --git a/modules/gcp/src/main/scala/etlflow/audit/BQ.scala b/modules/gcp/src/main/scala/etlflow/audit/BQ.scala index 9ff9e8a5..6bfe6bdf 100644 --- a/modules/gcp/src/main/scala/etlflow/audit/BQ.scala +++ b/modules/gcp/src/main/scala/etlflow/audit/BQ.scala @@ -1,13 +1,15 @@ package etlflow.audit +import com.google.cloud.bigquery.FieldValueList import etlflow.log.ApplicationLogger import etlflow.model.{JobRun, TaskRun} import etlflow.utils.MapToJson import gcp4zio.bq.{BQClient, BQImpl} -import zio.{TaskLayer, UIO, ZLayer} +import zio.{Task, TaskLayer, UIO, ZIO, ZLayer} import java.time.ZoneId +import java.util.UUID -@SuppressWarnings(Array("org.wartremover.warts.Var", "org.wartremover.warts.Throw", "org.wartremover.warts.ToString")) +@SuppressWarnings(Array("org.wartremover.warts.ToString")) object BQ extends ApplicationLogger { private[etlflow] case class BQAudit(jobRunId: String, client: BQImpl) extends etlflow.audit.Audit { @@ -19,10 +21,7 @@ object BQ extends ApplicationLogger { taskType: String ): UIO[Unit] = client .executeQuery(Sql.insertTaskRun(taskRunId, taskName, MapToJson(props), taskType, jobRunId)) - .fold( - e => logger.error(e.getMessage), - op => op - ) + .fold(e => logger.error(e.getMessage), op => op) override def logTaskEnd( taskRunId: String, @@ -34,17 +33,11 @@ object BQ extends ApplicationLogger { .executeQuery( Sql.updateTaskRun(taskRunId, MapToJson(props), error.fold("pass")(ex => s"failed with error: ${ex.getMessage}")) ) - .fold( - e => logger.error(e.getMessage), - op => op - ) + .fold(e => logger.error(e.getMessage), op => op) override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] = client .executeQuery(Sql.insertJobRun(jobRunId, jobName, MapToJson(props))) - .fold( - e => logger.error(e.getMessage), - op => op - ) + .fold(e => logger.error(e.getMessage), op => op) override def logJobEnd( jobName: String, @@ -54,12 +47,9 @@ object BQ extends ApplicationLogger { .executeQuery( Sql.updateJobRun(jobRunId, error.fold("pass")(ex => s"failed with error: ${ex.getMessage}"), MapToJson(props)) ) - .fold( - e => logger.error(e.getMessage), - op => op - ) + .fold(e => logger.error(e.getMessage), op => op) - override def getJobRuns(query: String): UIO[Iterable[JobRun]] = client + override def getJobRuns(query: String): Task[Iterable[JobRun]] = client .getData(query)(fl => JobRun( fl.get("job_run_id").getStringValue, @@ -70,15 +60,9 @@ object BQ extends ApplicationLogger { fl.get("updated_at").getTimestampInstant.atZone(ZoneId.systemDefault()) ) ) - .fold( - { e => - logger.error(e.getMessage) - List.empty - }, - op => op - ) + .tapError(ex => ZIO.logError(ex.getMessage)) - override def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = client + override def getTaskRuns(query: String): Task[Iterable[TaskRun]] = client .getData(query)(fl => TaskRun( fl.get("task_run_id").getStringValue, @@ -91,15 +75,12 @@ object BQ extends ApplicationLogger { fl.get("updated_at").getTimestampInstant.atZone(ZoneId.systemDefault()) ) ) - .fold( - { e => - logger.error(e.getMessage) - List.empty - }, - op => op - ) + .tapError(ex => ZIO.logError(ex.getMessage)) + + override type RS = FieldValueList + override def 
fetchResults[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]] = client.getData(query)(fn) } - def apply(jri: String, credentials: Option[String] = None): TaskLayer[Audit] = + def apply(jri: String = UUID.randomUUID.toString, credentials: Option[String] = None): TaskLayer[Audit] = ZLayer.fromZIO(BQClient(credentials).map(bq => BQAudit(jri, BQImpl(bq)))) } diff --git a/modules/gcp/src/test/scala/etlflow/RunTests.scala b/modules/gcp/src/test/scala/etlflow/RunTests.scala index 20491381..947d2957 100644 --- a/modules/gcp/src/test/scala/etlflow/RunTests.scala +++ b/modules/gcp/src/test/scala/etlflow/RunTests.scala @@ -17,7 +17,7 @@ object RunTests extends ZIOSpecDefault with TestHelper with ApplicationLogger { private val dpClusterEnv = DPCluster.live(gcpProjectId.get, gcpRegion.get, dpEndpoint) - private val env = dpJobEnv ++ dpClusterEnv ++ BQ.live() ++ GCS.live() ++ audit.test ++ ZLayer.succeed(ClockLive) + private val env = dpJobEnv ++ dpClusterEnv ++ BQ.live() ++ GCS.live() ++ audit.noop ++ ZLayer.succeed(ClockLive) override def spec: Spec[TestEnvironment, Any] = (suite("GCP Tasks")( BQTestSuite.spec, diff --git a/modules/gcp/src/test/scala/etlflow/audit/BQTestSuite.scala b/modules/gcp/src/test/scala/etlflow/audit/BQTestSuite.scala index cd595cf7..40d284f5 100644 --- a/modules/gcp/src/test/scala/etlflow/audit/BQTestSuite.scala +++ b/modules/gcp/src/test/scala/etlflow/audit/BQTestSuite.scala @@ -1,5 +1,6 @@ package etlflow.audit +import com.google.cloud.bigquery.FieldValueList import zio.ZIO import zio.test._ @@ -34,6 +35,14 @@ object BQTestSuite { .getTaskRuns("SELECT * FROM etlflow.taskrun") .tap(op => ZIO.foreach(op)(i => ZIO.logInfo(i.toString))) .as(assertCompletes) + ), + zio.test.test("fetchResults Test")( + Audit + .fetchResults("SELECT job_name FROM etlflow.jobrun")(rs => + rs.asInstanceOf[FieldValueList].get("job_name").getStringValue + ) + .tap(op => ZIO.foreach(op)(i => ZIO.logInfo(i))) + .as(assertCompletes) ) ) @@ TestAspect.sequential } diff --git a/modules/http/src/test/scala/etlflow/task/HttpTaskTestSuite.scala b/modules/http/src/test/scala/etlflow/task/HttpTaskTestSuite.scala index abac3036..33098f2a 100644 --- a/modules/http/src/test/scala/etlflow/task/HttpTaskTestSuite.scala +++ b/modules/http/src/test/scala/etlflow/task/HttpTaskTestSuite.scala @@ -90,5 +90,5 @@ object HttpTaskTestSuite extends ZIOSpecDefault with ApplicationLogger { override def spec: Spec[TestEnvironment, Any] = (suite("Http Tasks")(test("Execute Http tasks") { assertZIO(job.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok")) - }) @@ TestAspect.flaky).provideShared(audit.test) + }) @@ TestAspect.flaky).provideShared(audit.noop) } diff --git a/modules/jdbc/src/main/scala/etlflow/audit/DB.scala b/modules/jdbc/src/main/scala/etlflow/audit/DB.scala index 112c3e73..7be3a07c 100644 --- a/modules/jdbc/src/main/scala/etlflow/audit/DB.scala +++ b/modules/jdbc/src/main/scala/etlflow/audit/DB.scala @@ -1,27 +1,25 @@ package etlflow.audit +import etlflow.db.DBImpl import etlflow.log.ApplicationLogger import etlflow.model.Credential.JDBC import etlflow.model.{JobRun, TaskRun} import etlflow.utils.MapToJson -import scalikejdbc.NamedDB -import zio.{TaskLayer, UIO, ZIO, ZLayer} +import scalikejdbc.WrappedResultSet +import zio._ +import java.util.UUID +@SuppressWarnings(Array("org.wartremover.warts.ToString")) object DB extends ApplicationLogger { - private[etlflow] case class DBAudit(jobRunId: String, poolName: String) extends etlflow.audit.Audit { + private[etlflow] case class 
DBAudit(jobRunId: String, client: DBImpl) extends etlflow.audit.Audit { override def logTaskStart( taskRunId: String, taskName: String, props: Map[String, String], taskType: String ): UIO[Unit] = - ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - Sql - .insertTaskRun(taskRunId, taskName, MapToJson(props), taskType, jobRunId) - .update - .apply() - }) + client + .executeQuery(Sql.insertTaskRun(taskRunId, taskName, MapToJson(props), taskType, jobRunId)) .fold(e => logger.error(e.getMessage), _ => ()) override def logTaskEnd( @@ -30,98 +28,69 @@ object DB extends ApplicationLogger { props: Map[String, String], taskType: String, error: Option[Throwable] - ): UIO[Unit] = - ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - val status = error.fold("pass")(ex => s"failed with error: ${ex.getMessage}") - Sql - .updateTaskRun(taskRunId, MapToJson(props), status) - .update - .apply() - }) + ): UIO[Unit] = { + val status = error.fold("pass")(ex => s"failed with error: ${ex.getMessage}") + client + .executeQuery(Sql.updateTaskRun(taskRunId, MapToJson(props), status)) .fold(e => logger.error(e.getMessage), _ => ()) + } - override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] = - ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - val properties = MapToJson(props) - Sql - .insertJobRun(jobRunId, jobName, properties) - .update - .apply() - }) + override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] = { + val properties = MapToJson(props) + client + .executeQuery(Sql.insertJobRun(jobRunId, jobName, properties)) .fold(e => logger.error(e.getMessage), _ => ()) + } - override def logJobEnd( - jobName: String, - props: Map[String, String], - error: Option[Throwable] - ): UIO[Unit] = - ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - val status = error.fold("pass")(ex => s"failed with error: ${ex.getMessage}") - val properties = MapToJson(props) - Sql - .updateJobRun(jobRunId, status, properties) - .update - .apply() - }) + override def logJobEnd(jobName: String, props: Map[String, String], error: Option[Throwable]): UIO[Unit] = { + val status = error.fold("pass")(ex => s"failed with error: ${ex.getMessage}") + val properties = MapToJson(props) + client + .executeQuery(Sql.updateJobRun(jobRunId, status, properties)) .fold(e => logger.error(e.getMessage), _ => ()) + } + + override def getJobRuns(query: String): Task[Iterable[JobRun]] = client + .fetchResults(query) { rs => + JobRun( + rs.string("job_run_id"), + rs.string("job_name"), + rs.string("props"), + rs.string("status"), + rs.zonedDateTime("created_at"), + rs.zonedDateTime("updated_at") + ) + } + .tapError(ex => ZIO.logError(ex.getMessage)) - override def getJobRuns(query: String): UIO[Iterable[JobRun]] = ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - scalikejdbc - .SQL(query) - .map(rs => - JobRun( - rs.string("job_run_id"), - rs.string("job_name"), - rs.string("props"), - rs.string("status"), - rs.zonedDateTime("created_at"), - rs.zonedDateTime("updated_at") - ) - ) - .list() - }) - .fold( - { e => - logger.error(e.getMessage) - List.empty - }, - op => op - ) + override def getTaskRuns(query: String): Task[Iterable[TaskRun]] = client + .fetchResults(query) { rs => + TaskRun( + rs.string("task_run_id"), + rs.string("job_run_id"), + rs.string("task_name"), + rs.string("task_type"), + rs.string("props"), + rs.string("status"), + rs.zonedDateTime("created_at"), + rs.zonedDateTime("updated_at") + ) + } + .tapError(ex => ZIO.logError(ex.getMessage)) - override 
def getTaskRuns(query: String): UIO[Iterable[TaskRun]] = ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - scalikejdbc - .SQL(query) - .map(rs => - TaskRun( - rs.string("task_run_id"), - rs.string("job_run_id"), - rs.string("task_name"), - rs.string("task_type"), - rs.string("props"), - rs.string("status"), - rs.zonedDateTime("created_at"), - rs.zonedDateTime("updated_at") - ) - ) - .list() - }) - .fold( - { e => - logger.error(e.getMessage) - List.empty - }, - op => op - ) + override type RS = WrappedResultSet + override def fetchResults[T](query: String)(fn: WrappedResultSet => T): Task[Iterable[T]] = client.fetchResults(query)(fn) } - private[etlflow] def layer(jobRunId: String): ZLayer[String, Throwable, Audit] = - ZLayer(ZIO.service[String].map(pool => DBAudit(jobRunId, pool))) + private[etlflow] def layer(jobRunId: String, fetchSize: Option[Int]): ZLayer[String, Throwable, Audit] = + ZLayer(ZIO.service[String].map(pool => DBAudit(jobRunId, DBImpl(pool, fetchSize)))) - def apply(db: JDBC, jobRunId: String, poolName: String = "EtlFlow-Audit-Pool", poolSize: Int = 2): TaskLayer[Audit] = - etlflow.db.CP.layer(db, poolName, poolSize) >>> layer(jobRunId) + def apply( + db: JDBC, + jobRunId: String = UUID.randomUUID.toString, + poolName: String = "EtlFlow-Audit-Pool", + poolSize: Int = 2, + fetchSize: Option[Int] = None + ): TaskLayer[Audit] = + etlflow.db.CP.layer(db, poolName, poolSize) >>> layer(jobRunId, fetchSize) } diff --git a/modules/jdbc/src/main/scala/etlflow/db/DB.scala b/modules/jdbc/src/main/scala/etlflow/db/DB.scala index 279a9464..10054318 100644 --- a/modules/jdbc/src/main/scala/etlflow/db/DB.scala +++ b/modules/jdbc/src/main/scala/etlflow/db/DB.scala @@ -2,34 +2,38 @@ package etlflow.db import etlflow.audit.Audit import etlflow.model.Credential.JDBC -import scalikejdbc.WrappedResultSet +import scalikejdbc.{NoExtractor, SQL, WrappedResultSet} import zio.{RIO, Task, TaskLayer, URLayer, ZIO, ZLayer} trait DB { def executeQuery(query: String): Task[Unit] - def executeQuerySingleOutput[T](query: String)(fn: WrappedResultSet => T): Task[T] - def executeQueryListOutput[T](query: String)(fn: WrappedResultSet => T): Task[List[T]] + def executeQuery(query: SQL[Nothing, NoExtractor]): Task[Unit] + def fetchResult[T](query: String)(fn: WrappedResultSet => T): Task[T] + def fetchResults[T](query: String)(fn: WrappedResultSet => T): Task[Iterable[T]] } object DB { def executeQuery(query: String): RIO[DB, Unit] = ZIO.environmentWithZIO(_.get.executeQuery(query)) - def executeQuerySingleOutput[T](query: String)(fn: WrappedResultSet => T): RIO[DB, T] = - ZIO.environmentWithZIO(_.get.executeQuerySingleOutput(query)(fn)) + def executeQuery(query: SQL[Nothing, NoExtractor]): RIO[DB, Unit] = ZIO.environmentWithZIO(_.get.executeQuery(query)) - def executeQueryListOutput[T](query: String)(fn: WrappedResultSet => T): RIO[DB, List[T]] = - ZIO.environmentWithZIO(_.get.executeQueryListOutput(query)(fn)) + def fetchResult[T](query: String)(fn: WrappedResultSet => T): RIO[DB, T] = + ZIO.environmentWithZIO(_.get.fetchResult(query)(fn)) - val layer: URLayer[String, DB] = ZLayer(ZIO.service[String].map(pool => DBImpl(pool))) + def fetchResults[T](query: String)(fn: WrappedResultSet => T): RIO[DB, Iterable[T]] = + ZIO.environmentWithZIO(_.get.fetchResults(query)(fn)) - def live(db: JDBC, poolName: String = "EtlFlow-DB-Pool", poolSize: Int = 2): TaskLayer[DB] = - CP.layer(db, poolName, poolSize) >>> layer + def layer(fetchSize: Option[Int]): URLayer[String, DB] = ZLayer(ZIO.service[String].map(pool 
=> DBImpl(pool, fetchSize))) + + def live(db: JDBC, poolName: String = "EtlFlow-DB-Pool", poolSize: Int = 2, fetchSize: Option[Int] = None): TaskLayer[DB] = + CP.layer(db, poolName, poolSize) >>> layer(fetchSize) def liveAudit( db: JDBC, jobRunId: String, poolName: String = "EtlFlow-DB-Audit-Pool", - poolSize: Int = 2 + poolSize: Int = 2, + fetchSize: Option[Int] = None ): TaskLayer[DB with Audit] = - CP.layer(db, poolName, poolSize) >>> (DB.layer ++ etlflow.audit.DB.layer(jobRunId)) + CP.layer(db, poolName, poolSize) >>> (DB.layer(fetchSize) ++ etlflow.audit.DB.layer(jobRunId, fetchSize)) } diff --git a/modules/jdbc/src/main/scala/etlflow/db/DBImpl.scala b/modules/jdbc/src/main/scala/etlflow/db/DBImpl.scala index bbe75364..c7b0cbc4 100644 --- a/modules/jdbc/src/main/scala/etlflow/db/DBImpl.scala +++ b/modules/jdbc/src/main/scala/etlflow/db/DBImpl.scala @@ -2,44 +2,37 @@ package etlflow.db import etlflow.log.ApplicationLogger import etlflow.model.EtlFlowException.DBException -import scalikejdbc.{NamedDB, WrappedResultSet} +import scalikejdbc.{NamedDB, NoExtractor, SQL, WrappedResultSet} import zio._ -private[etlflow] case class DBImpl(poolName: String) extends DB with ApplicationLogger { +private[etlflow] case class DBImpl(poolName: String, fetchSize: Option[Int]) extends DB with ApplicationLogger { override def executeQuery(query: String): IO[DBException, Unit] = ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - scalikejdbc - .SQL(query) - .update() - }) + .attempt(NamedDB(poolName).localTx(implicit s => scalikejdbc.SQL(query).update())) + .mapError { e => + logger.error(e.getMessage) + DBException(e.getMessage) + } + .unit + override def executeQuery(query: SQL[Nothing, NoExtractor]): IO[DBException, Unit] = + ZIO + .attempt(NamedDB(poolName).localTx(implicit s => query.update())) .mapError { e => logger.error(e.getMessage) DBException(e.getMessage) } .unit @SuppressWarnings(Array("org.wartremover.warts.OptionPartial")) - override def executeQuerySingleOutput[T](query: String)(fn: WrappedResultSet => T): IO[DBException, T] = + override def fetchResult[T](query: String)(fn: WrappedResultSet => T): IO[DBException, T] = ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - scalikejdbc - .SQL(query) - .map(fn) - .single() - .get - }) + .attempt(NamedDB(poolName).localTx(implicit s => scalikejdbc.SQL(query).map(fn).single().get)) .mapError { e => logger.error(e.getMessage) DBException(e.getMessage) } - override def executeQueryListOutput[T](query: String)(fn: WrappedResultSet => T): IO[DBException, List[T]] = + override def fetchResults[T](query: String)(fn: WrappedResultSet => T): IO[DBException, Iterable[T]] = ZIO - .attempt(NamedDB(poolName).localTx { implicit s => - scalikejdbc - .SQL(query) - .map(fn) - .list() - }) + .attempt(NamedDB(poolName).localTx(implicit s => scalikejdbc.SQL(query).fetchSize(fetchSize).map(fn).iterable())) .mapError { e => logger.error(e.getMessage) DBException(e.getMessage) diff --git a/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala b/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala index 265efb10..05fa45f9 100644 --- a/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala +++ b/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala @@ -4,12 +4,12 @@ import etlflow.db.DB import scalikejdbc.WrappedResultSet import zio.{RIO, ZIO} -case class DBReadTask[T](name: String, query: String)(fn: WrappedResultSet => T) extends EtlTask[DB, List[T]] { - override protected def process: RIO[DB, List[T]] = for { +case class 
diff --git a/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala b/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala
index 265efb10..05fa45f9 100644
--- a/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala
+++ b/modules/jdbc/src/main/scala/etlflow/task/DBReadTask.scala
@@ -4,12 +4,12 @@ import etlflow.db.DB
 import scalikejdbc.WrappedResultSet
 import zio.{RIO, ZIO}
-case class DBReadTask[T](name: String, query: String)(fn: WrappedResultSet => T) extends EtlTask[DB, List[T]] {
-  override protected def process: RIO[DB, List[T]] = for {
+case class DBReadTask[T](name: String, query: String)(fn: WrappedResultSet => T) extends EtlTask[DB, Iterable[T]] {
+  override protected def process: RIO[DB, Iterable[T]] = for {
     _ <- ZIO.logInfo("#" * 100)
     _ <- ZIO.logInfo(s"Starting DB Read Task: $name")
     _ <- ZIO.logInfo(s"Query: $query")
-    out <- DB.executeQueryListOutput[T](query)(fn)
+    out <- DB.fetchResults[T](query)(fn)
   } yield out
   override def getTaskProperties: Map[String, String] = Map("query" -> query)
 }
diff --git a/modules/jdbc/src/test/Readme.md b/modules/jdbc/src/test/Readme.md
index 8212287f..d6989d51 100644
--- a/modules/jdbc/src/test/Readme.md
+++ b/modules/jdbc/src/test/Readme.md
@@ -26,13 +26,30 @@ export DB_DRIVER=com.mysql.cj.jdbc.Driver
 export INIT=true
 ```
-#Run Sample Test Job
+#Set environment variables for H2
+```shell
+export DB_URL=jdbc:h2:file:~/h2db/coordinator
+export DB_USER=etlflow
+export DB_PWD=etlflow
+export DB_DRIVER=org.h2.Driver
+export INIT=true
+
+rm -r ~/h2db # Cleanup directory before running
+```
+
+#Create DB Schema
 ```shell
-sbt ";project db; Test/runMain etlflow.audit.CreateDB"
-sbt ";project db; Test/runMain etlflow.SampleJobWithDbLogging"
+sbt ";project jdbc; Test/runMain etlflow.audit.CreateDB"
 ```
 #Run Tests
 ```shell
-sbt ";project db; test"
+sbt ";project jdbc; test"
 ```
+
+#Run Sample Test Job
+```shell
+sbt ";project jdbc; Test/runMain etlflow.SampleJobWithDbLogging"
+```
+
+
diff --git a/modules/jdbc/src/test/scala/etlflow/SampleJobWithDbLogging.scala b/modules/jdbc/src/test/scala/etlflow/SampleJobWithDbLogging.scala
index ce672c60..536883d4 100644
--- a/modules/jdbc/src/test/scala/etlflow/SampleJobWithDbLogging.scala
+++ b/modules/jdbc/src/test/scala/etlflow/SampleJobWithDbLogging.scala
@@ -19,12 +19,12 @@ object SampleJobWithDbLogging extends JobApp {
     query = "SELECT job_name,job_run_id,status FROM jobrun LIMIT 10"
   )(rs => EtlJobRun(rs.string("job_name"), rs.string("job_run_id"), rs.string("status")))
-  private def processData(ip: List[EtlJobRun]): Unit = {
+  private def processData(ip: Iterable[EtlJobRun]): Unit = {
     logger.info("Processing Data")
     ip.foreach(jr => logger.info(s"$jr"))
   }
-  private def task2(ip: List[EtlJobRun]): GenericTask[Unit] = GenericTask(
+  private def task2(ip: Iterable[EtlJobRun]): GenericTask[Unit] = GenericTask(
     name = "ProcessData",
     function = processData(ip)
   )
diff --git a/modules/jdbc/src/test/scala/etlflow/audit/DbTestSuite.scala b/modules/jdbc/src/test/scala/etlflow/audit/DbTestSuite.scala
index 9078c0cf..a591c377 100644
--- a/modules/jdbc/src/test/scala/etlflow/audit/DbTestSuite.scala
+++ b/modules/jdbc/src/test/scala/etlflow/audit/DbTestSuite.scala
@@ -1,5 +1,6 @@ package etlflow.audit
+import scalikejdbc.WrappedResultSet
 import zio.ZIO
 import zio.test._
@@ -31,6 +32,12 @@
         .getTaskRuns("SELECT * FROM taskrun")
         .tap(op => ZIO.foreach(op)(i => ZIO.logInfo(i.toString)))
         .as(assertCompletes)
+      ),
+      zio.test.test("fetchResults Test")(
+        Audit
+          .fetchResults("SELECT job_name FROM jobrun")(rs => rs.asInstanceOf[WrappedResultSet].string("job_name"))
+          .tap(op => ZIO.foreach(op)(i => ZIO.logInfo(i)))
+          .as(assertCompletes)
       )
     ) @@ TestAspect.sequential
 }
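Because `DBReadTask` now yields `Iterable[T]`, downstream consumers should accept the broader type, as the updated `SampleJobWithDbLogging` does. A compact sketch of the pattern; the row type and task names are hypothetical:

```scala
import etlflow.audit.Audit
import etlflow.db.DB
import etlflow.task.{DBReadTask, GenericTask}
import zio.RIO

// Hypothetical row type, mirroring EtlJobRun in the sample job
case class JobRow(name: String, runId: String, status: String)

val read: DBReadTask[JobRow] = DBReadTask(
  name = "FetchJobRuns",
  query = "SELECT job_name, job_run_id, status FROM jobrun LIMIT 10"
)(rs => JobRow(rs.string("job_name"), rs.string("job_run_id"), rs.string("status")))

// Consumers now take Iterable instead of List
def show(rows: Iterable[JobRow]): GenericTask[Unit] =
  GenericTask(name = "ShowRows", function = rows.foreach(println))

// execute audits task start/end, hence the Audit requirement alongside DB
val pipeline: RIO[DB with Audit, Unit] =
  read.execute.flatMap(rows => show(rows).execute)
```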
diff --git a/modules/jdbc/src/test/scala/etlflow/db/DbTestSuite.scala b/modules/jdbc/src/test/scala/etlflow/db/DbTestSuite.scala
index f0347560..7f31b164 100644
--- a/modules/jdbc/src/test/scala/etlflow/db/DbTestSuite.scala
+++ b/modules/jdbc/src/test/scala/etlflow/db/DbTestSuite.scala
@@ -20,16 +20,16 @@ object DbTestSuite {
        """.stripMargin
      assertZIO(DB.executeQuery(query).foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("Done")))(equalTo("Done"))
    },
-    test("executeQuerySingleOutput Test")(
+    test("fetchResult Test")(
      assertZIO(
        DB
-          .executeQuerySingleOutput("""SELECT job_name FROM jobrun ORDER BY job_run_id LIMIT 1""")(rs => rs.string("job_name"))
+          .fetchResult("""SELECT job_name FROM jobrun ORDER BY job_run_id LIMIT 1""")(rs => rs.string("job_name"))
          .foldZIO(ex => ZIO.fail(ex.getMessage), op => ZIO.succeed(op))
      )(equalTo("EtlJobDownload"))
    ),
-    test("executeQueryListOutput Test") {
+    test("fetchResults Test") {
      val res = DB
-        .executeQueryListOutput[TestDb]("SELECT job_name FROM jobrun")(rs => TestDb(rs.string("job_name")))
+        .fetchResults[TestDb]("SELECT job_name FROM jobrun")(rs => TestDb(rs.string("job_name")))
        .foldZIO(_ => ZIO.fail(List.empty[TestDb]), op => ZIO.succeed(op))
      assertZIO(res)(equalTo(List(TestDb("EtlJobDownload"), TestDb("EtlJobSpr"))))
    }
diff --git a/modules/k8s/src/test/scala/etlflow/RunTests.scala b/modules/k8s/src/test/scala/etlflow/RunTests.scala
index a9e95cba..fac2040b 100644
--- a/modules/k8s/src/test/scala/etlflow/RunTests.scala
+++ b/modules/k8s/src/test/scala/etlflow/RunTests.scala
@@ -8,7 +8,7 @@ import zio.test._
 object RunTests extends ZIOSpecDefault {
-  private val env: TaskLayer[K8S with Audit] = K8S.live() ++ audit.test
+  private val env: TaskLayer[K8S with Audit] = K8S.live() ++ audit.noop
   override def spec: Spec[TestEnvironment, Any] = (suite("Kube Tasks")(
     CreateKubeJobTestSuite.spec,
diff --git a/modules/redis/src/test/scala/etlflow/task/RedisTaskSuite.scala b/modules/redis/src/test/scala/etlflow/task/RedisTaskSuite.scala
index bb0cc876..0d8e19f8 100644
--- a/modules/redis/src/test/scala/etlflow/task/RedisTaskSuite.scala
+++ b/modules/redis/src/test/scala/etlflow/task/RedisTaskSuite.scala
@@ -1,11 +1,10 @@ package etlflow.task
-import RedisTask.RedisCmd
 import etlflow.model.Credential.REDIS
+import etlflow.task.RedisTask.RedisCmd
 import zio.ZIO
 import zio.test.Assertion.equalTo
-import zio.test._
-import zio.test.ZIOSpecDefault
+import zio.test.{ZIOSpecDefault, _}
 object RedisTaskSuite extends ZIOSpecDefault {
@@ -52,5 +51,5 @@ object RedisTaskSuite extends ZIOSpecDefault {
   override def spec: Spec[TestEnvironment, Any] =
     suite("Redis Tasks")(test("Execute redis tasks") {
       assertZIO(job.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
-    }).provideLayerShared(etlflow.audit.test)
+    }).provideLayerShared(etlflow.audit.noop)
 }
diff --git a/modules/spark/src/test/scala/etlflow/RunAllTests.scala b/modules/spark/src/test/scala/etlflow/RunAllTests.scala
index 4af3d91c..9c6f13b8 100644
--- a/modules/spark/src/test/scala/etlflow/RunAllTests.scala
+++ b/modules/spark/src/test/scala/etlflow/RunAllTests.scala
@@ -2,8 +2,7 @@ package etlflow
 import etlflow.spark.SparkLive
 import etlflow.task._
-import zio.test._
-import zio.test.ZIOSpecDefault
+import zio.test.{ZIOSpecDefault, _}
 object RunAllTests extends ZIOSpecDefault with TestSparkSession {
   override def spec: Spec[TestEnvironment, Any] =
@@ -14,5 +13,5 @@ object RunAllTests extends ZIOSpecDefault with TestSparkSession {
     ParquetToJdbcGenericTestSuite.spec,
     TransformationTestSuite.spec
     // BQtoGCStoGCSTestSuite.spec @@ TestAspect.ignore
-  ).provideShared((SparkLive.live(spark) ++ audit.test).orDie) @@ TestAspect.sequential
+  ).provideShared((SparkLive.live(spark) ++ audit.noop).orDie) @@ TestAspect.sequential
 }
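`audit.test` is renamed `audit.noop` across the test suites: any task's `execute` needs an `Audit` layer in the environment, and `noop` satisfies it without persisting anything. A minimal spec in the same style as the suites above; the suite and task names are hypothetical:

```scala
import etlflow.audit
import etlflow.task.GenericTask
import zio.ZIO
import zio.test.Assertion.equalTo
import zio.test._

object NoopAuditSpec extends ZIOSpecDefault {
  // A trivial task whose start/end events the noop audit backend silently discards
  private val task = GenericTask(name = "SayHello", function = println("hello"))

  override def spec: Spec[TestEnvironment, Any] =
    suite("Noop Audit")(test("Execute task with noop audit") {
      assertZIO(task.execute.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
    }).provideLayerShared(audit.noop)
}
```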
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 26102e9f..c3e6164b 100755
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -5,7 +5,8 @@ object Dependencies {
   lazy val coreLibs = List(
     "dev.zio" %% "zio"               % ZioVersion,
-    "dev.zio" %% "zio-logging-slf4j" % ZioLogVersion
+    "dev.zio" %% "zio-logging-slf4j" % ZioLogVersion,
+    "dev.zio" %% "zio-json"          % ZioJsonVersion
   )

   lazy val awsLibs = List(
@@ -66,7 +67,8 @@ object Dependencies {
   lazy val jdbcTestLibs = List(
     "org.postgresql" % "postgresql"           % PgVersion,
-    "mysql"          % "mysql-connector-java" % MySqlVersion
+    "mysql"          % "mysql-connector-java" % MySqlVersion,
+    "com.h2database" % "h2"                   % "2.1.+"
   ).map(_ % Test)

   lazy val sparkTestLibs = List(
diff --git a/project/Versions.scala b/project/Versions.scala
index 837a960d..13bdba10 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -4,7 +4,6 @@ object Versions {
   val Scala3           = "3.2.1"
   val AllScalaVersions = List(Scala212, Scala213, Scala3)
   val Scala2Versions   = List(Scala212, Scala213)
-  val EtlFlowVersion   = "1.5.0"

   val ZioVersion    = "2.0.5"
   val ZioLogVersion = "2.1.7"
diff --git a/version.sbt b/version.sbt
new file mode 100644
index 00000000..17c0562c
--- /dev/null
+++ b/version.sbt
@@ -0,0 +1 @@
+ThisBuild / version := "1.6.0"
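With H2 on the jdbc test classpath and `version.sbt` now the single source of the build version, a local run can reuse the same environment variables the new dbh2 CI job and the test Readme export. A sketch combining query and audit layers over one connection pool via `DB.liveAudit`; the fallback values and the `JDBC(url, user, password, driver)` field order are assumptions:

```scala
import etlflow.audit.Audit
import etlflow.db.DB
import etlflow.model.Credential.JDBC
import zio.TaskLayer

// Read the same variables the dbh2 CI job sets, with H2 defaults for local runs
val cred: JDBC = JDBC(
  sys.env.getOrElse("DB_URL", "jdbc:h2:file:~/h2db/coordinator"),
  sys.env.getOrElse("DB_USER", "etlflow"),
  sys.env.getOrElse("DB_PWD", "etlflow"),
  sys.env.getOrElse("DB_DRIVER", "org.h2.Driver")
)

// One pool (CP) backing both the DB service and DB-based audit logging
val env: TaskLayer[DB with Audit] =
  DB.liveAudit(cred, jobRunId = java.util.UUID.randomUUID.toString, fetchSize = Some(50))
```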