Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
dave-connors-3 committed Jul 7, 2023
2 parents e4e80f9 + 764ad1f commit 538f7fd
Show file tree
Hide file tree
Showing 29 changed files with 1,798 additions and 455 deletions.
7 changes: 7 additions & 0 deletions dbt_meshify/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
help="The path to the dbt project to operate on. Defaults to the current directory.",
)

create_path = click.option(
"--create-path",
type=click.Path(exists=True),
default=None,
help="The path to create the new dbt project. Defaults to the name argument supplied.",
)

exclude = click.option(
"--exclude",
"-e",
Expand Down
3 changes: 3 additions & 0 deletions dbt_meshify/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dbt.cli.main import dbtRunner
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import CatalogArtifact
from loguru import logger


class Dbt:
Expand All @@ -25,6 +26,7 @@ def invoke(
return result.result

def parse(self, directory: os.PathLike):
logger.info("Executing dbt parse...")
return self.invoke(directory, ["--quiet", "parse"])

def ls(
Expand All @@ -50,5 +52,6 @@ def docs_generate(self, directory: os.PathLike) -> CatalogArtifact:
"""
Excute dbt docs generate with the given arguments
"""
logger.info("Generating catalog with dbt docs generate...")
args = ["--quiet", "docs", "generate"]
return self.invoke(directory, args)
129 changes: 107 additions & 22 deletions dbt_meshify/dbt_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,27 @@

import yaml
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ManifestNode, ModelNode, SourceDefinition
from dbt.contracts.graph.nodes import (
Documentation,
Exposure,
Group,
Macro,
ManifestNode,
ModelNode,
Resource,
SourceDefinition,
)
from dbt.contracts.project import Project
from dbt.contracts.results import CatalogArtifact, CatalogTable
from dbt.graph import Graph
from dbt.node_types import NodeType

from dbt_meshify.dbt import Dbt
from dbt_meshify.storage.file_content_editors import (
DbtMeshConstructor,
filter_empty_dict_items,
)
from dbt_meshify.storage.file_manager import DbtFileManager

logger = logging.getLogger()

Expand Down Expand Up @@ -100,7 +115,8 @@ def installed_packages(self) -> Set[str]:
if item.package_name:
_hash = hashlib.md5()
_hash.update(item.package_name.encode("utf-8"))
project_packages.append(_hash.hexdigest())
if _hash.hexdigest() != self.manifest.metadata.project_id:
project_packages.append(_hash.hexdigest())
return set(project_packages)

@property
Expand Down Expand Up @@ -150,9 +166,20 @@ def get_catalog_entry(self, unique_id: str) -> Optional[CatalogTable]:
"""Returns the catalog entry for a model in the dbt project's catalog"""
return self.catalog.nodes.get(unique_id)

def get_manifest_node(self, unique_id: str) -> Optional[ManifestNode]:
"""Returns the catalog entry for a model in the dbt project's catalog"""
return self.manifest.nodes.get(unique_id)
def get_manifest_node(self, unique_id: str) -> Optional[Resource]:
"""Returns the manifest entry for a resource in the dbt project's manifest"""
if unique_id.split(".")[0] in [
"model",
"seed",
"snapshot",
"test",
"analysis",
"snapshot",
]:
return self.manifest.nodes.get(unique_id)
pluralized = NodeType(unique_id.split(".")[0]).pluralize()
resources = getattr(self.manifest, pluralized)
return resources.get(unique_id)


class DbtProject(BaseDbtProject):
Expand Down Expand Up @@ -216,10 +243,13 @@ def split(
project_name: str,
select: str,
exclude: Optional[str] = None,
selector: Optional[str] = None,
) -> "DbtSubProject":
"""Create a new DbtSubProject using NodeSelection syntax."""

subproject_resources = self.select_resources(select, exclude)
subproject_resources = self.select_resources(
select=select, exclude=exclude, selector=selector, output_key="unique_id"
)

# Construct a new project and inject the new manifest
subproject = DbtSubProject(
Expand All @@ -242,24 +272,86 @@ class DbtSubProject(BaseDbtProject):
def __init__(self, name: str, parent_project: DbtProject, resources: Set[str]):
self.name = name
self.resources = resources
self.parent = parent_project
self.parent_project = parent_project
self.path = parent_project.path / Path(name)

self.manifest = parent_project.manifest.deepcopy()
# self.manifest = parent_project.manifest.deepcopy()
# i am running into a bug with the core deepcopy -- checking with michelle
self.manifest = copy.deepcopy(parent_project.manifest)
self.project = copy.deepcopy(parent_project.project)
self.catalog = parent_project.catalog
self.custom_macros = self._get_custom_macros()
self.groups = self._get_indirect_groups()

super().__init__(self.manifest, self.project, self.catalog)
self._rename_project()

def select_resources(self, select: str, exclude: Optional[str] = None) -> Set[str]:
super().__init__(self.manifest, self.project, self.catalog, self.name)

def _rename_project(self) -> None:
"""
edits the project yml to take any instance of the parent project name and update it to the subproject name
"""
project_dict = self.project.to_dict()
for key in [resource.pluralize() for resource in NodeType]:
if self.parent_project.name in project_dict.get(key, {}).keys():
project_dict[key][self.name] = project_dict[key].pop(self.parent_project.name)
project_dict["name"] = self.name
self.project = Project.from_dict(project_dict)

def _get_custom_macros(self) -> Set[str]:
"""
get a set of macro unique_ids for all the selected resources
"""
macros_set = set()
for unique_id in self.resources:
resource = self.get_manifest_node(unique_id)
if not resource or any(
isinstance(resource, class_) for class_ in [Documentation, Group]
):
continue
macros = resource.depends_on.macros # type: ignore
project_macros = [
macro
for macro in macros
if hashlib.md5((macro.split(".")[1]).encode()).hexdigest()
== self.manifest.metadata.project_id
]
macros_set.update(project_macros)
return macros_set

def _get_indirect_groups(self) -> Set[str]:
"""
get a set of group unique_ids for all the selected resources
"""
groups = set()
for unique_id in self.resources:
resource = self.get_manifest_node(unique_id) # type: ignore
if not resource or any(
isinstance(resource, class_)
for class_ in [Documentation, Group, Exposure, SourceDefinition, Macro]
):
continue
group = resource.group # type: ignore
if group:
group_unique_id = f"group.{self.parent_project.name}.{group}"
groups.update({group_unique_id})
return groups

def select_resources(
self,
select: str,
exclude: Optional[str] = None,
selector: Optional[str] = None,
output_key: Optional[str] = None,
) -> Set[str]:
"""
Select resources using the parent DbtProject and filtering down to only include resources in this
subproject.
"""
args = ["--select", select]
if exclude:
args.extend(["--exclude", exclude])

results = self.parent.dbt.ls(self.parent.path, args)
results = self.parent_project.select_resources(
select=select, exclude=exclude, selector=selector, output_key=output_key
)

return set(results) - self.resources

Expand All @@ -276,7 +368,7 @@ def split(
# Construct a new project and inject the new manifest
subproject = DbtSubProject(
name=project_name,
parent_project=copy.deepcopy(self.parent),
parent_project=copy.deepcopy(self.parent_project),
resources=subproject_resources,
)

Expand All @@ -285,13 +377,6 @@ def split(

return subproject

def initialize(self, target_directory: os.PathLike):
"""Initialize this subproject as a full dbt project at the provided `target_directory`."""

# TODO: Implement project initialization

raise NotImplementedError


class DbtProjectHolder:
def __init__(self) -> None:
Expand Down
Loading

0 comments on commit 538f7fd

Please sign in to comment.