Feature/incremental non full refresh #3387
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
{% macro incremental_validate_on_schema_change(on_schema_change, default='ignore') %} | ||
|
||
{% if on_schema_change not in ['sync_all_columns', 'append_new_columns', 'fail', 'ignore'] %} | ||
|
||
{% set log_message = 'Invalid value for on_schema_change (%s) specified. Setting default value of %s.' % (on_schema_change, default) %} | ||
{% do log(log_message) %} | ||
|
||
{{ return(default) }} | ||
|
||
{% else %} | ||
|
||
{{ return(on_schema_change) }} | ||
|
||
{% endif %} | ||
|
||
{% endmacro %} | ||
|
||
{% macro diff_columns(source_columns, target_columns) %} | ||
|
||
{% set result = [] %} | ||
{% set source_names = source_columns | map(attribute = 'column') | list %} | ||
{% set target_names = target_columns | map(attribute = 'column') | list %} | ||
|
||
{# --check whether the name attribute exists in the target - this does not perform a data type check #} | ||
{% for sc in source_columns %} | ||
{% if sc.name not in target_names %} | ||
{% do result.append(sc) %} | ||
{% endif %} | ||
{% endfor %} | ||
|
||
{{ return(result) }} | ||
|
||
{% endmacro %} | ||
|
||
{% macro diff_column_data_types(source_columns, target_columns) %} | ||
|
||
{% set result = [] %} | ||
{% for sc in source_columns %} | ||
{% set tc = target_columns | selectattr("name", "equalto", sc.name) | list | first %} | ||
{% if tc %} | ||
{% if sc.data_type != tc.data_type %} | ||
{% do result.append({'column_name': tc.name, 'new_type': sc.data_type}) %} | ||
{% endif %} | ||
{% endif %} | ||
{% endfor %} | ||
|
||
{{ return(result) }} | ||
|
||
{% endmacro %} | ||
|
||
|
||
{% macro check_for_schema_changes(source_relation, target_relation) %} | ||
|
||
{% set schema_changed = False %} | ||
|
||
{%- set source_columns = adapter.get_columns_in_relation(source_relation) -%} | ||
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%} | ||
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%} | ||
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%} | ||
|
||
{% set new_target_types = diff_column_data_types(source_columns, target_columns) %} | ||
|
||
{% if source_not_in_target != [] or target_not_in_source != [] or new_target_types != [] %} | ||
{% set schema_changed = True %} | ||
{% endif %} | ||
|
||
{% set changes_dict = { | ||
'schema_changed': schema_changed, | ||
'source_not_in_target': source_not_in_target, | ||
'target_not_in_source': target_not_in_source, | ||
'new_target_types': new_target_types | ||
} %} | ||
|
||
{% set msg %} | ||
In {{ target_relation }}: | ||
Schema changed: {{ schema_changed }} | ||
Source columns not in target: {{ source_not_in_target }} | ||
Target columns not in source: {{ target_not_in_source }} | ||
New column types: {{ new_target_types }} | ||
{% endset %} | ||
|
||
{% do log(msg) %} | ||
|
||
Review comment: Could we centralize all schema-change-related logging in this macro, to avoid repeating it in the materialization code? Reply: done |
||
{{ return(changes_dict) }} | ||
|
||
{% endmacro %} | ||
|
||
|
||
{% macro sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %} | ||
|
||
{%- set add_to_target_arr = schema_changes_dict['source_not_in_target'] -%} | ||
|
||
{%- if on_schema_change == 'append_new_columns'-%} | ||
{%- if add_to_target_arr | length > 0 -%} | ||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, none) -%} | ||
{%- endif -%} | ||
|
||
{% elif on_schema_change == 'sync_all_columns' %} | ||
{%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%} | ||
{%- set new_target_types = schema_changes_dict['new_target_types'] -%} | ||
|
||
{% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 %} | ||
{%- do alter_relation_add_remove_columns(target_relation, add_to_target_arr, remove_from_target_arr) -%} | ||
{% endif %} | ||
|
||
{% if new_target_types != [] %} | ||
{% for ntt in new_target_types %} | ||
{% set column_name = ntt['column_name'] %} | ||
{% set new_type = ntt['new_type'] %} | ||
{% do alter_column_type(target_relation, column_name, new_type) %} | ||
{% endfor %} | ||
{% endif %} | ||
|
||
{% endif %} | ||
|
||
{% set schema_change_message %} | ||
In {{ target_relation }}: | ||
Schema change approach: {{ on_schema_change }} | ||
Columns added: {{ add_to_target_arr }} | ||
Columns removed: {{ remove_from_target_arr }} | ||
Data types changed: {{ new_target_types }} | ||
Review comment: Warning: This should raise an exception. Reply: @b-luu The assumption here is that when someone only wants to append new columns, we don't need to be concerned with existing column data type changes, or columns that have been removed from the upstream model vs. the target incremental model. Those scenarios are handled with the @jtcohen6 Please comment if the above seems off |
||
{% endset %} | ||
|
||
{% do log(schema_change_message) %} | ||
|
||
{% endmacro %} | ||
|
||
|
||
{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %} | ||
|
||
{% if on_schema_change != 'ignore' %} | ||
|
||
{% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %} | ||
|
||
{% if schema_changes_dict['schema_changed'] %} | ||
|
||
{% if on_schema_change == 'fail' %} | ||
|
||
{% set fail_msg %} | ||
The source and target schemas on this incremental model are out of sync! | ||
They can be reconciled in several ways: | ||
- set the `on_schema_change` config to either append_new_columns or sync_all_columns, depending on your situation. | ||
- Re-run the incremental model with `full_refresh: True` to update the target schema. | ||
- update the schema manually and re-run the process. | ||
{% endset %} | ||
|
||
{% do exceptions.raise_compiler_error(fail_msg) %} | ||
|
||
{# -- unless we ignore, run the sync operation per the config #} | ||
{% else %} | ||
|
||
{% do sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %} | ||
|
||
{% endif %} | ||
|
||
{% endif %} | ||
|
||
{% endif %} | ||
|
||
{% endmacro %} |
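For readers who want to try the comparison logic outside dbt, here is a rough Python sketch of what `diff_columns`, `diff_column_data_types` and `check_for_schema_changes` compute. It is only a model of the macros above, not part of the PR: the `Column` dataclass is a hypothetical stand-in for the objects returned by `adapter.get_columns_in_relation`, and only the `name` and `data_type` attributes are assumed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Column:
    """Hypothetical stand-in for an adapter column object."""
    name: str
    data_type: str


def diff_columns(source, target):
    """Columns in `source` whose names are absent from `target` (names only, no type check)."""
    target_names = {c.name for c in target}
    return [c for c in source if c.name not in target_names]


def diff_column_data_types(source, target):
    """For columns present on both sides, report where the source type differs."""
    target_by_name = {c.name: c for c in target}
    return [
        {"column_name": sc.name, "new_type": sc.data_type}
        for sc in source
        if sc.name in target_by_name
        and target_by_name[sc.name].data_type != sc.data_type
    ]


def check_for_schema_changes(source, target):
    """Collect the three kinds of drift and flag whether any of them occurred."""
    changes = {
        "source_not_in_target": diff_columns(source, target),
        "target_not_in_source": diff_columns(target, source),
        "new_target_types": diff_column_data_types(source, target),
    }
    # Evaluated before the flag is added, so only the three lists are inspected.
    changes["schema_changed"] = any(changes.values())
    return changes


source = [Column("id", "integer"), Column("amount", "numeric"), Column("note", "text")]
target = [Column("id", "integer"), Column("amount", "integer"), Column("legacy", "text")]
changes = check_for_schema_changes(source, target)
```

With these sample columns, `note` is reported as missing from the target, `legacy` as missing from the source, and `amount` as a type change, so `schema_changed` is true.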
@matt-winkler @jtcohen6 do you have context on why the existing "target" columns are used as `dest_columns` below? Why not use the "tmp" increment's own columns?
Sorry, I stumbled on this PR because 1. I was trying to implement schema evolution on incremental runs as well and 2. I had issues with some of my increments not necessarily having all the columns of the existing "target".
So I've used tmp_relation instead in the line below in a custom materialization on my side.
If the reason is unclear, I'll open an issue to have a broader discussion on this topic.
Hi @b-luu here are my thoughts on use of target columns for each scenario:
- `ignore` or `fail`: in the case of a schema change (or if the `on_schema_change` config is left off entirely), we assume that the UPSERT is targeted towards the existing columns in the target table. Basically the thinking here is that the target schema should not be changing under these configuration options.
- `append_new_columns` or `sync_all_columns`: the column addition / syncing actions have already been performed by the time we reach this point in the code.

@jtcohen6 Please comment if the above seems off
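To make the reasoning in this thread concrete, here is a small Python sketch of which columns the target ends up with under each `on_schema_change` value, and therefore what reading the destination columns from the target gives at that point. It is an illustrative model only: the function name `target_columns_after_run` is hypothetical, columns are reduced to plain name strings, and data type changes are ignored.

```python
def target_columns_after_run(on_schema_change, source, target):
    """Column names of the target once schema handling has finished.

    Hypothetical model: columns are plain name lists; type changes are ignored.
    """
    missing = [c for c in source if c not in target]
    removed = [c for c in target if c not in source]

    if on_schema_change == "append_new_columns":
        # New source columns are added; nothing is dropped from the target.
        return target + missing
    if on_schema_change == "sync_all_columns":
        # Target is brought in line with the source: drops plus additions.
        return [c for c in target if c in source] + missing
    if on_schema_change == "fail" and (missing or removed):
        raise ValueError("source and target schemas are out of sync")
    # 'ignore' (or 'fail' with no drift): the target schema is left untouched.
    return target


source_cols = ["id", "new"]
target_cols = ["id", "old"]
after_ignore = target_columns_after_run("ignore", source_cols, target_cols)
after_append = target_columns_after_run("append_new_columns", source_cols, target_cols)
after_sync = target_columns_after_run("sync_all_columns", source_cols, target_cols)
```

In this model, only `sync_all_columns` guarantees that the target's columns match the increment's columns; under `append_new_columns` the target can still hold columns the increment lacks, which is the situation raised in the question above.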