Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update linking of the outputs and inputs #301

Merged
merged 12 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/sophios/api/http/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ async def compile_wf(request: Request) -> Json:
req = converter.update_payload_missing_inputs_outputs(req)
wfb_payload = converter.raw_wfb_to_lean_wfb(req)
# schema non-preserving
workflow_temp = converter.wfb_to_wic(wfb_payload)
wkflw_name = "generic_workflow_" + str(uuid.uuid4())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was removing uuid from workflow name necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. There is a limitation on the WFB side for the length of the name. They are also adding a UUID, so we are dropping it. See the Slack discussion.

workflow_temp = converter.wfb_to_wic(wfb_payload, req["plugins"])
wkflw_name = "workflow_"
args = get_args(wkflw_name, suppliedargs)

# Build canonical workflow object
Expand Down Expand Up @@ -161,6 +161,13 @@ async def compile_wf(request: Request) -> Json:
cwl_tree_run.pop('steps', None)
cwl_tree_run['steps'] = cwl_tree_run.pop('steps_dict', None)

# currently there is a compiler bug where the output variables are duplicated
# this is a workaround to remove the duplicates till the compiler is fixed
for step in cwl_tree_run['steps']:

out_vars = cwl_tree_run['steps'][step]['out']
out_vars_unique = list(set(out_vars))
cwl_tree_run['steps'][step]['out'] = out_vars_unique
compute_workflow: Json = {}
compute_workflow = {
"name": yaml_stem,
Expand Down
124 changes: 96 additions & 28 deletions src/sophios/api/utils/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from sophios.wic_types import Json, Cwl
from sophios.api.utils.ict.ict_spec.model import ICT
from sophios.api.utils.ict.ict_spec.cast import cast_to_ict
from sophios.api.utils.wfb_util import get_node_config

SCHEMA_FILE = Path(__file__).parent / "input_object_schema.json"
SCHEMA: Json = {}
Expand Down Expand Up @@ -147,10 +148,16 @@ def get_topological_order(links: list[dict[str, str]]) -> list[str]:
return result


def wfb_to_wic(inp: Json) -> Cwl:
def wfb_to_wic(inp: Json, plugins: List[dict[str, Any]]) -> Cwl:
"""Convert lean wfb json to compliant wic"""
# non-schema preserving changes
inp_restrict = copy.deepcopy(inp)
plugin_config_map: dict[str, dict] = {}
for plugin in plugins:
pid: str = plugin.get("pid", "")
if pid == "":
continue
plugin_config_map[pid] = get_node_config(plugin)

for node in inp_restrict['nodes']:
if node.get('settings'):
Expand All @@ -159,7 +166,6 @@ def wfb_to_wic(inp: Json) -> Cwl:
node['out'] = list({k: yaml.load('!& ' + v, Loader=wic_loader())} for k, v in node['settings']
['outputs'].items()) # outputs always have to be list
# remove these (now) superfluous keys
node.pop('settings', None)
node.pop('name', None)
node.pop('internal', None)

Expand All @@ -168,39 +174,101 @@ def wfb_to_wic(inp: Json) -> Cwl:
target_node_ids = []
for edg in inp_restrict['links']:
target_node_ids.append(edg['targetId'])
# keep track of all the args that processed
node_arg_map: dict[int, set] = {}
# now set inputs on non-sink nodes as inline input '!ii '
# if inputs exist
non_sink_nodes = [node for node in inp_restrict['nodes'] if node['id'] not in target_node_ids]
for node in non_sink_nodes:
if node["id"] not in node_arg_map:
node_arg_map[node['id']] = set()
if node.get('in'):
for nkey in node['in']:
node['in'][nkey] = yaml.load('!ii ' + str(node['in'][nkey]), Loader=wic_loader())
if str(node['in'][nkey]) != "":
node['in'][nkey] = yaml.load('!ii ' + str(node['in'][nkey]), Loader=wic_loader())
node_arg_map[node['id']].add(nkey)

if plugins != []: # use the look up logic similar to WFB
for edg in inp_restrict['links']:
# links = edge. nodes and edges is the correct terminology!
src_id = edg['sourceId']
tgt_id = edg['targetId']
src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None)
tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None)
assert src_node, f'output(s) of source node of edge{edg} must exist!'
assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
if src_id not in node_arg_map:
node_arg_map[src_id] = set()

