Skip to content

Commit

Permalink
ICT spec updates and WFB payload fix (#282)
Browse files Browse the repository at this point in the history
* Remove error when looking for output in inputs

* Fix bug when parsing outputs

* Remove incorrect keys from ui in ICT dict

* Add function to add missing inputs to nodes in wfb

* Make UI optional in ict spec and remove ui if found

* Add test for updating nodes in wfb payload

* Update ui validation

* Update wfb type to Json

* Update wfb nodes before raw to lean

* Remove update to ui before clt conversion

* Move is_inlet inside of wfb fixing function

* Check that plugins are in wfb data

* Move schema validation to payload update function

* Check if plugins are present in WFB data
  • Loading branch information
JesseMckinzie authored Oct 9, 2024
1 parent a908c88 commit 11705e1
Show file tree
Hide file tree
Showing 10 changed files with 1,757 additions and 30 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/run_workflows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ jobs:
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
run: cd workflow-inference-compiler/ && pytest tests/test_ict_to_clt_conversion.py -k test_ict_to_clt

- name: PyTest Run update wfb payload Tests
if: always()
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
run: cd workflow-inference-compiler/ && pytest tests/test_fix_payload.py -k test_fix

# NOTE: The steps below are for repository_dispatch only. For all other steps, please insert above
# this comment.

Expand Down
1 change: 1 addition & 0 deletions src/sophios/api/http/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ async def compile_wf(request: Request) -> Json:
req: Json = await request.json()
# clean up and convert the incoming object
# schema preserving
req = converter.update_payload_missing_inputs_outputs(req)
wfb_payload = converter.raw_wfb_to_lean_wfb(req)
# schema non-preserving
workflow_temp = converter.wfb_to_wic(wfb_payload)
Expand Down
74 changes: 64 additions & 10 deletions src/sophios/api/utils/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,7 @@ def extract_state(inp: Json) -> Json:
inp_inter['state']['links'] = step_edges
# massage the plugins
plugins = inp_inter['plugins']
# drop incorrect/superfluous UI fields from plugins
# 'required' and 'format'
for ict_plugin in plugins:
for ui_elem in ict_plugin['ui']:
_, _ = ui_elem.pop('required', None), ui_elem.pop('format', None)
for out in ict_plugin['outputs']:
if out['name'] == 'outDir':
ict_plugin['inputs'].append(out)

# Here goes the ICT to CLT extraction logic
for node in inp_inter['state']['nodes']:
node_pid = node["pluginId"]
Expand All @@ -78,8 +71,6 @@ def extract_state(inp: Json) -> Json:

def raw_wfb_to_lean_wfb(inp: Json) -> Json:
"""Drop all the unnecessary info from incoming wfb object"""
if validate_schema_and_object(SCHEMA, inp):
print('incoming object is valid against input object schema')
inp_restrict = extract_state(inp)
keys = list(inp_restrict.keys())
# To avoid deserialization
Expand Down Expand Up @@ -193,3 +184,66 @@ def ict_to_clt(ict: Union[ICT, Path, str, dict], network_access: bool = False) -
ict_local = ict if isinstance(ict, ICT) else cast_to_ict(ict)

return ict_local.to_clt(network_access=network_access)


def update_payload_missing_inputs_outputs(wfb_data: Json) -> Json:
    """Update payload with missing inputs and outputs using links.

    For every link in ``state.links`` the value stored under the source
    node's linked output is copied into the target node's linked input, so
    that connected nodes carry explicit input values.

    Args:
        wfb_data: The raw WFB payload. It is validated against ``SCHEMA``
            before being read and is never mutated.

    Returns:
        ``wfb_data`` itself when it carries no plugins (missing or empty
        ``plugins`` entry); otherwise an updated deep copy of the payload.

    Raises:
        KeyError: If a link references an unknown node id, a node references
            an unknown plugin id, or the source node has no value for the
            linked output.
        IndexError: If a link's ``inletIndex``/``outletIndex`` is outside the
            range of the plugin's inlet/outlet bindings.
    """

    def is_inlet(binding: Json) -> bool:
        """Check if a wfb binding is an inlet (directory-like value)."""
        if binding['type'] in ['directory', 'file', 'path', 'collection', 'csvCollection']:
            return True
        name = binding['name'].lower()
        # 'inpdir' is kept explicit for readability although it also ends in 'dir'
        return name == 'inpdir' or name.endswith(('path', 'dir'))

    # ensure the incoming wfb data is valid
    if validate_schema_and_object(SCHEMA, wfb_data):
        print('incoming object is valid against input object schema')

    # return if no plugins are found in data (key absent or list empty)
    if not wfb_data.get('plugins'):
        return wfb_data

    # work on a copy so the caller's payload is left untouched
    wfb_data_copy = copy.deepcopy(wfb_data)

    links = wfb_data_copy["state"]["links"]
    nodes = wfb_data_copy["state"]["nodes"]
    plugins = wfb_data_copy["plugins"]

    # hashmap of node id to node for fast node lookup
    nodes_dict = {node['id']: node for node in nodes}

    # hashmap of plugin id to plugin for fast plugin lookup
    plugins_dict = {plugin['pid']: plugin for plugin in plugins}

    for link in links:
        # nodes at both ends of the link
        target_node = nodes_dict[link["targetId"]]
        source_node = nodes_dict[link["sourceId"]]

        # plugins corresponding to the nodes
        target_plugin = plugins_dict[target_node["pluginId"]]
        source_plugin = plugins_dict[source_node["pluginId"]]

        # link indices count only inlet-like bindings, so filter before indexing
        input_directories = [binding for binding in target_plugin["inputs"] if is_inlet(binding)]
        output_directories = [binding for binding in source_plugin["outputs"] if is_inlet(binding)]

        missing_input_key = input_directories[link["inletIndex"]]["name"]
        missing_output_key = output_directories[link["outletIndex"]]["name"]

        # set the target input to the linked source output value
        # (any value already present under that key is overwritten)
        target_node["settings"]["inputs"][missing_input_key] = (
            source_node["settings"]["outputs"][missing_output_key]
        )

    if validate_schema_and_object(SCHEMA, wfb_data_copy):
        print('Updated object is valid against input object schema')

    return wfb_data_copy
5 changes: 5 additions & 0 deletions src/sophios/api/utils/ict/ict_spec/cast.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def cast_to_ict(ict: Union[Path, str, dict]) -> ICT:
ict = Path(ict)

if isinstance(ict, Path):

if str(ict).endswith(".yaml") or str(ict).endswith(".yml"):
with open(ict, "r", encoding="utf-8") as f_o:
data = safe_load(f_o)
Expand All @@ -22,6 +23,10 @@ def cast_to_ict(ict: Union[Path, str, dict]) -> ICT:
else:
raise ValueError(f"File extension not supported: {ict}")

data.pop("ui", None)

return ICT(**data)

ict.pop("ui", None)

return ICT(**ict)
1 change: 0 additions & 1 deletion src/sophios/api/utils/ict/ict_spec/io/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,4 @@ def _output_to_cwl(self, inputs: Any) -> dict:
cwl_dict_["format"] = self.convert_uri_format(self.io_format["uri"])
return cwl_dict_

raise ValueError(f"Output {self.name} not found in inputs")
raise NotImplementedError(f"Output not supported {self.name}")
38 changes: 20 additions & 18 deletions src/sophios/api/utils/ict/ict_spec/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,31 @@ class ICT(Metadata):

inputs: list[IO]
outputs: list[IO]
ui: list[UIItem]
ui: Optional[list[UIItem]] = None
hardware: Optional[HardwareRequirements] = None

@model_validator(mode="after")
def validate_ui(self) -> "ICT":
"""Validate that the ui matches the inputs and outputs."""
io_dict = {"inputs": [], "outputs": []} # type: ignore
ui_keys = [ui.key.root.split(".") for ui in self.ui]
for ui_ in ui_keys:
io_dict[ui_[0]].append(ui_[1])
input_names = [io.name for io in self.inputs]
output_names = [io.name for io in self.outputs]
inp_bool = [x in input_names for x in io_dict["inputs"]]
out_bool = [x in output_names for x in io_dict["outputs"]]

# if not all(inp_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
# )
# if not all(out_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
# )
if self.ui is not None:
io_dict = {"inputs": [], "outputs": []} # type: ignore
ui_keys = [ui.key.root.split(".") for ui in self.ui]
for ui_ in ui_keys:
io_dict[ui_[0]].append(ui_[1])
input_names = [io.name for io in self.inputs]
output_names = [io.name for io in self.outputs]
inp_bool = [x in input_names for x in io_dict["inputs"]]
out_bool = [x in output_names for x in io_dict["outputs"]]

# if not all(inp_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
# )
# if not all(out_bool):
# raise ValueError(
# f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
# )

return self

def to_clt(self, network_access: bool = False) -> dict:
Expand Down
2 changes: 1 addition & 1 deletion src/sophios/api/utils/ict/ict_spec/tools/cwl_ict.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def clt_dict(ict_: "ICT", network_access: bool) -> dict:
},
"outputs": {
io.name: io._output_to_cwl(
[io.name for io in ict_.inputs]
[io.name for io in ict_.outputs]
) # pylint: disable=W0212
for io in ict_.outputs
},
Expand Down
Loading

0 comments on commit 11705e1

Please sign in to comment.