
Commit

Re-apply changes of #3652
krysal committed Feb 7, 2024
1 parent 37396ef commit a8ffc17
Showing 4 changed files with 26 additions and 11 deletions.
22 changes: 18 additions & 4 deletions catalog/dags/data_refresh/data_refresh_task_factory.py
```diff
@@ -49,10 +49,11 @@
 from collections.abc import Sequence
 
 from airflow.models.baseoperator import chain
+from airflow.operators.python import PythonOperator
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.trigger_rule import TriggerRule
 
-from common import ingestion_server
+from common import cloudwatch, ingestion_server
 from common.constants import PRODUCTION, XCOM_PULL_TEMPLATE
 from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
 from common.sensors.utils import wait_for_external_dags
```
```diff
@@ -137,7 +138,12 @@ def create_data_refresh_task_group(
     generate_index_suffix = ingestion_server.generate_index_suffix.override(
         trigger_rule=TriggerRule.NONE_FAILED,
     )()
-    tasks.append(generate_index_suffix)
+    disable_alarms = PythonOperator(
+        task_id="disable_sensitive_cloudwatch_alarms",
+        python_callable=cloudwatch.enable_or_disable_alarms,
+        op_args=[False],
+    )
+    tasks.append([generate_index_suffix, disable_alarms])
 
     # Trigger the 'ingest_upstream' task on the ingestion server and await its
     # completion. This task copies the media table for the given model from the
```
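The `cloudwatch.enable_or_disable_alarms` callable lives in the `common.cloudwatch` module, which is not part of this diff. A minimal sketch of what a helper with this signature could look like, assuming the sensitive alarms are selected by a name prefix (the prefix and client wiring are illustrative assumptions, not the actual Openverse module):

```python
import boto3


def enable_or_disable_alarms(enable: bool) -> None:
    """Toggle alarm actions for the "sensitive" CloudWatch alarms.

    Hypothetical sketch: assumes the relevant alarms share a common name
    prefix; the real catalog module may select them differently.
    """
    cloudwatch = boto3.client("cloudwatch")
    paginator = cloudwatch.get_paginator("describe_alarms")
    alarm_names = [
        alarm["AlarmName"]
        # "Production ES" is an illustrative prefix, not the real one.
        for page in paginator.paginate(AlarmNamePrefix="Production ES")
        for alarm in page["MetricAlarms"]
    ]
    if not alarm_names:
        return
    if enable:
        cloudwatch.enable_alarm_actions(AlarmNames=alarm_names)
    else:
        cloudwatch.disable_alarm_actions(AlarmNames=alarm_names)
```

With `op_args=[False]`, the `PythonOperator` above calls this with `enable=False` before the refresh begins.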
```diff
@@ -181,6 +187,13 @@ def create_data_refresh_task_group(
     # running against an index that is already promoted in production.
     tasks.append(create_filtered_index)
 
+    enable_alarms = PythonOperator(
+        task_id="enable_sensitive_cloudwatch_alarms",
+        python_callable=cloudwatch.enable_or_disable_alarms,
+        op_args=[True],
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     # Trigger the `promote` task on the ingestion server and await its completion.
     # This task promotes the newly created API DB table and elasticsearch index. It
     # does not include promotion of the filtered index, which must be promoted
```
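The `trigger_rule=TriggerRule.ALL_DONE` on `enable_alarms` is what guarantees the alarms come back on even when an upstream task fails; the default `ALL_SUCCESS` rule would leave the task unexecuted in that case. A self-contained toy DAG demonstrating the rule (names are placeholders, not project code):

```python
import pendulum
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule


@dag(start_date=pendulum.datetime(2024, 1, 1), schedule=None, catchup=False)
def trigger_rule_demo():
    @task
    def flaky():
        raise ValueError("simulated upstream failure")

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup():
        # Runs once `flaky` has finished in *any* state, including failed.
        print("re-enabling alarms regardless of upstream outcome")

    flaky() >> cleanup()


trigger_rule_demo()
```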
```diff
@@ -195,7 +208,7 @@ def create_data_refresh_task_group(
         },
         timeout=data_refresh.data_refresh_timeout,
     )
-    tasks.append(promote_tasks)
+    tasks.append([enable_alarms, promote_tasks])
 
     # Delete the alias' previous target index, now unused.
     delete_old_index = ingestion_server.trigger_task(
```
```diff
@@ -219,7 +232,8 @@ def create_data_refresh_task_group(
     #     └─ create_filtered_index
     #         └─ promote (trigger_promote + wait_for_promote)
     #             └─ delete_old_index
-    #                 └─ promote_filtered_index (including delete filtered index)
+    #                 └─ promote_filtered_index (including delete filtered index) +
+    #                    enable_alarms
     chain(*tasks)
 
     return data_refresh_group
```
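Note how `tasks` mixes single tasks and two-element lists: Airflow's `chain` cross-links adjacent entries, so `disable_alarms` is scheduled alongside `generate_index_suffix` and `enable_alarms` alongside the promote task group, without either being serialized into the main pipeline. A reduced sketch of the pattern (operators and IDs are placeholders):

```python
import pendulum
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="chain_demo", start_date=pendulum.datetime(2024, 1, 1), schedule=None):
    suffix = EmptyOperator(task_id="generate_index_suffix")
    disable = EmptyOperator(task_id="disable_alarms")
    ingest = EmptyOperator(task_id="ingest_upstream")

    # Equivalent to: [suffix, disable] >> ingest; both list members must
    # finish before `ingest` starts, and the two run independently.
    chain([suffix, disable], ingest)
```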
3 changes: 2 additions & 1 deletion catalog/env.template
```diff
@@ -89,9 +89,10 @@ AIRFLOW_PORT=9090
 LOADER_FILE_AGE=1
 # Contact email for any APIs
 CONTACT_EMAIL=openverse@wordpress.org
-# AWS/S3 configuration - does not need to be changed for development
+# AWS configuration - does not need to be changed for development
 AWS_ACCESS_KEY=test_key
 AWS_SECRET_KEY=test_secret
+AWS_DEFAULT_REGION=us-east-1
 # General bucket used for TSV->DB ingestion and logging
 OPENVERSE_BUCKET=openverse-storage
 # Whether to toggle production CloudWatch alarms when running a data refresh DAG.
```
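`AWS_DEFAULT_REGION` is one of the standard environment variables the AWS SDKs read on their own, so no accompanying code change is needed for it to take effect: any boto3 client created in the catalog picks the region up automatically. A quick illustration using the development values above:

```python
import os

import boto3

# boto3 falls back to the AWS_DEFAULT_REGION environment variable when no
# region_name is passed; the key pair below uses the catalog's own
# AWS_ACCESS_KEY / AWS_SECRET_KEY names from env.template.
client = boto3.client(
    "cloudwatch",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
)
print(client.meta.region_name)  # "us-east-1" with the template defaults
```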
8 changes: 4 additions & 4 deletions catalog/tests/dags/common/conftest.py
```diff
@@ -5,9 +5,9 @@
 import pytest
 
 from catalog.tests.dags.common.loader.test_s3 import (
-    ACCESS_KEY,
+    AWS_ACCESS_KEY_ID,
+    AWS_SECRET_ACCESS_KEY,
     S3_LOCAL_ENDPOINT,
-    SECRET_KEY,
 )
 
 
```
```diff
@@ -40,8 +40,8 @@ def empty_s3_bucket(request):
     print(f"{bucket_name=}")
     bucket = boto3.resource(
         "s3",
-        aws_access_key_id=ACCESS_KEY,
-        aws_secret_access_key=SECRET_KEY,
+        aws_access_key_id=AWS_ACCESS_KEY_ID,
+        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
         endpoint_url=S3_LOCAL_ENDPOINT,
     ).Bucket(bucket_name)
 
```

4 changes: 2 additions & 2 deletions catalog/tests/dags/common/loader/test_s3.py
```diff
@@ -14,8 +14,8 @@
 TEST_STAGING_PREFIX = "test_staging"
 S3_LOCAL_ENDPOINT = os.getenv("S3_LOCAL_ENDPOINT")
 S3_TEST_BUCKET = f"cccatalog-storage-{TEST_ID}"
-ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
-SECRET_KEY = os.getenv("AWS_SECRET_KEY")
+AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
+AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
 
 
 @pytest.fixture
```
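Renaming the constants to the standard `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` names (matching the imports in `conftest.py` above) also aligns them with the variables boto3 resolves from the environment on its own, making explicit and implicit credential passing interchangeable in the tests. A minimal sketch of that equivalence, assuming the variables are exported:

```python
import os

import boto3

endpoint = os.getenv("S3_LOCAL_ENDPOINT")

# Credentials passed explicitly, as the `empty_s3_bucket` fixture does ...
explicit = boto3.resource(
    "s3",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    endpoint_url=endpoint,
)

# ... and the same configuration resolved from the environment by boto3,
# which reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY automatically.
implicit = boto3.resource("s3", endpoint_url=endpoint)
```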
