From 4114ab81f9758710a70d4d381ff74de454d2452d Mon Sep 17 00:00:00 2001 From: Emil Christensen Date: Thu, 17 Oct 2024 10:40:40 -0400 Subject: [PATCH 1/3] Adds sync_compatible to each run_dbt_* function --- src/integrations/prefect-dbt/prefect_dbt/cli/commands.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py b/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py index f12ead188d90..588c4ffbe9ae 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py +++ b/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py @@ -398,6 +398,7 @@ def _compile_kwargs(self, **open_kwargs: Dict[str, Any]) -> Dict[str, Any]: @task +@sync_compatible async def run_dbt_build( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -471,6 +472,7 @@ def dbt_test_flow(): @task +@sync_compatible async def run_dbt_model( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -545,6 +547,7 @@ def dbt_test_flow(): @task +@sync_compatible async def run_dbt_test( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -619,6 +622,7 @@ def dbt_test_flow(): @task +@sync_compatible async def run_dbt_snapshot( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -693,6 +697,7 @@ def dbt_test_flow(): @task +@sync_compatible async def run_dbt_seed( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -767,6 +772,7 @@ def dbt_test_flow(): @task +@sync_compatible async def run_dbt_source_freshness( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, From c21d84d77f03dde41074232bfdfd3f366880cac4 Mon Sep 17 00:00:00 2001 From: Emil Christensen Date: Mon, 21 Oct 2024 11:46:02 -0400 Subject: [PATCH 2/3] Fixes tests by removing awaits --- .../prefect-dbt/tests/cli/test_commands.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/integrations/prefect-dbt/tests/cli/test_commands.py b/src/integrations/prefect-dbt/tests/cli/test_commands.py index 96add6efa380..a3e136c8fa69 100644 --- a/src/integrations/prefect-dbt/tests/cli/test_commands.py +++ b/src/integrations/prefect-dbt/tests/cli/test_commands.py @@ -561,7 +561,7 @@ def test_append_dirs_to_commands( async def test_run_dbt_build_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return await run_dbt_build( + return run_dbt_build( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -580,7 +580,7 @@ async def test_flow(): async def test_run_dbt_test_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return await run_dbt_test( + return run_dbt_test( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -599,7 +599,7 @@ async def test_flow(): async def test_run_dbt_snapshot_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return await run_dbt_snapshot( + return run_dbt_snapshot( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -618,7 +618,7 @@ async def test_flow(): async def test_run_dbt_seed_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return await run_dbt_seed( + return run_dbt_seed( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -637,7 +637,7 @@ async def test_flow(): async def test_run_dbt_model_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return await run_dbt_model( + return run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -658,7 +658,7 @@ async def test_run_dbt_source_freshness_creates_artifact( ): @flow async def test_flow(): - return await run_dbt_source_freshness( + return run_dbt_source_freshness( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -685,7 +685,7 @@ async def test_run_dbt_model_creates_unsuccessful_artifact( ): @flow async def test_flow(): - return await run_dbt_model( + return run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -709,7 +709,7 @@ async def test_run_dbt_source_freshness_creates_unsuccessful_artifact( ): @flow async def test_flow(): - return await run_dbt_source_freshness( + return run_dbt_source_freshness( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -731,7 +731,7 @@ async def test_flow(): async def test_run_dbt_model_throws_error(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return await run_dbt_model( + return run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", From 8d77e03525246710fb7040bbaf440e840a84b3f5 Mon Sep 17 00:00:00 2001 From: Emil Christensen Date: Mon, 21 Oct 2024 13:22:45 -0400 Subject: [PATCH 3/3] Reverses decorator order and undoes changes to tests --- .../prefect-dbt/prefect_dbt/cli/commands.py | 12 ++++++------ .../prefect-dbt/tests/cli/test_commands.py | 18 +++++++++--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py b/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py index 588c4ffbe9ae..2193de75f037 100644 --- a/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py +++ b/src/integrations/prefect-dbt/prefect_dbt/cli/commands.py @@ -397,8 +397,8 @@ def _compile_kwargs(self, **open_kwargs: Dict[str, Any]) -> Dict[str, Any]: return super(type(self), modified_self)._compile_kwargs(**open_kwargs) -@task @sync_compatible +@task async def run_dbt_build( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -471,8 +471,8 @@ def dbt_test_flow(): return results -@task @sync_compatible +@task async def run_dbt_model( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -546,8 +546,8 @@ def dbt_test_flow(): return results -@task @sync_compatible +@task async def run_dbt_test( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -621,8 +621,8 @@ def dbt_test_flow(): return results -@task @sync_compatible +@task async def run_dbt_snapshot( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -696,8 +696,8 @@ def dbt_test_flow(): return results -@task @sync_compatible +@task async def run_dbt_seed( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, @@ -771,8 +771,8 @@ def dbt_test_flow(): return results -@task @sync_compatible +@task async def run_dbt_source_freshness( profiles_dir: Optional[Union[Path, str]] = None, project_dir: Optional[Union[Path, str]] = None, diff --git a/src/integrations/prefect-dbt/tests/cli/test_commands.py b/src/integrations/prefect-dbt/tests/cli/test_commands.py index a3e136c8fa69..96add6efa380 100644 --- a/src/integrations/prefect-dbt/tests/cli/test_commands.py +++ b/src/integrations/prefect-dbt/tests/cli/test_commands.py @@ -561,7 +561,7 @@ def test_append_dirs_to_commands( async def test_run_dbt_build_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return run_dbt_build( + return await run_dbt_build( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -580,7 +580,7 @@ async def test_flow(): async def test_run_dbt_test_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return run_dbt_test( + return await run_dbt_test( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -599,7 +599,7 @@ async def test_flow(): async def test_run_dbt_snapshot_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return run_dbt_snapshot( + return await run_dbt_snapshot( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -618,7 +618,7 @@ async def test_flow(): async def test_run_dbt_seed_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return run_dbt_seed( + return await run_dbt_seed( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -637,7 +637,7 @@ async def test_flow(): async def test_run_dbt_model_creates_artifact(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return run_dbt_model( + return await run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -658,7 +658,7 @@ async def test_run_dbt_source_freshness_creates_artifact( ): @flow async def test_flow(): - return run_dbt_source_freshness( + return await run_dbt_source_freshness( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -685,7 +685,7 @@ async def test_run_dbt_model_creates_unsuccessful_artifact( ): @flow async def test_flow(): - return run_dbt_model( + return await run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -709,7 +709,7 @@ async def test_run_dbt_source_freshness_creates_unsuccessful_artifact( ): @flow async def test_flow(): - return run_dbt_source_freshness( + return await run_dbt_source_freshness( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo", @@ -731,7 +731,7 @@ async def test_flow(): async def test_run_dbt_model_throws_error(profiles_dir, dbt_cli_profile_bare): @flow async def test_flow(): - return run_dbt_model( + return await run_dbt_model( profiles_dir=profiles_dir, dbt_cli_profile=dbt_cli_profile_bare, summary_artifact_key="foo",