From d62c77f70abc284e9256500b034bd32a02945c9c Mon Sep 17 00:00:00 2001
From: Mugdha Hardikar
Date: Tue, 28 Jun 2022 15:48:09 +0530
Subject: [PATCH] fix(ingest): delta-lake - fix dependency issue for snowflake due to s3_util (#5274)

---
 .../ingestion/source/aws/s3_boto_utils.py     | 101 ++++++++++++++++++
 .../ingestion/source/delta_lake/source.py     |   3 +-
 .../src/datahub/ingestion/source/s3/source.py |   3 +-
 3 files changed, 103 insertions(+), 4 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/aws/s3_boto_utils.py

diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_boto_utils.py b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_boto_utils.py
new file mode 100644
index 00000000000000..caec19d0fb2492
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_boto_utils.py
@@ -0,0 +1,101 @@
+import logging
+from typing import Iterable, Optional
+
+from datahub.emitter.mce_builder import make_tag_urn
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
+from datahub.ingestion.source.aws.s3_util import (
+    get_bucket_name,
+    get_bucket_relative_path,
+    is_s3_uri,
+)
+from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass
+
+logging.getLogger("py4j").setLevel(logging.ERROR)
+logger: logging.Logger = logging.getLogger(__name__)
+
+
+def get_s3_tags(
+    bucket_name: str,
+    key_name: Optional[str],
+    dataset_urn: str,
+    aws_config: Optional[AwsSourceConfig],
+    ctx: PipelineContext,
+    use_s3_bucket_tags: Optional[bool] = False,
+    use_s3_object_tags: Optional[bool] = False,
+) -> Optional[GlobalTagsClass]:
+    if aws_config is None:
+        raise ValueError("aws_config not set. Cannot browse s3")
+    new_tags = GlobalTagsClass(tags=[])
+    tags_to_add = []
+    if use_s3_bucket_tags:
+        s3 = aws_config.get_s3_resource()
+        bucket = s3.Bucket(bucket_name)
+        try:
+            tags_to_add.extend(
+                [
+                    make_tag_urn(f"""{tag["Key"]}:{tag["Value"]}""")
+                    for tag in bucket.Tagging().tag_set
+                ]
+            )
+        except s3.meta.client.exceptions.ClientError:
+            logger.warn(f"No tags found for bucket={bucket_name}")
+
+    if use_s3_object_tags and key_name is not None:
+        s3_client = aws_config.get_s3_client()
+        object_tagging = s3_client.get_object_tagging(Bucket=bucket_name, Key=key_name)
+        tag_set = object_tagging["TagSet"]
+        if tag_set:
+            tags_to_add.extend(
+                [make_tag_urn(f"""{tag["Key"]}:{tag["Value"]}""") for tag in tag_set]
+            )
+        else:
+            # Unlike bucket tags, if an object does not have tags, it will just return an empty array
+            # as opposed to an exception.
+            logger.warn(f"No tags found for bucket={bucket_name} key={key_name}")
+    if len(tags_to_add) == 0:
+        return None
+    if ctx.graph is not None:
+        logger.debug("Connected to DatahubApi, grabbing current tags to maintain.")
+        current_tags: Optional[GlobalTagsClass] = ctx.graph.get_aspect_v2(
+            entity_urn=dataset_urn,
+            aspect="globalTags",
+            aspect_type=GlobalTagsClass,
+        )
+        if current_tags:
+            tags_to_add.extend([current_tag.tag for current_tag in current_tags.tags])
+    else:
+        logger.warn("Could not connect to DatahubApi. No current tags to maintain")
+    # Remove duplicate tags
+    tags_to_add = list(set(tags_to_add))
+    new_tags = GlobalTagsClass(
+        tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add]
+    )
+    return new_tags
+
+
+def list_folders_path(
+    s3_uri: str, aws_config: Optional[AwsSourceConfig]
+) -> Iterable[str]:
+    if not is_s3_uri(s3_uri):
+        raise ValueError("Not a s3 URI: " + s3_uri)
+    if aws_config is None:
+        raise ValueError("aws_config not set. Cannot browse s3")
+    bucket_name = get_bucket_name(s3_uri)
+    prefix = get_bucket_relative_path(s3_uri)
+    yield from list_folders(bucket_name, prefix, aws_config)
+
+
+def list_folders(
+    bucket_name: str, prefix: str, aws_config: Optional[AwsSourceConfig]
+) -> Iterable[str]:
+    if aws_config is None:
+        raise ValueError("aws_config not set. Cannot browse s3")
+    s3_client = aws_config.get_s3_client()
+    paginator = s3_client.get_paginator("list_objects_v2")
+    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter="/"):
+        for o in page.get("CommonPrefixes", []):
+            folder: str = str(o.get("Prefix"))
+            if folder.endswith("/"):
+                folder = folder[:-1]
+            yield f"{folder}"
diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py
index c292c16764ec3a..42837ac833b09b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py
@@ -19,11 +19,10 @@
 )
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.aws.s3_boto_utils import get_s3_tags, list_folders_path
 from datahub.ingestion.source.aws.s3_util import (
     get_bucket_name,
     get_key_prefix,
-    get_s3_tags,
-    list_folders_path,
     strip_s3_prefix,
 )
 from datahub.ingestion.source.data_lake.data_lake_utils import ContainerWUCreator
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index 4e1dda628f1dad..6e66dcc3d84167 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -51,12 +51,11 @@
 )
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.aws.s3_boto_utils import get_s3_tags, list_folders
 from datahub.ingestion.source.aws.s3_util import (
     get_bucket_name,
     get_bucket_relative_path,
     get_key_prefix,
-    get_s3_tags,
-    list_folders,
     strip_s3_prefix,
 )
 from datahub.ingestion.source.data_lake.data_lake_utils import ContainerWUCreator
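
For reference, a minimal usage sketch of the relocated helpers (the bucket URI and
region below are illustrative placeholders, not part of this patch; credentials are
assumed to come from the default boto3 chain):

    from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
    from datahub.ingestion.source.aws.s3_boto_utils import list_folders_path

    # Assumed example config for illustration; any valid AwsSourceConfig works.
    aws_config = AwsSourceConfig(aws_region="us-east-1")

    # Yields each top-level "folder" (common prefix) under the given S3 path,
    # using the paginated list_objects_v2 API under the hood.
    for folder in list_folders_path("s3://example-bucket/data/", aws_config):
        print(folder)

Moving the boto3-backed helpers into s3_boto_utils leaves s3_util with only pure
string utilities, so consumers such as the snowflake source no longer pull in the
boto3-dependent code transitively, which is the dependency issue the subject line
refers to.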