Skip to content

Commit

Permalink
Fix custom "+tag:some_tag" selector issue related to tests tag inheri…
Browse files Browse the repository at this point in the history
…tance (#1466)

The selector method `_should_include_node` changes test tasks to inherit
tags from their parent nodes. While this behaviour is acceptable and
desirable in some cases, it can cause problems using graph selectors
with tags.

This PR improves the test coverage, narrows down the problem and fixes
the problem reported by Astronomer customers. More details below.

A user reported that they see the correct `DbtDag` when using Cosmos
1.8.1 with:

* `LoadMode.DBT_LS`
* `RenderConfig(selector="accounts_marts")`

Where the selector `accounts_marts` is defined as:

```
    - name: accounts_marts
      description: Run Accounts models
      definition:
        intersection:
          - '+tag:accounts'
          - '+tag:datamart'
          - '+tag:stratus'
```

The expected behaviour includes:
- 164 Airflow tasks
- 152 Local run tasks
- 12 Snapshot tasks

However, when they attempt to run the same `DbtDag` using:

* `LoadMode.DBT_MANIFEST`
* `RenderConfig(select=["+tag:accounts,+tag:datamart,+tag:stratus"])`

Their `DbtDag` seems to have the wrong subset of nodes.

They reported:
- 197 Airflow tasks
- 183 Local run tasks
- 14 Snapshot tasks

This pull request aims to reproduce and fix this issue.
  • Loading branch information
tatiana authored Jan 21, 2025
1 parent de84174 commit 74c7c7d
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 10 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

1.9.0a1 (2025-01-20)
--------------------

Bug Fixes

* Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466


1.8.2 (2025-01-15)
--------------------

Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
Contains dags, task groups, and operators.
"""

__version__ = "1.8.2"
__version__ = "1.9.0a1"


from cosmos.airflow.dag import DbtDag
Expand Down
27 changes: 18 additions & 9 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
"""
selected_nodes: set[str] = set()
root_nodes: set[str] = set()

# Index nodes by name, we can improve performance by doing this once
# for multiple GraphSelectors
if PATH_SELECTOR in self.node_name:
Expand Down Expand Up @@ -367,25 +366,36 @@ def select_nodes_ids_by_intersection(self) -> set[str]:
selected_nodes: set[str] = set()
self.visited_nodes: set[str] = set()

for node_id, node in self.nodes.items():
if self._should_include_node(node_id, node):
selected_nodes.add(node_id)

if self.config.graph_selectors:
nodes_by_graph_selector = self.select_by_graph_operator()
selected_nodes = selected_nodes.intersection(nodes_by_graph_selector)
graph_selected_nodes = self.select_by_graph_operator()
for node_id in graph_selected_nodes:
node = self.nodes[node_id]
# Since the method below changes the tags of test nodes, it can lead to incorrect
# results during the application of graph selectors. Therefore, it is being run within
# nodes previously selected
# This solves https://github.com/astronomer/astronomer-cosmos/pull/1466
if self._should_include_node(node_id, node):
selected_nodes.add(node_id)
else:
for node_id, node in self.nodes.items():
if self._should_include_node(node_id, node):
selected_nodes.add(node_id)

self.selected_nodes = selected_nodes
return selected_nodes

def _should_include_node(self, node_id: str, node: DbtNode) -> bool:
"""Checks if a single node should be included. Only runs once per node with caching."""
"""
Checks if a single node should be included. Only runs once per node with caching."""
logger.debug("Inspecting if the node <%s> should be included.", node_id)
if node_id in self.visited_nodes:
return node_id in self.selected_nodes

self.visited_nodes.add(node_id)

# Disclaimer: this method currently copies the tags from parent nodes to children nodes
# that are tests. This can lead to incorrect results in graph node selectors such as reported in
# https://github.com/astronomer/astronomer-cosmos/pull/1466
if node.resource_type == DbtResourceType.TEST and node.depends_on and len(node.depends_on) > 0:
node.tags = getattr(self.nodes.get(node.depends_on[0]), "tags", [])
logger.debug(
Expand Down Expand Up @@ -498,7 +508,6 @@ def select_nodes(
exclude = exclude or []
if not select and not exclude:
return nodes

validate_filters(exclude, select)
subset_ids = apply_select_filter(nodes, project_dir, select)
if select:
Expand Down
89 changes: 89 additions & 0 deletions tests/dbt/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected):
config={"materialized": "view", "tags": ["has_child", "is_child"]},
)


child_node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child",
resource_type=DbtResourceType.MODEL,
Expand Down Expand Up @@ -183,6 +184,94 @@ def test_select_nodes_by_select_intersection_config_tag():
assert selected == expected


def test_select_nodes_by_select_intersection_config_graph_selector_includes_ancestors():
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child,+sibling1"])
expected = {
grandparent_node.unique_id: grandparent_node,
another_grandparent_node.unique_id: another_grandparent_node,
parent_node.unique_id: parent_node,
}
assert selected == expected


def test_select_nodes_by_select_intersection_config_graph_selector_none():
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child,+orphaned"])
expected = {}
assert selected == expected


def test_select_nodes_by_intersection_and_tag_ancestry():
parent_sibling_node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling",
resource_type=DbtResourceType.MODEL,
depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id],
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent_sibling.sql",
tags=["is_adopted"],
config={"materialized": "view", "tags": ["is_adopted"]},
)
sample_nodes_with_parent_sibling = dict(sample_nodes)
sample_nodes_with_parent_sibling[parent_sibling_node.unique_id] = parent_sibling_node
selected = select_nodes(
project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes_with_parent_sibling, select=["+tag:is_child,+tag:is_adopted"]
)
expected = {
grandparent_node.unique_id: grandparent_node,
another_grandparent_node.unique_id: another_grandparent_node,
}
assert selected == expected


