feat(spark-lineage): add support for custom env and platform_instance #4208

Merged: 23 commits, Mar 14, 2022

Commits (all by MugdhaHardikar-GSLab):
- `09441d3` fix(spark-lineage): select mock server port dynamically for unit test (Jan 31, 2022)
- `cac9e4b` refactor(spark-lineage): remove dependency of spark from McpEmitter (Feb 2, 2022)
- `6f64a20` fix(spark-lineage): fix checkstyle issues (Feb 3, 2022)
- `8270c24` fix(spark-lineage): restore consumers (Feb 3, 2022)
- `dffa6c4` Merge branch 'linkedin:master' into master (Feb 10, 2022)
- `1675dfe` refactor(spark-lineage): enhance logging and documentation (Feb 10, 2022)
- `d6583fc` Update McpEmitter.java (Feb 10, 2022)
- `8e9291c` add debugging section (Feb 10, 2022)
- `7ba4ddf` refactor(spark-lineage): enhance logs (Feb 10, 2022)
- `d396664` Merge branch 'spark-logging' of https://github.com/MugdhaHardikar-GSL… (Feb 10, 2022)
- `08b51d7` Merge pull request #1 from MugdhaHardikar-GSLab/spark-logging (Feb 10, 2022)
- `2c54135` Add https and auth token support (Feb 16, 2022)
- `7b8ae31` Merge branch 'linkedin:master' into master (Feb 21, 2022)
- `bde1db6` Update README.md (Feb 18, 2022)
- `bf11982` feat(spark-lineage): add support for custom dataset env (Feb 18, 2022)
- `d66006e` refactor(spark-lineage): improve jdbc url parsing (Feb 21, 2022)
- `e8adce2` feat(spark-lineage): add support for pipeline level 'platform_instance' (Feb 18, 2022)
- `549f8aa` Update README.md (Feb 18, 2022)
- `5169ebf` feat(spark-lineage): support platform instance at dataset level (Feb 22, 2022)
- `8044010` Update README.md (Feb 22, 2022)
- `713e46d` fix checkstyle (Feb 22, 2022)
- `73a695c` refactor(spark-lineage): change config key name for platform instance (Feb 23, 2022)
- `c82e4fb` refactor(spark-lineage): extract common code at abstract class level … (Mar 14, 2022)
2 changes: 2 additions & 0 deletions build.gradle
@@ -98,6 +98,7 @@ project.ext.externalDependency = [
'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:4.0.1',
'opentelemetryApi': 'io.opentelemetry:opentelemetry-api:1.0.0',
'opentelemetryAnnotations': 'io.opentelemetry:opentelemetry-extension-annotations:1.0.0',
'opentracingJdbc':'io.opentracing.contrib:opentracing-jdbc:0.2.15',
'parseqTest': 'com.linkedin.parseq:parseq:3.0.7:test',
'parquet': 'org.apache.parquet:parquet-avro:1.12.0',
'picocli': 'info.picocli:picocli:4.5.0',
@@ -137,6 +138,7 @@ project.ext.externalDependency = [
'typesafeConfig':'com.typesafe:config:1.4.1',
'wiremock':'com.github.tomakehurst:wiremock:2.10.0',
'zookeeper': 'org.apache.zookeeper:zookeeper:3.4.14'

]

allprojects {
19 changes: 12 additions & 7 deletions metadata-integration/java/spark-lineage/README.md
@@ -47,14 +47,19 @@ spark = SparkSession.builder()
.enableHiveSupport()
.getOrCreate();
```

### Enable HTTPS and an authentication token
Add the following properties to your Spark config:

```
spark.datahub.rest.server https://<server URL>
spark.datahub.rest.token <token>
```
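
The same two properties can also be set programmatically when building the Spark config; a minimal sketch (the server URL and token are placeholders):

```
import org.apache.spark.SparkConf;

// Equivalent programmatic setup; replace the placeholders with your DataHub host and token.
SparkConf conf = new SparkConf()
    .set("spark.datahub.rest.server", "https://<server URL>")
    .set("spark.datahub.rest.token", "<token>");
```
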
### Configuration details

| Field | Required | Default | Description |
|---------------------------------------------------|----------|---------|-------------------------------------------------------------------------|
| spark.jars.packages                                | ✅       |         | Set to the latest/required version, e.g. io.acryl:datahub-spark-lineage:0.8.23 |
| spark.extraListeners                               | ✅       |         | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server                          | ✅       |         | DataHub server URL, e.g. http://localhost:8080 |
| spark.datahub.rest.token                           |          |         | Authentication token |
| spark.datahub.metadata.pipeline.platformInstance   |          |         | Pipeline-level platform instance |
| spark.datahub.metadata.dataset.platformInstance    |          |         | Dataset-level platform instance |
| spark.datahub.metadata.dataset.env                 |          | PROD    | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype); any other value falls back to PROD |

Review thread on the `spark.jars.packages` row:

**Collaborator:** Is this version still correct? Might be better to just leave it off if not.

**Contributor (author):** The version is not the latest, but we don't update the documentation on every release; there is a note to check for the latest version.

**Collaborator:** Right, but then it's better to just not specify a specific version.
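
For example, the new pipeline- and dataset-level settings can be passed like any other Spark property when the session is built; a minimal sketch, where the application name, instance names, and the DEV environment are illustrative values:

```
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("test-application")
    .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://localhost:8080")
    // Pipeline-level platform instance: prefixed to the pipeline (DataFlow) name.
    .config("spark.datahub.metadata.pipeline.platformInstance", "cluster_a")
    // Dataset-level platform instance and environment; env must be a supported FabricType, otherwise PROD is used.
    .config("spark.datahub.metadata.dataset.platformInstance", "warehouse_1")
    .config("spark.datahub.metadata.dataset.env", "DEV")
    .enableHiveSupport()
    .getOrCreate();
```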


## What to Expect: The Metadata Model

2 changes: 2 additions & 0 deletions metadata-integration/java/spark-lineage/build.gradle
@@ -88,6 +88,7 @@ dependencies {
annotationProcessor externalDependency.lombok

implementation externalDependency.typesafeConfig
implementation externalDependency.opentracingJdbc
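// Presumably pulled in for the improved JDBC URL parsing added in this PR; io.opentracing is relocated in the shadow jar below.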

implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow')

@@ -146,6 +147,7 @@ shadowJar {
relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec'
relocate 'mozilla', 'datahub.spark2.shaded.mozilla'
relocate 'com.typesafe','datahub.spark2.shaded.typesafe'
relocate 'io.opentracing','datahub.spark2.shaded.io.opentracing'
finalizedBy checkShadowJar
}

datahub/spark/DatahubSparkListener.java
@@ -1,20 +1,9 @@
package datahub.spark;

import com.google.common.base.Splitter;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.AppEndEvent;
import datahub.spark.model.AppStartEvent;
import datahub.spark.model.DatasetLineage;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.SQLQueryExecEndEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark.model.dataset.SparkDataset;
import datahub.spark.consumer.impl.McpEmitter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -26,7 +15,7 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
@@ -41,13 +30,25 @@
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;

import com.google.common.base.Splitter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;

import datahub.spark.consumer.impl.McpEmitter;
import datahub.spark.model.AppEndEvent;
import datahub.spark.model.AppStartEvent;
import datahub.spark.model.DatasetLineage;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.SQLQueryExecEndEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark.model.dataset.SparkDataset;
import lombok.extern.slf4j.Slf4j;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractPartialFunction;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;



@Slf4j
@@ -56,11 +57,15 @@ public class DatahubSparkListener extends SparkListener {
private static final int THREAD_CNT = 16;
public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
public static final String DATAHUB_EMITTER = "mcpEmitter";

public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
public static final String PIPELINE_KEY = "metadata.pipeline";
public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";
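// These keys are read from the config parsed with the "spark.datahub." prefix stripped, so
// PIPELINE_PLATFORM_INSTANCE_KEY corresponds to the spark.datahub.metadata.pipeline.platformInstance
// property documented in the README.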

private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
private final Map<String, Config> appConfig = new ConcurrentHashMap<>();
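// Per-application state; with this change the maps are keyed by SparkContext.applicationId()
// (unique per run) rather than by the application name.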

public DatahubSparkListener() {
log.info("DatahubSparkListener initialised.");
@@ -80,17 +85,16 @@ public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, S

@Override
public void run() {
appSqlDetails.get(ctx.appName())
appSqlDetails.get(ctx.applicationId())
.put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
log.debug("PLAN for execution id: " + ctx.appName() + ":" + sqlStart.executionId() + "\n");
log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n");
log.debug(plan.toString());

DatasetExtractor extractor = new DatasetExtractor();
Optional<? extends SparkDataset> outputDS = extractor.asDataset(plan, ctx, true);
Optional<? extends SparkDataset> outputDS = DatasetExtractor.asDataset(plan, ctx, true);
if (!outputDS.isPresent()) {
log.debug("Skipping execution as no output dataset present for execution id: " + ctx.appName() + ":"
log.debug("Skipping execution as no output dataset present for execution id: " + ctx.applicationId() + ":"
+ sqlStart.executionId());
return;
}
@@ -103,7 +107,7 @@ public void run() {
@Override
public Void apply(LogicalPlan plan) {
log.debug("CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = extractor.asDataset(plan, ctx, false);
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> lineage.addSource(x));
allInners.addAll(JavaConversions.asJavaCollection(plan.innerChildren()));
return null;
@@ -126,7 +130,7 @@ public boolean isDefinedAt(LogicalPlan x) {
@Override
public Void apply(LogicalPlan plan) {
log.debug("INNER CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = extractor.asDataset(plan, ctx, false);
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(
x -> log.debug("source added for " + ctx.appName() + "/" + sqlStart.executionId() + ": " + x));
inputDS.ifPresent(x -> lineage.addSource(x));
@@ -141,12 +145,12 @@ public boolean isDefinedAt(LogicalPlan x) {
}

SQLQueryExecStartEvent evt =
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);

appSqlDetails.get(ctx.appName()).put(sqlStart.executionId(), evt);
appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt);

McpEmitter emitter = appEmitters.get(ctx.appName());
McpEmitter emitter = appEmitters.get(ctx.applicationId());
if (emitter != null) {
emitter.accept(evt);
}
@@ -157,14 +161,6 @@ public boolean isDefinedAt(LogicalPlan x) {
}
}

private Config parseSparkConfig() {
SparkConf conf = SparkEnv.get().conf();
String propertiesString = Arrays.stream(conf.getAllWithPrefix("spark.datahub."))
.map(tup -> tup._1 + "= \"" + tup._2 + "\"")
.collect(Collectors.joining("\n"));
return ConfigFactory.parseString(propertiesString);
}
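// Note: this helper has moved to LineageUtils (see the LineageUtils.parseSparkConfig() call
// sites below). A minimal sketch, assuming the moved method keeps the body removed above as a
// static utility:
//
//   public static Config parseSparkConfig() {
//     SparkConf conf = SparkEnv.get().conf();
//     String propertiesString = Arrays.stream(conf.getAllWithPrefix("spark.datahub."))
//         .map(tup -> tup._1 + "= \"" + tup._2 + "\"")
//         .collect(Collectors.joining("\n"));
//     return ConfigFactory.parseString(propertiesString);
//   }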

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
try {
@@ -173,18 +169,7 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {

@Override
public Void apply(SparkContext sc) {
String appId = applicationStart.appId().isDefined() ? applicationStart.appId().get() : "";
AppStartEvent evt =
new AppStartEvent(LineageUtils.getMaster(sc), applicationStart.appName(), appId, applicationStart.time(),
applicationStart.sparkUser());
Config datahubConf = parseSparkConfig();
appEmitters.computeIfAbsent(applicationStart.appName(), s -> new McpEmitter(datahubConf)).accept(evt);
consumers().forEach(c -> c.accept(evt));

appDetails.put(applicationStart.appName(), evt);
appSqlDetails.put(applicationStart.appName(), new ConcurrentHashMap<>());
ExecutorService pool = Executors.newFixedThreadPool(THREAD_CNT);
appPoolDetails.put(applicationStart.appName(), pool);
getOrCreateApplicationSetup(sc);
return null;
}
});
@@ -207,21 +192,21 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
@Override
public Void apply(SparkContext sc) {
log.info("Application ended : {} {}", sc.appName(), sc.applicationId());
AppStartEvent start = appDetails.remove(sc.appName());
appPoolDetails.remove(sc.appName()).shutdown();
appSqlDetails.remove(sc.appName());
AppStartEvent start = appDetails.remove(sc.applicationId());
appPoolDetails.remove(sc.applicationId()).shutdown();
appSqlDetails.remove(sc.applicationId());
if (start == null) {
log.error("Application end event received, but start event missing for appId " + sc.applicationId());
} else {
AppEndEvent evt = new AppEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(),
AppEndEvent evt = new AppEndEvent(LineageUtils.getMaster(sc), getPipelineName(sc), sc.applicationId(),
applicationEnd.time(), start);

McpEmitter emitter = appEmitters.get(sc.appName());
McpEmitter emitter = appEmitters.get(sc.applicationId());
if (emitter != null) {
emitter.accept(evt);
try {
emitter.close();
appEmitters.remove(sc.appName());
appEmitters.remove(sc.applicationId());
} catch (Exception e) {
log.warn("Failed to close underlying emitter due to {}", e.getMessage());
}
@@ -276,7 +261,7 @@ public void processExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {

@Override
public Void apply(SparkContext sc) {
SQLQueryExecStartEvent start = appSqlDetails.get(sc.appName()).remove(sqlEnd.executionId());
SQLQueryExecStartEvent start = appSqlDetails.get(sc.applicationId()).remove(sqlEnd.executionId());
if (start == null) {
log.error(
"Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":"
@@ -285,7 +270,7 @@ public Void apply(SparkContext sc) {
SQLQueryExecEndEvent evt =
new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(),
sqlEnd.executionId(), start);
McpEmitter emitter = appEmitters.get(sc.appName());
McpEmitter emitter = appEmitters.get(sc.applicationId());
if (emitter != null) {
emitter.accept(evt);
}
@@ -294,8 +279,50 @@ public Void apply(SparkContext sc) {
}
});
}

private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ctx) {

ExecutorService pool = null;
String appId = ctx.applicationId();
Config datahubConfig = appConfig.get(appId);
if (datahubConfig == null) {
Config datahubConf = LineageUtils.parseSparkConfig();
appConfig.put(appId, datahubConf);
Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY) : com.typesafe.config.ConfigFactory.empty();
AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(),
ctx.sparkUser(), pipelineConfig);

appEmitters.computeIfAbsent(appId, s -> new McpEmitter(datahubConf)).accept(evt);
consumers().forEach(c -> c.accept(evt));
appDetails.put(appId, evt);
appSqlDetails.put(appId, new ConcurrentHashMap<>());
pool = Executors.newFixedThreadPool(THREAD_CNT,
new ThreadFactoryBuilder().setNameFormat("datahub-emit-pool").build());
appPoolDetails.put(appId, pool);
log.debug("Execution thread pool initialised for {}", appId);
} else {
pool = appPoolDetails.get(appId);
}

return pool;

}
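// Called from both onApplicationStart and processExecution, so the emitter, parsed config and
// thread pool are created lazily the first time a given applicationId is seen.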

private String getPipelineName(SparkContext cx) {
Config datahubConfig = appConfig.computeIfAbsent(cx.applicationId(), s -> LineageUtils.parseSparkConfig());
String name = "";
if (datahubConfig.hasPath(DATABRICKS_CLUSTER_KEY)) {
name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId();
}
name = cx.appName();
//TODO: appending of platform instance needs to be done at central location like adding constructor to dataflowurl
if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
}
return name;
}
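// Example (illustrative values): with spark.datahub.metadata.pipeline.platformInstance set to
// "instance1" and an application named "mySparkJob", the resulting pipeline name is
// "instance1.mySparkJob".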


// TODO sqlEvt.details() unused
private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId());
if (queryExec == null) {
@@ -306,7 +333,7 @@ private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
LogicalPlan plan = queryExec.optimizedPlan();
SparkSession sess = queryExec.sparkSession();
SparkContext ctx = sess.sparkContext();
ExecutorService pool = appPoolDetails.get(ctx.appName());
ExecutorService pool = getOrCreateApplicationSetup(ctx);
pool.execute(new SqlStartTask(sqlStart, plan, ctx));
}
private List<LineageConsumer> consumers() {
@@ -320,4 +347,4 @@ private List<LineageConsumer> consumers() {
}

}
}
}