diff --git a/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala b/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala index 26cd8d50..2c61cdf4 100644 --- a/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala +++ b/model/src/main/scala/za/co/absa/atum/model/RunStatus.scala @@ -15,10 +15,15 @@ package za.co.absa.atum.model +import com.fasterxml.jackson.core.`type`.TypeReference +import com.fasterxml.jackson.module.scala.JsonScalaEnumeration import za.co.absa.atum.model.RunState.RunState +class RunStateType extends TypeReference[RunState.type] + case class RunStatus ( + @JsonScalaEnumeration(classOf[RunStateType]) status: RunState, error: Option[RunError] ) diff --git a/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala new file mode 100644 index 00000000..d009ffb4 --- /dev/null +++ b/model/src/test/scala/za/co/absa/atum/util/JacksonJsonSerializer.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2018-2019 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.util + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import scala.reflect.ClassTag +import scala.util.Try + +/** + * Sample serializer that is expected to be used for Atum's model externally, e.g. in Enceladus + */ +object JacksonJsonSerializer { + + val objectMapper: ObjectMapper = new ObjectMapper() + .registerModule(DefaultScalaModule) + .setSerializationInclusion(Include.NON_EMPTY) // e.g. null-values fields omitted + + + def fromJson[T](json: String) + (implicit ct: ClassTag[T]): T = { + val clazz = ct.runtimeClass.asInstanceOf[Class[T]] + if (clazz == classOf[String]) { + json.asInstanceOf[T] + } else { + objectMapper.readValue(json, clazz) + } + } + + def toJson[T](entity: T): String = { + entity match { + case str: String => + if (isValidJson(str)) str else objectMapper.writeValueAsString(entity) + case _ => + objectMapper.writeValueAsString(entity) + } + } + + def isValidJson[T](str: T with String): Boolean = { + Try(objectMapper.readTree(str)).isSuccess + } + +} diff --git a/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala b/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala index 971e57f3..2a359a27 100644 --- a/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala +++ b/model/src/test/scala/za/co/absa/atum/util/SerializationUtilsJsonSpec.scala @@ -21,7 +21,7 @@ import za.co.absa.atum.model.{Checkpoint, ControlMeasure, ControlMeasureMetadata import za.co.absa.atum.utils.SerializationUtils /** - * Unit tests for ControlMeasure SerializationUtils-based object serialization + * Unit tests for ControlMeasure and RunStatus SerializationUtils-based object serialization */ class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers { @@ -136,4 +136,13 @@ class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers { SerializationUtils.fromJson[Seq[RunStatus]](SerializationUtils.asJsonPretty(runStatuses)) shouldEqual runStatuses } + // jackson serialization support (notice the `runStatusesJson` being reused): + it should "serialize via Jackson's toJson" in { + JacksonJsonSerializer.toJson(runStatuses) shouldBe runStatusesJson + } + + it should "deserialize via Jackson's fromJson" in { + JacksonJsonSerializer.fromJson[Array[RunStatus]](runStatusesJson) shouldBe runStatuses // Array to overcome runtime erasure + } + } diff --git a/pom.xml b/pom.xml index 37a8a6a6..c882b787 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,7 @@ 3.5.3 + 2.10.4 2.0.2 3.2.9 1.7.25 @@ -154,6 +155,16 @@ spark-core_${scala.binary.version} ${spark.version} provided + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.module + jackson-module-scala_${scala.binary.version} + + org.apache.spark @@ -169,6 +180,18 @@ ${json4s.version} + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.module + jackson-module-scala_${scala.binary.version} + ${jackson.version} + + + org.specs2 diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 65a6d8ce..a2408645 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -30,6 +30,7 @@ object Dependencies { val scalatest = "3.2.9" val specs2 = "2.5" val aws = "2.17.85" + val jacksonModuleScala = "2.10.4" val apacheCommonsLang3 = "3.12.0" val commonsConfiguration = "1.6" @@ -57,21 +58,44 @@ object Dependencies { } } - val sparkCore = moduleByScala("org.apache.spark" %% "spark-core" % _ % Provided)(Versions.spark2, Versions.spark3) _ - val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _ + + // extended version where to moduleId Fn takes 2 params: module version and scala version (to pass along) + def moduleByScalaUsingScalaVersion(moduleIdWithoutVersionNeedsScalaVersion: (String, String) => ModuleID) + (scala211Version: String, scala212Version: String) + (actualScalaVersion: String): ModuleID = { + actualScalaVersion match { + case _ if actualScalaVersion.startsWith("2.11") => moduleIdWithoutVersionNeedsScalaVersion.apply(scala211Version, actualScalaVersion) + case _ if actualScalaVersion.startsWith("2.12") => moduleIdWithoutVersionNeedsScalaVersion.apply(scala212Version, actualScalaVersion) + case _ => throw new IllegalArgumentException("Only Scala 2.11 and 2.12 are currently supported.") + } + } + + + lazy val sparkCore = { + def coreWithExcludes(version: String, scalaVersion: String): ModuleID = "org.apache.spark" %% "spark-core" % version % Provided exclude( + "com.fasterxml.jackson.core", "jackson-databind" + ) exclude( + "com.fasterxml.jackson.module", "jackson-module-scala_" + scalaVersion.substring(0, 4) // e.g. 2.11 + ) + moduleByScalaUsingScalaVersion(coreWithExcludes)(Versions.spark2, Versions.spark3) _ + } + + lazy val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _ lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test - val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _ - val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _ - val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _ - val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3)_ + lazy val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _ + lazy val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _ + lazy val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _ + lazy val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _ lazy val absaCommons = "za.co.absa.commons" %% "commons" % Versions.absaCommons lazy val commonsConfiguration = "commons-configuration" % "commons-configuration" % Versions.commonsConfiguration lazy val apacheCommons = "org.apache.commons" % "commons-lang3" % Versions.apacheCommonsLang3 lazy val typeSafeConfig = "com.typesafe" % "config" % Versions.typesafeConfig + lazy val jacksonModuleScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jacksonModuleScala + lazy val mockitoScala = "org.mockito" %% "mockito-scala" % Versions.mockitoScala % Test lazy val mockitoScalaScalatest = "org.mockito" %% "mockito-scala-scalatest" % Versions.mockitoScala % Test @@ -90,7 +114,8 @@ object Dependencies { def modelDependencies(scalaVersion: String): Seq[ModuleID] = Seq( json4sCore(scalaVersion), json4sJackson(scalaVersion), - json4sNative(scalaVersion) + json4sNative(scalaVersion), + jacksonModuleScala ) def coreDependencies(scalaVersion: String): Seq[ModuleID] = Seq(