🔊 Rearrange target ID inquiry to improve load logging
fiendish committed May 10, 2021
1 parent fc8f79b commit 5e3310f
Showing 2 changed files with 32 additions and 36 deletions.
kf_lib_data_ingest/etl/load/load_v1.py (29 additions, 35 deletions)
@@ -85,11 +85,10 @@ def __init__(
         self.project_id = project_id
         self.use_async = use_async

-        target = urlparse(target_url).netloc or urlparse(target_url).path
         self.uid_cache_filepath = os.path.join(
             self.stage_cache_dir,
-            # Every target gets its own cache because they don't share UIDs
-            "_".join(multisplit(target, [":", "/"]))
+            # Every project gets its own cache to compartmentalize internal IDs
+            self._clean_name(target_url)
             + "_" + project_id + "_" + DEFAULT_ID_CACHE_FILENAME,
         )
@@ -114,6 +113,10 @@ def __init__(
             check_same_thread=False,
         )

+    def _clean_name(self, target_url):
+        target = urlparse(target_url).netloc or urlparse(target_url).path
+        return "_".join(multisplit(target, [":", "/"]))
+
     def _validate_entities(self, entities_to_load, msg):
         """
         Validate that all entities in entities_to_load are one of the
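For reference, a minimal standalone sketch of what the new _clean_name helper produces; the multisplit below is a hypothetical stand-in for the library's own helper, so its exact semantics are assumed:

    from urllib.parse import urlparse

    def multisplit(text, separators):
        # Hypothetical stand-in for the library's multisplit helper:
        # split on each separator in turn and keep the non-empty pieces.
        parts = [text]
        for sep in separators:
            parts = [p for chunk in parts for p in chunk.split(sep)]
        return [p for p in parts if p]

    def clean_name(target_url):
        # mirrors the new _clean_name method in the hunk above
        target = urlparse(target_url).netloc or urlparse(target_url).path
        return "_".join(multisplit(target, [":", "/"]))

    print(clean_name("http://localhost:5000"))  # -> localhost_5000

That token names both the per-target ID cache here and, in a later hunk, the per-target SentMessages file checked by the updated test.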
@@ -289,15 +292,15 @@ def _load_entity(self, entity_class, record):

         self.seen_entities[entity_class.class_name].add(unique_key)

+        target_id = self._get_target_id_from_record(entity_class, record)
+        method = "UPDATE" if target_id else "ADD"
+
         body = self._do_target_get_entity(entity_class, record, unique_key)

         if current_thread() is not main_thread():
             current_thread().name = f"{entity_class.class_name} {unique_key}"

         if self.resume_from:
-            target_id = self._get_target_id_from_key(
-                entity_class.class_name, unique_key
-            )
             if not target_id:
                 raise InvalidIngestStageParameters(
                     "Use of the resume_from flag requires having already"
@@ -313,28 +316,18 @@ def _load_entity(self, entity_class, record):
                 self.dry_run = False
                 self.resume_from = None

-        if self.dry_run:
-            target_id = self._get_target_id_from_key(
-                entity_class.class_name, unique_key
-            )
-            if target_id:
-                req_method = "UPDATE"
-                id_str = f"({target_id})"
-            else:
-                req_method = "ADD"
-                id_str = f"({unique_key})"
-            target_id = id_str
+        msg = f"{method} {entity_class.class_name} ({unique_key})"
+        if target_id:
+            msg = f"{msg} [{target_id}]"

         if self.dry_run:
             self.logger.debug(f"Request body preview:\n{pformat(body)}")
-            done_msg = (
-                f"DRY RUN - {req_method} {entity_class.class_name} {id_str}"
-            )
+            msg = f"DRY RUN - {msg}"
+            target_id = f"({target_id})"
         else:
             # send to the target service
             target_id = self._do_target_submit(entity_class, body)
-            done_msg = (
-                f"Loaded {entity_class.class_name} {unique_key} --> {target_id}"
-            )
+            msg = f"{msg} --> {target_id}"

         # cache source_ID:target_ID lookup
         self._store_target_id_for_key(
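Taken together, these two hunks move the ADD/UPDATE decision up front (from the record's cached target ID) and fold the old req_method/done_msg variables into one msg string built in stages. A minimal sketch of the resulting log-message shapes, using made-up entity and ID values and skipping the real target-service submit:

    def build_log_msg(class_name, unique_key, target_id, dry_run):
        # UPDATE when a target ID is already known for the record, else ADD
        method = "UPDATE" if target_id else "ADD"
        msg = f"{method} {class_name} ({unique_key})"
        if target_id:
            msg = f"{msg} [{target_id}]"
        if dry_run:
            msg = f"DRY RUN - {msg}"
        else:
            # the real code replaces target_id with _do_target_submit()'s result here
            msg = f"{msg} --> {target_id}"
        return msg

    print(build_log_msg("participant", "P1", None, dry_run=True))
    # DRY RUN - ADD participant (P1)
    print(build_log_msg("participant", "P1", "PT_00001111", dry_run=False))
    # UPDATE participant (P1) [PT_00001111] --> PT_00001111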
@@ -345,14 +338,14 @@ def _load_entity(self, entity_class, record):
         with count_lock:
             self.sent_messages.append(
                 {
                     "host": self.target_url,
                     "type": entity_class.class_name,
+                    "method": method,
                     "body": body,
                 }
             )
-            self.counts[entity_class.class_name] += 1
+            self.counts[entity_class.class_name][method] += 1
             self.logger.info(
-                done_msg + f" (#{self.counts[entity_class.class_name]})"
+                f"{msg} (#{sum(self.counts[entity_class.class_name].values())})"
             )

@@ -367,7 +360,7 @@ def _run(self, transform_output):
         :param transform_output: Output data structure from the Transform stage
         :type transform_output: dict
         """
-        self.counts = {}
+        self.counts = defaultdict(dict)
         self.seen_entities = defaultdict(set)

         if self.dry_run:
@@ -413,24 +406,20 @@ def _run(self, transform_output):
                for r in transformed_records:
                    r[CONCEPT.PROJECT.ID] = self.project_id

-                self.counts[entity_class.class_name] = 0
+                self.counts[entity_class.class_name]["ADD"] = 0
+                self.counts[entity_class.class_name]["UPDATE"] = 0

                if self.use_async:
                    ex = concurrent.futures.ThreadPoolExecutor()
                    futures = []

                self.logger.info(
-                    f"Found {entity_class.class_name} {len(transformed_records)} "
-                    "records to load."
+                    f"Reading {len(transformed_records)} rows in '{t_key}' table."
                )
                for record in transformed_records:
                    if self.use_async and not self.resume_from:
                        futures.append(
-                            ex.submit(
-                                self._load_entity,
-                                entity_class,
-                                record,
-                            )
+                            ex.submit(self._load_entity, entity_class, record)
                        )
                    else:
                        self._load_entity(entity_class, record)
@@ -442,7 +431,10 @@ def _run(self, transform_output):

self.logger.info(f"End loading {entity_class.class_name}")
finally:
json_out = os.path.join(self.stage_cache_dir, "SentMessages.json")
target = self._clean_name(self.target_url)
json_out = os.path.join(
self.stage_cache_dir, f"SentMessages_{target}.json"
)
with open(json_out, "w") as jo:
json.dump(self.sent_messages, jo, indent=2)

@@ -451,3 +443,5 @@ def _run(self, transform_output):
f"⚠️ Could not find resume_from target '{self.resume_from}'! "
"Nothing was actually loaded into the target service."
)

self.logger.info(f"Load Summary:\n{pformat(dict(self.counts))}")
tests/test_kidsfirst_api_plugin.py (3 additions, 1 deletion)
@@ -19,7 +19,9 @@ def test_external_id_whole_ingest():
     output_dir = join(study_dir, "output")
     delete_dir(output_dir)
     runner.invoke(cli.test, study_dir)
-    with open(join(output_dir, "LoadStage", "SentMessages.json")) as sm:
+    with open(
+        join(output_dir, "LoadStage", "SentMessages_localhost_5000.json")
+    ) as sm:
         data = json.load(sm)
         gfs = [e for e in data if e["type"] == "genomic_file"]
         bsgfs = [e for e in data if e["type"] == "biospecimen_genomic_file"]
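Because each sent message now also records its method (per the load_v1.py change above), consumers of the per-target SentMessages file could filter by operation as well as by type; a hypothetical extension of the same test's filtering:

    added = [e for e in data if e["method"] == "ADD"]
    updated = [e for e in data if e["method"] == "UPDATE"]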