if tgt_id not in node_arg_map:
node_arg_map[tgt_id] = set()

src_node_ui_config = plugin_config_map.get(src_node['pluginId'], None)
tgt_node_ui_config = plugin_config_map.get(tgt_node['pluginId'], None)
if src_node_ui_config and tgt_node_ui_config:
inlet_index = edg['inletIndex']
outlet_index = edg['outletIndex']

src_node_out_arg = src_node_ui_config['outputs'][outlet_index]["name"]
tgt_node_in_arg = tgt_node_ui_config['inputs'][inlet_index]["name"]

if tgt_node.get('in'):
source_output = src_node['out'][0][src_node_out_arg]
if isinstance(source_output, dict) and 'wic_anchor' in source_output:
source_output = source_output["wic_anchor"]
tgt_node['in'][tgt_node_in_arg] = yaml.load('!* ' + str(source_output), Loader=wic_loader())
node_arg_map[tgt_id].add(tgt_node_in_arg)

for node in inp_restrict['nodes']:
output_dict = node['settings'].get('outputs', {})
for key in output_dict:
if str(output_dict[key]) != "":
node['in'][key] = yaml.load('!ii ' + str(output_dict[key]), Loader=wic_loader())
node_arg_map[node['id']].add(key)
node.pop('settings', None)

# After outs are set
for edg in inp_restrict['links']:
# links = edge. nodes and edges is the correct terminology!
src_id = edg['sourceId']
tgt_id = edg['targetId']
src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None)
tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None)
assert src_node, f'output(s) of source node of edge{edg} must exist!'
assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
# flattened list of keys
if src_node.get('out') and tgt_node.get('in'):
src_out_keys = [sk for sout in src_node['out'] for sk in sout.keys()]
tgt_in_keys = tgt_node['in'].keys()
# we match the source output tag type to target input tag type
# and connect them through '!* ' for input, all outputs are '!& ' before this
for sk in src_out_keys:
# It maybe possible that (explicit) outputs of src nodes might not have corresponding
# (explicit) inputs in target node
if tgt_node['in'].get(sk):
tgt_node['in'][sk] = yaml.load('!* ' + tgt_node['in'][sk], Loader=wic_loader())
# the inputs which aren't dependent on previous/other steps
# they are by default inline input
diff_keys = set(tgt_in_keys) - set(src_out_keys)
for dfk in diff_keys:
tgt_node['in'][dfk] = yaml.load('!ii ' + str(tgt_node['in'][dfk]), Loader=wic_loader())
if "in" in node:
unprocessed_args = set(node['in'].keys())
if node['id'] in node_arg_map:
unprocessed_args = unprocessed_args.difference(node_arg_map[node['id']])
for arg in unprocessed_args:
node['in'][arg] = yaml.load('!ii ' + str(node['in'][arg]), Loader=wic_loader())
else: # No plugins, use the old logic
# this logic is most likely not correct and need to be scrubbed
# along with updating the non_wfb dummy tests
for node in inp_restrict['nodes']:
node.pop('settings', None)

for edg in inp_restrict['links']:
# links = edge. nodes and edges is the correct terminology!
src_id = edg['sourceId']
tgt_id = edg['targetId']
src_node = next((node for node in inp_restrict['nodes'] if node['id'] == src_id), None)
tgt_node = next((node for node in inp_restrict['nodes'] if node['id'] == tgt_id), None)
assert src_node, f'output(s) of source node of edge{edg} must exist!'
assert tgt_node, f'input(s) of target node of edge{edg} must exist!'
# flattened list of keys
if src_node.get('out') and tgt_node.get('in'):
src_out_keys = [sk for sout in src_node['out'] for sk in sout.keys()]
tgt_in_keys = tgt_node['in'].keys()
# we match the source output tag type to target input tag type
# and connect them through '!* ' for input, all outputs are '!& ' before this
for sk in src_out_keys:
# It maybe possible that (explicit) outputs of src nodes might not have corresponding
# (explicit) inputs in target node
if tgt_node['in'].get(sk):
# if the output is a dict, it is a wic_anchor, so we need to get the anchor
# and use that as the input
src_node_out_arg = src_node['out'][0][sk]
source_output = src_node['out'][0][sk]
if isinstance(source_output, dict) and 'wic_anchor' in source_output:
source_output = source_output["wic_anchor"]
tgt_node['in'][sk] = yaml.load('!* ' + str(source_output), Loader=wic_loader())
# the inputs which aren't dependent on previous/other steps
# they are by default inline input
diff_keys = set(tgt_in_keys) - set(src_out_keys)
for dfk in diff_keys:
tgt_node['in'][dfk] = yaml.load('!ii ' + str(tgt_node['in'][dfk]), Loader=wic_loader())

