Skip to content

Commit

Permalink
conversion adds openlineage facets into spline extra
Browse files Browse the repository at this point in the history
  • Loading branch information
cerveada committed Jun 1, 2022
1 parent 87f7882 commit 84699a4
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object OpenLineageToSplineConverter {
expressions = None,
systemInfo = SystemInfoExtractor.extract(runEvent),
agentInfo = Some(NameAndVersion(s"spline-open-lineage-aggregator", AggregatorBuildInfo.Version)),
extraInfo = Map.empty
extraInfo = createExtra(runEvent.job.facets)
)

val planId = generateId(planWithoutId)
Expand All @@ -89,7 +89,7 @@ object OpenLineageToSplineConverter {
durationNs = None,
discriminator = None,
error = None,
extra = Map.empty
extra = createExtra(runEvent.run.facets)
)

(plan, event)
Expand All @@ -103,7 +103,8 @@ object OpenLineageToSplineConverter {
name = None,
output = None,
params = Map.empty,
extra = Map.empty
extra = createExtra(in.facets) ++
in.inputFacets.map(fs => Map("inputFacets" -> fs)).getOrElse(Map.empty)
))
}

Expand All @@ -115,10 +116,16 @@ object OpenLineageToSplineConverter {
name = None,
childIds = inputIds,
params = Map.empty,
extra = Map.empty
extra = createExtra(output.facets) ++
output.outputFacets.map(fs => Map("outputFacets" -> fs)).getOrElse(Map.empty)
)
}

private def createExtra(facets: Option[Map[String, Any]]): Map[String, Any] =
facets
.map(facetMap => Map("facets" -> facetMap))
.getOrElse(Map.empty)

private val ExecutionPlanUUIDNamespace: UUID = UUID.fromString("475196d0-16ca-4cba-aec7-c9f2ddd9326c")

private def generateId(entity: AnyRef): UUID = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ package za.co.absa.spline.ol.model.openlineage.v0_3_1
case class InputDataset (
namespace: String,
name: String,
facets: Option[Map[String, DatasetFacet]],
inputFacets: Option[Map[String, InputDatasetFacet]]
facets: Option[Map[String, Any]],
inputFacets: Option[Map[String, Any]]
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ package za.co.absa.spline.ol.model.openlineage.v0_3_1
case class Job (
namespace: String,
name: String,
facets: Option[Map[String, JobFacet]]
facets: Option[Map[String, Any]]
)
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ package za.co.absa.spline.ol.model.openlineage.v0_3_1
case class OutputDataset (
namespace: String,
name: String,
facets: Option[Map[String, DatasetFacet]],
outputFacets: Option[Map[String, OutputDatasetFacet]]
facets: Option[Map[String, Any]],
outputFacets: Option[Map[String, Any]]
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ import java.util.UUID
*/
case class Run (
runId: UUID,
facets: Option[Map[String, RunFacet]]
facets: Option[Map[String, Any]]
)

0 comments on commit 84699a4

Please sign in to comment.