Skip to content

Commit

Permalink
REST API : wfb to wic transformation and compile (with tests) (#270)
Browse files Browse the repository at this point in the history
Co-authored-by: Vasu Jaganath <vasu.jaganath@axleinfo.com>
  • Loading branch information
vjaganat90 and Vasu Jaganath authored Sep 10, 2024
1 parent 0cd1f04 commit 49935d1
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 263 deletions.
87 changes: 19 additions & 68 deletions src/sophios/api/http/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sophios.cli import get_args
from sophios.wic_types import CompilerInfo, Json, Tool, Tools, StepId, YamlTree, Cwl, NodeData
from sophios.api.utils import converter
import sophios.plugins as plugins
# from .auth.auth import authenticate


Expand All @@ -39,21 +40,6 @@ def remove_dot_dollar(tree: Cwl) -> Cwl:
return tree_no_dd


def get_yaml_tree(req: Json) -> Json:
"""
Get the Sophios yaml tree from incoming JSON
Args:
req (JSON): A raw JSON content of incoming JSON object
Returns:
Cwl: A Cwl document with . and $ removed from $namespaces and $schemas
"""
wkflw_name = "generic_workflow"
# args = converter.get_args(wkflw_name)
# yaml_tree_json: Json = converter.wfb_to_wic(req)
yaml_tree_json: Json = {}
return yaml_tree_json


def run_workflow(compiler_info: CompilerInfo, args: argparse.Namespace) -> int:
"""
Get the Sophios yaml tree from incoming JSON
Expand Down Expand Up @@ -108,25 +94,31 @@ async def compile_wf(request: Request) -> Json:
# ========= PROCESS REQUEST OBJECT ==========
req: Json = await request.json()
# clean up and convert the incoming object
# schema preserving
req = converter.raw_wfb_to_lean_wfb(req)
# schema non-preserving
workflow_temp = converter.wfb_to_wic(req)
wkflw_name = "generic_workflow"
args = get_args(wkflw_name)

workflow_temp = {}
if req["links"] != []:
for node in req["nodes"]:
workflow_temp["id"] = node["id"]
workflow_temp["step"] = node["cwlScript"] # Assume dict form
else: # A single node workflow
node = req["nodes"][0]
workflow_temp = node["cwlScript"]
args = get_args(wkflw_name, ['--inline_cwl_runtag'])

# Build canonical workflow object
workflow_can = utils_cwl.desugar_into_canonical_normal_form(workflow_temp)

# ========= BUILD WIC COMPILE INPUT =========
tools_cwl: Tools = {StepId(content["id"], "global"):
Tool(".", content["run"]) for content in workflow_can["steps"]}
# Build a list of CLTs
# The default list
tools_cwl: Tools = {}
global_config = input_output.get_config(Path(args.config_file), Path(args.homedir)/'wic'/'global_config.json')
tools_cwl = plugins.get_tools_cwl(global_config,
args.validate_plugins,
not args.no_skip_dollar_schemas,
args.quiet)
# Add to the default list if the tool is 'inline' in run tag
# run tag will have the actual CommandLineTool
for can_step in workflow_can["steps"]:
if can_step.get("run", None):
# add a new tool
tools_cwl[StepId(can_step["id"], "global")] = Tool(".", can_step["run"])
wic_obj = {'wic': workflow_can.get('wic', {})}
plugin_ns = wic_obj['wic'].get('namespace', 'global')

Expand All @@ -152,14 +144,6 @@ async def compile_wf(request: Request) -> Json:

# Convert the compiled yaml file to json for labshare Compute.
cwl_tree_run = copy.deepcopy(cwl_tree_no_dd)
# for step_key in cwl_tree['steps']:
# step_name_i = step_key
# step_name_i = step_name_i.replace('.yml', '_yml') # Due to calling remove_dot_dollar above
# # step_name = '__'.join(step_key.split('__')[3:]) # Remove prefix
# # Get step CWL from templates
# # run_val = next((tval['cwlScript']
# # for _, tval in ict_plugins.items() if step_name == tval['name']), None)
# # cwl_tree_run['steps'][step_name_i]['run'] = run_val

compute_workflow: Json = {}
compute_workflow = {
Expand All @@ -173,36 +157,3 @@ async def compile_wf(request: Request) -> Json:

if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=3000)


# # ========= PROCESS COMPILED OBJECT =========
# sub_node_data: NodeData = compiler_info.rose.data
# yaml_stem = sub_node_data.name
# cwl_tree = sub_node_data.compiled_cwl
# yaml_inputs = sub_node_data.workflow_inputs_file

# # ======== OUTPUT PROCESSING ================
# cwl_tree_no_dd = remove_dot_dollar(cwl_tree)
# yaml_inputs_no_dd = remove_dot_dollar(yaml_inputs)

# # Convert the compiled yaml file to json for labshare Compute.
# cwl_tree_run = copy.deepcopy(cwl_tree_no_dd)
# for step_key in cwl_tree['steps']:
# step_name_i = step_key
# step_name_i = step_name_i.replace('.yml', '_yml') # Due to calling remove_dot_dollar above
# step_name = '__'.join(step_key.split('__')[3:]) # Remove prefix

# # Get step CWL from templates
# run_val = next((tval['cwlScript']
# for _, tval in ict_plugins.items() if step_name == tval['name']), None)
# cwl_tree_run['steps'][step_name_i]['run'] = run_val

# TODO: set name and driver in workflow builder ui
# compute_workflow: Json = {}
# compute_workflow = {
# "name": yaml_stem,
# "driver": "argo",
# # "driver": "cwltool",
# "cwlJobInputs": yaml_inputs_no_dd,
# **cwl_tree_run
# }
129 changes: 69 additions & 60 deletions src/sophios/api/utils/converter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import copy
from typing import Any, Dict, List

import yaml
from jsonschema import Draft202012Validator
from sophios.utils_yaml import wic_loader

# from sophios import cli
from sophios.wic_types import Json
from sophios.wic_types import Json, Cwl

SCHEMA: Json = {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down Expand Up @@ -153,7 +153,7 @@ def raw_wfb_to_lean_wfb(inp: Json) -> Json:
prop_req = SCHEMA['required']
nodes_req = SCHEMA['definitions']['NodeX']['required']
links_req = SCHEMA['definitions']['Link']['required']
do_not_rem_nodes_prop = ['cwlScript']
do_not_rem_nodes_prop = ['cwlScript', 'run']
do_not_rem_links_prop: list = []

for k in keys:
Expand All @@ -174,59 +174,68 @@ def raw_wfb_to_lean_wfb(inp: Json) -> Json:
return inp_restrict


# def wfb_to_wic(request: Json) -> Json:
# """Convert the json object from http request object to a json object that can be used as input to wic compiler.

# Args:
# request (Json): json object from http request

# Returns:
# converted_json (Json): json object that can be used as input to wic compiler"""

# converted_steps: list[Any] = []

# for step in request['steps']:
# step_template = step['template']
# arguments = step['arguments']
# # Get the template name from the step template
# template_name = next((tval['name']
# for tname, tval in request['templates'].items() if step_template == tname), None)
# # template_name = None
# # for tname, tval in request['templates'].items():
# # if tname == step_template and tval['name']:
# # template_name = tval['name']
# # break
# # elif tname == step_template and not tval['name']:
# # break
# # else:
# # pass

# converted_step: Json = {}
# if template_name:
# converted_step[template_name] = {
# "in": {}
# }

# for key, value in arguments.items():
# # be aware of usage of periods in the values as delimiters, this may cause an issue when storing in MongoDB
# if value.startswith("steps."):
# parts = value.split('.')
# src_step_idx = int(parts[1][len("step"):])
# src_output_key = parts[3]

# # Get the src template name from the src step name
# src_template = next((step.get("template")
# for step in request['steps'] if step.get("name") == parts[1]), None)
# src_template_name = next((stval['name'] for stname, stval in request['templates'].items()
# if src_template == stname), None)
# src_converted_step = next((step.get(src_template_name)
# for step in converted_steps if step.get(src_template_name)), None)
# if src_converted_step:
# src_converted_step["in"][src_output_key] = f"&{src_template_name}.{src_output_key}.{src_step_idx}"
# converted_step[template_name]["in"][key] = f"*{src_template_name}.{src_output_key}.{src_step_idx}"
# else:
# converted_step[template_name]["in"][key] = value
# converted_steps.append(converted_step)

# converted_json: Json = {"steps": converted_steps}
# return converted_json
def wfb_to_wic(inp: Json) -> Cwl:
"""convert lean wfb json to compliant wic"""
# non-schema preserving changes
inp_restrict = copy.deepcopy(inp)

for node in inp_restrict['nodes']:
if node.get('settings'):
node['in'] = node['settings'].get('inputs')
if node['settings'].get('outputs'):
node['out'] = list({k: yaml.load('!& ' + v, Loader=wic_loader())} for k, v in node['settings']
['outputs'].items()) # outputs always have to be list
# remove these (now) superfluous keys
node.pop('settings', None)
node.pop('pluginId', None)
node.pop('internal', None)

# setting the inputs of the non-sink nodes i.e. whose input doesn't depend on any other node's output
# first get all target node ids
target_node_ids = []
for edg in inp_restrict['links']:
target_node_ids.append(edg['targetId'])
# now set inputs on non-sink nodes as inline input '!ii '
# if inputs exist
non_sink_nodes = [node for node in inp_restrict['nodes'] if node['id'] not in target_node_ids]
for node in non_sink_nodes:
if node.get('in'):
for nkey in node['in']:
node['in'][nkey] = yaml.load('!ii ' + node['in'][nkey], Loader=wic_loader())

# After outs are set
for edg in inp_restrict['links']:
# links = edge. nodes and edges is the correct terminology!
src_id = edg['sourceId']
tgt_id = edg['targetId']
src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None)
tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None)
assert src_node, f'output(s) of source node of edge{edg} must exist!'
assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
# flattened list of keys
if src_node.get('out') and tgt_node.get('in'):
src_out_keys = [sk for sout in src_node['out'] for sk in sout.keys()]
tgt_in_keys = tgt_node['in'].keys()
# we match the source output tag type to target input tag type
# and connect them through '!* ' for input, all outputs are '!& ' before this
for sk in src_out_keys:
tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader())
# the inputs which aren't dependent on previous/other steps
# they are by default inline input
diff_keys = set(tgt_in_keys) - set(src_out_keys)
for dfk in diff_keys:
tgt_node['in'][dfk] = yaml.load('!ii ' + tgt_node['in'][dfk], Loader=wic_loader())

