Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/favor-state-node #5859

Merged
merged 6 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/unreleased/Features-20220408-165459.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
kind: Features
body: Added favor-state flag to optionally favor state nodes even if unselected node
exists
time: 2022-04-08T16:54:59.696564+01:00
custom:
Author: daniel-murray josephberni
Issue: "2968"
PR: "5859"
6 changes: 5 additions & 1 deletion core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ def merge_from_artifact(
adapter,
other: "WritableManifest",
selected: AbstractSet[UniqueID],
favor_state: bool = False,
) -> None:
"""Given the selected unique IDs and a writable manifest, update this
manifest by replacing any unselected nodes with their counterpart.
Expand All @@ -1022,7 +1023,10 @@ def merge_from_artifact(
node.resource_type in refables
and not node.is_ephemeral
and unique_id not in selected
and not adapter.get_relation(current.database, current.schema, current.identifier)
and (
not adapter.get_relation(current.database, current.schema, current.identifier)
or favor_state
)
):
merged.add(unique_id)
self.nodes[unique_id] = node.replace(deferred=True)
Expand Down
1 change: 1 addition & 0 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def env_set_path(key: str) -> Optional[Path]:

MACRO_DEBUGGING = env_set_truthy("DBT_MACRO_DEBUGGING")
DEFER_MODE = env_set_truthy("DBT_DEFER_TO_STATE")
FAVOR_STATE_MODE = env_set_truthy("DBT_FAVOR_STATE_STATE")
ARTIFACT_STATE_PATH = env_set_path("DBT_ARTIFACT_STATE_PATH")
ENABLE_LEGACY_LOGGER = env_set_truthy("DBT_ENABLE_LEGACY_LOGGER")

Expand Down
16 changes: 16 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,20 @@ def _add_defer_argument(*subparsers):
)


def _add_favor_state_argument(*subparsers):
for sub in subparsers:
sub.add_optional_argument_inverse(
"--favor-state",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iknox-fa Do you have a sense of how we want to handle new flag additions, while we're migrating from old to new CLI? Since the new CLI is checked into main, we could ask folks to add it here — but we'll probably also need to check for parity again before formally cutting over in a few months

Copy link
Contributor

@iknox-fa iknox-fa Sep 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed this the other day @jtcohen6. I don't have a strong feeling about this-- if we add this flag to the new CLI in this PR I'd be thrilled, but ultimately the "last stop" for this would be when we implement the Click version of the run task (#5551).

enable_help="""
If set, defer to the state variable for resolving unselected nodes, even if node exist as a database object in the current environment.
""",
disable_help="""
If defer is set, expect standard defer behaviour.
""",
default=flags.FAVOR_STATE_MODE,
)


def _build_run_subparser(subparsers, base_subparser):
run_sub = subparsers.add_parser(
"run",
Expand Down Expand Up @@ -1168,6 +1182,8 @@ def parse_args(args, cls=DBTArgumentParser):
_add_selection_arguments(run_sub, compile_sub, generate_sub, test_sub, snapshot_sub, seed_sub)
# --defer
_add_defer_argument(run_sub, test_sub, build_sub, snapshot_sub, compile_sub)
# --favor-state
_add_favor_state_argument(run_sub, test_sub, build_sub, snapshot_sub)
# --full-refresh
_add_table_mutability_arguments(run_sub, compile_sub, build_sub)

Expand Down
124 changes: 124 additions & 0 deletions test/integration/062_defer_state_tests/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def run_and_snapshot_defer(self):
# defer test, it succeeds
results = self.run_dbt(['snapshot', '--state', 'state', '--defer'])

# favor_state test, it succeeds
results = self.run_dbt(['snapshot', '--state', 'state', '--defer', '--favor-state'])

def run_and_defer(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
Expand Down Expand Up @@ -123,6 +126,40 @@ def run_and_defer(self):

assert len(results) == 1

def run_and_defer_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
assert not any(r.node.deferred for r in results)
results = self.run_dbt(['run'])
assert len(results) == 2
assert not any(r.node.deferred for r in results)
results = self.run_dbt(['test'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

# test tests first, because run will change things
# no state, wrong schema, failure.
self.run_dbt(['test', '--target', 'otherschema'], expect_pass=False)

# no state, run also fails
self.run_dbt(['run', '--target', 'otherschema'], expect_pass=False)

# defer test, it succeeds
results = self.run_dbt(['test', '-m', 'view_model+', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'])

# with state it should work though
results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'])
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

with open('target/manifest.json') as fp:
data = json.load(fp)
assert data['nodes']['seed.test.seed']['deferred']

assert len(results) == 1

def run_switchdirs_defer(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
Expand Down Expand Up @@ -152,6 +189,35 @@ def run_switchdirs_defer(self):
expect_pass=False,
)

def run_switchdirs_defer_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

self.use_default_project({'model-paths': ['changed_models']})
# the sql here is just wrong, so it should fail
self.run_dbt(
['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=False,
)
# but this should work since we just use the old happy model
self.run_dbt(
['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=True,
)

self.use_default_project({'model-paths': ['changed_models_bad']})
# this should fail because the table model refs a broken ephemeral
# model, which it should see
self.run_dbt(
['run', '-m', 'table_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=False,
)

def run_defer_iff_not_exists(self):
results = self.run_dbt(['seed', '--target', 'otherschema'])
assert len(results) == 1
Expand All @@ -169,6 +235,23 @@ def run_defer_iff_not_exists(self):
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

def run_defer_iff_not_exists_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'])
assert len(results) == 2

# because the seed exists in other schema, we should defer it
assert self.other_schema not in results[0].node.compiled_code
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is the line causing the tests to fail 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was creating the seed in the other_schema and checking it wasn't there, resolved this. If all OK will squash commits.

assert self.unique_schema() in results[0].node.compiled_code

def run_defer_deleted_upstream(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
Expand All @@ -191,6 +274,27 @@ def run_defer_deleted_upstream(self):
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

def run_defer_deleted_upstream_favor_state(self):
results = self.run_dbt(['seed'])
assert len(results) == 1
results = self.run_dbt(['run'])
assert len(results) == 2

# copy files over from the happy times when we had a good target
self.copy_state()

self.use_default_project({'model-paths': ['changed_models_missing']})

self.run_dbt(
['run', '-m', 'view_model', '--state', 'state', '--defer', '--favor-state', '--target', 'otherschema'],
expect_pass=True,
)

# despite deferral, test should use models just created in our schema
results = self.run_dbt(['test', '--state', 'state', '--defer', '--favor-state'])
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

@use_profile('postgres')
def test_postgres_state_changetarget(self):
self.run_and_defer()
Expand All @@ -199,18 +303,38 @@ def test_postgres_state_changetarget(self):
with pytest.raises(SystemExit):
self.run_dbt(['seed', '--defer'])

@use_profile('postgres')
def test_postgres_state_changetarget_favor_state(self):
self.run_and_defer_favor_state()

# make sure these commands don't work with --defer
with pytest.raises(SystemExit):
self.run_dbt(['seed', '--defer'])

@use_profile('postgres')
def test_postgres_state_changedir(self):
self.run_switchdirs_defer()

@use_profile('postgres')
def test_postgres_state_changedir_favor_state(self):
self.run_switchdirs_defer_favor_state()

@use_profile('postgres')
def test_postgres_state_defer_iffnotexists(self):
self.run_defer_iff_not_exists()

@use_profile('postgres')
def test_postgres_state_defer_iffnotexists_favor_state(self):
self.run_defer_iff_not_exists_favor_state()

@use_profile('postgres')
def test_postgres_state_defer_deleted_upstream(self):
self.run_defer_deleted_upstream()

@use_profile('postgres')
def test_postgres_state_defer_deleted_upstream_favor_state(self):
self.run_defer_deleted_upstream_favor_state()

@use_profile('postgres')
def test_postgres_state_snapshot_defer(self):
self.run_and_snapshot_defer()
Expand Down