Feature/incremental non full refresh #3387

Merged
merged 50 commits into from
Jul 21, 2021
f9f1a96
detect and act on schema changes
matt-winkler Apr 22, 2021
82950fa
update incremental helpers code
matt-winkler Apr 22, 2021
e06f09f
update changelog
matt-winkler Apr 22, 2021
0be265b
fix error in diff_columns from testing
matt-winkler Apr 23, 2021
8c531fb
abstract code a bit further
matt-winkler Apr 23, 2021
1f014de
address matching names vs. data types
matt-winkler Apr 26, 2021
59e10ce
Update CHANGELOG.md
matt-winkler Apr 27, 2021
e26c220
updates from Jeremy's feedback
matt-winkler Apr 27, 2021
c073b45
updates from Jeremy's feedback
matt-winkler Apr 27, 2021
4ab16d5
Merge branch 'develop' into feature/incremental-schema-changes
matt-winkler Apr 27, 2021
335eb15
multi-column add / remove with full_refresh
matt-winkler Apr 28, 2021
7c7b5a0
pulled in updates from remote
matt-winkler Apr 28, 2021
1549af6
simple changes from JC's feedback
matt-winkler Apr 28, 2021
86a7536
updated for snowflake
matt-winkler May 7, 2021
49f5319
merging develop
matt-winkler May 7, 2021
d5edd65
Merge branch 'develop' into feature/incremental-schema-changes
matt-winkler May 18, 2021
3d99fa3
reorganize postgres code
matt-winkler May 19, 2021
7ba1286
reorganize approach
matt-winkler May 21, 2021
1430834
updated full refresh trigger logic
matt-winkler May 23, 2021
9b0029f
fixed unintentional wipe behavior
matt-winkler May 24, 2021
1f1d65f
catch final else condition
matt-winkler May 24, 2021
52e2c97
Merge branch 'feature/incremental-schema-changes' into feature/increm…
matt-winkler May 24, 2021
4188e08
remove WHERE string replace
matt-winkler May 24, 2021
2dd3756
touch ups
matt-winkler May 24, 2021
5f9af84
port core to snowflake
matt-winkler May 24, 2021
b55f5c7
added bigquery code
matt-winkler May 24, 2021
4ce23d1
updated impacted unit tests
matt-winkler May 26, 2021
1f7b882
updates from linting tests
matt-winkler May 26, 2021
e24ca37
updates from linting again
matt-winkler May 26, 2021
0124efe
snowflake updates from further testing
matt-winkler May 27, 2021
66d4dff
fix logging
matt-winkler May 27, 2021
105ab4a
clean up incremental logic
matt-winkler May 27, 2021
7613656
updated for bigquery
matt-winkler Jun 1, 2021
c9ab845
update postgres with new strategy
matt-winkler Jun 1, 2021
bb9cc46
update nodeconfig
matt-winkler Jun 3, 2021
d695f1b
starting integration tests
matt-winkler Jun 3, 2021
69d31ba
integration test for ignore case
matt-winkler Jun 4, 2021
6270ee5
add test for append_new_columns
matt-winkler Jun 4, 2021
cd24472
add integration test for sync
matt-winkler Jun 4, 2021
2463b94
remove extra tests
matt-winkler Jun 4, 2021
4c54429
add unique key and snowflake test
matt-winkler Jun 14, 2021
64b0877
fix merge conflicts
matt-winkler Jun 14, 2021
93938f4
move incremental integration test dir
matt-winkler Jun 14, 2021
8fa33ec
update integration tests
matt-winkler Jun 14, 2021
e8b18b5
update integration tests
matt-winkler Jun 14, 2021
73f4d31
Suggestions for #3387 (#3558)
jtcohen6 Jul 15, 2021
c32f3eb
fix merge conflicts with develop
matt-winkler Jul 21, 2021
bb5eab5
rename integration test folder
matt-winkler Jul 21, 2021
ff3fab8
Update core/dbt/include/global_project/macros/materializations/increm…
matt-winkler Jul 21, 2021
f32c8b2
Update changelog [skip ci]
jtcohen6 Jul 21, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features
- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743](https://github.com/dbt-labs/dbt/issues/2743), [#3490](https://github.com/dbt-labs/dbt/issues/3490))
- Introduce `on_schema_change` config to detect and handle schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3387](https://github.com/fishtown-analytics/dbt/issues/3387))

### Fixes
- Fix docs generation for cross-db sources in REDSHIFT RA3 node ([#3236](https://github.com/fishtown-analytics/dbt/issues/3236), [#3408](https://github.com/fishtown-analytics/dbt/pull/3408))
@@ -20,6 +21,7 @@

Contributors:
- [@kostek-pl](https://github.com/kostek-pl) ([#3236](https://github.com/fishtown-analytics/dbt/pull/3408))
- [@matt-winkler](https://github.com/matt-winkler) ([#3387](https://github.com/dbt-labs/dbt/pull/3387))
- [@tconbeer](https://github.com/tconbeer) ([#3468](https://github.com/fishtown-analytics/dbt/pull/3468))
- [@JLDLaughlin](https://github.com/JLDLaughlin) ([#3473](https://github.com/fishtown-analytics/dbt/pull/3473))
- [@jmriego](https://github.com/jmriego) ([#3526](https://github.com/dbt-labs/dbt/pull/3526))
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/impl.py
@@ -513,7 +513,7 @@ def rename_relation(
def get_columns_in_relation(
self, relation: BaseRelation
) -> List[BaseColumn]:
"""Get a list of the columns in the given Relation."""
"""Get a list of the columns in the given Relation. """
raise NotImplementedException(
'`get_columns_in_relation` is not implemented for this adapter!'
)
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
@@ -369,6 +369,7 @@ class NodeConfig(BaseConfig):
CompareBehavior.Exclude),
)
full_refresh: Optional[bool] = None
on_schema_change: Optional[str] = 'ignore'

@classmethod
def __pre_deserialize__(cls, data):
31 changes: 31 additions & 0 deletions core/dbt/include/global_project/macros/adapters/common.sql
@@ -311,3 +311,34 @@
{{ config.set('sql_header', caller()) }}
{%- endmacro %}


{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
{% endmacro %}

{% macro default__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}
{% if remove_columns is none %}
{% set remove_columns = [] %}
{% endif %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}

{% for column in add_columns %}
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}{{ ',' if add_columns and remove_columns }}

{% for column in remove_columns %}
drop column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{% do run_query(sql) %}

{% endmacro %}
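
As a rough illustration of the statement this default macro assembles, here is a minimal Python sketch. It is not dbt code: `Column` and `render_alter_add_remove` are hypothetical names, and the exact ALTER syntax accepted varies by warehouse. Building one list of clauses and joining it keeps the comma placement correct whether only additions, only removals, or both are present.

```python
from typing import NamedTuple, Optional, Sequence


class Column(NamedTuple):
    """Hypothetical stand-in for an adapter column (name plus data type)."""
    name: str
    data_type: str


def render_alter_add_remove(
    relation_type: str,
    relation_name: str,
    add_columns: Optional[Sequence[Column]] = None,
    remove_columns: Optional[Sequence[Column]] = None,
) -> str:
    """Render one ALTER statement that adds and drops columns."""
    # Treat None as "no columns", mirroring the macro's defaults.
    add_columns = list(add_columns or [])
    remove_columns = list(remove_columns or [])

    clauses = [f"add column {c.name} {c.data_type}" for c in add_columns]
    clauses += [f"drop column {c.name}" for c in remove_columns]
    if not clauses:
        raise ValueError("no columns to add or remove")

    # Joining a single list places a comma only between clauses.
    return f"alter {relation_type} {relation_name}\n  " + ",\n  ".join(clauses)


if __name__ == "__main__":
    print(render_alter_add_remove(
        "table", "analytics.orders",
        add_columns=[Column("region", "text")],
        remove_columns=[Column("legacy_flag", "boolean")],
    ))
```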
@@ -1,5 +1,6 @@

{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}

Review comment (Contributor):

@matt-winkler @jtcohen6 do you have context on why the existing "target" columns are used as dest_columns below?
Why not use the "tmp" increment's own columns?

Sorry, I stumbled on this PR because (1) I was trying to implement schema evolution on incremental runs as well, and (2) some of my increments don't necessarily have all the columns of the existing "target".

So I've used tmp_relation instead in the line below in a custom materialization on my side.

If the reason is unclear, I'll open an issue to have a broader discussion on this topic.

Reply (Contributor Author):

Hi @b-luu here are my thoughts on use of target columns for each scenario:

  • If we ignore or fail on a schema change (or omit on_schema_change entirely), we assume that the UPSERT targets the existing columns in the target table. The thinking is that the target schema should not change under these configuration options.
  • If we append_new_columns or sync_all_columns, the column addition / syncing actions have already been performed by the time we reach this point in the code.
  • I'd be interested to see more examples of the issue you describe @b-luu to see if we're missing something here; let us know!

@jtcohen6 Please comment if the above seems off
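
To make the trade-off in this thread concrete, here is a small Python sketch. It assumes the upsert ends in an insert that both names and selects the destination columns; the body of `incremental_upsert` is collapsed in this diff, so that shape is an assumption, and `build_insert_sql` and `columns_missing_from_tmp` are hypothetical helpers. Under that assumption, taking the column list from the target only works when the temp relation carries every target column, which is the situation the question above describes.

```python
from typing import List, Sequence


def build_insert_sql(target: str, tmp: str, dest_columns: Sequence[str]) -> str:
    """Render an insert that copies dest_columns from tmp into target."""
    cols = ", ".join(f'"{c}"' for c in dest_columns)
    return f"insert into {target} ({cols})\nselect {cols}\nfrom {tmp}"


def columns_missing_from_tmp(
    target_columns: Sequence[str], tmp_columns: Sequence[str]
) -> List[str]:
    """Target columns the select above could not find in tmp."""
    tmp = set(tmp_columns)
    return [c for c in target_columns if c not in tmp]


if __name__ == "__main__":
    target_cols = ["id", "amount", "region"]
    tmp_cols = ["id", "amount"]
    print(build_insert_sql("orders", "orders__dbt_tmp", target_cols))
    # Any name listed here would make the select fail on most warehouses.
    print(columns_missing_from_tmp(target_cols, tmp_cols))
```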

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

@@ -5,6 +5,10 @@

{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + "__dbt_backup" %}
@@ -28,22 +32,30 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{# -- first check whether we want to full refresh for source view or config reasons #}
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or should_full_refresh() %}
{% elif trigger_full_refresh %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + '__dbt_backup' %}
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}

{% set build_sql = create_table_as(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% do to_drop.append(backup_relation) %}
{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}

{% endif %}

{% call statement("main") %}
Expand Down
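
The branch structure of the materialization in this hunk can be summarized with a short Python sketch. `choose_build_path` and its return labels are hypothetical names used only for illustration. The sketch follows the new `trigger_full_refresh` condition, and it checks for a missing relation first because, unlike Jinja, Python would raise on attribute access against `None`.

```python
def choose_build_path(relation_exists: bool, is_view: bool, full_refresh: bool) -> str:
    """Pick which of the three build branches the materialization takes."""
    if not relation_exists:
        # First run: create the table directly.
        return "create_table"
    if full_refresh or is_view:
        # Rebuild into an intermediate relation, then swap it in.
        return "full_refresh_swap"
    # Otherwise build a temp relation, reconcile schemas, then upsert.
    return "incremental_upsert"


if __name__ == "__main__":
    print(choose_build_path(relation_exists=True, is_view=False, full_refresh=False))
```

Only the last branch reaches `process_schema_changes`, so `on_schema_change` has no effect on first runs or full refreshes.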
@@ -0,0 +1,164 @@
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %}

{% if on_schema_change not in ['sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %}

{% set log_message = 'Invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default) %}
{% do log(log_message) %}

{{ return(default) }}

{% else %}

{{ return(on_schema_change) }}

{% endif %}

{% endmacro %}
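
The validation above falls back to the default, rather than failing, when the config value is not recognized. A minimal Python sketch of the same behavior follows; `validate_on_schema_change` is a hypothetical name, and `print` stands in for dbt's `log`.

```python
VALID_ON_SCHEMA_CHANGE = ("sync_all_columns", "append_new_columns", "fail", "ignore")


def validate_on_schema_change(value, default: str = "ignore") -> str:
    """Return value if it is a recognized option, otherwise the default."""
    if value not in VALID_ON_SCHEMA_CHANGE:
        # print stands in for dbt's log call in this sketch.
        print(
            "Invalid value for on_schema_change (%s) specified. "
            "Setting default value of %s." % (value, default)
        )
        return default
    return value


if __name__ == "__main__":
    print(validate_on_schema_change("append_new_columns"))
    print(validate_on_schema_change("drop_everything"))
```

Note that an unset config (`None`) also takes the fallback path, so the message is emitted in that case as well.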

{% macro diff_columns(source_columns, target_columns) %}

{% set result = [] %}
{% set source_names = source_columns | map(attribute = 'column') | list %}
{% set target_names = target_columns | map(attribute = 'column') | list %}

{# --check whether the name attribute exists in the target - this does not perform a data type check #}
{% for sc in source_columns %}
{% if sc.name not in target_names %}
{% do result.append(sc) %}
{% endif %}
{% endfor %}

{{ return(result) }}

{% endmacro %}

{% macro diff_column_data_types(source_columns, target_columns) %}

{% set result = [] %}
{% for sc in source_columns %}
{% set tc = target_columns | selectattr("name", "equalto", sc.name) | list | first %}
{% if tc %}
{% if sc.data_type != tc.data_type %}
{% do result.append( { 'column_name': tc.name, 'new_type': sc.data_type } ) %}
{% endif %}
{% endif %}
{% endfor %}

{{ return(result) }}

{% endmacro %}
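
The two diff helpers above compare columns by name and by data type respectively. The following Python sketch mirrors that logic under the assumption that a column exposes `name` and `data_type`; the `Column` tuple here is a hypothetical stand-in for the adapter's column objects.

```python
from typing import Dict, List, NamedTuple, Sequence


class Column(NamedTuple):
    """Hypothetical stand-in for an adapter column."""
    name: str
    data_type: str


def diff_columns(source: Sequence[Column], target: Sequence[Column]) -> List[Column]:
    """Columns present in source whose names are absent from target."""
    target_names = {c.name for c in target}
    # Name check only; data types are compared separately below.
    return [c for c in source if c.name not in target_names]


def diff_column_data_types(
    source: Sequence[Column], target: Sequence[Column]
) -> List[Dict[str, str]]:
    """Shared columns whose source data type differs from the target's."""
    target_by_name = {c.name: c for c in target}
    result = []
    for sc in source:
        tc = target_by_name.get(sc.name)
        if tc is not None and sc.data_type != tc.data_type:
            result.append({"column_name": tc.name, "new_type": sc.data_type})
    return result


if __name__ == "__main__":
    src = [Column("id", "int"), Column("amount", "numeric"), Column("region", "text")]
    tgt = [Column("id", "int"), Column("amount", "float")]
    print(diff_columns(src, tgt))
    print(diff_column_data_types(src, tgt))
```

Calling `diff_columns` twice with the arguments swapped gives both directions of the name difference, which is how `check_for_schema_changes` uses it.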


{% macro check_for_schema_changes(source_relation, target_relation) %}

{% set schema_changed = False %}

{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%}
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}

{% set new_target_types = diff_column_data_types(source_columns, target_columns) %}

{% if source_not_in_target != [] or target_not_in_source != [] or new_target_types != [] %}
{% set schema_changed = True %}
{% endif %}

{% set changes_dict = {
'schema_changed': schema_changed,
'source_not_in_target': source_not_in_target,
'target_not_in_source': target_not_in_source,
'new_target_types': new_target_types
} %}

{% set msg %}
In {{ target_relation }}:
Schema changed: {{ schema_changed }}
Source columns not in target: {{ source_not_in_target }}
Target columns not in source: {{ target_not_in_source }}
New column types: {{ new_target_types }}
{% endset %}

{% do log(msg) %}

Review comment (Contributor):

Could we centralize all schema-change-related logging in this macro, to avoid repeating it in the materialization code?

Reply (Contributor Author):

done

{{ return(changes_dict) }}

{% endmacro %}
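
The shape of the dictionary and log message that `check_for_schema_changes` produces can be sketched in Python as follows. `summarize_schema_changes` is a hypothetical helper that takes the three diff results as plain lists; the schema counts as changed if any of them is non-empty.

```python
from typing import Dict, List, Tuple


def summarize_schema_changes(
    target_relation: str,
    source_not_in_target: List[str],
    target_not_in_source: List[str],
    new_target_types: List[Dict[str, str]],
) -> Tuple[dict, str]:
    """Bundle the diff results into a changes dict and a log message."""
    changes = {
        # Any non-empty diff means the schemas are out of sync.
        "schema_changed": bool(
            source_not_in_target or target_not_in_source or new_target_types
        ),
        "source_not_in_target": source_not_in_target,
        "target_not_in_source": target_not_in_source,
        "new_target_types": new_target_types,
    }
    message = (
        f"In {target_relation}:\n"
        f"  Schema changed: {changes['schema_changed']}\n"
        f"  Source columns not in target: {source_not_in_target}\n"
        f"  Target columns not in source: {target_not_in_source}\n"
        f"  New column types: {new_target_types}"
    )
    return changes, message


if __name__ == "__main__":
    _, msg = summarize_schema_changes("analytics.orders", ["region"], [], [])
    print(msg)
```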


{% macro sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}

{%- set add_to_target_arr = schema_changes_dict['source_not_in_target'] -%}

{%- if on_schema_change == 'append_new_columns'-%}
{%- if add_to_target_arr | length > 0 -%}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, none) -%}
{%- endif -%}

{% elif on_schema_change == 'sync_all_columns' %}
{%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
{%- set new_target_types = schema_changes_dict['new_target_types'] -%}

{% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 %}
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%}
{% endif %}

{% if new_target_types != [] %}
{% for ntt in new_target_types %}
{% set column_name = ntt['column_name'] %}
{% set new_type = ntt['new_type'] %}
{% do alter_column_type(target_relation, column_name, new_type) %}
{% endfor %}
{% endif %}

{% endif %}

{% set schema_change_message %}
In {{ target_relation }}:
Schema change approach: {{ on_schema_change }}
Columns added: {{ add_to_target_arr }}
Columns removed: {{ remove_from_target_arr }}
Data types changed: {{ new_target_types }}

Review comment (Contributor):

Warning: remove_from_target_arr and new_target_types are only set if on_schema_change == 'sync_all_columns'.

This should raise an exception if on_schema_change == 'append_new_columns'

Reply (Contributor Author):

@b-luu The assumption here is that when someone only wants to append new columns, we don't need to worry about data type changes on existing columns, or about columns that have been removed from the upstream model but still exist in the target incremental model. Those scenarios are handled by the sync_all_columns configuration option.

@jtcohen6 Please comment if the above seems off

{% endset %}

{% do log(schema_change_message) %}

{% endmacro %}
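
The difference between the two sync modes discussed in this macro and its review thread can be sketched as a small planning function. This is an illustration in Python rather than dbt code, and `plan_sync` is a hypothetical name. It starts the removal and retype lists as empty, so every mode yields defined values for a summary message.

```python
from typing import Dict, List


def plan_sync(on_schema_change: str, changes: Dict[str, list]) -> Dict[str, List]:
    """Decide which column actions a given on_schema_change mode performs."""
    add: List = []
    remove: List = []
    retype: List = []

    if on_schema_change == "append_new_columns":
        # Only new columns are added; removals and type changes are left alone.
        add = changes["source_not_in_target"]
    elif on_schema_change == "sync_all_columns":
        # The target is brought fully in line with the source.
        add = changes["source_not_in_target"]
        remove = changes["target_not_in_source"]
        retype = changes["new_target_types"]

    return {"add": add, "remove": remove, "retype": retype}


if __name__ == "__main__":
    example = {
        "source_not_in_target": ["region"],
        "target_not_in_source": ["legacy"],
        "new_target_types": [{"column_name": "amount", "new_type": "numeric"}],
    }
    print(plan_sync("append_new_columns", example))
    print(plan_sync("sync_all_columns", example))
```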


{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}

{% if on_schema_change != 'ignore' %}

{% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %}

{% if schema_changes_dict['schema_changed'] %}

{% if on_schema_change == 'fail' %}

{% set fail_msg %}
The source and target schemas on this incremental model are out of sync!
They can be reconciled in several ways:
- set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation.
- Re-run the incremental model with `full_refresh: True` to update the target schema.
- update the schema manually and re-run the process.
{% endset %}

{% do exceptions.raise_compiler_error(fail_msg) %}

{# -- unless we ignore, run the sync operation per the config #}
{% else %}

{% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}

{% endif %}

{% endif %}

{% endif %}

{% endmacro %}
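
The control flow of `process_schema_changes` can be sketched in Python as follows. The names `SchemaOutOfSyncError`, `check`, and `sync` are hypothetical; in the macro, the failure is raised through `exceptions.raise_compiler_error`. Passing `check` as a callable preserves the detail that the schemas are not compared at all when the mode is `ignore`.

```python
from typing import Callable


class SchemaOutOfSyncError(Exception):
    """Hypothetical error standing in for dbt's compiler error."""


def process_schema_changes(
    on_schema_change: str,
    check: Callable[[], dict],
    sync: Callable[[str, dict], None],
) -> bool:
    """Return True if a sync was performed, False if nothing was done."""
    if on_schema_change == "ignore":
        # Skip the comparison entirely.
        return False

    changes = check()
    if not changes["schema_changed"]:
        return False

    if on_schema_change == "fail":
        raise SchemaOutOfSyncError(
            "The source and target schemas on this incremental model are out of sync!"
        )

    # append_new_columns or sync_all_columns: apply the configured sync.
    sync(on_schema_change, changes)
    return True


if __name__ == "__main__":
    performed = process_schema_changes(
        "append_new_columns",
        check=lambda: {"schema_changed": True},
        sync=lambda mode, changes: print("syncing with", mode),
    )
    print(performed)
```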
32 changes: 32 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/adapters.sql
@@ -128,6 +128,38 @@
{% do adapter.rename_relation(from_relation, to_relation) %}
{% endmacro %}

{% macro bigquery__alter_relation_add_columns(relation, add_columns) %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}
{% for column in add_columns %}
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{{ return(run_query(sql)) }}

{% endmacro %}

{% macro bigquery__alter_relation_drop_columns(relation, drop_columns) %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}

{% for column in drop_columns %}
drop column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{{ return(run_query(sql)) }}

{% endmacro %}


{% macro bigquery__alter_column_type(relation, column_name, new_column_type) -%}
{#
Changing a column's data type using a query requires you to scan the entire table.