Skip to content

Commit

Permalink
Merge pull request #122 from jfennick/inline_wic_tag
Browse files Browse the repository at this point in the history
Inline wic tag
  • Loading branch information
jfennick authored Dec 1, 2023
2 parents deb7779 + 01337ed commit ba539d1
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 17 deletions.
98 changes: 89 additions & 9 deletions src/wic/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import re
import sys
import traceback
from typing import Dict, List
from typing import Dict, List, Tuple

from mergedeep import merge, Strategy
from jsonschema import Draft202012Validator
Expand Down Expand Up @@ -136,7 +136,7 @@ def merge_yml_trees(yaml_tree_tuple: YamlTree,

# Check for top-level yml dsl args
wic_self = {'wic': yaml_tree.get('wic', {})}
wic = merge(wic_self, wic_parent, strategy=Strategy.TYPESAFE_REPLACE) # TYPESAFE_ADDITIVE ?
wic = merge(wic_self, wic_parent, strategy=Strategy.TYPESAFE_REPLACE)
# Here we want to ADD wic: as a top-level yaml tag.
# In the compilation phase, we want to remove it.
yaml_tree['wic'] = wic['wic']
Expand Down Expand Up @@ -287,7 +287,7 @@ def get_inlineable_subworkflows(yaml_tree_tuple: YamlTree,
return namespaces


def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> YamlTree:
def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> Tuple[YamlTree, int]:
"""Inlines the given subworkflow into its immediate parent workflow.
Args:
Expand All @@ -298,7 +298,7 @@ def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> Yam
YamlTree: The updated root workflow with the given subworkflow inlined into its immediate parent workflow.
"""
if namespaces == []:
return yaml_tree_tuple
return yaml_tree_tuple, 0

(step_id, yaml_tree) = copy.deepcopy(yaml_tree_tuple)
yaml_name = step_id.stem
Expand All @@ -308,15 +308,16 @@ def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> Yam
if len(namespaces) == 1: # and namespaces[0] == yaml_name ?
(back_name_, yaml_tree) = utils.extract_backend(yaml_tree, wic['wic'], Path(''))
yaml_tree = {'steps': yaml_tree['steps']} # Remove wic tag
return YamlTree(step_id, yaml_tree) # TODO: check step_id
len_substeps = len(yaml_tree['steps'])
return YamlTree(StepId(back_name_, step_id.plugin_ns), yaml_tree), 0 # len_substeps # TODO: check step_id

# Pass namespaces through unmodified
backends_trees = []
for stepid, back in wic['wic']['backends'].items():
backend_tree = inline_subworkflow(YamlTree(stepid, back), namespaces)
backend_tree, len_substeps = inline_subworkflow(YamlTree(stepid, back), namespaces)
backends_trees.append(backend_tree)
yaml_tree['wic']['backends'] = dict(backends_trees)
return YamlTree(step_id, yaml_tree)
return YamlTree(step_id, yaml_tree), 0 # choose len_substeps from which backend?

steps: List[Yaml] = yaml_tree['steps']
steps_keys = utils.get_steps_keys(steps)
Expand All @@ -335,6 +336,7 @@ def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> Yam
sub_yml_tree = steps[i][step_key]['subtree']
sub_parentargs = steps[i][step_key]['parentargs']

len_substeps = 0
if len(namespaces) == 1:
steps_inits = steps[:i] # Exclude step i
steps_tails = steps[i+1:] # Exclude step i
Expand All @@ -343,10 +345,28 @@ def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> Yam
yaml_tree['steps'] = steps_inits + sub_steps + steps_tails
# Need to re-index both the sub-step numbers as well as the
# subsequent steps in this workflow? No, except for wic: steps:
len_substeps = len(sub_steps)

parent_wic_tag = wic.get('wic', {}).get("steps", {}).get(
f'({i + 1}, {step_key})', {}).get('wic', {})
sub_wic_tag = sub_yml_tree.get('wic', {})

# TODO: need cleaner code to make arbitrary-depth dictionary.
if 'wic' not in wic:
wic['wic'] = {}
if 'steps' not in wic['wic']:
wic['wic']['steps'] = {}
if f'({i + 1}, {step_key})' not in wic['wic']['steps']:
wic['wic']['steps'][f'({i + 1}, {step_key})'] = {}

# Merge parent into child to support overloading.
# TODO: Need to sort the steps by index
wic['wic']['steps'][f'({i + 1}, {step_key})']['wic'] = \
merge(sub_wic_tag, parent_wic_tag, strategy=Strategy.TYPESAFE_REPLACE)
else:
# Strip off one initial namespace
y_t = YamlTree(StepId(step_key, step_id.plugin_ns), sub_yml_tree)
(step_key_, sub_yml_tree) = inline_subworkflow(y_t, namespaces[1:])
(step_key_, sub_yml_tree), len_substeps = inline_subworkflow(y_t, namespaces[1:])
# TODO: re-index wic: steps: ? We probably should, although
# inlining after merging should not affect CWL args.
# Re-indexing could be tricky w.r.t. overloading.
Expand All @@ -356,7 +376,67 @@ def inline_subworkflow(yaml_tree_tuple: YamlTree, namespaces: Namespaces) -> Yam
# appear to be inlining-invariant! However, using ~ syntax helps.
steps[i][step_key] = {'subtree': sub_yml_tree, 'parentargs': sub_parentargs}

return YamlTree(step_id, yaml_tree)
yaml_tree['wic'] = inline_subworkflow_wic_tag(wic, namespaces, len_substeps)

return YamlTree(step_id, yaml_tree), len_substeps


def inline_subworkflow_wic_tag(wic_tag: Yaml, namespaces: Namespaces, len_substeps: int) -> Yaml:
    """Inlines the wic metadata tag of the given subworkflow into its immediate parent's wic tag.

    The entry of the subworkflow in the parent's wic: steps: tag is removed and replaced
    by the (re-indexed) entries of the subworkflow's own wic: steps: tag; the parent's
    entries located after the subworkflow are shifted accordingly. The nested dicts
    reachable from wic_tag are updated in place.

    Args:
        wic_tag (Yaml): A dict whose 'wic' key holds the wic metadata tag of the current workflow.
        namespaces (Namespaces): Specifies the path in the yml AST to the subworkflow to be inlined.
        len_substeps (int): The number of steps in the subworkflow to be inlined.

    Returns:
        Yaml: The contents of wic_tag['wic'], with the wic metadata tag associated with
        the given subworkflow inlined (or unchanged if the path is absent from the tag).
    """
    tag_wic: Yaml = wic_tag['wic']

    # Without a path there is no subworkflow to inline.
    if not namespaces:
        return tag_wic

    # Note: the index after parsing is 0-based.
    step_ints_names = [utils.parse_step_name_str(ns)[1:] for ns in namespaces]

    sub_wic_parent = wic_tag  # initialize to the 'root' wic tag
    # Traverse down to the parent node of the subworkflow to be inlined.
    # A missing intermediate entry degrades to {} and is caught by the check below.
    for index, step_name in step_ints_names[:-1]:
        sub_wic_parent = sub_wic_parent.get('wic', {}).get('steps', {}).get(f'({index + 1}, {step_name})', {})

    # Note: if any of the intermediate workflows in the path in the AST tree
    # from the current workflow to the subworkflow being inlined is absent in the current
    # wic metadata tag, the inlining won't have any effect on the wic tag of this workflow.
    # Note: When there are other options like 'graphviz' but not 'steps', we can also short
    # circuit and return.
    # This check is done after the traversal so that it also covers a path of length one,
    # where the parent is the root tag itself; otherwise the lookups below raise KeyError.
    if 'steps' not in sub_wic_parent.get('wic', {}):
        return tag_wic  # If path does not exist, do nothing and short circuit

    parent_steps = sub_wic_parent['wic']['steps']

    # Then get the wic tag of the subworkflow
    # Note: sub_index is 0-based, whereas the keys in wic: steps: are 1-based.
    sub_index, sub_step_name = step_ints_names[-1]
    sub_key = f'({sub_index + 1}, {sub_step_name})'
    sub_wic = parent_steps.get(sub_key, {})

    # Note: we should not short circuit when the subworkflow being inlined is not used in the
    # current wic tag, since inlining it will affect the indices of sibling steps following it.
    # Shift the subworkflow's own steps so that its first step lands on the subworkflow's position.
    sub_wic_steps_reindexed = utils.reindex_wic_steps(sub_wic.get('wic', {}).get('steps', {}), 1, sub_index)

    # Delete the subworkflow from the parent workflow since it is replaced by its internal steps.
    # This needs to be explicitly done since the key of this subworkflow in the dict is not
    # the same as any of its inlined steps and therefore won't be overwritten by the deep merge.
    parent_steps.pop(sub_key, None)

    # The inlining is actually a replacement of the target subworkflow by its steps.
    # Therefore, the incrementing count should be len_substeps - 1.
    sub_wic_parent_steps_reindexed = utils.reindex_wic_steps(parent_steps, sub_index + 1, len_substeps - 1)

    # Merge the wic: steps: dicts and mutably update the parent
    # Merge parent into child to support overloading.
    # TODO: The 'ranksame' in the wic tag of the inlined subworkflow is ignored
    # and not merged for now.
    sub_wic_parent['wic']['steps'] = merge(sub_wic_steps_reindexed, sub_wic_parent_steps_reindexed,
                                           strategy=Strategy.TYPESAFE_REPLACE)

    return tag_wic


def move_slash_last(source_new: str) -> str:
Expand Down
2 changes: 1 addition & 1 deletion src/wic/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ def insert_step_into_workflow(yaml_tree_orig: Yaml, stepid: StepId, tools: Tools

if 'wic' in yaml_tree_mod:
if 'steps' in yaml_tree_mod['wic']:
yaml_tree_mod['wic']['steps'] = utils.reindex_wic_steps(yaml_tree_mod['wic']['steps'], i)
yaml_tree_mod['wic']['steps'] = utils.reindex_wic_steps(yaml_tree_mod['wic']['steps'], i+1)
yaml_tree_mod['wic']['steps'][keystr] = inf_dict
else:
yaml_tree_mod['wic'].update({'steps': {keystr: inf_dict}})
Expand Down
2 changes: 1 addition & 1 deletion src/wic/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def main() -> None:
break

# print('inlineing', namespaces_list[0])
yaml_tree = ast.inline_subworkflow(yaml_tree, namespaces_list[0])
yaml_tree, _len_substeps = ast.inline_subworkflow(yaml_tree, namespaces_list[0])

with open(f'autogenerated/{Path(yaml_path).stem}_tree_merged_inlined.yml', mode='w', encoding='utf-8') as f:
f.write(yaml.dump(yaml_tree.yml))
Expand Down
13 changes: 8 additions & 5 deletions src/wic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,21 +450,24 @@ def parse_int_string_tuple(string: str) -> Tuple[int, str]:
return (int(str1.strip()), str2.strip())


def reindex_wic_steps(wic_steps: Yaml, index: int, num_steps: int = 1) -> Yaml:
    """Shifts the 1-based step indices in a wic: steps: tag by num_steps.

    Every key of the form '(i, name)' whose index i is >= the given index is rewritten
    as '(i + num_steps, name)'; all other keys are kept unchanged. This can be used to
    re-index the wic: metadata annotations tag after inserting num_steps steps at the
    given index, where the indices refer to the numbering before the insertion.
    A new dict is returned; wic_steps itself is not modified (the values are shared).

    Args:
        wic_steps (Yaml): The steps: subtag of the wic: metadata annotations tag.
        index (int): The (one-based) start index that needs to be reindexed.
        num_steps (int): The number of steps inserted, i.e. the amount added to each
            affected index. Defaults to 1.

    Returns:
        Yaml: The updated wic: steps: tag, with the appropriate indices incremented.
    """
    wic_steps_reindexed = {}
    for keystr, val in wic_steps.items():
        # Keys are strings such as '(3, step_name)'; split them into index and name.
        (i, s) = parse_int_string_tuple(keystr)
        newstr = f'({i + num_steps}, {s})' if i >= index else keystr
        wic_steps_reindexed[newstr] = val
    return wic_steps_reindexed

Expand Down
2 changes: 1 addition & 1 deletion tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def test_inline_subworkflows(yml_path_str: str, yml_path: Path) -> None:

# Inline each subworkflow individually and check that the graphs are isomorphic.
for namespaces in namespaces_list:
inline_yaml_tree = wic.ast.inline_subworkflow(yaml_tree, namespaces)
inline_yaml_tree, _len_substeps = wic.ast.inline_subworkflow(yaml_tree, namespaces)

inline_graph = get_graph_reps(str(yml_path))
inline_compiler_info = wic.compiler.compile_workflow(inline_yaml_tree, args,
Expand Down

0 comments on commit ba539d1

Please sign in to comment.