Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST API : Object transformation #270

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 19 additions & 68 deletions src/sophios/api/http/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sophios.cli import get_args
from sophios.wic_types import CompilerInfo, Json, Tool, Tools, StepId, YamlTree, Cwl, NodeData
from sophios.api.utils import converter
import sophios.plugins as plugins
# from .auth.auth import authenticate


Expand All @@ -39,21 +40,6 @@ def remove_dot_dollar(tree: Cwl) -> Cwl:
return tree_no_dd


def get_yaml_tree(req: Json) -> Json:
    """
    Get the Sophios yaml tree from incoming JSON

    Placeholder: the conversion is not implemented here, so the incoming
    object is ignored and an empty tree is always returned.

    Args:
        req (JSON): A raw JSON content of incoming JSON object
    Returns:
        Json: Always an empty dict
    """
    # TODO: build the tree from `req` (e.g. through the converter utilities)
    yaml_tree_json: Json = {}
    return yaml_tree_json


def run_workflow(compiler_info: CompilerInfo, args: argparse.Namespace) -> int:
"""
Get the Sophios yaml tree from incoming JSON
Expand Down Expand Up @@ -108,25 +94,31 @@ async def compile_wf(request: Request) -> Json:
# ========= PROCESS REQUEST OBJECT ==========
req: Json = await request.json()
# clean up and convert the incoming object
# schema preserving
req = converter.raw_wfb_to_lean_wfb(req)
# schema non-preserving
workflow_temp = converter.wfb_to_wic(req)
wkflw_name = "generic_workflow"
args = get_args(wkflw_name)

workflow_temp = {}
if req["links"] != []:
for node in req["nodes"]:
workflow_temp["id"] = node["id"]
workflow_temp["step"] = node["cwlScript"] # Assume dict form
else: # A single node workflow
node = req["nodes"][0]
workflow_temp = node["cwlScript"]
args = get_args(wkflw_name, ['--inline_cwl_runtag'])

# Build canonical workflow object
workflow_can = utils_cwl.desugar_into_canonical_normal_form(workflow_temp)

# ========= BUILD WIC COMPILE INPUT =========
tools_cwl: Tools = {StepId(content["id"], "global"):
Tool(".", content["run"]) for content in workflow_can["steps"]}
# Build a list of CLTs
# The default list
tools_cwl: Tools = {}
global_config = input_output.get_config(Path(args.config_file), Path(args.homedir)/'wic'/'global_config.json')
tools_cwl = plugins.get_tools_cwl(global_config,
args.validate_plugins,
not args.no_skip_dollar_schemas,
args.quiet)
# Add to the default list if the tool is 'inline' in run tag
# run tag will have the actual CommandLineTool
for can_step in workflow_can["steps"]:
if can_step.get("run", None):
# add a new tool
tools_cwl[StepId(can_step["id"], "global")] = Tool(".", can_step["run"])
wic_obj = {'wic': workflow_can.get('wic', {})}
plugin_ns = wic_obj['wic'].get('namespace', 'global')

Expand All @@ -152,14 +144,6 @@ async def compile_wf(request: Request) -> Json:

# Convert the compiled yaml file to json for labshare Compute.
cwl_tree_run = copy.deepcopy(cwl_tree_no_dd)
# for step_key in cwl_tree['steps']:
# step_name_i = step_key
# step_name_i = step_name_i.replace('.yml', '_yml') # Due to calling remove_dot_dollar above
# # step_name = '__'.join(step_key.split('__')[3:]) # Remove prefix
# # Get step CWL from templates
# # run_val = next((tval['cwlScript']
# # for _, tval in ict_plugins.items() if step_name == tval['name']), None)
# # cwl_tree_run['steps'][step_name_i]['run'] = run_val

compute_workflow: Json = {}
compute_workflow = {
Expand All @@ -173,36 +157,3 @@ async def compile_wf(request: Request) -> Json:

# Script entry point: when this module is executed directly, serve `app`
# with uvicorn, listening on all interfaces (0.0.0.0) on port 3000.
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=3000)


# # ========= PROCESS COMPILED OBJECT =========
# sub_node_data: NodeData = compiler_info.rose.data
# yaml_stem = sub_node_data.name
# cwl_tree = sub_node_data.compiled_cwl
# yaml_inputs = sub_node_data.workflow_inputs_file

# # ======== OUTPUT PROCESSING ================
# cwl_tree_no_dd = remove_dot_dollar(cwl_tree)
# yaml_inputs_no_dd = remove_dot_dollar(yaml_inputs)

# # Convert the compiled yaml file to json for labshare Compute.
# cwl_tree_run = copy.deepcopy(cwl_tree_no_dd)
# for step_key in cwl_tree['steps']:
# step_name_i = step_key
# step_name_i = step_name_i.replace('.yml', '_yml') # Due to calling remove_dot_dollar above
# step_name = '__'.join(step_key.split('__')[3:]) # Remove prefix

# # Get step CWL from templates
# run_val = next((tval['cwlScript']
# for _, tval in ict_plugins.items() if step_name == tval['name']), None)
# cwl_tree_run['steps'][step_name_i]['run'] = run_val

# TODO: set name and driver in workflow builder ui
# compute_workflow: Json = {}
# compute_workflow = {
# "name": yaml_stem,
# "driver": "argo",
# # "driver": "cwltool",
# "cwlJobInputs": yaml_inputs_no_dd,
# **cwl_tree_run
# }
129 changes: 69 additions & 60 deletions src/sophios/api/utils/converter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import copy
from typing import Any, Dict, List

import yaml
from jsonschema import Draft202012Validator
from sophios.utils_yaml import wic_loader

# from sophios import cli
from sophios.wic_types import Json
from sophios.wic_types import Json, Cwl

SCHEMA: Json = {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down Expand Up @@ -153,7 +153,7 @@ def raw_wfb_to_lean_wfb(inp: Json) -> Json:
prop_req = SCHEMA['required']
nodes_req = SCHEMA['definitions']['NodeX']['required']
links_req = SCHEMA['definitions']['Link']['required']
do_not_rem_nodes_prop = ['cwlScript']
do_not_rem_nodes_prop = ['cwlScript', 'run']
do_not_rem_links_prop: list = []

for k in keys:
Expand All @@ -174,59 +174,68 @@ def raw_wfb_to_lean_wfb(inp: Json) -> Json:
return inp_restrict


# def wfb_to_wic(request: Json) -> Json:
# """Convert the json object from http request object to a json object that can be used as input to wic compiler.

# Args:
# request (Json): json object from http request

# Returns:
# converted_json (Json): json object that can be used as input to wic compiler"""

# converted_steps: list[Any] = []

# for step in request['steps']:
# step_template = step['template']
# arguments = step['arguments']
# # Get the template name from the step template
# template_name = next((tval['name']
# for tname, tval in request['templates'].items() if step_template == tname), None)
# # template_name = None
# # for tname, tval in request['templates'].items():
# # if tname == step_template and tval['name']:
# # template_name = tval['name']
# # break
# # elif tname == step_template and not tval['name']:
# # break
# # else:
# # pass

# converted_step: Json = {}
# if template_name:
# converted_step[template_name] = {
# "in": {}
# }

# for key, value in arguments.items():
# # be aware of usage of periods in the values as delimiters, this may cause an issue when storing in MongoDB
# if value.startswith("steps."):
# parts = value.split('.')
# src_step_idx = int(parts[1][len("step"):])
# src_output_key = parts[3]

# # Get the src template name from the src step name
# src_template = next((step.get("template")
# for step in request['steps'] if step.get("name") == parts[1]), None)
# src_template_name = next((stval['name'] for stname, stval in request['templates'].items()
# if src_template == stname), None)
# src_converted_step = next((step.get(src_template_name)
# for step in converted_steps if step.get(src_template_name)), None)
# if src_converted_step:
# src_converted_step["in"][src_output_key] = f"&{src_template_name}.{src_output_key}.{src_step_idx}"
# converted_step[template_name]["in"][key] = f"*{src_template_name}.{src_output_key}.{src_step_idx}"
# else:
# converted_step[template_name]["in"][key] = value
# converted_steps.append(converted_step)

