diff --git a/.changes/unreleased/Features-20241229-174752.yaml b/.changes/unreleased/Features-20241229-174752.yaml new file mode 100644 index 000000000..6d2a01ae8 --- /dev/null +++ b/.changes/unreleased/Features-20241229-174752.yaml @@ -0,0 +1,6 @@ +kind: Features +body: add multiple substrategies for insert_overwrite and microbatch +time: 2024-12-29T17:47:52.647374Z +custom: + Author: borjavb + Issue: "1409" diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index a399b44d0..fb3555ebe 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -18,25 +18,27 @@ {% endmacro %} -{% macro dbt_bigquery_validate_insert_overwrite_fn(config, strategy) %} - {#-- Find and validate the function used for insert_overwrite #} - {%- set insert_overwrite_fn = config.get('insert_overwrite_fn', none) -%} - {%- set default_fn = 'merge' -%} - {% if insert_overwrite_fn is none and strategy in ['insert_overwrite','microbatch']%} - {{return (default_fn)}} - {% elif insert_overwrite_fn is not in ["delete+insert"] and strategy in ['insert_overwrite','microbatch']%} +{% macro dbt_bigquery_validate_incremental_substrategy(config, strategy, copy_partitions) %} + {#-- Find and validate the function used for insert_overwrite + Legacy behaviour was to pass the copy_partitions as part of the `partition_by` clause + So we need to bring back that optionality into this validation. + #} + {%- set incremental_substrategy = config.get('incremental_substrategy', 'copy_partitions' if copy_partitions else 'merge') -%} + + {% if strategy in ['insert_overwrite', 'microbatch'] %} + {% if incremental_substrategy not in ['merge', 'delete+insert', 'copy_partitions'] %} {% set wrong_fn -%} - The 'insert_overwrite_fn' option has to be either 'merge' (default) or 'delete+insert'. + The 'incremental_substrategy' option has to be either 'merge' (default), 'delete+insert' or 'copy_partitions'. {%- endset %} {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} - {% elif insert_overwrite_fn is not none and strategy not ['insert_overwrite','microbatch'] %} + {% endif %} + {% elif incremental_substrategy is not none%} {% set wrong_strategy_msg -%} - The 'insert_overwrite_fn' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'. + The 'incremental_substrategy' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'. {%- endset %} {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} - {% else %} - {% return(insert_overwrite_fn) %} - {% endif %} + {% endif %} + {{ return(incremental_substrategy) %}} {% endmacro %} {% macro source_sql_with_partition(partition_by, source_sql) %} @@ -65,19 +67,19 @@ {% endmacro %} {% macro bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates, insert_overwrite_fn + strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates, ) %} {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#} {% if strategy == 'insert_overwrite' %} {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% elif strategy == 'microbatch' %} {% set build_sql = bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% else %} {# strategy == 'merge' #} @@ -103,14 +105,16 @@ {#-- Validate early so we don't run SQL if the strategy is invalid --#} {% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%} - {#-- Validate early that the fn strategy is set correctly for insert_overwrite--#} - {% set insert_overwrite_fn = dbt_bigquery_validate_insert_overwrite_fn(config, strategy) -%} {%- set raw_partition_by = config.get('partition_by', none) -%} {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%} {%- set partitions = config.get('partitions', none) -%} {%- set cluster_by = config.get('cluster_by', none) -%} + {#-- Validate early that the incremental substrategy is set correctly for insert_overwrite or microbatch--#} + {% set incremental_substrategy = dbt_bigquery_validate_incremental_substrategy(config, strategy, partition_by.copy_partitions) -%} + + {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} {% set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) %} @@ -119,13 +123,8 @@ {{ run_hooks(pre_hooks) }} - {% if partition_by.copy_partitions is true and strategy not in ['insert_overwrite', 'microbatch'] %} {#-- We can't copy partitions with merge strategy --#} - {% set wrong_strategy_msg -%} - The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'. - {%- endset %} - {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} - {% elif existing_relation is none %} + {% if existing_relation is none %} {%- call statement('main', language=language) -%} {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }} {%- endcall -%} @@ -177,7 +176,7 @@ {% endif %} {% set build_sql = bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates, insert_overwrite_fn + strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates, ) %} {%- call statement('main') -%} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index 01bb8307f..f955c265b 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -1,5 +1,5 @@ {% macro bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% if partition_by is none %} {% set missing_partition_msg -%} @@ -9,7 +9,7 @@ {% endif %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {{ return(build_sql) }} @@ -38,17 +38,17 @@ {% endmacro %} {% macro bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% if partitions is not none and partitions != [] %} {# static #} - {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }} + {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy) }} {% else %} {# dynamic #} - {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }} + {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) }} {% endif %} {% endmacro %} {% macro bq_static_insert_overwrite_sql( - tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% set predicate -%} @@ -75,35 +75,35 @@ ) {%- endset -%} - {% if copy_partitions %} + {% if incremental_substrategy == 'copy_partitions' %} {% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %} {% else %} - {#-- In case we're putting the model SQL _directly_ into the MERGE statement, - we need to prepend the MERGE statement with the user-configured sql_header, - which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header) - in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the - sql_header is included by the create_table_as macro. - #} - - {% if insert_overwrite_fn == 'delete+insert' %} - -- 1. run insert_overwrite with delete+insert transaction strategy optimisation - {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; - {% else %} - -- 1. run insert_overwrite with merge strategy optimisation - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; - {% endif %} - - {%- if tmp_relation_exists -%} - -- 2. clean up the temp table - drop table if exists {{ tmp_relation }}; - {%- endif -%} + {#-- In case we're putting the model SQL _directly_ into the MERGE/insert+delete transaction, + we need to prepend the merge/transaction statement with the user-configured sql_header, + which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header) + in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the + sql_header is included by the create_table_as macro. + #} + + {% if incremental_substrategy == 'delete+insert' %} + -- 1. run insert_overwrite with delete+insert transaction strategy optimisation + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% else %} + -- 1. run insert_overwrite with merge strategy optimisation + {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% endif %} + + {%- if tmp_relation_exists -%} + -- 2. clean up the temp table + drop table if exists {{ tmp_relation }}; + {%- endif -%} - {% endif %} + {% endif %} {% endmacro %} {% macro bq_dynamic_copy_partitions_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists ) %} {%- if tmp_relation_exists is false -%} {# We run temp table creation in a separated script to move to partitions copy if it does not already exist #} @@ -123,9 +123,9 @@ drop table if exists {{ tmp_relation }} {% endmacro %} -{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %} - {%- if copy_partitions is true %} - {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }} +{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) %} + {% if incremental_substrategy == 'copy_partitions' %} + {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }} {% else -%} {% set predicate -%} {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement) @@ -161,7 +161,7 @@ from {{ tmp_relation }} ); - {% if insert_overwrite_fn == 'delete+insert' %} + {% if incremental_substrategy == 'delete+insert' %} -- 3. run insert_overwrite with the delete+insert transaction strategy optimisation {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }}; {% else %} @@ -185,11 +185,11 @@ {{ sql_header if sql_header is not none and include_sql_header }} begin - begin transaction; + begin transaction; -- (as of Nov 2024) - -- DELETE operations are free if the partition is a DATE - -- * Not free if the partitions are granular (hourly, monthly) + -- DELETE operations are free if the partition is a DATE + -- * Not free if the partitions are granular (hourly, monthly) -- or some other conditions like subqueries and so on. delete from {{ target }} as DBT_INTERNAL_DEST where true diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql index f2c585c25..21c52c4c6 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql @@ -18,10 +18,10 @@ {% endmacro %} {% macro bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {{ return(build_sql) }} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index c710685b3..d6f0f6a6f 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -676,7 +676,105 @@ config( materialized="incremental", incremental_strategy="insert_overwrite", - insert_overwrite_fn='delete+insert', + incremental_substrategy='delete+insert', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + + +overwrite_static_day_merge_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='merge', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + + +overwrite_static_day_copy_partitions_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='copy_partitions', cluster_by="id", partition_by={ "field": "date_time", diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index fbef2a794..8c0528807 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -28,6 +28,8 @@ overwrite_day_with_time_partition_datetime_sql, overwrite_static_day_sql, overwrite_static_day_delete_and_insert_sub_strategy_sql, + overwrite_static_day_merge_sub_strategy_sql, + overwrite_static_day_copy_partitions_sub_strategy_sql, ) @@ -50,8 +52,10 @@ def models(self): "incremental_overwrite_time.sql": overwrite_time_sql, "incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql, "incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql, - "incremental_overwrite_static_day.sql": overwrite_static_day_sql, - "incremental_overwrite_static_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql + "incremental_overwrite_static_substrategy_day.sql": overwrite_static_day_sql, + "incremental_overwrite_static_substrategy_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_merge.sql": overwrite_static_day_merge_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_copy_partitions.sql": overwrite_static_day_copy_partitions_sub_strategy_sql, } @pytest.fixture(scope="class") @@ -86,8 +90,13 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_day_with_time_partition_datetime", "incremental_overwrite_day_with_time_partition_expected", ), - ("incremental_overwrite_static_day", "incremental_overwrite_day_expected"), - ("incremental_overwrite_static_day_with_deleteinsert", "incremental_overwrite_day_expected"), + ("incremental_overwrite_static_substrategy_day", "incremental_overwrite_day_expected"), + ( + "incremental_overwrite_static_substrategy_day_with_deleteinsert", + "incremental_overwrite_static_substrategy_day_with_merge", + "incremental_overwrite_static_substrategy_day_with_copy_partitions", + "incremental_overwrite_day_expected", + ), ] db_with_schema = f"{project.database}.{project.test_schema}" for incremental_strategy in incremental_strategies: