From c1d9b306ee776a03c6544f2d5316df8672b18801 Mon Sep 17 00:00:00 2001 From: Dave Connors Date: Tue, 22 Aug 2023 10:57:59 -0500 Subject: [PATCH 1/5] pass current change set to each dependency operation in connect --- dbt_meshify/main.py | 1 + dbt_meshify/utilities/linker.py | 34 ++++++++++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/dbt_meshify/main.py b/dbt_meshify/main.py index 8293b35..4456b07 100644 --- a/dbt_meshify/main.py +++ b/dbt_meshify/main.py @@ -179,6 +179,7 @@ def connect( dependency, project_map[dependency.upstream_project_name], project_map[dependency.downstream_project_name], + change_set, ) change_set.extend(changes) except Exception as e: diff --git a/dbt_meshify/utilities/linker.py b/dbt_meshify/utilities/linker.py index 775aea9..96bb5c4 100644 --- a/dbt_meshify/utilities/linker.py +++ b/dbt_meshify/utilities/linker.py @@ -1,11 +1,17 @@ from dataclasses import dataclass from enum import Enum -from typing import Set, Union +from typing import List, Optional, Set, Union from dbt.contracts.graph.nodes import CompiledNode, ModelNode, SourceDefinition from dbt.node_types import AccessType -from dbt_meshify.change import ChangeSet, EntityType, Operation, ResourceChange +from dbt_meshify.change import ( + ChangeSet, + EntityType, + FileChange, + Operation, + ResourceChange, +) from dbt_meshify.dbt_projects import BaseDbtProject, DbtProject, PathedProject from dbt_meshify.utilities.contractor import Contractor from dbt_meshify.utilities.dependencies import DependenciesUpdater @@ -235,6 +241,7 @@ def resolve_dependency( dependency: ProjectDependency, upstream_project: DbtProject, downstream_project: DbtProject, + current_change_set: Optional[ChangeSet] = None, ) -> ChangeSet: upstream_manifest_entry = upstream_project.get_manifest_node(dependency.upstream_resource) if not upstream_manifest_entry: @@ -314,12 +321,33 @@ def resolve_dependency( f"{upstream_manifest_entry.unique_id}" ) + if current_change_set: + 
previous_changes: List[FileChange] = [ + change + for change in current_change_set.changes + if ( + isinstance(change, FileChange) + and change.identifier == downstream_manifest_entry.name + and change.operation == Operation.Update + and change.entity_type == EntityType.Code + and change.path + == downstream_project.resolve_file_path(downstream_manifest_entry) + ) + ] + previous_change = previous_changes[-1] if previous_changes else None + + code_to_update = ( + previous_change.data + if (previous_change and previous_change.data) + else downstream_manifest_entry.raw_code + ) + change_set.add( reference_updater.generate_reference_update( project_name=upstream_project.name, downstream_node=downstream_manifest_entry, upstream_node=upstream_manifest_entry, - code=downstream_manifest_entry.raw_code, + code=code_to_update, downstream_project=downstream_project, ) ) From 7160159eb0668aa6810756e09254fb9d4c35710d Mon Sep 17 00:00:00 2001 From: Dave Connors Date: Tue, 22 Aug 2023 11:03:16 -0500 Subject: [PATCH 2/5] use state of previous changes when updating children and parents during split --- dbt_meshify/storage/dbt_project_editors.py | 5 ++- dbt_meshify/utilities/references.py | 40 ++++++++++++++++++---- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/dbt_meshify/storage/dbt_project_editors.py b/dbt_meshify/storage/dbt_project_editors.py index e49b715..7730b41 100644 --- a/dbt_meshify/storage/dbt_project_editors.py +++ b/dbt_meshify/storage/dbt_project_editors.py @@ -138,6 +138,7 @@ def initialize(self) -> ChangeSet: resource = subproject.get_manifest_node(unique_id) if not resource: raise KeyError(f"Resource {unique_id} not found in manifest") + if resource.resource_type in ["model", "test", "snapshot", "seed"]: # ignore generic tests, as moving the yml entry will move the test too if resource.resource_type == "test" and len(resource.unique_id.split(".")) == 4: @@ -157,7 +158,9 @@ def initialize(self) -> ChangeSet: logger.debug( f"Updating ref functions 
for children of {resource.unique_id}..." ) - change_set.extend(reference_updater.update_child_refs(resource)) + change_set.extend( + reference_updater.update_child_refs(resource, change_set) + ) logger.debug( f"Moving {resource.unique_id} and associated YML to subproject {subproject.name}..." diff --git a/dbt_meshify/utilities/references.py b/dbt_meshify/utilities/references.py index 3231b73..7d736ba 100644 --- a/dbt_meshify/utilities/references.py +++ b/dbt_meshify/utilities/references.py @@ -1,5 +1,5 @@ import re -from typing import Union +from typing import List, Optional, Union from dbt.contracts.graph.nodes import CompiledNode from loguru import logger @@ -123,12 +123,14 @@ def generate_reference_update( data=updated_code, ) - def update_child_refs(self, resource: CompiledNode) -> ChangeSet: + def update_child_refs( + self, resource: CompiledNode, current_change_set: Optional[ChangeSet] = None + ) -> ChangeSet: """Generate a set of FileChanges to update child references""" if not isinstance(self.project, DbtSubProject): raise Exception( - "The `update_parent_refs` method requires the calling project to have a parent project to update." + "The `update_child_refs` method requires the calling project to have a parent project to update." 
) change_set = ChangeSet() @@ -142,11 +144,28 @@ def update_child_refs(self, resource: CompiledNode) -> ChangeSet: if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode): continue + if current_change_set: + previous_changes: List[FileChange] = [ + change + for change in current_change_set.changes + if ( + isinstance(change, FileChange) + and change.identifier == model_node.name + and change.operation == Operation.Update + and change.entity_type == EntityType.Code + and change.path + == self.project.parent_project.resolve_file_path(model_node) + ) + ] + previous_change = previous_changes[-1] if previous_changes else None + change = self.generate_reference_update( project_name=self.project.name, upstream_node=resource, downstream_node=model_node, - code=model_node.raw_code, + code=previous_change.data + if (previous_change and previous_change.data) + else model_node.raw_code, downstream_project=self.project.parent_project, ) @@ -170,8 +189,6 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet: change_set = ChangeSet() - code = resource.raw_code - for model in upstream_models: logger.debug(f"Updating reference to {model} in {resource.name}.") model_node = self.project.get_manifest_node(model) @@ -181,6 +198,17 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet: # Don't process Resources missing a language attribute if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode): continue + previous_change = change_set.changes[-1] if change_set.changes else None + + code = ( + previous_change.data + if ( + previous_change + and isinstance(previous_change, FileChange) + and previous_change.data + ) + else resource.raw_code + ) change = self.generate_reference_update( project_name=self.project.parent_project.name, From 09cfb2231baf89121d294d55cfb3758693c2ae20 Mon Sep 17 00:00:00 2001 From: Dave Connors Date: Tue, 22 Aug 2023 11:03:27 -0500 Subject: [PATCH 3/5] add test cases --- 
.../dest_proj_a/models/downstream_model_2.sql | 11 +++ tests/integration/test_connect_command.py | 7 ++ tests/integration/test_split_command.py | 73 +++++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 test-projects/source-hack/dest_proj_a/models/downstream_model_2.sql diff --git a/test-projects/source-hack/dest_proj_a/models/downstream_model_2.sql b/test-projects/source-hack/dest_proj_a/models/downstream_model_2.sql new file mode 100644 index 0000000..7c4f1d3 --- /dev/null +++ b/test-projects/source-hack/dest_proj_a/models/downstream_model_2.sql @@ -0,0 +1,11 @@ +with + +upstream as ( + select * from {{ ref('shared_model') }} +), + +upstream1 as ( + select * from {{ ref('new_model') }} +) + +select 1 as id diff --git a/tests/integration/test_connect_command.py b/tests/integration/test_connect_command.py index 9988f80..cb0919e 100644 --- a/tests/integration/test_connect_command.py +++ b/tests/integration/test_connect_command.py @@ -86,11 +86,18 @@ def test_connect_package(self, producer_project): # assert that the source is replaced with a ref x_proj_ref = "{{ ref('src_proj_a', 'shared_model') }}" + x_proj_ref_2 = "{{ ref('src_proj_a', 'new_model') }}" child_sql = ( Path(copy_package_consumer_project_path) / "models" / "downstream_model.sql" ).read_text() assert x_proj_ref in child_sql + child_sql_2 = ( + Path(copy_package_consumer_project_path) / "models" / "downstream_model_2.sql" + ).read_text() + assert x_proj_ref in child_sql_2 + assert x_proj_ref_2 in child_sql_2 + # assert that the dependecies yml was created with a pointer to the upstream project assert ( "src_proj_a" diff --git a/tests/integration/test_split_command.py b/tests/integration/test_split_command.py index abfd653..6f6ca83 100644 --- a/tests/integration/test_split_command.py +++ b/tests/integration/test_split_command.py @@ -214,3 +214,76 @@ def test_split_project_cycle(self): assert result.exit_code == 1 teardown_test_project(dest_project_path) + + def 
test_split_upstream_multiple_boundary_parents(self): + """ + Test that splitting out a project downstream of the base project splits as expected + """ + setup_test_project(src_project_path, dest_project_path) + runner = CliRunner() + result = runner.invoke( + cli, + [ + "split", + "my_new_project", + "--project-path", + dest_project_path, + "--select", + "+stg_orders", + "+stg_order_items", + ], + ) + + assert result.exit_code == 0 + # selected model is moved to subdirectory + assert ( + Path(dest_project_path) / "my_new_project" / "models" / "staging" / "stg_orders.sql" + ).exists() + x_proj_ref_1 = "{{ ref('my_new_project', 'stg_orders') }}" + x_proj_ref_2 = "{{ ref('my_new_project', 'stg_order_items') }}" + child_sql = (Path(dest_project_path) / "models" / "marts" / "orders.sql").read_text() + # downstream model has all cross project refs updated + assert x_proj_ref_1 in child_sql + assert x_proj_ref_2 in child_sql + + teardown_test_project(dest_project_path) + + def test_split_downstream_multiple_boundary_parents(self): + """ + Test that splitting out a project downstream of the base project splits as expected + """ + setup_test_project(src_project_path, dest_project_path) + runner = CliRunner() + result = runner.invoke( + cli, + [ + "split", + "my_new_project", + "--project-path", + dest_project_path, + "--select", + "orders+", + ], + ) + + assert result.exit_code == 0 + # selected model is moved to subdirectory + assert ( + Path(dest_project_path) / "my_new_project" / "models" / "marts" / "orders.sql" + ).exists() + x_proj_ref_1 = "{{ ref('split_proj', 'stg_orders') }}" + x_proj_ref_2 = "{{ ref('split_proj', 'stg_order_items') }}" + x_proj_ref_3 = "{{ ref('split_proj', 'stg_products') }}" + x_proj_ref_4 = "{{ ref('split_proj', 'stg_locations') }}" + x_proj_ref_5 = "{{ ref('split_proj', 'stg_supplies') }}" + child_sql = ( + Path(dest_project_path) / "my_new_project" / "models" / "marts" / "orders.sql" + ).read_text() + # downstream model has all cross project 
refs updated + assert x_proj_ref_1 in child_sql + assert x_proj_ref_2 in child_sql + assert x_proj_ref_3 in child_sql + assert x_proj_ref_4 in child_sql + assert x_proj_ref_5 in child_sql + + teardown_test_project(dest_project_path) From 377cdf83e3425f19bb67dd675b2dd192e48a9e77 Mon Sep 17 00:00:00 2001 From: Dave Connors Date: Tue, 22 Aug 2023 11:14:09 -0500 Subject: [PATCH 4/5] update dependency tests with new package dependencies --- tests/integration/test_dependency_detection.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_dependency_detection.py b/tests/integration/test_dependency_detection.py index ea7a9b4..7d3f5bc 100644 --- a/tests/integration/test_dependency_detection.py +++ b/tests/integration/test_dependency_detection.py @@ -91,7 +91,21 @@ def test_linker_detects_package_import_dependencies(self, src_proj_a, dest_proj_ downstream_resource="model.dest_proj_a.downstream_model", downstream_project_name="dest_proj_a", type=ProjectDependencyType.Package, - ) + ), + ProjectDependency( + upstream_resource="model.src_proj_a.shared_model", + upstream_project_name="src_proj_a", + downstream_resource="model.dest_proj_a.downstream_model_2", + downstream_project_name="dest_proj_a", + type=ProjectDependencyType.Package, + ), + ProjectDependency( + upstream_resource="model.src_proj_a.new_model", + upstream_project_name="src_proj_a", + downstream_resource="model.dest_proj_a.downstream_model_2", + downstream_project_name="dest_proj_a", + type=ProjectDependencyType.Package, + ), } # This doesn't exist yet as of 1.5.0. We'll test it out once it's a thing. 
From dc85cbe9e197ce5244e3e34c7ec5df46032caca7 Mon Sep 17 00:00:00 2001 From: Dave Connors Date: Fri, 25 Aug 2023 09:08:37 -0500 Subject: [PATCH 5/5] tidy up change filter into a function --- dbt_meshify/utilities/linker.py | 20 ++++-------- dbt_meshify/utilities/references.py | 49 +++++++++++++++++++---------- 2 files changed, 39 insertions(+), 30 deletions(-) diff --git a/dbt_meshify/utilities/linker.py b/dbt_meshify/utilities/linker.py index 96bb5c4..0eee5b3 100644 --- a/dbt_meshify/utilities/linker.py +++ b/dbt_meshify/utilities/linker.py @@ -16,7 +16,7 @@ from dbt_meshify.utilities.contractor import Contractor from dbt_meshify.utilities.dependencies import DependenciesUpdater from dbt_meshify.utilities.grouper import ResourceGrouper -from dbt_meshify.utilities.references import ReferenceUpdater +from dbt_meshify.utilities.references import ReferenceUpdater, get_latest_file_change class ProjectDependencyType(str, Enum): @@ -322,19 +322,11 @@ def resolve_dependency( ) if current_change_set: - previous_changes: List[FileChange] = [ - change - for change in current_change_set.changes - if ( - isinstance(change, FileChange) - and change.identifier == downstream_manifest_entry.name - and change.operation == Operation.Update - and change.entity_type == EntityType.Code - and change.path - == downstream_project.resolve_file_path(downstream_manifest_entry) - ) - ] - previous_change = previous_changes[-1] if previous_changes else None + previous_change = get_latest_file_change( + changeset=current_change_set, + identifier=downstream_manifest_entry.name, + path=downstream_project.resolve_file_path(downstream_manifest_entry), + ) code_to_update = ( previous_change.data diff --git a/dbt_meshify/utilities/references.py b/dbt_meshify/utilities/references.py index 7d736ba..9f7c32a 100644 --- a/dbt_meshify/utilities/references.py +++ b/dbt_meshify/utilities/references.py @@ -1,4 +1,5 @@ import re +from pathlib import Path from typing import List, Optional, Union from 
dbt.contracts.graph.nodes import CompiledNode @@ -8,6 +9,27 @@ from dbt_meshify.dbt_projects import DbtProject, DbtSubProject, PathedProject +def get_latest_file_change( + changeset: ChangeSet, + path: Path, + identifier: str, + entity_type: EntityType = EntityType.Code, + operation: Operation = Operation.Update, +) -> Optional[FileChange]: + previous_changes: List[FileChange] = [ + change + for change in changeset.changes + if ( + isinstance(change, FileChange) + and change.identifier == identifier + and change.operation == operation + and change.entity_type == entity_type + and change.path == path + ) + ] + return previous_changes[-1] if previous_changes else None + + class ReferenceUpdater: def __init__(self, project: Union[DbtSubProject, DbtProject]): self.project = project @@ -145,27 +167,22 @@ def update_child_refs( continue if current_change_set: - previous_changes: List[FileChange] = [ - change - for change in current_change_set.changes - if ( - isinstance(change, FileChange) - and change.identifier == model_node.name - and change.operation == Operation.Update - and change.entity_type == EntityType.Code - and change.path - == self.project.parent_project.resolve_file_path(model_node) - ) - ] - previous_change = previous_changes[-1] if previous_changes else None + previous_change = get_latest_file_change( + changeset=current_change_set, + identifier=model_node.name, + path=self.project.parent_project.resolve_file_path(model_node), + ) + code = ( + previous_change.data + if (previous_change and previous_change.data) + else model_node.raw_code + ) change = self.generate_reference_update( project_name=self.project.name, upstream_node=resource, downstream_node=model_node, - code=previous_change.data - if (previous_change and previous_change.data) - else model_node.raw_code, + code=code, downstream_project=self.project.parent_project, )