Skip to content

Commit f01e134

Browse files
authored
Merge pull request #5 from Sage-Bionetworks/poc-snow-streams
Create streams to start upserting into the latest tables
2 parents d22c260 + 4136404 commit f01e134

File tree

1 file changed

+256
-3
lines changed

1 file changed

+256
-3
lines changed

transforms/synapse_etl.sql

+256-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
-- Session context: role and database used by the statements in this script.
USE ROLE SYSADMIN;
USE DATABASE SYNAPSE_DATA_WAREHOUSE;

-- Change-tracking streams, one per raw table. They are created while
-- SYNAPSE_RAW is the current schema, so both the stream and the table it
-- watches resolve to SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.
-- IF NOT EXISTS keeps an existing stream in place when the script is re-run.
USE SCHEMA SYNAPSE_RAW;
CREATE STREAM IF NOT EXISTS CERTIFIEDQUIZQUESTION_STREAM
    ON TABLE CERTIFIEDQUIZQUESTION;
CREATE STREAM IF NOT EXISTS CERTIFIEDQUIZ_STREAM
    ON TABLE CERTIFIEDQUIZ;
CREATE STREAM IF NOT EXISTS USERPROFILESNAPSHOT_STREAM
    ON TABLE USERPROFILESNAPSHOT;
CREATE STREAM IF NOT EXISTS TEAMMEMBERSNAPSHOTS_STREAM
    ON TABLE TEAMMEMBERSNAPSHOTS;
CREATE STREAM IF NOT EXISTS FILESNAPSHOTS_STREAM
    ON TABLE FILESNAPSHOTS;
CREATE STREAM IF NOT EXISTS PROCESSEDACCESS_STREAM
    ON TABLE PROCESSEDACCESS;
CREATE STREAM IF NOT EXISTS FILEDOWNLOAD_STREAM
    ON TABLE FILEDOWNLOAD;
CREATE STREAM IF NOT EXISTS NODESNAPSHOTS_STREAM
    ON TABLE NODESNAPSHOTS;

-- Switch the current schema to SYNAPSE for the statements that follow.
USE SCHEMA SYNAPSE;

515
-- Create certified quiz question latest
@@ -73,7 +83,7 @@ RANKED_NODES AS (
7383
*,
7484
"row_number"()
7585
OVER (
76-
PARTITION BY MEMBER_ID
86+
PARTITION BY MEMBER_ID, TEAM_ID
7787
ORDER BY CHANGE_TIMESTAMP DESC, SNAPSHOT_TIMESTAMP DESC
7888
)
7989
AS N
@@ -87,7 +97,7 @@ SELECT * EXCLUDE N
8797
FROM RANKED_NODES
8898
WHERE N = 1;
8999
-- Team membership rows are keyed by the (MEMBER_ID, TEAM_ID) pair, the same
-- pair used to deduplicate the snapshots and to join in the TEAMMEMBER_LATEST
-- merge further down in this script.
ALTER TABLE SYNAPSE_DATA_WAREHOUSE.SYNAPSE.TEAMMEMBER_LATEST
ADD PRIMARY KEY (MEMBER_ID, TEAM_ID);
92102

93103
CREATE OR REPLACE TABLE SYNAPSE_DATA_WAREHOUSE.SYNAPSE.TEAM_LATEST AS WITH
@@ -209,9 +219,252 @@ SELECT
209219
FROM USER_CERT_JOINED;
210220
-- zero copy clone of processed access records
-- Rebuilds SYNAPSE.PROCESSEDACCESS as a clone of the raw table each time the
-- script runs (CREATE OR REPLACE).
CREATE OR REPLACE TABLE SYNAPSE_DATA_WAREHOUSE.SYNAPSE.PROCESSEDACCESS
CLONE SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.PROCESSEDACCESS;

-- File download records (the zero copy clone itself is created further down
-- in this script).
-- Keep a single row per (USER_ID, FILE_HANDLE_ID, RECORD_DATE), since this is
-- the closest estimate to download records.
-- NOTE(review): the window is ordered by the same columns it is partitioned
-- by, so the surviving row within each group is arbitrary rather than the
-- "latest" one -- order by a timestamp column if the table has one.
-- NOTE(review): this SELECT is not wrapped in a CREATE TABLE / INSERT, so its
-- result is not persisted anywhere -- confirm whether it should feed
-- SYNAPSE.FILEDOWNLOAD.
SELECT *
FROM SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.FILEDOWNLOAD
QUALIFY
    ROW_NUMBER() OVER (
        PARTITION BY USER_ID, FILE_HANDLE_ID, RECORD_DATE
        ORDER BY USER_ID, FILE_HANDLE_ID, RECORD_DATE
    ) = 1;

-- Merge into certified quiz
-- Upserts one row per USER_ID from the raw stream into CERTIFIEDQUIZ_LATEST:
-- the row with the highest INSTANCE, ties broken by the highest RESPONSE_ID.
-- The stream is created under SYNAPSE_RAW while the script switches the
-- current schema to SYNAPSE before reaching this statement, so the stream is
-- referenced by its fully qualified name.
MERGE INTO SYNAPSE_DATA_WAREHOUSE.SYNAPSE.CERTIFIEDQUIZ_LATEST AS TARGET_TABLE
USING (
    WITH QUIZ_RANKED AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY USER_ID
                ORDER BY INSTANCE DESC, RESPONSE_ID DESC
            ) AS ROW_NUM
        FROM SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.CERTIFIEDQUIZ_STREAM
    )

    SELECT * EXCLUDE ROW_NUM
    FROM QUIZ_RANKED
    WHERE ROW_NUM = 1
) AS SOURCE_TABLE
    ON TARGET_TABLE.USER_ID = SOURCE_TABLE.USER_ID
