From 93156cab625110acdfec796ceca78ab7e9ecc350 Mon Sep 17 00:00:00 2001 From: Antonio Papa Date: Sat, 9 Mar 2024 08:40:00 -0500 Subject: [PATCH 01/12] Allows snapshots to have multiple unique keys --- .../materializations/snapshots/helpers.sql | 84 ++++++++++++++++--- .../materializations/snapshots/snapshot.sql | 18 ++-- .../materializations/snapshots/strategies.sql | 30 +++++-- 3 files changed, 105 insertions(+), 27 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 7fd4bfd51..c9c37dee2 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -45,7 +45,14 @@ snapshotted_data as ( select *, + {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% for key in strategy.unique_key %} + {{ key }} as dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} , {%- endif %} + {% endfor %} + {% else %} {{ strategy.unique_key }} as dbt_unique_key + {% endif %} from {{ target_relation }} where dbt_valid_to is null @@ -56,7 +63,13 @@ select *, - {{ strategy.unique_key }} as dbt_unique_key, + {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% for key in strategy.unique_key %} + {{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor %} + {% else %} + {{ strategy.unique_key }} as dbt_unique_key, + {% endif %} {{ strategy.updated_at }} as dbt_updated_at, {{ strategy.updated_at }} as dbt_valid_from, nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, @@ -69,7 +82,13 @@ select *, - {{ strategy.unique_key }} as dbt_unique_key, + {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% for key in strategy.unique_key %} + {{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor %} + {% else %} + {{ strategy.unique_key }} as dbt_unique_key, + {% endif %} {{ strategy.updated_at }} as dbt_updated_at, {{ strategy.updated_at }} as dbt_valid_from, {{ strategy.updated_at }} as dbt_valid_to @@ -83,7 +102,14 @@ select *, - {{ strategy.unique_key }} as dbt_unique_key + {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% for key in strategy.unique_key %} + {{ key }} as dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} , {%- endif %} + {% endfor %} + {% else %} + {{ strategy.unique_key }} as dbt_unique_key + {% endif %} from snapshot_query ), {% endif %} @@ -95,13 +121,28 @@ source_data.* from insertions_source_data as source_data - left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where snapshotted_data.dbt_unique_key is null - or ( - snapshotted_data.dbt_unique_key is not null - and ( - {{ strategy.row_changed }} - ) + left outer join snapshotted_data on + {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% for key in strategy.unique_key %} + snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} and {%- endif %} + {% endfor %} + where snapshotted_data.dbt_unique_key_1 is null + or ( + snapshotted_data.dbt_unique_key_1 is not null + and ( + {{ strategy.row_changed }} + ) + {% else %} + snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where snapshotted_data.dbt_unique_key is null + or ( + snapshotted_data.dbt_unique_key is not null + and ( + {{ strategy.row_changed }} + ) + {% endif %} + ) ), @@ -114,7 +155,15 @@ snapshotted_data.dbt_scd_id from updates_source_data as source_data - join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + join snapshotted_data on + {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% for key in strategy.unique_key %} + snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} and {%- endif %} + {% endfor %} + {% else %} + snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + {% endif %} where ( {{ strategy.row_changed }} ) @@ -134,8 +183,17 @@ snapshotted_data.dbt_scd_id from snapshotted_data - left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where source_data.dbt_unique_key is null + left join deletes_source_data as source_data on + {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% for key in strategy.unique_key %} + snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} and {%- endif %} + {% endfor %} + where source_data.dbt_unique_key_1 is null + {% else %} + snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where source_data.dbt_unique_key is null + {% endif %} ) {%- endif %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index b0fe9222a..776502d8b 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -41,20 +41,22 @@ {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %} + {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% for key in strategy.unique_key %} + {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} + {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} + {% endfor %} + {% endif %} + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% do create_columns(target_relation, missing_columns) %} {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% set quoted_source_columns = [] %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index d22cc3363..4b102cda6 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -50,7 +50,7 @@ Core strategy definitions #} {% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} - {% set primary_key = config['unique_key'] %} + {% set unique_key = config['unique_key'] %} {% set updated_at = config['updated_at'] %} {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} @@ -67,10 +67,19 @@ ({{ snapshotted_rel }}.dbt_valid_from < {{ current_rel }}.{{ updated_at }}) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% set scd_args = [] %} + {% for key in unique_key %} + {{ scd_args.append(key) }} + {% endfor %} + {{ scd_args.append(updated_at) }} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} + {% else %} + {% set scd_id_expr = snapshot_hash_arguments([unique_key, updated_at]) %} + {% endif %} {% do return({ - "unique_key": primary_key, + "unique_key": unique_key, "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, @@ -135,7 +144,7 @@ {% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set check_cols_config = config['check_cols'] %} - {% set primary_key = config['unique_key'] %} + {% set unique_key = config['unique_key'] %} {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} {% set updated_at = config.get('updated_at', snapshot_get_time()) %} @@ -162,10 +171,19 @@ ) {%- endset %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% set scd_args = [] %} + {% for key in unique_key %} + {{ scd_args.append(key) }} + {% endfor %} + {{ scd_args.append(updated_at) }} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} + {% else %} + {% set scd_id_expr = snapshot_hash_arguments([unique_key, updated_at]) %} + {% endif %} {% do return({ - "unique_key": primary_key, + "unique_key": unique_key, "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, From f36da899412f67dfbc6db1a029b3c6facb6f3ab3 Mon Sep 17 00:00:00 2001 From: Antonio Papa Date: Mon, 22 Apr 2024 08:13:17 -0400 Subject: [PATCH 02/12] Adds changie --- .changes/unreleased/Features-20240422-081302.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20240422-081302.yaml diff --git a/.changes/unreleased/Features-20240422-081302.yaml b/.changes/unreleased/Features-20240422-081302.yaml new file mode 100644 index 000000000..c58e471eb --- /dev/null +++ b/.changes/unreleased/Features-20240422-081302.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Allows unique_key for snapshots to take a list +time: 2024-04-22T08:13:02.937534-04:00 +custom: + Author: agpapa + Issue: "181" From 3155da109b2ddadc76c9611b40b49aa4e407a4c8 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Mon, 30 Sep 2024 14:30:51 -0400 Subject: [PATCH 03/12] Use primary_key in strategies --- .../macros/materializations/snapshots/helpers.sql | 14 +++++++------- .../materializations/snapshots/strategies.sql | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 56a993263..389a3618c 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -50,7 +50,7 @@ snapshotted_data as ( select *, - {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }} {%- if not loop.last %} , {%- endif %} @@ -68,7 +68,7 @@ select *, - {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }}, {% endfor %} @@ -87,7 +87,7 @@ select *, - {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }}, {% endfor %} @@ -107,7 +107,7 @@ select *, - {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }} {%- if not loop.last %} , {%- endif %} @@ -127,7 +127,7 @@ from insertions_source_data as source_data left outer join snapshotted_data on - {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} {% for key in strategy.unique_key %} snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} {%- if not loop.last %} and {%- endif %} @@ -161,7 +161,7 @@ from updates_source_data as source_data join snapshotted_data on - {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} {% for key in strategy.unique_key %} snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} {%- if not loop.last %} and {%- endif %} @@ -189,7 +189,7 @@ from snapshotted_data left join deletes_source_data as source_data on - {% if strategy.unique_key is sequence and strategy.unique_key is not mapping and strategy.unique_key is not string %} + {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} {% for key in strategy.unique_key %} snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} {%- if not loop.last %} and {%- endif %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index 47b35ec39..db7017e21 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -70,19 +70,19 @@ ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} - {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% if primary_key is iterable and primary_key is not string and primary_key is not mapping %} {% set scd_args = [] %} - {% for key in unique_key %} + {% for key in primary_key %} {{ scd_args.append(key) }} {% endfor %} {{ scd_args.append(updated_at) }} {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% else %} - {% set scd_id_expr = snapshot_hash_arguments([unique_key, updated_at]) %} + {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} {% endif %} {% do return({ - "unique_key": unique_key, + "unique_key": primary_key, "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, From 19abd4149203f4e6d2df9e0fb96ea3a3b3a522b3 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Tue, 1 Oct 2024 10:58:51 -0400 Subject: [PATCH 04/12] use primary_key in check strategy --- .../macros/materializations/snapshots/helpers.sql | 2 ++ .../macros/materializations/snapshots/strategies.sql | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 389a3618c..917054fef 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -51,11 +51,13 @@ select *, {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} + {{ log("strategy.unique_key is list...." ~ strategy.unique_key, info=true) }} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }} {%- if not loop.last %} , {%- endif %} {% endfor %} {% else %} + {{ log("strategy.unique_key is NOT list...." ~ strategy.unique_key, info=true) }} {{ strategy.unique_key }} as dbt_unique_key {% endif %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index db7017e21..35d341b9a 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -175,19 +175,19 @@ ) {%- endset %} - {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% if primary_key is sequence and primary_key is not mapping and primary_key is not string %} {% set scd_args = [] %} - {% for key in unique_key %} + {% for key in primary_key %} {{ scd_args.append(key) }} {% endfor %} {{ scd_args.append(updated_at) }} {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% else %} - {% set scd_id_expr = snapshot_hash_arguments([unique_key, updated_at]) %} + {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} {% endif %} {% do return({ - "unique_key": unique_key, + "unique_key": primary_key, "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, From c2c1f761e8481434ca996cd1aefd25b266ebb329 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Tue, 1 Oct 2024 15:03:05 -0400 Subject: [PATCH 05/12] Update to use is_list filter --- .../materializations/snapshots/helpers.sql | 16 +++++++--------- .../materializations/snapshots/snapshot.sql | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 917054fef..169bdb642 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -50,14 +50,12 @@ snapshotted_data as ( select *, - {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} - {{ log("strategy.unique_key is list...." ~ strategy.unique_key, info=true) }} + {% if strategy.unique_key | is_list %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }} {%- if not loop.last %} , {%- endif %} {% endfor %} {% else %} - {{ log("strategy.unique_key is NOT list...." ~ strategy.unique_key, info=true) }} {{ strategy.unique_key }} as dbt_unique_key {% endif %} @@ -70,7 +68,7 @@ select *, - {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} + {% if strategy.unique_key | is_list %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }}, {% endfor %} @@ -89,7 +87,7 @@ select *, - {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} + {% if strategy.unique_key | is_list %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }}, {% endfor %} @@ -109,7 +107,7 @@ select *, - {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} + {% if strategy.unique_key | is_list %} {% for key in strategy.unique_key %} {{ key }} as dbt_unique_key_{{ loop.index }} {%- if not loop.last %} , {%- endif %} @@ -129,7 +127,7 @@ from insertions_source_data as source_data left outer join snapshotted_data on - {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} + {% if strategy.unique_key | is_list %} {% for key in strategy.unique_key %} snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} {%- if not loop.last %} and {%- endif %} @@ -163,7 +161,7 @@ from updates_source_data as source_data join snapshotted_data on - {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} + {% if strategy.unique_key | is_list %} {% for key in strategy.unique_key %} snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} {%- if not loop.last %} and {%- endif %} @@ -191,7 +189,7 @@ from snapshotted_data left join deletes_source_data as source_data on - {% if strategy.unique_key is iterable and strategy.unique_key is not string and strategy.unique_key is not mapping %} + {% if strategy.unique_key | is_list %} {% for key in strategy.unique_key %} snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} {%- if not loop.last %} and {%- endif %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index b383ffa7e..0c9590b6b 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -47,7 +47,7 @@ to_relation=target_relation) %} {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} - {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% if unique_key | is_list %} {% for key in strategy.unique_key %} {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} From 095c9f96e0c9c6e462761b85a91d28ba6dde7034 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 17 Oct 2024 13:11:32 -0400 Subject: [PATCH 06/12] Specify dbt-common >= 1.11.0, create unique_key_fields macro --- .../materializations/snapshots/helpers.sql | 55 ++++++------------- .../materializations/snapshots/strategies.sql | 4 +- pyproject.toml | 2 +- 3 files changed, 20 insertions(+), 41 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 6710ed25e..a8a2ff727 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -49,16 +49,7 @@ snapshotted_data as ( - select *, - {% if strategy.unique_key | is_list %} - {% for key in strategy.unique_key %} - {{ key }} as dbt_unique_key_{{ loop.index }} - {%- if not loop.last %} , {%- endif %} - {% endfor %} - {% else %} - {{ strategy.unique_key }} as dbt_unique_key - {% endif %} - + select *, {% unique_key_fields(strategy.unique_key) %} from {{ target_relation }} where {% if config.get('dbt_valid_to_current') %} @@ -72,15 +63,7 @@ insertions_source_data as ( - select - *, - {% if strategy.unique_key | is_list %} - {% for key in strategy.unique_key %} - {{ key }} as dbt_unique_key_{{ loop.index }}, - {% endfor %} - {% else %} - {{ strategy.unique_key }} as dbt_unique_key, - {% endif %} + select *, {% unique_key_fields(strategy.unique_key) %}, {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ get_dbt_valid_to_current(strategy, columns) }}, @@ -91,15 +74,7 @@ updates_source_data as ( - select - *, - {% if strategy.unique_key | is_list %} - {% for key in strategy.unique_key %} - {{ key }} as dbt_unique_key_{{ loop.index }}, - {% endfor %} - {% else %} - {{ strategy.unique_key }} as dbt_unique_key, - {% endif %} + select *, {% unique_key_fields(strategy.unique_key %}, {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} @@ -111,16 +86,7 @@ deletes_source_data as ( - select - *, - {% if strategy.unique_key | is_list %} - {% for key in strategy.unique_key %} - {{ key }} as dbt_unique_key_{{ loop.index }} - {%- if not loop.last %} , {%- endif %} - {% endfor %} - {% else %} - {{ strategy.unique_key }} as dbt_unique_key - {% endif %} + select *, {% unique_key_fields(strategy.unique_key) %} from snapshot_query ), {% endif %} @@ -275,8 +241,21 @@ {% endif %} {% endmacro %} + {% macro get_dbt_valid_to_current(strategy, columns) %} {% set dbt_valid_to_current = config.get('dbt_valid_to_current') or "null" %} coalesce(nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}), {{dbt_valid_to_current}}) as {{ columns.dbt_valid_to }} {% endmacro %} + + +{% macro unique_key_fields(unique_key) %} + {% if unique_key | is_list %} + {% for key in unique_key %} + {{ key }} as dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} , {%- endif %} + {% endfor %} + {% else %} + {{ unique_key }} as dbt_unique_key + {% endif %} +{% endmacro %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index 35d341b9a..dba5886f7 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -70,7 +70,7 @@ ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} - {% if primary_key is iterable and primary_key is not string and primary_key is not mapping %} + {% if primary_key | is_list %} {% set scd_args = [] %} {% for key in primary_key %} {{ scd_args.append(key) }} @@ -175,7 +175,7 @@ ) {%- endset %} - {% if primary_key is sequence and primary_key is not mapping and primary_key is not string %} + {% if primary_key | is_list %} {% set scd_args = [] %} {% for key in primary_key %} {{ scd_args.append(key) }} diff --git a/pyproject.toml b/pyproject.toml index 76ca3deeb..70a19dd8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = [ - "dbt-common>=1.10,<2.0", + "dbt-common>=1.11,<2.0", "pytz>=2015.7", # installed via dbt-common but used directly "agate>=1.0,<2.0", From 10867bbee3ecf45742291a734a05c66c416b946c Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 17 Oct 2024 13:25:21 -0400 Subject: [PATCH 07/12] Fix jinja syntax --- .../macros/materializations/snapshots/helpers.sql | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index a8a2ff727..d0f299857 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -49,7 +49,7 @@ snapshotted_data as ( - select *, {% unique_key_fields(strategy.unique_key) %} + select *, {{ unique_key_fields(strategy.unique_key) }} from {{ target_relation }} where {% if config.get('dbt_valid_to_current') %} @@ -63,7 +63,7 @@ insertions_source_data as ( - select *, {% unique_key_fields(strategy.unique_key) %}, + select *, {{ unique_key_fields(strategy.unique_key) }}, {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ get_dbt_valid_to_current(strategy, columns) }}, @@ -74,7 +74,7 @@ updates_source_data as ( - select *, {% unique_key_fields(strategy.unique_key %}, + select *, {{ unique_key_fields(strategy.unique_key }}, {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} @@ -86,7 +86,7 @@ deletes_source_data as ( - select *, {% unique_key_fields(strategy.unique_key) %} + select *, {{ unique_key_fields(strategy.unique_key) }} from snapshot_query ), {% endif %} From 2e5c59b7d32c558fc62b179f8b99331611b7a618 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 17 Oct 2024 15:05:42 -0400 Subject: [PATCH 08/12] typo --- .../macros/materializations/snapshots/helpers.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index d0f299857..2250ec9e4 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -74,7 +74,7 @@ updates_source_data as ( - select *, {{ unique_key_fields(strategy.unique_key }}, + select *, {{ unique_key_fields(strategy.unique_key) }}, {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} From 636e6b8780285dff1fade8ab13abd56bd62a10e9 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 17 Oct 2024 15:44:02 -0400 Subject: [PATCH 09/12] classmethod scd_args on BaseRelation --- dbt/adapters/base/relation.py | 11 +++++++++++ .../macros/materializations/snapshots/strategies.sql | 12 ++---------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/base/relation.py b/dbt/adapters/base/relation.py index 80dbd34ba..ecd873457 100644 --- a/dbt/adapters/base/relation.py +++ b/dbt/adapters/base/relation.py @@ -6,6 +6,7 @@ Dict, FrozenSet, Iterator, + List, Optional, Set, Tuple, @@ -341,6 +342,16 @@ def create( ) return cls.from_dict(kwargs) + @classmethod + def scd_args(cls: Type[Self], primary_key: Union[str, List[str]], updated_at) -> List[str]: + scd_args = [] + if isinstance(primary_key, list): + scd_args.extend(primary_key) + else: + scd_args.append(primary_key) + scd_args.append(updated_at) + return scd_args + @property def can_be_renamed(self) -> bool: return self.type in self.renameable_relations diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index dba5886f7..0b7b232a2 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -70,16 +70,8 @@ ({{ snapshotted_rel }}.{{ columns.dbt_valid_from }} < {{ current_rel }}.{{ updated_at }}) {%- endset %} - {% if primary_key | is_list %} - {% set scd_args = [] %} - {% for key in primary_key %} - {{ scd_args.append(key) }} - {% endfor %} - {{ scd_args.append(updated_at) }} - {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} - {% else %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} - {% endif %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} {% do return({ "unique_key": primary_key, From 2f499d532fb037c5b5803d65c1c67e0eab2882a4 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Fri, 18 Oct 2024 10:55:51 -0400 Subject: [PATCH 10/12] Add some macros and use in one place --- .../materializations/snapshots/helpers.sql | 55 ++++++++++++------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 2250ec9e4..4acecc8b7 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -98,27 +98,10 @@ source_data.* from insertions_source_data as source_data - left outer join snapshotted_data on - {% if strategy.unique_key | is_list %} - {% for key in strategy.unique_key %} - snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} - {%- if not loop.last %} and {%- endif %} - {% endfor %} - where snapshotted_data.dbt_unique_key_1 is null - or ( - snapshotted_data.dbt_unique_key_1 is not null - and ( - {{ strategy.row_changed }} - ) - {% else %} - snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where snapshotted_data.dbt_unique_key is null - or ( - snapshotted_data.dbt_unique_key is not null - and ( - {{ strategy.row_changed }} - ) - {% endif %} + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and {{ strategy.row_changed }}) ) @@ -259,3 +242,33 @@ {{ unique_key }} as dbt_unique_key {% endif %} {% endmacro %} + + +{% macro unique_key_join_on(unique_key, identifier, from_identifier) %} + {% if strategy.unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ identifier }}.dbt_unique_key_{{ loop.index }} = {{ from_identifier }}.dbt_unique_key_{{ loop.index }} + {%- if not loop.last %} and {%- endif %} + {% endfor %} + {% else %} + {{ identifier }}.dbt_unique_key = {{ from_identifier }}.dbt_unique_key + {% endif %} +{% endmacro %} + + +{% macro unique_key_is_null(unique_key, identifier) %} + {% if unique_key | is_list %} + {{ identifier }}.dbt_unique_key_1 is null + {% else %} + {{ identifer }}.dbt_unique_key is null + {% endif %} +{% endmacro %} + + +{% macro unique_key_is_not_null(unique_key, identifier) %} + {% if unique_key | is_list %} + {{ identifier }}.dbt_unique_key_1 is not null + {% else %} + {{ identifer }}.dbt_unique_key is not null + {% endif %} +{% endmacro %} From 059b9f7bdcc73d553a9021ce41199c4a595bbb14 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Fri, 18 Oct 2024 11:10:16 -0400 Subject: [PATCH 11/12] Use new macros in more places --- .../materializations/snapshots/helpers.sql | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 4acecc8b7..5c2bad998 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -115,15 +115,8 @@ snapshotted_data.{{ columns.dbt_scd_id }} from updates_source_data as source_data - join snapshotted_data on - {% if strategy.unique_key | is_list %} - {% for key in strategy.unique_key %} - snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} - {%- if not loop.last %} and {%- endif %} - {% endfor %} - {% else %} - snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - {% endif %} + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} where ( {{ strategy.row_changed }} ) @@ -143,17 +136,9 @@ snapshotted_data.{{ columns.dbt_scd_id }} from snapshotted_data - left join deletes_source_data as source_data on - {% if strategy.unique_key | is_list %} - {% for key in strategy.unique_key %} - snapshotted_data.dbt_unique_key_{{ loop.index }} = source_data.dbt_unique_key_{{ loop.index }} - {%- if not loop.last %} and {%- endif %} - {% endfor %} - where source_data.dbt_unique_key_1 is null - {% else %} - snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where source_data.dbt_unique_key is null - {% endif %} + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} ) {%- endif %} From 9a959d26538855e3d7ec8d1beaf67fc49a8870a1 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Fri, 18 Oct 2024 11:13:06 -0400 Subject: [PATCH 12/12] Switch to new scd_args in another place --- .../materializations/snapshots/strategies.sql | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index 0b7b232a2..f9f5afbd5 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -71,7 +71,7 @@ {%- endset %} {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key, @@ -167,16 +167,8 @@ ) {%- endset %} - {% if primary_key | is_list %} - {% set scd_args = [] %} - {% for key in primary_key %} - {{ scd_args.append(key) }} - {% endfor %} - {{ scd_args.append(updated_at) }} - {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} - {% else %} - {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} - {% endif %} + {% set scd_args = api.Relation.scd_args(primary_key, updated_at) %} + {% set scd_id_expr = snapshot_hash_arguments(scd_args) %} {% do return({ "unique_key": primary_key,