Feature/incremental non full refresh #3387
Changes from 29 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,3 +72,5 @@ | |
{% endif %} | ||
{% do return(config_full_refresh) %} | ||
{% endmacro %} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
|
||
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %} | ||
|
||
@matt-winkler @jtcohen6 do you have context on why the existing "target" columns are used as
Sorry, I stumbled on this PR because (1) I was trying to implement schema evolution on incremental runs as well, and (2) some of my increments did not have all the columns of the existing "target". So I've used tmp_relation instead in the line below in a custom materialization on my side.
If the reason is unclear, I'll open an issue to have a broader discussion on this topic.
Hi @b-luu, here are my thoughts on the use of target columns for each scenario:
@jtcohen6 Please comment if the above seems off |
||
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} | ||
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} | ||
|
||
|
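To make the question in the thread above concrete, here is a minimal Python sketch (not dbt code) of how the upsert's column list is derived from the target relation, and what that implies when an increment lacks one of the target's columns. The `Column` class, the double-quote quoting style, and the sample column names are assumptions made for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Column:
    """Hypothetical stand-in for a dbt column object."""
    name: str

    @property
    def quoted(self) -> str:
        # Assumption: identifiers are wrapped in double quotes.
        return f'"{self.name}"'


def dest_cols_csv(columns):
    """Models: dest_columns | map(attribute='quoted') | join(', ')."""
    return ", ".join(c.quoted for c in columns)


# The target table still has a column that the new increment no longer selects.
target = [Column("id"), Column("amount"), Column("legacy_flag")]
tmp = [Column("id"), Column("amount")]

# The column list is built from the target, so it names "legacy_flag" ...
csv_from_target = dest_cols_csv(target)

# ... even though the temp relation holding the increment does not have it.
missing_in_tmp = [c.name for c in target if c not in tmp]
```

Under these assumptions, a statement that selects `csv_from_target` from the temp relation would reference a column that is not there, which is the situation described in the comment above.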
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,19 +5,29 @@ | |
|
||
{% set target_relation = this.incorporate(type='table') %} | ||
{% set existing_relation = load_relation(this) %} | ||
{%- set full_refresh_mode = (should_full_refresh()) -%} | ||
|
||
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} | ||
|
||
{{ run_hooks(pre_hooks, inside_transaction=False) }} | ||
|
||
-- `BEGIN` happens here: | ||
{{ run_hooks(pre_hooks, inside_transaction=True) }} | ||
|
||
{% set to_drop = [] %} | ||
|
||
{# -- first check whether we want to full refresh for source view or config reasons #} | ||
{% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %} | ||
{% do log('full refresh mode: %s' % trigger_full_refresh) %} | ||
|
||
{% if existing_relation is none %} | ||
{% set build_sql = create_table_as(False, target_relation, sql) %} | ||
{% elif existing_relation.is_view or should_full_refresh() %} | ||
|
||
{% elif trigger_full_refresh %} | ||
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} | ||
{% do log('running full refresh procedure', info=true) %} | ||
{% set tmp_identifier = model['name'] + '__dbt_tmp' %} | ||
{% set backup_identifier = model['name'] + "__dbt_backup" %} | ||
{% set backup_identifier = model['name'] + '__dbt_backup' %} | ||
|
||
{% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %} | ||
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} | ||
|
@@ -28,13 +38,32 @@ | |
{% set build_sql = create_table_as(False, intermediate_relation, sql) %} | ||
{% set need_swap = true %} | ||
{% do to_drop.append(backup_relation) %} | ||
|
||
{% else %} | ||
{% set tmp_relation = make_temp_relation(target_relation) %} | ||
{% do run_query(create_table_as(True, tmp_relation, sql)) %} | ||
{% set tmp_relation = make_temp_relation(target_relation) %} | ||
{% do run_query(create_table_as(True, tmp_relation, sql)) %} | ||
|
||
{% if on_schema_change != 'ignore' %} | ||
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %} | ||
{% do log('schema changed: %s' % schema_changed, info=true) %} | ||
{% if schema_changed %} | ||
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} | ||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} | ||
|
||
{% else %} | ||
{% do adapter.expand_target_column_types( | ||
Let's always
Great call - I tested this on Snowflake and it works beautifully. |
||
from_relation=tmp_relation, | ||
to_relation=target_relation) %} | ||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} | ||
{% endif %} | ||
|
||
{% else %} | ||
{% do adapter.expand_target_column_types( | ||
from_relation=tmp_relation, | ||
to_relation=target_relation) %} | ||
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} | ||
{% endif %} | ||
|
||
{% endif %} | ||
|
||
{% call statement("main") %} | ||
|
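The branching in this hunk can be summarized with a small Python sketch. It only models which path the materialization takes; the function name and the returned labels are hypothetical and do not exist in dbt.

```python
def choose_build_strategy(existing_exists, existing_is_view, full_refresh,
                          on_schema_change="ignore", schema_changed=False):
    """Return a label for the branch the materialization above would take."""
    if not existing_exists:
        # No relation yet: build the table from scratch.
        return "create_table"
    if full_refresh or existing_is_view:
        # Build an intermediate table, then swap it in via the backup rename.
        return "full_refresh_swap"
    if on_schema_change != "ignore" and schema_changed:
        # Reconcile (or fail on) column differences before the upsert.
        return "process_schema_changes_then_upsert"
    # Default incremental path: widen column types if needed, then upsert.
    return "expand_column_types_then_upsert"


first_run = choose_build_strategy(False, False, False)
view_case = choose_build_strategy(True, True, False)
drifted = choose_build_strategy(True, False, False, "append_new_columns", True)
ignored = choose_build_strategy(True, False, False, "ignore", True)
```

Note that with `on_schema_change='ignore'` the schema comparison is skipped entirely in the diff, which the sketch models by never consulting `schema_changed` on that path.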
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %} | ||
|
||
{% if on_schema_change not in ['sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %} | ||
|
||
{% set log_message = 'invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default) %} | ||
{% do log(log_message, info=true) %} | ||
|
||
{{ return(default) }} | ||
|
||
{% else %} | ||
{{ return(on_schema_change) }} | ||
|
||
{% endif %} | ||
|
||
{% endmacro %} | ||
|
||
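As a quick check of the validation macro's behavior, here is a Python sketch of the same fallback logic. The function name and the `VALID_OPTIONS` constant are hypothetical; the four accepted values are taken from the macro above.

```python
VALID_OPTIONS = ("sync_all_columns", "append_new_columns", "fail", "ignore")


def validate_on_schema_change(value, default="ignore"):
    """Return value if it is a recognized option, otherwise fall back to default."""
    if value not in VALID_OPTIONS:
        # The macro logs a message here before falling back.
        print(f"invalid value for on_schema_change ({value}) specified. "
              f"Setting default value of {default}.")
        return default
    return value


unset = validate_on_schema_change(None)        # config key not provided
typo = validate_on_schema_change("append")     # unrecognized value
valid = validate_on_schema_change("fail")
```

One consequence worth noticing, assuming `config.get('on_schema_change')` yields `none` when the key is not set: an unset config takes the "invalid value" path and emits the log message, even though nothing invalid was specified.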
{% macro get_column_names(columns) %} | ||
|
||
{% set result = [] %} | ||
|
||
{% for col in columns %} | ||
{% do result.append(col.column) %} | ||
{% endfor %} | ||
|
||
{{ return(result) }} | ||
|
||
{% endmacro %} | ||
I think this macro is effectively the same as
Boom,
On further testing, I found that an additional |
||
|
||
{% macro diff_arrays(source_array, target_array) %} | ||
|
||
{% set result = [] %} | ||
{%- for elem in source_array -%} | ||
{% if elem not in target_array %} | ||
|
||
{% do result.append(elem) %} | ||
|
||
{% endif %} | ||
|
||
{%- endfor -%} | ||
|
||
{{ return(result) }} | ||
|
||
{% endmacro %} | ||
|
||
{% macro diff_columns(source_columns, target_columns) %} | ||
|
||
{% set result = [] %} | ||
{% set source_names = get_column_names(source_columns) %} | ||
{% set target_names = get_column_names(target_columns) %} | ||
|
||
{# --check whether the name attribute exists in the target, but don't worry about data type differences #} | ||
{%- for col in source_columns -%} | ||
{%- if col.column not in target_names -%} | ||
{% do result.append(col) %} | ||
{%- endif -%} | ||
{%- endfor -%} | ||
|
||
{{ return(result) }} | ||
|
||
{% endmacro %} | ||
|
||
{% macro check_for_schema_changes(source_relation, target_relation) %} | ||
|
||
{% set schema_changed = False %} | ||
|
||
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%} | ||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%} | ||
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%} | ||
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%} | ||
|
||
{% if source_not_in_target != [] %} | ||
{% set schema_changed = True %} | ||
{% elif target_not_in_source != [] %} | ||
{% set schema_changed = True %} | ||
{% endif %} | ||
|
||
Could we centralize all schema-change-related logging in this macro, to avoid repeating it in the materialization code?
done |
||
{{ return(schema_changed) }} | ||
|
||
{% endmacro %} | ||
|
||
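The comparison performed by `diff_columns` and `check_for_schema_changes` is by column name only. The Python sketch below models that behavior; the `Column` tuple is a hypothetical stand-in, and the sample columns are made up.

```python
from typing import NamedTuple


class Column(NamedTuple):
    """Hypothetical stand-in for a dbt column (name plus data type)."""
    column: str
    data_type: str


def diff_columns(source, target):
    """Columns whose name is in source but not in target; types are ignored."""
    target_names = {c.column for c in target}
    return [c for c in source if c.column not in target_names]


def check_for_schema_changes(source, target):
    """True if either side has a column name the other side lacks."""
    return bool(diff_columns(source, target)) or bool(diff_columns(target, source))


target = [Column("id", "int"), Column("amount", "numeric")]
added = target + [Column("currency", "varchar")]
retyped = [Column("id", "bigint"), Column("amount", "numeric")]

added_detected = check_for_schema_changes(added, target)
retype_detected = check_for_schema_changes(retyped, target)
```

In this model a type-only change (`int` to `bigint`) is not reported as a schema change, which matches the comment in `diff_columns`. The name comparison is also case-sensitive as written here; whether that matters in practice depends on how the adapter reports column names.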
{% macro sync_schemas(source_relation, target_relation, on_schema_change='append_new_columns') %} | ||
|
||
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%} | ||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%} | ||
{%- set add_to_target_arr = diff_columns(source_columns, target_columns) -%} | ||
{%- set remove_from_target_arr = diff_columns(target_columns, source_columns) -%} | ||
|
||
-- Validates on_schema_change config vs. whether there are column differences | ||
{% if on_schema_change=='append_new_columns' and add_to_target_arr == [] %} | ||
|
||
{{ | ||
exceptions.raise_compiler_error('append_new_columns was set, but no new columns to append. | ||
This can occur when columns are removed from the source dataset unintentionally. | ||
Review the schemas in the source and target relations, and consider re-running with the --full-refresh option.') | ||
}} | ||
matt-winkler marked this conversation as resolved.
|
||
|
||
{% endif %} | ||
|
||
{%- if on_schema_change == 'append_new_columns' -%} | ||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr) -%} | ||
{% elif on_schema_change == 'sync_all_columns' %} | ||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%} | ||
{% endif %} | ||
|
||
{{ | ||
return( | ||
{ | ||
'columns_added': add_to_target_arr, | ||
'columns_removed': remove_from_target_arr | ||
} | ||
) | ||
}} | ||
|
||
{% endmacro %} | ||
|
||
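A Python sketch of the decision made in `sync_schemas` may help when reviewing the two modes. It works on plain column names rather than column objects, and `plan_sync` plus the `applied` key are hypothetical additions for illustration; the macro itself returns only `columns_added` and `columns_removed`.

```python
def plan_sync(source_names, target_names, on_schema_change="append_new_columns"):
    """Model which columns would be added to or dropped from the target."""
    to_add = [n for n in source_names if n not in target_names]
    to_remove = [n for n in target_names if n not in source_names]

    # Mirrors the compiler error raised when there is nothing to append.
    if on_schema_change == "append_new_columns" and not to_add:
        raise ValueError("append_new_columns was set, but no new columns to append")

    if on_schema_change == "append_new_columns":
        applied = {"add": to_add, "drop": []}
    elif on_schema_change == "sync_all_columns":
        applied = {"add": to_add, "drop": to_remove}
    else:
        applied = {"add": [], "drop": []}

    # Both lists are returned regardless of mode, as in the macro.
    return {"columns_added": to_add, "columns_removed": to_remove, "applied": applied}


append = plan_sync(["id", "new_col"], ["id", "old_col"], "append_new_columns")
sync = plan_sync(["id", "new_col"], ["id", "old_col"], "sync_all_columns")
```

Because `columns_removed` is returned in both modes, the "columns removed" log line in `process_schema_changes` can list a column under `append_new_columns` even though nothing is dropped in that mode.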
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %} | ||
|
||
{% if on_schema_change=='fail' %} | ||
|
||
{{ | ||
exceptions.raise_compiler_error("The source and target schemas on this incremental model are out of sync! | ||
You can specify one of ['fail', 'ignore', 'append_new_columns', 'sync_all_columns'] in the on_schema_change config to control this behavior. | ||
Please re-run the incremental model with full_refresh set to True to update the target schema. | ||
Alternatively, you can update the schema manually and re-run the process.") | ||
}} | ||
|
||
{# unless we ignore, run the sync operation per the config #} | ||
{% else %} | ||
|
||
{% set schema_changes = sync_schemas(source_relation, target_relation, on_schema_change) %} | ||
{% set columns_added = schema_changes['columns_added'] %} | ||
{% set columns_removed = schema_changes['columns_removed'] %} | ||
{% do log('columns added: ' + columns_added|join(', '), info=true) %} | ||
{% do log('columns removed: ' + columns_removed|join(', '), info=true) %} | ||
|
||
{% endif %} | ||
|
||
{% endmacro %} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -128,6 +128,37 @@ | |
{% do adapter.rename_relation(from_relation, to_relation) %} | ||
{% endmacro %} | ||
|
||
{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%} | ||
{{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }} | ||
{% endmacro %} | ||
You can remove this dispatch call and just keep the |
||
|
||
{% macro bigquery__alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%} | ||
|
||
{% set sql -%} | ||
|
||
alter {{ relation.type }} {{ relation }} | ||
{% if add_columns %} | ||
|
||
{% for column in add_columns %} | ||
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }} | ||
{% endfor %} | ||
{% endif %} | ||
|
||
{{ ', ' if add_columns and remove_columns }} | ||
|
||
{% if remove_columns %} | ||
{% for column in remove_columns %} | ||
drop column {{ column.name }}{{ ',' if not loop.last }} | ||
{% endfor %} | ||
{% endif %} | ||
|
||
{%- endset %} | ||
|
||
{{ return(run_query(sql)) }} | ||
|
||
{% endmacro %} | ||
|
||
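To see what statement the macro above produces, here is a Python sketch that assembles the same clauses. It is a model, not the macro: whitespace is normalized, `render_alter` is a hypothetical name, and the relation and column names are invented.

```python
def render_alter(relation, relation_type="table", add_columns=None, remove_columns=None):
    """Build one ALTER statement with comma-separated add/drop clauses."""
    # add_columns: list of (name, data_type) pairs; remove_columns: list of names.
    clauses = [f"add column {name} {dtype}" for name, dtype in (add_columns or [])]
    clauses += [f"drop column {name}" for name in (remove_columns or [])]
    return f"alter {relation_type} {relation} " + ", ".join(clauses)


sql = render_alter(
    "analytics.orders",
    add_columns=[("currency", "string")],
    remove_columns=["legacy_flag"],
)
```

The result is a single statement that mixes `add column` and `drop column` clauses separated by commas. A later comment in this thread reports that Snowflake rejected the comma-separated form, so adapters may need their own variant rather than sharing this shape.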
|
||
{% macro bigquery__alter_column_type(relation, column_name, new_column_type) -%} | ||
{# | ||
Changing a column's data type using a query requires you to scan the entire table. | ||
|
I did a test run with a column add and a column drop. default__alter_relation_add_remove_columns was dispatched to handle the alters, and Snowflake did not accept the comma-separated alters.
Here are the config and the SQL as seen on the database side: