-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathreplicator_table_check.py
168 lines (143 loc) · 6.9 KB
/
replicator_table_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Health checks for the MOVE -> bigdata replication process.
Checks for:
- `not_copied`: tables which are up to date in `move_staging` but not being copied downstream.
- `not_up_to_date`: tables which are being erroneously copied from `move_staging` without being up to date.
- `outdated_remove`: tables in `move_staging` which are outdated.
"""
#!/data/airflow/airflow_venv/bin/python3
# -*- coding: utf-8 -*-
# noqa: D415
import os
import sys
import pendulum
# pylint: disable=import-error
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.exceptions import AirflowFailException
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.latest_only import LatestOnlyOperator
# import custom operators and helper functions
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
# pylint: disable=wrong-import-position
from dags.dag_functions import task_fail_slack_alert, get_readme_docmd
# pylint: enable=import-error
DAG_NAME = 'replicator_table_check'
DAG_OWNERS = Variable.get("dag_owners", deserialize_json=True).get(DAG_NAME, ["Unknown"])
README_PATH = os.path.join(repo_path, 'collisions/Readme.md')
DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)
default_args = {
"owner": ",".join(DAG_OWNERS),
"depends_on_past": False,
"start_date": pendulum.datetime(2024, 4, 12, tz="America/Toronto"),
"email_on_failure": False,
"retries": 0,
"on_failure_callback": task_fail_slack_alert,
}
@dag(
dag_id=DAG_NAME,
default_args=default_args,
catchup=False,
max_active_runs=1,
#loosely coupled with the two replicator DAGs which are externally triggered at 430am
schedule='0 4 * * *',
doc_md=DOC_MD,
tags=["replicator", "data_checks"]
)
def replicator_DAG():
#backfill is meaningless since comparing to current table comments.
no_backfill = LatestOnlyOperator(task_id = 'no_backfill')
@task()
def tables_to_copy():
"""This task finds all the replicators from `replicators` airflow variable,
and then finds all the tables listed for replication by looking at the Airflow variable
listed in the `tables` key item in the replicator dictionaries."""
#a list of the replicators
replicators = Variable.get('replicators', deserialize_json=True)
#extract source tables from airflow variables
tables_to_copy = []
for _, dag_items in replicators.items():
tables = Variable.get(dag_items['tables'], deserialize_json=True)
src_tables = [tbl[0] for tbl in tables]
tables_to_copy = tables_to_copy + src_tables
#get only source table names
return tables_to_copy
@task()
def updated_tables(ds):
"""This task finds tables in `move_staging` with comments
indicating they are up to date ("last updated on {ds}")."""
updated_tables_sql = """
SELECT pgn.nspname::text || '.' || pgc.relname::text, pgd.description
FROM pg_description AS pgd
JOIN pg_class AS pgc ON pgd.objoid = pgc.oid
JOIN pg_namespace pgn ON pgc.relnamespace = pgn.oid
WHERE
pgn.nspname = 'move_staging'
AND pgc.relkind = 'r'
AND pgd.description ILIKE %s"""
con = PostgresHook("replicator_bot").get_conn()
with con.cursor() as cur:
cur.execute(updated_tables_sql, (f'%Last updated on {ds}%',))
updated_tables = [tbl[0] for tbl in cur.fetchall()]
return updated_tables
@task()
def not_copied(updated_tables: list, tables_to_copy: list, **context):
"""This task finds tables which are up to date according to their comments in `move_staging` schema
but are not being copied by the replicator due to not being included in the Airflow variables."""
failures = [value for value in updated_tables if value not in tables_to_copy]
if failures != []:
#send message with details of failure using task_fail_slack_alert
failure_extra_msg = [
"The following tables are up to date in `move_staging` but not being copied by bigdata replicators:",
sorted(failures)
]
context.get("task_instance").xcom_push(key="extra_msg", value=failure_extra_msg)
raise AirflowFailException('There were up to date tables in `move_staging` which were not copied by bigdata replicators.')
@task()
def not_up_to_date(updated_tables: list, tables_to_copy: list, **context):
"""This task finds tables which are being copied by the bigdata replicator via Airflow variables
which are not up to date according to their comments in `move_staging` schema."""
failures = [value for value in tables_to_copy if value not in updated_tables]
if failures != []:
#send message with details of failure using task_fail_slack_alert
failure_extra_msg = [
"The following tables are being copied by bigdata replicators, but are not up to date in `move_staging`:" ,
sorted(failures)
]
context.get("task_instance").xcom_push(key="extra_msg", value=failure_extra_msg)
raise AirflowFailException('There were tables copied from `move_staging` by bigdata replicators which were not up to date.')
@task()
def outdated_remove(ds, **context):
"""This task finds outdated tables in `move_staging` based on
comments not matching "last updated on {ds}"."""
outdated_tables_sql = """
SELECT pgn.nspname::text || '.' || pgc.relname::text, pgd.description
FROM pg_namespace AS pgn
JOIN pg_class AS pgc ON pgc.relnamespace = pgn.oid
LEFT JOIN pg_description AS pgd ON pgd.objoid = pgc.oid
WHERE
pgn.nspname = 'move_staging'
AND (
pgd.description NOT ILIKE %s
OR pgd.description IS NULL
)
AND pgc.relkind = 'r';"""
con = PostgresHook("replicator_bot").get_conn()
with con.cursor() as cur:
cur.execute(outdated_tables_sql, (f'%Last updated on {ds}%',))
failures = [tbl[0] for tbl in cur.fetchall()]
if failures != []:
#send message with details of failure using task_fail_slack_alert
failure_extra_msg = [
"The following tables in `move_staging` are outdated and should be purged:" ,
sorted(failures)
]
context.get("task_instance").xcom_push(key="extra_msg", value=failure_extra_msg)
raise AirflowFailException('There were outdated tables in bigdata `move_staging` schema.')
updated_tables, tables_to_copy = updated_tables(), tables_to_copy()
no_backfill >> (
not_copied(updated_tables, tables_to_copy),
not_up_to_date(updated_tables, tables_to_copy),
outdated_remove()
)
replicator_DAG()