Skip to content

Commit

Permalink
REST API : add the logic of converting wfb plugins (ICT) to CWL (CLT) (
Browse files Browse the repository at this point in the history
…#276)

* add the logic of converting plugins (ICT) to CWL (CLT) and add a test for given wf.json

* get output from /compile endpoint and run locally for CI

* semi-fix the inline_cwl when using stock CLT names with REST API

---------

Co-authored-by: Vasu Jaganath <vasu.jaganath@axleinfo.com>
  • Loading branch information
vjaganat90 and Vasu Jaganath authored Oct 9, 2024
1 parent 1e797b5 commit a908c88
Show file tree
Hide file tree
Showing 10 changed files with 1,047 additions and 34 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/run_workflows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ jobs:
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
run: cd workflow-inference-compiler/ && pytest tests/test_rest_core.py -k test_rest_core --cwl_runner cwltool

- name: PyTest Run REST WFB Tests
if: always()
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
run: cd workflow-inference-compiler/ && pytest tests/test_rest_wfb.py -k test_rest_wfb --cwl_runner cwltool

- name: PyTest Run ICT to CLT conversion Tests
if: always()
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
Expand Down
19 changes: 10 additions & 9 deletions src/sophios/api/http/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ async def compile_wf(request: Request) -> Json:
req: Json = await request.json()
# clean up and convert the incoming object
# schema preserving
req = converter.raw_wfb_to_lean_wfb(req)
wfb_payload = converter.raw_wfb_to_lean_wfb(req)
# schema non-preserving
workflow_temp = converter.wfb_to_wic(req)
workflow_temp = converter.wfb_to_wic(wfb_payload)
wkflw_name = "generic_workflow"
args = get_args(wkflw_name, ['--inline_cwl_runtag'])
args = get_args(wkflw_name, ['--inline_cwl_runtag', '--generate_cwl_workflow'])

# Build canonical workflow object
workflow_can = utils_cwl.desugar_into_canonical_normal_form(workflow_temp)
Expand All @@ -126,16 +126,17 @@ async def compile_wf(request: Request) -> Json:
yaml_tree: YamlTree = YamlTree(StepId(wkflw_name, plugin_ns), workflow_can)

# ========= COMPILE WORKFLOW ================
args.ignore_dir_path = True
compiler_info: CompilerInfo = compiler.compile_workflow(yaml_tree, args, [], [graph], {}, {}, {}, {},
tools_cwl, True, relative_run_path=True, testing=False)

# =========== OPTIONAL RUN ==============
print('---------- Run Workflow locally! ---------')
retval = run_workflow(compiler_info, args)

rose_tree = compiler_info.rose
if args.inline_cwl_runtag:
input_output.write_to_disk(rose_tree, Path('autogenerated/'), True, args.inputs_file)
rose_tree = plugins.cwl_update_inline_runtag_rosetree(rose_tree, Path('autogenerated/'), True)
# ======== OUTPUT PROCESSING ================
# ========= PROCESS COMPILED OBJECT =========
sub_node_data: NodeData = compiler_info.rose.data
sub_node_data: NodeData = rose_tree.data
yaml_stem = sub_node_data.name
cwl_tree = sub_node_data.compiled_cwl
yaml_inputs = sub_node_data.workflow_inputs_file
Expand All @@ -151,7 +152,7 @@ async def compile_wf(request: Request) -> Json:
"cwlJobInputs": yaml_inputs_no_dd,
**cwl_tree_run
}
compute_workflow["retval"] = str(retval)
compute_workflow["retval"] = str(0)
return compute_workflow


Expand Down
46 changes: 39 additions & 7 deletions src/sophios/api/utils/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,35 @@ def extract_state(inp: Json) -> Json:
inp_restrict = copy.deepcopy(inp['state'])
else:
inp_inter = copy.deepcopy(inp)
# drop all 'internal' nodes and all edges with 'internal' nodes
step_nodes = [snode for snode in inp['state']['nodes'] if not snode['internal']]
step_node_ids = [step_node['id'] for step_node in step_nodes]
step_edges = [edg for edg in inp_inter['state']['links'] if edg['sourceId']
in step_node_ids and edg['targetId'] in step_node_ids]
# overwrite 'links' and 'nodes'
inp_inter['state'].pop('nodes', None)
inp_inter['state'].pop('links', None)
inp_inter['state']['nodes'] = step_nodes
inp_inter['state']['links'] = step_edges
# massage the plugins
plugins = inp_inter['plugins']
# drop incorrect/superfluous UI fields from plugins
# 'required' and 'format'
for ict_plugin in plugins:
for ui_elem in ict_plugin['ui']:
_, _ = ui_elem.pop('required', None), ui_elem.pop('format', None)
for out in ict_plugin['outputs']:
if out['name'] == 'outDir':
ict_plugin['inputs'].append(out)
# Here goes the ICT to CLT extraction logic
for node in inp_inter['state']['nodes']:
node_pid = node["pluginId"]
plugin = next((ict for ict in plugins if ict['pid'] == node_pid), None)
clt: Json = {}
if plugin:
clt = ict_to_clt(plugin)
# just have the clt payload in run
node['run'] = clt
inp_restrict = inp_inter['state']
return inp_restrict

