diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d2371182..d1342466d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,9 @@
-# Version 0.21.0 - 2033-01-26
+# Version 0.21.1 - 2022-01-28
+
+* flowexec now returns different exit codes depending on the processing result
+
+
+# Version 0.21.0 - 2022-01-26
* Fix wrong dependencies in Swagger plugin
* Implement basic schema inference for local CSV files
diff --git a/docker/pom.xml b/docker/pom.xml
index 5c7b3f493..215e56311 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -10,7 +10,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/docs/cli/flowexec.md b/docs/cli/flowexec.md
index 8ca357edb..be9be66d5 100644
--- a/docs/cli/flowexec.md
+++ b/docs/cli/flowexec.md
@@ -15,6 +15,19 @@ or for inspecting individual entities.
* `--spark-name ` Sets the Spark application name
+## Exit Codes
+
+`flowexec` provides different exit codes depending on the result of the execution
+
+| exit code | description |
+|-----------|--------------------------------------------------------------------------------|
+| 0 | Everything worked out nicely, no error. This includes skipped |
+| 2 | There were individual errors, but the run was successful (Success with Errors) |
+| 3 | There were execution errors |
+| 4 | The command line was not correct |
+| 5 | An uncaught exception occurred |
+
+
## Project Commands
The most important command group is for executing a specific lifecycle or a individual phase for the whole project.
```shell script
diff --git a/flowman-client/pom.xml b/flowman-client/pom.xml
index 13ae3af03..34560b135 100644
--- a/flowman-client/pom.xml
+++ b/flowman-client/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-common/pom.xml b/flowman-common/pom.xml
index 116726ff2..be15351d6 100644
--- a/flowman-common/pom.xml
+++ b/flowman-common/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-core/pom.xml b/flowman-core/pom.xml
index 7cc1eb875..d7d196e80 100644
--- a/flowman-core/pom.xml
+++ b/flowman-core/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/execution/AssertionRunner.scala b/flowman-core/src/main/scala/com/dimajix/flowman/execution/AssertionRunner.scala
index 751ab717b..805c858d9 100644
--- a/flowman-core/src/main/scala/com/dimajix/flowman/execution/AssertionRunner.scala
+++ b/flowman-core/src/main/scala/com/dimajix/flowman/execution/AssertionRunner.scala
@@ -53,23 +53,9 @@ class AssertionRunner(
execution.monitorAssertion(assertion) { execution =>
if (!error || keepGoing) {
val result = executeAssertion(execution, assertion, dryRun)
- val success = result.success
- error |= !success
+ error |= !result.success
- val description = result.description.getOrElse(result.name)
- if (result.exception.nonEmpty) {
- val ex = result.exception.get
- logger.error(s" ✘ exception: $description: ${ex.getMessage}")
- }
- else if (!success) {
- logger.error(red(s" ✘ failed: $description"))
- result.children.filter(_.failure).foreach { result =>
- logger.error(red(s" ✘ failed ${result.name}"))
- }
- }
- else {
- logger.info(green(s" ✓ passed: $description"))
- }
+ logResult(result)
result
}
@@ -83,6 +69,27 @@ class AssertionRunner(
}
}
+ private def logResult(result:AssertionResult) : Unit = {
+ val description = result.description.getOrElse(result.name)
+ result.exception match {
+ case Some(ex) =>
+ logger.error(s" ✘ exception $description: ${ex.getMessage}")
+ case None if (!result.success) =>
+ logger.error(red(s" ✘ failed: $description"))
+ // If an error occured, walk through the children to find a possible exception or failure to display
+ result.children.filter(_.failure).foreach { result =>
+ result.exception match {
+ case Some(ex) =>
+ logger.error(red(s" ✘ exception ${result.name}: ${ex.getMessage}"))
+ case None =>
+ logger.error(red(s" ✘ failed ${result.name}"))
+ }
+ }
+ case None =>
+ logger.info(green(s" ✓ passed: $description"))
+ }
+ }
+
private def executeAssertion(execution:Execution, assertion: Assertion, dryRun:Boolean) : AssertionResult = {
val startTime = Instant.now()
try {
diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/execution/Runner.scala b/flowman-core/src/main/scala/com/dimajix/flowman/execution/Runner.scala
index 429ab1863..549c99b83 100644
--- a/flowman-core/src/main/scala/com/dimajix/flowman/execution/Runner.scala
+++ b/flowman-core/src/main/scala/com/dimajix/flowman/execution/Runner.scala
@@ -81,7 +81,7 @@ private[execution] sealed class RunnerImpl {
case Status.SUCCESS =>
logger.info(green(s"Successfully finished phase '$phase' for target '${target.identifier}' in ${fmt(duration)}"))
case Status.SUCCESS_WITH_ERRORS =>
- logger.info(yellow(s"Successfully finished phase '$phase' for target '${target.identifier}' with errors in ${fmt(duration)}"))
+ logger.warn(yellow(s"Successfully finished phase '$phase' for target '${target.identifier}' with errors in ${fmt(duration)}"))
case Status.SKIPPED =>
logger.info(green(s"Skipped phase '$phase' for target '${target.identifier}'"))
case Status.FAILED if result.exception.nonEmpty =>
@@ -116,8 +116,9 @@ private[execution] sealed class RunnerImpl {
result
}
- private val lineSize = 109
- private val separator = boldWhite(StringUtils.repeat('-', lineSize))
+ protected val lineSize = 109
+ protected val separator = boldWhite(StringUtils.repeat('-', lineSize))
+ protected val doubleSeparator = boldWhite(StringUtils.repeat('=', lineSize))
def logSubtitle(s:String) : Unit = {
val l = (lineSize - 2 - s.length) / 2
val t = if (l > 3) {
@@ -149,7 +150,7 @@ private[execution] sealed class RunnerImpl {
logger.info("")
}
- def logStatus(title:String, status:Status, duration: Duration, endTime:Instant) : Unit = {
+ def logStatus(title:String, status:Status, duration: Duration, endTime:Instant, double:Boolean=false) : Unit = {
val msg = status match {
case Status.SUCCESS|Status.SKIPPED =>
boldGreen(s"${status.upper} $title")
@@ -163,27 +164,46 @@ private[execution] sealed class RunnerImpl {
boldRed(s"UNKNOWN STATE '$status' in $title. Assuming failure")
}
- logger.info(separator)
+ val sep = if (double) doubleSeparator else separator
+ logger.info(sep)
logger.info(msg)
- logger.info(separator)
+ logger.info(sep)
logger.info(s"Total time: ${fmt(duration)}")
logger.info(s"Finished at: ${endTime.atZone(ZoneId.systemDefault())}")
- logger.info(separator)
+ logger.info(sep)
}
- def logResult(title:String, result:Result[_]) : Unit = {
- logger.info(separator)
- logger.info(boldWhite(s"Execution summary for ${result.category.lower} '${result.identifier}'"))
- logger.info("")
- for (child <- result.children) {
- val name = child.identifier.toString
- val status = s"${this.status(child.status)} [${StringUtils.leftPad(fmt(child.duration), 10)}]"
- val dots = StringUtils.repeat('.', lineSize - child.status.upper.length - name.length - 15)
- logger.info(s"$name $dots $status")
+ def logJobResult(title:String, result:JobResult) : Unit = {
+ if (result.children.length > 1) {
+ logger.info(separator)
+ logger.info(boldWhite(s"Execution summary for ${result.category.lower} '${result.identifier}'"))
+ logger.info("")
+ for (child <- result.children) {
+ val name = child.identifier.toString
+ val status = s"${this.status(child.status)} [${StringUtils.leftPad(fmt(child.duration), 10)}]"
+ val dots = StringUtils.repeat('.', lineSize - child.status.upper.length - name.length - 15)
+ logger.info(s"$name $dots $status")
+ }
}
logStatus(title, result.status, result.duration, result.endTime)
}
+ def logLifecycleResult(title:String, result:LifecycleResult) : Unit = {
+ logger.info("")
+ if (result.children.length > 1) {
+ logger.info(doubleSeparator)
+ logger.info(boldWhite(s"Overall lifecycle summary for ${result.category.lower} '${result.identifier}'"))
+ logger.info("")
+ for (child <- result.children) {
+ val name = s"Phase ${child.phase.upper}"
+ val status = s"${this.status(child.status)} [${StringUtils.leftPad(fmt(child.duration), 10)}]"
+ val dots = StringUtils.repeat('.', lineSize - child.status.upper.length - name.length - 15)
+ logger.info(s"$name $dots $status")
+ }
+ }
+ logStatus(title, result.status, result.duration, result.endTime, double=true)
+ }
+
private def status(status:Status) : String = {
status match {
case Status.SUCCESS|Status.SKIPPED => boldGreen(status.upper)
@@ -231,12 +251,17 @@ private[execution] final class JobRunnerImpl(runner:Runner) extends RunnerImpl {
require(phases != null)
require(args != null)
+ logger.info("")
+ logger.info(separator)
+ logger.info(s"Executing phases ${phases.map(p => "'" + p + "'").mkString(",")} for job '${job.identifier}'")
+
val startTime = Instant.now()
val isolated2 = isolated || job.parameters.nonEmpty || job.environment.nonEmpty
withExecution(parentExecution, isolated2) { execution =>
runner.withJobContext(job, args, Some(execution), force, dryRun, isolated2) { (context, arguments) =>
+ val title = s"lifecycle for job '${job.identifier}' ${arguments.map(kv => kv._1 + "=" + kv._2).mkString(", ")}"
val listeners = if (!dryRun) stateStoreListener +: (runner.hooks ++ job.hooks).map(_.instantiate(context)) else Seq()
- execution.withListeners(listeners) { execution =>
+ val result = execution.withListeners(listeners) { execution =>
execution.monitorLifecycle(job, arguments, phases) { execution =>
val results = Result.flatMap(phases, keepGoing) { phase =>
// Check if build phase really contains any active target. Otherwise we skip this phase and mark it
@@ -263,6 +288,9 @@ private[execution] final class JobRunnerImpl(runner:Runner) extends RunnerImpl {
LifecycleResult(job, instance, results, startTime)
}
}
+
+ logLifecycleResult(title, result)
+ result
}
}
}
@@ -302,7 +330,7 @@ private[execution] final class JobRunnerImpl(runner:Runner) extends RunnerImpl {
}
}
- logResult(title, result)
+ logJobResult(title, result)
result
}
}
@@ -582,7 +610,6 @@ final class Runner(
require(phases != null)
require(args != null)
- logger.info(s"Executing phases ${phases.map(p => "'" + p + "'").mkString(",")} for job '${job.identifier}'")
val runner = new JobRunnerImpl(this)
val result = runner.executeJob(job, phases, args, targets, dirtyTargets=dirtyTargets, force=force, keepGoing=keepGoing, dryRun=dryRun, isolated=isolated)
result.status
diff --git a/flowman-core/src/main/scala/com/dimajix/flowman/model/result.scala b/flowman-core/src/main/scala/com/dimajix/flowman/model/result.scala
index d9ea3755a..b76fd0f0d 100644
--- a/flowman-core/src/main/scala/com/dimajix/flowman/model/result.scala
+++ b/flowman-core/src/main/scala/com/dimajix/flowman/model/result.scala
@@ -124,7 +124,7 @@ object LifecycleResult {
startTime=startTime,
endTime=Instant.now()
)
- def apply(job:Job, lifecycle: JobLifecycle, children : Seq[Result[_]], startTime:Instant) : LifecycleResult =
+ def apply(job:Job, lifecycle: JobLifecycle, children : Seq[JobResult], startTime:Instant) : LifecycleResult =
LifecycleResult(
job,
lifecycle,
@@ -147,7 +147,7 @@ object LifecycleResult {
final case class LifecycleResult(
job: Job,
lifecycle: JobLifecycle,
- override val children : Seq[Result[_]],
+ override val children : Seq[JobResult],
override val status: Status,
override val exception: Option[Throwable] = None,
override val startTime : Instant,
diff --git a/flowman-dist/pom.xml b/flowman-dist/pom.xml
index 674e7dc8a..b7ae5ddd8 100644
--- a/flowman-dist/pom.xml
+++ b/flowman-dist/pom.xml
@@ -10,7 +10,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-dsl/pom.xml b/flowman-dsl/pom.xml
index 1515cd655..9f05c9891 100644
--- a/flowman-dsl/pom.xml
+++ b/flowman-dsl/pom.xml
@@ -9,7 +9,7 @@
flowman-root
com.dimajix.flowman
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-hub/pom.xml b/flowman-hub/pom.xml
index f1ca19e4a..af6f160ec 100644
--- a/flowman-hub/pom.xml
+++ b/flowman-hub/pom.xml
@@ -9,7 +9,7 @@
flowman-root
com.dimajix.flowman
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-parent/pom.xml b/flowman-parent/pom.xml
index 927e555cd..9dd5f540f 100644
--- a/flowman-parent/pom.xml
+++ b/flowman-parent/pom.xml
@@ -10,7 +10,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-plugins/aws/pom.xml b/flowman-plugins/aws/pom.xml
index e4d6b174a..c50019433 100644
--- a/flowman-plugins/aws/pom.xml
+++ b/flowman-plugins/aws/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/azure/pom.xml b/flowman-plugins/azure/pom.xml
index 61b557e0f..3939100dd 100644
--- a/flowman-plugins/azure/pom.xml
+++ b/flowman-plugins/azure/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/delta/pom.xml b/flowman-plugins/delta/pom.xml
index 4f2ddb4f5..18e1d1abc 100644
--- a/flowman-plugins/delta/pom.xml
+++ b/flowman-plugins/delta/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/impala/pom.xml b/flowman-plugins/impala/pom.xml
index 0a66e5e4e..7cb2969c6 100644
--- a/flowman-plugins/impala/pom.xml
+++ b/flowman-plugins/impala/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/json/pom.xml b/flowman-plugins/json/pom.xml
index 03936bbe9..fc0d5c863 100644
--- a/flowman-plugins/json/pom.xml
+++ b/flowman-plugins/json/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/kafka/pom.xml b/flowman-plugins/kafka/pom.xml
index 90942d7e8..28d0f3eb8 100644
--- a/flowman-plugins/kafka/pom.xml
+++ b/flowman-plugins/kafka/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/mariadb/pom.xml b/flowman-plugins/mariadb/pom.xml
index 3639182d2..3d0321fe1 100644
--- a/flowman-plugins/mariadb/pom.xml
+++ b/flowman-plugins/mariadb/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/mssqlserver/pom.xml b/flowman-plugins/mssqlserver/pom.xml
index d8583839e..a2caaabb1 100644
--- a/flowman-plugins/mssqlserver/pom.xml
+++ b/flowman-plugins/mssqlserver/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/mysql/pom.xml b/flowman-plugins/mysql/pom.xml
index 077903e1f..ca161e973 100644
--- a/flowman-plugins/mysql/pom.xml
+++ b/flowman-plugins/mysql/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/openapi/pom.xml b/flowman-plugins/openapi/pom.xml
index b72846b1f..de43703a8 100644
--- a/flowman-plugins/openapi/pom.xml
+++ b/flowman-plugins/openapi/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-plugins/swagger/pom.xml b/flowman-plugins/swagger/pom.xml
index 5ddf0850d..490df8902 100644
--- a/flowman-plugins/swagger/pom.xml
+++ b/flowman-plugins/swagger/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../../pom.xml
diff --git a/flowman-scalatest-compat/pom.xml b/flowman-scalatest-compat/pom.xml
index 64abce90d..c7c0931b1 100644
--- a/flowman-scalatest-compat/pom.xml
+++ b/flowman-scalatest-compat/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-server-ui/pom.xml b/flowman-server-ui/pom.xml
index d873e893e..9bda56927 100644
--- a/flowman-server-ui/pom.xml
+++ b/flowman-server-ui/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-server/pom.xml b/flowman-server/pom.xml
index 70a1a97ab..117c4fcc5 100644
--- a/flowman-server/pom.xml
+++ b/flowman-server/pom.xml
@@ -9,7 +9,7 @@
flowman-root
com.dimajix.flowman
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-spark-extensions/pom.xml b/flowman-spark-extensions/pom.xml
index f6ab81f0f..71f51e441 100644
--- a/flowman-spark-extensions/pom.xml
+++ b/flowman-spark-extensions/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-spark-testing/pom.xml b/flowman-spark-testing/pom.xml
index 6b75e4b87..be280c407 100644
--- a/flowman-spark-testing/pom.xml
+++ b/flowman-spark-testing/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-spec/pom.xml b/flowman-spec/pom.xml
index cf92f552f..9046bbeda 100644
--- a/flowman-spec/pom.xml
+++ b/flowman-spec/pom.xml
@@ -9,7 +9,7 @@
flowman-root
com.dimajix.flowman
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/ValidateTarget.scala b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/ValidateTarget.scala
index 32d7768d2..502066bc8 100644
--- a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/ValidateTarget.scala
+++ b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/ValidateTarget.scala
@@ -113,11 +113,12 @@ case class ValidateTarget(
val status = Status.ofAll(result.map(_.status))
if (!status.success) {
- logger.error(s"Validation $identifier failed.")
- if (errorMode != ErrorMode.FAIL_NEVER) {
- TargetResult(this, Phase.VALIDATE, result, new ValidationFailedException(identifier), startTime)
+ if (errorMode != ErrorMode.FAIL_NEVER || result.exists(_.numExceptions > 0)) {
+ logger.error(s"Validation '$identifier' failed.")
+ TargetResult(this, Phase.VALIDATE, result, new ValidationFailedException(identifier, result.flatMap(_.exception).headOption.orNull), startTime)
}
else {
+ logger.error(s"Validation '$identifier' failed - the result is marked as 'success with errors'.")
TargetResult(this, Phase.VALIDATE, result, Status.SUCCESS_WITH_ERRORS, startTime)
}
}
diff --git a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/VerifyTarget.scala b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/VerifyTarget.scala
index 8f01e309b..0214718ac 100644
--- a/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/VerifyTarget.scala
+++ b/flowman-spec/src/main/scala/com/dimajix/flowman/spec/target/VerifyTarget.scala
@@ -113,11 +113,12 @@ case class VerifyTarget(
val status = Status.ofAll(result.map(_.status))
if (!status.success) {
- logger.error(s"Verification $identifier failed.")
- if (errorMode != ErrorMode.FAIL_NEVER) {
- TargetResult(this, Phase.VERIFY, result, new VerificationFailedException(identifier), startTime)
+ if (errorMode != ErrorMode.FAIL_NEVER || result.exists(_.numExceptions > 0)) {
+ logger.error(s"Verification '$identifier' failed.")
+ TargetResult(this, Phase.VERIFY, result, new VerificationFailedException(identifier, result.flatMap(_.exception).headOption.orNull), startTime)
}
else {
+ logger.error(s"Verification '$identifier' failed - the result is marked as 'success with errors'.")
TargetResult(this, Phase.VERIFY, result, Status.SUCCESS_WITH_ERRORS, startTime)
}
}
diff --git a/flowman-spec/src/test/scala/com/dimajix/flowman/spec/target/ValidateTargetTest.scala b/flowman-spec/src/test/scala/com/dimajix/flowman/spec/target/ValidateTargetTest.scala
index 2c8339212..e4d3cad8d 100644
--- a/flowman-spec/src/test/scala/com/dimajix/flowman/spec/target/ValidateTargetTest.scala
+++ b/flowman-spec/src/test/scala/com/dimajix/flowman/spec/target/ValidateTargetTest.scala
@@ -16,6 +16,8 @@
package com.dimajix.flowman.spec.target
+import java.time.Instant
+
import scala.collection.immutable.ListMap
import org.scalamock.scalatest.MockFactory
@@ -286,4 +288,60 @@ class ValidateTargetTest extends AnyFlatSpec with Matchers with MockFactory {
result.numExceptions should be (0)
result.children.size should be (2)
}
+
+ it should "throw an exception if fail_never is used, but an exception is thrown inside an assertion" in {
+ val session = Session.builder.disableSpark().build()
+ val execution = session.execution
+ val context = session.context
+
+ val assertion1 = mock[Assertion]
+ val assertion2 = mock[Assertion]
+ val target = ValidateTarget(
+ Target.Properties(context),
+ ListMap(
+ "a1" -> assertion1,
+ "a2" -> assertion2
+ ),
+ errorMode = ErrorMode.FAIL_NEVER
+ )
+
+ (assertion1.requires _).expects().returns(Set())
+ (assertion1.inputs _).expects().atLeastOnce().returns(Seq())
+ (assertion1.name _).expects().returns("a1")
+ (assertion1.description _).expects().returns(None)
+ (assertion1.context _).expects().returns(context)
+ (assertion1.execute _).expects(*,*).returns(
+ AssertionResult(
+ assertion1,
+ new IllegalArgumentException("Some error"),
+ Instant.now()
+ )
+ )
+
+ (assertion2.requires _).expects().returns(Set())
+ (assertion2.inputs _).expects().atLeastOnce().returns(Seq())
+ (assertion2.name _).expects().returns("a2")
+ (assertion2.description _).expects().returns(None)
+ (assertion2.context _).expects().returns(context)
+ (assertion2.execute _).expects(*,*).returns(
+ AssertionResult(assertion2, Seq(AssertionTestResult("a3", None, true)))
+ )
+
+ target.phases should be (Set(Phase.VALIDATE))
+ target.requires(Phase.VALIDATE) should be (Set())
+ target.provides(Phase.VALIDATE) should be (Set())
+ target.before should be (Seq())
+ target.after should be (Seq())
+
+ target.dirty(execution, Phase.VALIDATE) should be (Yes)
+ val result = target.execute(execution, Phase.VALIDATE)
+ result.target should be (target)
+ result.phase should be (Phase.VALIDATE)
+ result.status should be (Status.FAILED)
+ result.exception.get shouldBe a[ValidationFailedException]
+ result.numFailures should be (1)
+ result.numSuccesses should be (1)
+ result.numExceptions should be (2)
+ result.children.size should be (2)
+ }
}
diff --git a/flowman-studio-ui/pom.xml b/flowman-studio-ui/pom.xml
index 6b52a83af..51cfee95e 100644
--- a/flowman-studio-ui/pom.xml
+++ b/flowman-studio-ui/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-studio/pom.xml b/flowman-studio/pom.xml
index 9fc83ce3c..ab2ab9fd5 100644
--- a/flowman-studio/pom.xml
+++ b/flowman-studio/pom.xml
@@ -9,7 +9,7 @@
flowman-root
com.dimajix.flowman
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-testing/pom.xml b/flowman-testing/pom.xml
index edf3e3898..b2e3f6869 100644
--- a/flowman-testing/pom.xml
+++ b/flowman-testing/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-tools/pom.xml b/flowman-tools/pom.xml
index 033fe5dbc..e04344483 100644
--- a/flowman-tools/pom.xml
+++ b/flowman-tools/pom.xml
@@ -9,7 +9,7 @@
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
../pom.xml
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Command.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Command.scala
index 6886c7686..18d92f2fb 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Command.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Command.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import org.kohsuke.args4j.Option
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
@@ -50,5 +51,5 @@ abstract class Command {
out.println
}
- def execute(session: Session, project:Project, context:Context) : Boolean
+ def execute(session: Session, project:Project, context:Context) : Status
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Driver.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Driver.scala
index 943d037f2..46b8138a5 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Driver.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/Driver.scala
@@ -34,6 +34,7 @@ import com.dimajix.flowman.SPARK_BUILD_VERSION
import com.dimajix.flowman.common.Logging
import com.dimajix.flowman.common.ToolConfig
import com.dimajix.flowman.common.ParserUtils.splitSettings
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.tools.Tool
import com.dimajix.flowman.util.ConsoleColors
import com.dimajix.flowman.util.ConsoleColors.yellow
@@ -47,22 +48,26 @@ object Driver {
run(args:_*)
}
match {
- case Success (true) =>
+ case Success (Status.SUCCESS) =>
System.exit(0)
- case Success (false) =>
- System.exit(1)
+ case Success (Status.SKIPPED) =>
+ System.exit(0)
+ case Success (Status.SUCCESS_WITH_ERRORS) =>
+ System.exit(2)
+ case Success (_) =>
+ System.exit(3)
case Failure(ex:CmdLineException) =>
System.err.println(ex.getMessage)
ex.getParser.printUsage(System.err)
System.err.println
- System.exit(1)
+ System.exit(4)
case Failure(exception) =>
exception.printStackTrace(System.err)
- System.exit(1)
+ System.exit(5)
}
}
- def run(args: String*) : Boolean = {
+ def run(args: String*) : Status = {
val options = new Arguments(args.toArray)
// Check if only help or version is requested
if (options.version) {
@@ -72,11 +77,11 @@ object Driver {
println(s"Hadoop version $HADOOP_VERSION")
println(s"Scala version $SCALA_VERSION")
println(s"Java version $JAVA_VERSION")
- true
+ Status.SUCCESS
}
else if (options.help) {
options.printHelp(System.out)
- true
+ Status.SUCCESS
}
else {
Logging.setSparkLogging(options.sparkLogging)
@@ -95,14 +100,14 @@ class Driver(options:Arguments) extends Tool {
* Main method for running this command
* @return
*/
- def run() : Boolean = {
+ def run() : Status = {
// Disable colors in batch mode
ConsoleColors.disabled = options.batchMode
val command = options.command
if (command.help) {
command.printHelp(System.out)
- true
+ Status.SUCCESS
}
else {
// Create Flowman Session, which also includes a Spark Session
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/NestedCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/NestedCommand.scala
index 6b07b4761..5e899f104 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/NestedCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/NestedCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import org.kohsuke.args4j.CmdLineParser
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
@@ -48,7 +49,7 @@ abstract class NestedCommand extends Command {
}
- override def execute(session: Session, project:Project, context:Context) : Boolean = {
+ override def execute(session: Session, project:Project, context:Context) : Status = {
command.execute(session, project, context)
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/VersionCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/VersionCommand.scala
index 0635eba19..4d9923df4 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/VersionCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/VersionCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 Kaya Kupferschmidt
+ * Copyright 2020-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,11 +27,12 @@ import com.dimajix.flowman.SPARK_VERSION
import com.dimajix.flowman.common.ToolConfig
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
class VersionCommand extends Command {
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
println(s"Flowman version $FLOWMAN_VERSION")
println(s"Flowman home directory ${ToolConfig.homeDirectory.getOrElse("")}")
println(s"Flowman config directory ${ToolConfig.confDirectory.getOrElse("")}")
@@ -39,6 +40,6 @@ class VersionCommand extends Command {
println(s"Hadoop version $HADOOP_VERSION (build for $HADOOP_BUILD_VERSION)")
println(s"Scala version $SCALA_VERSION (build for $SCALA_BUILD_VERSION)")
println(s"Java version $JAVA_VERSION")
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/HistoryCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/HistoryCommand.scala
index 422e1f1f8..eb367aaa4 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/HistoryCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/HistoryCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Kaya Kupferschmidt
+ * Copyright 2020-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectJobHistoryCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectJobHistoryCommand.scala
index 36a3a270d..85c19d3cc 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectJobHistoryCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectJobHistoryCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.history.JobQuery
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -32,7 +33,7 @@ class InspectJobHistoryCommand extends Command {
@Argument(usage = "Job run ID", metaVar = "", required = true)
var jobId: String = ""
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
val query = JobQuery(
id = Seq(jobId)
)
@@ -56,10 +57,10 @@ class InspectJobHistoryCommand extends Command {
metrics.foreach { m =>
println(s" ${m.name} ts=${m.ts} labels=${m.labels.map(kv => kv._1 + "=" + kv._2).mkString("(",",",")")} value=${m.value}")
}
- true
+ Status.SUCCESS
case None =>
logger.error(s"Cannot find job run with id '$jobId'")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectTargetHistoryCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectTargetHistoryCommand.scala
index 8d08b15bd..3ff59c0f9 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectTargetHistoryCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/InspectTargetHistoryCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2021-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.history.JobQuery
import com.dimajix.flowman.history.TargetQuery
import com.dimajix.flowman.model.Project
@@ -33,7 +34,7 @@ class InspectTargetHistoryCommand extends Command {
@Argument(usage = "Target run ID", metaVar = "", required = true)
var targetId: String = ""
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
val query = TargetQuery(
id = Seq(targetId)
)
@@ -53,10 +54,10 @@ class InspectTargetHistoryCommand extends Command {
target.partitions.foreach { m =>
println(s" ${m._1} = ${m._2}")
}
- true
+ Status.SUCCESS
case None =>
logger.error(s"Cannot find target run with id '$targetId'")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchJobHistoryCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchJobHistoryCommand.scala
index 2a336e687..22bb81756 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchJobHistoryCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchJobHistoryCommand.scala
@@ -44,7 +44,7 @@ class SearchJobHistoryCommand extends Command {
@Option(name = "-n", aliases=Array("--limit"), usage = "maximum number of results", metaVar = "")
var limit:Int = 100
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
val query = JobQuery(
namespace = session.namespace.map(_.name).toSeq,
project = split(Some(this.project).filter(_.nonEmpty).getOrElse(project.name)),
@@ -55,7 +55,7 @@ class SearchJobHistoryCommand extends Command {
)
val jobs = session.history.findJobs(query, Seq(JobOrder.BY_DATETIME), limit, 0)
ConsoleUtils.showTable(jobs)
- true
+ Status.SUCCESS
}
private def split(arg:String) : Seq[String] = {
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchTargetHistoryCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchTargetHistoryCommand.scala
index 81abddc36..210bf0b43 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchTargetHistoryCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/history/SearchTargetHistoryCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Kaya Kupferschmidt
+ * Copyright 2020-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@ class SearchTargetHistoryCommand extends Command {
@Option(name = "-n", aliases=Array("--limit"), usage = "maximum number of results", metaVar = "")
var limit:Int = 100
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
val query = TargetQuery(
namespace = session.namespace.map(_.name).toSeq,
project = split(Some(this.project).filter(_.nonEmpty).getOrElse(project.name)),
@@ -57,7 +57,7 @@ class SearchTargetHistoryCommand extends Command {
)
val targets = session.history.findTargets(query, Seq(TargetOrder.BY_DATETIME), limit, 0)
ConsoleUtils.showTable(targets)
- true
+ Status.SUCCESS
}
private def split(arg:String) : Seq[String] = {
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/info/InfoCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/info/InfoCommand.scala
index d14fa8ed4..ff382afa0 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/info/InfoCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/info/InfoCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,12 +21,13 @@ import scala.collection.JavaConverters._
import com.dimajix.flowman.common.ToolConfig
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
class InfoCommand extends Command {
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
println(s"Flowman home directory: ${ToolConfig.homeDirectory.getOrElse("")}")
println(s"Flowman config directory: ${ToolConfig.confDirectory.getOrElse("")}")
println(s"Flowman plugin directory: ${ToolConfig.pluginDirectory.getOrElse("")}")
@@ -71,6 +72,6 @@ class InfoCommand extends Command {
.sortBy(_.getKey)
.foreach(kv => println(s" ${kv.getKey}=${kv.getValue}"))
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/InspectCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/InspectCommand.scala
index 659f927f5..6e9b238aa 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/InspectCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/InspectCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 Kaya Kupferschmidt
+ * Copyright 2020-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import com.dimajix.common.ExceptionUtils.reasons
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchJobException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.JobIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -36,7 +37,7 @@ class InspectCommand extends Command {
@Argument(index=0, required=true, usage = "name of job to inspect", metaVar = "")
var job: String = ""
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
try {
val job = context.getJob(JobIdentifier(this.job))
println(s"Name: ${job.name}")
@@ -53,15 +54,15 @@ class InspectCommand extends Command {
.toSeq
.sortBy(_._1)
.foreach{ case(k,v) => println(s" $k=$v") }
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchJobException =>
logger.error(s"Cannot resolve job '${ex.job}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error inspecting '$job': ${reasons(e)}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/ListCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/ListCommand.scala
index 13719d50b..1d6180b8f 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/ListCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/ListCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -27,8 +28,8 @@ import com.dimajix.flowman.tools.exec.Command
class ListCommand extends Command {
private val logger = LoggerFactory.getLogger(classOf[ListCommand])
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
project.jobs.keys.toList.sorted.foreach(println)
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/PhaseCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/PhaseCommand.scala
index 666c80ee2..7f6786961 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/PhaseCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/job/PhaseCommand.scala
@@ -60,7 +60,7 @@ sealed class PhaseCommand(phase:Phase) extends Command {
@Option(name = "-j", aliases=Array("--jobs"), usage = "number of jobs to run in parallel")
var parallelism: Int = 1
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val args = splitSettings(this.args).toMap
Try {
context.getJob(JobIdentifier(job))
@@ -68,25 +68,23 @@ sealed class PhaseCommand(phase:Phase) extends Command {
match {
case Failure(e) =>
logger.error(s"Error instantiating job '$job': ${reasons(e)}")
- false
+ Status.FAILED
case Success(job) =>
executeJob(session, job, job.parseArguments(args))
}
}
- private def executeJob(session: Session, job:Job, args:Map[String,FieldValue]) : Boolean = {
+ private def executeJob(session: Session, job:Job, args:Map[String,FieldValue]) : Status = {
val lifecycle =
if (noLifecycle)
Seq(phase)
else
Lifecycle.ofPhase(phase)
- val status = if (parallelism > 1)
+ if (parallelism > 1)
executeParallel(session, job, args, lifecycle)
else
executeLinear(session, job, args, lifecycle)
-
- status.success
}
private def executeLinear(session: Session, job:Job, args:Map[String,FieldValue], lifecycle: Seq[Phase]) : Status = {
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CacheCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CacheCommand.scala
index f0a63b318..11ee5113e 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CacheCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CacheCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,12 +20,12 @@ import scala.util.control.NonFatal
import org.apache.spark.storage.StorageLevel
import org.kohsuke.args4j.Argument
-import org.kohsuke.args4j.Option
import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingOutputIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -38,7 +38,7 @@ class CacheCommand extends Command {
var mapping: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
logger.info(s"Caching mapping '$mapping'")
try {
@@ -48,15 +48,15 @@ class CacheCommand extends Command {
val table = executor.instantiate(instance, id.output)
if (table.storageLevel == StorageLevel.NONE)
table.cache()
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchMappingException =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Caught exception while caching mapping '$mapping", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CountCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CountCommand.scala
index 504a21943..319f27d6b 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CountCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/CountCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,19 +41,19 @@ class CountCommand extends Command {
@Argument(usage = "specifies the mapping to count", metaVar = "", required = true)
var mapping: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val task = CountTarget(context, MappingOutputIdentifier(mapping))
task.execute(session.execution, Phase.BUILD).toTry match {
case Success(_) =>
logger.info("Successfully counted mapping")
- true
+ Status.SUCCESS
case Failure(ex:NoSuchMappingException) =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case Failure(e) =>
logger.error(s"Caught exception while counting mapping '$mapping'", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DependencyTreeCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DependencyTreeCommand.scala
index a98150d71..112b9902c 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DependencyTreeCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DependencyTreeCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2021-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Phase
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.graph.GraphBuilder
import com.dimajix.flowman.model.MappingIdentifier
import com.dimajix.flowman.model.Project
@@ -37,21 +38,21 @@ class DependencyTreeCommand extends Command {
@Argument(required = true, usage = "specifies mapping to inspect", metaVar = "")
var mapping: String = ""
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
try {
val mapping = context.getMapping(MappingIdentifier(this.mapping))
val graph = new GraphBuilder(context, Phase.BUILD).addMapping(mapping).build()
val node = graph.mapping(mapping)
println(node.upstreamDependencyTree)
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchMappingException =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error '$mapping': ${e.getMessage}")
- false
+ Status.FAILED
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DescribeCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DescribeCommand.scala
index 35c17d340..9060439e6 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DescribeCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/DescribeCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2021 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingOutputIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -38,7 +39,7 @@ class DescribeCommand extends Command {
@Argument(usage = "specifies the mapping to describe", metaVar = "", required = true)
var mapping: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
try {
val identifier = MappingOutputIdentifier(this.mapping)
val mapping = context.getMapping(identifier.mapping)
@@ -52,15 +53,15 @@ class DescribeCommand extends Command {
val schema = executor.describe(mapping, identifier.output)
schema.printTree()
}
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchMappingException =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error describing mapping '$mapping'", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExplainCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExplainCommand.scala
index cb1ace6de..c355ec5a4 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExplainCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExplainCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2021 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingOutputIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -39,7 +40,7 @@ class ExplainCommand extends Command {
var mapping: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
logger.info(s"Explaining mapping '$mapping'")
try {
@@ -48,15 +49,15 @@ class ExplainCommand extends Command {
val executor = session.execution
val table = executor.instantiate(instance, id.output)
table.explain(extended)
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchMappingException =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error explaining mapping '$mapping", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExportSchemaCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExportSchemaCommand.scala
index eba8e5138..62deea329 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExportSchemaCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ExportSchemaCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@ package com.dimajix.flowman.tools.exec.mapping
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-import scala.util.control.NonFatal
import org.kohsuke.args4j.Argument
import org.kohsuke.args4j.Option
@@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingOutputIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -47,7 +47,7 @@ class ExportSchemaCommand extends Command {
@Argument(usage = "specifies the output filename", metaVar = "", required = true, index = 1)
var filename: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
logger.info(s"Exporting the schema of mapping '$mapping' to '$filename'")
Try {
@@ -68,13 +68,13 @@ class ExportSchemaCommand extends Command {
} match {
case Success(_) =>
logger.info("Successfully saved schema")
- true
+ Status.SUCCESS
case Failure(ex:NoSuchMappingException) =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
- case Failure(NonFatal(e)) =>
+ Status.FAILED
+ case Failure(e) =>
logger.error(s"Caught exception while save the schema of mapping '$mapping'", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/InspectCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/InspectCommand.scala
index 084b6d5f0..9ca9d8bf7 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/InspectCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/InspectCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2021-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,8 @@ import com.dimajix.flowman.tools.exec.Command
import org.kohsuke.args4j.Argument
import org.slf4j.LoggerFactory
+import com.dimajix.flowman.execution.Status
+
class InspectCommand extends Command {
private val logger = LoggerFactory.getLogger(classOf[InspectCommand])
@@ -34,7 +36,7 @@ class InspectCommand extends Command {
@Argument(required = true, usage = "specifies mapping to inspect", metaVar = "")
var mapping: String = ""
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
try {
val mapping = context.getMapping(MappingIdentifier(this.mapping))
println("Mapping:")
@@ -50,15 +52,15 @@ class InspectCommand extends Command {
.map(_.toString)
.toSeq.sorted
.foreach{ p => println(s" $p") }
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchMappingException =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error '$mapping': ${e.getMessage}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ListCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ListCommand.scala
index df7aa1111..23c9f834a 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ListCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ListCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -27,8 +28,8 @@ import com.dimajix.flowman.tools.exec.Command
class ListCommand extends Command {
private val logger = LoggerFactory.getLogger(classOf[ListCommand])
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
project.mappings.keys.toList.sorted.foreach(println)
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/SaveCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/SaveCommand.scala
index 6a29f9a79..d530b4887 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/SaveCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/SaveCommand.scala
@@ -29,6 +29,7 @@ import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Phase
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingOutputIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.spec.target.FileTarget
@@ -47,18 +48,18 @@ class SaveCommand extends Command {
@Argument(usage = "specifies the output filename", metaVar = "", required = true, index = 1)
var location: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val task = FileTarget(context, MappingOutputIdentifier(mapping), new Path(location), format, splitSettings(options).toMap)
task.execute(session.execution, Phase.BUILD).toTry match {
- case Success(_) =>
- true
+ case Success(s) =>
+ s
case Failure(ex:NoSuchMappingException) =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case Failure(e) =>
logger.error(s"Caught exception while save mapping '$mapping'", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ShowCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ShowCommand.scala
index 72fd24ede..7b69e11a9 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ShowCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ShowCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2020 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,8 +18,6 @@ package com.dimajix.flowman.tools.exec.mapping
import scala.util.Failure
import scala.util.Success
-import scala.util.Try
-import scala.util.control.NonFatal
import org.kohsuke.args4j.Argument
import org.kohsuke.args4j.Option
@@ -30,6 +28,7 @@ import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Phase
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingOutputIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.spec.target.ConsoleTarget
@@ -51,19 +50,19 @@ class ShowCommand extends Command {
var csv: Boolean = false
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val columns = ParserUtils.parseDelimitedList(this.columns)
val task = ConsoleTarget(context, MappingOutputIdentifier(mapping), limit, columns, !noHeader, csv)
task.execute(session.execution, Phase.BUILD).toTry match {
- case Success(_) =>
- true
+ case Success(s) =>
+ s
case Failure(ex:NoSuchMappingException) =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
+ Status.FAILED
case Failure(e) =>
logger.error(s"Caught exception while dumping mapping '$mapping'", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ValidateCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ValidateCommand.scala
index 065428469..1eda12612 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ValidateCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/mapping/ValidateCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchMappingException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -38,7 +39,7 @@ class ValidateCommand extends Command {
@Argument(usage = "specifies mappings to validate", metaVar = "")
var mappings: Array[String] = Array()
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
logger.info("Validating mappings {}", if (mappings != null) mappings.mkString(",") else "all")
// Then execute output operations
@@ -54,16 +55,16 @@ class ValidateCommand extends Command {
} match {
case Success(true) =>
logger.info("Successfully validated mappings")
- true
+ Status.SUCCESS
case Success(false) =>
logger.error("Validation of mappings failed")
- false
+ Status.FAILED
case Failure(ex:NoSuchMappingException) =>
logger.error(s"Cannot resolve mapping '${ex.mapping}'")
- false
- case Failure(NonFatal(e)) =>
+ Status.FAILED
+ case Failure(e) =>
logger.error("Caught exception while validating mapping", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/DescribeCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/DescribeCommand.scala
index 40051351d..0d08cbb2f 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/DescribeCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/DescribeCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,9 +16,6 @@
package com.dimajix.flowman.tools.exec.model
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
import scala.util.control.NonFatal
import org.kohsuke.args4j.Argument
@@ -26,9 +23,9 @@ import org.kohsuke.args4j.Option
import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
-import com.dimajix.flowman.execution.NoSuchJobException
import com.dimajix.flowman.execution.NoSuchRelationException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.RelationIdentifier
import com.dimajix.flowman.tools.exec.Command
@@ -42,7 +39,7 @@ class DescribeCommand extends Command {
@Argument(usage = "specifies the relation to describe", metaVar = "", required = true)
var relation: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
try {
val identifier = RelationIdentifier(this.relation)
val relation = context.getRelation(identifier)
@@ -56,14 +53,14 @@ class DescribeCommand extends Command {
val schema = relation.describe(execution)
schema.printTree()
}
- true
+ Status.SUCCESS
} catch {
case ex:NoSuchRelationException =>
logger.error(s"Cannot resolve relation '${ex.relation}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error describing relation '$relation':", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ExportSchemaCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ExportSchemaCommand.scala
index bfa410f8b..b755eceeb 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ExportSchemaCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ExportSchemaCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.RelationIdentifier
import com.dimajix.flowman.tools.exec.Command
@@ -43,7 +44,7 @@ class ExportSchemaCommand extends Command {
@Argument(usage = "specifies the output filename", metaVar = "", required = true)
var filename: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
logger.info(s"Exporting the schema of model '$relation' to '$filename'")
Try {
@@ -54,10 +55,10 @@ class ExportSchemaCommand extends Command {
} match {
case Success(_) =>
logger.info("Successfully saved schema")
- true
- case Failure(NonFatal(e)) =>
+ Status.SUCCESS
+ case Failure(e) =>
logger.error(s"Caught exception while save the schema of model '$relation'", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/InspectCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/InspectCommand.scala
index 7bf0d5a65..aca63730b 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/InspectCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/InspectCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2021-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import org.kohsuke.args4j.Argument
import org.slf4j.LoggerFactory
import com.dimajix.common.ExceptionUtils.reasons
+import com.dimajix.flowman.execution.Status
class InspectCommand extends Command {
@@ -36,7 +37,7 @@ class InspectCommand extends Command {
@Argument(required = true, usage = "specifies relation to inspect", metaVar = "")
var relation: String = ""
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
try {
val relation = context.getRelation(RelationIdentifier(this.relation))
println("Relation:")
@@ -52,15 +53,15 @@ class InspectCommand extends Command {
.map(_.toString)
.toSeq.sorted
.foreach{ p => println(s" $p") }
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchRelationException =>
logger.error(s"Cannot resolve relation '${ex.relation}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error inspecting '$relation': ${reasons(e)}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ListCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ListCommand.scala
index 7701497d3..968caf3e3 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ListCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ListCommand.scala
@@ -1,9 +1,26 @@
+/*
+ * Copyright 2021-2022 Kaya Kupferschmidt
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.dimajix.flowman.tools.exec.model
import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -11,9 +28,9 @@ import com.dimajix.flowman.tools.exec.Command
class ListCommand extends Command {
private val logger = LoggerFactory.getLogger(classOf[ListCommand])
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
project.relations.keys.toList.sorted.foreach(println)
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/PhaseCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/PhaseCommand.scala
index c3a3e688a..c407fad77 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/PhaseCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/PhaseCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2021 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import com.dimajix.flowman.common.ParserUtils
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Phase
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.MappingOutputIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.RelationIdentifier
@@ -48,7 +49,7 @@ class PhaseCommand(phase:Phase) extends Command {
@Option(name = "-p", aliases=Array("--partition"), usage = "specify partition to work on, as partition1=value1,partition2=value2")
var partition: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
logger.info(s"Executing phase '$phase' for relations ${if (relations != null) relations.mkString(",") else "all"}")
val toRun =
@@ -66,9 +67,7 @@ class PhaseCommand(phase:Phase) extends Command {
}
val runner = session.runner
- val result = runner.executeTargets(targets, Seq(phase), force=force, keepGoing=keepGoing, dryRun=dryRun, isolated=false)
-
- result.success
+ runner.executeTargets(targets, Seq(phase), force=force, keepGoing=keepGoing, dryRun=dryRun, isolated=false)
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ShowCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ShowCommand.scala
index ac9245b7c..74293ce7d 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ShowCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/model/ShowCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2020 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import com.dimajix.flowman.common.ParserUtils
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Phase
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.RelationIdentifier
import com.dimajix.flowman.spec.target.ConsoleTarget
@@ -52,19 +53,17 @@ class ShowCommand extends Command {
@Option(name="-p", aliases=Array("--partition"), usage = "specify partition to work on, as partition1=value1,partition2=value2")
var partition: String = ""
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val columns = ParserUtils.parseDelimitedList(this.columns)
val partition = ParserUtils.parseDelimitedKeyValues(this.partition).map { case(k,v) => (k,SingleValue(v)) }
val task = ConsoleTarget(context, RelationIdentifier(relation), limit, columns, partition, !noHeader, csv)
- Try {
- task.execute(session.execution, Phase.BUILD).rethrow()
- } match {
- case Success(_) =>
- true
- case Failure(NonFatal(e)) =>
+ task.execute(session.execution, Phase.BUILD).toTry match {
+ case Success(s) =>
+ s
+ case Failure(e) =>
logger.error(s"Caught exception while dumping relation '$relation'", e)
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/namespace/InspectCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/namespace/InspectCommand.scala
index 1c4aaa7fe..7ed255c0c 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/namespace/InspectCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/namespace/InspectCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,12 +18,13 @@ package com.dimajix.flowman.tools.exec.namespace
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
class InspectCommand extends Command {
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
session.namespace.foreach { ns =>
println("Namespace:")
println(s" name: ${ns.name}")
@@ -54,6 +55,6 @@ class InspectCommand extends Command {
.foreach{ case(k,v) => println(s" $k=$v") }
}
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/InspectCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/InspectCommand.scala
index 9f4657585..cfd1f463a 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/InspectCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/InspectCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Kaya Kupferschmidt
+ * Copyright 2020-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,12 +18,13 @@ package com.dimajix.flowman.tools.exec.project
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
class InspectCommand extends Command {
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
println("Project:")
println(s" name: ${project.name}")
println(s" version: ${project.version.getOrElse("")}")
@@ -65,6 +66,6 @@ class InspectCommand extends Command {
.toSeq
.sortBy(_._1)
.foreach{ p => println(s" ${p._1}") }
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/PhaseCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/PhaseCommand.scala
index 91628ccf5..08ebd117f 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/PhaseCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/project/PhaseCommand.scala
@@ -30,6 +30,7 @@ import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Lifecycle
import com.dimajix.flowman.execution.Phase
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Job
import com.dimajix.flowman.model.JobIdentifier
import com.dimajix.flowman.model.Project
@@ -55,7 +56,7 @@ sealed class PhaseCommand(phase:Phase) extends Command {
@Option(name = "-nl", aliases=Array("--no-lifecycle"), usage = "only executes the specific phase and not the whole lifecycle")
var noLifecycle: Boolean = false
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val args = splitSettings(this.args).toMap
val job = "main"
Try {
@@ -64,13 +65,13 @@ sealed class PhaseCommand(phase:Phase) extends Command {
match {
case Failure(e) =>
logger.error(s"Error instantiating job '$job': ${reasons(e)}")
- false
+ Status.FAILED
case Success(job) =>
executeJob(session, job, job.parseArguments(args))
}
}
- private def executeJob(session: Session, job:Job, args:Map[String,FieldValue]) : Boolean = {
+ private def executeJob(session: Session, job:Job, args:Map[String,FieldValue]) : Status = {
val lifecycle =
if (noLifecycle)
Seq(phase)
@@ -81,10 +82,9 @@ sealed class PhaseCommand(phase:Phase) extends Command {
val jobArgs = args.map(kv => kv._1 + "=" + kv._2).mkString(", ")
logger.info(s"Executing job '${job.name}' $jobDescription with args $jobArgs")
- job.interpolate(args).forall { args =>
+ Status.ofAll(job.interpolate(args), keepGoing=keepGoing) { args =>
val runner = session.runner
- val result = runner.executeJob(job, lifecycle, args, targets.map(_.r), dirtyTargets=dirtyTargets.map(_.r), force=force, keepGoing=keepGoing, dryRun=dryRun)
- result.success
+ runner.executeJob(job, lifecycle, args, targets.map(_.r), dirtyTargets=dirtyTargets.map(_.r), force=force, keepGoing=keepGoing, dryRun=dryRun)
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/sql/SqlCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/sql/SqlCommand.scala
index e3f45a171..76f29b0fa 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/sql/SqlCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/sql/SqlCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2021 Kaya Kupferschmidt
+ * Copyright 2020-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.common.ExceptionUtils.reasons
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Mapping
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.spec.mapping.SqlMapping
@@ -45,7 +46,7 @@ class SqlCommand extends Command {
@Argument(index = 0, required = true, usage = "SQL statement to execute", metaVar = "", handler = classOf[RestOfArgumentsHandler])
var statement: Array[String] = Array()
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
val mapping = SqlMapping(
Mapping.Properties(context, "sql-" + Clock.systemUTC().millis()),
sql = Some(statement.mkString(" "))
@@ -54,12 +55,12 @@ class SqlCommand extends Command {
val executor = session.execution
val df = executor.instantiate(mapping, "main")
ConsoleUtils.showDataFrame(df, limit, csv)
- true
+ Status.SUCCESS
}
catch {
case NonFatal(ex) =>
logger.error("Cannot execute sql: " + reasons(ex))
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/DependencyTreeCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/DependencyTreeCommand.scala
index 8d9069e57..00125b9c9 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/DependencyTreeCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/DependencyTreeCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2021-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchTargetException
import com.dimajix.flowman.execution.Phase
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.graph.GraphBuilder
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.TargetIdentifier
@@ -38,21 +39,21 @@ class DependencyTreeCommand extends Command {
@Argument(required = true, usage = "specifies target to inspect", metaVar = "")
var target: String = ""
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
try {
val target = context.getTarget(TargetIdentifier(this.target))
val graph = new GraphBuilder(context, Phase.BUILD).addTarget(target).build()
val node = graph.target(target)
println(node.upstreamDependencyTree)
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchTargetException =>
logger.error(s"Cannot resolve target '${ex.target}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error analyzing '$target': ${reasons(e)}")
- false
+ Status.FAILED
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/InspectCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/InspectCommand.scala
index 25d3dcf82..497b35c82 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/InspectCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/InspectCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2021-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import com.dimajix.common.ExceptionUtils.reasons
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchTargetException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.TargetIdentifier
import com.dimajix.flowman.tools.exec.Command
@@ -36,7 +37,7 @@ class InspectCommand extends Command {
@Argument(required = true, usage = "specifies target to inspect", metaVar = "")
var target: String = ""
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
try {
val target = context.getTarget(TargetIdentifier(this.target))
println("Target:")
@@ -58,15 +59,15 @@ class InspectCommand extends Command {
.toSeq.sorted
.foreach{ p => println(s" $p") }
}
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchTargetException =>
logger.error(s"Cannot resolve target '${ex.target}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error inspecting '$target': ${reasons(e)}")
- false
+ Status.FAILED
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/ListCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/ListCommand.scala
index ef42cb67f..edf6dab3b 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/ListCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/ListCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -27,8 +28,8 @@ import com.dimajix.flowman.tools.exec.Command
class ListCommand extends Command {
private val logger = LoggerFactory.getLogger(classOf[ListCommand])
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
project.targets.keys.toList.sorted.foreach(println)
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/PhaseCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/PhaseCommand.scala
index 864fb2915..ee4a61c3d 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/PhaseCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/target/PhaseCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2021 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,25 +51,18 @@ class PhaseCommand(phase:Phase) extends Command {
var noLifecycle: Boolean = false
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val lifecycle =
if (noLifecycle)
Seq(phase)
else
Lifecycle.ofPhase(phase)
- Try {
- val allTargets = targets.flatMap(_.split(",")).map { t =>
- context.getTarget(TargetIdentifier(t))
- }
- val runner = session.runner
- runner.executeTargets(allTargets, lifecycle, force=force, keepGoing=keepGoing, dryRun=dryRun, isolated=false)
- } match {
- case Success(status) => status.success
- case Failure(e) =>
- logger.error(s"Error ${phase.upper} target '${targets.mkString(",")}: ${reasons(e)}")
- false
+ val allTargets = targets.flatMap(_.split(",")).map { t =>
+ context.getTarget(TargetIdentifier(t))
}
+ val runner = session.runner
+ runner.executeTargets(allTargets, lifecycle, force=force, keepGoing=keepGoing, dryRun=dryRun, isolated=false)
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/InspectCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/InspectCommand.scala
similarity index 91%
rename from flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/InspectCommand.scala
rename to flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/InspectCommand.scala
index d542cde8c..5211b75cf 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/InspectCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/InspectCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2020 Kaya Kupferschmidt
+ * Copyright 2020-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package com.dimajix.flowman.tools.shell.test
+package com.dimajix.flowman.tools.exec.test
import scala.util.control.NonFatal
@@ -22,9 +22,9 @@ import org.kohsuke.args4j.Argument
import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
-import com.dimajix.flowman.execution.NoSuchJobException
import com.dimajix.flowman.execution.NoSuchTestException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.TestIdentifier
import com.dimajix.flowman.tools.exec.Command
@@ -36,7 +36,7 @@ class InspectCommand extends Command {
@Argument(index=0, required=true, usage = "name of test to inspect", metaVar = "")
var test: String = ""
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
try {
val test = context.getTest(TestIdentifier(this.test))
println(s"Name: ${test.name}")
@@ -69,15 +69,15 @@ class InspectCommand extends Command {
.toSeq
.sorted
.foreach{ p => println(s" $p") }
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchTestException =>
logger.error(s"Cannot resolve test '${ex.test}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error '$test': ${e.getMessage}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/ListCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/ListCommand.scala
index 35c256f1c..df0fc0e1c 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/ListCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/ListCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -27,9 +28,9 @@ import com.dimajix.flowman.tools.exec.Command
class ListCommand extends Command {
private val logger = LoggerFactory.getLogger(classOf[ListCommand])
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
project.tests.keys.toList.sorted.foreach(println)
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/RunCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/RunCommand.scala
index 43d5cf1ee..fe7e5b922 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/RunCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/RunCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2021 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -43,7 +43,7 @@ class RunCommand extends Command {
var parallelism: Int = 1
- override def execute(session: Session, project: Project, context:Context) : Boolean = {
+ override def execute(session: Session, project: Project, context:Context) : Status = {
val allTests = if (tests.nonEmpty) {
tests.flatMap(_.split(",")).toSeq.distinct
}
@@ -59,7 +59,7 @@ class RunCommand extends Command {
if(!status.success) {
logger.error(red("There have been test failures"))
}
- status.success
+ status
}
private def executeLinear(session: Session, context:Context, tests:Seq[String]) : Status = {
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/TestCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/TestCommand.scala
index c85b53170..30d9d0ad0 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/TestCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/exec/test/TestCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import com.dimajix.flowman.tools.exec.NestedCommand
class TestCommand extends NestedCommand {
@Argument(required=true,index=0,metaVar="",usage="the subcommand to run",handler=classOf[SubCommandHandler])
@SubCommands(Array(
+ new SubCommand(name="inspect",impl=classOf[InspectCommand]),
new SubCommand(name="list",impl=classOf[ListCommand]),
new SubCommand(name="run",impl=classOf[RunCommand])
))
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/EvaluateCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/EvaluateCommand.scala
index 97c7bc79d..b30523d97 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/EvaluateCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/EvaluateCommand.scala
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
@@ -34,15 +35,15 @@ class EvaluateCommand extends Command {
@Argument(index=0, required=true, usage = "expression to evaluate", metaVar = "", handler=classOf[RestOfArgumentsHandler])
var args: Array[String] = Array()
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
try {
println(context.evaluate(args.mkString(" ")))
- true
+ Status.SUCCESS
}
catch {
case NonFatal(e) =>
logger.error(s"Error: ${e.getMessage}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/ExitCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/ExitCommand.scala
index 6e0fafdd6..654c30f5a 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/ExitCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/ExitCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Kaya Kupferschmidt
+ * Copyright 2021-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,14 +18,15 @@ package com.dimajix.flowman.tools.shell
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
class ExitCommand extends Command {
- override def execute(session: Session, project: Project, context: Context): Boolean = {
+ override def execute(session: Session, project: Project, context: Context): Status = {
session.shutdown()
System.exit(0)
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/EnterCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/EnterCommand.scala
index d1ad5c77e..7bb8f41d8 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/EnterCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/EnterCommand.scala
@@ -27,6 +27,7 @@ import com.dimajix.flowman.execution.Session
import com.dimajix.flowman.model.JobIdentifier
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.common.ParserUtils.splitSettings
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.tools.exec.Command
import com.dimajix.flowman.tools.shell.Shell
@@ -39,20 +40,20 @@ class EnterCommand extends Command {
@Argument(index=1, required=false, usage = "specifies job parameters", metaVar = "=")
var args: Array[String] = Array()
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
try {
val job = context.getJob(JobIdentifier(this.job))
val args = splitSettings(this.args).toMap
Shell.instance.enterJob(job, args)
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchJobException =>
logger.error(s"Cannot resolve job '${ex.job}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error entering job '$job': ${e.getMessage}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/LeaveCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/LeaveCommand.scala
index e4b34754d..c56b310d2 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/LeaveCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/job/LeaveCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,14 +18,15 @@ package com.dimajix.flowman.tools.shell.job
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
import com.dimajix.flowman.tools.shell.Shell
class LeaveCommand extends Command {
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
Shell.instance.leaveJob()
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/LoadCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/LoadCommand.scala
index 1e582e71e..e2466936c 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/LoadCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/LoadCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
import com.dimajix.flowman.tools.shell.Shell
@@ -35,15 +36,15 @@ class LoadCommand extends Command {
@Argument(index=0, required=true, usage = "filename or directory of project to load", metaVar = "")
var project: String = ""
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
try {
Shell.instance.loadProject(new Path(this.project))
- true
+ Status.SUCCESS
}
catch {
case NonFatal(e) =>
logger.error(s"Error loading project '${this.project}': ${e.getMessage}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/ReloadCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/ReloadCommand.scala
index c5d7d2964..22b3ccbb5 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/ReloadCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/project/ReloadCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
import com.dimajix.flowman.tools.shell.Shell
@@ -31,20 +32,20 @@ import com.dimajix.flowman.tools.shell.Shell
class ReloadCommand extends Command {
private val logger = LoggerFactory.getLogger(classOf[ReloadCommand])
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
project.filename.map { fn =>
try {
Shell.instance.loadProject(fn.path)
- true
+ Status.SUCCESS
}
catch {
case NonFatal(e) =>
logger.error(s"Error reloading current project '${fn}': ${e.getMessage}")
- false
+ Status.FAILED
}
}.getOrElse {
logger.warn(s"Cannot reload current project, since it has no path")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/EnterCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/EnterCommand.scala
index f8334fa47..3d384522b 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/EnterCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/EnterCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.NoSuchTestException
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.model.TestIdentifier
import com.dimajix.flowman.tools.exec.Command
@@ -36,19 +37,19 @@ class EnterCommand extends Command {
@Argument(index=0, required=true, usage = "name of test to enter", metaVar = "")
var test: String = ""
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
try {
val test = context.getTest(TestIdentifier(this.test))
Shell.instance.enterTest(test)
- true
+ Status.SUCCESS
}
catch {
case ex:NoSuchTestException =>
logger.error(s"Cannot resolve test '${ex.test}'")
- false
+ Status.FAILED
case NonFatal(e) =>
logger.error(s"Error entering test '$test': ${e.getMessage}")
- false
+ Status.FAILED
}
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/LeaveCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/LeaveCommand.scala
index fe88daf5a..efe202f15 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/LeaveCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/LeaveCommand.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,14 +18,15 @@ package com.dimajix.flowman.tools.shell.test
import com.dimajix.flowman.execution.Context
import com.dimajix.flowman.execution.Session
+import com.dimajix.flowman.execution.Status
import com.dimajix.flowman.model.Project
import com.dimajix.flowman.tools.exec.Command
import com.dimajix.flowman.tools.shell.Shell
class LeaveCommand extends Command {
- override def execute(session: Session, project:Project, context:Context): Boolean = {
+ override def execute(session: Session, project:Project, context:Context): Status = {
Shell.instance.leaveTest()
- true
+ Status.SUCCESS
}
}
diff --git a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/TestCommand.scala b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/TestCommand.scala
index 31c8968ee..9228b185b 100644
--- a/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/TestCommand.scala
+++ b/flowman-tools/src/main/scala/com/dimajix/flowman/tools/shell/test/TestCommand.scala
@@ -23,6 +23,7 @@ import org.kohsuke.args4j.spi.SubCommands
import com.dimajix.flowman.tools.exec.Command
import com.dimajix.flowman.tools.exec.NestedCommand
+import com.dimajix.flowman.tools.exec.test.InspectCommand
import com.dimajix.flowman.tools.exec.test.ListCommand
import com.dimajix.flowman.tools.exec.test.RunCommand
diff --git a/flowman-tools/src/test/scala/com/dimajix/flowman/tools/exec/DriverTest.scala b/flowman-tools/src/test/scala/com/dimajix/flowman/tools/exec/DriverTest.scala
index 9b28b1ca8..677c176fe 100644
--- a/flowman-tools/src/test/scala/com/dimajix/flowman/tools/exec/DriverTest.scala
+++ b/flowman-tools/src/test/scala/com/dimajix/flowman/tools/exec/DriverTest.scala
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-2019 Kaya Kupferschmidt
+ * Copyright 2018-2022 Kaya Kupferschmidt
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,10 +20,12 @@ import org.kohsuke.args4j.CmdLineException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
+import com.dimajix.flowman.execution.Status
+
class DriverTest extends AnyFlatSpec with Matchers {
"The Driver" should "fail with an exception on wrong arguments" in {
- Driver.run() should be (true)
+ Driver.run() should be (Status.SUCCESS)
a[CmdLineException] shouldBe thrownBy(Driver.run("no_such_command"))
a[CmdLineException] shouldBe thrownBy(Driver.run("project", "run"))
diff --git a/pom.xml b/pom.xml
index d1d243911..6315c494d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
com.dimajix.flowman
flowman-root
- 0.21.0
+ 0.21.1
pom
Flowman root pom
A Spark based ETL tool