Feature/incremental non full refresh #3387
Conversation
Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>
…ental-non-full-refresh
@jtcohen6 I've run through this for Postgres / Snowflake / Bigquery, excluding the …
Coming along nicely! Thanks for the work shoring up the progress so far, outside the trickier implications of on_schema_change: full_refresh.
I left some comments around consolidating logic. I'd like to keep the "footprint" on materializations as light as possible, since that's where we have the least modularity, greatest complexity, and most duplication across adapters.
Let me know how I can help as you get into unit + integration tests!
core/dbt/include/global_project/macros/materializations/incremental/on_schema_change.sql (outdated review thread, resolved)
{% macro get_column_names(columns) %}

  {% set result = [] %}

  {% for col in columns %}
    {{ result.append(col.column) }}
  {% endfor %}

  {{ return(result) }}

{% endmacro %}
I think this macro is effectively the same as columns | map(attribute = 'name'), right?
Boom, map for the win.
On further testing, I found that an additional | list was needed.
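Putting that thread together, the macro body reduces to a one-liner along these lines (a sketch of the filter chain discussed above, not the merged code; the variable name is illustrative):

{% set column_names = columns | map(attribute='name') | list %}

Jinja's map filter returns a generator, which is why the trailing | list is needed before the result can be used like a normal list.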
{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
  {{ return(adapter.dispatch('alter_relation_add_remove_columns')(relation, add_columns, remove_columns)) }}
{% endmacro %}
You can remove this dispatch call and just keep the bigquery__ implementation below.
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}

{% else %}
  {% do adapter.expand_target_column_types(
Let's always expand_target_column_types before check_for_schema_changes, even if it means running a few extra metadata queries, in order to avoid over-aggressively dropping and recreating columns that just need to be varchar/numeric-type expanded.
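In other words, the ordering in the materialization would look roughly like this (a sketch; expand_target_column_types is the existing adapter method, check_for_schema_changes is the macro added in this PR):

{% do adapter.expand_target_column_types(from_relation=tmp_relation, to_relation=target_relation) %}
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}

That way, a column that merely widened (e.g. a longer varchar) is expanded in place rather than flagged as a schema change.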
Great call - I tested this on Snowflake and it works beautifully.
{% elif target_not_in_source != [] %}
  {% set schema_changed = True %}
{% endif %}
Could we centralize all schema-change-related logging in this macro, to avoid repeating it in the materialization code?
done
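For reference, the centralized version amounts to a single log call inside the macro, along these lines (a sketch mirroring the message fields shown further down; exact placement follows the PR's macros, and log() is dbt's built-in Jinja logger):

{% if schema_changed %}
  {% do log('Schema change approach: ' ~ on_schema_change ~ '; columns added: ' ~ add_to_target_arr ~ '; columns removed: ' ~ remove_from_target_arr, info=True) %}
{% endif %}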
{%- endset -%}

{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}
{% set tmp_relation = make_temp_relation(target_relation) %}
Stylistically, I think we can reorder the logic below to avoid repetition:
{% set tmp_relation = make_temp_relation(target_relation) %}
{% if on_schema_change != 'ignore' %}
{# only create the temp table if it's needed. can we make this a free query? #}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
{% if schema_changed %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% endif %}
{% endif %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = dbt_bq_get_incremental_sql(strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
Substantively, there is something subtle going on here: We're creating a temp table first thing, in order to power schema comparison. I think it's fair to document that the "price" of dynamic schema evolution is the need to produce that temp table, but today, dbt does not produce a temp table for the merge strategy, and produces that temp table as part of a script in the insert_overwrite strategy.
I'm wondering if we could adjust the temp table creation to be a "free" ($0) query, i.e. by adding a where false limit 0 condition. So like:
{% set tmp_sql %}
select * from ({{ sql }}) where false limit 0
{% endset %}
{% do run_query(create_table_as(True, tmp_relation, tmp_sql)) %}
Implemented this change on Snowflake and am rolling it out to the other adapters.
{% else %}
  {% do run_query(create_table_as(True, tmp_relation, sql)) %}
  {% do adapter.expand_target_column_types(
As with the default materialization, let's consolidate this logic to:
- Always expand target column types first thing
- Check for schema changes if and only if on_schema_change != 'ignore'. If there are schema changes, process them, endif.
- Grab dest_columns and set build_sql
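A sketch of that consolidated ordering for the Snowflake materialization, reusing the macro calls that appear elsewhere in this PR (not the exact merged code; the merge source here is the temp relation):

{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(from_relation=tmp_relation, to_relation=target_relation) %}
{% if on_schema_change != 'ignore' %}
  {% set schema_changed = check_for_schema_changes(tmp_relation, target_relation) %}
  {% if schema_changed %}
    {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
  {% endif %}
{% endif %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}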
This is done for Snowflake and being ported to the other adapters.
core/dbt/include/global_project/macros/materializations/incremental/incremental.sql (outdated review thread, resolved)
…ental/incremental.sql Accept Jeremy's suggested change Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>
Let's ship it :)
This is brilliant!!
Thanks a lot for your work here @matt-winkler & @jtcohen6!
(I needed this!)
I've mainly added a note in sync_column_schemas, as I think the current implementation would break if configured in append mode.
Also a question (for context) in incremental_upsert, but I can open a new issue for that if preferred?
@@ -1,5 +1,6 @@
{% macro incremental_upsert(tmp_relation, target_relation, unique_key=none, statement_name="main") %}
@matt-winkler @jtcohen6 do you have context on why the existing "target" columns are used as dest_columns below?
Why not use the "tmp" increment's own columns?
Sorry, I stumbled on this PR because 1. I was trying to implement schema evolution on incremental runs as well and 2. I had issues with some of my increments not necessarily having all the columns of the existing "target".
So I've used tmp_relation instead in the line below in a custom materialization on my side.
If the reason is unclear, I'll open an issue to have a broader discussion on this topic.
Hi @b-luu, here are my thoughts on the use of target columns for each scenario:
- If we ignore or fail in the case of a schema change (or leave off on_schema_change entirely), we assume that the UPSERT is targeted towards the existing columns in the target table. Basically, the thinking here is that the target schema should not be changing under these configuration options.
- If we append_new_columns or sync_all_columns, the column addition / syncing actions have already been performed by the time we reach this point in the code.
- I'd be interested to see more examples of the issue you describe @b-luu to see if we're missing something here; let us know!

@jtcohen6 Please comment if the above seems off.
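For context, the line under discussion in incremental_upsert, and the variant @b-luu describes, look roughly like this (a paraphrased sketch, not the exact source):

{# current behavior: drive the upsert column list from the existing target table #}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}

{# variant described above: drive it from the increment's own (tmp) relation instead #}
{%- set dest_columns = adapter.get_columns_in_relation(tmp_relation) -%}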
Schema change approach: {{ on_schema_change }}
Columns added: {{ add_to_target_arr }}
Columns removed: {{ remove_from_target_arr }}
Data types changed: {{ new_target_types }}
Warning: remove_from_target_arr and new_target_types are only set if on_schema_change == 'sync_all_columns'.
This should raise an exception if on_schema_change == 'append_new_columns'.
@b-luu The assumption here is that when someone only wants to append new columns, we don't need to be concerned with existing column data type changes, or columns that have been removed from the upstream model vs. the target incremental model. Those scenarios are handled with the sync_all_columns configuration option.
@jtcohen6 Please comment if the above seems off.
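For illustration, the two behaviors discussed above are selected per model via the new config (a sketch; the unique_key value is hypothetical):

{{ config(
    materialized = 'incremental',
    unique_key = 'id',
    on_schema_change = 'append_new_columns'
) }}

Swap in 'sync_all_columns' to also pick up removed columns and data type changes, or use 'ignore' / 'fail' to opt out of schema evolution entirely.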
addresses #1132
Description
The goal is to identify whether the source and target relations have different schemas during an incremental run. The allowed behaviors are ['ignore', 'fail', 'append', 'sync'], specified via a new config option called on_schema_change on the incremental.sql materialization.
New macros added to /core/dbt/include/global_project/macros/materializations/incremental/on_schema_change.sql:
- incremental_validate_on_schema_change: defaults to 'ignore' if None or invalid values are specified
- sync_schemas: runs ALTER TABLE statements depending on the value of on_schema_change and whether the source / target column sets differ
- process_schema_changes: orchestrator macro that determines which others in this module should be called, based on configuration
- get_column_names: extracts col.column values from a list of columns
- diff_arrays: detects the difference between two arrays
- diff_columns: detects the difference between two arrays of Columns, and calls diff_arrays
- check_for_schema_changes: determines if a source and target schema are different due to the presence / absence of columns and / or data type differences

Checklist
- Updated CHANGELOG.md and added information about my change to the "dbt next" section.