# converted_json: Json = {"steps": converted_steps}
# return converted_json
def wfb_to_wic(inp: Json) -> Cwl:
    """Convert a lean WFB (workflow builder) JSON object into a wic workflow.

    The conversion is not schema preserving. It works on a deep copy, so
    ``inp`` itself is never modified.

    Transformations applied to the copy:

    * ``settings.inputs`` of a node becomes its ``in`` mapping and every entry
      of ``settings.outputs`` becomes a single-key dict in its ``out`` list,
      with the value loaded through the wic YAML loader behind a ``'!& '``
      prefix. ``settings``, ``pluginId`` and ``internal`` are then dropped.
    * Inputs of nodes that are not the target of any link are loaded behind
      an ``'!ii '`` (inline input) prefix.
    * For every node that is the target of at least one link whose source has
      outputs, each input whose key matches an output key of one of its
      sources is loaded behind a ``'!* '`` prefix; the remaining inputs are
      loaded behind ``'!ii '``. Each input is tagged exactly once, even when
      the node has several incoming links.
    * ``name`` replaces ``id`` on every node.

    Args:
        inp (Json): Lean WFB object with ``nodes`` and ``links`` lists.

    Returns:
        Cwl: ``{"steps": [...]}`` holding the converted nodes when there are
        links; otherwise the ``cwlScript`` of the first (single) node.

    Raises:
        AssertionError: If a link references a node id that does not exist.
    """
    # NOTE(review): yaml.load is fed strings that originate from the incoming
    # request; confirm that wic_loader() only constructs safe objects.
    inp_restrict = copy.deepcopy(inp)
    nodes = inp_restrict['nodes']
    links = inp_restrict['links']

    for node in nodes:
        settings = node.get('settings')
        if settings:
            node['in'] = settings.get('inputs')
            outputs = settings.get('outputs')
            if outputs:
                # outputs always have to be a list (of single-key dicts)
                node['out'] = [{k: yaml.load('!& ' + v, Loader=wic_loader())}
                               for k, v in outputs.items()]
        # Dropped for every node so these UI-only keys never leak into a step,
        # even when a node carries no settings.
        node.pop('settings', None)
        node.pop('pluginId', None)
        node.pop('internal', None)

    # Nodes that are never the target of a link do not depend on another
    # node's output: all of their inputs are inline inputs ('!ii ').
    target_node_ids = {edg['targetId'] for edg in links}
    for node in nodes:
        if node['id'] not in target_node_ids and node.get('in'):
            for nkey in node['in']:
                node['in'][nkey] = yaml.load('!ii ' + node['in'][nkey], Loader=wic_loader())

    # Index the nodes once; the first node wins on a duplicated id, exactly
    # like a linear "first match" search would.
    nodes_by_id: Dict[Any, Any] = {}
    for node in nodes:
        nodes_by_id.setdefault(node['id'], node)

    # Collect, per target node, the output keys of all of its source nodes.
    # Tagging is deferred until every link has been seen so that a node with
    # several incoming links is not tagged twice.
    connected_keys: Dict[Any, set] = {}
    for edg in links:
        src_node = nodes_by_id.get(edg['sourceId'])
        tgt_node = nodes_by_id.get(edg['targetId'])
        assert src_node, f'output(s) of source node of edge{edg} must exist!'
        assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
        if src_node.get('out') and tgt_node.get('in'):
            keys = connected_keys.setdefault(edg['targetId'], set())
            keys.update(sk for sout in src_node['out'] for sk in sout)

    # Source output keys are matched to target input keys by name: matching
    # inputs are connected through '!* ', all other inputs of the target are
    # inline inputs. Source keys absent from the target's inputs are ignored.
    for tgt_id, src_out_keys in connected_keys.items():
        tgt_in = nodes_by_id[tgt_id]['in']
        for key in tgt_in:
            tag = '!* ' if key in src_out_keys else '!ii '
            tgt_in[key] = yaml.load(tag + tgt_in[key], Loader=wic_loader())

    for node in nodes:
        # wic id is the same as the wfb name
        node['id'] = node.pop('name')

    if links != []:
        return {"steps": list(nodes)}
    # A single node workflow
    return nodes[0]["cwlScript"]
76 changes: 76 additions & 0 deletions tests/rest_wfb_objects/multi_node.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"nodes": [
{
"id": 7,
"x": 462,
"y": 206,
"z": 2,
"name": "touch",
"pluginId": "touch",
"height": 50,
"width": 250,
"settings": {
"inputs": {
"filename": "empty.txt"
},
"outputs": {
"file": "file_touch"
}
},
"internal": false
},
{
"id": 18,
"x": 155,
"y": 195,
"z": 1,
"name": "append",
"pluginId": "append",
"height": 50,
"width": 250,
"settings": {
"inputs": {
"str": "Hello",
"file": "file_touch"
},
"outputs": {
"file": "file_append1"
}
},
"internal": false
},
{
"id": 9,
"x": 790.3254637299812,
"y": 449.8103498684344,
"z": 5,
"name": "append",
"pluginId": "append",
"height": 50,
"width": 250,
"settings": {
"inputs": {
"str": "World!",
"file": "file_append1"
},
"outputs": {
"file": "file_append2"
}
},
"internal": false
}
],
"links": [
{
"sourceId": 7,
"targetId": 18,
"id": 1
},
{
"sourceId": 18,
"targetId": 9,
"id": 5
}
],
"selection": []
}
Loading
Loading