def test_select_nodes_by_tag_ancestry():
parent_sibling_node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling",
resource_type=DbtResourceType.MODEL,
depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id],
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent_sibling.sql",
tags=["is_adopted"],
config={"materialized": "view", "tags": ["is_adopted"]},
)
sample_nodes_with_parent_sibling = dict(sample_nodes)
sample_nodes_with_parent_sibling[parent_sibling_node.unique_id] = parent_sibling_node
selected = select_nodes(
project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes_with_parent_sibling, select=["+tag:is_adopted"]
)
expected = {
grandparent_node.unique_id: grandparent_node,
another_grandparent_node.unique_id: another_grandparent_node,
parent_sibling_node.unique_id: parent_sibling_node,
}
assert selected == expected


def test_select_nodes_with_test_by_intersection_and_tag_ancestry():
parent_sibling_node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent_sibling",
resource_type=DbtResourceType.MODEL,
depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id],
file_path="",
tags=["is_adopted"],
config={"materialized": "view", "tags": ["is_adopted"]},
)
test_node = DbtNode(
unique_id=f"{DbtResourceType.TEST.value}.{SAMPLE_PROJ_PATH.stem}.test",
resource_type=DbtResourceType.TEST,
depends_on=[parent_node.unique_id, parent_sibling_node.unique_id],
file_path="",
config={},
)
new_sample_nodes = dict(sample_nodes)
new_sample_nodes[parent_sibling_node.unique_id] = parent_sibling_node
new_sample_nodes[test_node.unique_id] = test_node
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=new_sample_nodes, select=["+tag:has_child"])
# Expected must not include `parent_sibling_node` nor `test_node`
expected = {
parent_node.unique_id: parent_node,
grandparent_node.unique_id: grandparent_node,
another_grandparent_node.unique_id: another_grandparent_node,
}
assert selected == expected


def test_select_nodes_by_select_path():
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models"])
expected = {
Expand Down

0 comments on commit 74c7c7d

Please sign in to comment.