Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST API : add the logic of converting wfb plugins (ICT) to CWL (CLT) #276

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/run_workflows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ jobs:
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
run: cd workflow-inference-compiler/ && pytest tests/test_rest_core.py -k test_rest_core --cwl_runner cwltool

- name: PyTest Run REST WFB Tests
if: always()
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
run: cd workflow-inference-compiler/ && pytest tests/test_rest_wfb.py -k test_rest_wfb --cwl_runner cwltool

- name: PyTest Run ICT to CLT conversion Tests
if: always()
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
Expand Down
19 changes: 10 additions & 9 deletions src/sophios/api/http/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ async def compile_wf(request: Request) -> Json:
req: Json = await request.json()
# clean up and convert the incoming object
# schema preserving
req = converter.raw_wfb_to_lean_wfb(req)
wfb_payload = converter.raw_wfb_to_lean_wfb(req)
# schema non-preserving
workflow_temp = converter.wfb_to_wic(req)
workflow_temp = converter.wfb_to_wic(wfb_payload)
wkflw_name = "generic_workflow"
args = get_args(wkflw_name, ['--inline_cwl_runtag'])
args = get_args(wkflw_name, ['--inline_cwl_runtag', '--generate_cwl_workflow'])

# Build canonical workflow object
workflow_can = utils_cwl.desugar_into_canonical_normal_form(workflow_temp)
Expand All @@ -126,16 +126,17 @@ async def compile_wf(request: Request) -> Json:
yaml_tree: YamlTree = YamlTree(StepId(wkflw_name, plugin_ns), workflow_can)

# ========= COMPILE WORKFLOW ================
args.ignore_dir_path = True
compiler_info: CompilerInfo = compiler.compile_workflow(yaml_tree, args, [], [graph], {}, {}, {}, {},
tools_cwl, True, relative_run_path=True, testing=False)

# =========== OPTIONAL RUN ==============
print('---------- Run Workflow locally! ---------')
retval = run_workflow(compiler_info, args)

rose_tree = compiler_info.rose
if args.inline_cwl_runtag:
input_output.write_to_disk(rose_tree, Path('autogenerated/'), True, args.inputs_file)
rose_tree = plugins.cwl_update_inline_runtag_rosetree(rose_tree, Path('autogenerated/'), True)
# ======== OUTPUT PROCESSING ================
# ========= PROCESS COMPILED OBJECT =========
sub_node_data: NodeData = compiler_info.rose.data
sub_node_data: NodeData = rose_tree.data
yaml_stem = sub_node_data.name
cwl_tree = sub_node_data.compiled_cwl
yaml_inputs = sub_node_data.workflow_inputs_file
Expand All @@ -151,7 +152,7 @@ async def compile_wf(request: Request) -> Json:
"cwlJobInputs": yaml_inputs_no_dd,
**cwl_tree_run
}
compute_workflow["retval"] = str(retval)
compute_workflow["retval"] = str(0)
return compute_workflow


Expand Down
46 changes: 39 additions & 7 deletions src/sophios/api/utils/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,35 @@ def extract_state(inp: Json) -> Json:
inp_restrict = copy.deepcopy(inp['state'])
else:
inp_inter = copy.deepcopy(inp)
# drop all 'internal' nodes and all edges with 'internal' nodes
step_nodes = [snode for snode in inp['state']['nodes'] if not snode['internal']]
step_node_ids = [step_node['id'] for step_node in step_nodes]
step_edges = [edg for edg in inp_inter['state']['links'] if edg['sourceId']
in step_node_ids and edg['targetId'] in step_node_ids]
# overwrite 'links' and 'nodes'
inp_inter['state'].pop('nodes', None)
inp_inter['state'].pop('links', None)
inp_inter['state']['nodes'] = step_nodes
inp_inter['state']['links'] = step_edges
# massage the plugins
plugins = inp_inter['plugins']
# drop incorrect/superfluous UI fields from plugins
# 'required' and 'format'
for ict_plugin in plugins:
for ui_elem in ict_plugin['ui']:
_, _ = ui_elem.pop('required', None), ui_elem.pop('format', None)
for out in ict_plugin['outputs']:
if out['name'] == 'outDir':
ict_plugin['inputs'].append(out)
# Here goes the ICT to CLT extraction logic
for node in inp_inter['state']['nodes']:
node_pid = node["pluginId"]
plugin = next((ict for ict in plugins if ict['pid'] == node_pid), None)
clt: Json = {}
if plugin:
clt = ict_to_clt(plugin)
# just have the clt payload in run
node['run'] = clt
inp_restrict = inp_inter['state']
return inp_restrict

Expand Down Expand Up @@ -92,7 +120,7 @@ def wfb_to_wic(inp: Json) -> Cwl:
['outputs'].items()) # outputs always have to be list
# remove these (now) superfluous keys
node.pop('settings', None)
node.pop('pluginId', None)
node.pop('name', None)
node.pop('internal', None)