WHEN MATCHED THEN
    UPDATE SET
        TARGET_TABLE.RESPONSE_ID = SOURCE_TABLE.RESPONSE_ID,
        TARGET_TABLE.PASSED = SOURCE_TABLE.PASSED,
        TARGET_TABLE.PASSED_ON = SOURCE_TABLE.PASSED_ON,
        TARGET_TABLE.STACK = SOURCE_TABLE.STACK,
        TARGET_TABLE.INSTANCE = SOURCE_TABLE.INSTANCE,
        TARGET_TABLE.RECORD_DATE = SOURCE_TABLE.RECORD_DATE
WHEN NOT MATCHED THEN
    INSERT (
        RESPONSE_ID,
        USER_ID,
        PASSED,
        PASSED_ON,
        STACK,
        INSTANCE,
        RECORD_DATE
    )
    VALUES (
        SOURCE_TABLE.RESPONSE_ID,
        SOURCE_TABLE.USER_ID,
        SOURCE_TABLE.PASSED,
        SOURCE_TABLE.PASSED_ON,
        SOURCE_TABLE.STACK,
        SOURCE_TABLE.INSTANCE,
        SOURCE_TABLE.RECORD_DATE
    );

-- merge into certified quiz questions
-- Upserts one row per (RESPONSE_ID, QUESTION_INDEX) from the raw stream into
-- CERTIFIEDQUIZQUESTION_LATEST. Within each pair the row kept is the one that
-- sorts first by IS_CORRECT DESC, then INSTANCE DESC.
-- The stream is created under SYNAPSE_RAW while the script switches the
-- current schema to SYNAPSE before reaching this statement, so the stream is
-- referenced by its fully qualified name.
MERGE INTO SYNAPSE_DATA_WAREHOUSE.SYNAPSE.CERTIFIEDQUIZQUESTION_LATEST AS TARGET_TABLE
USING (
    WITH CQQ_RANKED AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY RESPONSE_ID, QUESTION_INDEX
                ORDER BY IS_CORRECT DESC, INSTANCE DESC
            ) AS ROW_NUM
        FROM SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.CERTIFIEDQUIZQUESTION_STREAM
    )

    -- No ORDER BY here: row order of a MERGE source has no effect on the result.
    SELECT * EXCLUDE ROW_NUM
    FROM CQQ_RANKED
    WHERE ROW_NUM = 1
) AS SOURCE_TABLE
    ON
        TARGET_TABLE.RESPONSE_ID = SOURCE_TABLE.RESPONSE_ID
        AND TARGET_TABLE.QUESTION_INDEX = SOURCE_TABLE.QUESTION_INDEX
WHEN MATCHED THEN
    -- The join-key columns are not re-assigned: matched rows already agree on them.
    UPDATE SET
        TARGET_TABLE.IS_CORRECT = SOURCE_TABLE.IS_CORRECT,
        TARGET_TABLE.STACK = SOURCE_TABLE.STACK,
        TARGET_TABLE.INSTANCE = SOURCE_TABLE.INSTANCE,
        TARGET_TABLE.RECORD_DATE = SOURCE_TABLE.RECORD_DATE
