Проект направлен на ежедневную отправку основных метрик в telegram в установленное время. Данные взяты из приложения, объединяющее ленту новостей и сообщений. Также в чат telegram направляются алерты с информацией по отклонениям метрик от нормы.
Стэк:
- JupiterHub
- Clickhouse
- Airflow
- Superset
- Python
Подключаемся к базе данных и сразу зададим все условия для отправки итоговых отчетов в нужный чат. Отчеты за полный вчерашний день будут приходить каждый день в 11:00 по МСК.
#подключаемся к базе данных
connection = {
'host': 'https://clickhouse.lab.karpov.courses',
'database':'simulator_20221020',
'user':'student',
'password':'dpo_python_2020'
}
# Зададим параметры
default_args = {
'owner': 'a-lesihina', # Владелец операции
'depends_on_past': False, # Зависимость от прошлых запусков
'retries': 1, # Кол-во попыток выполнить DAG
'retry_delay': timedelta(minutes=1), # Промежуток между перезапусками
'start_date': datetime(2022, 11, 15) # Дата начала выполнения DAG
}
В файле Feed_report и Full_report можно посмотреть настройку графиков и способ подсчета метрик.
В результате ежедневно получаем следующий отчет:
Данные для таблцы получаем тем же образом, что и на предыдущей отчетности.
В данной отчетности отражается информация:
- значения ключевых метрик за предыдущий день
- график с значениями метрик за предыдущие 7 дней
- отражены метрики: DAU, Просмотры, Лайки, CTR
В результате получаем следующую отчетность:
Система с периодичностью в 15 минут проверяет ключевые метрики, такие,как: активные пользователи в ленте / мессенджере, просмотры, лайки, CTR, количество отправленных сообщений.
В случае обнаружения аномального значения, в чат направляется алерт и график.
Используем квартили для определения верхних и нижних допустимых значений.
def check_anomalys(df, metric,a=3, n=6):
df['25'] = df[metric].shift(1).rolling(n).quantile(0.25)
df['75'] = df[metric].shift(1).rolling(n).quantile(0.75)
df['iqr'] = df['75'] - df['25']
df['up'] = df['75'] + a*df['iqr']
df['low'] = df['25'] - a*df['iqr']
df['up'] = df['up'].rolling(n, center=True, min_periods=1).mean()
df['low'] = df['low'].rolling(n, center=True, min_periods=1).mean()
if df[metric].iloc[-1] < df['low'].iloc[-1] or df[metric].iloc[-1] > df['up'].iloc[-1]:
is_alert = 1
else:
is_alert = 0
return is_alert, df
После, с помощью SQL, объединяем данные в одну таблицу для создания единого таска и настраиваем внешний вид сообщения и графиков. Итоговое сообщение выглядит следующим образом: