Skip to content

Commit

Permalink
Merge pull request #42 from dbt-labs/feature/revamp-incremental-by-pe…
Browse files Browse the repository at this point in the history
…riod-add-bq

Revamp incremental by period and make it work for BQ
  • Loading branch information
dbeatty10 authored Apr 25, 2024
2 parents d6fe2b9 + 5e62415 commit c845674
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 26 deletions.
1 change: 1 addition & 0 deletions insert_by_period/integration_tests/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ dbt_modules/
logs/
.env/
profiles.yml
package-lock.yml
2 changes: 1 addition & 1 deletion insert_by_period/integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

name: 'dbt_utils_integration_tests'
name: 'insert_by_period_integration_tests'
version: '1.0'

profile: 'integration_tests'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{
config(
materialized = 'view',
enabled=(target.type == 'snowflake')
enabled=(project_name == 'insert_by_period_integration_tests'),
)
}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
config(
materialized = 'insert_by_period',
period = 'month',
timestamp_field = 'created_at',
timestamp_field = 'cast(created_at as timestamp)',
start_date = '2018-01-01',
stop_date = '2018-06-01',
enabled=(target.type == 'snowflake')
enabled=(project_name == 'insert_by_period_integration_tests'),
)
}}

Expand Down
26 changes: 24 additions & 2 deletions insert_by_period/macros/get_period_boundaries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
{% call statement('period_boundaries', fetch_result=True) -%}
with data as (
select
coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
coalesce(max({{timestamp_field}}), '{{start_date}}')::timestamp as start_timestamp,
coalesce(
{{ dateadd('millisecond',
-1,
"nullif('" ~ stop_date ~ "','')::timestamp") }},
{{ dbt.current_timestamp() }}
) as stop_timestamp
from "{{target_schema}}"."{{target_table}}"
from {{adapter.quote(target_schema)}}.{{adapter.quote(target_table)}}
)

select
Expand All @@ -27,3 +27,25 @@
{%- endcall %}

{%- endmacro %}


{#
    BigQuery implementation of get_period_boundaries.

    Runs a statement named 'period_boundaries' (fetch_result=True) against the
    quoted target_schema.target_table relation. The statement yields one row:

      start_timestamp  greatest value of timestamp_field already in the target,
                       or start_date cast to timestamp when that max is null.
      stop_timestamp   stop_date minus one millisecond, or the current
                       timestamp when stop_date is the empty string.
      num_periods      datediff(start_timestamp, stop_timestamp, period) + 1.

    Args:
      target_schema    schema/dataset name of the target; quoted via adapter.quote.
      target_table     table name of the target; quoted via adapter.quote.
      timestamp_field  SQL expression interpolated unquoted, so it may be a bare
                       column or an expression such as a cast.
      start_date       literal interpolated into a cast to timestamp.
      stop_date        literal interpolated into a cast to timestamp; the empty
                       string is turned into null by nullif.
      period           date part passed through to datediff.

    NOTE(review): presumably the caller reads the row back with
    load_result('period_boundaries') -- confirm against the materialization.
#}
{% macro bigquery__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

  {% call statement('period_boundaries', fetch_result=True) -%}
    with data as (
      select
        coalesce(
          max({{ timestamp_field }}),
          cast('{{ start_date }}' as timestamp)
        ) as start_timestamp,
        {#- The operand is a TIMESTAMP, so use the TIMESTAMP function family
            rather than datetime_add, which is documented for DATETIME input.
            A null operand stays null and falls through to the current timestamp. -#}
        coalesce(
          timestamp_sub(cast(nullif('{{ stop_date }}', '') as timestamp), interval 1 millisecond),
          {{ dbt.current_timestamp() }}
        ) as stop_timestamp
      from {{ adapter.quote(target_schema) }}.{{ adapter.quote(target_table) }}
    )

    select
      start_timestamp,
      stop_timestamp,
      {{ datediff('start_timestamp',
                  'stop_timestamp',
                  period) }} + 1 as num_periods
    from data
  {%- endcall %}

{%- endmacro %}
25 changes: 22 additions & 3 deletions insert_by_period/macros/get_period_sql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
{% macro default__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
"{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
"{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp)
({{timestamp_field}} > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
{{timestamp_field}} <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
{{timestamp_field}} < '{{stop_timestamp}}'::timestamp)
{%- endset -%}

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}
Expand All @@ -19,3 +19,22 @@
) target_cols

{%- endmacro %}


{#
    BigQuery implementation of get_period_sql.

    Builds a period filter on timestamp_field, substitutes it for every
    occurrence of the __PERIOD_FILTER__ placeholder in sql, and renders a
    select of target_cols_csv over the filtered query.

    The filter keeps rows where timestamp_field is:
      - strictly after   start_timestamp + offset periods,
      - at or before     start_timestamp + offset periods + 1 period,
      - strictly before  stop_timestamp.

    The interval arithmetic is done on a DATETIME cast of start_timestamp and
    the result is cast back to TIMESTAMP before comparison.
#}
{% macro bigquery__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{#- DATETIME expression for the lower edge of the current period; reused by both bounds. -#}
{%- set period_start_dt = "cast(timestamp('" ~ start_timestamp ~ "') as datetime) + interval " ~ offset ~ " " ~ period -%}

{%- set period_filter -%}
({{timestamp_field}} > cast({{period_start_dt}} as timestamp) and
{{timestamp_field}} <= cast({{period_start_dt}} + interval 1 {{period}} as timestamp) and
{{timestamp_field}} < cast('{{stop_timestamp}}' as timestamp))
{%- endset -%}

{%- set period_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

select
{{target_cols_csv}}
from (
{{period_sql}}
) target_cols

{%- endmacro %}
34 changes: 17 additions & 17 deletions insert_by_period/macros/insert_by_period_materialization.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,10 @@
{%- set old_relation = none -%}
{%- endif %}

{{run_hooks(pre_hooks, inside_transaction=False)}}
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `begin` happens here, so `commit` after it to finish the transaction
{{run_hooks(pre_hooks, inside_transaction=True)}}
{% call statement() -%}
begin; -- make extra sure we've closed out the transaction
commit;
{%- endcall %}
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% if force_create or old_relation is none -%}
Expand Down Expand Up @@ -97,7 +93,7 @@
(
select
{{target_cols_csv}}
from {{tmp_relation.include(schema=False)}}
from {{tmp_relation.include(schema=True)}}
);
{%- endcall %}
{% set result = load_result('main-' ~ i) %}
Expand All @@ -115,17 +111,21 @@

{%- endfor %}

{% call statement() -%}
begin;
{%- endcall %}
-- from the table mat
{% do create_indexes(target_relation) %}

{{run_hooks(post_hooks, inside_transaction=True)}}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% call statement() -%}
commit;
{%- endcall %}
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{run_hooks(post_hooks, inside_transaction=False)}}
{{ run_hooks(post_hooks, inside_transaction=False) }}
-- end from the table mat

{%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%}

Expand All @@ -136,4 +136,4 @@
-- Return the relations created in this materialization
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
{%- endmaterialization %}

0 comments on commit c845674

Please sign in to comment.