From 00ac96cf7dd3c1a8259349be334233982518a30b Mon Sep 17 00:00:00 2001 From: Rob Freedy Date: Thu, 28 Mar 2024 14:14:54 -0400 Subject: [PATCH 1/7] Initial dbt_build_task and requirements update --- prefect_dbt/tasks.py | 39 +++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 +- 2 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 prefect_dbt/tasks.py diff --git a/prefect_dbt/tasks.py b/prefect_dbt/tasks.py new file mode 100644 index 0000000..87832ee --- /dev/null +++ b/prefect_dbt/tasks.py @@ -0,0 +1,39 @@ +import os +from pathlib import Path, PosixPath +from prefect import task, get_run_logger +from typing import Any, Dict, List, Optional, Union + +from pydantic import VERSION as PYDANTIC_VERSION + +from dbt.cli.main import dbtRunner, dbtRunnerResult +from prefect_dbt.cli.credentials import DbtCliProfile + +if PYDANTIC_VERSION.startswith("2."): + from pydantic.v1 import Field, validator +else: + from pydantic import Field, validator + +@task +def dbt_build_task( + profiles_dir: Optional[Union[Path, str]] = None, + project_dir: Optional[Union[Path, str]] = None, + overwrite_profiles: bool = False, + dbt_cli_profile: Optional[DbtCliProfile] = None, + tags: Optional[List[str]] = None, + **shell_run_command_kwargs: Dict[str, Any], +): + logger = get_run_logger() + logger.info("Running dbt build task.") + # initialize + dbt = dbtRunner() + + # create CLI args as a list of strings + cli_args = ["build"] + + # run the command + res: dbtRunnerResult = dbt.invoke(cli_args) + + # inspect the results + for r in res.result: + logger.info(f"{r.node.name}: {r.status}") + diff --git a/requirements.txt b/requirements.txt index 3eebf47..28ade61 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -prefect>=2.13.5 +prefect>=2.16.6 dbt_core>=1.1.1 prefect_shell>=0.1.4 sgqlc>=16.0.0 From 7c27069d9b83aa4ff3f6d5150ead035057321aab Mon Sep 17 00:00:00 2001 From: Rob Freedy Date: Fri, 29 Mar 2024 09:40:28 -0400 Subject: [PATCH 2/7] Init tests, profile yaml block, and markdown artifact --- prefect_dbt/cli/credentials.py | 11 +++++++++++ prefect_dbt/tasks.py | 13 +++++++++---- tests/test_tasks.py | 0 3 files changed, 20 insertions(+), 4 deletions(-) create mode 100644 tests/test_tasks.py diff --git a/prefect_dbt/cli/credentials.py b/prefect_dbt/cli/credentials.py index 34c9663..19a6cd3 100644 --- a/prefect_dbt/cli/credentials.py +++ b/prefect_dbt/cli/credentials.py @@ -156,3 +156,14 @@ def get_profile(self) -> Dict[str, Any]: }, } return profile + + +class DbtYamlProfile(Block): + + _block_type_name = "dbt YAML Profile" + _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa + _documentation_url = "https://prefecthq.github.io/prefect-dbt/cli/credentials/#prefect_dbt.cli.credentials.DbtCliProfile" # noqa + + yaml: str = Field( + default=..., description="Profile name used for populating profiles.yml." + ) \ No newline at end of file diff --git a/prefect_dbt/tasks.py b/prefect_dbt/tasks.py index 87832ee..c47e3ae 100644 --- a/prefect_dbt/tasks.py +++ b/prefect_dbt/tasks.py @@ -1,6 +1,7 @@ import os -from pathlib import Path, PosixPath +from pathlib import Path from prefect import task, get_run_logger +from prefect.artifacts import create_markdown_artifact from typing import Any, Dict, List, Optional, Union from pydantic import VERSION as PYDANTIC_VERSION @@ -33,7 +34,11 @@ def dbt_build_task( # run the command res: dbtRunnerResult = dbt.invoke(cli_args) - # inspect the results - for r in res.result: - logger.info(f"{r.node.name}: {r.status}") + markdown = f""" + result: {res.result} + """ + + create_markdown_artifact( + markdown=markdown, + ) diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..e69de29 From 1579acca6da82bdb20b5559b75ffcba8e93f2259 Mon Sep 17 00:00:00 2001 From: Rob Freedy Date: Fri, 12 Apr 2024 10:22:19 -0400 Subject: [PATCH 3/7] WIP, build task and profile improvements --- prefect_dbt/cli/credentials.py | 53 ++++++++++++++++++-------- prefect_dbt/cli/tasks.py | 68 ++++++++++++++++++++++++++++++++++ prefect_dbt/tasks.py | 44 ---------------------- 3 files changed, 106 insertions(+), 59 deletions(-) create mode 100644 prefect_dbt/cli/tasks.py delete mode 100644 prefect_dbt/tasks.py diff --git a/prefect_dbt/cli/credentials.py b/prefect_dbt/cli/credentials.py index 19a6cd3..88019a5 100644 --- a/prefect_dbt/cli/credentials.py +++ b/prefect_dbt/cli/credentials.py @@ -1,13 +1,16 @@ """Module containing credentials for interacting with dbt CLI""" -from typing import Any, Dict, Optional, Union +from pathlib import Path +from typing import Any, Dict, Optional, Type, Union +from typing_extensions import Self from prefect.blocks.core import Block from pydantic import VERSION as PYDANTIC_VERSION +import yaml if PYDANTIC_VERSION.startswith("2."): - from pydantic.v1 import Field + from pydantic.v1 import Field, validator else: - from pydantic import Field + from pydantic import Field, validator from prefect_dbt.cli.configs import GlobalConfigs, TargetConfigs @@ -26,6 +29,7 @@ except ImportError: PostgresTargetConfigs = None +DEFAULT_PROFILE_PATH = "~/.dbt/profiles.yml" class DbtCliProfile(Block): """ @@ -141,6 +145,36 @@ class DbtCliProfile(Block): ), ) + @validator("config", pre=True) + def parse_yaml_config(cls, value): + if isinstance(value, str): + return yaml.safe_load(value) + return value + + @classmethod + def from_file(cls: Type[Self], path: Path = None, context_name: str = None) -> Self: + """ + Create DBTCLIProfile blocks from a dbt profile file. + + By default, the default profile localtion is used (~/.dbt/profiles.yml). + + An alternative file or context may be specified. + + The entire config file will be loaded and stored. + """ + + path = Path(path or DEFAULT_PROFILE_PATH) + path = path.expanduser().resolve() + + # Load the entire config file + profile_file_contents = path.read_text() + config_dict = yaml.safe_load(profile_file_contents) + + print(config_dict) + + return cls(config=config_dict, context_name=context_name) + + def get_profile(self) -> Dict[str, Any]: """ Returns the dbt profile, likely used for writing to profiles.yml. @@ -155,15 +189,4 @@ def get_profile(self) -> Dict[str, Any]: "outputs": {self.target: self.target_configs.get_configs()}, }, } - return profile - - -class DbtYamlProfile(Block): - - _block_type_name = "dbt YAML Profile" - _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250" # noqa - _documentation_url = "https://prefecthq.github.io/prefect-dbt/cli/credentials/#prefect_dbt.cli.credentials.DbtCliProfile" # noqa - - yaml: str = Field( - default=..., description="Profile name used for populating profiles.yml." - ) \ No newline at end of file + return profile \ No newline at end of file diff --git a/prefect_dbt/cli/tasks.py b/prefect_dbt/cli/tasks.py new file mode 100644 index 0000000..5e7b619 --- /dev/null +++ b/prefect_dbt/cli/tasks.py @@ -0,0 +1,68 @@ +import os +import json +from pathlib import Path +from prefect import task, get_run_logger +from prefect.artifacts import create_markdown_artifact +from typing import Any, Dict, List, Optional, Union + +from pydantic import VERSION as PYDANTIC_VERSION + +from dbt.cli.main import dbtRunner, dbtRunnerResult +from dbt.contracts.results import RunStatus +from prefect_dbt.cli.credentials import DbtCliProfile + +if PYDANTIC_VERSION.startswith("2."): + from pydantic.v1 import Field, validator +else: + from pydantic import Field, validator + +@task +def dbt_build_task( + project_dir: str, + artifact_key: str = "dbt-build-task-summary" +): + logger = get_run_logger() + logger.info("Running dbt build task.") + # initialize + dbt = dbtRunner() + + # create CLI args as a list of strings + cli_args = ["build"] + + # run the command + res: dbtRunnerResult = dbt.invoke(cli_args) + + if res.exception is not None: + logger.error(f"dbt build task failed with exception: {res.exception}") + raise res.exception + + if res.success: + logger.info(f"dbt build task succeeded.") + else: + logger.error(f"dbt build task failed.") + + #Create Summary Markdown Artifact + successful_runs = [] + failed_runs = [] + skipped_runs = [] + for r in res.result.results: + match r.status: + case RunStatus.Success: + successful_runs.append(r) + case RunStatus.Error: + failed_runs.append(r) + case RunStatus.Skipped: + skipped_runs.append(r) + + successful_runs_str = "\n".join([f"{r.node.name}" for r in successful_runs]) + failed_runs_str = "\n".join([f"{r.node.name}" for r in failed_runs]) + skipped_runs_str = "\n".join([f"{r.node.name}" for r in skipped_runs]) + + markdown = f"""# DBT Build Task Summary\n## Successful Runs\n\n{successful_runs_str}\n\n## Failed Runs\n{failed_runs_str}\n\n## Skipped Runs\n{skipped_runs_str}\n\n""" + + create_markdown_artifact( + markdown=markdown, + key=artifact_key, + description="DBT Build Task Summary", + ) + diff --git a/prefect_dbt/tasks.py b/prefect_dbt/tasks.py deleted file mode 100644 index c47e3ae..0000000 --- a/prefect_dbt/tasks.py +++ /dev/null @@ -1,44 +0,0 @@ -import os -from pathlib import Path -from prefect import task, get_run_logger -from prefect.artifacts import create_markdown_artifact -from typing import Any, Dict, List, Optional, Union - -from pydantic import VERSION as PYDANTIC_VERSION - -from dbt.cli.main import dbtRunner, dbtRunnerResult -from prefect_dbt.cli.credentials import DbtCliProfile - -if PYDANTIC_VERSION.startswith("2."): - from pydantic.v1 import Field, validator -else: - from pydantic import Field, validator - -@task -def dbt_build_task( - profiles_dir: Optional[Union[Path, str]] = None, - project_dir: Optional[Union[Path, str]] = None, - overwrite_profiles: bool = False, - dbt_cli_profile: Optional[DbtCliProfile] = None, - tags: Optional[List[str]] = None, - **shell_run_command_kwargs: Dict[str, Any], -): - logger = get_run_logger() - logger.info("Running dbt build task.") - # initialize - dbt = dbtRunner() - - # create CLI args as a list of strings - cli_args = ["build"] - - # run the command - res: dbtRunnerResult = dbt.invoke(cli_args) - - markdown = f""" - result: {res.result} - """ - - create_markdown_artifact( - markdown=markdown, - ) - From 81798d78dae2474bdcd0544936aef6572fe75ff2 Mon Sep 17 00:00:00 2001 From: Rob Freedy Date: Fri, 19 Apr 2024 10:17:33 -0400 Subject: [PATCH 4/7] Unit tests --- prefect_dbt/cli/credentials.py | 43 ++---------- prefect_dbt/cli/tasks.py | 118 +++++++++++++++++++++++++-------- tests/cli/test_tasks.py | 79 ++++++++++++++++++++++ tests/conftest.py | 2 + tests/test_tasks.py | 0 5 files changed, 178 insertions(+), 64 deletions(-) create mode 100644 tests/cli/test_tasks.py delete mode 100644 tests/test_tasks.py diff --git a/prefect_dbt/cli/credentials.py b/prefect_dbt/cli/credentials.py index 88019a5..7ac11d7 100644 --- a/prefect_dbt/cli/credentials.py +++ b/prefect_dbt/cli/credentials.py @@ -1,16 +1,14 @@ """Module containing credentials for interacting with dbt CLI""" -from pathlib import Path -from typing import Any, Dict, Optional, Type, Union -from typing_extensions import Self + +from typing import Any, Dict, Optional, Union from prefect.blocks.core import Block from pydantic import VERSION as PYDANTIC_VERSION -import yaml if PYDANTIC_VERSION.startswith("2."): - from pydantic.v1 import Field, validator + from pydantic.v1 import Field else: - from pydantic import Field, validator + from pydantic import Field from prefect_dbt.cli.configs import GlobalConfigs, TargetConfigs @@ -31,6 +29,7 @@ DEFAULT_PROFILE_PATH = "~/.dbt/profiles.yml" + class DbtCliProfile(Block): """ Profile for use across dbt CLI tasks and flows. @@ -145,36 +144,6 @@ class DbtCliProfile(Block): ), ) - @validator("config", pre=True) - def parse_yaml_config(cls, value): - if isinstance(value, str): - return yaml.safe_load(value) - return value - - @classmethod - def from_file(cls: Type[Self], path: Path = None, context_name: str = None) -> Self: - """ - Create DBTCLIProfile blocks from a dbt profile file. - - By default, the default profile localtion is used (~/.dbt/profiles.yml). - - An alternative file or context may be specified. - - The entire config file will be loaded and stored. - """ - - path = Path(path or DEFAULT_PROFILE_PATH) - path = path.expanduser().resolve() - - # Load the entire config file - profile_file_contents = path.read_text() - config_dict = yaml.safe_load(profile_file_contents) - - print(config_dict) - - return cls(config=config_dict, context_name=context_name) - - def get_profile(self) -> Dict[str, Any]: """ Returns the dbt profile, likely used for writing to profiles.yml. @@ -189,4 +158,4 @@ def get_profile(self) -> Dict[str, Any]: "outputs": {self.target: self.target_configs.get_configs()}, }, } - return profile \ No newline at end of file + return profile diff --git a/prefect_dbt/cli/tasks.py b/prefect_dbt/cli/tasks.py index 5e7b619..ea8489d 100644 --- a/prefect_dbt/cli/tasks.py +++ b/prefect_dbt/cli/tasks.py @@ -1,68 +1,132 @@ import os -import json from pathlib import Path -from prefect import task, get_run_logger -from prefect.artifacts import create_markdown_artifact -from typing import Any, Dict, List, Optional, Union +from typing import Optional, Union +import yaml +from dbt.cli.main import dbtRunner, dbtRunnerResult +from dbt.contracts.results import NodeStatus +from prefect import get_run_logger, task +from prefect.artifacts import create_markdown_artifact from pydantic import VERSION as PYDANTIC_VERSION -from dbt.cli.main import dbtRunner, dbtRunnerResult -from dbt.contracts.results import RunStatus from prefect_dbt.cli.credentials import DbtCliProfile if PYDANTIC_VERSION.startswith("2."): - from pydantic.v1 import Field, validator + pass else: - from pydantic import Field, validator + pass + @task def dbt_build_task( - project_dir: str, - artifact_key: str = "dbt-build-task-summary" + profiles_dir: Optional[Union[Path, str]] = None, + project_dir: Optional[Union[Path, str]] = None, + overwrite_profiles: bool = False, + dbt_cli_profile: Optional[DbtCliProfile] = None, + dbt_client: Optional[dbtRunner] = None, + create_artifact: bool = True, + artifact_key: str = "dbt-build-task-summary", ): logger = get_run_logger() logger.info("Running dbt build task.") - # initialize - dbt = dbtRunner() + + # Initialize client if not passed in + if not dbt_client: + dbt_client = dbtRunner() + + if profiles_dir is None: + profiles_dir = os.getenv("DBT_PROFILES_DIR", Path.home() / ".dbt") + profiles_dir = Path(profiles_dir).expanduser() + + # https://docs.getdbt.com/dbt-cli/configure-your-profile + # Note that the file always needs to be called profiles.yml, + # regardless of which directory it is in. + profiles_path = profiles_dir / "profiles.yml" + logger.debug(f"Using this profiles path: {profiles_path}") + + # write the profile if overwrite or no profiles exist + if overwrite_profiles or not profiles_path.exists(): + if dbt_cli_profile is None: + raise ValueError("Provide `dbt_cli_profile` keyword for writing profiles") + profile = dbt_cli_profile.get_profile() + profiles_dir.mkdir(exist_ok=True) + with open(profiles_path, "w+") as f: + yaml.dump(profile, f, default_flow_style=False) + logger.info(f"Wrote profile to {profiles_path}") + elif dbt_cli_profile is not None: + raise ValueError( + f"Since overwrite_profiles is False and profiles_path ({profiles_path}) " + f"already exists, the profile within dbt_cli_profile could not be used; " + f"if the existing profile is satisfactory, do not pass dbt_cli_profile" + ) # create CLI args as a list of strings cli_args = ["build"] + # append the options + cli_args.append("--profiles-dir") + cli_args.append(profiles_dir) + if project_dir is not None: + project_dir = Path(project_dir).expanduser() + cli_args.append("--project-dir") + cli_args.append(project_dir) + # run the command - res: dbtRunnerResult = dbt.invoke(cli_args) + res: dbtRunnerResult = dbt_client.invoke(cli_args) if res.exception is not None: logger.error(f"dbt build task failed with exception: {res.exception}") raise res.exception - + if res.success: - logger.info(f"dbt build task succeeded.") + logger.info("dbt build task succeeded.") else: - logger.error(f"dbt build task failed.") + logger.error("dbt build task failed.") + + if create_artifact: + create_dbt_task_artifact(artifact_key=artifact_key, results=res, mode="Build") + - #Create Summary Markdown Artifact +def create_dbt_task_artifact( + artifact_key: str, results: dbtRunnerResult, mode: str = "Build" +): + # Create Summary Markdown Artifact successful_runs = [] failed_runs = [] skipped_runs = [] - for r in res.result.results: + for r in results.result.results: match r.status: - case RunStatus.Success: + case NodeStatus.Success: + successful_runs.append(r) + case NodeStatus.Pass: successful_runs.append(r) - case RunStatus.Error: + case NodeStatus.Fail: failed_runs.append(r) - case RunStatus.Skipped: + case NodeStatus.Error: + failed_runs.append(r) + case NodeStatus.RuntimeErr: + failed_runs.append(r) + case NodeStatus.Skipped: skipped_runs.append(r) - successful_runs_str = "\n".join([f"{r.node.name}" for r in successful_runs]) - failed_runs_str = "\n".join([f"{r.node.name}" for r in failed_runs]) - skipped_runs_str = "\n".join([f"{r.node.name}" for r in skipped_runs]) + markdown = "# DBT Build Task Summary" + + if failed_runs != []: + failed_runs_str = "" + for r in failed_runs: + failed_runs_str += f"**{r.node.name}**\nNode Type: {r.node.resource_type}\nNode Path: {r.node.original_file_path}" + if r.message: + message = r.message.replace("\n", ".") + failed_runs_str += f"\nError Message: {message}\n" + markdown += f"""\n## Failed Runs\n\n{failed_runs_str}\n\n""" - markdown = f"""# DBT Build Task Summary\n## Successful Runs\n\n{successful_runs_str}\n\n## Failed Runs\n{failed_runs_str}\n\n## Skipped Runs\n{skipped_runs_str}\n\n""" + successful_runs_str = "\n".join([f"**{r.node.name}**" for r in successful_runs]) + markdown += f"""\n## Successful Runs\n\n{successful_runs_str}\n\n""" + + skipped_runs_str = "\n".join([f"**{r.node.name}**" for r in skipped_runs]) + markdown += f"""## Skipped Runs\n{skipped_runs_str}\n\n""" create_markdown_artifact( markdown=markdown, key=artifact_key, - description="DBT Build Task Summary", ) - diff --git a/tests/cli/test_tasks.py b/tests/cli/test_tasks.py new file mode 100644 index 0000000..565dd95 --- /dev/null +++ b/tests/cli/test_tasks.py @@ -0,0 +1,79 @@ +from unittest.mock import MagicMock + +import pytest +from dbt.cli.main import dbtRunnerResult +from dbt.contracts.files import FileHash +from dbt.contracts.graph.nodes import ModelNode +from dbt.contracts.results import RunExecutionResult, RunResult +from prefect import flow +from prefect.artifacts import Artifact + +from prefect_dbt.cli.tasks import dbt_build_task + + +@pytest.fixture +async def mock_dbt_runner_invoke_success(): + return dbtRunnerResult( + success=True, + exception=None, + result=RunExecutionResult( + results=[ + RunResult( + status="pass", + timing=None, + thread_id="'Thread-1 (worker)'", + message="CREATE TABLE (1.0 rows, 0 processed)", + failures=None, + node=ModelNode( + database="test-123", + schema="prefect_dbt_example", + name="my_first_dbt_model", + resource_type="model", + package_name="prefect_dbt_bigquery", + path="example/my_first_dbt_model.sql", + original_file_path="models/example/my_first_dbt_model.sql", + unique_id="model.prefect_dbt_bigquery.my_first_dbt_model", + fqn=["prefect_dbt_bigquery", "example", "my_first_dbt_model"], + alias="my_first_dbt_model", + checksum=FileHash(name="sha256", checksum="123456789"), + ), + execution_time=0.0, + adapter_response=None, + ) + ], + elapsed_time=0.0, + ), + ) + + +@pytest.fixture() +def dbt_runner_result(monkeypatch, mock_dbt_runner_invoke_success): + _mock_dbt_runner_invoke_success = MagicMock( + return_value=mock_dbt_runner_invoke_success + ) + monkeypatch.setattr( + "dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_invoke_success + ) + + +@pytest.fixture +def profiles_dir(tmp_path): + return tmp_path / ".dbt" + + +@pytest.mark.usefixtures("dbt_runner_result") +def test_dbt_build_task_creates_artifact(profiles_dir, dbt_cli_profile_bare): + @flow + def test_flow(): + return dbt_build_task( + profiles_dir=profiles_dir, + dbt_cli_profile=dbt_cli_profile_bare, + artifact_key="foo", + create_artifact=True, + ) + + test_flow() + assert (a := Artifact.get(key="foo")) + assert a.type == "markdown" + assert a.data.startswith("# DBT Build Task Summary") + assert "my_first_dbt_model" in a.data diff --git a/tests/conftest.py b/tests/conftest.py index c4e7a6f..46c0e82 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,8 @@ from prefect_dbt.cli.credentials import DbtCliProfile from prefect_dbt.cloud.credentials import DbtCloudCredentials +from dbt.cli.main import dbtRunner + @pytest.fixture def dbt_cloud_credentials(): diff --git a/tests/test_tasks.py b/tests/test_tasks.py deleted file mode 100644 index e69de29..0000000 From ddd7a965403fb174309178e8744c66012513cefa Mon Sep 17 00:00:00 2001 From: Rob Freedy Date: Fri, 19 Apr 2024 10:20:58 -0400 Subject: [PATCH 5/7] Cleanup unused code and match case statement --- prefect_dbt/cli/credentials.py | 2 -- prefect_dbt/cli/tasks.py | 23 ++++++++++------------- tests/conftest.py | 2 -- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/prefect_dbt/cli/credentials.py b/prefect_dbt/cli/credentials.py index 7ac11d7..51e3a85 100644 --- a/prefect_dbt/cli/credentials.py +++ b/prefect_dbt/cli/credentials.py @@ -27,8 +27,6 @@ except ImportError: PostgresTargetConfigs = None -DEFAULT_PROFILE_PATH = "~/.dbt/profiles.yml" - class DbtCliProfile(Block): """ diff --git a/prefect_dbt/cli/tasks.py b/prefect_dbt/cli/tasks.py index ea8489d..804f917 100644 --- a/prefect_dbt/cli/tasks.py +++ b/prefect_dbt/cli/tasks.py @@ -95,19 +95,16 @@ def create_dbt_task_artifact( failed_runs = [] skipped_runs = [] for r in results.result.results: - match r.status: - case NodeStatus.Success: - successful_runs.append(r) - case NodeStatus.Pass: - successful_runs.append(r) - case NodeStatus.Fail: - failed_runs.append(r) - case NodeStatus.Error: - failed_runs.append(r) - case NodeStatus.RuntimeErr: - failed_runs.append(r) - case NodeStatus.Skipped: - skipped_runs.append(r) + if r.status == NodeStatus.Success or r.status == NodeStatus.Pass: + successful_runs.append(r) + elif ( + r.status == NodeStatus.Fail + or r.status == NodeStatus.Error + or r.status == NodeStatus.RuntimeErr + ): + failed_runs.append(r) + elif r.status == NodeStatus.Skipped: + skipped_runs.append(r) markdown = "# DBT Build Task Summary" diff --git a/tests/conftest.py b/tests/conftest.py index 46c0e82..c4e7a6f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,8 +21,6 @@ from prefect_dbt.cli.credentials import DbtCliProfile from prefect_dbt.cloud.credentials import DbtCloudCredentials -from dbt.cli.main import dbtRunner - @pytest.fixture def dbt_cloud_credentials(): From c393777cd90cfec055a2204ddb2b0d78f3f478f8 Mon Sep 17 00:00:00 2001 From: Rob Freedy Date: Fri, 19 Apr 2024 16:11:17 -0400 Subject: [PATCH 6/7] PR changes and add docstrings --- docs/cli/tasks.md | 1 + prefect_dbt/cli/tasks.py | 96 +++++++++++++++++++++++++++++++--------- 2 files changed, 77 insertions(+), 20 deletions(-) create mode 100644 docs/cli/tasks.md diff --git a/docs/cli/tasks.md b/docs/cli/tasks.md new file mode 100644 index 0000000..5486a9f --- /dev/null +++ b/docs/cli/tasks.md @@ -0,0 +1 @@ +::: prefect_dbt.cli.tasks \ No newline at end of file diff --git a/prefect_dbt/cli/tasks.py b/prefect_dbt/cli/tasks.py index 804f917..d184976 100644 --- a/prefect_dbt/cli/tasks.py +++ b/prefect_dbt/cli/tasks.py @@ -1,6 +1,9 @@ +"""Module containing pre-built tasks and flows that execute specific DBT CLI commands""" + import os from pathlib import Path from typing import Optional, Union +from uuid import UUID import yaml from dbt.cli.main import dbtRunner, dbtRunnerResult @@ -27,6 +30,48 @@ def dbt_build_task( create_artifact: bool = True, artifact_key: str = "dbt-build-task-summary", ): + """ + Executes the 'dbt build' command within a Prefect task, + and optionally creates a Prefect artifact summarizing the dbt build results. + + Args: + profiles_dir: The directory to search for the profiles.yml file. Setting this + appends the `--profiles-dir` option to the command provided. + If this is not set, will try using the DBT_PROFILES_DIR env variable, + but if that's also not set, will use the default directory `$HOME/.dbt/`. + project_dir: The directory to search for the dbt_project.yml file. + Default is the current working directory and its parents. + overwrite_profiles: Whether the existing profiles.yml file under profiles_dir + should be overwritten with a new profile. + dbt_cli_profile: Profiles class containing the profile written to profiles.yml. + Note! This is optional and will raise an error + if profiles.yml already exists under profile_dir + and overwrite_profiles is set to False. + dbt_client: An instance of a dbtRunner client to execute dbt commands. If None, + a new instance is created. + create_artifact: If True, creates a Prefect artifact on the task run + with the dbt build results using the specified artifact key. + Defaults to True. + artifact_key: The key under which to store + the dbt build results artifact in Prefect. + Defaults to 'dbt-build-task-summary'. + + Example: + ```python + @flow + def dbt_test_flow(): + dbt_build_task( + project_dir="/Users/test/my_dbt_project_dir" + ) + ``` + + Raises: + ValueError: If required dbt_cli_profile is not provided + when needed for profile writing. + RuntimeError: If the dbt build fails for any reason, + it will be indicated by the exception raised. + """ + logger = get_run_logger() logger.info("Running dbt build task.") @@ -55,7 +100,7 @@ def dbt_build_task( logger.info(f"Wrote profile to {profiles_path}") elif dbt_cli_profile is not None: raise ValueError( - f"Since overwrite_profiles is False and profiles_path ({profiles_path}) " + f"Since overwrite_profiles is False and profiles_path ({profiles_path!r})" f"already exists, the profile within dbt_cli_profile could not be used; " f"if the existing profile is satisfactory, do not pass dbt_cli_profile" ) @@ -78,52 +123,63 @@ def dbt_build_task( logger.error(f"dbt build task failed with exception: {res.exception}") raise res.exception - if res.success: + if create_artifact: + artifact_id = create_dbt_task_artifact(artifact_key=artifact_key, results=res) + + if res.success and artifact_id: logger.info("dbt build task succeeded.") else: logger.error("dbt build task failed.") - if create_artifact: - create_dbt_task_artifact(artifact_key=artifact_key, results=res, mode="Build") - -def create_dbt_task_artifact( - artifact_key: str, results: dbtRunnerResult, mode: str = "Build" -): +def create_dbt_task_artifact(artifact_key: str, results: dbtRunnerResult) -> UUID: + """ + Creates a Prefect task artifact summarizing the results + of the above predefined prefrect-dbt task. + """ # Create Summary Markdown Artifact - successful_runs = [] - failed_runs = [] - skipped_runs = [] + run_statuses: dict[str, list[str]] = { + "successful": [], + "failed": [], + "skipped": [], + } + for r in results.result.results: if r.status == NodeStatus.Success or r.status == NodeStatus.Pass: - successful_runs.append(r) + run_statuses["successful"].append(r) elif ( r.status == NodeStatus.Fail or r.status == NodeStatus.Error or r.status == NodeStatus.RuntimeErr ): - failed_runs.append(r) + run_statuses["failed"].append(r) elif r.status == NodeStatus.Skipped: - skipped_runs.append(r) + run_statuses["skipped"].append(r) markdown = "# DBT Build Task Summary" - if failed_runs != []: + if run_statuses["failed"] != []: failed_runs_str = "" - for r in failed_runs: - failed_runs_str += f"**{r.node.name}**\nNode Type: {r.node.resource_type}\nNode Path: {r.node.original_file_path}" + for r in run_statuses["failed"]: + failed_runs_str += f"**{r.node.name}**\n \ + Node Type: {r.node.resource_type}\n \ + Node Path: {r.node.original_file_path}" if r.message: message = r.message.replace("\n", ".") failed_runs_str += f"\nError Message: {message}\n" markdown += f"""\n## Failed Runs\n\n{failed_runs_str}\n\n""" - successful_runs_str = "\n".join([f"**{r.node.name}**" for r in successful_runs]) + successful_runs_str = "\n".join( + [f"**{r.node.name}**" for r in run_statuses["successful"]] + ) markdown += f"""\n## Successful Runs\n\n{successful_runs_str}\n\n""" - skipped_runs_str = "\n".join([f"**{r.node.name}**" for r in skipped_runs]) + skipped_runs_str = "\n".join( + [f"**{r.node.name}**" for r in run_statuses["skipped"]] + ) markdown += f"""## Skipped Runs\n{skipped_runs_str}\n\n""" - create_markdown_artifact( + return create_markdown_artifact( markdown=markdown, key=artifact_key, ) From 4367cd6029227a65afeb13033fdd48ae83e71a7d Mon Sep 17 00:00:00 2001 From: Rob Freedy Date: Fri, 19 Apr 2024 16:12:15 -0400 Subject: [PATCH 7/7] Removing unused pydantic version check --- prefect_dbt/cli/tasks.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/prefect_dbt/cli/tasks.py b/prefect_dbt/cli/tasks.py index d184976..730ee81 100644 --- a/prefect_dbt/cli/tasks.py +++ b/prefect_dbt/cli/tasks.py @@ -10,15 +10,9 @@ from dbt.contracts.results import NodeStatus from prefect import get_run_logger, task from prefect.artifacts import create_markdown_artifact -from pydantic import VERSION as PYDANTIC_VERSION from prefect_dbt.cli.credentials import DbtCliProfile -if PYDANTIC_VERSION.startswith("2."): - pass -else: - pass - @task def dbt_build_task(