workflow_temp: Cwl = {}
if inp_restrict["links"] != []:
Expand Down
43 changes: 26 additions & 17 deletions src/sophios/api/utils/ict/ict_spec/io/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Optional, Union, Any

from pydantic import BaseModel, Field
from sophios.api.utils.wfb_util import is_directory


CWL_IO_DICT: dict[str, str] = {
Expand Down Expand Up @@ -116,27 +117,35 @@ def _output_to_cwl(self, inputs: Any) -> dict:
"""Convert outputs to CWL."""
if self.io_type == "path":
if self.name in inputs:
if (
not isinstance(self.io_format, list)
and self.io_format["term"].lower()
== "directory" # pylint: disable=unsubscriptable-object
):
cwl_type = "Directory"
elif (
not isinstance(self.io_format, list)
and self.io_format["term"].lower()
== "file" # pylint: disable=unsubscriptable-object
):
cwl_type = "File"
elif (
isinstance(self.io_format, list)
and len(self.io_format) == 1
and self.io_format[0].lower() == 'directory'
):
if is_directory(dict(self)):
cwl_type = "Directory"
else:
cwl_type = "File"

# the logic here is probably wrong
# let's not go here until we have a better idea of io_format in ICT Spec

# if (
# not isinstance(self.io_format, list)
# and self.io_format["term"].lower()
# == "directory" # pylint: disable=unsubscriptable-object
# ):
# cwl_type = "Directory"
# elif (
# not isinstance(self.io_format, list)
# and self.io_format["term"].lower()
# == "file" # pylint: disable=unsubscriptable-object
# ):
# cwl_type = "File"
# elif (
# isinstance(self.io_format, list)
# and len(self.io_format) == 1
# and self.io_format[0].lower() == 'directory'
# ):
# cwl_type = "Directory"
# else:
# cwl_type = "File"

cwl_dict_ = {
"outputBinding": {"glob": f"$(inputs.{self.name}.basename)"},
"type": cwl_type,
Expand Down
2 changes: 1 addition & 1 deletion src/sophios/api/utils/ict/ict_spec/tools/cwl_ict.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def clt_dict(ict_: "ICT", network_access: bool) -> dict:
for io in ict_.outputs
},
"requirements": requirements(ict_, network_access),
"baseCommand": ict_.entrypoint,
"baseCommand": [],
"label": ict_.title,
"doc": str(ict_.documentation),
}
Expand Down
4 changes: 3 additions & 1 deletion src/sophios/api/utils/input_object_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
"required": [
"id",
"sourceId",
"targetId"
"targetId",
"inletIndex",
"outletIndex"
],
"type": "object"
},
Expand Down
84 changes: 84 additions & 0 deletions src/sophios/api/utils/wfb_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
def is_directory(input_dict: dict) -> bool:
    """Check if the given input dictionary represents a directory-like inlet.

    An input is treated as directory-like when its declared ``type`` is one
    of the path/collection types, or when its ``name`` follows common
    file/path/dir naming conventions.

    Args:
        input_dict (dict): The input dictionary containing ``type`` and ``name``.

    Returns:
        bool: True if the input represents a directory, False otherwise.
    """
    # NOTE(review): "file" is intentionally in this set — all of these types
    # are rendered as node inlets rather than UI form fields; confirm against
    # the WFB node-config logic.
    directory_like_types = {"directory", "file", "path", "collection", "csvCollection"}
    if input_dict.get("type", "") in directory_like_types:
        return True
    # Fall back to naming conventions when the type is not explicit.
    name = input_dict.get("name", "").lower()
    return name == "file" or name.endswith(("path", "dir"))