WHEN NOT MATCHED THEN
    INSERT (
        RESPONSE_ID,
        QUESTION_INDEX,
        IS_CORRECT,
        STACK,
        INSTANCE,
        RECORD_DATE
    )
    VALUES (
        SOURCE_TABLE.RESPONSE_ID,
        SOURCE_TABLE.QUESTION_INDEX,
        SOURCE_TABLE.IS_CORRECT,
        SOURCE_TABLE.STACK,
        SOURCE_TABLE.INSTANCE,
        SOURCE_TABLE.RECORD_DATE
    );

-- merge into node latest
-- Upserts the most recent non-DELETE snapshot per node ID (latest
-- CHANGE_TIMESTAMP, then latest SNAPSHOT_TIMESTAMP) from the raw stream into
-- NODE_LATEST.
-- The stream is created under SYNAPSE_RAW while the script switches the
-- current schema to SYNAPSE before reaching this statement, so the stream is
-- referenced by its fully qualified name.
-- NOTE(review): DELETE snapshots are filtered out before ranking and there is
-- no delete branch, so a node whose newest change is a DELETE keeps (or gets)
-- its previous state in NODE_LATEST -- confirm that is intended.
MERGE INTO SYNAPSE_DATA_WAREHOUSE.SYNAPSE.NODE_LATEST AS TARGET_TABLE
USING (
    WITH RANKED_NODES AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY ID
                ORDER BY CHANGE_TIMESTAMP DESC, SNAPSHOT_TIMESTAMP DESC
            ) AS N
        FROM SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.NODESNAPSHOTS_STREAM
        WHERE
            CHANGE_TYPE != 'DELETE'
    )

    SELECT * EXCLUDE N
    FROM RANKED_NODES
    WHERE N = 1
) AS SOURCE_TABLE
    ON TARGET_TABLE.ID = SOURCE_TABLE.ID
WHEN MATCHED THEN
    -- ID is the join key, so it is not re-assigned: matched rows already agree on it.
    UPDATE SET
        TARGET_TABLE.CHANGE_TYPE = SOURCE_TABLE.CHANGE_TYPE,
        TARGET_TABLE.CHANGE_TIMESTAMP = SOURCE_TABLE.CHANGE_TIMESTAMP,
        TARGET_TABLE.CHANGE_USER_ID = SOURCE_TABLE.CHANGE_USER_ID,
        TARGET_TABLE.SNAPSHOT_TIMESTAMP = SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        TARGET_TABLE.BENEFACTOR_ID = SOURCE_TABLE.BENEFACTOR_ID,
        TARGET_TABLE.PROJECT_ID = SOURCE_TABLE.PROJECT_ID,
        TARGET_TABLE.PARENT_ID = SOURCE_TABLE.PARENT_ID,
        TARGET_TABLE.NODE_TYPE = SOURCE_TABLE.NODE_TYPE,
        TARGET_TABLE.CREATED_ON = SOURCE_TABLE.CREATED_ON,
        TARGET_TABLE.CREATED_BY = SOURCE_TABLE.CREATED_BY,
        TARGET_TABLE.MODIFIED_ON = SOURCE_TABLE.MODIFIED_ON,
        TARGET_TABLE.MODIFIED_BY = SOURCE_TABLE.MODIFIED_BY,
        TARGET_TABLE.VERSION_NUMBER = SOURCE_TABLE.VERSION_NUMBER,
        TARGET_TABLE.FILE_HANDLE_ID = SOURCE_TABLE.FILE_HANDLE_ID,
        TARGET_TABLE.NAME = SOURCE_TABLE.NAME,
        TARGET_TABLE.IS_PUBLIC = SOURCE_TABLE.IS_PUBLIC,
        TARGET_TABLE.IS_CONTROLLED = SOURCE_TABLE.IS_CONTROLLED,
        TARGET_TABLE.IS_RESTRICTED = SOURCE_TABLE.IS_RESTRICTED,
        TARGET_TABLE.SNAPSHOT_DATE = SOURCE_TABLE.SNAPSHOT_DATE
