Replies: 2 comments 1 reply
-
see: #307 (comment) Then to use the filter via programmatic config: spark.enableLineageTracking(
AgentConfig.builder()
.postProcessingFilter(myFilter)
.build()
) Assuming the filter is not specified elsewhere. If you need multiple filters, you would have to use a composite filter.
Beta Was this translation helpful? Give feedback.
0 replies
-
Hi %scala
import scala.util.parsing.json.JSON
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.agent.AgentConfig
import za.co.absa.spline.harvester.postprocessing.AbstractPostProcessingFilter
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.producer.model.ExecutionPlan
import za.co.absa.spline.producer.model.ExecutionEvent
import za.co.absa.spline.producer.model.ReadOperation
import za.co.absa.spline.producer.model.WriteOperation
import za.co.absa.spline.producer.model.DataOperation
import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.SparkLineageInitializer._
val notebookInformationJson = dbutils.notebook.getContext.toJson
val outerMap = JSON.parseFull(notebookInformationJson).getOrElse(0).asInstanceOf[Map[String,String]]
val tagMap = outerMap("tags").asInstanceOf[Map[String,String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String,String]]
val notebookPath = extraContextMap("notebook_path").split("/")
val workspaceUrl=tagMap("browserHostName")
val workspaceName=dbutils.notebook().getContext().notebookPath.get
val notebookURL = tagMap("browserHostName")+"/?o="+tagMap("orgId")+tagMap("browserHash")
val user = tagMap("user")
val name = notebookPath(notebookPath.size-1)
val notebookInfo = Map("notebookURL" -> notebookURL,
"user" -> user,
"workspaceName" ->workspaceName,
"workspaceUrl" -> workspaceUrl,
"name" -> name,
"mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
"timestamp" -> System.currentTimeMillis)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)
class CustomFilter extends PostProcessingFilter {
def this(conf: Configuration) = this()
override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent =
event.withAddedExtra(Map("foo" -> "bar"))
override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext ): ExecutionPlan =
plan.withAddedExtra(Map( "notebookInfo" -> notebookInfoJson))
override def processReadOperation(op: ReadOperation, ctx: HarvestingContext ): ReadOperation =
op.withAddedExtra(Map("foo" -> "bar"))
override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation =
op.withAddedExtra(Map("foo" -> "bar"))
override def processDataOperation(op: DataOperation, ctx: HarvestingContext ): DataOperation =
op.withAddedExtra(Map("foo" -> "bar"))
}
val myInstance = new CustomFilter()
spark.enableLineageTracking(
AgentConfig.builder()
.postProcessingFilter(myInstance)
.build()
) |
Beta Was this translation helpful? Give feedback.
1 reply
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hi
I have this code to get the notebook details into the Spline agent:
However from version 1.0.0 this is not supported
Do you have a way to rewrite it so that it is supported on 1.0.0?
Beta Was this translation helpful? Give feedback.
All reactions