Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): Glue - Support for domains and containers #4110

Merged
merged 6 commits into from
Feb 16, 2022

Conversation

treff7es
Copy link
Contributor

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable)

@github-actions
Copy link

github-actions bot commented Feb 10, 2022

Unit Test Results (metadata ingestion)

    3 files  ±  0      3 suites  ±0   36m 46s ⏱️ - 6m 16s
317 tests +  5  317 ✔️ +5    0 💤 ±0  0 ±0 
908 runs  +12  879 ✔️ +3  29 💤 +9  0 ±0 

Results for commit 580a724. ± Comparison against base commit 9bdc9af.

This pull request removes 1 and adds 6 tests. Note that renamed tests count towards both.
tests.unit.test_utilities ‑ test_groupby_unsorted
tests.integration.kafka.test_kafka_state ‑ test_kafka_ingest_with_stateful
tests.unit.stateful_ingestion.test_kafka_state ‑ test_kafka_common_state
tests.unit.test_athena_source ‑ test_athena_get_table_properties
tests.unit.test_athena_source ‑ test_athena_uri
tests.unit.test_kafka_source.KafkaSourceTest ‑ test_kafka_source_stateful_ingestion_requires_platform_instance
tests.unit.test_kafka_source.KafkaSourceTest ‑ test_kafka_source_workunits_with_platform_instance

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Feb 10, 2022

Unit Test Results (build & test)

  70 files  +1    70 suites  +1   14m 47s ⏱️ ±0s
609 tests +5  550 ✔️ +5  59 💤 ±0  0 ±0 

Results for commit 580a724. ± Comparison against base commit 9bdc9af.

♻️ This comment has been updated with latest results.

@@ -54,6 +63,7 @@ class GlueSourceConfig(AwsSourceConfig):
ignore_unsupported_connectors: Optional[bool] = True
emit_s3_lineage: bool = False
glue_s3_lineage_direction: str = "upstream"
domain: Dict[str, AllowDenyPattern] = dict()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

name this domain_pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the name in the other sources and I wanted to keep it consistent.

@@ -523,8 +533,66 @@ def get_lineage_if_enabled(
return mcp
return None

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
def gen_database_key(self, database: str) -> PlatformKey:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return type annotation looks wrong!
s/PlatformKey/DatabaseKey?


for domain, pattern in self.source_config.domain.items():
if pattern.allowed(dataset_name):
domain_urn = make_domain_urn(domain)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return make_domain_urn(domain)

if pattern.allowed(dataset_name):
domain_urn = make_domain_urn(domain)

return domain_urn
Copy link
Contributor

@rslanka rslanka Feb 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return None


def _get_domain_wu(
self, dataset_name: str, entity_urn: str, entity_type: str
) -> Iterable[Union[MetadataWorkUnit]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Union seems redundant here.

flow_names: Dict[str, str] = {}

for job in self.get_all_jobs():
def _transform_extraction(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing return type annotation.


for job in self.get_all_jobs():
def _transform_extraction(self):
dags = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type annotation?

# in Glue, it's possible for two buckets to have files of different extensions
# if this happens, we append the extension in the URN so the sources can be distinguished
# see process_dataflow_node() for details
s3_formats: typing.DefaultDict[str, Set[Union[str, None]]] = defaultdict(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set[Optional[str]]

self.report.report_workunit(wu)
yield wu

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has become a 100 line function now. Refactor into smaller functions?

@@ -1,4 +1,43 @@
[
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have the test coverage numbers remained the same or dropped after this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It remained the same==86%.

@@ -89,7 +90,7 @@ def get_session(self) -> Session:
region_name=self.aws_region,
)
else:
return Session(region_name=self.aws_region)
return Session(region_name=self.aws_region, profile_name=self.aws_profile)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the profile_name param supported by our min version of boto3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is there for 7 years, so I think we should be fine


def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database)

Copy link
Contributor

@rslanka rslanka Feb 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: delete empty line.

Copy link
Contributor

@rslanka rslanka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Copy link
Contributor

@shirshanka shirshanka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@shirshanka shirshanka merged commit b266491 into datahub-project:master Feb 16, 2022
hevandro-veiga pushed a commit to hevandro-veiga/datahub that referenced this pull request Feb 18, 2022
…ect#4110)

* Add container and domain support for Glue.
Adding option to set aws profile for Glue.

* Adding domain doc for Glue

* Making get_workunits less complex

* Updating golden file

* Addressing pr review comments

* Remove unneded empty line
maggiehays pushed a commit to maggiehays/datahub that referenced this pull request Aug 1, 2022
…ect#4110)

* Add container and domain support for Glue.
Adding option to set aws profile for Glue.

* Adding domain doc for Glue

* Making get_workunits less complex

* Updating golden file

* Addressing pr review comments

* Remove unneded empty line
@treff7es treff7es deleted the glue_domains_containers branch February 8, 2023 11:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants