Skip to content

Commit

Permalink
Update linking of the outputs and inputs (#301)
Browse files Browse the repository at this point in the history
  • Loading branch information
sameeul authored Dec 19, 2024
1 parent c2b3664 commit 0de7798
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 56 deletions.
11 changes: 9 additions & 2 deletions src/sophios/api/http/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ async def compile_wf(request: Request) -> Json:
req = converter.update_payload_missing_inputs_outputs(req)
wfb_payload = converter.raw_wfb_to_lean_wfb(req)
# schema non-preserving
workflow_temp = converter.wfb_to_wic(wfb_payload)
wkflw_name = "generic_workflow_" + str(uuid.uuid4())
workflow_temp = converter.wfb_to_wic(wfb_payload, req["plugins"])
wkflw_name = "workflow_"
args = get_args(wkflw_name, suppliedargs)

# Build canonical workflow object
Expand Down Expand Up @@ -161,6 +161,13 @@ async def compile_wf(request: Request) -> Json:
cwl_tree_run.pop('steps', None)
cwl_tree_run['steps'] = cwl_tree_run.pop('steps_dict', None)

# currently there is a compiler bug where the output variables are duplicated
# this is a workaround to remove the duplicates till the compiler is fixed
for step in cwl_tree_run['steps']:

out_vars = cwl_tree_run['steps'][step]['out']
out_vars_unique = list(set(out_vars))
cwl_tree_run['steps'][step]['out'] = out_vars_unique
compute_workflow: Json = {}
compute_workflow = {
"name": yaml_stem,
Expand Down
124 changes: 96 additions & 28 deletions src/sophios/api/utils/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from sophios.wic_types import Json, Cwl
from sophios.api.utils.ict.ict_spec.model import ICT
from sophios.api.utils.ict.ict_spec.cast import cast_to_ict
from sophios.api.utils.wfb_util import get_node_config

SCHEMA_FILE = Path(__file__).parent / "input_object_schema.json"
SCHEMA: Json = {}
Expand Down Expand Up @@ -147,10 +148,16 @@ def get_topological_order(links: list[dict[str, str]]) -> list[str]:
return result


def wfb_to_wic(inp: Json) -> Cwl:
def wfb_to_wic(inp: Json, plugins: List[dict[str, Any]]) -> Cwl:
"""Convert lean wfb json to compliant wic"""
# non-schema preserving changes
inp_restrict = copy.deepcopy(inp)
plugin_config_map: dict[str, dict] = {}
for plugin in plugins:
pid: str = plugin.get("pid", "")
if pid == "":
continue
plugin_config_map[pid] = get_node_config(plugin)

for node in inp_restrict['nodes']:
if node.get('settings'):
Expand All @@ -159,7 +166,6 @@ def wfb_to_wic(inp: Json) -> Cwl:
node['out'] = list({k: yaml.load('!& ' + v, Loader=wic_loader())} for k, v in node['settings']
['outputs'].items()) # outputs always have to be list
# remove these (now) superfluous keys
node.pop('settings', None)
node.pop('name', None)
node.pop('internal', None)

Expand All @@ -168,39 +174,101 @@ def wfb_to_wic(inp: Json) -> Cwl:
target_node_ids = []
for edg in inp_restrict['links']:
target_node_ids.append(edg['targetId'])
# keep track of all the args that have already been processed
node_arg_map: dict[int, set] = {}
# now set inputs on non-sink nodes as inline input '!ii '
# if inputs exist
non_sink_nodes = [node for node in inp_restrict['nodes'] if node['id'] not in target_node_ids]
for node in non_sink_nodes:
if node["id"] not in node_arg_map:
node_arg_map[node['id']] = set()
if node.get('in'):
for nkey in node['in']:
node['in'][nkey] = yaml.load('!ii ' + str(node['in'][nkey]), Loader=wic_loader())
if str(node['in'][nkey]) != "":
node['in'][nkey] = yaml.load('!ii ' + str(node['in'][nkey]), Loader=wic_loader())
node_arg_map[node['id']].add(nkey)

if plugins != []: # use the look up logic similar to WFB
for edg in inp_restrict['links']:
# links = edge. nodes and edges is the correct terminology!
src_id = edg['sourceId']
tgt_id = edg['targetId']
src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None)
tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None)
assert src_node, f'output(s) of source node of edge{edg} must exist!'
assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
if src_id not in node_arg_map:
node_arg_map[src_id] = set()

if tgt_id not in node_arg_map:
node_arg_map[tgt_id] = set()

src_node_ui_config = plugin_config_map.get(src_node['pluginId'], None)
tgt_node_ui_config = plugin_config_map.get(tgt_node['pluginId'], None)
if src_node_ui_config and tgt_node_ui_config:
inlet_index = edg['inletIndex']
outlet_index = edg['outletIndex']

src_node_out_arg = src_node_ui_config['outputs'][outlet_index]["name"]
tgt_node_in_arg = tgt_node_ui_config['inputs'][inlet_index]["name"]

if tgt_node.get('in'):
source_output = src_node['out'][0][src_node_out_arg]
if isinstance(source_output, dict) and 'wic_anchor' in source_output:
source_output = source_output["wic_anchor"]
tgt_node['in'][tgt_node_in_arg] = yaml.load('!* ' + str(source_output), Loader=wic_loader())
node_arg_map[tgt_id].add(tgt_node_in_arg)

for node in inp_restrict['nodes']:
output_dict = node['settings'].get('outputs', {})
for key in output_dict:
if str(output_dict[key]) != "":
node['in'][key] = yaml.load('!ii ' + str(output_dict[key]), Loader=wic_loader())
node_arg_map[node['id']].add(key)
node.pop('settings', None)

# After outs are set
for edg in inp_restrict['links']:
# links = edge. nodes and edges is the correct terminology!
src_id = edg['sourceId']
tgt_id = edg['targetId']
src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None)
tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None)
assert src_node, f'output(s) of source node of edge{edg} must exist!'
assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
# flattened list of keys
if src_node.get('out') and tgt_node.get('in'):
src_out_keys = [sk for sout in src_node['out'] for sk in sout.keys()]
tgt_in_keys = tgt_node['in'].keys()
# we match the source output tag type to target input tag type
# and connect them through '!* ' for input, all outputs are '!& ' before this
for sk in src_out_keys:
# It maybe possible that (explicit) outputs of src nodes might not have corresponding
# (explicit) inputs in target node
if tgt_node['in'].get(sk):
tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader())
# the inputs which aren't dependent on previous/other steps
# they are by default inline input
diff_keys = set(tgt_in_keys) - set(src_out_keys)
for dfk in diff_keys:
tgt_node['in'][dfk] = yaml.load('!ii ' + str(tgt_node['in'][dfk]), Loader=wic_loader())
if "in" in node:
unprocessed_args = set(node['in'].keys())
if node['id'] in node_arg_map:
unprocessed_args = unprocessed_args.difference(node_arg_map[node['id']])
for arg in unprocessed_args:
node['in'][arg] = yaml.load('!ii ' + str(node['in'][arg]), Loader=wic_loader())
else: # No plugins, use the old logic
# this logic is most likely incorrect and needs to be scrubbed,
# along with updating the non_wfb dummy tests
for node in inp_restrict['nodes']:
node.pop('settings', None)

for edg in inp_restrict['links']:
# links = edge. nodes and edges is the correct terminology!
src_id = edg['sourceId']
tgt_id = edg['targetId']
src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None)
tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None)
assert src_node, f'output(s) of source node of edge{edg} must exist!'
assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
# flattened list of keys
if src_node.get('out') and tgt_node.get('in'):
src_out_keys = [sk for sout in src_node['out'] for sk in sout.keys()]
tgt_in_keys = tgt_node['in'].keys()
# we match the source output tag type to target input tag type
# and connect them through '!* ' for input, all outputs are '!& ' before this
for sk in src_out_keys:
# It may be possible that (explicit) outputs of src nodes do not have corresponding
# (explicit) inputs in the target node
if tgt_node['in'].get(sk):
# if the output is a dict, it is a wic_anchor, so we need to get the anchor
# and use that as the input
src_node_out_arg = src_node['out'][0][sk]
source_output = src_node['out'][0][sk]
if isinstance(source_output, dict) and 'wic_anchor' in source_output:
source_output = source_output["wic_anchor"]
tgt_node['in'][sk] = yaml.load('!* ' + str(source_output), Loader=wic_loader())
# the inputs which aren't dependent on previous/other steps
# they are by default inline input
diff_keys = set(tgt_in_keys) - set(src_out_keys)
for dfk in diff_keys:
tgt_node['in'][dfk] = yaml.load('!ii ' + str(tgt_node['in'][dfk]), Loader=wic_loader())

workflow_temp: Cwl = {}
if inp_restrict["links"] != []:
Expand Down
43 changes: 26 additions & 17 deletions src/sophios/api/utils/ict/ict_spec/io/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Optional, Union, Any

from pydantic import BaseModel, Field
from sophios.api.utils.wfb_util import is_directory


CWL_IO_DICT: dict[str, str] = {
Expand Down Expand Up @@ -116,27 +117,35 @@ def _output_to_cwl(self, inputs: Any) -> dict:
"""Convert outputs to CWL."""
if self.io_type == "path":
if self.name in inputs:
if (
not isinstance(self.io_format, list)
and self.io_format["term"].lower()
== "directory" # pylint: disable=unsubscriptable-object
):
cwl_type = "Directory"
elif (
not isinstance(self.io_format, list)
and self.io_format["term"].lower()
== "file" # pylint: disable=unsubscriptable-object
):
cwl_type = "File"
elif (
isinstance(self.io_format, list)
and len(self.io_format) == 1
and self.io_format[0].lower() == 'directory'
):
if is_directory(dict(self)):
cwl_type = "Directory"
else:
cwl_type = "File"

# the logic here is probably wrong
# let's not go here until we have a better idea of io_format in ICT Spec

# if (
# not isinstance(self.io_format, list)
# and self.io_format["term"].lower()
# == "directory" # pylint: disable=unsubscriptable-object
# ):
# cwl_type = "Directory"
# elif (
# not isinstance(self.io_format, list)
# and self.io_format["term"].lower()
# == "file" # pylint: disable=unsubscriptable-object
# ):
# cwl_type = "File"
# elif (
# isinstance(self.io_format, list)
# and len(self.io_format) == 1
# and self.io_format[0].lower() == 'directory'
# ):
# cwl_type = "Directory"
# else:
# cwl_type = "File"

cwl_dict_ = {
"outputBinding": {"glob": f"$(inputs.{self.name}.basename)"},
"type": cwl_type,
Expand Down
2 changes: 1 addition & 1 deletion src/sophios/api/utils/ict/ict_spec/tools/cwl_ict.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def clt_dict(ict_: "ICT", network_access: bool) -> dict:
for io in ict_.outputs
},
"requirements": requirements(ict_, network_access),
"baseCommand": ict_.entrypoint,
"baseCommand": [],
"label": ict_.title,
"doc": str(ict_.documentation),
}
Expand Down
4 changes: 3 additions & 1 deletion src/sophios/api/utils/input_object_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
"required": [
"id",
"sourceId",
"targetId"
"targetId",
"inletIndex",
"outletIndex"
],
"type": "object"
},
Expand Down
84 changes: 84 additions & 0 deletions src/sophios/api/utils/wfb_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
def is_directory(input_dict: dict) -> bool:
    """Tell whether an input description should be treated as a directory.

    An entry qualifies when its ``type`` is one of ``directory``, ``file``,
    ``path``, ``collection`` or ``csvCollection`` (case-sensitive), or when
    its ``name``, compared case-insensitively, equals ``file`` or ends with
    ``path`` or ``dir``.

    Args:
        input_dict (dict): The input dictionary containing type and name.

    Returns:
        bool: True if the input represents a directory, False otherwise.
    """
    path_like_types = ("directory", "file", "path", "collection", "csvCollection")

    # The type is tested first, so the name is only inspected when the type
    # alone does not settle the question.
    if input_dict.get("type", "") in path_like_types:
        return True

    lowered_name = input_dict.get("name", "").lower()
    return lowered_name == "file" or lowered_name.endswith(("path", "dir"))


