Skip to content

Commit

Permalink
Disable and enable CloudWatch alarm during Data Refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
krysal committed Jan 16, 2024
1 parent 76e35c1 commit 75543a3
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 3 deletions.
62 changes: 62 additions & 0 deletions catalog/dags/common/cloudwatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
CloudwatchWrapper extracted partially from
https://github.com/awsdocs/aws-doc-sdk-examples/blob/54c3b82d8f9a12a862f9fcec44909829bda849af/python/example_code/cloudwatch/cloudwatch_basics.py
"""

import logging
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CloudWatchWrapper:
"""Encapsulates Amazon CloudWatch functions"""

def __init__(self, cloudwatch_resource):
""":param cloudwatch_resource: A Boto3 CloudWatch resource."""
self.cloudwatch_resource = cloudwatch_resource

def enable_alarm_actions(self, alarm_name, enable):
"""
Enable or disable actions on the specified alarm. Alarm actions can be
used to send notifications or automate responses when an alarm enters a
particular state.
:param alarm_name: The name of the alarm.
:param enable: When True, actions are enabled for the alarm. Otherwise, they
disabled.
"""
try:
alarm = self.cloudwatch_resource.Alarm(alarm_name)
if enable:
alarm.enable_actions()
else:
alarm.disable_actions()
logger.info(
"%s actions for alarm %s.",
"Enabled" if enable else "Disabled",
alarm_name,
)
except ClientError:
logger.exception(
"Couldn't %s actions alarm %s.",
"enable" if enable else "disable",
alarm_name,
)
raise


def enable_or_disable_alarms(enable):
cw_wrapper = CloudWatchWrapper(boto3.resource("cloudwatch"))

sensitive_alarms_list = [
"API Production Average Response Time above threshold",
"API Production Average Response Time anomalously high",
"API Production P99 Response Time above threshold",
"API Production P99 Response Time anomalously high",
"ES Production CPU utilization above 50%",
]

for alarm in sensitive_alarms_list:
cw_wrapper.enable_alarm_actions(alarm, enable)
19 changes: 16 additions & 3 deletions catalog/dags/data_refresh/data_refresh_task_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@
from collections.abc import Sequence

from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from common import ingestion_server
from common import ingestion_server, cloudwatch
from common.constants import XCOM_PULL_TEMPLATE
from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
from common.sensors.utils import wait_for_external_dag
Expand Down Expand Up @@ -131,7 +132,13 @@ def create_data_refresh_task_group(
generate_index_suffix = ingestion_server.generate_index_suffix.override(
trigger_rule=TriggerRule.NONE_FAILED,
)()
tasks.append(generate_index_suffix)

disable_alarms = PythonOperator(
task_id="disable_cloudwatch_alarms",
python_callable=cloudwatch.enable_or_disable_alarms,
op_args=[False],
)
tasks.append([generate_index_suffix, disable_alarms])

# Trigger the 'ingest_upstream' task on the ingestion server and await its
# completion. This task copies the media table for the given model from the
Expand Down Expand Up @@ -201,7 +208,13 @@ def create_data_refresh_task_group(
),
},
)
tasks.append(delete_old_index)

enable_alarms = PythonOperator(
task_id="enable_cloudwatch_alarms",
python_callable=cloudwatch.enable_or_disable_alarms,
op_args=[True],
)
tasks.append([delete_old_index, enable_alarms])

# Finally, promote the filtered index.
tasks.append(promote_filtered_index)
Expand Down

0 comments on commit 75543a3

Please sign in to comment.