fix(ingestion) containers: Adding platform instance to container keys (
treff7es authored Mar 16, 2022
1 parent 1ab3ad3 commit f557b2c
Showing 10 changed files with 2,306 additions and 1,896 deletions.
117 changes: 107 additions & 10 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
@@ -23,6 +23,9 @@
     BrowsePathsClass,
     ChartInfoClass,
     ChartKeyClass,
+    ContainerClass,
+    ContainerKeyClass,
+    ContainerPropertiesClass,
     DatahubIngestionCheckpointClass,
     DatahubIngestionRunSummaryClass,
     DataJobInputOutputClass,
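
The three Container* classes imported here back the new container handling in the CLI. As a rough sketch of what they model (not part of this commit; the urn and property values are hypothetical):

    from datahub.metadata.schema_classes import (
        ContainerClass,
        ContainerPropertiesClass,
    )

    # containerProperties carries the customProperties map that the new
    # get_container_ids_by_filter helper (below) filters on.
    props = ContainerPropertiesClass(
        name="my_database",
        customProperties={"instance": "PROD"},
    )

    # container points an entity at its parent container's urn.
    parent = ContainerClass(container="urn:li:container:abc123")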
@@ -357,10 +360,103 @@ def get_urns_by_filter(
         response.raise_for_status()


+def get_container_ids_by_filter(
+    env: Optional[str],
+    entity_type: str = "container",
+    search_query: str = "*",
+) -> Iterable[str]:
+    session, gms_host = get_session_and_host()
+    endpoint: str = "/entities?action=search"
+    url = gms_host + endpoint
+
+    container_filters = []
+    for container_subtype in ["Database", "Schema", "Project", "Dataset"]:
+        filter_criteria = []
+
+        filter_criteria.append(
+            {
+                "field": "customProperties",
+                "value": f"instance={env}",
+                "condition": "EQUAL",
+            }
+        )
+
+        filter_criteria.append(
+            {
+                "field": "typeNames",
+                "value": container_subtype,
+                "condition": "EQUAL",
+            }
+        )
+        container_filters.append({"and": filter_criteria})
+    search_body = {
+        "input": search_query,
+        "entity": entity_type,
+        "start": 0,
+        "count": 10000,
+        "filter": {"or": container_filters},
+    }
+    payload = json.dumps(search_body)
+    log.debug(payload)
+    response: Response = session.post(url, payload)
+    if response.status_code == 200:
+        assert response._content
+        log.debug(response._content)
+        results = json.loads(response._content)
+        num_entities = results["value"]["numEntities"]
+        entities_yielded: int = 0
+        for x in results["value"]["entities"]:
+            entities_yielded += 1
+            log.debug(f"yielding {x['entity']}")
+            yield x["entity"]
+        assert (
+            entities_yielded == num_entities
+        ), "Did not delete all entities, try running this command again!"
+    else:
+        log.error(f"Failed to execute search query with {str(response.content)}")
+        response.raise_for_status()
+
+
+def batch_get_ids(
+    ids: List[str],
+) -> Iterable[Dict]:
+    session, gms_host = get_session_and_host()
+    endpoint: str = "/entitiesV2"
+    url = gms_host + endpoint
+    ids_to_get = []
+    for id in ids:
+        ids_to_get.append(urllib.parse.quote(id))
+
+    response = session.get(
+        f"{url}?ids=List({','.join(ids_to_get)})",
+    )
+
+    if response.status_code == 200:
+        assert response._content
+        log.debug(response._content)
+        results = json.loads(response._content)
+        num_entities = len(results["results"])
+        entities_yielded: int = 0
+        for x in results["results"].values():
+            entities_yielded += 1
+            log.debug(f"yielding {x}")
+            yield x
+        assert (
+            entities_yielded == num_entities
+        ), "Did not delete all entities, try running this command again!"
+    else:
+        log.error(f"Failed to execute batch get with {str(response.content)}")
+        response.raise_for_status()
+
+
+def get_incoming_relationships(urn: str, types: List[str]) -> Iterable[Dict]:
+    yield from get_relationships(urn=urn, types=types, direction="INCOMING")
+
+
+def get_outgoing_relationships(urn: str, types: List[str]) -> Iterable[Dict]:
+    yield from get_relationships(urn=urn, types=types, direction="OUTGOING")


 def get_relationships(urn: str, types: List[str], direction: str) -> Iterable[Dict]:
     session, gms_host = get_session_and_host()
     encoded_urn = urllib.parse.quote(urn, safe="")
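
Taken together, the two new helpers support an instance-scoped cleanup flow: enumerate the container urns whose customProperties carry a given instance, then hydrate those entities in bulk from /entitiesV2. A minimal usage sketch (assuming a reachable GMS and a configured CLI session; not part of this commit):

    from datahub.cli.cli_utils import batch_get_ids, get_container_ids_by_filter

    # All containers tagged with instance=PROD in their customProperties.
    container_urns = list(get_container_ids_by_filter(env="PROD"))

    # Fetch their full payloads in a single /entitiesV2 round trip.
    for entity in batch_get_ids(container_urns):
        print(entity)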
@@ -369,6 +465,7 @@ def get_relationships(urn: str, types: List[str], direction: str) -> Iterable[Dict]:
     response: Response = session.get(endpoint)
     if response.status_code == 200:
         results = response.json()
+        log.debug(f"Relationship response: {results}")
         num_entities = results["count"]
         entities_yielded: int = 0
         for x in results["relationships"]:
@@ -400,7 +497,7 @@ def get_entity(
         raise Exception(
             f"urn {urn} does not seem to be a valid raw (starts with urn:) or encoded urn (starts with urn%3A)"
         )
-    endpoint: str = f"/entities/{encoded_urn}"
+    endpoint: str = f"/entitiesV2/{encoded_urn}"

     if aspect:
         endpoint = endpoint + "?aspects=List(" + ",".join(aspect) + ")"
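
Note the switch from /entities to /entitiesV2: entity reads now go through the V2 API, whose response nests aspects by name (the get_aspects_for_entity changes below consume that shape). A fetch therefore resolves to a request of the form (urn and aspect list illustrative):

    GET {gms_host}/entitiesV2/urn%3Ali%3Acontainer%3Aabc123?aspects=List(containerProperties)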
@@ -474,6 +571,9 @@ def post_entity(
     ChartInfoClass: "chartInfo",
     DataProcessInfoClass: "dataProcessInfo",
     ChartKeyClass: "chartKey",
+    ContainerClass: "container",
+    ContainerKeyClass: "containerKey",
+    ContainerPropertiesClass: "containerProperties",
 }

 timeseries_class_to_aspect_name_map: Dict[Type, str] = {
@@ -547,9 +647,7 @@ def get_aspects_for_entity(
     entity_response = get_entity(
         entity_urn, non_timeseries_aspects, cached_session_host
     )
-    aspect_list: List[Dict[str, dict]] = list(entity_response["value"].values())[0][
-        "aspects"
-    ]
+    aspect_list: Dict[str, dict] = entity_response["aspects"]

     # Process timeseries aspects & append to aspect_list
     timeseries_aspects: List[str] = [
@@ -570,7 +668,7 @@
                 aspect_value["aspect"]["value"] = json.loads(
                     aspect_value["aspect"]["value"]
                 )
-                aspect_list.append(
+                aspect_list.update(
                     # Follow the convention used for non-timeseries aspects.
                     {
                         aspect_cls.RECORD_SCHEMA.fullname.replace(
@@ -580,16 +678,15 @@
                 )

     aspect_map: Dict[str, Union[dict, DictWrapper]] = {}
-    for a in aspect_list:
-        aspect_class = list(a.keys())[0]
-        aspect_name = _get_aspect_name_from_aspect_class(aspect_class)
+    for a in aspect_list.values():
+        aspect_name = a["name"]
         aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name(
             aspect_name
         )
         if aspect_name == "unknown":
-            print(f"Failed to find aspect_name for class {aspect_class}")
+            print(f"Failed to find aspect_name for class {aspect_name}")

-        aspect_dict = list(a.values())[0]
+        aspect_dict = a["value"]
         if not typed:
             aspect_map[aspect_name] = aspect_dict
         elif aspect_py_class:
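The rewritten loop tracks the V2 payload shape: "aspects" is now a dict keyed by aspect name, with each entry carrying its own "name" and "value", where the V1 endpoint returned a list of single-key dicts. A minimal sketch of the structure the new code expects (field values illustrative):

    entity_response = {
        "aspects": {
            "containerProperties": {
                "name": "containerProperties",
                "value": {"name": "my_database", "customProperties": {"instance": "PROD"}},
            }
        }
    }
    for a in entity_response["aspects"].values():
        print(a["name"], a["value"])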
(Diffs for the remaining 9 changed files are not shown here.)