def get_node_config(plugin: dict) -> dict:
    """Get the UI configuration for a specific plugin.

    The plugin's inputs are visited from last to first and split into two
    groups: directory-like inputs (as decided by ``is_directory``) and all
    remaining inputs, which are represented by their UI element. When a
    non-directory input has no UI element, one is synthesized from the input
    itself. UI elements that belong to outputs are appended to the UI group
    after the inputs.

    The ``plugin`` dictionary is not modified: a UI element that has to be
    annotated with ``required``/``format`` is copied first.

    Args:
        plugin (dict): The plugin dictionary containing UI and inputs.

    Returns:
        dict: A dictionary with the keys ``"ui"`` (UI inputs), ``"inputs"``
        (non-UI inputs) and ``"outputs"`` (the plugin's outputs list).

    Raises:
        KeyError: If a non-directory input lacks ``name``, ``required`` or
            ``format`` (or ``type`` when its UI element must be synthesized).
    """
    uis = plugin.get("ui", [])

    # split inputs into UI (form) and non-UI (circle inlets)
    non_ui_inputs = []  # circle inlets on the left side of the node
    ui_inputs = []  # UI inputs such as text fields, checkboxes, etc.

    # Inputs are walked in reverse, so both lists end up in the reverse of the
    # plugin's declared order; index-based lookups on the result rely on that.
    for plugin_input in reversed(plugin.get("inputs", [])):
        # if input is a directory - move it to the non-UI section
        if is_directory(plugin_input):
            non_ui_inputs.append(plugin_input)
            continue

        # find the UI element that corresponds to this input
        ui_key = "inputs." + plugin_input["name"]
        ui_input = next(
            (ui for ui in uis if "key" in ui and ui["key"] == ui_key),
            None,
        )

        if ui_input:
            # Copy before annotating so the caller's plugin["ui"] entries
            # are left untouched.
            ui_inputs.append(
                {
                    **ui_input,
                    "required": plugin_input["required"],
                    "format": plugin_input["format"],
                }
            )
        else:
            # in some cases UI is missing for the input, so we need to create it
            ui_inputs.append(
                {
                    "key": ui_key,
                    "type": plugin_input["type"],
                    "title": plugin_input["name"],
                    "required": plugin_input["required"],
                    "format": plugin_input["format"],
                }
            )

    outputs = plugin.get("outputs", [])

    # if output has UI - move it to the UI section
    # this is mostly for internal nodes such as Input Data Directory
    for output in outputs:
        ui_output = next(
            (ui for ui in uis if "key" in ui and ui["key"] == "outputs." + output["name"]),
            None,
        )
        if ui_output:
            ui_inputs.append(ui_output)

    return {"ui": ui_inputs, "inputs": non_ui_inputs, "outputs": outputs}
5 changes: 4 additions & 1 deletion src/sophios/utils_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ def inlineinput_constructor(loader: yaml.SafeLoader, node: yaml.nodes.Node) -> D
if isinstance(node, yaml.nodes.ScalarNode):
try:
# loader.construct_scalar always returns a string, whereas
val = yaml.safe_load(node.value)
if node.value == "":
val = ""
else:
val = yaml.safe_load(node.value)
# yaml.safe_load returns the correct primitive types
except Exception:
# but fallback to a string if it is not actually a primitive type.
Expand Down
4 changes: 2 additions & 2 deletions tests/data/ict_data/czi_extract/czi_extract_clt.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"baseCommand": "[python3, main.py]",
"baseCommand": [],
"class": "CommandLineTool",
"cwlVersion": "v1.2",
"doc": "None",
Expand All @@ -23,7 +23,7 @@
"outputBinding": {
"glob": "$(inputs.outDir.basename)"
},
"type": "File"
"type": "Directory"
}
},
"requirements": {
Expand Down
4 changes: 2 additions & 2 deletions tests/data/ict_data/label_to_vector/label_to_vector_clt.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"baseCommand": "[python3, main.py]",
"baseCommand": [],
"class": "CommandLineTool",
"cwlVersion": "v1.2",
"doc": "None",
Expand Down Expand Up @@ -29,7 +29,7 @@
"outputBinding": {
"glob": "$(inputs.outDir.basename)"
},
"type": "File"
"type": "Directory"
}
},
"requirements": {
Expand Down
Loading

0 comments on commit 0de7798

Please sign in to comment.