From ef51365ef457417860dcaf07e465f47ce07fb86a Mon Sep 17 00:00:00 2001 From: Tim Kirschke Date: Fri, 20 Dec 2024 10:42:55 +0100 Subject: [PATCH] Fixing Multi Active Hash Calculation and loading mechanism --- macros/macro-1.yml | 6 ++-- .../run.sql.j2 | 35 +++++++++++++++---- .../DatavaultbyScalefreeStage-4/create.sql.j2 | 33 +++++++++++++++-- 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/macros/macro-1.yml b/macros/macro-1.yml index fd2c0b0..b73a99b 100644 --- a/macros/macro-1.yml +++ b/macros/macro-1.yml @@ -21,7 +21,7 @@ macroString: |- "hash": "MD5", "hash_datatype": "STRING", "hash_input_case_sensitive": "TRUE", - "hash_passthrough_input_transformations": "TRUE", + "hash_passthrough_input_transformations": "FALSE", "beginning_of_all_times": "0001-01-01T00:00:01", "end_of_all_times": "8888-12-31T23:59:59", "timestamp_format": "YYYY-MM-DDTHH24:MI:SS", @@ -406,7 +406,7 @@ macroString: |- {#-- hashing calculation of multi-active --#} {#-- satellite's hash diff attribute. --#} {%- if is_hashdiff and multi_active_key is defined and multi_active_key|length>0 -%} - {%- set multi_active_key = multi_active_key|join(", ") -%} + {%- set multi_active_key = multi_active_key|join('", "') -%} {%- set listagg_closing = ' WITHIN GROUP (ORDER BY "{}") OVER (PARTITION BY "{}", "{}"))'.format(multi_active_key, main_hashkey_col, ldts_alias) -%} {%- endif -%} @@ -498,7 +498,7 @@ macroString: |- {{ standardise_prefix }} {%- for column in columns -%} {%- set ns.all_null = ns.all_null + [null_placeholder_string] -%} - {%- if datavault4coalesce.config.hash_passthrough_input_transformations -%} + {%- if datavault4coalesce.config.hash_passthrough_input_transformations == TRUE -%} {%- set column_str = datavault4coalesce__as_constant(get_source_transform(column)) -%} {%- else -%} {%- if '.' in column.name %} diff --git a/nodeTypes/DatavaultbyScalefreeMulti-activeSatelliteV0-18/run.sql.j2 b/nodeTypes/DatavaultbyScalefreeMulti-activeSatelliteV0-18/run.sql.j2 index 10d417f..e266150 100644 --- a/nodeTypes/DatavaultbyScalefreeMulti-activeSatelliteV0-18/run.sql.j2 +++ b/nodeTypes/DatavaultbyScalefreeMulti-activeSatelliteV0-18/run.sql.j2 @@ -7,7 +7,24 @@ INSERT INTO {{ ref_no_link(node.location.name, node.name) }} -WITH latest_entries_in_sat AS ( +WITH + +source_data AS ( + + {% for source in sources %} + + SELECT + {% for col in source.columns %} + {{ get_source_transform(col) }} AS "{{ col.name }}", + {% endfor %} + + {{ source.join }} + + {% endfor %} + +), + +latest_entries_in_sat AS ( /* get current rows from satellite */ {# /* USE THIS again when ColumnDropdownSelector is used @@ -33,7 +50,7 @@ deduplicated_numbered_source AS ( SELECT {% for col in source.columns %} - {{ get_source_transform(col) }} AS {{ col.name }}, + {{ get_source_transform(col) }} AS "{{ col.name }}", {% endfor %} ROW_NUMBER() OVER(PARTITION BY "{{ hashkey_column }}" ORDER BY "{{ datavault4coalesce.config.ldts_alias }}") as rn @@ -62,17 +79,21 @@ deduplicated_numbered_source AS ( {% for source in sources %} SELECT DISTINCT {% for col in source.columns %} - {{ col.name }} + src."{{ col.name }}" {%- if not loop.last -%}, {% endif %} {% endfor %} - FROM deduplicated_numbered_source + FROM source_data src + INNER JOIN deduplicated_numbered_source dedupe + ON src."{{ hashkey_column }}" = dedupe."{{ hashkey_column }}" + AND src."{{ hashdiff_column }}" = dedupe."{{ hashdiff_column }}" + AND src."{{ datavault4coalesce.config.ldts_alias }}" = dedupe."{{ datavault4coalesce.config.ldts_alias }}" WHERE NOT EXISTS ( SELECT 1 FROM latest_entries_in_sat WHERE - deduplicated_numbered_source.{{ hashdiff_column }} = latest_entries_in_sat."{{ hashdiff_column }}" - AND deduplicated_numbered_source.{{ hashkey_column }} = latest_entries_in_sat."{{ hashkey_column }}" - AND deduplicated_numbered_source.rn = 1 + dedupe."{{ hashdiff_column }}" = latest_entries_in_sat."{{ hashdiff_column }}" + AND dedupe."{{ hashkey_column }}" = latest_entries_in_sat."{{ hashkey_column }}" + AND dedupe.rn = 1 ) {% endfor %} diff --git a/nodeTypes/DatavaultbyScalefreeStage-4/create.sql.j2 b/nodeTypes/DatavaultbyScalefreeStage-4/create.sql.j2 index 46a4549..3ec08be 100644 --- a/nodeTypes/DatavaultbyScalefreeStage-4/create.sql.j2 +++ b/nodeTypes/DatavaultbyScalefreeStage-4/create.sql.j2 @@ -24,6 +24,31 @@ ) {%- if node.description | length > 0 %} COMMENT = '{{ node.description }}'{% endif %} AS + + WITH + + {# All columns that are not hash columns #} + + regular_columns AS ( + + {% for source in sources %} + SELECT + {% for col in source.columns %} + {% if not col.hashDetails %} + {{ get_source_transform(col) }} AS "{{ col.name }}" + {%- if not loop.last -%}, {% endif %} + {% endif %} + {% endfor %} + + {{ source.join }} + + {% endfor %} + + ), + + all_columns AS ( + + {% for source in sources %} {#-------------------- Process multi-activity config --------------------#} {#----------------------- FIXME explain variables -----------------------#} @@ -42,12 +67,12 @@ {{ datavault4coalesce__hash(columns=col.hashDetails.columns, is_hashdiff=col.is_hd,datatype=col.dataType, algo=datavault4coalesce.config.hash, multi_active_key=multi_active_key, main_hashkey_col=main_hashkey_col.hk_col) }} AS "{{ col.name }}" {#- Print other table columns -#} {% else %} - {{ get_source_transform(col) }} AS "{{ col.name }}" + "{{ col.name }}" {% endif %} {%- if not loop.last -%}, {% endif %} {% endfor %} - {{ source.join }} + FROM regular_columns {% if not loop.last %} {% if config.insertStrategy in ['UNION', 'UNION ALL'] %} @@ -93,4 +118,8 @@ {% endfor %} + ) + + SELECT * FROM all_columns + {% endif %} \ No newline at end of file