Skip to content

Commit

Permalink
Merge pull request #148 from dbt-labs/children-of-multiple-boundary-n…
Browse files Browse the repository at this point in the history
…odes

Children of multiple boundary nodes
dave-connors-3 authored Aug 25, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents edfc9b5 + dc85cbe commit 83bd564
Showing 8 changed files with 186 additions and 12 deletions.
1 change: 1 addition & 0 deletions dbt_meshify/main.py
Original file line number Diff line number Diff line change
@@ -179,6 +179,7 @@ def connect(
dependency,
project_map[dependency.upstream_project_name],
project_map[dependency.downstream_project_name],
change_set,
)
change_set.extend(changes)
except Exception as e:
5 changes: 4 additions & 1 deletion dbt_meshify/storage/dbt_project_editors.py
Original file line number Diff line number Diff line change
@@ -137,6 +137,7 @@ def initialize(self) -> ChangeSet:
resource = subproject.get_manifest_node(unique_id)
if not resource:
raise KeyError(f"Resource {unique_id} not found in manifest")

if resource.resource_type in ["model", "test", "snapshot", "seed"]:
# ignore generic tests, as moving the yml entry will move the test too
if resource.resource_type == "test" and len(resource.unique_id.split(".")) == 4:
@@ -156,7 +157,9 @@ def initialize(self) -> ChangeSet:
logger.debug(
f"Updating ref functions for children of {resource.unique_id}..."
)
change_set.extend(reference_updater.update_child_refs(resource))
change_set.extend(
reference_updater.update_child_refs(resource, change_set)
)

logger.debug(
f"Moving {resource.unique_id} and associated YML to subproject {subproject.name}..."
28 changes: 24 additions & 4 deletions dbt_meshify/utilities/linker.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
from dataclasses import dataclass
from enum import Enum
from typing import Set, Union
from typing import List, Optional, Set, Union

from dbt.contracts.graph.nodes import CompiledNode, ModelNode, SourceDefinition
from dbt.node_types import AccessType

from dbt_meshify.change import ChangeSet, EntityType, Operation, ResourceChange
from dbt_meshify.change import (
ChangeSet,
EntityType,
FileChange,
Operation,
ResourceChange,
)
from dbt_meshify.dbt_projects import BaseDbtProject, DbtProject, PathedProject
from dbt_meshify.utilities.contractor import Contractor
from dbt_meshify.utilities.dependencies import DependenciesUpdater
from dbt_meshify.utilities.grouper import ResourceGrouper
from dbt_meshify.utilities.references import ReferenceUpdater
from dbt_meshify.utilities.references import ReferenceUpdater, get_latest_file_change


class ProjectDependencyType(str, Enum):
@@ -235,6 +241,7 @@ def resolve_dependency(
dependency: ProjectDependency,
upstream_project: DbtProject,
downstream_project: DbtProject,
current_change_set: Optional[ChangeSet] = None,
) -> ChangeSet:
upstream_manifest_entry = upstream_project.get_manifest_node(dependency.upstream_resource)
if not upstream_manifest_entry:
@@ -314,12 +321,25 @@ def resolve_dependency(
f"{upstream_manifest_entry.unique_id}"
)

if current_change_set:
previous_change = get_latest_file_change(
changeset=current_change_set,
identifier=downstream_manifest_entry.name,
path=downstream_project.resolve_file_path(downstream_manifest_entry),
)

code_to_update = (
previous_change.data
if (previous_change and previous_change.data)
else downstream_manifest_entry.raw_code
)

change_set.add(
reference_updater.generate_reference_update(
project_name=upstream_project.name,
downstream_node=downstream_manifest_entry,
upstream_node=upstream_manifest_entry,
code=downstream_manifest_entry.raw_code,
code=code_to_update,
downstream_project=downstream_project,
)
)
57 changes: 51 additions & 6 deletions dbt_meshify/utilities/references.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from typing import Union
from pathlib import Path
from typing import List, Optional, Union

from dbt.contracts.graph.nodes import CompiledNode
from loguru import logger
@@ -8,6 +9,27 @@
from dbt_meshify.dbt_projects import DbtProject, DbtSubProject, PathedProject


def get_latest_file_change(
changeset: ChangeSet,
path: Path,
identifier: str,
entity_type: EntityType = EntityType.Code,
operation: Operation = Operation.Update,
) -> Optional[FileChange]:
previous_changes: List[FileChange] = [
change
for change in changeset.changes
if (
isinstance(change, FileChange)
and change.identifier == identifier
and change.operation == operation
and change.entity_type == entity_type
and change.path == path
)
]
return previous_changes[-1] if previous_changes else None


class ReferenceUpdater:
def __init__(self, project: Union[DbtSubProject, DbtProject]):
self.project = project
@@ -123,12 +145,14 @@ def generate_reference_update(
data=updated_code,
)

def update_child_refs(self, resource: CompiledNode) -> ChangeSet:
def update_child_refs(
self, resource: CompiledNode, current_change_set: Optional[ChangeSet] = None
) -> ChangeSet:
"""Generate a set of FileChanges to update child references"""

if not isinstance(self.project, DbtSubProject):
raise Exception(
"The `update_parent_refs` method requires the calling project to have a parent project to update."
"The `update_child_refs` method requires the calling project to have a parent project to update."
)

change_set = ChangeSet()
@@ -144,11 +168,23 @@ def update_child_refs(self, resource: CompiledNode) -> ChangeSet:
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
continue

if current_change_set:
previous_change = get_latest_file_change(
changeset=current_change_set,
identifier=model_node.name,
path=self.project.parent_project.resolve_file_path(model_node),
)
code = (
previous_change.data
if (previous_change and previous_change.data)
else model_node.raw_code
)

change = self.generate_reference_update(
project_name=self.project.name,
upstream_node=resource,
downstream_node=model_node,
code=model_node.raw_code,
code=code,
downstream_project=self.project.parent_project,
)

@@ -172,8 +208,6 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:

change_set = ChangeSet()

code = resource.raw_code

for model in upstream_models:
logger.debug(f"Updating reference to {model} in {resource.name}.")
model_node = self.project.get_manifest_node(model)
@@ -183,6 +217,17 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:
# Don't process Resources missing a language attribute
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
continue
previous_change = change_set.changes[-1] if change_set.changes else None

code = (
previous_change.data
if (
previous_change
and isinstance(previous_change, FileChange)
and previous_change.data
)
else resource.raw_code
)

change = self.generate_reference_update(
project_name=self.project.parent_project.name,
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
with

upstream as (
select * from {{ ref('shared_model') }}
),

upstream1 as (
select * from {{ ref('new_model') }}
)

select 1 as id
7 changes: 7 additions & 0 deletions tests/integration/test_connect_command.py
Original file line number Diff line number Diff line change
@@ -86,11 +86,18 @@ def test_connect_package(self, producer_project):

# assert that the source is replaced with a ref
x_proj_ref = "{{ ref('src_proj_a', 'shared_model') }}"
x_proj_ref_2 = "{{ ref('src_proj_a', 'new_model') }}"
child_sql = (
Path(copy_package_consumer_project_path) / "models" / "downstream_model.sql"
).read_text()
assert x_proj_ref in child_sql

child_sql_2 = (
Path(copy_package_consumer_project_path) / "models" / "downstream_model_2.sql"
).read_text()
assert x_proj_ref in child_sql_2
assert x_proj_ref_2 in child_sql_2

# assert that the dependecies yml was created with a pointer to the upstream project
assert (
"src_proj_a"
16 changes: 15 additions & 1 deletion tests/integration/test_dependency_detection.py
Original file line number Diff line number Diff line change
@@ -91,7 +91,21 @@ def test_linker_detects_package_import_dependencies(self, src_proj_a, dest_proj_
downstream_resource="model.dest_proj_a.downstream_model",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
)
),
ProjectDependency(
upstream_resource="model.src_proj_a.shared_model",
upstream_project_name="src_proj_a",
downstream_resource="model.dest_proj_a.downstream_model_2",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
),
ProjectDependency(
upstream_resource="model.src_proj_a.new_model",
upstream_project_name="src_proj_a",
downstream_resource="model.dest_proj_a.downstream_model_2",
downstream_project_name="dest_proj_a",
type=ProjectDependencyType.Package,
),
}

# This doesn't exist yet as of 1.5.0. We'll test it out once it's a thing.
73 changes: 73 additions & 0 deletions tests/integration/test_split_command.py
Original file line number Diff line number Diff line change
@@ -244,3 +244,76 @@ def test_split_public_leaf_nodes(self, project):
Path(dest_project_path) / "my_new_project" / "models" / "marts" / "__models.yml"
).read_text()
)

