|
-- Session context for this script: run as ACCOUNTADMIN inside the SYNAPSE_RAW
-- schema of the templated database, so unqualified object names resolve there.
USE ROLE accountadmin;
USE SCHEMA {{database_name}}.synapse_raw; --noqa: JJ01,PRS,TMP
-- Suspend the three tasks before their definitions are modified further down.
-- NOTE(review): refresh_synapse_warehouse_s3_stage_task is suspended first and
-- resumed last in this script, which suggests it is the root of the task
-- graph -- confirm against the task definitions.
ALTER TASK refresh_synapse_warehouse_s3_stage_task SUSPEND;
ALTER TASK userprofilesnapshot_task SUSPEND;
ALTER TASK upsert_to_userprofile_latest_task SUSPEND;
-- Redefine userprofilesnapshot_task: COPY INTO userprofilesnapshot from the
-- staged userprofilesnapshots files, projecting each field out of the single
-- staged column ($1).
--   * email: everything up to and including the last '@' is replaced with
--     '*****@', so only the part after the '@' is kept.
--   * snapshot_date: taken from the snapshot_date=<value> segment of the staged
--     file path; the '__HIVE_DEFAULT_PARTITION__' placeholder becomes NULL.
ALTER TASK userprofilesnapshot_task MODIFY AS
    COPY INTO userprofilesnapshot
    FROM (
        SELECT
            $1:change_type AS change_type,
            $1:change_timestamp AS change_timestamp,
            $1:change_user_id AS change_user_id,
            $1:snapshot_timestamp AS snapshot_timestamp,
            $1:id AS id,
            $1:user_name AS user_name,
            $1:first_name AS first_name,
            $1:last_name AS last_name,
            REGEXP_REPLACE($1:email, '.+\@', '*****@') AS email,
            $1:location AS location,
            $1:company AS company,
            $1:position AS position,
            NULLIF(
                REGEXP_REPLACE(
                    METADATA$FILENAME,
                    '.*userprofilesnapshots\/snapshot_date\=(.*)\/.*', '\\1'
                ),
                '__HIVE_DEFAULT_PARTITION__'
            ) AS snapshot_date,
            $1:created_on AS created_on
        FROM
            @{{stage_storage_integration}}_stage/userprofilesnapshots --noqa: TMP
    )
    PATTERN = '.*userprofilesnapshots/snapshot_date=.*/.*';
| 35 | + |
-- Redefine upsert_to_userprofile_latest_task: upsert the newest row per ID from
-- USERPROFILESNAPSHOT_STREAM into SYNAPSE.USERPROFILE_LATEST.
-- "Newest" means the highest CHANGE_TIMESTAMP, with ties broken by the highest
-- SNAPSHOT_TIMESTAMP. Existing IDs are overwritten; unseen IDs are inserted.
ALTER TASK upsert_to_userprofile_latest_task MODIFY AS
    MERGE INTO {{database_name}}.SYNAPSE.USERPROFILE_LATEST AS TARGET_TABLE --noqa: TMP
    USING (
        SELECT
            CHANGE_TYPE,
            CHANGE_TIMESTAMP,
            CHANGE_USER_ID,
            SNAPSHOT_TIMESTAMP,
            ID,
            USER_NAME,
            FIRST_NAME,
            LAST_NAME,
            EMAIL,
            LOCATION,
            COMPANY,
            POSITION,
            SNAPSHOT_DATE,
            CREATED_ON
        FROM USERPROFILESNAPSHOT_STREAM
        -- Keep exactly one source row per ID so the MERGE never matches a
        -- target row against several source rows.
        QUALIFY ROW_NUMBER() OVER (
            PARTITION BY ID
            ORDER BY CHANGE_TIMESTAMP DESC, SNAPSHOT_TIMESTAMP DESC
        ) = 1
    ) AS SOURCE_TABLE
        ON TARGET_TABLE.ID = SOURCE_TABLE.ID
    WHEN MATCHED THEN
        UPDATE SET
            -- ID is the join key and is already equal on both sides, so it is
            -- not reassigned here.
            TARGET_TABLE.CHANGE_TYPE = SOURCE_TABLE.CHANGE_TYPE,
            TARGET_TABLE.CHANGE_TIMESTAMP = SOURCE_TABLE.CHANGE_TIMESTAMP,
            TARGET_TABLE.CHANGE_USER_ID = SOURCE_TABLE.CHANGE_USER_ID,
            TARGET_TABLE.SNAPSHOT_TIMESTAMP = SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
            TARGET_TABLE.USER_NAME = SOURCE_TABLE.USER_NAME,
            TARGET_TABLE.FIRST_NAME = SOURCE_TABLE.FIRST_NAME,
            TARGET_TABLE.LAST_NAME = SOURCE_TABLE.LAST_NAME,
            TARGET_TABLE.EMAIL = SOURCE_TABLE.EMAIL,
            TARGET_TABLE.LOCATION = SOURCE_TABLE.LOCATION,
            TARGET_TABLE.COMPANY = SOURCE_TABLE.COMPANY,
            TARGET_TABLE.POSITION = SOURCE_TABLE.POSITION,
            TARGET_TABLE.SNAPSHOT_DATE = SOURCE_TABLE.SNAPSHOT_DATE,
            TARGET_TABLE.CREATED_ON = SOURCE_TABLE.CREATED_ON
    WHEN NOT MATCHED THEN
        INSERT (
            CHANGE_TYPE,
            CHANGE_TIMESTAMP,
            CHANGE_USER_ID,
            SNAPSHOT_TIMESTAMP,
            ID,
            USER_NAME,
            FIRST_NAME,
            LAST_NAME,
            EMAIL,
            LOCATION,
            COMPANY,
            POSITION,
            SNAPSHOT_DATE,
            CREATED_ON
        )
        VALUES (
            SOURCE_TABLE.CHANGE_TYPE,
            SOURCE_TABLE.CHANGE_TIMESTAMP,
            SOURCE_TABLE.CHANGE_USER_ID,
            SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
            SOURCE_TABLE.ID,
            SOURCE_TABLE.USER_NAME,
            SOURCE_TABLE.FIRST_NAME,
            SOURCE_TABLE.LAST_NAME,
            SOURCE_TABLE.EMAIL,
            SOURCE_TABLE.LOCATION,
            SOURCE_TABLE.COMPANY,
            SOURCE_TABLE.POSITION,
            SOURCE_TABLE.SNAPSHOT_DATE,
            SOURCE_TABLE.CREATED_ON
        );
| 75 | + |
-- Resume the tasks in the reverse of the order in which they were suspended:
-- upsert task first, then the snapshot task, then the stage-refresh task.
-- NOTE(review): presumably child tasks must be resumed before their root so
-- the whole graph is enabled when scheduling restarts -- confirm.
ALTER TASK upsert_to_userprofile_latest_task RESUME;
ALTER TASK userprofilesnapshot_task RESUME;
ALTER TASK refresh_synapse_warehouse_s3_stage_task RESUME;
0 commit comments