Refactor Looper's integration of Pipestat #492

Closed
donaldcampbelljr opened this issue May 20, 2024 · 12 comments

@donaldcampbelljr
Contributor

donaldcampbelljr commented May 20, 2024

Currently, Looper checks if Pipestat has been configured for each sample before adding the sample to the submission conductor.

If pipestat can be configured successfully, looper generates a configuration file for pipestat called looper_pipestat_config.yaml, which looks something like this:

results_file_path: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./results.yaml
flag_file_dir: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./results/flags
record_identifier: frog_2
output_dir: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./results
schema_path: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./pipeline_pipestat/pipestat_output_schema.yaml
pipeline_name: test_pipe
pipeline_type: sample

Currently, the user adds a pipestat section to the .looper.yaml file with the relevant settings:

pep_config: ./project/project_config.yaml # pephub registry path or local path  
output_dir: ./results  
pipeline_interfaces:  
  sample:  ./pipeline_pipestat/pipeline_interface.yaml  
  project: ./pipeline_pipestat/pipeline_interface_project.yaml  
pipestat:  
  results_file_path: results.yaml  
  flag_file_dir: results/flags

After setting everything up, looper creates a pipestat config file, which the pipeline author can use to configure pipestat by passing it to a pipestat instance within the pipeline:

results_file_path: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./results.yaml  
flag_file_dir: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./results/flags  
output_dir: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./results  
schema_path: /home/drc/GITHUB/hello_looper/hello_looper/pipestat/./pipeline_pipestat/pipestat_output_schema.yaml  
pipeline_name: example_pipestat_pipeline  
pipeline_type: sample  
record_identifier: frog_2

For example, the pipeline interface author (pipeline author) can pass these values to the pipeline through the command template:

pipeline_interface (for a pipeline.py):

pipeline_name: example_pipestat_pipeline  
pipeline_type: sample  
output_schema: pipestat_output_schema.yaml  
command_template: >  
  python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file}

pipeline_interface (for a shell pipeline):

pipeline_name: example_pipestat_pipeline  
pipeline_type: sample  
output_schema: pipestat_output_schema.yaml  
command_template: >  
  {looper.piface_dir}/count_lines_pipestat.sh {sample.file} {sample.sample_name} {pipestat.config_file}
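
To make the placeholder substitution concrete, here is a minimal sketch of how a command template like the one above turns into a command line. It uses plain `str.format` as a stand-in for looper's own template rendering, and every value in `namespaces` is hypothetical:

```python
from types import SimpleNamespace

# Hypothetical values; in a real run looper populates these namespaces itself.
namespaces = {
    "looper": SimpleNamespace(piface_dir="/path/to/pipeline_pipestat"),
    "sample": SimpleNamespace(file="data/frog_2.txt", sample_name="frog_2"),
    "pipestat": SimpleNamespace(config_file="/path/to/looper_pipestat_config.yaml"),
}

command_template = (
    "{looper.piface_dir}/count_lines_pipestat.sh "
    "{sample.file} {sample.sample_name} {pipestat.config_file}"
)

# str.format supports attribute lookup inside replacement fields,
# so "{sample.file}" resolves to namespaces["sample"].file.
command = command_template.format(**namespaces)
print(command)
```

The pipeline then receives the path to the pipestat config file as its last argument and can hand it to pipestat.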

How looper checks for pipestat configuration:

looper/looper/project.py

Lines 336 to 352 in 3899672

@cached_property
def pipestat_configured(self):
    """
    Whether pipestat configuration is complete for all sample pipelines

    :return bool: whether pipestat configuration is complete
    """
    return self._check_if_pipestat_configured()

@cached_property
def pipestat_configured_project(self):
    """
    Whether pipestat configuration is complete for all project pipelines

    :return bool: whether pipestat configuration is complete
    """
    return self._check_if_pipestat_configured(project_level=True)

The main functions involved are _check_if_pipestat_configured and _get_pipestat_configuration.

Execution goes through _check_if_pipestat_configured first, which returns True or False. If _get_pipestat_configuration raises any exception, for either a single sample or the project, the check returns False.

looper/looper/project.py

Lines 471 to 503 in 3899672

def _check_if_pipestat_configured(self, project_level=False):
    """
    A helper method determining whether pipestat configuration is complete

    :param bool project_level: whether the project pipestat config should be checked
    :return bool: whether pipestat configuration is complete
    """
    try:
        if project_level:
            pipestat_configured = self._get_pipestat_configuration(
                sample_name=None, project_level=project_level
            )
        else:
            for s in self.samples:
                pipestat_configured = self._get_pipestat_configuration(
                    sample_name=s.sample_name
                )
    except Exception as e:
        context = (
            f"Project '{self.name}'"
            if project_level
            else f"Sample '{s.sample_name}'"
        )
        _LOGGER.debug(
            f"Pipestat configuration incomplete for {context}; "
            f"caught exception: {getattr(e, 'message', repr(e))}"
        )
        return False
    else:
        if pipestat_configured is not None and pipestat_configured != {}:
            return True
        else:
            return False

If this function returns False, looper continues, assuming the user does not wish to use pipestat.
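
The same try/except pattern can be sketched as a standalone function, which makes the behaviour easier to test in isolation. This is only an illustration: `check_configured` and its `get_config` callable are hypothetical names, not looper API. Initializing the result up front is my own addition; in the method above, `pipestat_configured` is only assigned inside the loop, so an empty sample list would leave it unbound when the `else` branch runs.

```python
import logging

_LOGGER = logging.getLogger(__name__)


def check_configured(get_config, sample_names):
    """Return True only if get_config succeeds for every sample and the
    last result is a non-empty mapping (mirrors the logic above)."""
    config = None   # defined up front so an empty sample list is safe
    current = None  # last sample attempted, used for the debug message
    try:
        for current in sample_names:
            config = get_config(current)
    except Exception as e:
        _LOGGER.debug(
            "Pipestat configuration incomplete for sample %r: %r", current, e
        )
        return False
    # None and {} both count as "not configured"
    return bool(config)
```

As in the original, any exception is treated as "the user does not want pipestat", so genuine configuration mistakes only show up in the debug log.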

Related Issues:
#411
#413
#425
#459
#471

@donaldcampbelljr
Contributor Author

Here is _get_pipestat_configuration:

looper/looper/project.py

Lines 505 to 599 in 3899672

def _get_pipestat_configuration(self, sample_name=None, project_level=False):
    """
    Get all required pipestat configuration variables from looper_config file
    """
    ret = {}
    if not project_level and sample_name is None:
        raise ValueError(
            "Must provide the sample_name to determine the "
            "sample to get the PipestatManagers for"
        )

    if PIPESTAT_KEY in self[EXTRA_KEY]:
        pipestat_config_dict = self[EXTRA_KEY][PIPESTAT_KEY]
    else:
        _LOGGER.debug(
            f"'{PIPESTAT_KEY}' not found in '{LOOPER_KEY}' section of the "
            f"project configuration file."
        )
        # We cannot use pipestat without it being defined in the looper config file.
        raise ValueError

    # Expand paths in the event ENV variables were used in config files
    output_dir = expandpath(self.output_dir)

    # Get looper user configured items first and update the pipestat_config_dict
    try:
        results_file_path = expandpath(pipestat_config_dict["results_file_path"])
        if not os.path.exists(os.path.dirname(results_file_path)):
            results_file_path = os.path.join(
                os.path.dirname(output_dir), results_file_path
            )
        pipestat_config_dict.update({"results_file_path": results_file_path})
    except KeyError:
        results_file_path = None

    try:
        flag_file_dir = expandpath(pipestat_config_dict["flag_file_dir"])
        if not os.path.isabs(flag_file_dir):
            flag_file_dir = os.path.join(os.path.dirname(output_dir), flag_file_dir)
        pipestat_config_dict.update({"flag_file_dir": flag_file_dir})
    except KeyError:
        flag_file_dir = None

    if sample_name:
        pipestat_config_dict.update({"record_identifier": sample_name})

    if project_level and "project_name" in pipestat_config_dict:
        pipestat_config_dict.update(
            {"project_name": pipestat_config_dict["project_name"]}
        )

    if project_level and "{record_identifier}" in results_file_path:
        # if project level and using {record_identifier}, pipestat needs some sort of record_identifier during creation
        pipestat_config_dict.update(
            {"record_identifier": "default_project_record_identifier"}
        )

    pipestat_config_dict.update({"output_dir": output_dir})

    pifaces = (
        self.project_pipeline_interfaces
        if project_level
        else self._interfaces_by_sample[sample_name]
    )

    for piface in pifaces:
        # We must also obtain additional pipestat items from the pipeline author's piface
        if "output_schema" in piface.data:
            schema_path = expandpath(piface.data["output_schema"])
            if not os.path.isabs(schema_path):
                # Get path relative to the pipeline_interface
                schema_path = os.path.join(
                    os.path.dirname(piface.pipe_iface_file), schema_path
                )
            pipestat_config_dict.update({"schema_path": schema_path})
        if "pipeline_name" in piface.data:
            pipestat_config_dict.update(
                {"pipeline_name": piface.data["pipeline_name"]}
            )
        if "pipeline_type" in piface.data:
            pipestat_config_dict.update(
                {"pipeline_type": piface.data["pipeline_type"]}
            )

        # Pipestat_dict_ is now updated from all sources and can be written to a yaml.
        looper_pipestat_config_path = os.path.join(
            os.path.dirname(output_dir), "looper_pipestat_config.yaml"
        )
        write_pipestat_config(looper_pipestat_config_path, pipestat_config_dict)

        ret[piface.pipeline_name] = {
            "config_file": looper_pipestat_config_path,
        }
    return ret
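
The path handling in this function is the part that produces the absolute paths seen in the generated config. A minimal sketch of that step, assuming POSIX paths: `resolve_against_output_parent` is a hypothetical helper, and it uses `os.path.isabs` for both settings, whereas the function above uses an `os.path.exists` check on the parent directory for `results_file_path` and `isabs` only for `flag_file_dir`.

```python
import os


def resolve_against_output_parent(path, output_dir):
    """Join a relative path onto the parent directory of output_dir."""
    # Expand ~ and environment variables, as expandpath does in spirit
    path = os.path.expandvars(os.path.expanduser(path))
    if os.path.isabs(path):
        return path
    return os.path.join(os.path.dirname(output_dir), path)


# Hypothetical output directory, shaped like the one in the example config
output_dir = "/home/user/hello_looper/pipestat/./results"
print(resolve_against_output_parent("results.yaml", output_dir))
print(resolve_against_output_parent("results/flags", output_dir))
```

Note that relative paths are anchored at the parent of `output_dir`, not at `output_dir` itself, which is why `results.yaml` ends up next to the `results` directory rather than inside it.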

@donaldcampbelljr
Contributor Author

Looper creates pipestat managers using:

looper/looper/project.py

Lines 450 to 469 in 3899672

def get_pipestat_managers(self, sample_name=None, project_level=False):
    """
    Get a collection of pipestat managers for the selected sample or project.

    The number of pipestat managers corresponds to the number of unique
    output schemas in the pipeline interfaces specified by the sample or project.

    :param str sample_name: sample name to get pipestat managers for
    :param bool project_level: whether the project PipestatManagers
        should be returned
    :return dict[str, pipestat.PipestatManager]: a mapping of pipestat
        managers by pipeline interface name
    """
    pipestat_configs = self._get_pipestat_configuration(
        sample_name=sample_name, project_level=project_level
    )
    return {
        pipeline_name: PipestatManager(**pipestat_vars)
        for pipeline_name, pipestat_vars in pipestat_configs.items()
    }

This function creates PipestatManager objects based on the configuration produced by _get_pipestat_configuration.

@donaldcampbelljr
Contributor Author

donaldcampbelljr commented May 21, 2024

After today's discussion:
- [ ] Looper should not create a pipestat config for every sample. Instead, it should use the hook system to create one pipestat configuration file for the entire PEP.
- [ ] Similarly, looper should not create a pipestat object for each sample; it should create a PipestatManager once for the entire PEP.

  • Continue to pass the pipestat config to the pipeline via the command templates (we are doing this currently).
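
A rough sketch of the "create it once" idea, using stand-in classes rather than looper or pipestat code. `StubManager` and `Project` are hypothetical; the only point is that the manager is built on first access and then reused, with the record identifier supplied per call instead of being baked into a per-sample config:

```python
from functools import cached_property


class StubManager:
    """Hypothetical stand-in for pipestat.PipestatManager."""

    def __init__(self, config_file):
        self.config_file = config_file
        self.reported = {}

    def report(self, record_identifier, values):
        self.reported.setdefault(record_identifier, {}).update(values)


class Project:
    """Hypothetical, heavily reduced project object."""

    def __init__(self, config_file, sample_names):
        self.config_file = config_file
        self.sample_names = sample_names

    @cached_property
    def pipestat_manager(self):
        # Built on first access, then reused for every sample
        return StubManager(self.config_file)


project = Project("looper_pipestat_config.yaml", ["frog_1", "frog_2"])
for name in project.sample_names:
    project.pipestat_manager.report(record_identifier=name, values={"submitted": True})
```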

@donaldcampbelljr
Contributor Author

Question:

What about when more than one pipeline interface is specified? Should looper create a pipestat instance for each, so that each has its own config file, à la pipeline_name_pipestat_config.yaml?

pipeline_interfaces:
  sample:
    - ../pipeline/pipeline_interface1_sample.yaml
    - ../pipeline/pipeline_interface2_sample.yaml
  project:
    - ../pipeline/pipeline_interface1_project.yaml
    - ../pipeline/pipeline_interface2_project.yaml

@nsheff
Contributor

nsheff commented May 22, 2024

That probably makes sense.

The alternative would be to update the pipeline interface of an existing PipestatManager object.

But maybe it makes sense to attach the PipestatManager to the pipeline interface object, so there would be one per pipeline interface.

@donaldcampbelljr
Contributor Author

> But maybe it makes sense to attach the PipestatManager to the pipeline interface object, so there would be one per pipeline interface.

Yeah, I'm leaning towards this and will investigate this approach.
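
One way this could look, as a sketch only: each pipeline interface carries its own pipestat config path, named after its pipeline as floated in the question above. `PipelineInterfaceSketch` and `attach_pipestat_configs` are hypothetical names, and a real version would presumably hold the PipestatManager itself rather than just a path.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class PipelineInterfaceSketch:
    """Hypothetical stand-in for looper's pipeline interface object."""

    pipeline_name: str
    # A real version might store the PipestatManager here instead
    pipestat_config_path: Optional[str] = None


def attach_pipestat_configs(pifaces, base_dir):
    """Give each interface its own config path, named after its pipeline."""
    for piface in pifaces:
        piface.pipestat_config_path = os.path.join(
            base_dir, f"{piface.pipeline_name}_pipestat_config.yaml"
        )
    return {p.pipeline_name: p.pipestat_config_path for p in pifaces}


pifaces = [PipelineInterfaceSketch("pipeline1"), PipelineInterfaceSketch("pipeline2")]
config_paths = attach_pipestat_configs(pifaces, "/tmp/project")
```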

@donaldcampbelljr
Contributor Author

Another question/issue I've run into this afternoon:

What's the priority for sourcing the pipeline name?

Pipestat looks at the one supplied in the output_schema first. However, the pipeline_interface (either sample or project) used with Looper also has a pipeline_name and points to a pipestat output schema.

These pipeline_names could be different if the user wishes to use one output_schema for two different interfaces.

I just modified the order in which Pipestat can determine the pipeline_name:

        self.cfg[PIPELINE_NAME] = (
            pipeline_name
            or self.cfg[CONFIG_KEY].get(PIPELINE_NAME)
            or self.cfg[SCHEMA_KEY].pipeline_name
            if self.cfg[SCHEMA_KEY] is not None
            else DEFAULT_PIPELINE_NAME
        )

So user_supplied -> config_supplied -> schema_supplied else use the default.
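
One thing that may be worth double-checking in the expression above: Python's conditional expression binds more loosely than `or`, so the whole `a or b or c` chain becomes the "if true" branch. When there is no schema, the result is the default even if a pipeline name was supplied. The sketch below isolates that behaviour with plain arguments; the function names and the value of `DEFAULT_PIPELINE_NAME` are placeholders, not pipestat code.

```python
DEFAULT_PIPELINE_NAME = "default_pipeline_name"  # placeholder value


def name_as_written(pipeline_name, config_name, schema_name, schema_present):
    # Python parses this as: (a or b or c) if schema_present else default
    return (
        pipeline_name
        or config_name
        or schema_name
        if schema_present
        else DEFAULT_PIPELINE_NAME
    )


def name_with_fallback_chain(pipeline_name, config_name, schema_name, schema_present):
    # Only the schema lookup is guarded; the default is the last fallback
    return (
        pipeline_name
        or config_name
        or (schema_name if schema_present else None)
        or DEFAULT_PIPELINE_NAME
    )
```

If the intended order is user-supplied, then config-supplied, then schema-supplied, then default, the second form matches that description in the no-schema case as well.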

@donaldcampbelljr
Contributor Author

Follow-up issue:
I believe project-level execution currently only works with the runp command. It appears there was a project argument that enabled project-level running for all other commands (except run), but I believe this is now broken. We need to add it back so that looper only creates and uses project-level PipestatManagers when appropriate.

Looper currently creates PipestatManagers (psms) for every pipeline_interface, whether sample or project. Because of this and the issue above, I'm seeing errors where sample-level and project-level executions attempt to write to the same results.yaml file under different pipeline_names, which throws an error (because multi_pipelines is set to False by default).
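
To illustrate the collision, here is a toy model of a shared results file keyed by pipeline name. It is not pipestat's implementation, only the guard described above; `ResultsFileSketch` and the error message are made up for the example.

```python
class ResultsFileSketch:
    """Toy model of one results file shared by several pipelines."""

    def __init__(self, multi_pipelines=False):
        self.multi_pipelines = multi_pipelines
        self.data = {}  # {pipeline_name: {record_identifier: values}}

    def report(self, pipeline_name, record_identifier, values):
        other = set(self.data) - {pipeline_name}
        if other and not self.multi_pipelines:
            # A second pipeline name is rejected unless explicitly allowed
            raise ValueError(
                f"results already belong to {sorted(other)}; "
                f"refusing to add pipeline '{pipeline_name}'"
            )
        records = self.data.setdefault(pipeline_name, {})
        records.setdefault(record_identifier, {}).update(values)
```

With `multi_pipelines=False`, a sample-level pipeline and a project-level pipeline with different names cannot both report into the same object; with `multi_pipelines=True`, each gets its own top-level key.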

@donaldcampbelljr
Contributor Author

donaldcampbelljr commented May 24, 2024

Some notes for next steps:

  • Fix the project-level commands (add them back in as a flag).
  • Force the current method to pull the pipeline name from the pipeline interface. If the user is using pipestat and the output schema has a different pipeline_name, warn the user but send the piface pipeline_name to pipestat via the config file.
  • Add multi_pipelines = true so that multiple pipelines can write to one results file under different pipeline names.

The above will be fixes for 1.9.0 that should work for the majority of cases until we do more work on the system for the 2.0 release.
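
The second bullet could be sketched roughly as follows. `choose_pipeline_name` is a hypothetical helper, not the looper implementation; it only shows the "warn, but prefer the pipeline interface" rule.

```python
import warnings


def choose_pipeline_name(piface_name, schema_name=None):
    """Prefer the pipeline interface name; warn if the schema disagrees."""
    if schema_name is not None and schema_name != piface_name:
        warnings.warn(
            f"output schema pipeline_name '{schema_name}' differs from "
            f"pipeline interface pipeline_name '{piface_name}'; using the latter"
        )
    # The piface name is what gets written to the pipestat config file
    return piface_name
```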

@donaldcampbelljr
Contributor Author

For Looper 2.0, we will consolidate sample and project level pipeline interfaces under a single interface. This will break backwards compatibility.

pipeline_name: example_pipestat_pipeline
output_schema: pipestat_output_schema.yaml
sample_interface:
  command_template: >
   python {looper.piface_dir}/count_lines.py {sample.file} {sample.sample_name} {pipestat.results_file}
project_interface:
  command_template: >
   python {looper.piface_dir}/count_lines_project.py {sample.file} {pipestat.results_file}

The output schema's pipeline_name must also match the pipeline interface's pipeline_name. We will enforce this by raising an exception if they do not match.
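
A minimal sketch of what that check might look like against the consolidated layout, with the interface and schema represented as plain dictionaries. `validate_interface` and the exception type are assumptions for illustration; the actual 2.0 implementation may differ.

```python
def validate_interface(piface, schema):
    """Raise if the interface and schema names disagree; otherwise return
    the command template for each level the interface defines."""
    if piface["pipeline_name"] != schema["pipeline_name"]:
        raise ValueError(
            f"pipeline_name mismatch: interface has '{piface['pipeline_name']}', "
            f"output schema has '{schema['pipeline_name']}'"
        )
    return {
        level: piface[f"{level}_interface"]["command_template"]
        for level in ("sample", "project")
        if f"{level}_interface" in piface
    }


# Dictionary form of the consolidated interface shown above (templates shortened)
piface = {
    "pipeline_name": "example_pipestat_pipeline",
    "output_schema": "pipestat_output_schema.yaml",
    "sample_interface": {"command_template": "python count_lines.py {sample.file}"},
    "project_interface": {"command_template": "python count_lines_project.py"},
}
templates = validate_interface(piface, {"pipeline_name": "example_pipestat_pipeline"})
```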

@donaldcampbelljr
Contributor Author

For sample vs project level commands, see this related issue: #360

@donaldcampbelljr
Contributor Author

Ok, the refactoring is nearly complete and I've also added the --project argument back to the CLI. I will still need to do some final clean up/testing before merging.
