Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#131 Backport of jackson serialization support (RunStatus enum) #132

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions model/src/main/scala/za/co/absa/atum/model/RunStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

package za.co.absa.atum.model

import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
import za.co.absa.atum.model.RunState.RunState

// Jackson TypeReference to the RunState enumeration object; handed to @JsonScalaEnumeration on RunStatus.status
class RunStateType extends TypeReference[RunState.type]

case class RunStatus
(
@JsonScalaEnumeration(classOf[RunStateType])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note: the whole class feels a little utilitarian for Enceladus' purposes, but there is no point in removing it now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No such class in the next major version 👍

status: RunState,
error: Option[RunError]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2018-2019 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.util

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.reflect.ClassTag
import scala.util.Try

/**
 * Sample serializer that is expected to be used for Atum's model externally, e.g. in Enceladus.
 *
 * Strings get special treatment in both directions: `fromJson[String]` hands the input back
 * untouched and `toJson` passes through a string that already holds a JSON document.
 */
object JacksonJsonSerializer {

  /** Shared mapper with Scala support registered; empty values are left out of the output. */
  val objectMapper: ObjectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    .setSerializationInclusion(Include.NON_EMPTY) // e.g. null-values fields omitted

  /**
   * Deserializes `json` into an instance of `T`.
   *
   * Only the runtime class of `T` is passed to Jackson, so type arguments of generic
   * containers are not available to it (the accompanying spec reads an `Array` for that reason).
   *
   * @param json the JSON text to read
   * @param ct   class tag supplying the runtime class of `T`
   * @tparam T target type
   * @return `json` itself when `T` is `String`, otherwise the value bound by the object mapper
   */
  def fromJson[T](json: String)
                 (implicit ct: ClassTag[T]): T = {
    val clazz = ct.runtimeClass.asInstanceOf[Class[T]]
    if (clazz == classOf[String]) {
      json.asInstanceOf[T]
    } else {
      objectMapper.readValue(json, clazz)
    }
  }

  /**
   * Serializes `entity` to a JSON string.
   *
   * @param entity the value to serialize
   * @tparam T type of the value
   * @return the string itself when `entity` is a `String` that already holds JSON content
   *         (see [[isValidJson]]), otherwise the object mapper's serialization of `entity`
   */
  def toJson[T](entity: T): String = entity match {
    case str: String if isValidJson(str) => str
    case _ => objectMapper.writeValueAsString(entity)
  }

  /**
   * Tells whether `str` can be parsed by the object mapper into a JSON tree.
   *
   * Input without any content (empty or blank) is not considered valid: for such input
   * `readTree` does not throw but yields `null` or a "missing" node depending on the
   * Jackson version, and neither of them represents a JSON document.
   *
   * @param str the text to check
   * @return `true` only when parsing succeeds and produces an actual node
   */
  def isValidJson[T](str: T with String): Boolean = {
    Try(objectMapper.readTree(str))
      .toOption
      .flatMap(Option(_)) // a successful parse may still carry null
      .exists(!_.isMissingNode)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import za.co.absa.atum.model.{Checkpoint, ControlMeasure, ControlMeasureMetadata
import za.co.absa.atum.utils.SerializationUtils

/**
* Unit tests for ControlMeasure SerializationUtils-based object serialization
* Unit tests for ControlMeasure and RunStatus SerializationUtils-based object serialization
*/
class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers {

Expand Down Expand Up @@ -136,4 +136,13 @@ class SerializationUtilsJsonSpec extends AnyFlatSpec with Matchers {
SerializationUtils.fromJson[Seq[RunStatus]](SerializationUtils.asJsonPretty(runStatuses)) shouldEqual runStatuses
}

// jackson serialization support (notice the `runStatusesJson` being reused):
it should "serialize via Jackson's toJson" in {
JacksonJsonSerializer.toJson(runStatuses) shouldBe runStatusesJson
}

it should "deserialize via Jackson's fromJson" in {
JacksonJsonSerializer.fromJson[Array[RunStatus]](runStatusesJson) shouldBe runStatuses // Array to overcome runtime erasure
}

}
23 changes: 23 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@

<!-- Frameworks and libraries -->
<json4s.version>3.5.3</json4s.version> <!-- This version is set to be compatible with Spark 2.4.5 -->
<jackson.version>2.10.4</jackson.version> <!-- Spark 2.4.x default is 2.8.4 -->
<scalatest.maven.version>2.0.2</scalatest.maven.version>
<scalatest.version>3.2.9</scalatest.version>
<slf4j.version>1.7.25</slf4j.version>
Expand Down Expand Up @@ -154,6 +155,16 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These exclusions are missing from sbt. Is that OK?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing. I have added the same excludes to the sbt version as well.

<!-- Keep Spark's transitive Jackson out; the Jackson artifacts are declared explicitly with ${jackson.version}. -->
<!-- An exclusion is matched on groupId + artifactId, so the coordinates must be given in the right elements. -->
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand All @@ -169,6 +180,18 @@
<version>${json4s.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>${jackson.version}</version>
</dependency>


<!-- scalatest -->
<dependency>
<groupId>org.specs2</groupId>
Expand Down
39 changes: 32 additions & 7 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ object Dependencies {
val scalatest = "3.2.9"
val specs2 = "2.5"
val aws = "2.17.85"
val jacksonModuleScala = "2.10.4"

val apacheCommonsLang3 = "3.12.0"
val commonsConfiguration = "1.6"
Expand Down Expand Up @@ -57,21 +58,44 @@ object Dependencies {
}
}

val sparkCore = moduleByScala("org.apache.spark" %% "spark-core" % _ % Provided)(Versions.spark2, Versions.spark3) _
val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _

/**
 * Extended variant of `moduleByScala`: the module-building function takes two parameters,
 * the module version picked for the Scala line in use and the Scala version itself (passed along).
 *
 * @param moduleIdWithoutVersionNeedsScalaVersion builds the ModuleID from (module version, Scala version)
 * @param scala211Version    module version to use with Scala 2.11
 * @param scala212Version    module version to use with Scala 2.12
 * @param actualScalaVersion Scala version of the build
 * @throws IllegalArgumentException when the Scala version starts with neither "2.11" nor "2.12"
 */
def moduleByScalaUsingScalaVersion(moduleIdWithoutVersionNeedsScalaVersion: (String, String) => ModuleID)
                                  (scala211Version: String, scala212Version: String)
                                  (actualScalaVersion: String): ModuleID = {
  // choose the module version first; unsupported Scala versions fail before the builder is invoked
  val moduleVersion =
    if (actualScalaVersion.startsWith("2.11")) {
      scala211Version
    } else if (actualScalaVersion.startsWith("2.12")) {
      scala212Version
    } else {
      throw new IllegalArgumentException("Only Scala 2.11 and 2.12 are currently supported.")
    }

  moduleIdWithoutVersionNeedsScalaVersion(moduleVersion, actualScalaVersion)
}


// spark-core (Provided) with jackson-databind and the Scala-version-specific jackson-module-scala excluded
lazy val sparkCore = {
  def coreWithExcludes(version: String, scalaVersion: String): ModuleID = {
    // leading four characters of the full Scala version, e.g. 2.11
    val scalaBinaryVersion = scalaVersion.substring(0, 4)

    ("org.apache.spark" %% "spark-core" % version % Provided)
      .exclude("com.fasterxml.jackson.core", "jackson-databind")
      .exclude("com.fasterxml.jackson.module", "jackson-module-scala_" + scalaBinaryVersion)
  }

  moduleByScalaUsingScalaVersion(coreWithExcludes)(Versions.spark2, Versions.spark3) _
}

lazy val sparkSql = moduleByScala("org.apache.spark" %% "spark-sql" % _ % Provided)(Versions.spark2, Versions.spark3) _

lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test

val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _
val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3)_
lazy val json4sExt = moduleByScala("org.json4s" %% "json4s-ext" % _)(Versions.json4s_spark2, Versions.json4s_spark3) _
lazy val json4sCore = moduleByScala("org.json4s" %% "json4s-core" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
lazy val json4sJackson = moduleByScala("org.json4s" %% "json4s-jackson" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _
lazy val json4sNative = moduleByScala("org.json4s" %% "json4s-native" % _ % Provided)(Versions.json4s_spark2, Versions.json4s_spark3) _

lazy val absaCommons = "za.co.absa.commons" %% "commons" % Versions.absaCommons
lazy val commonsConfiguration = "commons-configuration" % "commons-configuration" % Versions.commonsConfiguration
lazy val apacheCommons = "org.apache.commons" % "commons-lang3" % Versions.apacheCommonsLang3
lazy val typeSafeConfig = "com.typesafe" % "config" % Versions.typesafeConfig

lazy val jacksonModuleScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jacksonModuleScala

lazy val mockitoScala = "org.mockito" %% "mockito-scala" % Versions.mockitoScala % Test
lazy val mockitoScalaScalatest = "org.mockito" %% "mockito-scala-scalatest" % Versions.mockitoScala % Test

Expand All @@ -90,7 +114,8 @@ object Dependencies {
def modelDependencies(scalaVersion: String): Seq[ModuleID] = Seq(
json4sCore(scalaVersion),
json4sJackson(scalaVersion),
json4sNative(scalaVersion)
json4sNative(scalaVersion),
jacksonModuleScala
)

def coreDependencies(scalaVersion: String): Seq[ModuleID] = Seq(
Expand Down