def get_node_config(plugin: dict) -> dict:
    """Get the UI configuration for a specific plugin.

    Splits the plugin's declared inputs into UI (form) inputs and non-UI
    (node inlet) inputs, synthesizes a UI entry when one is missing for a
    non-directory input, and promotes any outputs that have a UI element
    into the UI section.

    Args:
        plugin (dict): The plugin dictionary containing ``ui``, ``inputs``
            and ``outputs`` lists.

    Returns:
        dict: ``{"ui": ..., "inputs": ..., "outputs": ...}`` where ``ui``
        holds the form inputs, ``inputs`` the inlet (directory-like)
        inputs, and ``outputs`` the plugin outputs unchanged.
    """
    uis = plugin.get("ui", [])
    plugin_inputs = plugin.get("inputs", [])

    # split inputs into UI (form) and non-UI (circle inlets)
    non_ui_inputs = []  # circle inlets on the left side of the node
    ui_inputs = []  # UI inputs such as text fields, checkboxes, etc.

    # NOTE(review): inputs are deliberately walked in reverse to preserve the
    # ordering the previous index-based loop produced — the inlet indices used
    # by the edge-linking code depend on this order. Confirm before changing.
    for inp in reversed(plugin_inputs):
        # find the UI element that corresponds to this input
        ui_input = next(
            (x for x in uis if "key" in x and x["key"] == "inputs." + inp["name"]),
            None,
        )
        is_dir = is_directory(inp)

        if is_dir:
            # directories become inlets, never form fields
            non_ui_inputs.append(inp)
        elif ui_input is None:
            # in some cases UI is missing for the input, so we need to
            # create it — but only if it's not a directory
            ui_inputs.append(
                {
                    "key": "inputs." + inp["name"],
                    "type": inp["type"],
                    "title": inp["name"],
                    "required": inp["required"],
                    "format": inp["format"],
                }
            )
        else:
            # keep the declared UI element, refreshed with the input's
            # required/format flags (this mutates the plugin's ui entry in
            # place, matching the previous behavior)
            ui_input["required"] = inp["required"]
            ui_input["format"] = inp["format"]
            ui_inputs.append(ui_input)

    outputs = plugin.get("outputs", [])

    # if output has UI - move it to the UI section
    # this is mostly for internal nodes such as Input Data Directory
    for output in outputs:
        ui_output = next(
            (x for x in uis if "key" in x and x["key"] == "outputs." + output["name"]),
            None,
        )
        if ui_output:
            ui_inputs.append(ui_output)

    return {"ui": ui_inputs, "inputs": non_ui_inputs, "outputs": outputs}
5 changes: 4 additions & 1 deletion src/sophios/utils_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ def inlineinput_constructor(loader: yaml.SafeLoader, node: yaml.nodes.Node) -> D
if isinstance(node, yaml.nodes.ScalarNode):
try:
# loader.construct_scalar always returns a string, whereas
val = yaml.safe_load(node.value)
if node.value == "":
val = ""
else:
val = yaml.safe_load(node.value)
# yaml.safe_load returns the correct primitive types
except Exception:
# but fallback to a string if it is not actually a primitive type.
Expand Down
4 changes: 2 additions & 2 deletions tests/data/ict_data/czi_extract/czi_extract_clt.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"baseCommand": "[python3, main.py]",
"baseCommand": [],
"class": "CommandLineTool",
"cwlVersion": "v1.2",
"doc": "None",
Expand All @@ -23,7 +23,7 @@
"outputBinding": {
"glob": "$(inputs.outDir.basename)"
},
"type": "File"
"type": "Directory"
}
},
"requirements": {
Expand Down
4 changes: 2 additions & 2 deletions tests/data/ict_data/label_to_vector/label_to_vector_clt.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"baseCommand": "[python3, main.py]",
"baseCommand": [],
"class": "CommandLineTool",
"cwlVersion": "v1.2",
"doc": "None",
Expand Down Expand Up @@ -29,7 +29,7 @@
"outputBinding": {
"glob": "$(inputs.outDir.basename)"
},
"type": "File"
"type": "Directory"
}
},
"requirements": {
Expand Down
Loading
Loading