feat(spark-lineage): add support to custom env and platform_instance #4208
Conversation
…ab/datahub into spark-logging
refactor(spark-lineage): enhance logging and documentation
Partial review
@@ -14,6 +15,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hive.com.esotericsoftware.minlog.Log;
Seems like a bad import: org.apache.hive.com.esotericsoftware.minlog.Log is a minlog class shaded inside Hive, not a proper logging API.
Done
try {
  fabricType = FabricType.valueOf(fabricTypeString);
} catch (IllegalArgumentException e) {
  Log.warn("Invalid env ({}). Setting env to default PROD", DATASET_ENV_KEY);
The parameter here should be fabricTypeString, not DATASET_ENV_KEY.
Done
    InsertIntoHadoopFsRelationCommand.class, SaveIntoDataSourceCommand.class,
    CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class);
private static final String DATASET_ENV_KEY = "metadata.dataset.env";
private static final String DATSET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platform_instance";
Typo in the constant name: DATSET_ versus DATASET_.
Done
    InsertIntoHadoopFsRelationCommand.class, SaveIntoDataSourceCommand.class,
    CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class);
private static final String DATASET_ENV_KEY = "metadata.dataset.env";
private static final String DATSET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platform_instance";
Suggest you use camelCase here to align with Java/Scala style, so metadata.dataset.platformInstance as the config key.
Done
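For illustration, setting that key from spark-submit might look like the following. The spark.datahub. prefix and the listener class name are assumptions based on the usual spark-lineage setup, not taken from this diff:

```shell
spark-submit \
  --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
  --conf "spark.datahub.metadata.dataset.env=PROD" \
  --conf "spark.datahub.metadata.dataset.platformInstance=warehouse_eu" \
  my_job.py
```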
  CreateDataSourceTableAsSelectCommand cmd = (CreateDataSourceTableAsSelectCommand) p;
  // TODO what of cmd.mode()
- return Optional.of(new CatalogTableDataset(cmd.table()));
+ return Optional.of(new CatalogTableDataset(cmd.table(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
Is there a good reason to compute the fabricType and platformInstance every time, versus just calculating it once and using a final variable in the other method calls?
We need to extend this further to support a per-dataset platformInstance, so there will be dataset-specific selection later. That's why I haven't computed it once in a common place.
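A sketch of the compute-once alternative the reviewer suggests: resolve the config-driven value once at construction time and reuse it. Class, field, and key names here are illustrative, not from the PR:

```java
import java.util.Map;

// Illustrative sketch: read the config once and hold the result in a final
// field, instead of re-reading the config on every extraction call.
public class DatasetExtractor {
  private final String platformInstance;

  public DatasetExtractor(Map<String, String> config) {
    // Computed once; a later per-dataset override could still consult the
    // config again, as the author plans.
    this.platformInstance = config.getOrDefault("metadata.dataset.platformInstance", "default");
  }

  public String platformInstance() {
    return platformInstance;
  }

  public static void main(String[] args) {
    DatasetExtractor e = new DatasetExtractor(Map.of("metadata.dataset.platformInstance", "warehouse_eu"));
    System.out.println(e.platformInstance()); // prints "warehouse_eu"
  }
}
```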
This would probably look a bit more clean with a true Visitor pattern rather than doing it through a map. Having this huge static initializer with a bunch of lambdas is pretty hard to read and is more error prone to missing an update to one of the lambdas.
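A minimal sketch of the visitor-style alternative the reviewer describes. All names here are hypothetical stand-ins; the real code maps Spark LogicalPlan classes to lambdas in a static initializer:

```java
import java.util.Optional;

// Hypothetical sketch of the suggested visitor pattern: dispatch lives in
// accept(), so adding a new plan type forces a compile error until the
// visitor handles it, unlike a static Map of lambdas that can silently
// miss an entry.
interface PlanVisitor<T> {
  T visit(CreateTableCmd cmd);
  T visit(InsertCmd cmd);
}

interface PlanNode {
  <T> T accept(PlanVisitor<T> visitor);
}

class CreateTableCmd implements PlanNode {
  public <T> T accept(PlanVisitor<T> v) { return v.visit(this); }
}

class InsertCmd implements PlanNode {
  public <T> T accept(PlanVisitor<T> v) { return v.visit(this); }
}

class DatasetNameVisitor implements PlanVisitor<Optional<String>> {
  public Optional<String> visit(CreateTableCmd cmd) { return Optional.of("create-table-dataset"); }
  public Optional<String> visit(InsertCmd cmd) { return Optional.of("insert-dataset"); }
}

public class VisitorSketch {
  public static void main(String[] args) {
    DatasetNameVisitor v = new DatasetNameVisitor();
    for (PlanNode node : new PlanNode[] { new CreateTableCmd(), new InsertCmd() }) {
      System.out.println(node.accept(v).orElse("none"));
    }
  }
}
```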
metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppStartEvent.java (outdated; thread resolved)
//TODO: Should map to the central location on datahub for platform names
private static final Map<String, String> PLATFORM_NAME_MAPPING = new HashMap<>();
static {
  PLATFORM_NAME_MAPPING.put("postgresql", "postgres");
why not add more mappings here?
For the other platforms the URL scheme is the same as the platform name, hence I have not added entries for them.
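That fallback behavior (map only the schemes whose DataHub platform name differs, pass everything else through) can be sketched as follows; the helper name is illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class PlatformNames {
  // Only schemes whose platform name differs from the URL scheme need an
  // entry; everything else falls through unchanged.
  private static final Map<String, String> PLATFORM_NAME_MAPPING = new HashMap<>();
  static {
    PLATFORM_NAME_MAPPING.put("postgresql", "postgres");
  }

  static String toPlatformName(String scheme) {
    return PLATFORM_NAME_MAPPING.getOrDefault(scheme, scheme);
  }

  public static void main(String[] args) {
    System.out.println(toPlatformName("postgresql")); // mapped: postgres
    System.out.println(toPlatformName("mysql"));      // unmapped: passes through
  }
}
```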
Some clean-up comments, but overall LGTM. I don't think the visitor pattern one is a blocker, but if it's quick to do would make it more readable.
| Field               | Required | Default | Description                                                            |
|---------------------|----------|---------|------------------------------------------------------------------------|
| spark.jars.packages | ✅       |         | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
Is this version still correct? Might be better to just leave it off if not
The version is not the latest, but we don't update the documentation on every release; we put a note there to check for the latest version.
Right, but then it's better not to pin a specific version at all.
if (this.pipelineConfig.hasPath(PLATFORM_INSTANCE_KEY)) {
  try {
    DataPlatformInstance dpi = new DataPlatformInstance().setPlatform(new DataPlatformUrn("spark")).setInstance(
Pull repeated strings into constants
Using static initializers versus the visitor pattern was a conscious decision. Will discuss and take it up in another PR.
@@ -25,6 +38,10 @@ private LineageUtils() {
}

public static Urn dataPlatformInstanceUrn(String platform, String instance) throws URISyntaxException {
  return new Urn("urn:li:dataPlatformInstance:(" + new DataPlatformUrn(platform).toString() + "," + instance + ")");
This can be done using the entityType & TupleKey constructor to avoid string manipulation and increased overhead from the Urn(String) constructor.
Done
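For reference, the string-built URN from the diff has the shape shown below. The runnable part only reproduces the concatenation; the reviewer's suggested structured constructor appears in a comment because its exact signature in the DataHub urn library is an assumption here:

```java
public class UrnSketch {
  // Simplified version of the string-built form from the diff.
  static String dataPlatformInstanceUrn(String platform, String instance) {
    return "urn:li:dataPlatformInstance:(urn:li:dataPlatform:" + platform + "," + instance + ")";
  }

  // Suggested structured form (assumed API, not compiled here):
  // new Urn("dataPlatformInstance", TupleKey.create(new DataPlatformUrn(platform), instance));

  public static void main(String[] args) {
    System.out.println(dataPlatformInstanceUrn("spark", "cluster1"));
  }
}
```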
public static Urn dataPlatformInstanceUrn(String platform, String instance) throws URISyntaxException {
  return new Urn("urn:li:dataPlatformInstance:(" + new DataPlatformUrn(platform).toString() + "," + instance + ")");
}

public static DataFlowUrn flowUrn(String master, String appName) {
  return new DataFlowUrn("spark", appName, master.replaceAll(":", "_").replaceAll("/", "_").replaceAll("[_]+", "_"));
This can be done with a single replaceAll using a regex that matches all three characters.
The regex way seems more complicated for a simple replace; this is more readable.
It's already doing regex though, and this does 3 full scans through the String and creates two intermediate Strings that get committed to the String pool as extra memory. I also disagree wholeheartedly that
master.replaceAll("[:/_]+", "_")
is less readable than the current approach, but really don't let this block the PR. It was more of a nitpick in the first place 😄 hopefully the compiler just figures it out anyway.
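The two forms under discussion produce the same result for typical master URLs; a quick sketch (the single-pass character class is my reconstruction of the chained calls, not a quote from the thread):

```java
public class MasterUrlCleaner {
  // Chained form from the diff: three passes over the string.
  static String chained(String master) {
    return master.replaceAll(":", "_").replaceAll("/", "_").replaceAll("[_]+", "_");
  }

  // Single-pass form: any run of ':', '/', or '_' collapses to one '_'.
  static String singlePass(String master) {
    return master.replaceAll("[:/_]+", "_");
  }

  public static void main(String[] args) {
    String master = "spark://my-host:7077";
    System.out.println(chained(master));    // prints "spark_my-host_7077"
    System.out.println(singlePass(master)); // prints "spark_my-host_7077"
  }
}
```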
- public CatalogTableDataset(String dsName) {
-   this.urn = new DatasetUrn(new DataPlatformUrn("hive"), dsName, FabricType.PROD);
+ public CatalogTableDataset(String dsName, String platformInstance, FabricType fabricType) {
+   this.urn = LineageUtils.createDatasetUrn("hive", platformInstance, dsName, fabricType);
This could be pushed up to the interface level with a call to super()
Done
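The reviewer's suggestion, sketched with hypothetical class names: build the urn once in a shared base-class constructor and have each dataset subclass call super(). The urn format string is a simplified stand-in for LineageUtils.createDatasetUrn, not the library's exact output:

```java
// Hypothetical sketch: urn construction moves into the shared base class.
abstract class SparkDataset {
  private final String urn;

  protected SparkDataset(String platform, String platformInstance, String name, String fabricType) {
    // Simplified stand-in for LineageUtils.createDatasetUrn(...).
    this.urn = "urn:li:dataset:(urn:li:dataPlatform:" + platform + ","
        + platformInstance + "." + name + "," + fabricType + ")";
  }

  public String urn() {
    return urn;
  }
}

class CatalogTableDataset extends SparkDataset {
  CatalogTableDataset(String dsName, String platformInstance, String fabricType) {
    super("hive", platformInstance, dsName, fabricType); // shared construction
  }
}

public class DatasetSketch {
  public static void main(String[] args) {
    System.out.println(new CatalogTableDataset("db.tbl", "inst1", "PROD").urn());
  }
}
```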
…for spark datasets
LGTM