-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): Glue - Support for domains and containers #4110
feat(ingest): Glue - Support for domains and containers #4110
Conversation
Adding option to set aws profile for Glue.
Unit Test Results (metadata ingestion) 3 files ± 0 3 suites ±0 36m 46s ⏱️ - 6m 16s Results for commit 580a724. ± Comparison against base commit 9bdc9af. This pull request removes 1 and adds 6 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
@@ -54,6 +63,7 @@ class GlueSourceConfig(AwsSourceConfig): | |||
ignore_unsupported_connectors: Optional[bool] = True | |||
emit_s3_lineage: bool = False | |||
glue_s3_lineage_direction: str = "upstream" | |||
domain: Dict[str, AllowDenyPattern] = dict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name this domain_pattern
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the name in the other sources and I wanted to keep it consistent.
@@ -523,8 +533,66 @@ def get_lineage_if_enabled( | |||
return mcp | |||
return None | |||
|
|||
def get_workunits(self) -> Iterable[MetadataWorkUnit]: | |||
def gen_database_key(self, database: str) -> PlatformKey: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return type annotation looks wrong!
s/PlatformKey/DatabaseKey?
|
||
for domain, pattern in self.source_config.domain.items(): | ||
if pattern.allowed(dataset_name): | ||
domain_urn = make_domain_urn(domain) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return make_domain_urn(domain)
if pattern.allowed(dataset_name): | ||
domain_urn = make_domain_urn(domain) | ||
|
||
return domain_urn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return None
|
||
def _get_domain_wu( | ||
self, dataset_name: str, entity_urn: str, entity_type: str | ||
) -> Iterable[Union[MetadataWorkUnit]]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Union seems redundant here.
flow_names: Dict[str, str] = {} | ||
|
||
for job in self.get_all_jobs(): | ||
def _transform_extraction(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing return type annotation.
|
||
for job in self.get_all_jobs(): | ||
def _transform_extraction(self): | ||
dags = {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type annotation?
# in Glue, it's possible for two buckets to have files of different extensions | ||
# if this happens, we append the extension in the URN so the sources can be distinguished | ||
# see process_dataflow_node() for details | ||
s3_formats: typing.DefaultDict[str, Set[Union[str, None]]] = defaultdict( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set[Optional[str]]
self.report.report_workunit(wu) | ||
yield wu | ||
|
||
def get_workunits(self) -> Iterable[MetadataWorkUnit]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has become a 100 line function now. Refactor into smaller functions?
@@ -1,4 +1,43 @@ | |||
[ | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have the test coverage numbers remained the same or dropped after this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It remained the same==86%.
@@ -89,7 +90,7 @@ def get_session(self) -> Session: | |||
region_name=self.aws_region, | |||
) | |||
else: | |||
return Session(region_name=self.aws_region) | |||
return Session(region_name=self.aws_region, profile_name=self.aws_profile) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the profile_name
param supported by our min version of boto3?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is there for 7 years, so I think we should be fine
|
||
def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: | ||
domain_urn = self._gen_domain_urn(database) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: delete empty line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…ect#4110) * Add container and domain support for Glue. Adding option to set aws profile for Glue. * Adding domain doc for Glue * Making get_workunits less complex * Updating golden file * Addressing pr review comments * Remove unneded empty line
…ect#4110) * Add container and domain support for Glue. Adding option to set aws profile for Glue. * Adding domain doc for Glue * Making get_workunits less complex * Updating golden file * Addressing pr review comments * Remove unneded empty line
Checklist