WHEN NOT MATCHED THEN
    INSERT (
        CHANGE_TYPE,
        CHANGE_TIMESTAMP,
        CHANGE_USER_ID,
        SNAPSHOT_TIMESTAMP,
        ID,
        BENEFACTOR_ID,
        PROJECT_ID,
        PARENT_ID,
        NODE_TYPE,
        CREATED_ON,
        CREATED_BY,
        MODIFIED_ON,
        MODIFIED_BY,
        VERSION_NUMBER,
        FILE_HANDLE_ID,
        NAME,
        IS_PUBLIC,
        IS_CONTROLLED,
        IS_RESTRICTED,
        SNAPSHOT_DATE
    )
    VALUES (
        SOURCE_TABLE.CHANGE_TYPE,
        SOURCE_TABLE.CHANGE_TIMESTAMP,
        SOURCE_TABLE.CHANGE_USER_ID,
        SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        SOURCE_TABLE.ID,
        SOURCE_TABLE.BENEFACTOR_ID,
        SOURCE_TABLE.PROJECT_ID,
        SOURCE_TABLE.PARENT_ID,
        SOURCE_TABLE.NODE_TYPE,
        SOURCE_TABLE.CREATED_ON,
        SOURCE_TABLE.CREATED_BY,
        SOURCE_TABLE.MODIFIED_ON,
        SOURCE_TABLE.MODIFIED_BY,
        SOURCE_TABLE.VERSION_NUMBER,
        SOURCE_TABLE.FILE_HANDLE_ID,
        SOURCE_TABLE.NAME,
        SOURCE_TABLE.IS_PUBLIC,
        SOURCE_TABLE.IS_CONTROLLED,
        SOURCE_TABLE.IS_RESTRICTED,
        SOURCE_TABLE.SNAPSHOT_DATE
    );

-- merge into file latest
-- Upserts the most recent snapshot per file ID (latest CHANGE_TIMESTAMP, then
-- latest SNAPSHOT_TIMESTAMP) from the raw stream into FILE_LATEST, ignoring
-- preview files and DELETE snapshots.
-- The stream is created under SYNAPSE_RAW while the script switches the
-- current schema to SYNAPSE before reaching this statement, so the stream is
-- referenced by its fully qualified name.
-- NOTE(review): DELETE snapshots are filtered out before ranking and there is
-- no delete branch, so a file whose newest change is a DELETE keeps (or gets)
-- its previous state in FILE_LATEST -- confirm that is intended.
MERGE INTO SYNAPSE_DATA_WAREHOUSE.SYNAPSE.FILE_LATEST AS TARGET_TABLE
USING (
    WITH RANKED_NODES AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY ID
                ORDER BY CHANGE_TIMESTAMP DESC, SNAPSHOT_TIMESTAMP DESC
            ) AS N
        FROM SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.FILESNAPSHOTS_STREAM
        WHERE
            NOT IS_PREVIEW
            AND CHANGE_TYPE != 'DELETE'
    )

    SELECT * EXCLUDE N
    FROM RANKED_NODES
    WHERE N = 1
) AS SOURCE_TABLE
    ON TARGET_TABLE.ID = SOURCE_TABLE.ID
WHEN MATCHED THEN
    -- ID is the join key, so it is not re-assigned: matched rows already agree on it.
    UPDATE SET
        TARGET_TABLE.CHANGE_TYPE = SOURCE_TABLE.CHANGE_TYPE,
        TARGET_TABLE.CHANGE_TIMESTAMP = SOURCE_TABLE.CHANGE_TIMESTAMP,
        TARGET_TABLE.CHANGE_USER_ID = SOURCE_TABLE.CHANGE_USER_ID,
        TARGET_TABLE.SNAPSHOT_TIMESTAMP = SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        TARGET_TABLE.CREATED_BY = SOURCE_TABLE.CREATED_BY,
        TARGET_TABLE.CREATED_ON = SOURCE_TABLE.CREATED_ON,
        TARGET_TABLE.MODIFIED_ON = SOURCE_TABLE.MODIFIED_ON,
        TARGET_TABLE.CONCRETE_TYPE = SOURCE_TABLE.CONCRETE_TYPE,
        TARGET_TABLE.CONTENT_MD5 = SOURCE_TABLE.CONTENT_MD5,
        TARGET_TABLE.CONTENT_TYPE = SOURCE_TABLE.CONTENT_TYPE,
        TARGET_TABLE.FILE_NAME = SOURCE_TABLE.FILE_NAME,
        TARGET_TABLE.STORAGE_LOCATION_ID = SOURCE_TABLE.STORAGE_LOCATION_ID,
        TARGET_TABLE.CONTENT_SIZE = SOURCE_TABLE.CONTENT_SIZE,
        TARGET_TABLE.BUCKET = SOURCE_TABLE.BUCKET,
        TARGET_TABLE.KEY = SOURCE_TABLE.KEY,
        TARGET_TABLE.PREVIEW_ID = SOURCE_TABLE.PREVIEW_ID,
        TARGET_TABLE.IS_PREVIEW = SOURCE_TABLE.IS_PREVIEW,
        TARGET_TABLE.STATUS = SOURCE_TABLE.STATUS,
        TARGET_TABLE.SNAPSHOT_DATE = SOURCE_TABLE.SNAPSHOT_DATE
