diff --git a/.changes/unreleased/Features-20220408-165459.yaml b/.changes/unreleased/Features-20220408-165459.yaml new file mode 100644 index 00000000000..c29cfc617c7 --- /dev/null +++ b/.changes/unreleased/Features-20220408-165459.yaml @@ -0,0 +1,8 @@ +kind: Features +body: Added favor-state flag to optionally favor state nodes even if unselected node + exists +time: 2022-04-08T16:54:59.696564+01:00 +custom: + Author: daniel-murray josephberni + Issue: "2968" + PR: "5859" diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 5aac858d284..dfde4109c13 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -1008,6 +1008,7 @@ def merge_from_artifact( adapter, other: "WritableManifest", selected: AbstractSet[UniqueID], + favor_state: bool = False, ) -> None: """Given the selected unique IDs and a writable manifest, update this manifest by replacing any unselected nodes with their counterpart. @@ -1022,7 +1023,10 @@ def merge_from_artifact( node.resource_type in refables and not node.is_ephemeral and unique_id not in selected - and not adapter.get_relation(current.database, current.schema, current.identifier) + and ( + not adapter.get_relation(current.database, current.schema, current.identifier) + or favor_state + ) ): merged.add(unique_id) self.nodes[unique_id] = node.replace(deferred=True) diff --git a/core/dbt/flags.py b/core/dbt/flags.py index a4db9a1eb87..540bf6ab815 100644 --- a/core/dbt/flags.py +++ b/core/dbt/flags.py @@ -106,6 +106,7 @@ def env_set_path(key: str) -> Optional[Path]: MACRO_DEBUGGING = env_set_truthy("DBT_MACRO_DEBUGGING") DEFER_MODE = env_set_truthy("DBT_DEFER_TO_STATE") +FAVOR_STATE_MODE = env_set_truthy("DBT_FAVOR_STATE_STATE") ARTIFACT_STATE_PATH = env_set_path("DBT_ARTIFACT_STATE_PATH") ENABLE_LEGACY_LOGGER = env_set_truthy("DBT_ENABLE_LEGACY_LOGGER") diff --git a/core/dbt/main.py b/core/dbt/main.py index 5be5151bc0c..917ff033f17 100644 --- a/core/dbt/main.py +++ b/core/dbt/main.py @@ -496,6 +496,20 @@ def _add_defer_argument(*subparsers): ) +def _add_favor_state_argument(*subparsers): + for sub in subparsers: + sub.add_optional_argument_inverse( + "--favor-state", + enable_help=""" + If set, defer to the state variable for resolving unselected nodes, even if node exist as a database object in the current environment. + """, + disable_help=""" + If defer is set, expect standard defer behaviour. + """, + default=flags.FAVOR_STATE_MODE, + ) + + def _build_run_subparser(subparsers, base_subparser): run_sub = subparsers.add_parser( "run", @@ -1168,6 +1182,8 @@ def parse_args(args, cls=DBTArgumentParser): _add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub, snapshot_sub, seed_sub) # --defer _add_defer_argument(run_sub, test_sub, build_sub, snapshot_sub, compile_sub) + # --favor-state + _add_favor_state_argument(run_sub, test_sub, build_sub, snapshot_sub) # --full-refresh _add_table_mutability_arguments(run_sub, compile_sub, build_sub) diff --git a/test/integration/062_defer_state_tests/test_defer_state.py b/test/integration/062_defer_state_tests/test_defer_state.py index 56004a1f28c..058e43ef05f 100644 --- a/test/integration/062_defer_state_tests/test_defer_state.py +++ b/test/integration/062_defer_state_tests/test_defer_state.py @@ -89,6 +89,9 @@ def run_and_snapshot_defer(self): # defer test, it succeeds results = self.run_dbt(['snapshot', '--state', 'state', '--defer']) + # favor_state test, it succeeds + results = self.run_dbt(['snapshot', '--state', 'state', '--defer', '--favor-state']) + def run_and_defer(self): results = self.run_dbt(['seed']) assert len(results) == 1 @@ -123,6 +126,40 @@ def run_and_defer(self): assert len(results) == 1 + def run_and_defer_favor_state(self): + results = self.run_dbt(['seed']) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + results = self.run_dbt(['run']) + assert len(results) == 2 + assert not any(r.node.deferred for r in results) + results = self.run_dbt(['test']) + assert len(results) == 2 + + # copy files over from the happy times when we had a good target + self.copy_state() + + # test tests first, because run will change things + # no state, wrong schema, failure. + self.run_dbt(['test', '--target', 'otherschema'], expect_pass=False) + + # no state, run also fails + self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False) + + # defer test, it succeeds + results = self.run_dbt(['test', '-m', 'view_model+', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema']) + + # with state it should work though + results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema']) + assert self.other_schema not in results[0].node.compiled_code + assert self.unique_schema() in results[0].node.compiled_code + + with open('target/manifest.json') as fp: + data = json.load(fp) + assert data['nodes']['seed.test.seed']['deferred'] + + assert len(results) == 1 + def run_switchdirs_defer(self): results = self.run_dbt(['seed']) assert len(results) == 1 @@ -152,6 +189,35 @@ def run_switchdirs_defer(self): expect_pass=False, ) + def run_switchdirs_defer_favor_state(self): + results = self.run_dbt(['seed']) + assert len(results) == 1 + results = self.run_dbt(['run']) + assert len(results) == 2 + + # copy files over from the happy times when we had a good target + self.copy_state() + + self.use_default_project({'model-paths': ['changed_models']}) + # the sql here is just wrong, so it should fail + self.run_dbt( + ['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], + expect_pass=False, + ) + # but this should work since we just use the old happy model + self.run_dbt( + ['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], + expect_pass=True, + ) + + self.use_default_project({'model-paths': ['changed_models_bad']}) + # this should fail because the table model refs a broken ephemeral + # model, which it should see + self.run_dbt( + ['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], + expect_pass=False, + ) + def run_defer_iff_not_exists(self): results = self.run_dbt(['seed', '--target', 'otherschema']) assert len(results) == 1 @@ -169,6 +235,23 @@ def run_defer_iff_not_exists(self): assert self.other_schema not in results[0].node.compiled_code assert self.unique_schema() in results[0].node.compiled_code + def run_defer_iff_not_exists_favor_state(self): + results = self.run_dbt(['seed']) + assert len(results) == 1 + results = self.run_dbt(['run']) + assert len(results) == 2 + + # copy files over from the happy times when we had a good target + self.copy_state() + results = self.run_dbt(['seed']) + assert len(results) == 1 + results = self.run_dbt(['run', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema']) + assert len(results) == 2 + + # because the seed exists in other schema, we should defer it + assert self.other_schema not in results[0].node.compiled_code + assert self.unique_schema() in results[0].node.compiled_code + def run_defer_deleted_upstream(self): results = self.run_dbt(['seed']) assert len(results) == 1 @@ -191,6 +274,27 @@ def run_defer_deleted_upstream(self): assert self.other_schema not in results[0].node.compiled_code assert self.unique_schema() in results[0].node.compiled_code + def run_defer_deleted_upstream_favor_state(self): + results = self.run_dbt(['seed']) + assert len(results) == 1 + results = self.run_dbt(['run']) + assert len(results) == 2 + + # copy files over from the happy times when we had a good target + self.copy_state() + + self.use_default_project({'model-paths': ['changed_models_missing']}) + + self.run_dbt( + ['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], + expect_pass=True, + ) + + # despite deferral, test should use models just created in our schema + results = self.run_dbt(['test', '--state', 'state', '--defer', '--favor-state']) + assert self.other_schema not in results[0].node.compiled_code + assert self.unique_schema() in results[0].node.compiled_code + @use_profile('postgres') def test_postgres_state_changetarget(self): self.run_and_defer() @@ -199,18 +303,38 @@ def test_postgres_state_changetarget(self): with pytest.raises(SystemExit): self.run_dbt(['seed', '--defer']) + @use_profile('postgres') + def test_postgres_state_changetarget_favor_state(self): + self.run_and_defer_favor_state() + + # make sure these commands don't work with --defer + with pytest.raises(SystemExit): + self.run_dbt(['seed', '--defer']) + @use_profile('postgres') def test_postgres_state_changedir(self): self.run_switchdirs_defer() + @use_profile('postgres') + def test_postgres_state_changedir_favor_state(self): + self.run_switchdirs_defer_favor_state() + @use_profile('postgres') def test_postgres_state_defer_iffnotexists(self): self.run_defer_iff_not_exists() + @use_profile('postgres') + def test_postgres_state_defer_iffnotexists_favor_state(self): + self.run_defer_iff_not_exists_favor_state() + @use_profile('postgres') def test_postgres_state_defer_deleted_upstream(self): self.run_defer_deleted_upstream() + @use_profile('postgres') + def test_postgres_state_defer_deleted_upstream_favor_state(self): + self.run_defer_deleted_upstream_favor_state() + @use_profile('postgres') def test_postgres_state_snapshot_defer(self): self.run_and_snapshot_defer()