Skip to content

Commit

Permalink
substrategy
Browse files Browse the repository at this point in the history
fixtures

Create Features-20241229-174752.yaml
  • Loading branch information
borjavb committed Dec 30, 2024
1 parent f00bc38 commit e26a05f
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 67 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241229-174752.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: add multiple substrategies for insert_overwrite and microbatch
time: 2024-12-29T17:47:52.647374Z
custom:
Author: borjavb
Issue: "1409"
49 changes: 24 additions & 25 deletions dbt/include/bigquery/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,27 @@
{% endmacro %}


{% macro dbt_bigquery_validate_insert_overwrite_fn(config, strategy) %}
{#-- Find and validate the function used for insert_overwrite #}
{%- set insert_overwrite_fn = config.get('insert_overwrite_fn', none) -%}
{%- set default_fn = 'merge' -%}
{% if insert_overwrite_fn is none and strategy in ['insert_overwrite','microbatch']%}
{{return (default_fn)}}
{% elif insert_overwrite_fn is not in ["delete+insert"] and strategy in ['insert_overwrite','microbatch']%}
{% macro dbt_bigquery_validate_incremental_substrategy(config, strategy, copy_partitions) %}
{#-- Find and validate the function used for insert_overwrite
Legacy behaviour was to pass the copy_partitions as part of the `partition_by` clause
So we need to bring back that optionality into this validation.
#}
{%- set incremental_substrategy = config.get('incremental_substrategy', 'copy_partitions' if copy_partitions else 'merge') -%}

{% if strategy in ['insert_overwrite', 'microbatch'] %}
{% if incremental_substrategy not in ['merge', 'delete+insert', 'copy_partitions'] %}
{% set wrong_fn -%}
The 'insert_overwrite_fn' option has to be either 'merge' (default) or 'delete+insert'.
The 'incremental_substrategy' option has to be either 'merge' (default), 'delete+insert' or 'copy_partitions'.
{%- endset %}
{% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
{% elif insert_overwrite_fn is not none and strategy not ['insert_overwrite','microbatch'] %}
{% endif %}
{% elif incremental_substrategy is not none%}
{% set wrong_strategy_msg -%}
The 'insert_overwrite_fn' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
The 'incremental_substrategy' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
{%- endset %}
{% do exceptions.raise_compiler_error(wrong_strategy_msg) %}
{% else %}
{% return(insert_overwrite_fn) %}
{% endif %}
{% endif %}
{{ return(incremental_substrategy) }}
{% endmacro %}

{% macro source_sql_with_partition(partition_by, source_sql) %}
Expand Down Expand Up @@ -65,19 +67,19 @@
{% endmacro %}

{% macro bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates, insert_overwrite_fn
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates
) %}
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set build_sql = bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{% elif strategy == 'microbatch' %}

{% set build_sql = bq_generate_microbatch_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{% else %} {# strategy == 'merge' #}
Expand All @@ -103,14 +105,16 @@

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}
{#-- Validate early that the fn strategy is set correctly for insert_overwrite--#}
{% set insert_overwrite_fn = dbt_bigquery_validate_insert_overwrite_fn(config, strategy) -%}

{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set partitions = config.get('partitions', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{#-- Validate early that the incremental substrategy is set correctly for insert_overwrite or microbatch--#}
{% set incremental_substrategy = dbt_bigquery_validate_incremental_substrategy(config, strategy, partition_by.copy_partitions) -%}


{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) %}

Expand All @@ -119,13 +123,8 @@

{{ run_hooks(pre_hooks) }}

{% if partition_by.copy_partitions is true and strategy not in ['insert_overwrite', 'microbatch'] %} {#-- We can't copy partitions with merge strategy --#}
{% set wrong_strategy_msg -%}
The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'.
{%- endset %}
{% do exceptions.raise_compiler_error(wrong_strategy_msg) %}

{% elif existing_relation is none %}
{% if existing_relation is none %}
{%- call statement('main', language=language) -%}
{{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }}
{%- endcall -%}
Expand Down Expand Up @@ -177,7 +176,7 @@
{% endif %}

{% set build_sql = bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates, insert_overwrite_fn
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy,
) %}

{%- call statement('main') -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro bq_generate_incremental_insert_overwrite_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}
{% if partition_by is none %}
{% set missing_partition_msg -%}
Expand All @@ -9,7 +9,7 @@
{% endif %}

{% set build_sql = bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{{ return(build_sql) }}
Expand Down Expand Up @@ -38,17 +38,17 @@
{% endmacro %}

{% macro bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}
{% if partitions is not none and partitions != [] %} {# static #}
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }}
{{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy) }}
{% else %} {# dynamic #}
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }}
{{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) }}
{% endif %}
{% endmacro %}

{% macro bq_static_insert_overwrite_sql(
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{% set predicate -%}
Expand All @@ -75,35 +75,35 @@
)
{%- endset -%}

{% if copy_partitions %}
{% if incremental_substrategy == 'copy_partitions' %}
{% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %}
{% else %}

{#-- In case we're putting the model SQL _directly_ into the MERGE statement,
we need to prepend the MERGE statement with the user-configured sql_header,
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
sql_header is included by the create_table_as macro.
#}
{% if insert_overwrite_fn == 'delete+insert' %}
-- 1. run insert_overwrite with delete+insert transaction strategy optimisation
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{% else %}
-- 1. run insert_overwrite with merge strategy optimisation
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{% endif %}

{%- if tmp_relation_exists -%}
-- 2. clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}
{#-- In case we're putting the model SQL _directly_ into the MERGE/insert+delete transaction,
we need to prepend the merge/transaction statement with the user-configured sql_header,
which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header)
in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the
sql_header is included by the create_table_as macro.
#}

{% if incremental_substrategy == 'delete+insert' %}
-- 1. run insert_overwrite with delete+insert transaction strategy optimisation
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{% else %}
-- 1. run insert_overwrite with merge strategy optimisation
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }};
{% endif %}

{%- if tmp_relation_exists -%}
-- 2. clean up the temp table
drop table if exists {{ tmp_relation }};
{%- endif -%}

{% endif %}
{% endif %}
{% endmacro %}

{% macro bq_dynamic_copy_partitions_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions
tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists
) %}
{%- if tmp_relation_exists is false -%}
{# We run temp table creation in a separated script to move to partitions copy if it does not already exist #}
Expand All @@ -123,9 +123,9 @@
drop table if exists {{ tmp_relation }}
{% endmacro %}

{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %}
{%- if copy_partitions is true %}
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) %}
{% if incremental_substrategy == 'copy_partitions' %}
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }}
{% else -%}
{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
Expand Down Expand Up @@ -161,7 +161,7 @@
from {{ tmp_relation }}
);