WHEN NOT MATCHED THEN
    INSERT (
        CHANGE_TYPE,
        CHANGE_TIMESTAMP,
        CHANGE_USER_ID,
        SNAPSHOT_TIMESTAMP,
        ID,
        CREATED_BY,
        CREATED_ON,
        MODIFIED_ON,
        CONCRETE_TYPE,
        CONTENT_MD5,
        CONTENT_TYPE,
        FILE_NAME,
        STORAGE_LOCATION_ID,
        CONTENT_SIZE,
        BUCKET,
        KEY,
        PREVIEW_ID,
        IS_PREVIEW,
        STATUS,
        SNAPSHOT_DATE
    )
    VALUES (
        SOURCE_TABLE.CHANGE_TYPE,
        SOURCE_TABLE.CHANGE_TIMESTAMP,
        SOURCE_TABLE.CHANGE_USER_ID,
        SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        SOURCE_TABLE.ID,
        SOURCE_TABLE.CREATED_BY,
        SOURCE_TABLE.CREATED_ON,
        SOURCE_TABLE.MODIFIED_ON,
        SOURCE_TABLE.CONCRETE_TYPE,
        SOURCE_TABLE.CONTENT_MD5,
        SOURCE_TABLE.CONTENT_TYPE,
        SOURCE_TABLE.FILE_NAME,
        SOURCE_TABLE.STORAGE_LOCATION_ID,
        SOURCE_TABLE.CONTENT_SIZE,
        SOURCE_TABLE.BUCKET,
        SOURCE_TABLE.KEY,
        SOURCE_TABLE.PREVIEW_ID,
        SOURCE_TABLE.IS_PREVIEW,
        SOURCE_TABLE.STATUS,
        SOURCE_TABLE.SNAPSHOT_DATE
    );

-- merge into user profile latest
-- Upserts the most recent snapshot per user profile ID (latest
-- CHANGE_TIMESTAMP, then latest SNAPSHOT_TIMESTAMP) from the raw stream into
-- USERPROFILE_LATEST.
-- The stream is created under SYNAPSE_RAW while the script switches the
-- current schema to SYNAPSE before reaching this statement, so the stream is
-- referenced by its fully qualified name.
MERGE INTO SYNAPSE_DATA_WAREHOUSE.SYNAPSE.USERPROFILE_LATEST AS TARGET_TABLE
USING (
    WITH RANKED_NODES AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY ID
                ORDER BY CHANGE_TIMESTAMP DESC, SNAPSHOT_TIMESTAMP DESC
            ) AS N
        FROM SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.USERPROFILESNAPSHOT_STREAM
    )

    SELECT * EXCLUDE N
    FROM RANKED_NODES
    WHERE N = 1
) AS SOURCE_TABLE
    ON TARGET_TABLE.ID = SOURCE_TABLE.ID
WHEN MATCHED THEN
    -- ID is the join key, so it is not re-assigned: matched rows already agree on it.
    UPDATE SET
        TARGET_TABLE.CHANGE_TYPE = SOURCE_TABLE.CHANGE_TYPE,
        TARGET_TABLE.CHANGE_TIMESTAMP = SOURCE_TABLE.CHANGE_TIMESTAMP,
        TARGET_TABLE.CHANGE_USER_ID = SOURCE_TABLE.CHANGE_USER_ID,
        TARGET_TABLE.SNAPSHOT_TIMESTAMP = SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        TARGET_TABLE.USER_NAME = SOURCE_TABLE.USER_NAME,
        TARGET_TABLE.FIRST_NAME = SOURCE_TABLE.FIRST_NAME,
        TARGET_TABLE.LAST_NAME = SOURCE_TABLE.LAST_NAME,
        TARGET_TABLE.EMAIL = SOURCE_TABLE.EMAIL,
        TARGET_TABLE.LOCATION = SOURCE_TABLE.LOCATION,
        TARGET_TABLE.COMPANY = SOURCE_TABLE.COMPANY,
        TARGET_TABLE.POSITION = SOURCE_TABLE.POSITION,
        TARGET_TABLE.SNAPSHOT_DATE = SOURCE_TABLE.SNAPSHOT_DATE
