Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(GE): fix dependencies for GE DataHubValidationAction, logic for s… #4347

Merged
merged 3 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ def get_long_description():
"cryptography",
}

microsoft_common = {
"msal==1.16.0"
}
microsoft_common = {"msal==1.16.0"}

data_lake_base = {
*aws_common,
Expand All @@ -129,7 +127,7 @@ def get_long_description():
"airflow": {
"apache-airflow >= 1.10.2",
},
"great-expectations": sql_common,
"great-expectations": sql_common | {"sqllineage==1.3.3"},
# Source plugins
# PyAthena is pinned with exact version because we use private method in PyAthena
"athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
Expand Down Expand Up @@ -188,7 +186,7 @@ def get_long_description():
"trino": sql_common | {"trino"},
"starburst-trino-usage": sql_common | {"trino"},
"nifi": {"requests", "packaging"},
"powerbi": {"orderedset"} | microsoft_common
"powerbi": {"orderedset"} | microsoft_common,
}

all_exclude_plugins: Set[str] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
SqlAlchemyExecutionEngine,
)
from great_expectations.validator.validator import Validator
from sqlalchemy.engine.base import Connection, Engine
from sqlalchemy.engine.url import make_url

import datahub.emitter.mce_builder as builder
Expand Down Expand Up @@ -550,13 +551,20 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
data_asset.active_batch_definition.datasource_name
),
}
sqlalchemy_uri = None
if isinstance(data_asset.execution_engine.engine, Engine):
sqlalchemy_uri = data_asset.execution_engine.engine.url
# For snowflake, sqlalchemy_execution_engine.engine is actually an instance of Connection
elif isinstance(data_asset.execution_engine.engine, Connection):
sqlalchemy_uri = data_asset.execution_engine.engine.engine.url

if isinstance(ge_batch_spec, SqlAlchemyDatasourceBatchSpec):
# e.g. ConfiguredAssetSqlDataConnector with splitter_method or sampling_method
schema_name = ge_batch_spec.get("schema_name")
table_name = ge_batch_spec.get("table_name")

dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
data_asset.execution_engine.engine.url,
sqlalchemy_uri,
schema_name,
table_name,
self.env,
Expand Down Expand Up @@ -609,7 +617,7 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
)
for table in tables:
dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
data_asset.execution_engine.engine.url,
sqlalchemy_uri,
None,
table,
self.env,
Expand Down Expand Up @@ -640,7 +648,7 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
def get_platform_instance(self, datasource_name):
if self.platform_instance_map and datasource_name in self.platform_instance_map:
return self.platform_instance_map[datasource_name]
if datasource_name:
if self.platform_instance_map:
warn(
f"Datasource {datasource_name} is not present in platform_instance_map"
)
Expand Down Expand Up @@ -680,7 +688,15 @@ def make_dataset_urn_from_sqlalchemy_uri(
for {data_platform}."
)
return None
schema_name = "{}.{}".format(url_instance.database, schema_name)
# If data platform is snowflake, we artificially lowercase the Database name.
# This is because DataHub also does this during ingestion.
# Ref: https://github.com/linkedin/datahub/blob/master/metadata-ingestion%2Fsrc%2Fdatahub%2Fingestion%2Fsource%2Fsql%2Fsnowflake.py#L272
schema_name = "{}.{}".format(
url_instance.database.lower()
if data_platform == "snowflake"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. We might want to drop a pointer to the code link in a comment here. I will append this to the PR.

else url_instance.database,
schema_name,
)
elif data_platform == "bigquery":
if url_instance.host is None or url_instance.database is None:
warn(
Expand All @@ -705,6 +721,7 @@ def make_dataset_urn_from_sqlalchemy_uri(
platform_instance=platform_instance,
env=env,
)

return dataset_urn


Expand Down