From 27547af44e5babd9bd8c2fa14b9105fa125148cd Mon Sep 17 00:00:00 2001 From: Vasu Jaganath Date: Mon, 19 Aug 2024 00:47:10 -0400 Subject: [PATCH] REST API : wfb to wic transformation and compile (with tests) --- src/sophios/api/http/restapi.py | 87 ++------- src/sophios/api/utils/converter.py | 129 ++++++------ tests/rest_wfb_objects/multi_node.json | 76 ++++++++ .../multi_node_inline_cwl.json | 181 +++++++++++++++++ .../single_node.json} | 0 tests/test_rest_core.py | 184 +++++------------- 6 files changed, 394 insertions(+), 263 deletions(-) create mode 100644 tests/rest_wfb_objects/multi_node.json create mode 100644 tests/rest_wfb_objects/multi_node_inline_cwl.json rename tests/{single_node_helloworld.json => rest_wfb_objects/single_node.json} (100%) diff --git a/src/sophios/api/http/restapi.py b/src/sophios/api/http/restapi.py index f7e49b8c..f2bb950f 100644 --- a/src/sophios/api/http/restapi.py +++ b/src/sophios/api/http/restapi.py @@ -16,6 +16,7 @@ from sophios.cli import get_args from sophios.wic_types import CompilerInfo, Json, Tool, Tools, StepId, YamlTree, Cwl, NodeData from sophios.api.utils import converter +import sophios.plugins as plugins # from .auth.auth import authenticate @@ -39,21 +40,6 @@ def remove_dot_dollar(tree: Cwl) -> Cwl: return tree_no_dd -def get_yaml_tree(req: Json) -> Json: - """ - Get the Sophios yaml tree from incoming JSON - Args: - req (JSON): A raw JSON content of incoming JSON object - Returns: - Cwl: A Cwl document with . and $ removed from $namespaces and $schemas - """ - wkflw_name = "generic_workflow" - # args = converter.get_args(wkflw_name) - # yaml_tree_json: Json = converter.wfb_to_wic(req) - yaml_tree_json: Json = {} - return yaml_tree_json - - def run_workflow(compiler_info: CompilerInfo, args: argparse.Namespace) -> int: """ Get the Sophios yaml tree from incoming JSON @@ -108,25 +94,31 @@ async def compile_wf(request: Request) -> Json: # ========= PROCESS REQUEST OBJECT ========== req: Json = await request.json() # clean up and convert the incoming object + # schema preserving req = converter.raw_wfb_to_lean_wfb(req) + # schema non-preserving + workflow_temp = converter.wfb_to_wic(req) wkflw_name = "generic_workflow" - args = get_args(wkflw_name) - - workflow_temp = {} - if req["links"] != []: - for node in req["nodes"]: - workflow_temp["id"] = node["id"] - workflow_temp["step"] = node["cwlScript"] # Assume dict form - else: # A single node workflow - node = req["nodes"][0] - workflow_temp = node["cwlScript"] + args = get_args(wkflw_name, ['--inline_cwl_runtag']) + # Build canonical workflow object workflow_can = utils_cwl.desugar_into_canonical_normal_form(workflow_temp) # ========= BUILD WIC COMPILE INPUT ========= - tools_cwl: Tools = {StepId(content["id"], "global"): - Tool(".", content["run"]) for content in workflow_can["steps"]} + # Build a list of CLTs + # The default list + tools_cwl: Tools = {} + global_config = input_output.get_config(Path(args.config_file), Path(args.homedir)/'wic'/'global_config.json') + tools_cwl = plugins.get_tools_cwl(global_config, + args.validate_plugins, + not args.no_skip_dollar_schemas, + args.quiet) + # Add to the default list if the tool is 'inline' in run tag # run tag will have the actual CommandLineTool + for can_step in workflow_can["steps"]: + if can_step.get("run", None): + # add a new tool + tools_cwl[StepId(can_step["id"], "global")] = Tool(".", can_step["run"]) wic_obj = {'wic': workflow_can.get('wic', {})} plugin_ns = wic_obj['wic'].get('namespace', 'global') @@ -152,14 +144,6 @@ async def compile_wf(request: Request) -> Json: # Convert the compiled yaml file to json for labshare Compute. cwl_tree_run = copy.deepcopy(cwl_tree_no_dd) - # for step_key in cwl_tree['steps']: - # step_name_i = step_key - # step_name_i = step_name_i.replace('.yml', '_yml') # Due to calling remove_dot_dollar above - # # step_name = '__'.join(step_key.split('__')[3:]) # Remove prefix - # # Get step CWL from templates - # # run_val = next((tval['cwlScript'] - # # for _, tval in ict_plugins.items() if step_name == tval['name']), None) - # # cwl_tree_run['steps'][step_name_i]['run'] = run_val compute_workflow: Json = {} compute_workflow = { @@ -173,36 +157,3 @@ async def compile_wf(request: Request) -> Json: if __name__ == '__main__': uvicorn.run(app, host="0.0.0.0", port=3000) - - -# # ========= PROCESS COMPILED OBJECT ========= - # sub_node_data: NodeData = compiler_info.rose.data - # yaml_stem = sub_node_data.name - # cwl_tree = sub_node_data.compiled_cwl - # yaml_inputs = sub_node_data.workflow_inputs_file - - # # ======== OUTPUT PROCESSING ================ - # cwl_tree_no_dd = remove_dot_dollar(cwl_tree) - # yaml_inputs_no_dd = remove_dot_dollar(yaml_inputs) - - # # Convert the compiled yaml file to json for labshare Compute. - # cwl_tree_run = copy.deepcopy(cwl_tree_no_dd) - # for step_key in cwl_tree['steps']: - # step_name_i = step_key - # step_name_i = step_name_i.replace('.yml', '_yml') # Due to calling remove_dot_dollar above - # step_name = '__'.join(step_key.split('__')[3:]) # Remove prefix - - # # Get step CWL from templates - # run_val = next((tval['cwlScript'] - # for _, tval in ict_plugins.items() if step_name == tval['name']), None) - # cwl_tree_run['steps'][step_name_i]['run'] = run_val - - # TODO: set name and driver in workflow builder ui - # compute_workflow: Json = {} - # compute_workflow = { - # "name": yaml_stem, - # "driver": "argo", - # # "driver": "cwltool", - # "cwlJobInputs": yaml_inputs_no_dd, - # **cwl_tree_run - # } diff --git a/src/sophios/api/utils/converter.py b/src/sophios/api/utils/converter.py index 67289d87..8a1e5ba6 100644 --- a/src/sophios/api/utils/converter.py +++ b/src/sophios/api/utils/converter.py @@ -1,10 +1,10 @@ import copy from typing import Any, Dict, List - +import yaml from jsonschema import Draft202012Validator +from sophios.utils_yaml import wic_loader -# from sophios import cli -from sophios.wic_types import Json +from sophios.wic_types import Json, Cwl SCHEMA: Json = { "$schema": "http://json-schema.org/draft-07/schema#", @@ -153,7 +153,7 @@ def raw_wfb_to_lean_wfb(inp: Json) -> Json: prop_req = SCHEMA['required'] nodes_req = SCHEMA['definitions']['NodeX']['required'] links_req = SCHEMA['definitions']['Link']['required'] - do_not_rem_nodes_prop = ['cwlScript'] + do_not_rem_nodes_prop = ['cwlScript', 'run'] do_not_rem_links_prop: list = [] for k in keys: @@ -174,59 +174,68 @@ def raw_wfb_to_lean_wfb(inp: Json) -> Json: return inp_restrict -# def wfb_to_wic(request: Json) -> Json: -# """Convert the json object from http request object to a json object that can be used as input to wic compiler. - -# Args: -# request (Json): json object from http request - -# Returns: -# converted_json (Json): json object that can be used as input to wic compiler""" - -# converted_steps: list[Any] = [] - -# for step in request['steps']: -# step_template = step['template'] -# arguments = step['arguments'] -# # Get the template name from the step template -# template_name = next((tval['name'] -# for tname, tval in request['templates'].items() if step_template == tname), None) -# # template_name = None -# # for tname, tval in request['templates'].items(): -# # if tname == step_template and tval['name']: -# # template_name = tval['name'] -# # break -# # elif tname == step_template and not tval['name']: -# # break -# # else: -# # pass - -# converted_step: Json = {} -# if template_name: -# converted_step[template_name] = { -# "in": {} -# } - -# for key, value in arguments.items(): -# # be aware of usage of periods in the values as delimiters, this may cause an issue when storing in MongoDB -# if value.startswith("steps."): -# parts = value.split('.') -# src_step_idx = int(parts[1][len("step"):]) -# src_output_key = parts[3] - -# # Get the src template name from the src step name -# src_template = next((step.get("template") -# for step in request['steps'] if step.get("name") == parts[1]), None) -# src_template_name = next((stval['name'] for stname, stval in request['templates'].items() -# if src_template == stname), None) -# src_converted_step = next((step.get(src_template_name) -# for step in converted_steps if step.get(src_template_name)), None) -# if src_converted_step: -# src_converted_step["in"][src_output_key] = f"&{src_template_name}.{src_output_key}.{src_step_idx}" -# converted_step[template_name]["in"][key] = f"*{src_template_name}.{src_output_key}.{src_step_idx}" -# else: -# converted_step[template_name]["in"][key] = value -# converted_steps.append(converted_step) - -# converted_json: Json = {"steps": converted_steps} -# return converted_json +def wfb_to_wic(inp: Json) -> Cwl: + """convert lean wfb json to compliant wic""" + # non-schema preserving changes + inp_restrict = copy.deepcopy(inp) + + for node in inp_restrict['nodes']: + if node.get('settings'): + node['in'] = node['settings'].get('inputs') + if node['settings'].get('outputs'): + node['out'] = list({k: yaml.load('!& ' + v, Loader=wic_loader())} for k, v in node['settings'] + ['outputs'].items()) # outputs always have to be list + # remove these (now) superfluous keys + node.pop('settings', None) + node.pop('pluginId', None) + node.pop('internal', None) + + # setting the inputs of the non-sink nodes i.e. whose input doesn't depend on any other node's output + # first get all target node ids + target_node_ids = [] + for edg in inp_restrict['links']: + target_node_ids.append(edg['targetId']) + # now set inputs on non-sink nodes as inline input '!ii ' + # if inputs exist + non_sink_nodes = [node for node in inp_restrict['nodes'] if node['id'] not in target_node_ids] + for node in non_sink_nodes: + if node.get('in'): + for nkey in node['in']: + node['in'][nkey] = yaml.load('!ii ' + node['in'][nkey], Loader=wic_loader()) + + # After outs are set + for edg in inp_restrict['links']: + # links = edge. nodes and edges is the correct terminology! + src_id = edg['sourceId'] + tgt_id = edg['targetId'] + src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None) + tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None) + assert src_node, f'output(s) of source node of edge{edg} must exist!' + assert tgt_node, f'input(s) of target node of edge{edg} must exist!' + # flattened list of keys + if src_node.get('out') and tgt_node.get('in'): + src_out_keys = [sk for sout in src_node['out'] for sk in sout.keys()] + tgt_in_keys = tgt_node['in'].keys() + # we match the source output tag type to target input tag type + # and connect them through '!* ' for input, all outputs are '!& ' before this + for sk in src_out_keys: + tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader()) + # the inputs which aren't dependent on previous/other steps + # they are by default inline input + diff_keys = set(tgt_in_keys) - set(src_out_keys) + for dfk in diff_keys: + tgt_node['in'][dfk] = yaml.load('!ii ' + tgt_node['in'][dfk], Loader=wic_loader()) + + for node in inp_restrict['nodes']: + node['id'] = node['name'] # just reuse name as node's id, wic id is same as wfb name + node.pop('name', None) + + workflow_temp: Cwl = {} + if inp_restrict["links"] != []: + workflow_temp["steps"] = [] + for node in inp_restrict["nodes"]: + workflow_temp["steps"].append(node) # node["cwlScript"] # Assume dict form + else: # A single node workflow + node = inp_restrict["nodes"][0] + workflow_temp = node["cwlScript"] + return workflow_temp diff --git a/tests/rest_wfb_objects/multi_node.json b/tests/rest_wfb_objects/multi_node.json new file mode 100644 index 00000000..ffe42014 --- /dev/null +++ b/tests/rest_wfb_objects/multi_node.json @@ -0,0 +1,76 @@ +{ + "nodes": [ + { + "id": 7, + "x": 462, + "y": 206, + "z": 2, + "name": "touch", + "pluginId": "touch", + "height": 50, + "width": 250, + "settings": { + "inputs": { + "filename": "empty.txt" + }, + "outputs": { + "file": "file_touch" + } + }, + "internal": false + }, + { + "id": 18, + "x": 155, + "y": 195, + "z": 1, + "name": "append", + "pluginId": "append", + "height": 50, + "width": 250, + "settings": { + "inputs": { + "str": "Hello", + "file": "file_touch" + }, + "outputs": { + "file": "file_append1" + } + }, + "internal": false + }, + { + "id": 9, + "x": 790.3254637299812, + "y": 449.8103498684344, + "z": 5, + "name": "append", + "pluginId": "append", + "height": 50, + "width": 250, + "settings": { + "inputs": { + "str": "World!", + "file": "file_append1" + }, + "outputs": { + "file": "file_append2" + } + }, + "internal": false + } + ], + "links": [ + { + "sourceId": 7, + "targetId": 18, + "id": 1 + }, + { + "sourceId": 18, + "targetId": 9, + "id": 5 + } + ], + "selection": [] +} \ No newline at end of file diff --git a/tests/rest_wfb_objects/multi_node_inline_cwl.json b/tests/rest_wfb_objects/multi_node_inline_cwl.json new file mode 100644 index 00000000..c972ee36 --- /dev/null +++ b/tests/rest_wfb_objects/multi_node_inline_cwl.json @@ -0,0 +1,181 @@ +{ + "nodes": [ + { + "id": 7, + "x": 462, + "y": 206, + "z": 2, + "name": "touch", + "pluginId": "touch", + "height": 50, + "width": 250, + "settings": { + "inputs": { + "filename": "empty.txt" + }, + "outputs": { + "file": "file_touch" + } + }, + "run": { + "cwlVersion": "v1.0", + "class": "CommandLineTool", + "requirements": { + "DockerRequirement": { + "dockerPull": "docker.io/bash:4.4" + }, + "InlineJavascriptRequirement": {} + }, + "baseCommand": "touch", + "inputs": { + "filename": { + "type": "string", + "inputBinding": { + "position": 1 + } + } + }, + "outputs": { + "file": { + "type": "File", + "outputBinding": { + "glob": "$(inputs.filename)" + } + } + } + }, + "internal": false + }, + { + "id": 18, + "x": 155, + "y": 195, + "z": 1, + "name": "append", + "pluginId": "append", + "height": 50, + "width": 250, + "settings": { + "inputs": { + "str": "Hello", + "file": "file_touch" + }, + "outputs": { + "file": "file_append1" + } + }, + "run": { + "class": "CommandLineTool", + "cwlVersion": "v1.0", + "requirements": { + "ShellCommandRequirement": {}, + "InlineJavascriptRequirement": {}, + "InitialWorkDirRequirement": { + "listing": [ + "$(inputs.file)" + ] + } + }, + "inputs": { + "str": { + "type": "string", + "inputBinding": { + "shellQuote": false, + "position": 1, + "prefix": "echo" + } + }, + "file": { + "type": "File", + "inputBinding": { + "shellQuote": false, + "position": 2, + "prefix": ">>" + } + } + }, + "outputs": { + "file": { + "type": "File", + "outputBinding": { + "glob": "$(inputs.file.basename)" + } + } + } + }, + "internal": false + }, + { + "id": 9, + "x": 790.3254637299812, + "y": 449.8103498684344, + "z": 5, + "name": "append", + "pluginId": "append", + "height": 50, + "width": 250, + "settings": { + "inputs": { + "str": "World!", + "file": "file_append1" + }, + "outputs": { + "file": "file_append2" + } + }, + "run": { + "class": "CommandLineTool", + "cwlVersion": "v1.0", + "requirements": { + "ShellCommandRequirement": {}, + "InlineJavascriptRequirement": {}, + "InitialWorkDirRequirement": { + "listing": [ + "$(inputs.file)" + ] + } + }, + "inputs": { + "str": { + "type": "string", + "inputBinding": { + "shellQuote": false, + "position": 1, + "prefix": "echo" + } + }, + "file": { + "type": "File", + "inputBinding": { + "shellQuote": false, + "position": 2, + "prefix": ">>" + } + } + }, + "outputs": { + "file": { + "type": "File", + "outputBinding": { + "glob": "$(inputs.file.basename)" + } + } + } + }, + "internal": false + } + ], + "links": [ + { + "sourceId": 7, + "targetId": 18, + "id": 1 + }, + { + "sourceId": 18, + "targetId": 9, + "id": 5 + } + ], + "selection": [] +} \ No newline at end of file diff --git a/tests/single_node_helloworld.json b/tests/rest_wfb_objects/single_node.json similarity index 100% rename from tests/single_node_helloworld.json rename to tests/rest_wfb_objects/single_node.json diff --git a/tests/test_rest_core.py b/tests/test_rest_core.py index f85bf2e9..54639d02 100644 --- a/tests/test_rest_core.py +++ b/tests/test_rest_core.py @@ -1,12 +1,6 @@ import json -# import subprocess as sub from pathlib import Path -# import signal -# import sys -# from typing import List -# import argparse import asyncio -from jsonschema import Draft202012Validator from fastapi import Request @@ -16,141 +10,61 @@ from sophios.api.http import restapi -SCHEMA = { - "$schema": "http://json-schema.org/draft-07/schema#", - "definitions": { - "Link": { - "properties": { - "id": { - "type": "number" - }, - "inletIndex": { - "type": "number" - }, - "outletIndex": { - "type": "number" - }, - "sourceId": { - "type": "number" - }, - "targetId": { - "type": "number" - }, - "x1": { - "type": "number" - }, - "x2": { - "type": "number" - }, - "y1": { - "type": "number" - }, - "y2": { - "type": "number" - } - }, - "type": "object", - "required": ["id", "inletIndex", "outletIndex", "sourceId", "targetId"] - }, - "NodeSettings": { - "properties": { - "inputs": { - "additionalProperties": { - "$ref": "#/definitions/T" - }, - "type": "object" - }, - "outputs": { - "additionalProperties": { - "$ref": "#/definitions/T" - }, - "type": "object" - } - }, - "type": "object" - }, - "NodeX": { - "properties": { - "expanded": { - "type": "boolean" - }, - "height": { - "type": "number" - }, - "id": { - "type": "number" - }, - "internal": { - "type": "boolean" - }, - "name": { - "type": "string" - }, - "pluginId": { - "type": "string" - }, - "settings": { - "$ref": "#/definitions/NodeSettings" - }, - "width": { - "type": "number" - }, - "x": { - "type": "number" - }, - "y": { - "type": "number" - }, - "z": { - "type": "number" - }, - }, - "type": "object", - "required": ["id", "name", "pluginId", "settings", "internal"] - }, - "T": { - "type": "object" - } - }, - "properties": { - "links": { - "items": { - "$ref": "#/definitions/Link" - }, - "type": "array" - }, - "nodes": { - "items": { - "$ref": "#/definitions/NodeX" - }, - "type": "array" - }, - "selection": { - "items": { - "type": "number" - }, - "type": "array" - } - }, - "type": "object", - "required": ["links", "nodes"] -} - @pytest.mark.fast def test_rest_core_single_node() -> None: - """A simple single node 'hello world' test""" - # validate schema - Draft202012Validator.check_schema(SCHEMA) - df2012 = Draft202012Validator(SCHEMA) - inp_file = "single_node_helloworld.json" + """A simple single node sophios/restapi test""" + inp_file = "single_node.json" + inp: Json = {} + inp_path = Path(__file__).parent / 'rest_wfb_objects' / inp_file + with open(inp_path, 'r', encoding='utf-8') as f: + inp = json.load(f) + print('----------- from rest api ----------- \n\n') + scope = {} + scope['type'] = 'http' + + async def receive() -> Json: + inp_byte = json.dumps(inp).encode('utf-8') + return {"type": "http.request", "body": inp_byte} + + # create a request object and pack it with our json payload + req: Request = Request(scope) + req._receive = receive + res: Json = asyncio.run(restapi.compile_wf(req)) # call to rest api + assert int(res['retval']) == 0 + + +@pytest.mark.fast +def test_rest_core_multi_node() -> None: + """A simple multi node sophios/restapi test""" + inp_file = "multi_node.json" + inp: Json = {} + inp_path = Path(__file__).parent / 'rest_wfb_objects' / inp_file + with open(inp_path, 'r', encoding='utf-8') as f: + inp = json.load(f) + print('----------- from rest api ----------- \n\n') + scope = {} + scope['type'] = 'http' + + async def receive() -> Json: + inp_byte = json.dumps(inp).encode('utf-8') + return {"type": "http.request", "body": inp_byte} + + # create a request object and pack it with our json payload + req: Request = Request(scope) + req._receive = receive + res: Json = asyncio.run(restapi.compile_wf(req)) # call to rest api + assert int(res['retval']) == 0 + + +@pytest.mark.fast +def test_rest_core_multi_node_inline_cwl() -> None: + """A simple multi node (inline cwl) sophios/restapi test""" + inp_file = "multi_node_inline_cwl.json" inp: Json = {} - yaml_path = "workflow.json" - inp_path = Path(__file__).with_name(inp_file) + inp_path = Path(__file__).parent / 'rest_wfb_objects' / inp_file with open(inp_path, 'r', encoding='utf-8') as f: inp = json.load(f) - # check if object is conformant with our schema - df2012.is_valid(inp) print('----------- from rest api ----------- \n\n') scope = {} scope['type'] = 'http'