diff --git a/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala b/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala
index 26cd8d50..2c61cdf4 100644
--- a/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala
+++ b/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala
@@ -15,10 +15,15 @@
package za.co.absa.atum.model
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
import za.co.absa.atum.model.RunState.RunState
+class RunStateType extends TypeReference[RunState.type]
+
case class RunStatus
(
+ @JsonScalaEnumeration(classOf[RunStateType])
status: RunState,
error: Option[RunError]
)
diff --git a/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
new file mode 100644
index 00000000..d009ffb4
--- /dev/null
+++ b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2018-2019 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.util
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+/**
+ * Sample serializer that is expected to be used for Atum's model externally, e.g. in Enceladus
+ */
+object JacksonJsonSerializer {
+
+ val objectMapper: ObjectMapper = new ObjectMapper()
+ .registerModule(DefaultScalaModule)
+ .setSerializationInclusion(Include.NON_EMPTY) // e.g. null-values fields omitted
+
+
+ def fromJson[T](json: String)
+ (implicit ct: ClassTag[T]): T = {
+ val clazz = ct.runtimeClass.asInstanceOf[Class[T]]
+ if (clazz == classOf[String]) {
+ json.asInstanceOf[T]
+ } else {
+ objectMapper.readValue(json, clazz)
+ }
+ }
+
+ def toJson[T](entity: T): String = {
+ entity match {
+ case str: String =>
+ if (isValidJson(str)) str else objectMapper.writeValueAsString(entity)
+ case _ =>
+ objectMapper.writeValueAsString(entity)
+ }
+ }
+
+ def isValidJson[T](str: T with String): Boolean = {
+ Try(objectMapper.readTree(str)).isSuccess
+ }
+
+}
diff --git a/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala b/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala
index 971e57f3..2a359a27 100644
--- a/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala
+++ b/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala
@@ -21,7 +21,7 @@ import za.co.absa.atum.model.{Checkpoint, ControlMeasure, ControlMeasureMetadata
import za.co.absa.atum.utils.SerializationUtils
/**
- * Unit tests for ControlMeasure SerializationUtils-based object serialization
+ * Unit tests for ControlMeasure and RunStatus SerializationUtils-based object serialization
*/
class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers {
@@ -136,4 +136,13 @@ class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers {
SerializationUtils.fromJson[Seq[RunStatus]](SerializationUtils.asJsonPretty(runStatuses)) shouldEqual runStatuses
}
+ // jackson serialization support (notice the `runStatusesJson` being reused):
+ it should "serialize via Jackson's toJson" in {
+ JacksonJsonSerializer.toJson(runStatuses) shouldBe runStatusesJson
+ }
+
+ it should "deserialize via Jackson's fromJson" in {
+ JacksonJsonSerializer.fromJson[Array[RunStatus]](runStatusesJson) shouldBe runStatuses // Array to overcome runtime erasure
+ }
+
}
diff --git a/pom.xml b/pom.xml
index 37a8a6a6..c882b787 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,7 @@
3.5.3
+ 2.10.4
2.0.2
3.2.9
1.7.25
@@ -154,6 +155,16 @@
spark-core_${scala.binary.version}
${spark.version}
provided
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.module
+ jackson-module-scala_${scala.binary.version}
+
+
org.apache.spark
@@ -169,6 +180,18 @@
${json4s.version}
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.module
+ jackson-module-scala_${scala.binary.version}
+ ${jackson.version}
+
+
+
org.specs2
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 65a6d8ce..a2408645 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -30,6 +30,7 @@ object Dependencies {
val scalatest = "3.2.9"
val specs2 = "2.5"
val aws = "2.17.85"
+ val jacksonModuleScala = "2.10.4"
val apacheCommonsLang3 = "3.12.0"
val commonsConfiguration = "1.6"
@@ -57,21 +58,44 @@ object Dependencies {
}
}
- val sparkCore = moduleByScala("org.apache.spark" %% "spark-core" % _ % Provided)(Versions.spark2, Versions.spark3) _
- val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _
+
+ // extended version where to moduleId Fn takes 2 params: module version and scala version (to pass along)
+ def moduleByScalaUsingScalaVersion(moduleIdWithoutVersionNeedsScalaVersion: (String, String) => ModuleID)
+ (scala211Version: String, scala212Version: String)
+ (actualScalaVersion: String): ModuleID = {
+ actualScalaVersion match {
+ case _ if actualScalaVersion.startsWith("2.11") => moduleIdWithoutVersionNeedsScalaVersion.apply(scala211Version, actualScalaVersion)
+ case _ if actualScalaVersion.startsWith("2.12") => moduleIdWithoutVersionNeedsScalaVersion.apply(scala212Version, actualScalaVersion)
+ case _ => throw new IllegalArgumentException("Only Scala 2.11 and 2.12 are currently supported.")
+ }
+ }
+
+
+ lazy val sparkCore = {
+ def coreWithExcludes(version: String, scalaVersion: String): ModuleID = "org.apache.spark" %% "spark-core" % version % Provided exclude(
+ "com.fasterxml.jackson.core", "jackson-databind"
+ ) exclude(
+ "com.fasterxml.jackson.module", "jackson-module-scala_" + scalaVersion.substring(0, 4) // e.g. 2.11
+ )
+ moduleByScalaUsingScalaVersion(coreWithExcludes)(Versions.spark2, Versions.spark3) _
+ }
+
+ lazy val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _
lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
- val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _
- val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
- val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
- val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3)_
+ lazy val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _
+ lazy val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
+ lazy val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
+ lazy val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
lazy val absaCommons = "za.co.absa.commons" %% "commons" % Versions.absaCommons
lazy val commonsConfiguration = "commons-configuration" % "commons-configuration" % Versions.commonsConfiguration
lazy val apacheCommons = "org.apache.commons" % "commons-lang3" % Versions.apacheCommonsLang3
lazy val typeSafeConfig = "com.typesafe" % "config" % Versions.typesafeConfig
+ lazy val jacksonModuleScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jacksonModuleScala
+
lazy val mockitoScala = "org.mockito" %% "mockito-scala" % Versions.mockitoScala % Test
lazy val mockitoScalaScalatest = "org.mockito" %% "mockito-scala-scalatest" % Versions.mockitoScala % Test
@@ -90,7 +114,8 @@ object Dependencies {
def modelDependencies(scalaVersion: String): Seq[ModuleID] = Seq(
json4sCore(scalaVersion),
json4sJackson(scalaVersion),
- json4sNative(scalaVersion)
+ json4sNative(scalaVersion),
+ jacksonModuleScala
)
def coreDependencies(scalaVersion: String): Seq[ModuleID] = Seq(