🔊🐛 More verbose loader logging + loader fixes #570

Merged 3 commits on Feb 5, 2021
29 changes: 22 additions & 7 deletions kf_lib_data_ingest/etl/load/load_v1.py
@@ -72,7 +72,7 @@ def __init__(
            when using resume_from, because that needs the cache to be effective.
        :type clear_cache: bool, optional
        """
-        super().__init__(cache_dir)
+        super().__init__(cache_dir or os.getcwd())
        self.target_api_config = TargetAPIConfig(target_api_config_path)
        self._validate_entities(
            entities_to_load,
@@ -87,7 +87,7 @@ def __init__(

        target = urlparse(target_url).netloc or urlparse(target_url).path
        self.uid_cache_filepath = os.path.join(
-            cache_dir or os.getcwd(),

fiendish (Contributor):

why remove this?

znatty22 (Member, Author), Feb 5, 2021:

Well, because when you instantiate the loader with cache_dir=None, it throws an exception, since the base Stage class uses it for ingest_output_dir. I went back and forth on this. If no dir is passed in, should the Loader use cwd as a default to pass to the base class, or should the base class handle the missing dir and set it to cwd?

znatty22 (Member, Author), Feb 5, 2021:

Oh, also, it's not removed. I just moved setting cache_dir earlier in the constructor.

fiendish (Contributor), Feb 5, 2021:

But it looks like this doesn't use the cache dir anymore for the uid cache file? It seems like this always puts self.uid_cache_filepath in the current dir instead of in the cache dir.

znatty22 (Member, Author):

Ohhh yeah, sorry, I see what you're saying now. I accidentally removed that important little line altogether.

+            self.stage_cache_dir,
            # Every target gets its own cache because they don't share UIDs
            "_".join(multisplit(target, [":", "/"]))
            # Every study gets its own cache to compartmentalize internal IDs
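
To illustrate the resolution of the thread above, here is a minimal sketch (class and attribute names simplified; the stage_cache_dir construction is an assumption, not the library's actual code) of the chosen approach: the subclass supplies the cwd default before the base class ever sees None, and the uid cache path is then built from the base class's cache dir.

    import os

    class Stage:
        def __init__(self, ingest_output_dir):
            # Hypothetical base: requires a real directory, raises on None
            if ingest_output_dir is None:
                raise ValueError("ingest_output_dir is required")
            self.ingest_output_dir = ingest_output_dir
            self.stage_cache_dir = os.path.join(ingest_output_dir, "cache")

    class Loader(Stage):
        def __init__(self, cache_dir=None):
            # Default to cwd here so the base class never receives None
            super().__init__(cache_dir or os.getcwd())
            # Build the uid cache path from the base class's cache dir;
            # this is the line that was accidentally dropped at first
            self.uid_cache_filepath = os.path.join(
                self.stage_cache_dir, "uid_cache"
            )
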
@@ -266,15 +266,25 @@ def _load_entity(self, entity_class, record):
        """
        try:
            key_components = self._do_target_get_key(entity_class, record)
-            unique_key = str(key_components)
        except Exception:
            # no new key, no new entity
            key_components = None

+        if not key_components:
+            self.logger.debug(
+                f"Skip {entity_class.class_name}. Missing key components. "
+                f"Failed to construct unique key from record:"
+                f"\n{pformat(record)}"
+            )
+            return
+
-        if (not key_components) or (
-            unique_key in self.seen_entities[entity_class.class_name]
-        ):
+        unique_key = str(key_components)
+        if unique_key in self.seen_entities[entity_class.class_name]:
+            # no new key, no new entity
+            self.logger.debug(
+                f"Skip {entity_class.class_name}. Duplicate record found in "
+                f"data:\n{record}"
+            )
            return

        self.seen_entities[entity_class.class_name].add(unique_key)
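
In isolation, the dedup logic this hunk reworks looks roughly like the following (a sketch, not the loader's exact code): records are keyed by their stringified key components, and each skip path now gets its own debug message instead of a silent return.

    from collections import defaultdict

    seen_entities = defaultdict(set)  # class_name -> stringified unique keys

    def should_load(class_name, key_components):
        if not key_components:
            return False  # no key could be built, nothing to load
        unique_key = str(key_components)  # dicts aren't hashable; strings are
        if unique_key in seen_entities[class_name]:
            return False  # duplicate record in the data
        seen_entities[class_name].add(unique_key)
        return True
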
@@ -379,7 +389,8 @@ def _run(self, transform_output):
        for entity_class in self.target_api_config.all_targets:
            if entity_class.class_name not in self.entities_to_load:
                self.logger.info(
-                    f"Skipping load of {entity_class.class_name}"
+                    f"Skipping load of {entity_class.class_name}. Not "
+                    "included in ingest package config."
                )
                continue
@@ -408,6 +419,10 @@ def _run(self, transform_output):
            ex = concurrent.futures.ThreadPoolExecutor()
            futures = []

+            self.logger.info(
+                f"Found {len(transformed_records)} {entity_class.class_name} "
+                "records to load."
+            )
            for record in transformed_records:
                if self.use_async and not self.resume_from:
                    futures.append(
8 changes: 7 additions & 1 deletion kf_lib_data_ingest/etl/load/load_v2.py
@@ -50,12 +50,15 @@ def _get_target_id_from_record(self, entity_class, record):
        # check the server
        if self.dry_run and not self.query_url:
            return None
+
+        err = False
        try:
            tic_list = entity_class.query_target_ids(
                self.query_url or self.target_url, key_components
            )
            if tic_list:
                if len(tic_list) > 1:
+                    err = True
                    raise Exception(
                        "Ambiguous query. Multiple target identifiers found.\n"
                        "Sent:\n"
@@ -73,7 +76,10 @@ def _get_target_id_from_record(self, entity_class, record):
            )
            return tic
        except Exception:
-            return None
+            if err:
+                raise
+
+            return None

    def _do_target_submit(self, entity_class, body):
        """Shim for target API submission across loader versions"""
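
The shape of this fix, a flag that re-raises only the exception raised deliberately while still swallowing ordinary lookup failures, can be shown standalone (the query function and its arguments here are hypothetical):

    def lookup_one(query_ids, url, key_components):
        err = False
        try:
            ids = query_ids(url, key_components)
            if ids and len(ids) > 1:
                err = True  # our own error: ambiguity must surface
                raise Exception(
                    "Ambiguous query. Multiple target identifiers found."
                )
            return ids[0] if ids else None
        except Exception:
            if err:
                raise  # propagate the ambiguity error to the caller
            return None  # treat other failures as "not found"
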
69 changes: 54 additions & 15 deletions kf_lib_data_ingest/target_api_plugins/kids_first_dataservice.py
@@ -27,17 +27,32 @@
logger = logging.getLogger(__name__)

LOADER_VERSION = 2
+DELIMITER = "-"


def drop_none(body):
    return {k: v for k, v in body.items() if v is not None}


def not_none(val):
-    assert val is not None
+    if val is None:
+        raise ValueError("Missing required value")
    return val


+def external_id(cls_list, record, get_target_id_from_record):
+    out = []
+    for cls in cls_list:
+        cls_key_dict = cls.get_key_components(
+            record, get_target_id_from_record
+        ).copy()
+        for key in ["study_id", "sequencing_center_id"]:
+            cls_key_dict.pop(key, None)
+        out.extend([str(v) for v in cls_key_dict.values()])
+
+    return DELIMITER.join(out) or None
+
+
class Investigator:
    class_name = "investigator"
    api_path = "investigators"
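
A usage sketch of the new external_id helper (stub classes and values are hypothetical; it assumes the helper and DELIMITER defined above are in scope): each class contributes its key components, study_id and sequencing_center_id are dropped because they are not externally meaningful, and the rest are joined.

    class StubBiospecimen:
        @staticmethod
        def get_key_components(record, get_target_id_from_record):
            return {"study_id": "SD_00000000", "external_aliquot_id": "aliquot-01"}

    class StubGenomicFile:
        @staticmethod
        def get_key_components(record, get_target_id_from_record):
            return {"external_id": "file-01.cram"}

    # study_id is popped before joining, so this yields:
    external_id([StubBiospecimen, StubGenomicFile], {}, None)
    # -> "aliquot-01-file-01.cram"
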
@@ -339,14 +354,8 @@ class Biospecimen:
    @classmethod
    def get_key_components(cls, record, get_target_id_from_record):
        return {
-            "participant_id": not_none(
-                get_target_id_from_record(Participant, record)
-            ),
            "study_id": not_none(record[CONCEPT.STUDY.TARGET_SERVICE_ID]),
            "external_aliquot_id": not_none(record[CONCEPT.BIOSPECIMEN.ID]),
-            "external_sample_id": (
-                record.get(CONCEPT.BIOSPECIMEN_GROUP.ID)
-                or not_none(record[CONCEPT.BIOSPECIMEN.ID])
-            ),
        }

    @classmethod
@@ -360,6 +369,13 @@ def build_entity(cls, record, get_target_id_from_record):
            "sequencing_center_id": record.get(
                CONCEPT.SEQUENCING.CENTER.TARGET_SERVICE_ID
            ),
+            "participant_id": not_none(
+                get_target_id_from_record(Participant, record)
+            ),
+            "external_sample_id": (
+                record.get(CONCEPT.BIOSPECIMEN_GROUP.ID)
+                or not_none(record[CONCEPT.BIOSPECIMEN.ID])
+            ),
            "source_text_tissue_type": record.get(
                CONCEPT.BIOSPECIMEN.TISSUE_TYPE
            ),
@@ -426,6 +442,20 @@ def query_target_ids(cls, host, key_components):

    @classmethod
    def build_entity(cls, record, get_target_id_from_record):
+        def size(record):
+            try:
+                return int(record.get(CONCEPT.GENOMIC_FILE.SIZE))
+            except Exception:
+                return None
+
+        def hashes(record):
+            return {
+                k.lower().replace("-", ""): v
+                for k, v in str_to_obj(
+                    record.get(CONCEPT.GENOMIC_FILE.HASH_DICT, {})
+                ).items()
+            }
+
        secondary_components = {
            "kf_id": get_target_id_from_record(cls, record),
            "file_name": record.get(CONCEPT.GENOMIC_FILE.FILE_NAME),
@@ -436,13 +466,8 @@ def build_entity(cls, record, get_target_id_from_record):
                record.get(CONCEPT.GENOMIC_FILE.CONTROLLED_ACCESS)
            ),
            "is_harmonized": record.get(CONCEPT.GENOMIC_FILE.HARMONIZED),
-            "hashes": {
-                k.lower().replace("-", ""): v
-                for k, v in str_to_obj(
-                    record.get(CONCEPT.GENOMIC_FILE.HASH_DICT)
-                ).items()
-            },
-            "size": int(record.get(CONCEPT.GENOMIC_FILE.SIZE)),
+            "hashes": hashes(record),
+            "size": size(record),
            "urls": str_to_obj(record.get(CONCEPT.GENOMIC_FILE.URL_LIST)),
            "acl": str_to_obj(record.get(CONCEPT.GENOMIC_FILE.ACL)),
            "reference_genome": record.get(
@@ -671,6 +696,9 @@ def build_entity(cls, record, get_target_id_from_record):
        secondary_components = {
            "kf_id": get_target_id_from_record(cls, record),
            "visible": record.get(CONCEPT.BIOSPECIMEN_GENOMIC_FILE.VISIBLE),
+            "external_id": external_id(
+                [Biospecimen, GenomicFile], record, get_target_id_from_record
+            ),
        }
        return {
            **cls.get_key_components(record, get_target_id_from_record),
@@ -708,6 +736,9 @@ def build_entity(cls, record, get_target_id_from_record):
        secondary_components = {
            "kf_id": get_target_id_from_record(cls, record),
            "visible": record.get(CONCEPT.BIOSPECIMEN_DIAGNOSIS.VISIBLE),
+            "external_id": external_id(
+                [Biospecimen, Diagnosis], record, get_target_id_from_record
+            ),
        }
        return {
            **cls.get_key_components(record, get_target_id_from_record),
@@ -745,6 +776,9 @@ def build_entity(cls, record, get_target_id_from_record):
        secondary_components = {
            "kf_id": get_target_id_from_record(cls, record),
            "visible": record.get(CONCEPT.READ_GROUP_GENOMIC_FILE.VISIBLE),
+            "external_id": external_id(
+                [ReadGroup, GenomicFile], record, get_target_id_from_record
+            ),
        }
        return {
            **cls.get_key_components(record, get_target_id_from_record),
@@ -782,6 +816,11 @@ def build_entity(cls, record, get_target_id_from_record):
        secondary_components = {
            "kf_id": get_target_id_from_record(cls, record),
            "visible": record.get(CONCEPT.SEQUENCING_GENOMIC_FILE.VISIBLE),
+            "external_id": external_id(
+                [SequencingExperiment, GenomicFile],
+                record,
+                get_target_id_from_record,
+            ),
        }
        return {
            **cls.get_key_components(record, get_target_id_from_record),