
Feature/incremental non full refresh #3387

Merged Jul 21, 2021 (50 commits; changes shown from 29 commits)

Commits
f9f1a96
detect and act on schema changes
matt-winkler Apr 22, 2021
82950fa
update incremental helpers code
matt-winkler Apr 22, 2021
e06f09f
update changelog
matt-winkler Apr 22, 2021
0be265b
fix error in diff_columns from testing
matt-winkler Apr 23, 2021
8c531fb
abstract code a bit further
matt-winkler Apr 23, 2021
1f014de
address matching names vs. data types
matt-winkler Apr 26, 2021
59e10ce
Update CHANGELOG.md
matt-winkler Apr 27, 2021
e26c220
updates from Jeremy's feedback
matt-winkler Apr 27, 2021
c073b45
updates from Jeremy's feedback
matt-winkler Apr 27, 2021
4ab16d5
Merge branch 'develop' into feature/incremental-schema-changes
matt-winkler Apr 27, 2021
335eb15
multi-column add / remove with full_refresh
matt-winkler Apr 28, 2021
7c7b5a0
pulled in updates from remote
matt-winkler Apr 28, 2021
1549af6
simple changes from JC's feedback
matt-winkler Apr 28, 2021
86a7536
updated for snowflake
matt-winkler May 7, 2021
49f5319
merging develop
matt-winkler May 7, 2021
d5edd65
Merge branch 'develop' into feature/incremental-schema-changes
matt-winkler May 18, 2021
3d99fa3
reorganize postgres code
matt-winkler May 19, 2021
7ba1286
reorganize approach
matt-winkler May 21, 2021
1430834
updated full refresh trigger logic
matt-winkler May 23, 2021
9b0029f
fixed unintentional wipe behavior
matt-winkler May 24, 2021
1f1d65f
catch final else condition
matt-winkler May 24, 2021
52e2c97
Merge branch 'feature/incremental-schema-changes' into feature/increm…
matt-winkler May 24, 2021
4188e08
remove WHERE string replace
matt-winkler May 24, 2021
2dd3756
touch ups
matt-winkler May 24, 2021
5f9af84
port core to snowflake
matt-winkler May 24, 2021
b55f5c7
added bigquery code
matt-winkler May 24, 2021
4ce23d1
updated impacted unit tests
matt-winkler May 26, 2021
1f7b882
updates from linting tests
matt-winkler May 26, 2021
e24ca37
updates from linting again
matt-winkler May 26, 2021
0124efe
snowflake updates from further testing
matt-winkler May 27, 2021
66d4dff
fix logging
matt-winkler May 27, 2021
105ab4a
clean up incremental logic
matt-winkler May 27, 2021
7613656
updated for bigquery
matt-winkler Jun 1, 2021
c9ab845
update postgres with new strategy
matt-winkler Jun 1, 2021
bb9cc46
update nodeconfig
matt-winkler Jun 3, 2021
d695f1b
starting integration tests
matt-winkler Jun 3, 2021
69d31ba
integration test for ignore case
matt-winkler Jun 4, 2021
6270ee5
add test for append_new_columns
matt-winkler Jun 4, 2021
cd24472
add integration test for sync
matt-winkler Jun 4, 2021
2463b94
remove extra tests
matt-winkler Jun 4, 2021
4c54429
add unique key and snowflake test
matt-winkler Jun 14, 2021
64b0877
fix merge conflicts
matt-winkler Jun 14, 2021
93938f4
move incremental integration test dir
matt-winkler Jun 14, 2021
8fa33ec
update integration tests
matt-winkler Jun 14, 2021
e8b18b5
update integration tests
matt-winkler Jun 14, 2021
73f4d31
Suggestions for #3387 (#3558)
jtcohen6 Jul 15, 2021
c32f3eb
fix merge conflicts with develop
matt-winkler Jul 21, 2021
bb5eab5
rename integration test folder
matt-winkler Jul 21, 2021
ff3fab8
Update core/dbt/include/global_project/macros/materializations/increm…
matt-winkler Jul 21, 2021
f32c8b2
Update changelog [skip ci]
jtcohen6 Jul 21, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ Contributors:
- Support disabling schema tests, and configuring tests from `dbt_project.yml` ([#3252](https://github.com/fishtown-analytics/dbt/issues/3252),
[#3253](https://github.com/fishtown-analytics/dbt/issues/3253), [#3257](https://github.com/fishtown-analytics/dbt/pull/3257))
- Add Jinja tag for tests ([#1173](https://github.com/fishtown-analytics/dbt/issues/1173), [#3261](https://github.com/fishtown-analytics/dbt/pull/3261))
- Support detecting schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3288](https://github.com/fishtown-analytics/dbt/issues/3288))
- Add native support for Postgres index creation ([#804](https://github.com/fishtown-analytics/dbt/issues/804), [3106](https://github.com/fishtown-analytics/dbt/pull/3106))
- Less greedy test selection: expand to select unselected tests if and only if all parents are selected ([#2891](https://github.com/fishtown-analytics/dbt/issues/2891), [#3235](https://github.com/fishtown-analytics/dbt/pull/3235))
- Prevent locks in Redshift during full refresh in incremental materialization. ([#2426](https://github.com/fishtown-analytics/dbt/issues/2426), [#2998](https://github.com/fishtown-analytics/dbt/pull/2998))
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/impl.py
@@ -513,7 +513,7 @@ def rename_relation(
def get_columns_in_relation(
self, relation: BaseRelation
) -> List[BaseColumn]:
"""Get a list of the columns in the given Relation."""
"""Get a list of the columns in the given Relation. """
raise NotImplementedException(
'`get_columns_in_relation` is not implemented for this adapter!'
)
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
@@ -394,6 +394,7 @@ class NodeConfig(BaseConfig):
CompareBehavior.Exclude),
)
full_refresh: Optional[bool] = None
on_schema_change: str = 'ignore'

@classmethod
def __pre_deserialize__(cls, data):
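As a usage sketch of the new `on_schema_change` config added to `NodeConfig` above (model name, `unique_key`, and source table are invented for illustration), a model could opt in per file:

```sql
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='append_new_columns'
    )
}}

select * from {{ ref('raw_events') }}
```

Leaving the config unset keeps the default `'ignore'`, preserving existing behavior; `'fail'`, `'append_new_columns'`, and `'sync_all_columns'` opt into the new handling.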
32 changes: 32 additions & 0 deletions core/dbt/include/global_project/macros/adapters/common.sql
@@ -311,3 +311,35 @@
{{ config.set('sql_header', caller()) }}
{%- endmacro %}

{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
{% endmacro %}

{% macro default__alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}

{% set sql -%}

alter {{ relation.type }} {{ relation }}
{% if add_columns %}
add
{% endif %}
{% for column in add_columns %}
column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}

{{ ', ' if add_columns and remove_columns }}

Reviewer comment: I did a test run with a column add and a column drop; default__alter_relation_add_remove_columns got dispatched to take care of the alters, and Snowflake did not like the comma-separated alters.

Here is the SQL that ran on the database side, followed by the model config:

alter table xyz
            add column TEST_COL2 character varying(250), 
            drop column TEST_COL1
{{ 
    config(
	  alias = 'xyz',
      materialized='incremental',
      incremental_strategy='merge',
      unique_key='record_id',
	  transient=false,
      full_refresh=false,
      on_schema_change='sync_all_columns'
  )
}}


{% if remove_columns %}
drop
{% for column in remove_columns %}
column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}
{% endif %}

{%- endset %}

{{ return(run_query(sql)) }}

{% endmacro %}

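For illustration, the default macro above would render DDL shaped like the following for a hypothetical relation (table and column names invented). Note that this is the comma-joined form the review comment above reports Snowflake rejecting:

```sql
alter table analytics.my_model
    add
        column col_b varchar,
        column col_c integer
    ,
    drop
        column col_a
```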
@@ -72,3 +72,5 @@
{% endif %}
{% do return(config_full_refresh) %}
{% endmacro %}


@@ -1,5 +1,6 @@

{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}

Reviewer comment (@b-luu):
@matt-winkler @jtcohen6 do you have context on why the existing "target" columns are used as dest_columns below? Why not use the "tmp" increment's own columns?

Sorry, I stumbled on this PR because 1. I was trying to implement schema evolution on incremental runs as well, and 2. I had issues with some of my increments not necessarily having all the columns of the existing "target".

So I've used tmp_relation instead in the line below, in a custom materialization on my side.

If the reason is unclear, I'll open an issue to have a broader discussion on this topic.

Author reply (@matt-winkler):
Hi @b-luu here are my thoughts on use of target columns for each scenario:

  • If we ignore or fail in the case of a schema change (or leave off the on_schema_change entirely), we assume that the UPSERT is targeted towards the existing columns in the target table. Basically the thinking here is that the target schema should not be changing under these configuration options.
  • If we append_new_columns or sync_all_columns, the column addition / syncing actions have already been performed by the time we reach this point in the code.
  • I'd be interested to see more examples of the issue you describe @b-luu to see if we're missing something here; let us know!

@jtcohen6 Please comment if the above seems off

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

@@ -5,19 +5,29 @@

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{# -- first check whether we want to full refresh for source view or config reasons #}
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}
{% do log('full refresh mode: %s' % trigger_full_refresh) %}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or should_full_refresh() %}

{% elif trigger_full_refresh %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% do log('running full refresh procedure', info=true) %}
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + "__dbt_backup" %}
{% set backup_identifier = model['name'] + '__dbt_backup' %}

{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
@@ -28,13 +38,32 @@
{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% do to_drop.append(backup_relation) %}

{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}

{% if on_schema_change != 'ignore' %}
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
{% do log('schema changed: %s' % schema_changed, info=true) %}
{% if schema_changed %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}

{% else %}
{% do adapter.expand_target_column_types(
Reviewer comment: Let's always expand_target_column_types before check_for_schema_changes, even if it means running a few extra metadata queries, in order to avoid over-aggressively dropping and recreating columns that just need to be varchar/numeric-type expanded.

Author reply (@matt-winkler): Great call - I tested this on Snowflake and it works beautifully.

from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% endif %}

{% else %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% endif %}

{% endif %}

{% call statement("main") %}
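The branching in the materialization above can be summarized in a small sketch (plain Python, not dbt's actual API; relation objects are reduced to flags):

```python
def plan_incremental_build(existing_relation, is_view, full_refresh_mode, on_schema_change):
    """Mirror the materialization's top-level branches as plain data."""
    if existing_relation is None:
        # No target yet: build it from scratch.
        return "create table"
    if full_refresh_mode or is_view:
        # Rebuild via intermediate + backup relations, then swap.
        return "full refresh"
    if on_schema_change != "ignore":
        # Compare tmp vs. target schemas, possibly alter the target, then upsert.
        return "check schema changes, then upsert"
    # Default path: expand column types and upsert, as before this PR.
    return "expand column types, then upsert"

print(plan_incremental_build(None, False, False, "ignore"))  # create table
```

Note that a view target or `--full-refresh` wins over any `on_schema_change` setting, matching the `trigger_full_refresh` check above.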
@@ -0,0 +1,138 @@
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}

{% if on_schema_change not in ['sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %}

{% set log_message = 'invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default) %}
{% do log(log_message, info=true) %}

{{ return(default) }}

{% else %}
{{ return(on_schema_change) }}

{% endif %}

{% endmacro %}
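The validation logic above is simple enough to restate in plain Python (a sketch, not dbt code):

```python
VALID_VALUES = {"sync_all_columns", "append_new_columns", "fail", "ignore"}

def validate_on_schema_change(on_schema_change, default="ignore"):
    """Fall back to the default when an unrecognized value is configured."""
    if on_schema_change not in VALID_VALUES:
        print(f"invalid value for on_schema_change ({on_schema_change}); "
              f"using {default}")
        return default
    return on_schema_change

validate_on_schema_change("sync_all_columns")  # returns "sync_all_columns"
validate_on_schema_change("typo")              # logs and returns "ignore"
```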

{% macro get_column_names(columns) %}

{% set result = [] %}

{% for col in columns %}
{{ result.append(col.column) }}
{% endfor %}

{{ return(result) }}

{% endmacro %}
Reviewer comment: I think this macro is effectively the same as columns | map(attribute = 'name'), right?

Author reply (@matt-winkler): Boom, map for the win.

Author follow-up (@matt-winkler): On further testing, I found that an additional | list was needed.


{% macro diff_arrays(source_array, target_array) %}

{% set result = [] %}
{%- for elem in source_array -%}
{% if elem not in target_array %}

{{ result.append(elem) }}

{% endif %}

{%- endfor -%}

{{ return(result) }}

{% endmacro %}

{% macro diff_columns(source_columns, target_columns) %}

{% set result = [] %}
{% set source_names = get_column_names(source_columns) %}
{% set target_names = get_column_names(target_columns) %}

{# --check whether the name attribute exists in the target, but dont worry about data type differences #}
{%- for col in source_columns -%}
{%- if col.column not in target_names -%}
{{ result.append(col) }}
{%- endif -%}
{%- endfor -%}

{{ return(result) }}

{% endmacro %}
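The name-only comparison in diff_columns can be sketched in plain Python (columns reduced to dicts; this is not dbt's Column class):

```python
def diff_columns(source_columns, target_columns):
    """Columns whose names appear in source but not in target.
    Data-type differences are deliberately ignored, as in the macro."""
    target_names = {col["name"] for col in target_columns}
    return [col for col in source_columns if col["name"] not in target_names]

source = [{"name": "id", "data_type": "integer"},
          {"name": "new_col", "data_type": "text"}]
target = [{"name": "id", "data_type": "integer"}]

print(diff_columns(source, target))  # [{'name': 'new_col', 'data_type': 'text'}]
```

check_for_schema_changes then calls this in both directions: a non-empty result either way flags a schema change.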

{% macro check_for_schema_changes(source_relation, target_relation) %}

{% set schema_changed = False %}

{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}

{% if source_not_in_target != [] %}
{% set schema_changed = True %}
{% elif target_not_in_source != [] %}
{% set schema_changed = True %}
{% endif %}

Reviewer comment: Could we centralize all schema-change-related logging in this macro, to avoid repeating it in the materialization code?

Author reply (@matt-winkler): done

{{ return(schema_changed) }}

{% endmacro %}

{% macro sync_schemas(source_relation, target_relation, on_schema_change='append_new_columns') %}

{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set add_to_target_arr = diff_columns(source_columns, target_columns) -%}
{%- set remove_from_target_arr = diff_columns(target_columns, source_columns) -%}

-- Validates on_schema_change config vs. whether there are column differences
{% if on_schema_change=='append_new_columns' and add_to_target_arr == [] %}

{{
exceptions.raise_compiler_error('append_new_columns was set, but no new columns to append.
This can occur when columns are removed from the source dataset unintentionally.
Review the schemas in the source and target relations, and consider re-running with the --full-refresh option.')
}}

{% endif %}

{%- if on_schema_change == 'append_new_columns' -%}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr) -%}
{% elif on_schema_change == 'sync_all_columns' %}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%}
{% endif %}

{{
return(
{
'columns_added': add_to_target_arr,
'columns_removed': remove_from_target_arr
}
)
}}

{% endmacro %}

{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}

{% if on_schema_change=='fail' %}

{{
exceptions.raise_compiler_error("The source and target schemas on this incremental model are out of sync!
You can specify one of ['fail', 'ignore', 'append_new_columns', 'sync_all_columns'] in the on_schema_change config to control this behavior.
Please re-run the incremental model with full_refresh set to True to update the target schema.
Alternatively, you can update the schema manually and re-run the process.")
}}

{# unless we ignore, run the sync operation per the config #}
{% else %}

{% set schema_changes = sync_schemas(source_relation, target_relation, on_schema_change) %}
{% set columns_added = schema_changes['columns_added'] %}
{% set columns_removed = schema_changes['columns_removed'] %}
{% do log('columns added: ' + columns_added|join(', '), info=true) %}
{% do log('columns removed: ' + columns_removed|join(', '), info=true) %}

{% endif %}

{% endmacro %}
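The dispatch between fail/append/sync above can be sketched as follows (plain Python; the ALTER execution performed by sync_schemas is stubbed out):

```python
def process_schema_changes(on_schema_change, columns_added, columns_removed):
    """fail raises; append only adds columns; sync adds and removes them."""
    if on_schema_change == "fail":
        raise RuntimeError(
            "The source and target schemas on this incremental model "
            "are out of sync!")
    if on_schema_change == "append_new_columns":
        # Removed columns are left in place on the target.
        return {"columns_added": columns_added, "columns_removed": []}
    # sync_all_columns: both additions and removals are applied to the target.
    return {"columns_added": columns_added, "columns_removed": columns_removed}

print(process_schema_changes("append_new_columns", ["col_b"], ["col_a"]))
# {'columns_added': ['col_b'], 'columns_removed': []}
```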
31 changes: 31 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
@@ -128,6 +128,37 @@
{% do adapter.rename_relation(from_relation, to_relation) %}
{% endmacro %}

{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
{% endmacro %}
Reviewer comment: You can remove this dispatch call and just keep the bigquery__ implementation below.


{% macro bigquery__alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}

{% set sql -%}

alter {{ relation.type }} {{ relation }}
{% if add_columns %}

{% for column in add_columns %}
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}
{% endif %}

{{ ', ' if add_columns and remove_columns }}

{% if remove_columns %}
{% for column in remove_columns %}
drop column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}
{% endif %}

{%- endset %}

{{ return(run_query(sql)) }}

{% endmacro %}


{% macro bigquery__alter_column_type(relation, column_name, new_column_type) -%}
{#
Changing a column's data type using a query requires you to scan the entire table.