# setting the inputs of the non-sink nodes i.e. whose input doesn't depend on any other node's output
Expand All @@ -106,7 +134,7 @@ def wfb_to_wic(inp: Json) -> Cwl:
for node in non_sink_nodes:
if node.get('in'):
for nkey in node['in']:
node['in'][nkey] = yaml.load('!ii ' + node['in'][nkey], Loader=wic_loader())
node['in'][nkey] = yaml.load('!ii ' + str(node['in'][nkey]), Loader=wic_loader())

# After outs are set
for edg in inp_restrict['links']:
Expand All @@ -124,16 +152,20 @@ def wfb_to_wic(inp: Json) -> Cwl:
# we match the source output tag type to target input tag type
# and connect them through '!* ' for input, all outputs are '!& ' before this
for sk in src_out_keys:
tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader())
# It maybe possible that (explicit) outputs of src nodes might not have corresponding
# (explicit) inputs in target node
if tgt_node['in'].get(sk):
tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader())
# the inputs which aren't dependent on previous/other steps
# they are by default inline input
diff_keys = set(tgt_in_keys) - set(src_out_keys)
for dfk in diff_keys:
tgt_node['in'][dfk] = yaml.load('!ii ' + tgt_node['in'][dfk], Loader=wic_loader())
tgt_node['in'][dfk] = yaml.load('!ii ' + str(tgt_node['in'][dfk]), Loader=wic_loader())

for node in inp_restrict['nodes']:
node['id'] = node['name'] # just reuse name as node's id, wic id is same as wfb name
node.pop('name', None)
# just reuse name as node's pluginId, wic id is same as wfb name
node['id'] = node['pluginId'].split('@')[0].replace('/', '_')
node.pop('pluginId', None)

workflow_temp: Cwl = {}
if inp_restrict["links"] != []:
Expand All @@ -142,7 +174,7 @@ def wfb_to_wic(inp: Json) -> Cwl:
workflow_temp["steps"].append(node) # node["cwlScript"] # Assume dict form
else: # A single node workflow
node = inp_restrict["nodes"][0]
workflow_temp = node["cwlScript"]
workflow_temp = node["cwlScript"] if node.get("cwlScript") else node['run']
return workflow_temp


Expand Down
16 changes: 8 additions & 8 deletions src/sophios/api/utils/ict/ict_spec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ def validate_ui(self) -> "ICT":
inp_bool = [x in input_names for x in io_dict["inputs"]]
out_bool = [x in output_names for x in io_dict["outputs"]]

if not all(inp_bool):
raise ValueError(
f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
)
if not all(out_bool):
raise ValueError(
f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
)
# if not all(inp_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
# )
# if not all(out_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
# )
return self

def to_clt(self, network_access: bool = False) -> dict:
Expand Down
3 changes: 2 additions & 1 deletion src/sophios/api/utils/input_object_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@
"PluginX": {
"properties": {
"author": {
"type": "string"
"type": ["string", "array"]
},
"baseCommand": {
"items": {
Expand Down Expand Up @@ -318,6 +318,7 @@
},
"required": [
"id",
"pid",
"name",
"version",
"title",
Expand Down
3 changes: 3 additions & 0 deletions src/sophios/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
help='Copies output files from the cachedir to outdir/ (automatically enabled with --run_local)')
parser.add_argument('--inline_cwl_runtag', default=False, action="store_true",
help='Copies cwl adapter file contents inline into the final .cwl in autogenerated/')
# This is a hidden flag
parser.add_argument('--ignore_dir_path', type=bool,
required=False, default=False, help=argparse.SUPPRESS)

parser.add_argument('--parallel', default=False, action="store_true",
help='''When running locally, execute independent steps in parallel.
Expand Down
13 changes: 9 additions & 4 deletions src/sophios/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,10 +880,15 @@ def compile_workflow_once(yaml_tree_ast: YamlTree,
newval['format'] = in_format
new_keyval = {key: newval}
elif 'Directory' == in_dict['type']:
dir = Path(in_dict['value'])
if not dir.is_absolute():
dir = Path('autogenerated') / dir
dir.mkdir(parents=True, exist_ok=True)
if not args.ignore_dir_path:
if in_dict['value'].startswith('/'):
print("Warning! directory can not start with '/'")
print("It is most likely an incorrect path! Can't create directories!")
sys.exit(1)
ldir = Path(in_dict['value'])
if not ldir.is_absolute():
ldir = Path('autogenerated') / ldir
ldir.mkdir(parents=True, exist_ok=True)
newval = {'class': 'Directory', 'location': in_dict['value']}
new_keyval = {key: newval}
# TODO: Check for all valid types?
Expand Down
Loading
Loading