Expand Down Expand Up @@ -92,7 +120,7 @@ def wfb_to_wic(inp: Json) -> Cwl:
['outputs'].items()) # outputs always have to be list
# remove these (now) superfluous keys
node.pop('settings', None)
node.pop('pluginId', None)
node.pop('name', None)
node.pop('internal', None)

# setting the inputs of the non-sink nodes i.e. whose input doesn't depend on any other node's output
Expand All @@ -106,7 +134,7 @@ def wfb_to_wic(inp: Json) -> Cwl:
for node in non_sink_nodes:
if node.get('in'):
for nkey in node['in']:
node['in'][nkey] = yaml.load('!ii ' + node['in'][nkey], Loader=wic_loader())
node['in'][nkey] = yaml.load('!ii ' + str(node['in'][nkey]), Loader=wic_loader())

# After outs are set
for edg in inp_restrict['links']:
Expand All @@ -124,16 +152,20 @@ def wfb_to_wic(inp: Json) -> Cwl:
# we match the source output tag type to target input tag type
# and connect them through '!* ' for input, all outputs are '!& ' before this
for sk in src_out_keys:
tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader())
# It may be possible that (explicit) outputs of src nodes do not have corresponding
# (explicit) inputs in the target node
if tgt_node['in'].get(sk):
tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader())
# the inputs which aren't dependent on previous/other steps
# they are by default inline input
diff_keys = set(tgt_in_keys) - set(src_out_keys)
for dfk in diff_keys:
tgt_node['in'][dfk] = yaml.load('!ii ' + tgt_node['in'][dfk], Loader=wic_loader())
tgt_node['in'][dfk] = yaml.load('!ii ' + str(tgt_node['in'][dfk]), Loader=wic_loader())

for node in inp_restrict['nodes']:
node['id'] = node['name'] # just reuse name as node's id, wic id is same as wfb name
node.pop('name', None)
# reuse the node's pluginId as its id: keep the part before '@' and replace '/' with '_'
node['id'] = node['pluginId'].split('@')[0].replace('/', '_')
node.pop('pluginId', None)

workflow_temp: Cwl = {}
if inp_restrict["links"] != []:
Expand All @@ -142,7 +174,7 @@ def wfb_to_wic(inp: Json) -> Cwl:
workflow_temp["steps"].append(node) # node["cwlScript"] # Assume dict form
else: # A single node workflow
node = inp_restrict["nodes"][0]
workflow_temp = node["cwlScript"]
workflow_temp = node["cwlScript"] if node.get("cwlScript") else node['run']
return workflow_temp


Expand Down
16 changes: 8 additions & 8 deletions src/sophios/api/utils/ict/ict_spec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ def validate_ui(self) -> "ICT":
inp_bool = [x in input_names for x in io_dict["inputs"]]
out_bool = [x in output_names for x in io_dict["outputs"]]

if not all(inp_bool):
raise ValueError(
f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
)
if not all(out_bool):
raise ValueError(
f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
)
# if not all(inp_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
# )
# if not all(out_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
# )
return self

def to_clt(self, network_access: bool = False) -> dict:
Expand Down
3 changes: 2 additions & 1 deletion src/sophios/api/utils/input_object_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@
"PluginX": {
"properties": {
"author": {
"type": "string"
"type": ["string", "array"]
},
"baseCommand": {
"items": {
Expand Down Expand Up @@ -318,6 +318,7 @@
},
"required": [
"id",
"pid",
"name",
"version",
"title",
Expand Down
3 changes: 3 additions & 0 deletions src/sophios/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
help='Copies output files from the cachedir to outdir/ (automatically enabled with --run_local)')
parser.add_argument('--inline_cwl_runtag', default=False, action="store_true",
help='Copies cwl adapter file contents inline into the final .cwl in autogenerated/')
# Hidden flag: help is suppressed so it never shows up in --help output.
# The value is parsed explicitly because type=bool would turn every non-empty
# string (including 'False' and '0') into True.
parser.add_argument('--ignore_dir_path',
                    type=lambda val: str(val).strip().lower() in ('1', 'true', 't', 'yes', 'y'),
                    required=False, default=False, help=argparse.SUPPRESS)

parser.add_argument('--parallel', default=False, action="store_true",
help='''When running locally, execute independent steps in parallel.
Expand Down
13 changes: 9 additions & 4 deletions src/sophios/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,10 +880,15 @@ def compile_workflow_once(yaml_tree_ast: YamlTree,
newval['format'] = in_format
new_keyval = {key: newval}
elif 'Directory' == in_dict['type']:
dir = Path(in_dict['value'])
if not dir.is_absolute():
dir = Path('autogenerated') / dir
dir.mkdir(parents=True, exist_ok=True)
if not args.ignore_dir_path:
if in_dict['value'].startswith('/'):
print("Warning! directory can not start with '/'")
print("It is most likely an incorrect path! Can't create directories!")
sys.exit(1)
ldir = Path(in_dict['value'])
if not ldir.is_absolute():
ldir = Path('autogenerated') / ldir
ldir.mkdir(parents=True, exist_ok=True)
newval = {'class': 'Directory', 'location': in_dict['value']}
new_keyval = {key: newval}
# TODO: Check for all valid types?
Expand Down
Loading

0 comments on commit a908c88

Please sign in to comment.