
feat(incremental): optimize 'insert_overwrite' strategy (#1409) #1410

Open · wants to merge 1 commit into main
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20241121-191041.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: 'refactor(incremental): optimize ''insert_overwrite'' strategy'
time: 2024-11-21T19:10:41.341213+01:00
custom:
Author: AxelThevenot
Issue: "1409"
@@ -1,3 +1,41 @@
{% macro bigquery__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
  {#-- The only time include_sql_header is True: --#}
  {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#}
  {#-- We should consider including the sql header at the materialization level instead --#}

  {%- set predicates = [] if predicates is none else [] + predicates -%}
  {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
  {%- set sql_header = config.get('sql_header', none) -%}

  {{ sql_header if sql_header is not none and include_sql_header }}

  begin
    begin transaction;

We had a problem with transactions, where other jobs can conflict with them.

For example, if this transaction is running and another (ordinary, non-transactional) statement mutates the same table, it is the transaction that fails:

https://cloud.google.com/bigquery/docs/transactions#transaction_concurrency

This differs from non-transactional queries, which can run concurrently without conflict.

At my company it's relatively common to delete rows for GDPR compliance or to update late-arriving columns in post-hooks.

I'm not saying the reduction in slot time isn't worth the cost of conflicting jobs; I just want to flag it as a past learning! And if there were a non-transactional version of this logic, it would sidestep the transaction concurrency issue.
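A non-transactional version could collapse the delete and insert into a single atomic MERGE; a minimal sketch, close in spirit to what this macro generated before this PR (the on false join condition means every destination row in the predicate scope becomes a delete and every source row becomes an insert, in one atomic statement):

    merge into {{ target }} as DBT_INTERNAL_DEST
    using ({{ source }}) as DBT_INTERNAL_SOURCE
    on false

    when not matched by source
      {% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
      then delete

    when not matched then insert
      ({{ dest_cols_csv }})
    values
      ({{ dest_cols_csv }})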

We also hit a problem when we tried a separate DELETE + INSERT without a transaction: queries that ran in between saw no data (especially while the DELETE + INSERT was catching up to the execution date in Airflow).
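To make that race concrete (table names here are hypothetical; any concurrent reader plays the second role):

    -- job A: non-transactional overwrite of one partition
    delete from `proj`.`dataset`.`orders` where order_date = '2024-11-21';

    -- job B: a query landing here sees an empty partition
    select count(*) from `proj`.`dataset`.`orders` where order_date = '2024-11-21';  -- returns 0

    -- job A: rows only reappear once the insert completes
    insert into `proj`.`dataset`.`orders`
    select * from `proj`.`dataset`.`orders__dbt_tmp`;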


    delete from {{ target }} as DBT_INTERNAL_DEST
    where true
    {%- if predicates %}
      {% for predicate in predicates %}
      and {{ predicate }}
      {% endfor %}
    {%- endif -%};

    insert into {{ target }} ({{ dest_cols_csv }})
    (
      select {{ dest_cols_csv }}
      from {{ source }}
    );

    commit transaction;

  exception when error then
    -- Roll back on any error so deleted partitions are not lost if the insert fails.
    -- (Rollback must come before raise: statements after raise never execute.)
    rollback transaction;
    raise using message = @@error.message;
  end

{% endmacro %}
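For illustration, with a hypothetical date-partitioned table and two static partition predicates, the macro above renders roughly the following (project, dataset, and column names are invented):

    begin
      begin transaction;

      delete from `my-project`.`analytics`.`orders` as DBT_INTERNAL_DEST
      where true
        and DBT_INTERNAL_DEST.order_date in (date('2024-11-20'), date('2024-11-21'));

      insert into `my-project`.`analytics`.`orders` (`id`, `order_date`, `amount`)
      (
        select `id`, `order_date`, `amount`
        from `my-project`.`analytics`.`orders__dbt_tmp`
      );

      commit transaction;

    exception when error then
      -- roll back so the deleted partitions are not lost if the insert fails
      rollback transaction;
      raise using message = @@error.message;
    end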

{% macro bq_generate_incremental_insert_overwrite_build_sql(
    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}