Skip to content

Commit

Permalink
Merge pull request #552 from kids-first/querytarget
Browse files Browse the repository at this point in the history
✨ Add option to query from another source in Load
  • Loading branch information
fiendish authored Jan 6, 2021
2 parents e250fbb + 4edb271 commit 04f0310
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
25 changes: 23 additions & 2 deletions kf_lib_data_ingest/app/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,28 @@ def common_args_options(func):
type=click.Path(exists=True, file_okay=True, dir_okay=True),
)(func)

func = click.option(
"-q",
"--query_url",
default="",
help=(
"Service URL that unique identifiers will be queried from during"
" loading if not in the local cache (defaults to the load"
" target URL)."
),
)(func)

# Clear the UID cache
func = click.option(
"--clear_cache",
default=False,
is_flag=True,
help="Clear the identifier cache before loading into the target service.",
help=(
"Clear the local identifier cache before loading into the target"
" service. This should always be safe unless it's the only location"
" where the target identifiers are stored (i.e. they've been"
" temporarily erased from the target service)."
),
)(func)

# Disable warehousing
Expand All @@ -86,7 +102,10 @@ def common_args_options(func):
default=False,
is_flag=True,
help=(
"Load into the target service using faster asynchronous requests."
"Load into the target service using multiple asynchronous requests"
" at a time instead of only one request at a time."
" This is potentially much faster, but consequently puts much more"
" strain on the target service."
),
)(func)

Expand Down Expand Up @@ -195,6 +214,7 @@ def ingest(
no_validate,
validation_mode,
clear_cache,
query_url,
):
"""
Run the Kids First data ingest pipeline.
Expand Down Expand Up @@ -262,6 +282,7 @@ def test(
no_validate,
validation_mode,
clear_cache,
query_url,
):
"""
Run the Kids First data ingest pipeline in dry_run mode (--dry_run=True)
Expand Down
6 changes: 6 additions & 0 deletions kf_lib_data_ingest/etl/ingest_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
db_url_env_key=None,
validation_mode=None,
clear_cache=False,
query_url="",
):
"""
Set up data ingest pipeline. Create the config object and logger
Expand All @@ -89,6 +90,8 @@ def __init__(
defaults to False. Equivalent to deleting the file manually. Ignored
when using resume_from, because that needs the cache to be effective.
:type clear_cache: bool, optional
:param query_url: Alternative API query URL instead of asking the load target
:type query_url: str, optional
"""

assert_safe_type(ingest_package_config_path, str)
Expand All @@ -104,6 +107,7 @@ def __init__(
assert_safe_type(db_url_env_key, None, str)
assert_safe_type(validation_mode, None, str)
assert_safe_type(clear_cache, bool)
assert_safe_type(query_url, str)
stages_to_run_str = stages_to_run_str.lower()
self._validate_stages_to_run_str(stages_to_run_str)

Expand All @@ -127,6 +131,7 @@ def __init__(
self.warehouse_db_url = os.environ.get(db_url_env_key or "")
self.validation_mode = validation_mode
self.clear_cache = clear_cache
self.query_url = query_url

# Get log params from ingest_package_config
log_dir = log_dir or self.data_ingest_config.log_dir
Expand Down Expand Up @@ -226,6 +231,7 @@ def _iterate_stages(self):
dry_run=self.dry_run,
resume_from=self.resume_from,
clear_cache=self.clear_cache,
query_url=self.query_url,
)

def run(self):
Expand Down
16 changes: 12 additions & 4 deletions kf_lib_data_ingest/etl/load/load_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@
from pprint import pformat

from kf_lib_data_ingest.common import constants
from kf_lib_data_ingest.etl.load.load_v1 import LoadStage as LoadV1
from kf_lib_data_ingest.etl.load.load_v1 import LoadStage as LoadBase


class LoadStage(LoadV1):
class LoadStage(LoadBase):
def __init__(self, *args, query_url="", **kwargs):
"""
:param query_url: Alternative API query URL instead of asking the load target
:type query_url: str, optional
"""
super().__init__(*args, **kwargs)
self.query_url = query_url

def _get_target_id_from_record(self, entity_class, record):
"""
Find the target service ID for the given record and entity class.
Expand Down Expand Up @@ -40,11 +48,11 @@ def _get_target_id_from_record(self, entity_class, record):
return tic

# check the server
if self.dry_run:
if self.dry_run and not self.query_url:
return None
try:
tic_list = entity_class.query_target_ids(
self.target_url, key_components
self.query_url or self.target_url, key_components
)
if tic_list:
if len(tic_list) > 1:
Expand Down

0 comments on commit 04f0310

Please sign in to comment.