for node in inp_restrict['nodes']:
node['id'] = node['name'] # just reuse name as node's id, wic id is same as wfb name
node.pop('name', None)

workflow_temp: Cwl = {}
if inp_restrict["links"] != []:
workflow_temp["steps"] = []
for node in inp_restrict["nodes"]:
workflow_temp["steps"].append(node) # node["cwlScript"] # Assume dict form
else: # A single node workflow
node = inp_restrict["nodes"][0]
workflow_temp = node["cwlScript"]
return workflow_temp
76 changes: 76 additions & 0 deletions tests/rest_wfb_objects/multi_node.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"nodes": [
{
"id": 7,
"x": 462,
"y": 206,
"z": 2,
"name": "touch",
"pluginId": "touch",
"height": 50,
"width": 250,
"settings": {
"inputs": {
"filename": "empty.txt"
},
"outputs": {
"file": "file_touch"
}
},
"internal": false
},
{
"id": 18,
"x": 155,
"y": 195,
"z": 1,
"name": "append",
"pluginId": "append",
"height": 50,
"width": 250,
"settings": {
"inputs": {
"str": "Hello",
"file": "file_touch"
},
"outputs": {
"file": "file_append1"
}
},
"internal": false
},
{
"id": 9,
"x": 790.3254637299812,
"y": 449.8103498684344,
"z": 5,
"name": "append",
"pluginId": "append",
"height": 50,
"width": 250,
"settings": {
"inputs": {
"str": "World!",
"file": "file_append1"
},
"outputs": {
"file": "file_append2"
}
},
"internal": false
}
],
"links": [
{
"sourceId": 7,
"targetId": 18,
"id": 1
},
{
"sourceId": 18,
"targetId": 9,
"id": 5
}
],
"selection": []
}
Loading

0 comments on commit 49935d1

Please sign in to comment.