This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
DBT Build Task #184
Merged
Merged
DBT Build Task #184
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
00ac96c
Initial dbt_build_task and requirements update
robfreedy 7c27069
Init tests, profile yaml block, and markdown artifact
robfreedy 1579acc
WIP, build task and profile improvements
robfreedy 81798d7
Unit tests
robfreedy ddd7a96
Cleanup unused code and match case statement
robfreedy c393777
PR changes and add docstrings
robfreedy 4367cd6
Removing unused pydantic version check
robfreedy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
::: prefect_dbt.cli.tasks |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
"""Module containing pre-built tasks and flows that execute specific DBT CLI commands""" | ||
|
||
import os | ||
from pathlib import Path | ||
from typing import Optional, Union | ||
from uuid import UUID | ||
|
||
import yaml | ||
from dbt.cli.main import dbtRunner, dbtRunnerResult | ||
from dbt.contracts.results import NodeStatus | ||
from prefect import get_run_logger, task | ||
from prefect.artifacts import create_markdown_artifact | ||
|
||
from prefect_dbt.cli.credentials import DbtCliProfile | ||
|
||
|
||
@task | ||
def dbt_build_task( | ||
profiles_dir: Optional[Union[Path, str]] = None, | ||
project_dir: Optional[Union[Path, str]] = None, | ||
overwrite_profiles: bool = False, | ||
dbt_cli_profile: Optional[DbtCliProfile] = None, | ||
dbt_client: Optional[dbtRunner] = None, | ||
create_artifact: bool = True, | ||
artifact_key: str = "dbt-build-task-summary", | ||
): | ||
""" | ||
Executes the 'dbt build' command within a Prefect task, | ||
and optionally creates a Prefect artifact summarizing the dbt build results. | ||
|
||
Args: | ||
profiles_dir: The directory to search for the profiles.yml file. Setting this | ||
appends the `--profiles-dir` option to the command provided. | ||
If this is not set, will try using the DBT_PROFILES_DIR env variable, | ||
but if that's also not set, will use the default directory `$HOME/.dbt/`. | ||
project_dir: The directory to search for the dbt_project.yml file. | ||
Default is the current working directory and its parents. | ||
overwrite_profiles: Whether the existing profiles.yml file under profiles_dir | ||
should be overwritten with a new profile. | ||
dbt_cli_profile: Profiles class containing the profile written to profiles.yml. | ||
Note! This is optional and will raise an error | ||
if profiles.yml already exists under profile_dir | ||
and overwrite_profiles is set to False. | ||
dbt_client: An instance of a dbtRunner client to execute dbt commands. If None, | ||
a new instance is created. | ||
create_artifact: If True, creates a Prefect artifact on the task run | ||
with the dbt build results using the specified artifact key. | ||
Defaults to True. | ||
artifact_key: The key under which to store | ||
the dbt build results artifact in Prefect. | ||
Defaults to 'dbt-build-task-summary'. | ||
|
||
Example: | ||
```python | ||
@flow | ||
def dbt_test_flow(): | ||
dbt_build_task( | ||
project_dir="/Users/test/my_dbt_project_dir" | ||
) | ||
``` | ||
|
||
Raises: | ||
ValueError: If required dbt_cli_profile is not provided | ||
when needed for profile writing. | ||
RuntimeError: If the dbt build fails for any reason, | ||
it will be indicated by the exception raised. | ||
""" | ||
|
||
logger = get_run_logger() | ||
logger.info("Running dbt build task.") | ||
|
||
# Initialize client if not passed in | ||
if not dbt_client: | ||
dbt_client = dbtRunner() | ||
|
||
if profiles_dir is None: | ||
profiles_dir = os.getenv("DBT_PROFILES_DIR", Path.home() / ".dbt") | ||
profiles_dir = Path(profiles_dir).expanduser() | ||
|
||
# https://docs.getdbt.com/dbt-cli/configure-your-profile | ||
# Note that the file always needs to be called profiles.yml, | ||
# regardless of which directory it is in. | ||
profiles_path = profiles_dir / "profiles.yml" | ||
logger.debug(f"Using this profiles path: {profiles_path}") | ||
|
||
# write the profile if overwrite or no profiles exist | ||
if overwrite_profiles or not profiles_path.exists(): | ||
if dbt_cli_profile is None: | ||
raise ValueError("Provide `dbt_cli_profile` keyword for writing profiles") | ||
profile = dbt_cli_profile.get_profile() | ||
profiles_dir.mkdir(exist_ok=True) | ||
with open(profiles_path, "w+") as f: | ||
yaml.dump(profile, f, default_flow_style=False) | ||
logger.info(f"Wrote profile to {profiles_path}") | ||
elif dbt_cli_profile is not None: | ||
raise ValueError( | ||
f"Since overwrite_profiles is False and profiles_path ({profiles_path!r})" | ||
f"already exists, the profile within dbt_cli_profile could not be used; " | ||
f"if the existing profile is satisfactory, do not pass dbt_cli_profile" | ||
) | ||
|
||
# create CLI args as a list of strings | ||
cli_args = ["build"] | ||
|
||
# append the options | ||
cli_args.append("--profiles-dir") | ||
cli_args.append(profiles_dir) | ||
if project_dir is not None: | ||
project_dir = Path(project_dir).expanduser() | ||
cli_args.append("--project-dir") | ||
cli_args.append(project_dir) | ||
|
||
# run the command | ||
res: dbtRunnerResult = dbt_client.invoke(cli_args) | ||
|
||
if res.exception is not None: | ||
logger.error(f"dbt build task failed with exception: {res.exception}") | ||
raise res.exception | ||
|
||
if create_artifact: | ||
artifact_id = create_dbt_task_artifact(artifact_key=artifact_key, results=res) | ||
|
||
if res.success and artifact_id: | ||
logger.info("dbt build task succeeded.") | ||
else: | ||
logger.error("dbt build task failed.") | ||
|
||
|
||
def create_dbt_task_artifact(artifact_key: str, results: dbtRunnerResult) -> UUID: | ||
""" | ||
Creates a Prefect task artifact summarizing the results | ||
of the above predefined prefrect-dbt task. | ||
""" | ||
# Create Summary Markdown Artifact | ||
run_statuses: dict[str, list[str]] = { | ||
"successful": [], | ||
"failed": [], | ||
"skipped": [], | ||
} | ||
|
||
for r in results.result.results: | ||
if r.status == NodeStatus.Success or r.status == NodeStatus.Pass: | ||
run_statuses["successful"].append(r) | ||
elif ( | ||
r.status == NodeStatus.Fail | ||
or r.status == NodeStatus.Error | ||
or r.status == NodeStatus.RuntimeErr | ||
): | ||
run_statuses["failed"].append(r) | ||
elif r.status == NodeStatus.Skipped: | ||
run_statuses["skipped"].append(r) | ||
|
||
markdown = "# DBT Build Task Summary" | ||
|
||
if run_statuses["failed"] != []: | ||
failed_runs_str = "" | ||
for r in run_statuses["failed"]: | ||
failed_runs_str += f"**{r.node.name}**\n \ | ||
Node Type: {r.node.resource_type}\n \ | ||
Node Path: {r.node.original_file_path}" | ||
if r.message: | ||
message = r.message.replace("\n", ".") | ||
failed_runs_str += f"\nError Message: {message}\n" | ||
markdown += f"""\n## Failed Runs\n\n{failed_runs_str}\n\n""" | ||
|
||
successful_runs_str = "\n".join( | ||
[f"**{r.node.name}**" for r in run_statuses["successful"]] | ||
) | ||
markdown += f"""\n## Successful Runs\n\n{successful_runs_str}\n\n""" | ||
|
||
skipped_runs_str = "\n".join( | ||
[f"**{r.node.name}**" for r in run_statuses["skipped"]] | ||
) | ||
markdown += f"""## Skipped Runs\n{skipped_runs_str}\n\n""" | ||
|
||
return create_markdown_artifact( | ||
markdown=markdown, | ||
key=artifact_key, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
prefect>=2.13.5 | ||
prefect>=2.16.6 | ||
dbt_core>=1.1.1 | ||
prefect_shell>=0.1.4 | ||
sgqlc>=16.0.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
from unittest.mock import MagicMock | ||
|
||
import pytest | ||
from dbt.cli.main import dbtRunnerResult | ||
from dbt.contracts.files import FileHash | ||
from dbt.contracts.graph.nodes import ModelNode | ||
from dbt.contracts.results import RunExecutionResult, RunResult | ||
from prefect import flow | ||
from prefect.artifacts import Artifact | ||
|
||
from prefect_dbt.cli.tasks import dbt_build_task | ||
|
||
|
||
@pytest.fixture | ||
async def mock_dbt_runner_invoke_success(): | ||
return dbtRunnerResult( | ||
success=True, | ||
exception=None, | ||
result=RunExecutionResult( | ||
results=[ | ||
RunResult( | ||
status="pass", | ||
timing=None, | ||
thread_id="'Thread-1 (worker)'", | ||
message="CREATE TABLE (1.0 rows, 0 processed)", | ||
failures=None, | ||
node=ModelNode( | ||
database="test-123", | ||
schema="prefect_dbt_example", | ||
name="my_first_dbt_model", | ||
resource_type="model", | ||
package_name="prefect_dbt_bigquery", | ||
path="example/my_first_dbt_model.sql", | ||
original_file_path="models/example/my_first_dbt_model.sql", | ||
unique_id="model.prefect_dbt_bigquery.my_first_dbt_model", | ||
fqn=["prefect_dbt_bigquery", "example", "my_first_dbt_model"], | ||
alias="my_first_dbt_model", | ||
checksum=FileHash(name="sha256", checksum="123456789"), | ||
), | ||
execution_time=0.0, | ||
adapter_response=None, | ||
) | ||
], | ||
elapsed_time=0.0, | ||
), | ||
) | ||
|
||
|
||
@pytest.fixture() | ||
def dbt_runner_result(monkeypatch, mock_dbt_runner_invoke_success): | ||
_mock_dbt_runner_invoke_success = MagicMock( | ||
return_value=mock_dbt_runner_invoke_success | ||
) | ||
monkeypatch.setattr( | ||
"dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_invoke_success | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def profiles_dir(tmp_path): | ||
return tmp_path / ".dbt" | ||
|
||
|
||
@pytest.mark.usefixtures("dbt_runner_result") | ||
def test_dbt_build_task_creates_artifact(profiles_dir, dbt_cli_profile_bare): | ||
@flow | ||
def test_flow(): | ||
return dbt_build_task( | ||
profiles_dir=profiles_dir, | ||
dbt_cli_profile=dbt_cli_profile_bare, | ||
artifact_key="foo", | ||
create_artifact=True, | ||
) | ||
|
||
test_flow() | ||
assert (a := Artifact.get(key="foo")) | ||
assert a.type == "markdown" | ||
assert a.data.startswith("# DBT Build Task Summary") | ||
assert "my_first_dbt_model" in a.data |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it make sense to parameterize this for a given task so the user doesn't always overwrite the same key by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to leave this as the default for the dbt build command because dbt build runs all of the models/tests/snapshots/seeds/etc. We will have other pre-built tasks with their own artifacts for each of the individual node types that I will make sure have unique keys for the artifacts