def test_split_upstream_multiple_boundary_parents(self):
"""
Test that splitting out a project downstream of the base project splits as expected
"""
setup_test_project(src_project_path, dest_project_path)
runner = CliRunner()
result = runner.invoke(
cli,
[
"split",
"my_new_project",
"--project-path",
dest_project_path,
"--select",
"+stg_orders",
"+stg_order_items",
],
)

assert result.exit_code == 0
# selected model is moved to subdirectory
assert (
Path(dest_project_path) / "my_new_project" / "models" / "staging" / "stg_orders.sql"
).exists()
x_proj_ref_1 = "{{ ref('my_new_project', 'stg_orders') }}"
x_proj_ref_2 = "{{ ref('my_new_project', 'stg_order_items') }}"
child_sql = (Path(dest_project_path) / "models" / "marts" / "orders.sql").read_text()
# downstream model has all cross project refs updated
assert x_proj_ref_1 in child_sql
assert x_proj_ref_2 in child_sql

teardown_test_project(dest_project_path)

def test_split_downstream_multiple_boundary_parents(self):
"""
Test that splitting out a project downstream of the base project splits as expected
"""
setup_test_project(src_project_path, dest_project_path)
runner = CliRunner()
result = runner.invoke(
cli,
[
"split",
"my_new_project",
"--project-path",
dest_project_path,
"--select",
"orders+",
],
)

assert result.exit_code == 0
# selected model is moved to subdirectory
assert (
Path(dest_project_path) / "my_new_project" / "models" / "marts" / "orders.sql"
).exists()
x_proj_ref_1 = "{{ ref('split_proj', 'stg_orders') }}"
x_proj_ref_2 = "{{ ref('split_proj', 'stg_order_items') }}"
x_proj_ref_3 = "{{ ref('split_proj', 'stg_products') }}"
x_proj_ref_4 = "{{ ref('split_proj', 'stg_locations') }}"
x_proj_ref_5 = "{{ ref('split_proj', 'stg_supplies') }}"
child_sql = (
Path(dest_project_path) / "my_new_project" / "models" / "marts" / "orders.sql"
).read_text()
# downstream model has all cross project refs updated
assert x_proj_ref_1 in child_sql
assert x_proj_ref_2 in child_sql
assert x_proj_ref_3 in child_sql
assert x_proj_ref_4 in child_sql
assert x_proj_ref_5 in child_sql

teardown_test_project(dest_project_path)

0 comments on commit 83bd564

Please sign in to comment.