Skip to content

Commit

Permalink
favor-state-node
Browse files Browse the repository at this point in the history
  • Loading branch information
josephberni committed Sep 30, 2022
1 parent f5a94fc commit 5785a81
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20220408-165459.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: Added favor-state flag to optionally favor state nodes even if unselected node
exists
time: 2022-04-08T16:54:59.696564+01:00
custom:
Author: daniel-murray & josephberni
Issue: "2968"
PR: "5859"
6 changes: 5 additions & 1 deletion core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ def merge_from_artifact(
adapter,
other: "WritableManifest",
selected: AbstractSet[UniqueID],
favor_state: bool,
) -> None:
"""Given the selected unique IDs and a writable manifest, update this
manifest by replacing any unselected nodes with their counterpart.
Expand All @@ -1022,7 +1023,10 @@ def merge_from_artifact(
node.resource_type in refables
and not node.is_ephemeral
and unique_id not in selected
and not adapter.get_relation(current.database, current.schema, current.identifier)
and (
not adapter.get_relation(current.database, current.schema, current.identifier)
or favor_state
)
):
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def env_set_path(key: str) -> Optional[Path]:

MACRO_DEBUGGING = env_set_truthy("DBT_MACRO_DEBUGGING")
DEFER_MODE = env_set_truthy("DBT_DEFER_TO_STATE")
FAVOR_STATE_MODE = env_set_truthy("DBT_FAVOR_STATE_STATE")
ARTIFACT_STATE_PATH = env_set_path("DBT_ARTIFACT_STATE_PATH")
ENABLE_LEGACY_LOGGER = env_set_truthy("DBT_ENABLE_LEGACY_LOGGER")

Expand Down
16 changes: 16 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,20 @@ def _add_defer_argument(*subparsers):
)


def _add_favor_state_argument(*subparsers):
for sub in subparsers:
sub.add_optional_argument_inverse(
"--favor-state",
enable_help="""
If set, defer to the state variable for resolving unselected nodes, even if node exist as a database object in the current environment.
""",
disable_help="""
If defer is set, expect standard defer behaviour.
""",
default=flags.FAVOR_STATE_MODE,
)


def _build_run_subparser(subparsers, base_subparser):
run_sub = subparsers.add_parser(
"run",
Expand Down Expand Up @@ -1168,6 +1182,8 @@ def parse_args(args, cls=DBTArgumentParser):
_add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub, snapshot_sub, seed_sub)
# --defer
_add_defer_argument(run_sub, test_sub, build_sub, snapshot_sub, compile_sub)
# --favor-state
_add_favor_state_argument(run_sub, test_sub, build_sub, snapshot_sub)
# --full-refresh
_add_table_mutability_arguments(run_sub, compile_sub, build_sub)

Expand Down
124 changes: 124 additions & 0 deletions test/integration/062_defer_state_tests/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def run_and_snapshot_defer(self):
# defer test, it succeeds
results = self.run_dbt(['snapshot', '--state', 'state', '--defer'])

# favor_state test, it succeeds
results = self.run_dbt(['snapshot', '--state', 'state', '--defer', '--favor-state'])

def run_and_defer(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
Expand Down Expand Up @@ -123,6 +126,40 @@ def run_and_defer(self):

assert len(results) == 1

def run_and_defer_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
assert not any(r.node.deferred for r in results)
results = self.run_dbt(['run'])
assert len(results) == 2
assert not any(r.node.deferred for r in results)
results = self.run_dbt(['test'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

# test tests first, because run will change things
# no state, wrong schema, failure.
self.run_dbt(['test', '--target', 'otherschema'], expect_pass=False)

# no state, run also fails
self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False)

# defer test, it succeeds
results = self.run_dbt(['test', '-m', 'view_model+', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'])

# with state it should work though
results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'])
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql

with open('target/manifest.json') as fp:
data = json.load(fp)
assert data['nodes']['seed.test.seed']['deferred']

assert len(results) == 1

def run_switchdirs_defer(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
Expand Down Expand Up @@ -152,6 +189,35 @@ def run_switchdirs_defer(self):
expect_pass=False,
)

def run_switchdirs_defer_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

self.use_default_project({'model-paths': ['changed_models']})
# the sql here is just wrong, so it should fail
self.run_dbt(
['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=False,
)
# but this should work since we just use the old happy model
self.run_dbt(
['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=True,
)

self.use_default_project({'model-paths': ['changed_models_bad']})
# this should fail because the table model refs a broken ephemeral
# model, which it should see
self.run_dbt(
['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=False,
)

def run_defer_iff_not_exists(self):
results = self.run_dbt(['seed', '--target', 'otherschema'])
assert len(results) == 1
Expand All @@ -169,6 +235,23 @@ def run_defer_iff_not_exists(self):
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

def run_defer_iff_not_exists_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()
results = self.run_dbt(['seed', '--target', 'otherschema'])
assert len(results) == 1
results = self.run_dbt(['run', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'])
assert len(results) == 2

# because the seed exists in other schema, we should defer it
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql

def run_defer_deleted_upstream(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
Expand All @@ -191,6 +274,27 @@ def run_defer_deleted_upstream(self):
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

def run_defer_deleted_upstream_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

self.use_default_project({'model-paths': ['changed_models_missing']})

self.run_dbt(
['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=True,
)

# despite deferral, test should use models just created in our schema
results = self.run_dbt(['test', '--state', 'state', '--defer', '--favor-state'])
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql

@use_profile('postgres')
def test_postgres_state_changetarget(self):
self.run_and_defer()
Expand All @@ -199,18 +303,38 @@ def test_postgres_state_changetarget(self):
with pytest.raises(SystemExit):
self.run_dbt(['seed', '--defer'])

@use_profile('postgres')
def test_postgres_state_changetarget_favor_state(self):
self.run_and_defer_favor_state()

# make sure these commands don't work with --defer
with pytest.raises(SystemExit):
self.run_dbt(['seed', '--defer'])

@use_profile('postgres')
def test_postgres_state_changedir(self):
self.run_switchdirs_defer()

@use_profile('postgres')
def test_postgres_state_changedir_favor_state(self):
self.run_switchdirs_defer_favor_state()

@use_profile('postgres')
def test_postgres_state_defer_iffnotexists(self):
self.run_defer_iff_not_exists()

@use_profile('postgres')
def test_postgres_state_defer_iffnotexists_favor_state(self):
self.run_defer_iff_not_exists_favor_state()

@use_profile('postgres')
def test_postgres_state_defer_deleted_upstream(self):
self.run_defer_deleted_upstream()

@use_profile('postgres')
def test_postgres_state_defer_deleted_upstream_favor_state(self):
self.run_defer_deleted_upstream_favor_state()

@use_profile('postgres')
def test_postgres_state_snapshot_defer(self):
self.run_and_snapshot_defer()
Expand Down

0 comments on commit 5785a81

Please sign in to comment.