WHEN NOT MATCHED THEN
    INSERT (
        CHANGE_TYPE,
        CHANGE_TIMESTAMP,
        CHANGE_USER_ID,
        SNAPSHOT_TIMESTAMP,
        ID,
        USER_NAME,
        FIRST_NAME,
        LAST_NAME,
        EMAIL,
        LOCATION,
        COMPANY,
        POSITION,
        SNAPSHOT_DATE
    )
    VALUES (
        SOURCE_TABLE.CHANGE_TYPE,
        SOURCE_TABLE.CHANGE_TIMESTAMP,
        SOURCE_TABLE.CHANGE_USER_ID,
        SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        SOURCE_TABLE.ID,
        SOURCE_TABLE.USER_NAME,
        SOURCE_TABLE.FIRST_NAME,
        SOURCE_TABLE.LAST_NAME,
        SOURCE_TABLE.EMAIL,
        SOURCE_TABLE.LOCATION,
        SOURCE_TABLE.COMPANY,
        SOURCE_TABLE.POSITION,
        SOURCE_TABLE.SNAPSHOT_DATE
    );

-- merge into team member latest
-- Upserts the most recent snapshot per (TEAM_ID, MEMBER_ID) pair (latest
-- CHANGE_TIMESTAMP, then latest SNAPSHOT_TIMESTAMP) from the raw stream into
-- TEAMMEMBER_LATEST.
-- The stream is created under SYNAPSE_RAW while the script switches the
-- current schema to SYNAPSE before reaching this statement, so the stream is
-- referenced by its fully qualified name.
MERGE INTO SYNAPSE_DATA_WAREHOUSE.SYNAPSE.TEAMMEMBER_LATEST AS TARGET_TABLE
USING (
    WITH RANKED_NODES AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY TEAM_ID, MEMBER_ID
                ORDER BY CHANGE_TIMESTAMP DESC, SNAPSHOT_TIMESTAMP DESC
            ) AS N
        FROM SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.TEAMMEMBERSNAPSHOTS_STREAM
    )

    SELECT * EXCLUDE N
    FROM RANKED_NODES
    WHERE N = 1
) AS SOURCE_TABLE
    ON
        TARGET_TABLE.MEMBER_ID = SOURCE_TABLE.MEMBER_ID
        AND TARGET_TABLE.TEAM_ID = SOURCE_TABLE.TEAM_ID
WHEN MATCHED THEN
    -- The join-key columns are not re-assigned: matched rows already agree on them.
    UPDATE SET
        TARGET_TABLE.CHANGE_TYPE = SOURCE_TABLE.CHANGE_TYPE,
        TARGET_TABLE.CHANGE_TIMESTAMP = SOURCE_TABLE.CHANGE_TIMESTAMP,
        TARGET_TABLE.CHANGE_USER_ID = SOURCE_TABLE.CHANGE_USER_ID,
        TARGET_TABLE.SNAPSHOT_TIMESTAMP = SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        TARGET_TABLE.IS_ADMIN = SOURCE_TABLE.IS_ADMIN,
        TARGET_TABLE.SNAPSHOT_DATE = SOURCE_TABLE.SNAPSHOT_DATE
WHEN NOT MATCHED THEN
    INSERT (
        CHANGE_TYPE,
        CHANGE_TIMESTAMP,
        CHANGE_USER_ID,
        SNAPSHOT_TIMESTAMP,
        TEAM_ID,
        MEMBER_ID,
        IS_ADMIN,
        SNAPSHOT_DATE
    )
    VALUES (
        SOURCE_TABLE.CHANGE_TYPE,
        SOURCE_TABLE.CHANGE_TIMESTAMP,
        SOURCE_TABLE.CHANGE_USER_ID,
        SOURCE_TABLE.SNAPSHOT_TIMESTAMP,
        SOURCE_TABLE.TEAM_ID,
        SOURCE_TABLE.MEMBER_ID,
        SOURCE_TABLE.IS_ADMIN,
        SOURCE_TABLE.SNAPSHOT_DATE
    );

-- zero copy clone of file download records
-- Rebuilds SYNAPSE.FILEDOWNLOAD as a clone of the raw table each time the
-- script runs (CREATE OR REPLACE).
CREATE OR REPLACE TABLE SYNAPSE_DATA_WAREHOUSE.SYNAPSE.FILEDOWNLOAD
    CLONE SYNAPSE_DATA_WAREHOUSE.SYNAPSE_RAW.FILEDOWNLOAD;

0 commit comments

Comments
 (0)