From 3a31cf0b5e2cc80d01dc9245d6b14cf87942aa3b Mon Sep 17 00:00:00 2001
From: Jake Fennick
Date: Wed, 8 May 2024 16:35:52 -1000
Subject: [PATCH] temporarily remove legacy Labshare Compute API

---
 docs/compute.md     |  40 -------
 docs/dev/api.rst    |   4 -
 docs/index.rst      |   1 -
 src/wic/cli.py      |  21 +---
 src/wic/labshare.py | 264 --------------------------------------------
 src/wic/main.py     |  10 +-
 6 files changed, 3 insertions(+), 337 deletions(-)
 delete mode 100644 docs/compute.md
 delete mode 100644 src/wic/labshare.py

diff --git a/docs/compute.md b/docs/compute.md
deleted file mode 100644
index 7d366888..00000000
--- a/docs/compute.md
+++ /dev/null
@@ -1,40 +0,0 @@
-# Compute API
-
-As previously mentioned, one of the beautiful things about the declarative approach to workflows is that we can execute workflows on massive machines just as easily as executing workflows on a local laptop. Concretely, merely changing `--run_local` to `--run_compute`, we can execute the exact same workflow on the NCATS HPC cluster! That's it! Absolutely no modifications to the workflow itself are necessary!
-
-Note that if your workflow inputs file references files on your local machine, then
-1. those files will need to be manually transferred to the new machine (this could perhaps be automated, but maybe you have ~1TB of data...)
-2. if you are using absolute file paths, the paths will need to be updated w.r.t. the new machine. (So use relative paths ;) )
-
-## Authentication Access Token
-
-When using `--run_compute` you will also need to use `--compute_access_token $ACCESS_TOKEN`. Unfortunately, there is currently no programmatic way of obtaining the access token via an API from the command line. You will need to manually perform the following steps:
-
-* Go to https://compute.scb-ncats.io/explorer/
-* Click the green Authorize button. You will be taken to the NIH login page.
-* Enter your NIH username and password, then
-* Authenticate (using Microsoft Authenticator)
-* You will be returned to https://compute.scb-ncats.io/explorer/
-* Click Close (NOT Logout!)
-* Scroll down to [HealthController](https://compute.scb-ncats.io/explorer/#/HealthController/HealthController.ping)
-* Click Try It Out and then click Execute.
-* You should see a massive hash string after "authorization: bearer".
-* Copy the massive hash string. Be careful not to copy any other characters (like blank spaces).
-* In a bash terminal, create the environment variable `export ACCESS_TOKEN=...` (where ... means paste in the hash string)
-
-Unfortunately, the access token currently expires after about an hour, so you will need to repeat these steps periodically.
-
-## Workflow Status
-
-After submitting a workflow, users can check on its status either using https://compute.scb-ncats.io/explorer/ or by directly logging into dali.ncats.nih.gov and using the `squeue` command. Currently, the workflows are executed under the placeholder svc-polus user.
-
-```
-ssh @dali.ncats.nih.gov
-```
-NOTE: The above server is behind the NIH VPN; you must enable the VPN to access it.
-
-```
-watch -n5 squeue -u svc-polus
-```
-
-After a ~2-3 minute initial delay you should see nodes starting, running, and finishing, corresponding to the individual steps in the workflow. The output files and logs are currently stored under /project/labshare-compute/
\ No newline at end of file
diff --git a/docs/dev/api.rst b/docs/dev/api.rst
index b715ab2b..ae6abc07 100644
--- a/docs/dev/api.rst
+++ b/docs/dev/api.rst
@@ -29,10 +29,6 @@ wic.input_output
 ------------------------------------
 .. automodule:: wic.input_output
 
-wic.labshare
-------------------------------------
-.. automodule:: wic.labshare
-
 wic.main
 ------------------------------------
 .. automodule:: wic.main
diff --git a/docs/index.rst b/docs/index.rst
index d5d6f35e..22ee0e9c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -10,7 +10,6 @@ Workflow Inference Compiler documentation
    tutorials/tutorials.rst
    userguide.md
    advanced.md
-   compute.md
    validation.md
    dev/installguide.md
    dev/devguide.md
diff --git a/src/wic/cli.py b/src/wic/cli.py
index 453df733..6e416e61 100644
--- a/src/wic/cli.py
+++ b/src/wic/cli.py
@@ -69,17 +69,9 @@
                        help='Just generates run.sh and exits. Does not actually invoke ./run.sh')
 group_run.add_argument('--run_local', default=False, action="store_true",
                        help='After generating the cwl file(s), run it on your local machine.')
-group_run.add_argument('--run_compute', default=False, action="store_true",
-                       help='After generating the cwl file(s), run it on the remote labshare Compute platform.')
-parser.add_argument('--compute_driver', type=str, required=False, default='slurm', choices=['slurm', 'argo'],
-                    help='The driver to use for running workflows on labshare Compute.')
-# Use required=('--run_compute' in sys.argv) make other args conditionally required.
-# See https://stackoverflow.com/questions/19414060/argparse-required-argument-y-if-x-is-present
-# For example, if run_compute is enabled, you MUST enable cwl_inline_subworkflows!
-# Plugins with 'class: Workflow' (i.e. subworkflows) are not currently supported.
-parser.add_argument('--cwl_inline_subworkflows', default=('--run_compute' in sys.argv), action="store_true",
-                    help='Before generating the cwl file, inline all subworkflows. Required for --run_compute')
+parser.add_argument('--cwl_inline_subworkflows', default=False, action="store_true",
+                    help='Before generating the cwl file, inline all subworkflows.')
 parser.add_argument('--inference_disable', default=False, action="store_true",
                     help='Disables use of the inference algorithm when compiling.')
 parser.add_argument('--inference_use_naming_conventions', default=False, action="store_true",
@@ -95,15 +87,6 @@
 parser.add_argument('--cachedir', type=str, required=False, default='cachedir',
                     help='The directory to save intermediate results; useful with RealtimePlots.py')
 
-AWS_URL = 'http://compute.ci.aws.labshare.org'
-NCATS_URL = 'https://compute.scb-ncats.io/'
-
-parser.add_argument('--compute_url', type=str, default=NCATS_URL,
-                    help='The URL associated with the labshare Compute API. Required for --run_compute')
-parser.add_argument('--compute_access_token', type=str, required=('--run_compute' in sys.argv),
-                    help="""The access_token used for authentication. Required for --run_compute
-                    For now, get this manually from https://a-qa.labshare.org/""")
-
 parser.add_argument('--graphviz', default=False, action="store_true",
                     help='Generate a DAG using graphviz.')
 parser.add_argument('--graph_label_edges', default=False, action="store_true",
diff --git a/src/wic/labshare.py b/src/wic/labshare.py
deleted file mode 100644
index b5553076..00000000
--- a/src/wic/labshare.py
+++ /dev/null
@@ -1,264 +0,0 @@
-import argparse
-import copy
-from pathlib import Path
-from typing import Dict, List
-
-import requests
-import yaml
-
-from wic.utils_yaml import wic_loader
-from . import __version__, utils
-from .wic_types import KV, Cwl, NodeData, RoseTree, StepId, Tools
-
-TIMEOUT = 60  # seconds
-
-
-def delete_previously_uploaded(args: argparse.Namespace, plugins_or_pipelines: str, name: str) -> None:
-    """Delete plugins/pipelines previously uploaded to labshare.
-
-    Args:
-        args (argparse.Namespace): The command line arguments
-        name (str): 'plugins' or 'pipelines'
-    """
-    access_token = args.compute_access_token
-
-    response = requests.delete(args.compute_url + f'/compute/{plugins_or_pipelines}/' + name + ':' + __version__,
-                               headers={'Authorization': f'Bearer {access_token}'}, timeout=TIMEOUT)
-    print('delete', response.json())
-    # TODO: Check response for success
-
-
-def remove_dot_dollar(tree: Cwl) -> Cwl:
-    """Removes . and $ from dictionary keys, e.g. $namespaces and $schemas. Otherwise, you will get
-    {'error': {'statusCode': 500, 'message': 'Internal Server Error'}}
-    This is due to MongoDB:
-    See https://www.mongodb.com/docs/manual/reference/limits/#Restrictions-on-Field-Names
-
-    Args:
-        tree (Cwl): A Cwl document
-
-    Returns:
-        Cwl: A Cwl document with . and $ removed from $namespaces and $schemas
-    """
-    tree_str = str(yaml.dump(tree, sort_keys=False, line_break='\n', indent=2))
-    tree_str_no_dd = tree_str.replace('$namespaces', 'namespaces').replace(
-        '$schemas', 'schemas').replace('.wic', '_wic')
-    tree_no_dd: Cwl = yaml.load(tree_str_no_dd, Loader=wic_loader())  # This effectively copies tree
-    return tree_no_dd
-
-
-def pretty_print_request(request: requests.PreparedRequest) -> None:
-    """pretty prints a requests.PreparedRequest
-
-    Args:
-        request (requests.PreparedRequest): The request to be printed
-    """
-    print('request.headers', request.headers)
-    body = request.body
-    if body is not None:
-        if isinstance(body, bytes):
-            body_str = body.decode('utf-8')
-        else:
-            body_str = body
-        print('request.body\n', yaml.dump(yaml.load(body_str, Loader=wic_loader())))
-    else:
-        print('request.body is None')
-
-
-def upload_plugin(compute_url: str, access_token: str, tool: Cwl, name: str) -> str:
-    """Uploads CWL CommandLineTools to Polus Compute
-
-    Args:
-        compute_url (str): The url to the Compute API
-        access_token (str): The access token used for authentication
-        tool (Cwl): The CWL CommandLineTool
-        name (str): The name of the CWL CommandLineTool
-
-    Raises:
-        Exception: If the upload failed for any reason
-
-    Returns:
-        str: The unique id of the plugin
-    """
-    # Convert the compiled yaml file to json for labshare Compute.
-    tool_no_dd = remove_dot_dollar(tool)
-    compute_plugin: KV = {
-        'name': name,
-        # TODO: Using the WIC version works for now, but since the plugins
-        # are supposed to be independent, they should have their own versions.
-        # For biobb, we can extract the version from dockerPull
-        'version': __version__,
-        'cwlScript': tool_no_dd
-    }
-
-    # Use http POST request to upload a primitive CommandLineTool / define a plugin and get its id hash.
-    response = requests.post(compute_url + '/compute/plugins',
-                             headers={'Authorization': f'Bearer {access_token}'},
-                             json=compute_plugin, timeout=TIMEOUT)
-    r_json = response.json()
-
-    # {'error': {'statusCode': 422, 'name': 'UnprocessableEntityError',
-    # 'message': 'A Plugin with name ... and version ... already exists.'}}
-    already_uploaded = r_json.get('error', {}).get('statusCode', {}) == 422
-    if already_uploaded:
-        return '-1'
-
-    if 'id' not in r_json:
-        pretty_print_request(response.request)
-        print('post response')
-        print(r_json)
-        raise Exception(f'Error! Labshare plugin upload failed for {name}.')
-
-    plugin_id: str = r_json['id']  # hash
-    compute_plugin['id'] = plugin_id
-    compute_plugin.update({'id': plugin_id})  # Necessary ?
-    return plugin_id
-
-
-def print_plugins(compute_url: str) -> None:
-    """prints information on all currently available Compute plugins
-
-    Args:
-        compute_url (str): The url to the Compute API
-    """
-    r = requests.get(compute_url + '/compute/plugins/', timeout=TIMEOUT)
-    for j in r.json():
-        print(f"id {j.get('id')} class {j.get('class')} name {j.get('name')}")
-        # print(j)
-    print(len(r.json()))
-
-
-def upload_all(rose_tree: RoseTree, tools: Tools, args: argparse.Namespace, is_root: bool) -> str:
-    """Uploads all Plugins, Pipelines, and the root Workflow to the Compute platform
-
-    Args:
-        rose_tree (RoseTree): The data associated with compiled subworkflows
-        tools (Tools): The CWL CommandLineTool definitions found using get_tools_cwl()
-        args (argparse.Namespace): The command line arguments
-        is_root (bool): True if this is the root workflow
-
-    Raises:
-        Exception: If any of the uploads fails for any reason
-
-    Returns:
-        str: The unique id of the workflow
-    """
-    access_token = args.compute_access_token
-    # print('access_token', access_token)
-
-    sub_node_data: NodeData = rose_tree.data
-    yaml_stem = sub_node_data.name
-    yaml_tree = sub_node_data.yml
-    cwl_tree = sub_node_data.compiled_cwl
-    yaml_inputs = sub_node_data.workflow_inputs_file
-
-    sub_rose_trees: Dict[str, RoseTree] = {r.data.name: r for r in rose_tree.sub_trees}
-    # print(list(sub_rose_trees))
-
-    steps = cwl_tree['steps']
-
-    # Get the dictionary key (i.e. the name) of each step.
-    steps_keys: List[str] = []
-    for step in steps:
-        step_key = utils.parse_step_name_str(step)[-1]
-        steps_keys.append(step_key)
-    # print(steps_keys)
-
-    # tools_stems = [stepid.stem for stepid in tools]
-    # subkeys = utils.get_subkeys(steps_keys, tools_stems)
-
-    cwl_tree_no_dd = remove_dot_dollar(cwl_tree)
-    yaml_inputs_no_dd = remove_dot_dollar(yaml_inputs)
-
-    wic = yaml_tree.get('wic', {})
-    wic_steps = wic.get('steps', {})
-
-    # Convert the compiled yaml file to json for labshare Compute.
-    # Replace 'run' with plugin:id
-    cwl_tree_run = copy.deepcopy(cwl_tree_no_dd)
-    for i, step_key in enumerate(steps_keys):
-        sub_wic = wic_steps.get(f'({i+1}, {step_key})', {})
-        plugin_ns_i = sub_wic.get('wic', {}).get('namespace', 'global')
-        stem = Path(step_key).stem
-
-        # if step_key in subkeys:  # and not is_root, but the former implies the latter
-        # plugin_id = upload_plugin(args.compute_url, access_token, cwl_tree_run, yaml_stem)
-        if stem in sub_rose_trees:
-            subworkflow_id = upload_all(sub_rose_trees[stem], tools, args, False)
-            run_val = f'pipeline:{stem}:{__version__}'
-        else:
-            # i.e. If this is either a primitive CommandLineTool and/or
-            # a 'primitive' Workflow that we did NOT recursively generate.
-            # delete_previously_uploaded(args, 'plugins', stem)
-            step_id = StepId(stem, plugin_ns_i)
-            tool_i = tools[step_id].cwl
-            plugin_id = upload_plugin(args.compute_url, access_token, tool_i, stem)
-            run_val = f'plugin:{stem}:{__version__}'
-        step_name_i = utils.step_name_str(yaml_stem, i, step_key)
-        step_name_i = step_name_i.replace('.wic', '_wic')  # Due to calling remove_dot_dollar above
-        cwl_tree_run['steps'][step_name_i]['run'] = run_val
-
-    workflow_id: str = ''
-    if is_root:
-        compute_workflow = {
-            "name": yaml_stem,
-            # "version": __version__,  # no version for workflows
-            "driver": args.compute_driver,
-            "cwlJobInputs": yaml_inputs_no_dd,
-            **cwl_tree_run
-        }
-        # Use http POST request to upload a complete Workflow (w/ inputs) and get its id hash.
-        response = requests.post(args.compute_url + '/compute/workflows',
-                                 headers={'Authorization': f'Bearer {access_token}'},
-                                 json=compute_workflow, timeout=TIMEOUT)
-        r_json = response.json()
-        print('post response')
-        j = r_json
-        print(f"id {j.get('id')} class {j.get('class')} name {j.get('name')}")
-        if 'id' not in r_json:
-            pretty_print_request(response.request)
-            print(r_json)
-            raise Exception(f'Error! Labshare workflow upload failed for {yaml_stem}.')
-        workflow_id = r_json['id']  # hash
-    else:
-        # "owner": "string",
-        # "additionalProp1": {}
-        # TODO: Check this.
-        compute_pipeline = {
-            "name": yaml_stem,
-            "version": __version__,
-            **cwl_tree_run
-        }
-        # Need to add owner and/or additionalProp1 ?
-        # Need to remove headers and/or requirements? i.e.
-        # Use 1.0 because cromwell only supports 1.0 and we are not using 1.1 / 1.2 features.
-        # Eventually we will want to use 1.2 to support conditional workflows
-        # yaml_tree['cwlVersion'] = 'v1.0'
-        # yaml_tree['class'] = 'Workflow'
-        # yaml_tree['requirements'] = subworkreqdict
-
-        # delete_previously_uploaded(args, 'pipelines', yaml_stem)
-        # Use http POST request to upload a subworkflow / "pipeline" (no inputs) and get its id hash.
-        response = requests.post(args.compute_url + '/compute/pipelines',
-                                 headers={'Authorization': f'Bearer {access_token}'},
-                                 json=compute_pipeline, timeout=TIMEOUT)
-        r_json = response.json()
-
-        # {'error': {'statusCode': 422, 'name': 'UnprocessableEntityError',
-        # 'message': 'A Plugin with name ... and version ... already exists.'}}
-        already_uploaded = r_json.get('error', {}).get('statusCode', {}) == 422
-        if already_uploaded:
-            return '-1'
-
-        print('post response')
-        j = r_json
-        print(f"id {j.get('id')} class {j.get('class')} name {j.get('name')}")
-        if 'id' not in r_json:
-            pretty_print_request(response.request)
-            print(r_json)
-            raise Exception(f'Error! Labshare workflow upload failed for {yaml_stem}.')
-        workflow_id = r_json['id']  # hash
-    # if is_root:
-    #     print_plugins(args.compute_url)
-
-    return workflow_id
diff --git a/src/wic/main.py b/src/wic/main.py
index fa4897ac..11796425 100644
--- a/src/wic/main.py
+++ b/src/wic/main.py
@@ -11,7 +11,7 @@
 from wic.utils_yaml import wic_loader
 
 from . import input_output as io
-from . import ast, cli, compiler, inference, inlineing, labshare, plugins, run_local, utils  # , utils_graphs
+from . import ast, cli, compiler, inference, inlineing, plugins, run_local, utils  # , utils_graphs
 from .schemas import wic_schema
 from .wic_types import GraphData, GraphReps, Json, StepId, Yaml, YamlTree
@@ -161,14 +161,6 @@ def main() -> None:
     rose_tree = plugins.cwl_update_outputs_optional_rosetree(rose_tree)
     io.write_to_disk(rose_tree, Path('autogenerated/'), True, args.inputs_file)
 
-    if args.run_compute:
-        # Inline compiled CWL if necessary, i.e. inline across scattering boundaries.
-        # NOTE: Since we need to distribute scattering operations across all dependencies,
-        # and due to inference, this cannot be done before compilation.
-        rose_tree = inlineing.inline_subworkflow_cwl(rose_tree)
-        io.write_to_disk(rose_tree, Path('autogenerated/'), True, args.inputs_file)
-        labshare.upload_all(rose_tree, tools_cwl, args, True)
-
     if args.graphviz:
         if shutil.which('dot'):
             # Render the GraphViz diagram
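
The removed cli.py lines relied on a small argparse pattern worth recording for whenever this feature is restored: `required=('--run_compute' in sys.argv)` makes one flag mandatory only when another flag is actually present on the command line (see the Stack Overflow link quoted in the deleted comment). Below is a minimal, self-contained sketch of that pattern; the flag names `--run_remote` and `--access_token` are hypothetical stand-ins, not flags defined by this repository.

```python
import argparse
import sys

parser = argparse.ArgumentParser()
parser.add_argument('--run_remote', action='store_true',
                    help='hypothetical stand-in for the removed --run_compute flag')
# --access_token becomes required only if --run_remote was literally passed on the command line,
# mirroring required=('--run_compute' in sys.argv) in the deleted code.
parser.add_argument('--access_token', type=str, required=('--run_remote' in sys.argv),
                    help='required only together with --run_remote')
args = parser.parse_args()
```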
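
The deleted `remove_dot_dollar` helper existed because the Compute backend stores documents in MongoDB, which restricts `.` and `$` in field names, so keys such as `$namespaces`, `$schemas`, and `.wic` had to be rewritten (`namespaces`, `schemas`, `_wic`) before upload. The sketch below reproduces that key rewrite under simplified assumptions: it uses `yaml.safe_load`/`yaml.dump` instead of the project's `wic_loader`, and the example document is made up for illustration.

```python
import yaml

def remove_dot_dollar_sketch(tree: dict) -> dict:
    """Round-trip through YAML text and rewrite the keys MongoDB rejects, as the deleted helper did."""
    tree_str = yaml.dump(tree, sort_keys=False, indent=2)
    tree_str = (tree_str.replace('$namespaces', 'namespaces')
                        .replace('$schemas', 'schemas')
                        .replace('.wic', '_wic'))
    return yaml.safe_load(tree_str)

# Hypothetical example document (not taken from this repository).
doc = {'$namespaces': {'edam': 'https://edamontology.org/'},
       'steps': {'example.wic': {'run': 'echo.cwl'}}}
print(remove_dot_dollar_sketch(doc))
# expected: {'namespaces': {...}, 'steps': {'example_wic': {'run': 'echo.cwl'}}}
```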
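
For reference, the deleted `upload_plugin` registered each CWL CommandLineTool by POSTing `{'name': ..., 'version': ..., 'cwlScript': ...}` to `/compute/plugins` with a Bearer token, treating an HTTP 422 as "already uploaded". The standalone sketch below documents that removed behavior; the `compute_url`, `access_token`, and the toy CWL document in the usage comment are placeholders rather than values from this repository, and the Compute API itself may have changed since this code was written.

```python
import requests

TIMEOUT = 60  # seconds, the same default as the deleted labshare.py

def upload_plugin_sketch(compute_url: str, access_token: str, tool_cwl: dict, name: str, version: str) -> str:
    """Minimal sketch of the removed upload_plugin: POST a CommandLineTool to the Compute API."""
    payload = {
        'name': name,          # plugin name (the CWL file stem in the deleted code)
        'version': version,    # the deleted code reused the wic __version__ here
        'cwlScript': tool_cwl  # the (key-sanitized) CWL document itself
    }
    response = requests.post(compute_url + '/compute/plugins',
                             headers={'Authorization': f'Bearer {access_token}'},
                             json=payload, timeout=TIMEOUT)
    r_json = response.json()
    if r_json.get('error', {}).get('statusCode') == 422:
        return '-1'  # name:version already exists, mirroring the deleted early return
    if 'id' not in r_json:
        raise RuntimeError(f'plugin upload failed for {name}: {r_json}')
    return r_json['id']

# Hypothetical usage; the URL, token, and CWL document are placeholders.
# plugin_id = upload_plugin_sketch('https://compute.scb-ncats.io', '$ACCESS_TOKEN',
#                                  {'cwlVersion': 'v1.0', 'class': 'CommandLineTool'}, 'echo', '0.0.1')
```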