fix(snowflake): support snowflake allow/deny pattern for lineage and usage #3748

Merged · 1 commit · Dec 15, 2021
29 changes: 16 additions & 13 deletions metadata-ingestion/source_docs/snowflake.md
@@ -139,20 +139,23 @@ sink:

Note that a `.` is used to denote nested fields in the YAML recipe.

-| Field                        | Required | Default                                                        | Description                                                     |
-| ---------------------------- | -------- | -------------------------------------------------------------- | --------------------------------------------------------------- |
-| `username`                   |          |                                                                | Snowflake username.                                             |
-| `password`                   |          |                                                                | Snowflake password.                                             |
-| `host_port`                  | ✅       |                                                                | Snowflake host URL.                                             |
-| `warehouse`                  |          |                                                                | Snowflake warehouse.                                            |
-| `role`                       |          |                                                                | Snowflake role.                                                 |
-| `env`                        |          | `"PROD"`                                                       | Environment to use in namespace when constructing URNs.         |
-| `bucket_duration`            |          | `"DAY"`                                                        | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
-| `start_time`                 |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider.                        |
-| `end_time`                   |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider.                          |
-| `top_n_queries`              |          | `10`                                                           | Number of top queries to save to each table.                    |
-| `apply_view_usage_to_tables` |          | False                                                          | Attribute usage of views to the underlying table.               |

+| Field              | Required | Default                                                                   | Description                                                     |
+| ------------------ | -------- | ------------------------------------------------------------------------- | --------------------------------------------------------------- |
+| `username`         |          |                                                                           | Snowflake username.                                             |
+| `password`         |          |                                                                           | Snowflake password.                                             |
+| `host_port`        | ✅       |                                                                           | Snowflake host URL.                                             |
+| `warehouse`        |          |                                                                           | Snowflake warehouse.                                            |
+| `role`             |          |                                                                           | Snowflake role.                                                 |
+| `env`              |          | `"PROD"`                                                                  | Environment to use in namespace when constructing URNs.         |
+| `bucket_duration`  |          | `"DAY"`                                                                   | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
+| `start_time`       |          | Last full day in UTC (or hour, depending on `bucket_duration`)            | Earliest date of usage logs to consider.                        |
+| `end_time`         |          | Last full day in UTC (or hour, depending on `bucket_duration`)            | Latest date of usage logs to consider.                          |
+| `top_n_queries`    |          | `10`                                                                      | Number of top queries to save to each table.                    |
+| `database_pattern` |          | Deny: `"^UTIL_DB$"`<br />`"^SNOWFLAKE$"`<br />`"^SNOWFLAKE_SAMPLE_DATA$"` | Allow/deny patterns for db in Snowflake dataset names.          |
+| `schema_pattern`   |          |                                                                           | Allow/deny patterns for schema in Snowflake dataset names.      |
+| `view_pattern`     |          |                                                                           | Allow/deny patterns for views in Snowflake dataset names.       |
+| `table_pattern`    |          |                                                                           | Allow/deny patterns for tables in Snowflake dataset names.      |
### Compatibility

Coming soon!
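To make the semantics of the `*_pattern` fields documented above concrete, here is a minimal, self-contained sketch of the allow/deny matching these docs assume. The real implementation is `AllowDenyPattern` from `datahub.configuration.common`; the `PatternSketch` class and its deny-takes-precedence behavior are this sketch's assumptions, not DataHub's exact code.

```python
import re
from dataclasses import dataclass, field
from typing import List


@dataclass
class PatternSketch:
    """Approximation of AllowDenyPattern: a name passes if it matches some
    allow regex and no deny regex (deny takes precedence in this sketch)."""

    allow: List[str] = field(default_factory=lambda: [".*"])
    deny: List[str] = field(default_factory=list)

    def allowed(self, name: str) -> bool:
        if any(re.match(pattern, name) for pattern in self.deny):
            return False
        return any(re.match(pattern, name) for pattern in self.allow)


# The documented default for `database_pattern` denies Snowflake's built-in
# databases and allows everything else.
db_pattern = PatternSketch(
    deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
)
assert not db_pattern.allowed("SNOWFLAKE_SAMPLE_DATA")
assert db_pattern.allowed("SALES_DB")  # hypothetical user database
```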
22 changes: 20 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
@@ -203,6 +203,8 @@ def _get_upstream_lineage_info(
        for lineage_entry in lineage:
            # Update the table-lineage
            upstream_table_name = lineage_entry[0]
            if not self._is_dataset_allowed(upstream_table_name):
                continue
            upstream_table = UpstreamClass(
                dataset=builder.make_dataset_urn(
                    self.platform, upstream_table_name, self.config.env
@@ -229,8 +231,9 @@
            )
            column_lineage[column_lineage_key] = column_lineage_value
            logger.debug(f"{column_lineage_key}:{column_lineage_value}")

-        return UpstreamLineage(upstreams=upstream_tables), column_lineage
+        if upstream_tables:
+            return UpstreamLineage(upstreams=upstream_tables), column_lineage
+        return None

    # Override the base class method.
    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
@@ -288,3 +291,18 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:

            # Emit the work unit from super.
            yield wu

    def _is_dataset_allowed(self, dataset_name: Optional[str]) -> bool:
        # View lineage is not supported yet; add an allow/deny pattern for views once it is.
        if dataset_name is None:
            return True
        dataset_params = dataset_name.split(".")
        if len(dataset_params) != 3:
            return True
        if (
            not self.config.database_pattern.allowed(dataset_params[0])
            or not self.config.schema_pattern.allowed(dataset_params[1])
            or not self.config.table_pattern.allowed(dataset_params[2])
        ):
            return False
        return True
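A hedged usage example for the method above, with hypothetical dataset names; the inline `allowed` helper stands in for the real `AllowDenyPattern` objects on `self.config` and assumes deny-takes-precedence semantics:

```python
import re
from typing import Optional, Sequence


def allowed(
    name: str, allow: Sequence[str] = (r".*",), deny: Sequence[str] = ()
) -> bool:
    # Stand-in for AllowDenyPattern.allowed (assumed deny-takes-precedence).
    return not any(re.match(d, name) for d in deny) and any(
        re.match(a, name) for a in allow
    )


def is_dataset_allowed(dataset_name: Optional[str]) -> bool:
    # Mirrors _is_dataset_allowed: names that are not DB.SCHEMA.TABLE pass through.
    if dataset_name is None:
        return True
    parts = dataset_name.split(".")
    if len(parts) != 3:
        return True
    db, schema, table = parts
    return (
        allowed(db, deny=(r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"))
        and allowed(schema)
        and allowed(table)
    )


print(is_dataset_allowed("SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY"))  # False: db denied
print(is_dataset_allowed("SALES_DB.PUBLIC.ORDERS"))  # True (hypothetical names)
```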
@@ -12,6 +12,7 @@
from sqlalchemy.engine import Engine

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import get_time_bucket
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -95,6 +96,12 @@ class SnowflakeJoinedAccessEvent(PermissiveModel):
class SnowflakeUsageConfig(BaseSnowflakeConfig, BaseUsageConfig):
    env: str = builder.DEFAULT_ENV
    options: dict = {}
    database_pattern: AllowDenyPattern = AllowDenyPattern(
        deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
    )
    schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
    apply_view_usage_to_tables: bool = False

    @pydantic.validator("role", always=True)
@@ -167,15 +174,57 @@ def is_unsupported_object_accessed(obj: Dict[str, Any]) -> bool:
            unsupported_keys = ["locations"]
            return any([obj.get(key) is not None for key in unsupported_keys])

        def is_dataset_pattern_allowed(
            dataset_name: Optional[Any], dataset_type: Optional[Any]
        ) -> bool:
            # TODO: support table/view patterns for usage logs by pulling that information as well from the usage query
            if not dataset_type or not dataset_name:
                return True

            table_or_view_pattern: Optional[
                AllowDenyPattern
            ] = AllowDenyPattern.allow_all()
            # TODO: test whether domain type external_table occurs and add handling for it
            table_or_view_pattern = (
                self.config.table_pattern
                if dataset_type.lower() in {"table"}
                else (
                    self.config.view_pattern
                    if dataset_type.lower() in {"view", "materialized_view"}
                    else None
                )
            )
            if table_or_view_pattern is None:
                return True

            dataset_params = dataset_name.split(".")
            assert len(dataset_params) == 3
            if (
                not self.config.database_pattern.allowed(dataset_params[0])
                or not self.config.schema_pattern.allowed(dataset_params[1])
                or not table_or_view_pattern.allowed(dataset_params[2])
            ):
                return False
            return True

        def is_object_valid(obj: Dict[str, Any]) -> bool:
            if is_unsupported_object_accessed(
                obj
            ) or not is_dataset_pattern_allowed(
                obj.get("objectName"), obj.get("objectDomain")
            ):
                return False
            return True

        event_dict["base_objects_accessed"] = [
            obj
            for obj in json.loads(event_dict["base_objects_accessed"])
-            if not is_unsupported_object_accessed(obj)
+            if is_object_valid(obj)
        ]
        event_dict["direct_objects_accessed"] = [
            obj
            for obj in json.loads(event_dict["direct_objects_accessed"])
-            if not is_unsupported_object_accessed(obj)
+            if is_object_valid(obj)
        ]
        event_dict["query_start_time"] = (
            event_dict["query_start_time"]
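As an illustration of the filtering above, here is a small self-contained sketch with a hypothetical ACCESS_HISTORY event; the simplified `is_object_valid` below only approximates the real pattern checks, using a view-name suffix as a stand-in for the configured table/view patterns:

```python
import json
from typing import Any, Dict

# Hypothetical access-history event; objectName/objectDomain/locations are the
# keys the code above inspects.
event_dict = {
    "direct_objects_accessed": json.dumps(
        [
            {"objectName": "SALES_DB.PUBLIC.ORDERS", "objectDomain": "Table"},
            {"objectName": "SALES_DB.PUBLIC.ORDERS_VIEW", "objectDomain": "View"},
            {"objectDomain": "Stage", "locations": ["s3://some-bucket/path"]},
        ]
    )
}


def is_object_valid(obj: Dict[str, Any]) -> bool:
    # Drop unsupported objects (external locations) ...
    if obj.get("locations") is not None:
        return False
    # ... and anything the patterns deny; this suffix check is a stand-in
    # for the real database/schema/table/view pattern routing.
    name = obj.get("objectName") or ""
    return not name.endswith("_VIEW")


event_dict["direct_objects_accessed"] = [
    obj
    for obj in json.loads(event_dict["direct_objects_accessed"])
    if is_object_valid(obj)
]
print(event_dict["direct_objects_accessed"])  # only the plain table survives
```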
@@ -202,15 +251,13 @@ def _aggregate_access_events(
                event.query_start_time, self.config.bucket_duration
            )

-            accessed_data = []
-            if self.config.apply_view_usage_to_tables:
-                accessed_data = event.base_objects_accessed
-            else:
-                accessed_data = event.direct_objects_accessed
+            accessed_data = (
+                event.base_objects_accessed
+                if self.config.apply_view_usage_to_tables
+                else event.direct_objects_accessed
+            )
            for object in accessed_data:
                resource = object.objectName

                agg_bucket = datasets[floored_ts].setdefault(
                    resource,
                    AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
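Finally, a self-contained sketch of the bucketing logic in `_aggregate_access_events`: events are floored to a bucket start and grouped per (timestamp, resource). The `Agg` class, the day-flooring helper, and the sample timestamps are illustrative stand-ins for DataHub's `AggregatedDataset` and `get_time_bucket`.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict


@dataclass
class Agg:
    # Stand-in for AggregatedDataset.
    bucket_start_time: datetime
    resource: str
    query_count: int = 0


def floor_to_day(ts: datetime) -> datetime:
    # Stand-in for get_time_bucket with bucket_duration="DAY".
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


datasets: Dict[datetime, Dict[str, Agg]] = defaultdict(dict)
events = [
    (datetime(2021, 12, 14, 9, 30, tzinfo=timezone.utc), "SALES_DB.PUBLIC.ORDERS"),
    (datetime(2021, 12, 14, 17, 5, tzinfo=timezone.utc), "SALES_DB.PUBLIC.ORDERS"),
]
for ts, resource in events:
    floored_ts = floor_to_day(ts)
    agg_bucket = datasets[floored_ts].setdefault(
        resource, Agg(bucket_start_time=floored_ts, resource=resource)
    )
    agg_bucket.query_count += 1

print(datasets)  # one day bucket; ORDERS aggregated across both queries
```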