Skip to content

Commit

Permalink
Refactor matchtasks() to reduce complexity (ansible#1825)
Browse files Browse the repository at this point in the history
Co-authored-by: Sorin Sbarnea <ssbarnea@redhat.com>
  • Loading branch information
cognifloyd and ssbarnea authored Jan 25, 2022
1 parent c64f99b commit cea0415
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 35 deletions.
26 changes: 10 additions & 16 deletions src/ansiblelint/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import ansiblelint.skip_utils
import ansiblelint.utils
import ansiblelint.yaml_utils
from ansiblelint._internal.rules import (
AnsibleParserErrorRule,
BaseRule,
Expand All @@ -21,7 +22,7 @@
)
from ansiblelint.config import get_rule_config, options
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.file_utils import Lintable, expand_paths_vars

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,24 +108,17 @@ def matchtasks(self, file: Lintable) -> List[MatchError]:
):
return matches

yaml = ansiblelint.utils.parse_yaml_linenumbers(file)
if not yaml:
return matches

yaml = ansiblelint.skip_utils.append_skipped_rules(yaml, file)

try:
tasks = ansiblelint.utils.get_normalized_tasks(yaml, file)
except MatchError as e:
return [e]
tasks_iterator = ansiblelint.yaml_utils.iter_tasks_in_file(file, self.id)
for raw_task, task, skipped, error in tasks_iterator:
if error is not None:
# normalize_task converts AnsibleParserError to MatchError
return [error]

for task in tasks:
if self.id in task.get('skipped_rules', ()):
if skipped or 'action' not in task:
continue

if 'action' not in task:
continue
result = self.matchtask(task, file=file)

if not result:
continue

Expand Down Expand Up @@ -208,7 +202,7 @@ def __init__(
self.options = options
if rulesdirs is None:
rulesdirs = []
self.rulesdirs = ansiblelint.file_utils.expand_paths_vars(rulesdirs)
self.rulesdirs = expand_paths_vars(rulesdirs)
self.rules: List[BaseRule] = []
# internal rules included in order to expose them for docs as they are
# not directly loaded by our rule loader.
Expand Down
18 changes: 0 additions & 18 deletions src/ansiblelint/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,24 +707,6 @@ def get_action_tasks(yaml: AnsibleBaseYAMLObject, file: Lintable) -> List[Any]:
]


def get_normalized_tasks(
yaml: "AnsibleBaseYAMLObject", file: Lintable
) -> List[Dict[str, Any]]:
"""Extract normalized tasks from a file."""
tasks = get_action_tasks(yaml, file)
res = []
for task in tasks:
# An empty `tags` block causes `None` to be returned if
# the `or []` is not present - `task.get('tags', [])`
# does not suffice.
if 'skip_ansible_lint' in (task.get('tags') or []):
# No need to normalize_task is we are skipping it.
continue
res.append(normalize_task(task, str(file.path)))

return res


@lru_cache(maxsize=128)
def parse_yaml_linenumbers(lintable: Lintable) -> AnsibleBaseYAMLObject:
"""Parse yaml as ansible.utils.parse_yaml but with linenumbers.
Expand Down
63 changes: 62 additions & 1 deletion src/ansiblelint/yaml_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,67 @@
"""Utility helpers to simplify working with yaml-based data."""
import functools
from typing import Any, Callable, Dict, Iterator, List, Tuple, Union, cast
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union, cast

import ansiblelint.skip_utils
from ansiblelint.errors import MatchError
from ansiblelint.file_utils import Lintable
from ansiblelint.utils import get_action_tasks, normalize_task, parse_yaml_linenumbers


def iter_tasks_in_file(
lintable: Lintable, rule_id: str
) -> Iterator[Tuple[Dict[str, Any], Dict[str, Any], bool, Optional[MatchError]]]:
"""Iterate over tasks in file.
This yields a 4-tuple of raw_task, normalized_task, skipped, and error.
raw_task:
When looping through the tasks in the file, each "raw_task" is minimally
processed to include these special keys: __line__, __file__, skipped_rules.
normalized_task:
When each raw_task is "normalized", action shorthand (strings) get parsed
by ansible into python objects and the action key gets normalized. If the task
should be skipped (skipped is True) or normalizing it fails (error is not None)
then this is just the raw_task instead of a normalized copy.
skipped:
Whether or not the task should be skipped according to tags or skipped_rules.
error:
This is normally None. It will be a MatchError when the raw_task cannot be
normalized due to an AnsibleParserError.
:param lintable: The playbook or tasks/handlers yaml file to get tasks from
:param rule_id: The current rule id to allow calculating skipped.
:return: raw_task, normalized_task, skipped, error
"""
data = parse_yaml_linenumbers(lintable)
if not data:
return
data = ansiblelint.skip_utils.append_skipped_rules(data, lintable)

raw_tasks = get_action_tasks(data, lintable)

for raw_task in raw_tasks:
err: Optional[MatchError] = None

# An empty `tags` block causes `None` to be returned if
# the `or []` is not present - `task.get('tags', [])`
# does not suffice.
skipped_in_task_tag = 'skip_ansible_lint' in (raw_task.get('tags') or [])
skipped_in_yaml_comment = rule_id in raw_task.get('skipped_rules', ())
skipped = skipped_in_task_tag or skipped_in_yaml_comment
if skipped:
yield raw_task, raw_task, skipped, err
continue

try:
normalized_task = normalize_task(raw_task, str(lintable.path))
except MatchError as err:
# normalize_task converts AnsibleParserError to MatchError
yield raw_task, raw_task, skipped, err
return

yield raw_task, normalized_task, skipped, err


def nested_items_path(
Expand Down
17 changes: 17 additions & 0 deletions test/test_yaml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@
import pytest

import ansiblelint.yaml_utils
from ansiblelint.file_utils import Lintable


@pytest.fixture
def empty_lintable() -> Lintable:
"""Return a Lintable with no contents."""
lintable = Lintable("__empty_file__")
lintable._content = ""
return lintable


def test_iter_tasks_in_file_with_empty_file(empty_lintable: Lintable) -> None:
"""Make sure that iter_tasks_in_file returns early when files are empty."""
res = list(
ansiblelint.yaml_utils.iter_tasks_in_file(empty_lintable, "some-rule-id")
)
assert res == []


def test_nested_items_path() -> None:
Expand Down

0 comments on commit cea0415

Please sign in to comment.