From 5305ceea277b13408041bf2b8fc13d3b3357668d Mon Sep 17 00:00:00 2001
From: Krystle Salazar
Date: Wed, 31 Jan 2024 15:50:30 -0400
Subject: [PATCH] Re-apply changes of #3652

---
 .../data_refresh/data_refresh_task_factory.py | 22 +++++++++++++++----
 catalog/env.template                          |  3 ++-
 catalog/tests/dags/common/conftest.py         |  8 +++----
 catalog/tests/dags/common/loader/test_s3.py   |  4 ++--
 4 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/catalog/dags/data_refresh/data_refresh_task_factory.py b/catalog/dags/data_refresh/data_refresh_task_factory.py
index 0531746aaf0..a2ef9259443 100644
--- a/catalog/dags/data_refresh/data_refresh_task_factory.py
+++ b/catalog/dags/data_refresh/data_refresh_task_factory.py
@@ -49,10 +49,11 @@
 from collections.abc import Sequence
 
 from airflow.models.baseoperator import chain
+from airflow.operators.python import PythonOperator
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.trigger_rule import TriggerRule
 
-from common import ingestion_server
+from common import cloudwatch, ingestion_server
 from common.constants import PRODUCTION, XCOM_PULL_TEMPLATE
 from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
 from common.sensors.utils import wait_for_external_dags
@@ -137,7 +138,12 @@ def create_data_refresh_task_group(
     generate_index_suffix = ingestion_server.generate_index_suffix.override(
         trigger_rule=TriggerRule.NONE_FAILED,
     )()
-    tasks.append(generate_index_suffix)
+    disable_alarms = PythonOperator(
+        task_id="disable_sensitive_cloudwatch_alarms",
+        python_callable=cloudwatch.enable_or_disable_alarms,
+        op_args=[False],
+    )
+    tasks.append([generate_index_suffix, disable_alarms])
 
     # Trigger the 'ingest_upstream' task on the ingestion server and await its
     # completion. This task copies the media table for the given model from the
@@ -181,6 +187,13 @@ def create_data_refresh_task_group(
     # running against an index that is already promoted in production.
     tasks.append(create_filtered_index)
 
+    enable_alarms = PythonOperator(
+        task_id="enable_sensitive_cloudwatch_alarms",
+        python_callable=cloudwatch.enable_or_disable_alarms,
+        op_args=[True],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     # Trigger the `promote` task on the ingestion server and await its completion.
     # This task promotes the newly created API DB table and elasticsearch index. It
     # does not include promotion of the filtered index, which must be promoted
@@ -195,7 +208,7 @@ def create_data_refresh_task_group(
         },
         timeout=data_refresh.data_refresh_timeout,
     )
-    tasks.append(promote_tasks)
+    tasks.append([enable_alarms, promote_tasks])
 
     # Delete the alias' previous target index, now unused.
     delete_old_index = ingestion_server.trigger_task(
@@ -219,7 +232,8 @@ def create_data_refresh_task_group(
     #             └─ create_filtered_index
     #                 └─ promote (trigger_promote + wait_for_promote)
     #                     └─ delete_old_index
-    #                         └─ promote_filtered_index (including delete filtered index)
+    #                         └─ promote_filtered_index (including delete filtered index) +
+    #                            enable_alarms
     chain(*tasks)
 
     return data_refresh_group
diff --git a/catalog/env.template b/catalog/env.template
index d2064aeaf66..7e36a901d60 100644
--- a/catalog/env.template
+++ b/catalog/env.template
@@ -89,9 +89,10 @@ AIRFLOW_PORT=9090
 LOADER_FILE_AGE=1
 # Contact email for any APIs
 CONTACT_EMAIL=openverse@wordpress.org
-# AWS/S3 configuration - does not need to be changed for development
+# AWS configuration - does not need to be changed for development
 AWS_ACCESS_KEY=test_key
 AWS_SECRET_KEY=test_secret
+AWS_DEFAULT_REGION=us-east-1
 # General bucket used for TSV->DB ingestion and logging
 OPENVERSE_BUCKET=openverse-storage
 # Whether to toggle production CloudWatch alarms when running a data refresh DAG.
diff --git a/catalog/tests/dags/common/conftest.py b/catalog/tests/dags/common/conftest.py
index c36f42a7209..3472b9f9dc0 100644
--- a/catalog/tests/dags/common/conftest.py
+++ b/catalog/tests/dags/common/conftest.py
@@ -5,9 +5,9 @@
 import pytest
 
 from catalog.tests.dags.common.loader.test_s3 import (
-    ACCESS_KEY,
+    AWS_ACCESS_KEY_ID,
+    AWS_SECRET_ACCESS_KEY,
     S3_LOCAL_ENDPOINT,
-    SECRET_KEY,
 )
 
 
@@ -40,8 +40,8 @@ def empty_s3_bucket(request):
     print(f"{bucket_name=}")
     bucket = boto3.resource(
         "s3",
-        aws_access_key_id=ACCESS_KEY,
-        aws_secret_access_key=SECRET_KEY,
+        aws_access_key_id=AWS_ACCESS_KEY_ID,
+        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
         endpoint_url=S3_LOCAL_ENDPOINT,
     ).Bucket(bucket_name)
 
diff --git a/catalog/tests/dags/common/loader/test_s3.py b/catalog/tests/dags/common/loader/test_s3.py
index 46786727657..569b04ad022 100644
--- a/catalog/tests/dags/common/loader/test_s3.py
+++ b/catalog/tests/dags/common/loader/test_s3.py
@@ -14,8 +14,8 @@
 TEST_STAGING_PREFIX = "test_staging"
 S3_LOCAL_ENDPOINT = os.getenv("S3_LOCAL_ENDPOINT")
 S3_TEST_BUCKET = f"cccatalog-storage-{TEST_ID}"
-ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
-SECRET_KEY = os.getenv("AWS_SECRET_KEY")
+AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
+AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
 
 
 @pytest.fixture
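
Note for reviewers (not part of the applied diff): the wiring above relies on
Airflow's chain() accepting list elements, which is why disable_alarms is
appended alongside generate_index_suffix and enable_alarms alongside the
promote tasks. A minimal illustration of that broadcast behavior, with
placeholder task ids rather than the real data refresh tasks:

    from datetime import datetime

    from airflow import DAG
    from airflow.models.baseoperator import chain
    from airflow.operators.empty import EmptyOperator

    with DAG(
        dag_id="chain_list_example",
        start_date=datetime(2024, 1, 1),
        schedule=None,
    ):
        suffix = EmptyOperator(task_id="generate_index_suffix")
        disable = EmptyOperator(task_id="disable_alarms")
        ingest = EmptyOperator(task_id="ingest_upstream")

        # A list element fans out/in: ingest runs only after BOTH suffix and
        # disable complete, mirroring tasks.append([generate_index_suffix,
        # disable_alarms]) followed by chain(*tasks) in the patch.
        chain([suffix, disable], ingest)

Because enable_alarms uses TriggerRule.ALL_DONE, alarms are re-enabled even
when an upstream task in the refresh fails.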
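
The patch calls cloudwatch.enable_or_disable_alarms but does not include
catalog/dags/common/cloudwatch.py itself. For context, a rough sketch of what
a callable with this signature could look like, built on boto3's CloudWatch
describe_alarms / enable_alarm_actions / disable_alarm_actions APIs; the
toggle variable name and alarm prefix are illustrative assumptions, not the
real module:

    import logging
    import os

    import boto3

    logger = logging.getLogger(__name__)


    def enable_or_disable_alarms(enable: bool) -> None:
        # Assumed opt-out switch, mirroring the env.template comment about
        # toggling production alarms; the real variable name is not shown
        # in this patch.
        if os.getenv("TOGGLE_CLOUDWATCH_ALARMS", "false").lower() != "true":
            logger.info("CloudWatch alarm toggling is disabled; skipping.")
            return

        cloudwatch = boto3.client("cloudwatch")
        # Placeholder prefix; the actual alarm names are not shown here.
        response = cloudwatch.describe_alarms(AlarmNamePrefix="<alarm-prefix>")
        alarm_names = [alarm["AlarmName"] for alarm in response["MetricAlarms"]]
        if not alarm_names:
            return

        if enable:
            cloudwatch.enable_alarm_actions(AlarmNames=alarm_names)
        else:
            cloudwatch.disable_alarm_actions(AlarmNames=alarm_names)
        logger.info(
            "%s alarm actions for %s",
            "Enabled" if enable else "Disabled",
            alarm_names,
        )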