From d0ddb96aea48af342faacb8c468214679dcc9efa Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Mon, 5 Jun 2023 10:53:55 -0700 Subject: [PATCH 1/4] wip --- core/dbt/contracts/graph/manifest.py | 25 +++++++++++++++++++++++++ core/dbt/contracts/graph/nodes.py | 9 +++++++++ 2 files changed, 34 insertions(+) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 3f4f3adec2c..4c9f9d4f95f 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -39,6 +39,7 @@ BaseNode, ManifestOrPublicNode, ModelNode, + RelationalNode, ) from dbt.contracts.graph.unparsed import SourcePatch, NodeVersion, UnparsedVersion from dbt.contracts.graph.manifest_upgrade import upgrade_manifest_json @@ -1133,6 +1134,30 @@ def merge_from_artifact( sample = list(islice(merged, 5)) fire_event(MergedFromState(num_merged=len(merged), sample=sample)) + # Called by CloneTask.defer_to_manifest + def add_from_artifact( + self, + other: "WritableManifest", + ) -> None: + """Update this manifest by *adding* information about each node's location + in the other manifest. + + Only non-ephemeral refable nodes are examined. + """ + refables = set(NodeType.refable()) + for unique_id, node in other.nodes.items(): + current = self.nodes.get(unique_id) + if current and (node.resource_type in refables and not node.is_ephemeral): + other_node = other.nodes[unique_id] + state_relation = RelationalNode( + other_node.database, other_node.schema, other_node.alias + ) + self.nodes[unique_id] = current.replace(state_relation=state_relation) + + # Rebuild the flat_graph, which powers the 'graph' context variable, + # now that we've deferred some nodes + self.build_flat_graph() + # Methods that were formerly in ParseResult def add_macro(self, source_file: SourceFile, macro: Macro): diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 24cd8f7ddcb..a3b988f35ff 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -259,6 +259,15 @@ def add_macro(self, value: str): self.macros.append(value) +@dataclass +class RelationalNode(HasRelationMetadata): + alias: str + + @property + def identifier(self): + return self.alias + + @dataclass class DependsOn(MacroDependsOn): nodes: List[str] = field(default_factory=list) From f1f3f45285e14e34877977fef090b1b20e8faa79 Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Mon, 5 Jun 2023 23:46:22 -0700 Subject: [PATCH 2/4] Fixes #7551: Create `add_from_artifact` to populate `state_relation` field of nodes --- test/unit/test_manifest.py | 68 +++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/test/unit/test_manifest.py b/test/unit/test_manifest.py index 3138dba2e85..24ff18aff9a 100644 --- a/test/unit/test_manifest.py +++ b/test/unit/test_manifest.py @@ -1,13 +1,13 @@ import os import unittest -from unittest import mock - from argparse import Namespace -import copy from collections import namedtuple -from itertools import product from datetime import datetime +from itertools import product +from unittest import mock +from copy import deepcopy +import freezegun import pytest import dbt.flags @@ -27,7 +27,6 @@ Group, RefArgs, ) - from dbt.contracts.graph.unparsed import ( ExposureType, Owner, @@ -35,14 +34,10 @@ MetricFilter, MetricTime, ) - from dbt.events.functions import reset_metadata_vars from dbt.exceptions import AmbiguousResourceNameRefError from dbt.flags import set_from_args - from dbt.node_types import NodeType -import freezegun - from .utils import ( MockMacro, MockDocumentation, @@ -53,7 +48,6 @@ inject_plugin, ) - REQUIRED_PARSED_NODE_KEYS = frozenset( { "alias", @@ -103,7 +97,6 @@ | {"compiled", "extra_ctes_injected", "extra_ctes", "compiled_code", "relation_name"} ) - ENV_KEY_NAME = "KEY" if os.name == "nt" else "key" @@ -408,7 +401,7 @@ def test__no_nodes(self): @freezegun.freeze_time("2018-02-14T09:15:13Z") def test__nested_nodes(self): - nodes = copy.copy(self.nested_nodes) + nodes = deepcopy(self.nested_nodes) manifest = Manifest( nodes=nodes, sources={}, @@ -463,11 +456,11 @@ def test__nested_nodes(self): self.assertEqual(child_map["model.snowplow.events"], []) def test__build_flat_graph(self): - exposures = copy.copy(self.exposures) - metrics = copy.copy(self.metrics) - groups = copy.copy(self.groups) - nodes = copy.copy(self.nested_nodes) - sources = copy.copy(self.sources) + exposures = deepcopy(self.exposures) + metrics = deepcopy(self.metrics) + groups = deepcopy(self.groups) + nodes = deepcopy(self.nested_nodes) + sources = deepcopy(self.sources) manifest = Manifest( nodes=nodes, sources=sources, @@ -588,7 +581,7 @@ def test_get_resource_fqns_empty(self): self.assertEqual(manifest.get_resource_fqns(), {}) def test_get_resource_fqns(self): - nodes = copy.copy(self.nested_nodes) + nodes = deepcopy(self.nested_nodes) nodes["seed.root.seed"] = SeedNode( name="seed", database="dbt", @@ -663,6 +656,41 @@ def test__deepcopy_copies_flat_graph(self): copy = original.deepcopy() self.assertEqual(original.flat_graph, copy.flat_graph) + def test__add_from_artifact(self): + original_nodes = deepcopy(self.nested_nodes) + other_nodes = deepcopy(self.nested_nodes) + + nested2 = other_nodes.pop("model.root.nested") + nested2.name = "nested2" + nested2.alias = "nested2" + nested2.fqn = ["root", "nested2"] + + other_nodes["model.root.nested2"] = nested2 + + for k, v in other_nodes.items(): + v.database = "other_" + v.database + v.schema = "other_" + v.schema + v.alias = "other_" + v.alias + + other_nodes[k] = v + + original_manifest = Manifest(nodes=original_nodes) + other_manifest = Manifest(nodes=other_nodes) + original_manifest.add_from_artifact(other_manifest.writable_manifest()) + + # new node added should not be in original manifest + assert "model.root.nested2" not in original_manifest.nodes + + # old node removed should not have state relation in original manifest + assert original_manifest.nodes["model.root.nested"].state_relation is None + + # for all other nodes, check that state relation is updated + for k, v in original_manifest.nodes.items(): + if k not in ["model.root.nested", "model.root.nested2"]: + self.assertEqual("other_" + v.database, v.state_relation.database) + self.assertEqual("other_" + v.schema, v.state_relation.schema) + self.assertEqual("other_" + v.alias, v.state_relation.alias) + class MixedManifestTest(unittest.TestCase): def setUp(self): @@ -912,7 +940,7 @@ def test__no_nodes(self): @freezegun.freeze_time("2018-02-14T09:15:13Z") def test__nested_nodes(self): - nodes = copy.copy(self.nested_nodes) + nodes = deepcopy(self.nested_nodes) manifest = Manifest( nodes=nodes, sources={}, @@ -964,7 +992,7 @@ def test__nested_nodes(self): self.assertEqual(child_map["model.snowplow.events"], []) def test__build_flat_graph(self): - nodes = copy.copy(self.nested_nodes) + nodes = deepcopy(self.nested_nodes) manifest = Manifest( nodes=nodes, sources={}, From 113097174bb00d61d185be874ac19731181c3b1a Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Mon, 5 Jun 2023 23:47:16 -0700 Subject: [PATCH 3/4] Add changie --- .changes/unreleased/Under the Hood-20230605-234706.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Under the Hood-20230605-234706.yaml diff --git a/.changes/unreleased/Under the Hood-20230605-234706.yaml b/.changes/unreleased/Under the Hood-20230605-234706.yaml new file mode 100644 index 00000000000..f61d849ec1b --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230605-234706.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Create `add_from_artifact` to populate `state_relation` field of nodes +time: 2023-06-05T23:47:06.581326-07:00 +custom: + Author: stu-k aranke + Issue: "7551" From 234264c3d607c688d50ccb75cd534dbfe5d00cba Mon Sep 17 00:00:00 2001 From: Kshitij Aranke Date: Tue, 6 Jun 2023 12:24:44 -0700 Subject: [PATCH 4/4] Address PR comments --- core/dbt/contracts/graph/manifest.py | 9 +-------- test/unit/test_manifest.py | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index e3eeb052ea3..e7251c793a1 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -1148,16 +1148,9 @@ def add_from_artifact( for unique_id, node in other.nodes.items(): current = self.nodes.get(unique_id) if current and (node.resource_type in refables and not node.is_ephemeral): - other_node = other.nodes[unique_id] - state_relation = RelationalNode( - other_node.database, other_node.schema, other_node.alias - ) + state_relation = RelationalNode(node.database, node.schema, node.alias) self.nodes[unique_id] = current.replace(state_relation=state_relation) - # Rebuild the flat_graph, which powers the 'graph' context variable, - # now that we've deferred some nodes - self.build_flat_graph() - # Methods that were formerly in ParseResult def add_macro(self, source_file: SourceFile, macro: Macro): diff --git a/test/unit/test_manifest.py b/test/unit/test_manifest.py index 24ff18aff9a..5a3a23169f5 100644 --- a/test/unit/test_manifest.py +++ b/test/unit/test_manifest.py @@ -2,10 +2,10 @@ import unittest from argparse import Namespace from collections import namedtuple +from copy import deepcopy from datetime import datetime from itertools import product from unittest import mock -from copy import deepcopy import freezegun import pytest @@ -358,7 +358,7 @@ def tearDown(self): reset_metadata_vars() @freezegun.freeze_time("2018-02-14T09:15:13Z") - def test__no_nodes(self): + def test_no_nodes(self): manifest = Manifest( nodes={}, sources={}, @@ -400,7 +400,7 @@ def test__no_nodes(self): ) @freezegun.freeze_time("2018-02-14T09:15:13Z") - def test__nested_nodes(self): + def test_nested_nodes(self): nodes = deepcopy(self.nested_nodes) manifest = Manifest( nodes=nodes, @@ -455,7 +455,7 @@ def test__nested_nodes(self): ) self.assertEqual(child_map["model.snowplow.events"], []) - def test__build_flat_graph(self): + def test_build_flat_graph(self): exposures = deepcopy(self.exposures) metrics = deepcopy(self.metrics) groups = deepcopy(self.groups) @@ -627,7 +627,7 @@ def test_get_resource_fqns(self): resource_fqns = manifest.get_resource_fqns() self.assertEqual(resource_fqns, expect) - def test__deepcopy_copies_flat_graph(self): + def test_deepcopy_copies_flat_graph(self): test_node = ModelNode( name="events", database="dbt", @@ -656,7 +656,7 @@ def test__deepcopy_copies_flat_graph(self): copy = original.deepcopy() self.assertEqual(original.flat_graph, copy.flat_graph) - def test__add_from_artifact(self): + def test_add_from_artifact(self): original_nodes = deepcopy(self.nested_nodes) other_nodes = deepcopy(self.nested_nodes) @@ -686,7 +686,7 @@ def test__add_from_artifact(self): # for all other nodes, check that state relation is updated for k, v in original_manifest.nodes.items(): - if k not in ["model.root.nested", "model.root.nested2"]: + if k != "model.root.nested": self.assertEqual("other_" + v.database, v.state_relation.database) self.assertEqual("other_" + v.schema, v.state_relation.schema) self.assertEqual("other_" + v.alias, v.state_relation.alias) @@ -897,7 +897,7 @@ def tearDown(self): del os.environ["DBT_ENV_CUSTOM_ENV_key"] @freezegun.freeze_time("2018-02-14T09:15:13Z") - def test__no_nodes(self): + def test_no_nodes(self): metadata = ManifestMetadata( generated_at=datetime.utcnow(), invocation_id="01234567-0123-0123-0123-0123456789ab" ) @@ -939,7 +939,7 @@ def test__no_nodes(self): ) @freezegun.freeze_time("2018-02-14T09:15:13Z") - def test__nested_nodes(self): + def test_nested_nodes(self): nodes = deepcopy(self.nested_nodes) manifest = Manifest( nodes=nodes, @@ -991,7 +991,7 @@ def test__nested_nodes(self): ) self.assertEqual(child_map["model.snowplow.events"], []) - def test__build_flat_graph(self): + def test_build_flat_graph(self): nodes = deepcopy(self.nested_nodes) manifest = Manifest( nodes=nodes,