{% if insert_overwrite_fn == 'delete+insert' %}
{% if incremental_substrategy == 'delete+insert' %}
-- 3. run insert_overwrite with the delete+insert transaction strategy optimisation
{{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{% else %}
Expand All @@ -185,11 +185,11 @@
{{ sql_header if sql_header is not none and include_sql_header }}

begin
begin transaction;
begin transaction;

-- (as of Nov 2024)
-- DELETE operations are free if the partition is a DATE
-- * Not free if the partitions are granular (hourly, monthly)
-- DELETE operations are free if the partition is a DATE
-- * Not free if the partitions are granular (hourly, monthly)
-- or some other conditions like subqueries and so on.
delete from {{ target }} as DBT_INTERNAL_DEST
where true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
{% endmacro %}

{% macro bq_generate_microbatch_build_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}
{% set build_sql = bq_insert_overwrite_sql(
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn
tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy
) %}

{{ return(build_sql) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,105 @@
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
insert_overwrite_fn='delete+insert',
incremental_substrategy='delete+insert',
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime",
"granularity": "day"
},
partitions=partitions_to_replace,
on_schema_change="sync_all_columns"
)
}}
with data as (
{% if not is_incremental() %}
select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time
{% else %}
-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
select 40 as id, cast('2020-01-02' as datetime) as date_time
{% endif %}
)
select * from data
""".lstrip()


overwrite_static_day_merge_sub_strategy_sql = """
{% set partitions_to_replace = [
"'2020-01-01'",
"'2020-01-02'",
] %}
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
incremental_substrategy='merge',
cluster_by="id",
partition_by={
"field": "date_time",
"data_type": "datetime",
"granularity": "day"
},
partitions=partitions_to_replace,
on_schema_change="sync_all_columns"
)
}}
with data as (
{% if not is_incremental() %}
select 1 as id, cast('2020-01-01' as datetime) as date_time union all
select 2 as id, cast('2020-01-01' as datetime) as date_time union all
select 3 as id, cast('2020-01-01' as datetime) as date_time union all
select 4 as id, cast('2020-01-01' as datetime) as date_time
{% else %}
-- we want to overwrite the 4 records in the 2020-01-01 partition
-- with the 2 records below, but add two more in the 2020-01-02 partition
select 10 as id, cast('2020-01-01' as datetime) as date_time union all
select 20 as id, cast('2020-01-01' as datetime) as date_time union all
select 30 as id, cast('2020-01-02' as datetime) as date_time union all
select 40 as id, cast('2020-01-02' as datetime) as date_time
{% endif %}
)
select * from data
""".lstrip()


overwrite_static_day_copy_partitions_sub_strategy_sql = """
{% set partitions_to_replace = [
"'2020-01-01'",
"'2020-01-02'",
] %}
{{
config(
materialized="incremental",
incremental_strategy="insert_overwrite",
incremental_substrategy='copy_partitions',
cluster_by="id",
partition_by={
"field": "date_time",
Expand Down
Loading

0 comments on commit e26a05f

Please sign in to comment.