"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the task logs to avoid those getting too big.
airflow trigger_dag --conf '[curly-braces]"maxLogAgeInDays":30[curly-braces]' airflow-log-cleanup
--conf options:
maxLogAgeInDays:<INT> - Optional
"""
import logging
import os
from datetime import timedelta

import airflow
from airflow.configuration import conf
from airflow.models import DAG, Variable
from airflow.operators.bash_operator import BashOperator
# airflow-log-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1)
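# BASE_LOG_FOLDER moved from the [core] section of airflow.cfg to the
# [logging] section in Airflow 2.0, hence the fallback below.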
try:
    BASE_LOG_FOLDER = conf.get("core", "BASE_LOG_FOLDER").rstrip("/")
except Exception:
    BASE_LOG_FOLDER = conf.get("logging", "BASE_LOG_FOLDER").rstrip("/")
# How often to run. "@daily" runs once a day at midnight
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email addresses to send alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that are 30 days old or older
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get(
    "airflow_log_cleanup__max_log_age_in_days", 30
)
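# Note: in this pwdless-ssh variant the retention value is taken from the
# Airflow Variable above; the maxLogAgeInDays --conf option described in the
# docstring is not read by the tasks below.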
# Whether the job should actually delete the logs. Set this to False to
# temporarily disable deletion; the tasks will then only list what would be
# removed
ENABLE_DELETE = False
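# Every host listed below must be reachable via passwordless (key-based) SSH
# from the worker executing this DAG, since the cleanup script is distributed
# with scp and executed with ssh.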
AIRFLOW_HOSTS = "localhost" # comma separated list of host(s)
TEMP_LOG_CLEANUP_SCRIPT_PATH = "/tmp/airflow_log_cleanup.sh"
DIRECTORIES_TO_DELETE = [BASE_LOG_FOLDER]
ENABLE_DELETE_CHILD_LOG = Variable.get(
    "airflow_log_cleanup__enable_delete_child_log", "False"
)
logging.info("ENABLE_DELETE_CHILD_LOG: %s", ENABLE_DELETE_CHILD_LOG)
if not BASE_LOG_FOLDER or BASE_LOG_FOLDER.strip() == "":
    raise ValueError(
        "BASE_LOG_FOLDER is empty in airflow.cfg. It can be found under "
        "the [core] section (Airflow <2.0.0) or the [logging] section "
        "(Airflow >=2.0.0). Kindly provide an appropriate directory path."
    )
if ENABLE_DELETE_CHILD_LOG.lower() == "true":
    try:
        CHILD_PROCESS_LOG_DIRECTORY = conf.get(
            "scheduler", "CHILD_PROCESS_LOG_DIRECTORY"
        )
        # Only add the directory if the config value is non-blank
        if CHILD_PROCESS_LOG_DIRECTORY.strip():
            DIRECTORIES_TO_DELETE.append(CHILD_PROCESS_LOG_DIRECTORY)
    except Exception as e:
        logging.exception(
            "Could not obtain CHILD_PROCESS_LOG_DIRECTORY from " +
            "Airflow Configurations: " + str(e)
        )
default_args = {
    'owner': DAG_OWNER_NAME,
    'depends_on_past': False,
    'email': ALERT_EMAIL_ADDRESSES,
    'email_on_failure': True,
    'email_on_retry': False,
    'start_date': START_DATE,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    tags=['teamclairvoyant', 'airflow-maintenance-dags']
)
if hasattr(dag, 'doc_md'):
    dag.doc_md = __doc__
if hasattr(dag, 'catchup'):
    dag.catchup = False
log_cleanup = """
echo "Getting Configurations..."
BASE_LOG_FOLDER=$1
MAX_LOG_AGE_IN_DAYS=$2
ENABLE_DELETE=$3
echo "Finished Getting Configurations"
echo ""
echo "Configurations:"
echo "BASE_LOG_FOLDER: \'${BASE_LOG_FOLDER}\'"
echo "MAX_LOG_AGE_IN_DAYS: \'${MAX_LOG_AGE_IN_DAYS}\'"
echo "ENABLE_DELETE: \'${ENABLE_DELETE}\'"

# List everything matched by the find statement ($1); if ENABLE_DELETE is
# "true", run the delete statement ($2) and abort on failure.
cleanup() {
    echo "Executing Find Statement: $1"
    FILES_MARKED_FOR_DELETE=$(eval $1)
    echo "Process will be Deleting the following files or directories:"
    echo "${FILES_MARKED_FOR_DELETE}"
    echo "Process will be Deleting $(echo "${FILES_MARKED_FOR_DELETE}" |
    grep -v \'^$\' | wc -l) files or directories"
    echo ""
    if [ "${ENABLE_DELETE}" == "true" ]; then
        if [ "${FILES_MARKED_FOR_DELETE}" != "" ]; then
            echo "Executing Delete Statement: $2"
            eval $2
            DELETE_STMT_EXIT_CODE=$?
            if [ "${DELETE_STMT_EXIT_CODE}" != "0" ]; then
                echo "Delete process failed with exit code \'${DELETE_STMT_EXIT_CODE}\'"
                exit ${DELETE_STMT_EXIT_CODE}
            fi
        else
            echo "WARN: No files or directories to Delete"
        fi
    else
        echo "WARN: You have opted to skip deleting the files or directories"
    fi
}

echo ""
echo "Running Cleanup Process..."

# Pass 1: delete log files older than MAX_LOG_AGE_IN_DAYS
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime +${MAX_LOG_AGE_IN_DAYS}"
DELETE_STMT="${FIND_STATEMENT} -exec rm -f {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?

# Pass 2: remove run directories left empty by pass 1
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?

# Pass 3: remove DAG-level directories left empty by pass 2
FIND_STATEMENT="find ${BASE_LOG_FOLDER}/* -type d -empty"
DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
CLEANUP_EXIT_CODE=$?

echo "Finished Running Cleanup Process"
"""
create_log_cleanup_script = BashOperator(
    task_id='create_log_cleanup_script',
    bash_command=f"""
# Write the script with a quoted heredoc so the quotes and ${{...}} references
# inside it survive verbatim (a plain echo '...' would mangle them)
cat << 'CLEANUP_EOF' > {TEMP_LOG_CLEANUP_SCRIPT_PATH}
{log_cleanup}
CLEANUP_EOF
chmod +x {TEMP_LOG_CLEANUP_SCRIPT_PATH}
current_host=$(echo $HOSTNAME)
echo "Current Host: $current_host"
hosts_string="{AIRFLOW_HOSTS}"
echo "All Scheduler Hosts: $hosts_string"
IFS=',' read -ra host_array <<< "$hosts_string"
for host in "${{host_array[@]}}"
do
    if [ "$host" != "$current_host" ]; then
        echo "Copying log_cleanup script to $host..."
        scp {TEMP_LOG_CLEANUP_SCRIPT_PATH} $host:{TEMP_LOG_CLEANUP_SCRIPT_PATH}
        echo "Making the script executable..."
        ssh -o StrictHostKeyChecking=no $host "chmod +x {TEMP_LOG_CLEANUP_SCRIPT_PATH}"
    fi
done
""",
    dag=dag)
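# Fan out one cleanup task per host and per log directory; each task runs the
# script remotely over SSH and then removes it from that host.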
for host in AIRFLOW_HOSTS.split(","):
    for DIR_ID, DIRECTORY in enumerate(DIRECTORIES_TO_DELETE):
        LOG_CLEANUP_COMMAND = (
            f'{TEMP_LOG_CLEANUP_SCRIPT_PATH} {DIRECTORY} '
            f'{DEFAULT_MAX_LOG_AGE_IN_DAYS} {str(ENABLE_DELETE).lower()}'
        )
        cleanup_task = BashOperator(
            task_id=f'airflow_log_cleanup_{host}_dir_{DIR_ID}',
            bash_command=f"""
            echo "Executing cleanup script..."
            ssh -o StrictHostKeyChecking=no {host} "{LOG_CLEANUP_COMMAND}"
            echo "Removing cleanup script..."
            ssh -o StrictHostKeyChecking=no {host} "rm {TEMP_LOG_CLEANUP_SCRIPT_PATH}"
            """,
            dag=dag)
        cleanup_task.set_upstream(create_log_cleanup_script)