diff --git a/.changes/unreleased/Fixes-20230119-111101.yaml b/.changes/unreleased/Fixes-20230119-111101.yaml new file mode 100644 index 00000000000..5e7415849f4 --- /dev/null +++ b/.changes/unreleased/Fixes-20230119-111101.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix behavior of --favor-state with --defer +time: 2023-01-19T11:11:01.354227+01:00 +custom: + Author: jtcohen6 + Issue: "6617" diff --git a/core/dbt/main.py b/core/dbt/main.py index 8368ab9f723..a6c6f0b013d 100644 --- a/core/dbt/main.py +++ b/core/dbt/main.py @@ -486,7 +486,7 @@ def _build_snapshot_subparser(subparsers, base_subparser): return sub -def _add_defer_argument(*subparsers): +def _add_defer_arguments(*subparsers): for sub in subparsers: sub.add_optional_argument_inverse( "--defer", @@ -499,10 +499,6 @@ def _add_defer_argument(*subparsers): """, default=flags.DEFER_MODE, ) - - -def _add_favor_state_argument(*subparsers): - for sub in subparsers: sub.add_optional_argument_inverse( "--favor-state", enable_help=""" @@ -580,7 +576,7 @@ def _build_docs_generate_subparser(subparsers, base_subparser): Do not run "dbt compile" as part of docs generation """, ) - _add_defer_argument(generate_sub) + _add_defer_arguments(generate_sub) return generate_sub @@ -1192,9 +1188,7 @@ def parse_args(args, cls=DBTArgumentParser): # list_sub sets up its own arguments. _add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub, snapshot_sub, seed_sub) # --defer - _add_defer_argument(run_sub, test_sub, build_sub, snapshot_sub, compile_sub) - # --favor-state - _add_favor_state_argument(run_sub, test_sub, build_sub, snapshot_sub) + _add_defer_arguments(run_sub, test_sub, build_sub, snapshot_sub, compile_sub) # --full-refresh _add_table_mutability_arguments(run_sub, compile_sub, build_sub) diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index 995063491f6..7d2bc0482db 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -83,6 +83,7 @@ def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]): adapter=adapter, other=deferred_manifest, selected=selected_uids, + favor_state=bool(self.args.favor_state), ) # TODO: is it wrong to write the manifest here? I think it's right... self.write_manifest() diff --git a/test/integration/062_defer_state_tests/changed_models/ephemeral_model.sql b/test/integration/062_defer_state_tests/changed_models/ephemeral_model.sql deleted file mode 100644 index 2f976e3a9b5..00000000000 --- a/test/integration/062_defer_state_tests/changed_models/ephemeral_model.sql +++ /dev/null @@ -1,2 +0,0 @@ -{{ config(materialized='ephemeral') }} -select * from {{ ref('view_model') }} diff --git a/test/integration/062_defer_state_tests/changed_models/schema.yml b/test/integration/062_defer_state_tests/changed_models/schema.yml deleted file mode 100644 index 1ec506d3d19..00000000000 --- a/test/integration/062_defer_state_tests/changed_models/schema.yml +++ /dev/null @@ -1,9 +0,0 @@ -version: 2 -models: - - name: view_model - columns: - - name: id - tests: - - unique - - not_null - - name: name diff --git a/test/integration/062_defer_state_tests/changed_models/table_model.sql b/test/integration/062_defer_state_tests/changed_models/table_model.sql deleted file mode 100644 index 65909318bab..00000000000 --- a/test/integration/062_defer_state_tests/changed_models/table_model.sql +++ /dev/null @@ -1,5 +0,0 @@ -{{ config(materialized='table') }} -select * from {{ ref('ephemeral_model') }} - --- establish a macro dependency to trigger state:modified.macros --- depends on: {{ my_macro() }} \ No newline at end of file diff --git a/test/integration/062_defer_state_tests/changed_models/view_model.sql b/test/integration/062_defer_state_tests/changed_models/view_model.sql deleted file mode 100644 index bddbbb23cc2..00000000000 --- a/test/integration/062_defer_state_tests/changed_models/view_model.sql +++ /dev/null @@ -1 +0,0 @@ -select * from no.such.table diff --git a/test/integration/062_defer_state_tests/changed_models_bad/ephemeral_model.sql b/test/integration/062_defer_state_tests/changed_models_bad/ephemeral_model.sql deleted file mode 100644 index 5155dfa475e..00000000000 --- a/test/integration/062_defer_state_tests/changed_models_bad/ephemeral_model.sql +++ /dev/null @@ -1,2 +0,0 @@ -{{ config(materialized='ephemeral') }} -select * from no.such.table diff --git a/test/integration/062_defer_state_tests/changed_models_bad/schema.yml b/test/integration/062_defer_state_tests/changed_models_bad/schema.yml deleted file mode 100644 index 1ec506d3d19..00000000000 --- a/test/integration/062_defer_state_tests/changed_models_bad/schema.yml +++ /dev/null @@ -1,9 +0,0 @@ -version: 2 -models: - - name: view_model - columns: - - name: id - tests: - - unique - - not_null - - name: name diff --git a/test/integration/062_defer_state_tests/changed_models_bad/table_model.sql b/test/integration/062_defer_state_tests/changed_models_bad/table_model.sql deleted file mode 100644 index 65909318bab..00000000000 --- a/test/integration/062_defer_state_tests/changed_models_bad/table_model.sql +++ /dev/null @@ -1,5 +0,0 @@ -{{ config(materialized='table') }} -select * from {{ ref('ephemeral_model') }} - --- establish a macro dependency to trigger state:modified.macros --- depends on: {{ my_macro() }} \ No newline at end of file diff --git a/test/integration/062_defer_state_tests/changed_models_bad/view_model.sql b/test/integration/062_defer_state_tests/changed_models_bad/view_model.sql deleted file mode 100644 index bddbbb23cc2..00000000000 --- a/test/integration/062_defer_state_tests/changed_models_bad/view_model.sql +++ /dev/null @@ -1 +0,0 @@ -select * from no.such.table diff --git a/test/integration/062_defer_state_tests/changed_models_missing/schema.yml b/test/integration/062_defer_state_tests/changed_models_missing/schema.yml deleted file mode 100644 index 1ec506d3d19..00000000000 --- a/test/integration/062_defer_state_tests/changed_models_missing/schema.yml +++ /dev/null @@ -1,9 +0,0 @@ -version: 2 -models: - - name: view_model - columns: - - name: id - tests: - - unique - - not_null - - name: name diff --git a/test/integration/062_defer_state_tests/changed_models_missing/table_model.sql b/test/integration/062_defer_state_tests/changed_models_missing/table_model.sql deleted file mode 100644 index 22b040d2c8b..00000000000 --- a/test/integration/062_defer_state_tests/changed_models_missing/table_model.sql +++ /dev/null @@ -1,2 +0,0 @@ -{{ config(materialized='table') }} -select 1 as fun diff --git a/test/integration/062_defer_state_tests/changed_models_missing/view_model.sql b/test/integration/062_defer_state_tests/changed_models_missing/view_model.sql deleted file mode 100644 index 4b91aa0f2fa..00000000000 --- a/test/integration/062_defer_state_tests/changed_models_missing/view_model.sql +++ /dev/null @@ -1 +0,0 @@ -select * from {{ ref('seed') }} diff --git a/test/integration/062_defer_state_tests/macros/infinite_macros.sql b/test/integration/062_defer_state_tests/macros/infinite_macros.sql deleted file mode 100644 index 81d2083d3bb..00000000000 --- a/test/integration/062_defer_state_tests/macros/infinite_macros.sql +++ /dev/null @@ -1,13 +0,0 @@ -{# trigger infinite recursion if not handled #} - -{% macro my_infinitely_recursive_macro() %} - {{ return(adapter.dispatch('my_infinitely_recursive_macro')()) }} -{% endmacro %} - -{% macro default__my_infinitely_recursive_macro() %} - {% if unmet_condition %} - {{ my_infinitely_recursive_macro() }} - {% else %} - {{ return('') }} - {% endif %} -{% endmacro %} diff --git a/test/integration/062_defer_state_tests/macros/macros.sql b/test/integration/062_defer_state_tests/macros/macros.sql deleted file mode 100644 index 79519c1b60b..00000000000 --- a/test/integration/062_defer_state_tests/macros/macros.sql +++ /dev/null @@ -1,3 +0,0 @@ -{% macro my_macro() %} - {% do log('in a macro' ) %} -{% endmacro %} diff --git a/test/integration/062_defer_state_tests/models/ephemeral_model.sql b/test/integration/062_defer_state_tests/models/ephemeral_model.sql deleted file mode 100644 index 2f976e3a9b5..00000000000 --- a/test/integration/062_defer_state_tests/models/ephemeral_model.sql +++ /dev/null @@ -1,2 +0,0 @@ -{{ config(materialized='ephemeral') }} -select * from {{ ref('view_model') }} diff --git a/test/integration/062_defer_state_tests/models/exposures.yml b/test/integration/062_defer_state_tests/models/exposures.yml deleted file mode 100644 index 489dec3c3c4..00000000000 --- a/test/integration/062_defer_state_tests/models/exposures.yml +++ /dev/null @@ -1,8 +0,0 @@ -version: 2 -exposures: - - name: my_exposure - type: application - depends_on: - - ref('view_model') - owner: - email: test@example.com diff --git a/test/integration/062_defer_state_tests/models/schema.yml b/test/integration/062_defer_state_tests/models/schema.yml deleted file mode 100644 index 342335148bf..00000000000 --- a/test/integration/062_defer_state_tests/models/schema.yml +++ /dev/null @@ -1,10 +0,0 @@ -version: 2 -models: - - name: view_model - columns: - - name: id - tests: - - unique: - severity: error - - not_null - - name: name diff --git a/test/integration/062_defer_state_tests/models/table_model.sql b/test/integration/062_defer_state_tests/models/table_model.sql deleted file mode 100644 index 65909318bab..00000000000 --- a/test/integration/062_defer_state_tests/models/table_model.sql +++ /dev/null @@ -1,5 +0,0 @@ -{{ config(materialized='table') }} -select * from {{ ref('ephemeral_model') }} - --- establish a macro dependency to trigger state:modified.macros --- depends on: {{ my_macro() }} \ No newline at end of file diff --git a/test/integration/062_defer_state_tests/models/view_model.sql b/test/integration/062_defer_state_tests/models/view_model.sql deleted file mode 100644 index 72cb07a5ef4..00000000000 --- a/test/integration/062_defer_state_tests/models/view_model.sql +++ /dev/null @@ -1,4 +0,0 @@ -select * from {{ ref('seed') }} - --- establish a macro dependency that trips infinite recursion if not handled --- depends on: {{ my_infinitely_recursive_macro() }} \ No newline at end of file diff --git a/test/integration/062_defer_state_tests/previous_state/manifest.json b/test/integration/062_defer_state_tests/previous_state/manifest.json deleted file mode 100644 index 6ab63f3f563..00000000000 --- a/test/integration/062_defer_state_tests/previous_state/manifest.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "metadata": { - "dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v3.json", - "dbt_version": "0.21.1" - } -} diff --git a/test/integration/062_defer_state_tests/seeds/seed.csv b/test/integration/062_defer_state_tests/seeds/seed.csv deleted file mode 100644 index 1a728c8ab74..00000000000 --- a/test/integration/062_defer_state_tests/seeds/seed.csv +++ /dev/null @@ -1,3 +0,0 @@ -id,name -1,Alice -2,Bob diff --git a/test/integration/062_defer_state_tests/snapshots/my_snapshot.sql b/test/integration/062_defer_state_tests/snapshots/my_snapshot.sql deleted file mode 100644 index 6a7d2b31bfa..00000000000 --- a/test/integration/062_defer_state_tests/snapshots/my_snapshot.sql +++ /dev/null @@ -1,14 +0,0 @@ -{% snapshot my_cool_snapshot %} - - {{ - config( - target_database=database, - target_schema=schema, - unique_key='id', - strategy='check', - check_cols=['id'], - ) - }} - select * from {{ ref('view_model') }} - -{% endsnapshot %} diff --git a/test/integration/062_defer_state_tests/test_defer_state.py b/test/integration/062_defer_state_tests/test_defer_state.py deleted file mode 100644 index 593dc034036..00000000000 --- a/test/integration/062_defer_state_tests/test_defer_state.py +++ /dev/null @@ -1,354 +0,0 @@ -from test.integration.base import DBTIntegrationTest, use_profile -import copy -import json -import os -import shutil - -import pytest -import dbt.exceptions - - -class TestDeferState(DBTIntegrationTest): - @property - def schema(self): - return "defer_state_062" - - @property - def models(self): - return "models" - - def setUp(self): - self.other_schema = None - super().setUp() - self._created_schemas.add(self.other_schema) - - @property - def project_config(self): - return { - 'config-version': 2, - 'seeds': { - 'test': { - 'quote_columns': False, - } - } - } - - def get_profile(self, adapter_type): - if self.other_schema is None: - self.other_schema = self.unique_schema() + '_other' - profile = super().get_profile(adapter_type) - default_name = profile['test']['target'] - profile['test']['outputs']['otherschema'] = copy.deepcopy(profile['test']['outputs'][default_name]) - profile['test']['outputs']['otherschema']['schema'] = self.other_schema - return profile - - def copy_state(self): - assert not os.path.exists('state') - os.makedirs('state') - shutil.copyfile('target/manifest.json', 'state/manifest.json') - - def run_and_compile_defer(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['run']) - assert len(results) == 2 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['test']) - assert len(results) == 2 - - # copy files - self.copy_state() - - # defer test, it succeeds - results, success = self.run_dbt_and_check(['compile', '--state', 'state', '--defer']) - self.assertEqual(len(results.results), 6) - self.assertEqual(results.results[0].node.name, "seed") - self.assertTrue(success) - - def run_and_snapshot_defer(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['run']) - assert len(results) == 2 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['test']) - assert len(results) == 2 - - # snapshot succeeds without --defer - results = self.run_dbt(['snapshot']) - - # no state, snapshot fails - with pytest.raises(dbt.exceptions.DbtRuntimeError): - results = self.run_dbt(['snapshot', '--state', 'state', '--defer']) - - # copy files - self.copy_state() - - # defer test, it succeeds - results = self.run_dbt(['snapshot', '--state', 'state', '--defer']) - - # favor_state test, it succeeds - results = self.run_dbt(['snapshot', '--state', 'state', '--defer', '--favor-state']) - - def run_and_defer(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['run']) - assert len(results) == 2 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['test']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - - # test tests first, because run will change things - # no state, wrong schema, failure. - self.run_dbt(['test', '--target', 'otherschema'], expect_pass=False) - - # test generate docs - # no state, wrong schema, empty nodes - catalog = self.run_dbt(['docs','generate','--target', 'otherschema']) - assert not catalog.nodes - - # no state, run also fails - self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False) - - # defer test, it succeeds - results = self.run_dbt(['test', '-m', 'view_model+', '--state', 'state', '--defer', '--target', 'otherschema']) - - # defer docs generate with state, catalog refers schema from the happy times - catalog = self.run_dbt(['docs','generate', '-m', 'view_model+', '--state', 'state', '--defer','--target', 'otherschema']) - assert self.other_schema not in catalog.nodes["seed.test.seed"].metadata.schema - assert self.unique_schema() in catalog.nodes["seed.test.seed"].metadata.schema - - # with state it should work though - results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema']) - assert self.other_schema not in results[0].node.compiled_code - assert self.unique_schema() in results[0].node.compiled_code - - with open('target/manifest.json') as fp: - data = json.load(fp) - assert data['nodes']['seed.test.seed']['deferred'] - - assert len(results) == 1 - - def run_and_defer_favor_state(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['run']) - assert len(results) == 2 - assert not any(r.node.deferred for r in results) - results = self.run_dbt(['test']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - - # test tests first, because run will change things - # no state, wrong schema, failure. - self.run_dbt(['test', '--target', 'otherschema'], expect_pass=False) - - # no state, run also fails - self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False) - - # defer test, it succeeds - results = self.run_dbt(['test', '-m', 'view_model+', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema']) - - # with state it should work though - results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema']) - assert self.other_schema not in results[0].node.compiled_code - assert self.unique_schema() in results[0].node.compiled_code - - with open('target/manifest.json') as fp: - data = json.load(fp) - assert data['nodes']['seed.test.seed']['deferred'] - - assert len(results) == 1 - - def run_switchdirs_defer(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - results = self.run_dbt(['run']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - - self.use_default_project({'model-paths': ['changed_models']}) - # the sql here is just wrong, so it should fail - self.run_dbt( - ['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'], - expect_pass=False, - ) - # but this should work since we just use the old happy model - self.run_dbt( - ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'], - expect_pass=True, - ) - - self.use_default_project({'model-paths': ['changed_models_bad']}) - # this should fail because the table model refs a broken ephemeral - # model, which it should see - self.run_dbt( - ['run', '-m', 'table_model', '--state', 'state', '--defer', '--target', 'otherschema'], - expect_pass=False, - ) - - def run_switchdirs_defer_favor_state(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - results = self.run_dbt(['run']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - - self.use_default_project({'model-paths': ['changed_models']}) - # the sql here is just wrong, so it should fail - self.run_dbt( - ['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], - expect_pass=False, - ) - # but this should work since we just use the old happy model - self.run_dbt( - ['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], - expect_pass=True, - ) - - self.use_default_project({'model-paths': ['changed_models_bad']}) - # this should fail because the table model refs a broken ephemeral - # model, which it should see - self.run_dbt( - ['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], - expect_pass=False, - ) - - def run_defer_iff_not_exists(self): - results = self.run_dbt(['seed', '--target', 'otherschema']) - assert len(results) == 1 - results = self.run_dbt(['run', '--target', 'otherschema']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - results = self.run_dbt(['seed']) - assert len(results) == 1 - results = self.run_dbt(['run', '--state', 'state', '--defer']) - assert len(results) == 2 - - # because the seed now exists in our schema, we shouldn't defer it - assert self.other_schema not in results[0].node.compiled_code - assert self.unique_schema() in results[0].node.compiled_code - - def run_defer_iff_not_exists_favor_state(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - results = self.run_dbt(['run']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - results = self.run_dbt(['seed']) - assert len(results) == 1 - results = self.run_dbt(['run', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema']) - assert len(results) == 2 - - # because the seed exists in other schema, we should defer it - assert self.other_schema not in results[0].node.compiled_code - assert self.unique_schema() in results[0].node.compiled_code - - def run_defer_deleted_upstream(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - results = self.run_dbt(['run']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - - self.use_default_project({'model-paths': ['changed_models_missing']}) - # ephemeral_model is now gone. previously this caused a - # keyerror (dbt#2875), now it should pass - self.run_dbt( - ['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'], - expect_pass=True, - ) - - # despite deferral, test should use models just created in our schema - results = self.run_dbt(['test', '--state', 'state', '--defer']) - assert self.other_schema not in results[0].node.compiled_code - assert self.unique_schema() in results[0].node.compiled_code - - def run_defer_deleted_upstream_favor_state(self): - results = self.run_dbt(['seed']) - assert len(results) == 1 - results = self.run_dbt(['run']) - assert len(results) == 2 - - # copy files over from the happy times when we had a good target - self.copy_state() - - self.use_default_project({'model-paths': ['changed_models_missing']}) - - self.run_dbt( - ['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'], - expect_pass=True, - ) - - # despite deferral, test should use models just created in our schema - results = self.run_dbt(['test', '--state', 'state', '--defer', '--favor-state']) - assert self.other_schema not in results[0].node.compiled_code - assert self.unique_schema() in results[0].node.compiled_code - - @use_profile('postgres') - def test_postgres_state_changetarget(self): - self.run_and_defer() - - # make sure these commands don't work with --defer - with pytest.raises(SystemExit): - self.run_dbt(['seed', '--defer']) - - @use_profile('postgres') - def test_postgres_state_changetarget_favor_state(self): - self.run_and_defer_favor_state() - - # make sure these commands don't work with --defer - with pytest.raises(SystemExit): - self.run_dbt(['seed', '--defer']) - - @use_profile('postgres') - def test_postgres_state_changedir(self): - self.run_switchdirs_defer() - - @use_profile('postgres') - def test_postgres_state_changedir_favor_state(self): - self.run_switchdirs_defer_favor_state() - - @use_profile('postgres') - def test_postgres_state_defer_iffnotexists(self): - self.run_defer_iff_not_exists() - - @use_profile('postgres') - def test_postgres_state_defer_iffnotexists_favor_state(self): - self.run_defer_iff_not_exists_favor_state() - - @use_profile('postgres') - def test_postgres_state_defer_deleted_upstream(self): - self.run_defer_deleted_upstream() - - @use_profile('postgres') - def test_postgres_state_defer_deleted_upstream_favor_state(self): - self.run_defer_deleted_upstream_favor_state() - - @use_profile('postgres') - def test_postgres_state_snapshot_defer(self): - self.run_and_snapshot_defer() - - @use_profile('postgres') - def test_postgres_state_compile_defer(self): - self.run_and_compile_defer() diff --git a/test/integration/062_defer_state_tests/test_modified_state.py b/test/integration/062_defer_state_tests/test_modified_state.py deleted file mode 100644 index 085faf11d5b..00000000000 --- a/test/integration/062_defer_state_tests/test_modified_state.py +++ /dev/null @@ -1,211 +0,0 @@ -from test.integration.base import DBTIntegrationTest, use_profile -import os -import random -import shutil -import string - -import pytest - -from dbt.exceptions import CompilationError, IncompatibleSchemaError - - -class TestModifiedState(DBTIntegrationTest): - @property - def schema(self): - return "modified_state_062" - - @property - def models(self): - return "models" - - @property - def project_config(self): - return { - 'config-version': 2, - 'macro-paths': ['macros'], - 'seeds': { - 'test': { - 'quote_columns': True, - } - } - } - - def _symlink_test_folders(self): - # dbt's normal symlink behavior breaks this test. Copy the files - # so we can freely modify them. - for entry in os.listdir(self.test_original_source_path): - src = os.path.join(self.test_original_source_path, entry) - tst = os.path.join(self.test_root_dir, entry) - if entry in {'models', 'seeds', 'macros', 'previous_state'}: - shutil.copytree(src, tst) - elif os.path.isdir(entry) or entry.endswith('.sql'): - os.symlink(src, tst) - - def copy_state(self): - assert not os.path.exists('state') - os.makedirs('state') - shutil.copyfile('target/manifest.json', 'state/manifest.json') - - def setUp(self): - super().setUp() - self.run_dbt(['seed']) - self.run_dbt(['run']) - self.copy_state() - - @use_profile('postgres') - def test_postgres_changed_seed_contents_state(self): - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'state:modified', '--state', './state'], expect_pass=True) - assert len(results) == 0 - with open('seeds/seed.csv') as fp: - fp.readline() - newline = fp.newlines - with open('seeds/seed.csv', 'a') as fp: - fp.write(f'3,carl{newline}') - - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'state:modified', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'state:modified', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'state:modified+', '--state', './state']) - assert len(results) == 7 - assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'} - - shutil.rmtree('./state') - self.copy_state() - - with open('seeds/seed.csv', 'a') as fp: - # assume each line is ~2 bytes + len(name) - target_size = 1*1024*1024 - line_size = 64 - - num_lines = target_size // line_size - - maxlines = num_lines + 4 - - for idx in range(4, maxlines): - value = ''.join(random.choices(string.ascii_letters, k=62)) - fp.write(f'{idx},{value}{newline}') - - # now if we run again, we should get a warning - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'state:modified', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.seed' - - with pytest.raises(CompilationError) as exc: - self.run_dbt(['--warn-error', 'ls', '--resource-type', 'seed', '--select', 'state:modified', '--state', './state']) - assert '>1MB' in str(exc.value) - - shutil.rmtree('./state') - self.copy_state() - - # once it's in path mode, we don't mark it as modified if it changes - with open('seeds/seed.csv', 'a') as fp: - fp.write(f'{random},test{newline}') - - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'state:modified', '--state', './state'], expect_pass=True) - assert len(results) == 0 - - @use_profile('postgres') - def test_postgres_changed_seed_config(self): - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'state:modified', '--state', './state'], expect_pass=True) - assert len(results) == 0 - - self.use_default_project({'seeds': {'test': {'quote_columns': False}}}) - - # quoting change -> seed changed - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'state:modified', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.seed' - - @use_profile('postgres') - def test_postgres_unrendered_config_same(self): - results = self.run_dbt(['ls', '--resource-type', 'model', '--select', 'state:modified', '--state', './state'], expect_pass=True) - assert len(results) == 0 - - # although this is the default value, dbt will recognize it as a change - # for previously-unconfigured models, because it's been explicitly set - self.use_default_project({'models': {'test': {'materialized': 'view'}}}) - results = self.run_dbt(['ls', '--resource-type', 'model', '--select', 'state:modified', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.view_model' - - @use_profile('postgres') - def test_postgres_changed_model_contents(self): - results = self.run_dbt(['run', '--models', 'state:modified', '--state', './state']) - assert len(results) == 0 - - with open('models/table_model.sql') as fp: - fp.readline() - newline = fp.newlines - - with open('models/table_model.sql', 'w') as fp: - fp.write("{{ config(materialized='table') }}") - fp.write(newline) - fp.write("select * from {{ ref('seed') }}") - fp.write(newline) - - results = self.run_dbt(['run', '--models', 'state:modified', '--state', './state']) - assert len(results) == 1 - assert results[0].node.name == 'table_model' - - @use_profile('postgres') - def test_postgres_new_macro(self): - with open('macros/macros.sql') as fp: - fp.readline() - newline = fp.newlines - - new_macro = '{% macro my_other_macro() %}{% endmacro %}' + newline - - # add a new macro to a new file - with open('macros/second_macro.sql', 'w') as fp: - fp.write(new_macro) - - results, stdout = self.run_dbt_and_capture(['run', '--models', 'state:modified', '--state', './state']) - assert len(results) == 0 - - os.remove('macros/second_macro.sql') - # add a new macro to the existing file - with open('macros/macros.sql', 'a') as fp: - fp.write(new_macro) - - results, stdout = self.run_dbt_and_capture(['run', '--models', 'state:modified', '--state', './state']) - assert len(results) == 0 - - @use_profile('postgres') - def test_postgres_changed_macro_contents(self): - with open('macros/macros.sql') as fp: - fp.readline() - newline = fp.newlines - - # modify an existing macro - with open('macros/macros.sql', 'w') as fp: - fp.write("{% macro my_macro() %}") - fp.write(newline) - fp.write(" {% do log('in a macro', info=True) %}") - fp.write(newline) - fp.write('{% endmacro %}') - fp.write(newline) - - # table_model calls this macro - results, stdout = self.run_dbt_and_capture(['run', '--models', 'state:modified', '--state', './state']) - assert len(results) == 1 - - @use_profile('postgres') - def test_postgres_changed_exposure(self): - with open('models/exposures.yml', 'a') as fp: - fp.write(' name: John Doe\n') - - results, stdout = self.run_dbt_and_capture(['run', '--models', '+state:modified', '--state', './state']) - assert len(results) == 1 - assert results[0].node.name == 'view_model' - - @use_profile('postgres') - def test_postgres_previous_version_manifest(self): - # This tests that a different schema version in the file throws an error - with self.assertRaises(IncompatibleSchemaError) as exc: - results = self.run_dbt(['ls', '-s', 'state:modified', '--state', './previous_state']) - self.assertEqual(exc.CODE, 10014) diff --git a/test/integration/062_defer_state_tests/test_run_results_state.py b/test/integration/062_defer_state_tests/test_run_results_state.py deleted file mode 100644 index 58215009ad7..00000000000 --- a/test/integration/062_defer_state_tests/test_run_results_state.py +++ /dev/null @@ -1,434 +0,0 @@ -from test.integration.base import DBTIntegrationTest, use_profile -import os -import random -import shutil -import string - -import pytest - - -class TestRunResultsState(DBTIntegrationTest): - @property - def schema(self): - return "run_results_state_062" - - @property - def models(self): - return "models" - - @property - def project_config(self): - return { - 'config-version': 2, - 'macro-paths': ['macros'], - 'seeds': { - 'test': { - 'quote_columns': True, - } - } - } - - def _symlink_test_folders(self): - # dbt's normal symlink behavior breaks this test. Copy the files - # so we can freely modify them. - for entry in os.listdir(self.test_original_source_path): - src = os.path.join(self.test_original_source_path, entry) - tst = os.path.join(self.test_root_dir, entry) - if entry in {'models', 'seeds', 'macros'}: - shutil.copytree(src, tst) - elif os.path.isdir(entry) or entry.endswith('.sql'): - os.symlink(src, tst) - - def copy_state(self): - assert not os.path.exists('state') - os.makedirs('state') - shutil.copyfile('target/manifest.json', 'state/manifest.json') - shutil.copyfile('target/run_results.json', 'state/run_results.json') - - def setUp(self): - super().setUp() - self.run_dbt(['build']) - self.copy_state() - - def rebuild_run_dbt(self, expect_pass=True): - shutil.rmtree('./state') - self.run_dbt(['build'], expect_pass=expect_pass) - self.copy_state() - - @use_profile('postgres') - def test_postgres_seed_run_results_state(self): - shutil.rmtree('./state') - self.run_dbt(['seed']) - self.copy_state() - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:success', '--state', './state'], expect_pass=True) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'result:success', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'result:success+', '--state', './state']) - assert len(results) == 7 - assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'} - - with open('seeds/seed.csv') as fp: - fp.readline() - newline = fp.newlines - with open('seeds/seed.csv', 'a') as fp: - fp.write(f'\"\'\'3,carl{newline}') - shutil.rmtree('./state') - self.run_dbt(['seed'], expect_pass=False) - self.copy_state() - - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state']) - assert len(results) == 7 - assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'} - - - with open('seeds/seed.csv') as fp: - fp.readline() - newline = fp.newlines - with open('seeds/seed.csv', 'a') as fp: - # assume each line is ~2 bytes + len(name) - target_size = 1*1024*1024 - line_size = 64 - - num_lines = target_size // line_size - - maxlines = num_lines + 4 - - for idx in range(4, maxlines): - value = ''.join(random.choices(string.ascii_letters, k=62)) - fp.write(f'{idx},{value}{newline}') - shutil.rmtree('./state') - self.run_dbt(['seed'], expect_pass=False) - self.copy_state() - - results = self.run_dbt(['ls', '--resource-type', 'seed', '--select', 'result:error', '--state', './state'], expect_pass=True) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.seed' - - results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state']) - assert len(results) == 7 - assert set(results) == {'test.seed', 'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'} - - @use_profile('postgres') - def test_postgres_build_run_results_state(self): - results = self.run_dbt(['build', '--select', 'result:error', '--state', './state']) - assert len(results) == 0 - - with open('models/view_model.sql') as fp: - fp.readline() - newline = fp.newlines - - with open('models/view_model.sql', 'w') as fp: - fp.write(newline) - fp.write("select * from forced_error") - fp.write(newline) - - self.rebuild_run_dbt(expect_pass=False) - - results = self.run_dbt(['build', '--select', 'result:error', '--state', './state'], expect_pass=False) - assert len(results) == 3 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'view_model', 'not_null_view_model_id','unique_view_model_id'} - - results = self.run_dbt(['ls', '--select', 'result:error', '--state', './state']) - assert len(results) == 3 - assert set(results) == {'test.view_model', 'test.not_null_view_model_id', 'test.unique_view_model_id'} - - results = self.run_dbt(['build', '--select', 'result:error+', '--state', './state'], expect_pass=False) - assert len(results) == 4 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'table_model','view_model', 'not_null_view_model_id','unique_view_model_id'} - - results = self.run_dbt(['ls', '--select', 'result:error+', '--state', './state']) - assert len(results) == 6 # includes exposure - assert set(results) == {'test.table_model', 'test.view_model', 'test.ephemeral_model', 'test.not_null_view_model_id', 'test.unique_view_model_id', 'exposure:test.my_exposure'} - - # test failure on build tests - # fail the unique test - with open('models/view_model.sql', 'w') as fp: - fp.write(newline) - fp.write("select 1 as id union all select 1 as id") - fp.write(newline) - - self.rebuild_run_dbt(expect_pass=False) - - results = self.run_dbt(['build', '--select', 'result:fail', '--state', './state'], expect_pass=False) - assert len(results) == 1 - assert results[0].node.name == 'unique_view_model_id' - - results = self.run_dbt(['ls', '--select', 'result:fail', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.unique_view_model_id' - - results = self.run_dbt(['build', '--select', 'result:fail+', '--state', './state'], expect_pass=False) - assert len(results) == 2 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'table_model', 'unique_view_model_id'} - - results = self.run_dbt(['ls', '--select', 'result:fail+', '--state', './state']) - assert len(results) == 1 - assert set(results) == {'test.unique_view_model_id'} - - # change the unique test severity from error to warn and reuse the same view_model.sql changes above - f = open('models/schema.yml', 'r') - filedata = f.read() - f.close() - newdata = filedata.replace('error','warn') - f = open('models/schema.yml', 'w') - f.write(newdata) - f.close() - - self.rebuild_run_dbt(expect_pass=True) - - results = self.run_dbt(['build', '--select', 'result:warn', '--state', './state'], expect_pass=True) - assert len(results) == 1 - assert results[0].node.name == 'unique_view_model_id' - - results = self.run_dbt(['ls', '--select', 'result:warn', '--state', './state']) - assert len(results) == 1 - assert results[0] == 'test.unique_view_model_id' - - results = self.run_dbt(['build', '--select', 'result:warn+', '--state', './state'], expect_pass=True) - assert len(results) == 2 # includes table_model to be run - nodes = set([elem.node.name for elem in results]) - assert nodes == {'table_model', 'unique_view_model_id'} - - results = self.run_dbt(['ls', '--select', 'result:warn+', '--state', './state']) - assert len(results) == 1 - assert set(results) == {'test.unique_view_model_id'} - - @use_profile('postgres') - def test_postgres_run_run_results_state(self): - results = self.run_dbt(['run', '--select', 'result:success', '--state', './state'], expect_pass=True) - assert len(results) == 2 - assert results[0].node.name == 'view_model' - assert results[1].node.name == 'table_model' - - # clear state and rerun upstream view model to test + operator - shutil.rmtree('./state') - self.run_dbt(['run', '--select', 'view_model'], expect_pass=True) - self.copy_state() - results = self.run_dbt(['run', '--select', 'result:success+', '--state', './state'], expect_pass=True) - assert len(results) == 2 - assert results[0].node.name == 'view_model' - assert results[1].node.name == 'table_model' - - # check we are starting from a place with 0 errors - results = self.run_dbt(['run', '--select', 'result:error', '--state', './state']) - assert len(results) == 0 - - # force an error in the view model to test error and skipped states - with open('models/view_model.sql') as fp: - fp.readline() - newline = fp.newlines - - with open('models/view_model.sql', 'w') as fp: - fp.write(newline) - fp.write("select * from forced_error") - fp.write(newline) - - shutil.rmtree('./state') - self.run_dbt(['run'], expect_pass=False) - self.copy_state() - - # test single result selector on error - results = self.run_dbt(['run', '--select', 'result:error', '--state', './state'], expect_pass=False) - assert len(results) == 1 - assert results[0].node.name == 'view_model' - - # test + operator selection on error - results = self.run_dbt(['run', '--select', 'result:error+', '--state', './state'], expect_pass=False) - assert len(results) == 2 - assert results[0].node.name == 'view_model' - assert results[1].node.name == 'table_model' - - # single result selector on skipped. Expect this to pass becase underlying view already defined above - results = self.run_dbt(['run', '--select', 'result:skipped', '--state', './state'], expect_pass=True) - assert len(results) == 1 - assert results[0].node.name == 'table_model' - - # add a downstream model that depends on table_model for skipped+ selector - with open('models/table_model_downstream.sql', 'w') as fp: - fp.write("select * from {{ref('table_model')}}") - - shutil.rmtree('./state') - self.run_dbt(['run'], expect_pass=False) - self.copy_state() - - results = self.run_dbt(['run', '--select', 'result:skipped+', '--state', './state'], expect_pass=True) - assert len(results) == 2 - assert results[0].node.name == 'table_model' - assert results[1].node.name == 'table_model_downstream' - - - @use_profile('postgres') - def test_postgres_test_run_results_state(self): - # run passed nodes - results = self.run_dbt(['test', '--select', 'result:pass', '--state', './state'], expect_pass=True) - assert len(results) == 2 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'unique_view_model_id', 'not_null_view_model_id'} - - # run passed nodes with + operator - results = self.run_dbt(['test', '--select', 'result:pass+', '--state', './state'], expect_pass=True) - assert len(results) == 2 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'unique_view_model_id', 'not_null_view_model_id'} - - # update view model to generate a failure case - os.remove('./models/view_model.sql') - with open('models/view_model.sql', 'w') as fp: - fp.write("select 1 as id union all select 1 as id") - - self.rebuild_run_dbt(expect_pass=False) - - # test with failure selector - results = self.run_dbt(['test', '--select', 'result:fail', '--state', './state'], expect_pass=False) - assert len(results) == 1 - assert results[0].node.name == 'unique_view_model_id' - - # test with failure selector and + operator - results = self.run_dbt(['test', '--select', 'result:fail+', '--state', './state'], expect_pass=False) - assert len(results) == 1 - assert results[0].node.name == 'unique_view_model_id' - - # change the unique test severity from error to warn and reuse the same view_model.sql changes above - with open('models/schema.yml', 'r+') as f: - filedata = f.read() - newdata = filedata.replace('error','warn') - f.seek(0) - f.write(newdata) - f.truncate() - - # rebuild - expect_pass = True because we changed the error to a warning this time around - self.rebuild_run_dbt(expect_pass=True) - - # test with warn selector - results = self.run_dbt(['test', '--select', 'result:warn', '--state', './state'], expect_pass=True) - assert len(results) == 1 - assert results[0].node.name == 'unique_view_model_id' - - # test with warn selector and + operator - results = self.run_dbt(['test', '--select', 'result:warn+', '--state', './state'], expect_pass=True) - assert len(results) == 1 - assert results[0].node.name == 'unique_view_model_id' - - - @use_profile('postgres') - def test_postgres_concurrent_selectors_run_run_results_state(self): - results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state']) - assert len(results) == 0 - - # force an error on a dbt model - with open('models/view_model.sql') as fp: - fp.readline() - newline = fp.newlines - - with open('models/view_model.sql', 'w') as fp: - fp.write(newline) - fp.write("select * from forced_error") - fp.write(newline) - - shutil.rmtree('./state') - self.run_dbt(['run'], expect_pass=False) - self.copy_state() - - # modify another dbt model - with open('models/table_model_modified_example.sql', 'w') as fp: - fp.write(newline) - fp.write("select * from forced_error") - fp.write(newline) - - results = self.run_dbt(['run', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False) - assert len(results) == 3 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'view_model', 'table_model_modified_example', 'table_model'} - - - @use_profile('postgres') - def test_postgres_concurrent_selectors_test_run_results_state(self): - # create failure test case for result:fail selector - os.remove('./models/view_model.sql') - with open('./models/view_model.sql', 'w') as f: - f.write('select 1 as id union all select 1 as id union all select null as id') - - # run dbt build again to trigger test errors - self.rebuild_run_dbt(expect_pass=False) - - # get the failures from - results = self.run_dbt(['test', '--select', 'result:fail', '--exclude', 'not_null_view_model_id', '--state', './state'], expect_pass=False) - assert len(results) == 1 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'unique_view_model_id'} - - - @use_profile('postgres') - def test_postgres_concurrent_selectors_build_run_results_state(self): - results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state']) - assert len(results) == 0 - - # force an error on a dbt model - with open('models/view_model.sql') as fp: - fp.readline() - newline = fp.newlines - - with open('models/view_model.sql', 'w') as fp: - fp.write(newline) - fp.write("select * from forced_error") - fp.write(newline) - - self.rebuild_run_dbt(expect_pass=False) - - # modify another dbt model - with open('models/table_model_modified_example.sql', 'w') as fp: - fp.write(newline) - fp.write("select * from forced_error") - fp.write(newline) - - results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', '--state', './state'], expect_pass=False) - assert len(results) == 5 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'table_model_modified_example', 'view_model', 'table_model', 'not_null_view_model_id', 'unique_view_model_id'} - - # create failure test case for result:fail selector - os.remove('./models/view_model.sql') - with open('./models/view_model.sql', 'w') as f: - f.write('select 1 as id union all select 1 as id') - - # create error model case for result:error selector - with open('./models/error_model.sql', 'w') as f: - f.write('select 1 as id from not_exists') - - # create something downstream from the error model to rerun - with open('./models/downstream_of_error_model.sql', 'w') as f: - f.write('select * from {{ ref("error_model") }} )') - - # regenerate build state - self.rebuild_run_dbt(expect_pass=False) - - # modify model again to trigger the state:modified selector - with open('models/table_model_modified_example.sql', 'w') as fp: - fp.write(newline) - fp.write("select * from forced_another_error") - fp.write(newline) - - results = self.run_dbt(['build', '--select', 'state:modified+', 'result:error+', 'result:fail+', '--state', './state'], expect_pass=False) - assert len(results) == 5 - nodes = set([elem.node.name for elem in results]) - assert nodes == {'error_model', 'downstream_of_error_model', 'table_model_modified_example', 'table_model', 'unique_view_model_id'} diff --git a/tests/functional/defer_state/fixtures.py b/tests/functional/defer_state/fixtures.py new file mode 100644 index 00000000000..17f46f842d9 --- /dev/null +++ b/tests/functional/defer_state/fixtures.py @@ -0,0 +1,101 @@ +seed_csv = """id,name +1,Alice +2,Bob +""" + +table_model_sql = """ +{{ config(materialized='table') }} +select * from {{ ref('ephemeral_model') }} + +-- establish a macro dependency to trigger state:modified.macros +-- depends on: {{ my_macro() }} +""" + +changed_table_model_sql = """ +{{ config(materialized='table') }} +select 1 as fun +""" + +view_model_sql = """ +select * from {{ ref('seed') }} + +-- establish a macro dependency that trips infinite recursion if not handled +-- depends on: {{ my_infinitely_recursive_macro() }} +""" + +changed_view_model_sql = """ +select * from no.such.table +""" + +ephemeral_model_sql = """ +{{ config(materialized='ephemeral') }} +select * from {{ ref('view_model') }} +""" + +changed_ephemeral_model_sql = """ +{{ config(materialized='ephemeral') }} +select * from no.such.table +""" + +schema_yml = """ +version: 2 +models: + - name: view_model + columns: + - name: id + tests: + - unique: + severity: error + - not_null + - name: name +""" + +exposures_yml = """ +version: 2 +exposures: + - name: my_exposure + type: application + depends_on: + - ref('view_model') + owner: + email: test@example.com +""" + +macros_sql = """ +{% macro my_macro() %} + {% do log('in a macro' ) %} +{% endmacro %} +""" + +infinite_macros_sql = """ +{# trigger infinite recursion if not handled #} + +{% macro my_infinitely_recursive_macro() %} + {{ return(adapter.dispatch('my_infinitely_recursive_macro')()) }} +{% endmacro %} + +{% macro default__my_infinitely_recursive_macro() %} + {% if unmet_condition %} + {{ my_infinitely_recursive_macro() }} + {% else %} + {{ return('') }} + {% endif %} +{% endmacro %} +""" + +snapshot_sql = """ +{% snapshot my_cool_snapshot %} + + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['id'], + ) + }} + select * from {{ ref('view_model') }} + +{% endsnapshot %} +""" diff --git a/tests/functional/defer_state/test_defer_state.py b/tests/functional/defer_state/test_defer_state.py new file mode 100644 index 00000000000..134cae1c626 --- /dev/null +++ b/tests/functional/defer_state/test_defer_state.py @@ -0,0 +1,273 @@ +import json +import os +import shutil +from copy import deepcopy + +import pytest + +from dbt.tests.util import run_dbt, write_file, rm_file + +from dbt.exceptions import DbtRuntimeError + +from tests.functional.defer_state.fixtures import ( + seed_csv, + table_model_sql, + changed_table_model_sql, + view_model_sql, + changed_view_model_sql, + ephemeral_model_sql, + changed_ephemeral_model_sql, + schema_yml, + exposures_yml, + macros_sql, + infinite_macros_sql, + snapshot_sql, +) + + +class BaseDeferState: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model.sql": table_model_sql, + "view_model.sql": view_model_sql, + "ephemeral_model.sql": ephemeral_model_sql, + "schema.yml": schema_yml, + "exposures.yml": exposures_yml, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "infinite_macros.sql": infinite_macros_sql, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return { + "snapshot.sql": snapshot_sql, + } + + @pytest.fixture(scope="class") + def other_schema(self, unique_schema): + return unique_schema + "_other" + + @property + def project_config_update(self): + return { + "seeds": { + "test": { + "quote_columns": False, + } + } + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target, unique_schema, other_schema): + outputs = {"default": dbt_profile_target, "otherschema": deepcopy(dbt_profile_target)} + outputs["default"]["schema"] = unique_schema + outputs["otherschema"]["schema"] = other_schema + return {"test": {"outputs": outputs, "target": "default"}} + + def copy_state(self): + if not os.path.exists("state"): + os.makedirs("state") + shutil.copyfile("target/manifest.json", "state/manifest.json") + + def run_and_save_state(self): + results = run_dbt(["seed"]) + assert len(results) == 1 + assert not any(r.node.deferred for r in results) + results = run_dbt(["run"]) + assert len(results) == 2 + assert not any(r.node.deferred for r in results) + results = run_dbt(["test"]) + assert len(results) == 2 + + # copy files + self.copy_state() + + +class TestDeferStateUnsupportedCommands(BaseDeferState): + def test_unsupported_commands(self, project): + # make sure these commands don"t work with --defer + with pytest.raises(SystemExit): + run_dbt(["seed", "--defer"]) + + def test_no_state(self, project): + # no "state" files present, snapshot fails + with pytest.raises(DbtRuntimeError): + run_dbt(["snapshot", "--state", "state", "--defer"]) + + +class TestRunCompileState(BaseDeferState): + def test_run_and_compile_defer(self, project): + self.run_and_save_state() + + # defer test, it succeeds + results = run_dbt(["compile", "--state", "state", "--defer"]) + assert len(results.results) == 6 + assert results.results[0].node.name == "seed" + + +class TestSnapshotState(BaseDeferState): + def test_snapshot_state_defer(self, project): + self.run_and_save_state() + # snapshot succeeds without --defer + run_dbt(["snapshot"]) + # copy files + self.copy_state() + # defer test, it succeeds + run_dbt(["snapshot", "--state", "state", "--defer"]) + # favor_state test, it succeeds + run_dbt(["snapshot", "--state", "state", "--defer", "--favor-state"]) + + +class TestRunDeferState(BaseDeferState): + def test_run_and_defer(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state() + + # test tests first, because run will change things + # no state, wrong schema, failure. + run_dbt(["test", "--target", "otherschema"], expect_pass=False) + + # test generate docs + # no state, wrong schema, empty nodes + catalog = run_dbt(["docs", "generate", "--target", "otherschema"]) + assert not catalog.nodes + + # no state, run also fails + run_dbt(["run", "--target", "otherschema"], expect_pass=False) + + # defer test, it succeeds + results = run_dbt( + ["test", "-m", "view_model+", "--state", "state", "--defer", "--target", "otherschema"] + ) + + # defer docs generate with state, catalog refers schema from the happy times + catalog = run_dbt( + [ + "docs", + "generate", + "-m", + "view_model+", + "--state", + "state", + "--defer", + "--target", + "otherschema", + ] + ) + assert other_schema not in catalog.nodes["seed.test.seed"].metadata.schema + assert unique_schema in catalog.nodes["seed.test.seed"].metadata.schema + + # with state it should work though + results = run_dbt( + ["run", "-m", "view_model", "--state", "state", "--defer", "--target", "otherschema"] + ) + assert other_schema not in results[0].node.compiled_code + assert unique_schema in results[0].node.compiled_code + + with open("target/manifest.json") as fp: + data = json.load(fp) + assert data["nodes"]["seed.test.seed"]["deferred"] + + assert len(results) == 1 + + +class TestRunDeferStateChangedModel(BaseDeferState): + def test_run_defer_state_changed_model(self, project): + self.run_and_save_state() + + # change "view_model" + write_file(changed_view_model_sql, "models", "view_model.sql") + + # the sql here is just wrong, so it should fail + run_dbt( + ["run", "-m", "view_model", "--state", "state", "--defer", "--target", "otherschema"], + expect_pass=False, + ) + # but this should work since we just use the old happy model + run_dbt( + ["run", "-m", "table_model", "--state", "state", "--defer", "--target", "otherschema"], + expect_pass=True, + ) + + # change "ephemeral_model" + write_file(changed_ephemeral_model_sql, "models", "ephemeral_model.sql") + # this should fail because the table model refs a broken ephemeral + # model, which it should see + run_dbt( + ["run", "-m", "table_model", "--state", "state", "--defer", "--target", "otherschema"], + expect_pass=False, + ) + + +class TestRunDeferStateIFFNotExists(BaseDeferState): + def test_run_defer_iff_not_exists(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state() + + results = run_dbt(["seed", "--target", "otherschema"]) + assert len(results) == 1 + results = run_dbt(["run", "--state", "state", "--defer", "--target", "otherschema"]) + assert len(results) == 2 + + # because the seed now exists in our "other" schema, we should prefer it over the one + # available from state + assert other_schema in results[0].node.compiled_code + + # this time with --favor-state: even though the seed now exists in our "other" schema, + # we should still favor the one available from state + results = run_dbt( + ["run", "--state", "state", "--defer", "--favor-state", "--target", "otherschema"] + ) + assert len(results) == 2 + assert other_schema not in results[0].node.compiled_code + + +class TestDeferStateDeletedUpstream(BaseDeferState): + def test_run_defer_deleted_upstream(self, project, unique_schema, other_schema): + project.create_test_schema(other_schema) + self.run_and_save_state() + + # remove "ephemeral_model" + change "table_model" + rm_file("models", "ephemeral_model.sql") + write_file(changed_table_model_sql, "models", "table_model.sql") + + # ephemeral_model is now gone. previously this caused a + # keyerror (dbt#2875), now it should pass + run_dbt( + ["run", "-m", "view_model", "--state", "state", "--defer", "--target", "otherschema"], + expect_pass=True, + ) + + # despite deferral, we should use models just created in our schema + results = run_dbt(["test", "--state", "state", "--defer", "--target", "otherschema"]) + assert other_schema in results[0].node.compiled_code + + # this time with --favor-state: prefer the models in the "other" schema, even though they exist in ours + run_dbt( + [ + "run", + "-m", + "view_model", + "--state", + "state", + "--defer", + "--favor-state", + "--target", + "otherschema", + ], + expect_pass=True, + ) + results = run_dbt(["test", "--state", "state", "--defer", "--favor-state"]) + assert other_schema not in results[0].node.compiled_code diff --git a/tests/functional/defer_state/test_modified_state.py b/tests/functional/defer_state/test_modified_state.py new file mode 100644 index 00000000000..80e3d455da1 --- /dev/null +++ b/tests/functional/defer_state/test_modified_state.py @@ -0,0 +1,263 @@ +import os +import random +import shutil +import string + +import pytest + +from dbt.tests.util import run_dbt, update_config_file, write_file + +from dbt.exceptions import CompilationError + +from tests.functional.defer_state.fixtures import ( + seed_csv, + table_model_sql, + view_model_sql, + ephemeral_model_sql, + schema_yml, + exposures_yml, + macros_sql, + infinite_macros_sql, +) + + +class BaseModifiedState: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model.sql": table_model_sql, + "view_model.sql": view_model_sql, + "ephemeral_model.sql": ephemeral_model_sql, + "schema.yml": schema_yml, + "exposures.yml": exposures_yml, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "infinite_macros.sql": infinite_macros_sql, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + @property + def project_config_update(self): + return { + "seeds": { + "test": { + "quote_columns": False, + } + } + } + + def copy_state(self): + if not os.path.exists("state"): + os.makedirs("state") + shutil.copyfile("target/manifest.json", "state/manifest.json") + + def run_and_save_state(self): + run_dbt(["seed"]) + run_dbt(["run"]) + self.copy_state() + + +class TestChangedSeedContents(BaseModifiedState): + def test_changed_seed_contents_state(self, project): + self.run_and_save_state() + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "state:modified", "--state", "./state"], + expect_pass=True, + ) + assert len(results) == 0 + + # add a new row to the seed + changed_seed_contents = seed_csv + "\n" + "3,carl" + write_file(changed_seed_contents, "seeds", "seed.csv") + + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "state:modified", "--state", "./state"] + ) + assert len(results) == 1 + assert results[0] == "test.seed" + + results = run_dbt(["ls", "--select", "state:modified", "--state", "./state"]) + assert len(results) == 1 + assert results[0] == "test.seed" + + results = run_dbt(["ls", "--select", "state:modified+", "--state", "./state"]) + assert len(results) == 7 + assert set(results) == { + "test.seed", + "test.table_model", + "test.view_model", + "test.ephemeral_model", + "test.not_null_view_model_id", + "test.unique_view_model_id", + "exposure:test.my_exposure", + } + + shutil.rmtree("./state") + self.copy_state() + + # make a very big seed + # assume each line is ~2 bytes + len(name) + target_size = 1 * 1024 * 1024 + line_size = 64 + num_lines = target_size // line_size + maxlines = num_lines + 4 + seed_lines = [seed_csv] + for idx in range(4, maxlines): + value = "".join(random.choices(string.ascii_letters, k=62)) + seed_lines.append(f"{idx},{value}") + seed_contents = "\n".join(seed_lines) + write_file(seed_contents, "seeds", "seed.csv") + + # now if we run again, we should get a warning + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "state:modified", "--state", "./state"] + ) + assert len(results) == 1 + assert results[0] == "test.seed" + + with pytest.raises(CompilationError) as exc: + run_dbt( + [ + "--warn-error", + "ls", + "--resource-type", + "seed", + "--select", + "state:modified", + "--state", + "./state", + ] + ) + assert ">1MB" in str(exc.value) + + shutil.rmtree("./state") + self.copy_state() + + # once it"s in path mode, we don"t mark it as modified if it changes + write_file(seed_contents + "\n1,test", "seeds", "seed.csv") + + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "state:modified", "--state", "./state"], + expect_pass=True, + ) + assert len(results) == 0 + + +class TestChangedSeedConfig(BaseModifiedState): + def test_changed_seed_config(self, project): + self.run_and_save_state() + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "state:modified", "--state", "./state"], + expect_pass=True, + ) + assert len(results) == 0 + + update_config_file({"seeds": {"test": {"quote_columns": False}}}, "dbt_project.yml") + + # quoting change -> seed changed + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "state:modified", "--state", "./state"] + ) + assert len(results) == 1 + assert results[0] == "test.seed" + + +class TestUnrenderedConfigSame(BaseModifiedState): + def test_unrendered_config_same(self, project): + self.run_and_save_state() + results = run_dbt( + ["ls", "--resource-type", "model", "--select", "state:modified", "--state", "./state"], + expect_pass=True, + ) + assert len(results) == 0 + + # although this is the default value, dbt will recognize it as a change + # for previously-unconfigured models, because it"s been explicitly set + update_config_file({"models": {"test": {"materialized": "view"}}}, "dbt_project.yml") + results = run_dbt( + ["ls", "--resource-type", "model", "--select", "state:modified", "--state", "./state"] + ) + assert len(results) == 1 + assert results[0] == "test.view_model" + + +class TestChangedModelContents(BaseModifiedState): + def test_changed_model_contents(self, project): + self.run_and_save_state() + results = run_dbt(["run", "--models", "state:modified", "--state", "./state"]) + assert len(results) == 0 + + table_model_update = """ + {{ config(materialized="table") }} + + select * from {{ ref("seed") }} + """ + + write_file(table_model_update, "models", "table_model.sql") + + results = run_dbt(["run", "--models", "state:modified", "--state", "./state"]) + assert len(results) == 1 + assert results[0].node.name == "table_model" + + +class TestNewMacro(BaseModifiedState): + def test_new_macro(self, project): + self.run_and_save_state() + + new_macro = """ + {% macro my_other_macro() %} + {% endmacro %} + """ + + # add a new macro to a new file + write_file(new_macro, "macros", "second_macro.sql") + + results = run_dbt(["run", "--models", "state:modified", "--state", "./state"]) + assert len(results) == 0 + + os.remove("macros/second_macro.sql") + # add a new macro to the existing file + with open("macros/macros.sql", "a") as fp: + fp.write(new_macro) + + results = run_dbt(["run", "--models", "state:modified", "--state", "./state"]) + assert len(results) == 0 + + +class TestChangedMacroContents(BaseModifiedState): + def test_changed_macro_contents(self, project): + self.run_and_save_state() + + # modify an existing macro + updated_macro = """ + {% macro my_macro() %} + {% do log("in a macro", info=True) %} + {% endmacro %} + """ + write_file(updated_macro, "macros", "macros.sql") + + # table_model calls this macro + results = run_dbt(["run", "--models", "state:modified", "--state", "./state"]) + assert len(results) == 1 + + +class TestChangedExposure(BaseModifiedState): + def test_changed_exposure(self, project): + self.run_and_save_state() + + # add an "owner.name" to existing exposure + updated_exposure = exposures_yml + "\n name: John Doe\n" + write_file(updated_exposure, "models", "exposures.yml") + + results = run_dbt(["run", "--models", "+state:modified", "--state", "./state"]) + assert len(results) == 1 + assert results[0].node.name == "view_model" diff --git a/tests/functional/defer_state/test_run_results_state.py b/tests/functional/defer_state/test_run_results_state.py new file mode 100644 index 00000000000..aa1dc549272 --- /dev/null +++ b/tests/functional/defer_state/test_run_results_state.py @@ -0,0 +1,494 @@ +import os +import shutil + +import pytest + +from dbt.tests.util import run_dbt, write_file + +from tests.functional.defer_state.fixtures import ( + seed_csv, + table_model_sql, + view_model_sql, + ephemeral_model_sql, + schema_yml, + exposures_yml, + macros_sql, + infinite_macros_sql, +) + + +class BaseRunResultsState: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model.sql": table_model_sql, + "view_model.sql": view_model_sql, + "ephemeral_model.sql": ephemeral_model_sql, + "schema.yml": schema_yml, + "exposures.yml": exposures_yml, + } + + @pytest.fixture(scope="class") + def macros(self): + return { + "macros.sql": macros_sql, + "infinite_macros.sql": infinite_macros_sql, + } + + @pytest.fixture(scope="class") + def seeds(self): + return { + "seed.csv": seed_csv, + } + + @property + def project_config_update(self): + return { + "seeds": { + "test": { + "quote_columns": False, + } + } + } + + def clear_state(self): + shutil.rmtree("./state") + + def copy_state(self): + if not os.path.exists("state"): + os.makedirs("state") + shutil.copyfile("target/manifest.json", "state/manifest.json") + shutil.copyfile("target/run_results.json", "state/run_results.json") + + def run_and_save_state(self): + run_dbt(["build"]) + self.copy_state() + + def rebuild_run_dbt(self, expect_pass=True): + self.clear_state() + run_dbt(["build"], expect_pass=expect_pass) + self.copy_state() + + def update_view_model_bad_sql(self): + # update view model to generate a failure case + not_unique_sql = "select * from forced_error" + write_file(not_unique_sql, "models", "view_model.sql") + + def update_view_model_failing_tests(self, with_dupes=True, with_nulls=False): + # test failure on build tests + # fail the unique test + select_1 = "select 1 as id" + select_stmts = [select_1] + if with_dupes: + select_stmts.append(select_1) + if with_nulls: + select_stmts.append("select null as id") + failing_tests_sql = " union all ".join(select_stmts) + write_file(failing_tests_sql, "models", "view_model.sql") + + def update_unique_test_severity_warn(self): + # change the unique test severity from error to warn and reuse the same view_model.sql changes above + new_config = schema_yml.replace("error", "warn") + write_file(new_config, "models", "schema.yml") + + +class TestSeedRunResultsState(BaseRunResultsState): + def test_seed_run_results_state(self, project): + self.run_and_save_state() + self.clear_state() + run_dbt(["seed"]) + self.copy_state() + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "result:success", "--state", "./state"], + expect_pass=True, + ) + assert len(results) == 1 + assert results[0] == "test.seed" + + results = run_dbt(["ls", "--select", "result:success", "--state", "./state"]) + assert len(results) == 1 + assert results[0] == "test.seed" + + results = run_dbt(["ls", "--select", "result:success+", "--state", "./state"]) + assert len(results) == 7 + assert set(results) == { + "test.seed", + "test.table_model", + "test.view_model", + "test.ephemeral_model", + "test.not_null_view_model_id", + "test.unique_view_model_id", + "exposure:test.my_exposure", + } + + # add a new faulty row to the seed + changed_seed_contents = seed_csv + "\n" + "\\\3,carl" + write_file(changed_seed_contents, "seeds", "seed.csv") + + self.clear_state() + run_dbt(["seed"], expect_pass=False) + self.copy_state() + + results = run_dbt( + ["ls", "--resource-type", "seed", "--select", "result:error", "--state", "./state"], + expect_pass=True, + ) + assert len(results) == 1 + assert results[0] == "test.seed" + + results = run_dbt(["ls", "--select", "result:error", "--state", "./state"]) + assert len(results) == 1 + assert results[0] == "test.seed" + + results = run_dbt(["ls", "--select", "result:error+", "--state", "./state"]) + assert len(results) == 7 + assert set(results) == { + "test.seed", + "test.table_model", + "test.view_model", + "test.ephemeral_model", + "test.not_null_view_model_id", + "test.unique_view_model_id", + "exposure:test.my_exposure", + } + + +class TestBuildRunResultsState(BaseRunResultsState): + def test_build_run_results_state(self, project): + self.run_and_save_state() + results = run_dbt(["build", "--select", "result:error", "--state", "./state"]) + assert len(results) == 0 + + self.update_view_model_bad_sql() + self.rebuild_run_dbt(expect_pass=False) + + results = run_dbt( + ["build", "--select", "result:error", "--state", "./state"], expect_pass=False + ) + assert len(results) == 3 + nodes = set([elem.node.name for elem in results]) + assert nodes == {"view_model", "not_null_view_model_id", "unique_view_model_id"} + + results = run_dbt(["ls", "--select", "result:error", "--state", "./state"]) + assert len(results) == 3 + assert set(results) == { + "test.view_model", + "test.not_null_view_model_id", + "test.unique_view_model_id", + } + + results = run_dbt( + ["build", "--select", "result:error+", "--state", "./state"], expect_pass=False + ) + assert len(results) == 4 + nodes = set([elem.node.name for elem in results]) + assert nodes == { + "table_model", + "view_model", + "not_null_view_model_id", + "unique_view_model_id", + } + + results = run_dbt(["ls", "--select", "result:error+", "--state", "./state"]) + assert len(results) == 6 # includes exposure + assert set(results) == { + "test.table_model", + "test.view_model", + "test.ephemeral_model", + "test.not_null_view_model_id", + "test.unique_view_model_id", + "exposure:test.my_exposure", + } + + self.update_view_model_failing_tests() + self.rebuild_run_dbt(expect_pass=False) + + results = run_dbt( + ["build", "--select", "result:fail", "--state", "./state"], expect_pass=False + ) + assert len(results) == 1 + assert results[0].node.name == "unique_view_model_id" + + results = run_dbt(["ls", "--select", "result:fail", "--state", "./state"]) + assert len(results) == 1 + assert results[0] == "test.unique_view_model_id" + + results = run_dbt( + ["build", "--select", "result:fail+", "--state", "./state"], expect_pass=False + ) + assert len(results) == 2 + nodes = set([elem.node.name for elem in results]) + assert nodes == {"table_model", "unique_view_model_id"} + + results = run_dbt(["ls", "--select", "result:fail+", "--state", "./state"]) + assert len(results) == 1 + assert set(results) == {"test.unique_view_model_id"} + + self.update_unique_test_severity_warn() + self.rebuild_run_dbt(expect_pass=True) + + results = run_dbt( + ["build", "--select", "result:warn", "--state", "./state"], expect_pass=True + ) + assert len(results) == 1 + assert results[0].node.name == "unique_view_model_id" + + results = run_dbt(["ls", "--select", "result:warn", "--state", "./state"]) + assert len(results) == 1 + assert results[0] == "test.unique_view_model_id" + + results = run_dbt( + ["build", "--select", "result:warn+", "--state", "./state"], expect_pass=True + ) + assert len(results) == 2 # includes table_model to be run + nodes = set([elem.node.name for elem in results]) + assert nodes == {"table_model", "unique_view_model_id"} + + results = run_dbt(["ls", "--select", "result:warn+", "--state", "./state"]) + assert len(results) == 1 + assert set(results) == {"test.unique_view_model_id"} + + +class TestRunRunResultsState(BaseRunResultsState): + def test_run_run_results_state(self, project): + self.run_and_save_state() + results = run_dbt( + ["run", "--select", "result:success", "--state", "./state"], expect_pass=True + ) + assert len(results) == 2 + assert results[0].node.name == "view_model" + assert results[1].node.name == "table_model" + + # clear state and rerun upstream view model to test + operator + self.clear_state() + run_dbt(["run", "--select", "view_model"], expect_pass=True) + self.copy_state() + results = run_dbt( + ["run", "--select", "result:success+", "--state", "./state"], expect_pass=True + ) + assert len(results) == 2 + assert results[0].node.name == "view_model" + assert results[1].node.name == "table_model" + + # check we are starting from a place with 0 errors + results = run_dbt(["run", "--select", "result:error", "--state", "./state"]) + assert len(results) == 0 + + self.update_view_model_bad_sql() + self.clear_state() + run_dbt(["run"], expect_pass=False) + self.copy_state() + + # test single result selector on error + results = run_dbt( + ["run", "--select", "result:error", "--state", "./state"], expect_pass=False + ) + assert len(results) == 1 + assert results[0].node.name == "view_model" + + # test + operator selection on error + results = run_dbt( + ["run", "--select", "result:error+", "--state", "./state"], expect_pass=False + ) + assert len(results) == 2 + assert results[0].node.name == "view_model" + assert results[1].node.name == "table_model" + + # single result selector on skipped. Expect this to pass becase underlying view already defined above + results = run_dbt( + ["run", "--select", "result:skipped", "--state", "./state"], expect_pass=True + ) + assert len(results) == 1 + assert results[0].node.name == "table_model" + + # add a downstream model that depends on table_model for skipped+ selector + downstream_model_sql = "select * from {{ref('table_model')}}" + write_file(downstream_model_sql, "models", "table_model_downstream.sql") + + self.clear_state() + run_dbt(["run"], expect_pass=False) + self.copy_state() + + results = run_dbt( + ["run", "--select", "result:skipped+", "--state", "./state"], expect_pass=True + ) + assert len(results) == 2 + assert results[0].node.name == "table_model" + assert results[1].node.name == "table_model_downstream" + + +class TestTestRunResultsState(BaseRunResultsState): + def test_test_run_results_state(self, project): + self.run_and_save_state() + # run passed nodes + results = run_dbt( + ["test", "--select", "result:pass", "--state", "./state"], expect_pass=True + ) + assert len(results) == 2 + nodes = set([elem.node.name for elem in results]) + assert nodes == {"unique_view_model_id", "not_null_view_model_id"} + + # run passed nodes with + operator + results = run_dbt( + ["test", "--select", "result:pass+", "--state", "./state"], expect_pass=True + ) + assert len(results) == 2 + nodes = set([elem.node.name for elem in results]) + assert nodes == {"unique_view_model_id", "not_null_view_model_id"} + + self.update_view_model_failing_tests() + self.rebuild_run_dbt(expect_pass=False) + + # test with failure selector + results = run_dbt( + ["test", "--select", "result:fail", "--state", "./state"], expect_pass=False + ) + assert len(results) == 1 + assert results[0].node.name == "unique_view_model_id" + + # test with failure selector and + operator + results = run_dbt( + ["test", "--select", "result:fail+", "--state", "./state"], expect_pass=False + ) + assert len(results) == 1 + assert results[0].node.name == "unique_view_model_id" + + self.update_unique_test_severity_warn() + # rebuild - expect_pass = True because we changed the error to a warning this time around + self.rebuild_run_dbt(expect_pass=True) + + # test with warn selector + results = run_dbt( + ["test", "--select", "result:warn", "--state", "./state"], expect_pass=True + ) + assert len(results) == 1 + assert results[0].node.name == "unique_view_model_id" + + # test with warn selector and + operator + results = run_dbt( + ["test", "--select", "result:warn+", "--state", "./state"], expect_pass=True + ) + assert len(results) == 1 + assert results[0].node.name == "unique_view_model_id" + + +class TestConcurrentSelectionRunResultsState(BaseRunResultsState): + def test_concurrent_selection_run_run_results_state(self, project): + self.run_and_save_state() + results = run_dbt( + ["run", "--select", "state:modified+", "result:error+", "--state", "./state"] + ) + assert len(results) == 0 + + self.update_view_model_bad_sql() + self.clear_state() + run_dbt(["run"], expect_pass=False) + self.copy_state() + + # add a new failing dbt model + bad_sql = "select * from forced_error" + write_file(bad_sql, "models", "table_model_modified_example.sql") + + results = run_dbt( + ["run", "--select", "state:modified+", "result:error+", "--state", "./state"], + expect_pass=False, + ) + assert len(results) == 3 + nodes = set([elem.node.name for elem in results]) + assert nodes == {"view_model", "table_model_modified_example", "table_model"} + + +class TestConcurrentSelectionTestRunResultsState(BaseRunResultsState): + def test_concurrent_selection_test_run_results_state(self, project): + self.run_and_save_state() + # create failure test case for result:fail selector + self.update_view_model_failing_tests(with_nulls=True) + + # run dbt build again to trigger test errors + self.rebuild_run_dbt(expect_pass=False) + + # get the failures from + results = run_dbt( + [ + "test", + "--select", + "result:fail", + "--exclude", + "not_null_view_model_id", + "--state", + "./state", + ], + expect_pass=False, + ) + assert len(results) == 1 + nodes = set([elem.node.name for elem in results]) + assert nodes == {"unique_view_model_id"} + + +class TestConcurrentSelectionBuildRunResultsState(BaseRunResultsState): + def test_concurrent_selectors_build_run_results_state(self, project): + self.run_and_save_state() + results = run_dbt( + ["build", "--select", "state:modified+", "result:error+", "--state", "./state"] + ) + assert len(results) == 0 + + self.update_view_model_bad_sql() + self.rebuild_run_dbt(expect_pass=False) + + # add a new failing dbt model + bad_sql = "select * from forced_error" + write_file(bad_sql, "models", "table_model_modified_example.sql") + + results = run_dbt( + ["build", "--select", "state:modified+", "result:error+", "--state", "./state"], + expect_pass=False, + ) + assert len(results) == 5 + nodes = set([elem.node.name for elem in results]) + assert nodes == { + "table_model_modified_example", + "view_model", + "table_model", + "not_null_view_model_id", + "unique_view_model_id", + } + + self.update_view_model_failing_tests() + + # create error model case for result:error selector + more_bad_sql = "select 1 as id from not_exists" + write_file(more_bad_sql, "models", "error_model.sql") + + # create something downstream from the error model to rerun + downstream_model_sql = "select * from {{ ref('error_model') }} )" + write_file(downstream_model_sql, "models", "downstream_of_error_model.sql") + + # regenerate build state + self.rebuild_run_dbt(expect_pass=False) + + # modify model again to trigger the state:modified selector + bad_again_sql = "select * from forced_anothererror" + write_file(bad_again_sql, "models", "table_model_modified_example.sql") + + results = run_dbt( + [ + "build", + "--select", + "state:modified+", + "result:error+", + "result:fail+", + "--state", + "./state", + ], + expect_pass=False, + ) + assert len(results) == 5 + nodes = set([elem.node.name for elem in results]) + assert nodes == { + "error_model", + "downstream_of_error_model", + "table_model_modified_example", + "table_model", + "unique_view_model_id", + }