Skip to content

Commit

Permalink
1.7.2 (#182)
Browse files Browse the repository at this point in the history
* Added FTP module
* Update GCSCopyTask.scala
* Updated Http module
  • Loading branch information
tharwaninitin authored Feb 5, 2023
1 parent af6e0f9 commit 513eaf8
Show file tree
Hide file tree
Showing 35 changed files with 521 additions and 273 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,27 @@ jobs:
cache: 'sbt'
- name: Run AWS Tests
run: sbt ";project aws; +test"
ftp:
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ 11, 17 ] # removed version 8 temporarily to avoid sbt-updates failing because of the latest version of LogbackVersion
fail-fast: false
steps:
- name: Checkout Code
uses: actions/checkout@v2
- name: Setup ftp
run: |
docker run -d -p 2222:22 --name ftp \
atmoz/sftp foo:foo:::upload
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java-version }}
distribution: 'adopt'
cache: 'sbt'
- name: Run FTP Tests
run: sbt ";project ftp; +test"
k8s:
runs-on: ubuntu-latest
strategy:
Expand Down
22 changes: 12 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Below are some salient features of this library.
| K8S | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-k8s_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-k8s) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-k8s_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-k8s_3) | [![etlflow-k8s Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-k8s/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-k8s) | Java 8 + |
| Email | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-email_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-email) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-email_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-email_3) | [![etlflow-email Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-email/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-email) | Java 8 + |
| AWS | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-aws_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-aws) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-aws_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-aws_3) | [![etlflow-aws Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-aws/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-aws) | Java 8 + |
| FTP | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-ftp_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-ftp) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-ftp_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-ftp_3) | [![etlflow-ftp Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-ftp/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-ftp) | Java 8 + |
| Redis | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-redis_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-redis) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-redis_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-redis_2.12) | [![etlflow-redis Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-redis/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-redis) | Java 8 + |
| Spark | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-spark_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-spark) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-spark_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-spark_2.12) | [![etlflow-spark Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-spark/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-spark) | Java 8 + |

Expand All @@ -59,22 +60,23 @@ Add the below latest release as a dependency to your project

__SBT__
```scala
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-ftp" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.2"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.2"
```
__Maven__
```
<dependency>
<groupId>com.github.tharwaninitin</groupId>
<artifactId>etlflow-core_2.12</artifactId>
<version>1.7.1</version>
<version>1.7.2</version>
</dependency>
```

Expand Down
15 changes: 13 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,18 @@ lazy val k8sSettings = Seq(
libraryDependencies ++= k8sLibs ++ coreTestLibs
)

lazy val ftpSettings = Seq(
name := "etlflow-ftp",
crossScalaVersions := Scala2Versions,
libraryDependencies ++= ftpLibs ++ coreTestLibs
)

lazy val etlflow = (project in file("."))
.settings(
crossScalaVersions := Nil, // crossScalaVersions must be set to Nil on the aggregating project
publish / skip := true
)
.aggregate(core.js, core.jvm, spark, jdbc, http, redis, email, aws, gcp, k8s)
.aggregate(core.js, core.jvm, spark, jdbc, http, redis, email, aws, gcp, k8s, ftp)

lazy val core =
(crossProject(JSPlatform, JVMPlatform).withoutSuffixFor(JVMPlatform).crossType(CrossType.Pure) in file("modules/core"))
Expand Down Expand Up @@ -134,7 +140,12 @@ lazy val gcp = (project in file("modules/gcp"))
lazy val k8s = (project in file("modules/k8s"))
.settings(commonSettings)
.settings(k8sSettings)
.dependsOn(core.jvm, http)
.dependsOn(core.jvm)

lazy val ftp = (project in file("modules/ftp"))
.settings(commonSettings)
.settings(ftpSettings)
.dependsOn(core.jvm)

lazy val docs = project
.in(file("modules/docs")) // important: it must not be docs/
Expand Down
2 changes: 2 additions & 0 deletions docs/readme.template.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Below are some salient features of this library.
| K8S | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-k8s_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-k8s) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-k8s_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-k8s_3) | [![etlflow-k8s Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-k8s/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-k8s) | Java 8 + |
| Email | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-email_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-email) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-email_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-email_3) | [![etlflow-email Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-email/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-email) | Java 8 + |
| AWS | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-aws_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-aws) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-aws_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-aws_3) | [![etlflow-aws Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-aws/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-aws) | Java 8 + |
| FTP | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-ftp_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-ftp) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-ftp_3/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-ftp_3) | [![etlflow-ftp Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-ftp/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-ftp) | Java 8 + |
| Redis | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-redis_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-redis) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-redis_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-redis_2.12) | [![etlflow-redis Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-redis/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-redis) | Java 8 + |
| Spark | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/etlflow-spark_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/etlflow-spark) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/etlflow-spark_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/etlflow-spark_2.12) | [![etlflow-spark Scala version support](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-spark/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/etlflow/etlflow-spark) | Java 8 + |

Expand All @@ -65,6 +66,7 @@ libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "@VERSION@
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "@VERSION@"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "@VERSION@"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "@VERSION@"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-ftp" % "@VERSION@"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "@VERSION@"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "@VERSION@"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "@VERSION@"
Expand Down
Binary file modified moduleDep.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 6 additions & 6 deletions modules/core/src/main/scala/etlflow/JobApp.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package etlflow

import etlflow.audit.Audit
import etlflow.json.JSON
import etlflow.log.ApplicationLogger
import zio._

Expand Down Expand Up @@ -44,12 +45,11 @@ trait JobApp extends ZIOAppDefault with ApplicationLogger {
*/
final def execute(cliArgs: Chunk[String]): RIO[Audit, Unit] =
for {
args <- ZIO.succeed(cliArgs.zipWithIndex.map(t => (t._2.toString, t._1)).toMap)
_ <- Audit.logJobStart(name, args)
_ <- job(cliArgs).tapError { ex =>
Audit.logJobEnd(name, args, Some(ex))
}
_ <- Audit.logJobEnd(name, args)
args <- ZIO.succeed(cliArgs.zipWithIndex.map(t => (t._2.toString, t._1)).toMap)
props <- JSON.convertToStringZIO(args)
_ <- Audit.logJobStart(name, props)
_ <- job(cliArgs).tapError(ex => Audit.logJobEnd(Some(ex)))
_ <- Audit.logJobEnd(None)
} yield ()

/** This is just a wrapper around default run method available with ZIOAppDefault to call [[execute execute(Chunk[String])]]
Expand Down
21 changes: 9 additions & 12 deletions modules/core/src/main/scala/etlflow/audit/Audit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import zio.{RIO, Task, UIO, URIO, ZIO}
trait Audit {
val jobRunId: String

def logJobStart(jobName: String, props: Map[String,String]): UIO[Unit]
def logJobEnd(jobName: String, props: Map[String,String], error: Option[Throwable]): UIO[Unit]
def logJobStart(jobName: String, props: String): UIO[Unit]
def logJobEnd(error: Option[Throwable]): UIO[Unit]

def logTaskStart(taskRunId: String, taskName: String, props: Map[String,String], taskType: String): UIO[Unit]
def logTaskEnd(taskRunId: String, taskName: String, props: Map[String,String], taskType: String, error: Option[Throwable]): UIO[Unit]
def logTaskStart(taskRunId: String, taskName: String, props: String, taskType: String): UIO[Unit]
def logTaskEnd(taskRunId: String, error: Option[Throwable]): UIO[Unit]

def getJobRuns(query: String): Task[Iterable[JobRun]] = ZIO.logInfo(query) *> ZIO.succeed(List.empty[JobRun])
def getTaskRuns(query: String): Task[Iterable[TaskRun]] = ZIO.logInfo(query) *> ZIO.succeed(List.empty[TaskRun])
Expand All @@ -23,18 +23,15 @@ trait Audit {
}

object Audit {
def logJobStart(jobName: String, props: Map[String,String]): URIO[Audit, Unit] =
ZIO.serviceWithZIO(_.logJobStart(jobName, props))
def logJobEnd(jobName: String, props: Map[String,String], error: Option[Throwable] = None): URIO[Audit, Unit] =
ZIO.serviceWithZIO(_.logJobEnd(jobName, props, error))
def logJobStart(jobName: String, props: String): URIO[Audit, Unit] = ZIO.serviceWithZIO(_.logJobStart(jobName, props))
def logJobEnd(error: Option[Throwable] = None): URIO[Audit, Unit] = ZIO.serviceWithZIO(_.logJobEnd(error))

def logTaskStart(taskRunId: String, taskName: String, props: Map[String,String], taskType: String): URIO[Audit, Unit] =
ZIO.serviceWithZIO(_.logTaskStart(taskRunId, taskName, props, taskType))
def logTaskEnd(taskRunId: String, taskName: String, props: Map[String,String], taskType: String, error: Option[Throwable] = None): URIO[Audit, Unit] =
ZIO.serviceWithZIO(_.logTaskEnd(taskRunId, taskName, props, taskType, error))
def logTaskStart(taskRunId: String, taskName: String, props: String, taskType: String): URIO[Audit, Unit] = ZIO.serviceWithZIO(_.logTaskStart(taskRunId, taskName, props, taskType))
def logTaskEnd(taskRunId: String, error: Option[Throwable] = None): URIO[Audit, Unit] = ZIO.serviceWithZIO(_.logTaskEnd(taskRunId, error))

def getJobRuns(query: String): RIO[Audit ,Iterable[JobRun]] = ZIO.serviceWithZIO(_.getJobRuns(query))
def getTaskRuns(query: String): RIO[Audit, Iterable[TaskRun]] = ZIO.serviceWithZIO(_.getTaskRuns(query))

def fetchResults[T](query: String)(fn: Audit#RS => T): RIO[Audit, Iterable[T]] = ZIO.serviceWithZIO[Audit](_.fetchResults(query)(fn))
def executeQuery(query: String): RIO[Audit, Unit] = ZIO.serviceWithZIO(_.executeQuery(query))
}
Expand Down
26 changes: 8 additions & 18 deletions modules/core/src/main/scala/etlflow/audit/Console.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,22 @@ object Console extends Audit {
override def logTaskStart(
taskRunId: String,
taskName: String,
props: Map[String, String],
props: String,
taskType: String
): UIO[Unit] = ZIO.logInfo(s"Task $taskName started")

override def logTaskEnd(
taskRunId: String,
taskName: String,
props: Map[String, String],
taskType: String,
error: Option[Throwable]
): UIO[Unit] = error.fold {
ZIO.logInfo(s"Task $taskName completed successfully")
override def logTaskEnd(taskRunId: String, error: Option[Throwable]): UIO[Unit] = error.fold {
ZIO.logInfo(s"Task $taskRunId completed successfully")
} { ex =>
ZIO.logError(s"Task $taskName failed, Error StackTrace:" + "\n" + ex.getStackTrace.mkString("\n"))
ZIO.logError(s"Task failed, Error StackTrace:" + "\n" + ex.getStackTrace.mkString("\n"))
}

override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] =
override def logJobStart(jobName: String, props: String): UIO[Unit] =
ZIO.logInfo(s"Job $jobName started")

override def logJobEnd(
jobName: String,
props: Map[String, String],
error: Option[Throwable]
): UIO[Unit] = error.fold {
ZIO.logInfo(s"Job $jobName completed with success")
override def logJobEnd(error: Option[Throwable]): UIO[Unit] = error.fold {
ZIO.logInfo(s"Job completed with success")
} { ex =>
ZIO.logError(s"Job $jobName completed with failure ${ex.getMessage}")
ZIO.logError(s"Job completed with failure ${ex.getMessage}")
}
}
58 changes: 23 additions & 35 deletions modules/core/src/main/scala/etlflow/audit/Memory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ case class Memory(jobRunId: String) extends Audit with ApplicationLogger {
override def logTaskStart(
taskRunId: String,
taskName: String,
props: Map[String, String],
props: String,
taskType: String
): UIO[Unit] =
for {
Expand All @@ -25,44 +25,32 @@ case class Memory(jobRunId: String) extends Audit with ApplicationLogger {
}
} yield ()

override def logTaskEnd(
taskRunId: String,
taskName: String,
props: Map[String, String],
taskType: String,
error: Option[Throwable]
): UIO[Unit] =
for {
stateRef <- state
_ <- stateRef.update { st =>
error.fold {
st.update(taskRunId, st(taskRunId).copy(status = Status.Succeed, end_time = Some(DateTimeApi.getCurrentTimestamp)))
} { ex =>
st.update(taskRunId, st(taskRunId).copy(status = Status.Failed(ex), end_time = Some(DateTimeApi.getCurrentTimestamp)))
}
st
override def logTaskEnd(taskRunId: String, error: Option[Throwable]): UIO[Unit] = for {
stateRef <- state
_ <- stateRef.update { st =>
error.fold {
st.update(taskRunId, st(taskRunId).copy(status = Status.Succeed, end_time = Some(DateTimeApi.getCurrentTimestamp)))
} { ex =>
st.update(taskRunId, st(taskRunId).copy(status = Status.Failed(ex), end_time = Some(DateTimeApi.getCurrentTimestamp)))
}
} yield ()
st
}
} yield ()

override def logJobStart(jobName: String, props: Map[String, String]): UIO[Unit] =
override def logJobStart(jobName: String, props: String): UIO[Unit] =
ZIO.succeed(logger.info(s"Job $jobName started"))

override def logJobEnd(
jobName: String,
props: Map[String, String],
error: Option[Throwable]
): UIO[Unit] =
for {
stateRef <- state
value <- stateRef.get
_ = error.fold {
logger.info(s"Job completed with success")
value.values.toList.sortBy(_.start_time).foreach(x => logger.info(x.toString()))
} { ex =>
logger.error(s"Job completed with failure ${ex.getMessage}")
value.values.toList.sortBy(_.start_time).foreach(x => logger.info(x.toString()))
}
} yield ()
override def logJobEnd(error: Option[Throwable]): UIO[Unit] = for {
stateRef <- state
value <- stateRef.get
_ = error.fold {
logger.info(s"Job completed with success")
value.values.toList.sortBy(_.start_time).foreach(x => logger.info(x.toString()))
} { ex =>
logger.error(s"Job completed with failure ${ex.getMessage}")
value.values.toList.sortBy(_.start_time).foreach(x => logger.info(x.toString()))
}
} yield ()
}

object Memory {
Expand Down
Loading

0 comments on commit 513eaf8

Please sign in to comment.