From 99e1cc58984217d3da3e313b798cbc91f9b40956 Mon Sep 17 00:00:00 2001
From: Staci Cooper
Date: Fri, 15 Dec 2023 17:11:17 -0800
Subject: [PATCH] Add create_new_es_index DAG

---
 .../data_refresh/create_filtered_index_dag.py |   2 +-
 .../dags/data_refresh/data_refresh_types.py   |   2 +
 .../create_new_es_index.py                    | 206 ++++++++++++++++++
 .../create_new_es_index_dag.py                | 150 +++++++++++++
 .../create_new_es_index_types.py              |  48 ++++
 catalog/dags/es/create_new_es_index/utils.py  |  53 +++++
 6 files changed, 460 insertions(+), 1 deletion(-)
 create mode 100644 catalog/dags/es/create_new_es_index/create_new_es_index.py
 create mode 100644 catalog/dags/es/create_new_es_index/create_new_es_index_dag.py
 create mode 100644 catalog/dags/es/create_new_es_index/create_new_es_index_types.py
 create mode 100644 catalog/dags/es/create_new_es_index/utils.py

diff --git a/catalog/dags/data_refresh/create_filtered_index_dag.py b/catalog/dags/data_refresh/create_filtered_index_dag.py
index 7870b7b7f29..db4ca2f12f8 100644
--- a/catalog/dags/data_refresh/create_filtered_index_dag.py
+++ b/catalog/dags/data_refresh/create_filtered_index_dag.py
@@ -108,7 +108,7 @@ def prevent_concurrency_with_data_refresh(**context):
     )
 
     with DAG(
-        dag_id=f"create_filtered_{media_type}_index",
+        dag_id=data_refresh.filtered_index_dag_id,
         default_args=DAG_DEFAULT_ARGS,
         schedule=None,
         start_date=datetime(2023, 4, 1),
diff --git a/catalog/dags/data_refresh/data_refresh_types.py b/catalog/dags/data_refresh/data_refresh_types.py
index 6791edb69f4..f6bc6dff631 100644
--- a/catalog/dags/data_refresh/data_refresh_types.py
+++ b/catalog/dags/data_refresh/data_refresh_types.py
@@ -53,6 +53,7 @@ class DataRefresh:
     """
 
     dag_id: str = field(init=False)
+    filtered_index_dag_id: str = field(init=False)
     media_type: str
     start_date: datetime = datetime(2020, 1, 1)
     schedule: str | None = "0 0 * * 1"  # Mondays 00:00 UTC
@@ -69,6 +70,7 @@ class DataRefresh:
 
     def __post_init__(self):
         self.dag_id = f"{self.media_type}_data_refresh"
+        self.filtered_index_dag_id = f"create_filtered_{self.media_type}_index"
 
 
 DATA_REFRESH_CONFIGS = {
diff --git a/catalog/dags/es/create_new_es_index/create_new_es_index.py b/catalog/dags/es/create_new_es_index/create_new_es_index.py
new file mode 100644
index 00000000000..9c7edc50873
--- /dev/null
+++ b/catalog/dags/es/create_new_es_index/create_new_es_index.py
@@ -0,0 +1,206 @@
+import logging
+
+from airflow.decorators import task, task_group
+from airflow.exceptions import AirflowException, AirflowSensorTimeout
+from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
+from airflow.sensors.external_task import ExternalTaskSensor
+from airflow.sensors.python import PythonSensor
+from airflow.utils.state import State
+from es.create_new_es_index.utils import merge_configurations
+
+from common.sensors.utils import get_most_recent_dag_run
+
+
+logger = logging.getLogger(__name__)
+
+
+# Index settings that should not be copied over from the base configuration when
+# creating a new index.
+EXCLUDED_INDEX_SETTINGS = ["provided_name", "creation_date", "uuid", "version"]
+
+
+# TODO: This can be removed in favor of the utility in
+# https://github.com/WordPress/openverse/pull/3482
+@task(retries=0)
+def prevent_concurrency_with_dag(external_dag_id: str, **context):
+    """
+    Prevent concurrency with the given external DAG, by failing
+    immediately if that DAG is running.
+    """
+
+    wait_for_dag = ExternalTaskSensor(
+        task_id=f"prevent_concurrency_with_{external_dag_id}",
+        external_dag_id=external_dag_id,
+        # Wait for the whole DAG, not just a part of it
+        external_task_id=None,
+        check_existence=False,
+        execution_date_fn=lambda _: get_most_recent_dag_run(external_dag_id),
+        mode="reschedule",
+        retries=0,
+        # Any "finished" state is sufficient for us to continue
+        allowed_states=[State.SUCCESS, State.FAILED],
+    )
+    wait_for_dag.timeout = 0
+    try:
+        wait_for_dag.execute(context)
+    except AirflowSensorTimeout:
+        raise ValueError(f"Concurrency check with {external_dag_id} failed.")
+
+
+@task_group(group_id="prevent_concurrency")
+def prevent_concurrency_with_dags(external_dag_ids: list[str]):
+    """Fail immediately if any of the given external dags are in progress."""
+    # TODO: Double check if these need to be chained or if they can
+    # run concurrently
+    for dag_id in external_dag_ids:
+        prevent_concurrency_with_dag.override(
+            task_id=f"prevent_concurrency_with_{dag_id}"
+        )(dag_id)
+
+
+@task
+def get_index_name(media_type: str, index_suffix: str):
+    return f"{media_type}-{index_suffix}".lower()
+
+
+@task.branch
+def check_override_config(override):
+    if override:
+        # Skip the steps to fetch the current index configuration
+        # and merge changes in.
+        return "get_final_index_configuration"
+
+    return "get_current_index_configuration"
+
+
+@task
+def get_current_index_configuration(
+    source_index: str,
+    es_host: str,
+):
+    """
+    Return the configuration for the current index, identified by the
+    `source_index` param. `source_index` may be either an index name
+    or an alias, but must uniquely identify one existing index or an
+    error will be raised.
+    """
+    logger.info(es_host)
+    hook = ElasticsearchPythonHook(es_host)
+    es_conn = hook.get_conn
+
+    response = es_conn.indices.get(
+        index=source_index,
+        # Return empty dict instead of throwing error if no index can be
+        # found. This lets us raise our own error.
+        ignore_unavailable=True,
+    )
+
+    if len(response) != 1:
+        raise AirflowException(
+            f"Index {source_index} could not be uniquely identified."
+        )
+
+    # The response has the form:
+    # { index_name: index_configuration }
+    # However, since `source_index` can be an alias rather than the index name,
+    # we can not reliably know the value of the key. We instead get the first
+    # value, knowing that we have already ensured in a previous check that there
+    # is exactly one value in the response.
+    config = next(iter(response.values()))
+    return config
+
+
+@task
+def merge_index_configurations(new_index_config, current_index_config):
+    """
+    Merge the `new_index_config` into the `current_index_config`, and
+    return an index configuration in the appropriate format for being
+    passed to the `create_index` API.
+    """
+    # Do not automatically apply any aliases to the new index
+    current_index_config.pop("aliases")
+
+    # Remove fields from the current_index_config that should not be copied
+    # over into the new index
+    for setting in EXCLUDED_INDEX_SETTINGS:
+        current_index_config.get("settings", {}).get("index", {}).pop(setting)
+
+    # Merge the new configuration values into the current configuration
+    return merge_configurations(current_index_config, new_index_config)
+
+
+@task
+def get_final_index_configuration(
+    override_config: bool,
+    # The new config which was passed in via DAG params
+    index_config,
+    # The result of merging the index_config with the current index config.
+    # This may be None if the merge tasks were skipped.
+    merged_config,
+    index_name: str,
+):
+    """
+    Resolve the final index configuration to be used in the `create_index`
+    task.
+    """
+    config = index_config if override_config else merged_config
+
+    # Apply the index name
+    config["index"] = index_name
+    return config
+
+
+@task
+def create_index(index_config, es_host: str):
+    hook = ElasticsearchPythonHook(es_host)
+    es_conn = hook.get_conn
+
+    new_index = es_conn.indices.create(**index_config)
+
+    return new_index
+
+
+@task_group(group_id="trigger_and_wait_for_reindex")
+def trigger_and_wait_for_reindex(
+    index_name: str, source_index: str, query: dict, es_host: str
+):
+    @task
+    def trigger_reindex(index_name: str, source_index: str, query: dict, es_host: str):
+        hook = ElasticsearchPythonHook(es_host)
+        es_conn = hook.get_conn
+
+        source = {"index": source_index}
+        # An empty query is not accepted; only pass it
+        # if a query was actually supplied
+        if query:
+            source["query"] = query
+
+        response = es_conn.reindex(
+            source=source,
+            dest={"index": index_name},
+            # Parallelize indexing
+            slices="auto",
+            # Do not hold the slot while awaiting completion
+            wait_for_completion=False,
+            # Immediately refresh the index after completion to make
+            # the data available for search
+            refresh=True,
+        )
+
+        return response["task"]
+
+    def _wait_for_reindex(task_id: str, es_host: str):
+        hook = ElasticsearchPythonHook(es_host)
+        es_conn = hook.get_conn
+
+        response = es_conn.tasks.get(task_id=task_id)
+
+        return response.get("completed")
+
+    trigger_reindex_task = trigger_reindex(index_name, source_index, query, es_host)
+
+    PythonSensor(
+        task_id="wait_for_reindex",
+        python_callable=_wait_for_reindex,
+        op_kwargs={"task_id": trigger_reindex_task, "es_host": es_host},
+    )
diff --git a/catalog/dags/es/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/es/create_new_es_index/create_new_es_index_dag.py
new file mode 100644
index 00000000000..d653fcb0724
--- /dev/null
+++ b/catalog/dags/es/create_new_es_index/create_new_es_index_dag.py
@@ -0,0 +1,150 @@
+"""
+# Create New Index DAG
+
+This DAG creates a new Elasticsearch index for the given media type in the
+target environment. By default, the configuration supplied in the
+`index_config` param is merged into the configuration of the `source_index`
+(the index currently aliased to the media type, unless another source is
+given); when `override_config` is enabled, `index_config` is instead used as
+the entire configuration for the new index. Once the index is created,
+documents from the source index, optionally filtered by `query`, are
+reindexed into it.
+"""
+import logging
+import os
+
+from airflow import DAG
+from airflow.models.param import Param
+from es.create_new_es_index import create_new_es_index as es
+from es.create_new_es_index.create_new_es_index_types import (
+    CREATE_NEW_INDEX_CONFIGS,
+    CreateNewIndex,
+)
+
+from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES
+
+
+logger = logging.getLogger(__name__)
+
+
+def create_new_es_index_dag(config: CreateNewIndex):
+    dag = DAG(
+        dag_id=config.dag_id,
+        default_args=DAG_DEFAULT_ARGS,
+        schedule=None,
+        max_active_runs=1,
+        catchup=False,
+        doc_md=__doc__,
+        tags=["elasticsearch"],
+        render_template_as_native_obj=True,
+        params={
+            "media_type": Param(
+                default=AUDIO,
+                enum=MEDIA_TYPES,
+                description="The media type for which to create the index.",
+            ),
+            "index_config": Param(
+                default={},
+                type=["object"],
+                description=(
+                    "A JSON object containing the configuration for the new index."
+                    " The values in this object will be merged with the existing"
+                    " configuration, where the value specified at a leaf key in the"
+                    " object will override the existing value (see Merging policy in"
+                    " the DAG docs). This can also be the entire index configuration,"
+                    " in which case the existing configuration will be replaced entirely"
+                    " (see override_config parameter below)."
+                ),
+            ),
+            "index_suffix": Param(
+                default=None,
+                type=["string", "null"],
+                description=(
+                    "The name suffix of the new index to create. The new index will be"
+                    " named in Elasticsearch using the form"
+                    " {media_type}-{index_suffix}. If not provided, the suffix will be a"
+                    " timestamp of the form YYYYMMDDHHMMSS."
+                ),
+            ),
+            "source_index": Param(
+                default=None,
+                type=["string", "null"],
+                description=(
+                    "The existing index on Elasticsearch to use as the basis for the new"
+                    " index. If not provided, the index aliased to media_type will be used"
+                    " (e.g. image for the image media type)."
+                ),
+            ),
+            "query": Param(
+                default={},
+                type=["object"],
+                description=(
+                    "An optional Elasticsearch query to use to filter the documents to be"
+                    " copied to the new index. If not provided, all documents will be"
+                    " copied."
+                ),
+            ),
+            "override_config": Param(
+                default=False,
+                type="boolean",
+                description=(
+                    "A boolean value which can be toggled to replace the existing index"
+                    " configuration entirely with the new configuration. If True, the"
+                    " index_config parameter will be used as the entire configuration. If"
+                    " False, the index_config parameter will be merged with the existing"
+                    " configuration."
+                ),
+            ),
+        },
+    )
+
+    # TODO: separate variables were necessary because we can't just get the value of
+    # Airflow connection vars, they get interpreted as Connection objects
+    es_host = os.getenv(f"ELASTICSEARCH_HTTP_{config.environment.upper()}")
+
+    with dag:
+        prevent_concurrency = es.prevent_concurrency_with_dags(config.blocking_dags)
+
+        index_name = es.get_index_name(
+            media_type="{{ params.media_type }}",
+            index_suffix="{{ params.index_suffix or ts_nodash }}",
+        )
+
+        check_override = es.check_override_config(
+            override="{{ params.override_config }}"
+        )
+
+        current_index_config = es.get_current_index_configuration(
+            source_index="{{ params.source_index or params.media_type }}",
+            es_host=es_host,
+        )
+
+        merged_index_config = es.merge_index_configurations(
+            new_index_config="{{ params.index_config }}",
+            current_index_config=current_index_config,
+        )
+
+        final_index_config = es.get_final_index_configuration(
+            override_config="{{ params.override_config }}",
+            index_config="{{ params.index_config }}",
+            # May resolve to None if tasks were skipped
+            merged_config=merged_index_config,
+            index_name=index_name,
+        )
+
+        create_new_index = es.create_index(
+            index_config=final_index_config, es_host=es_host
+        )
+
+        reindex = es.trigger_and_wait_for_reindex(
+            index_name=index_name,
+            source_index="{{ params.source_index or params.media_type }}",
+            query="{{ params.query }}",
+            es_host=es_host,
+        )
+
+        # Set up dependencies
+        prevent_concurrency >> index_name
+        index_name >> check_override >> [current_index_config, final_index_config]
+        current_index_config >> merged_index_config >> final_index_config
+        final_index_config >> create_new_index >> reindex
+
+    return dag
+
+
+for config in CREATE_NEW_INDEX_CONFIGS:
+    # Generate the DAG for this environment
+    globals()[config.dag_id] = create_new_es_index_dag(config)
diff --git a/catalog/dags/es/create_new_es_index/create_new_es_index_types.py b/catalog/dags/es/create_new_es_index/create_new_es_index_types.py
new file mode 100644
index 00000000000..4091d72d931
--- /dev/null
+++ b/catalog/dags/es/create_new_es_index/create_new_es_index_types.py
@@ -0,0 +1,48 @@
+from dataclasses import dataclass, field
+
+from es.recreate_staging_index.recreate_full_staging_index import (
+    DAG_ID as RECREATE_STAGING_INDEX_DAG_ID,
+)
+
+from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS
+
+
+@dataclass
+class CreateNewIndex:
+    """
+    Configuration object for the create_new_es_index DAG.
+
+    Required Constructor Arguments:
+
+    environment:   str representation of the environment in which to create
+                   the new index
+    blocking_dags: list of dags with which to prevent concurrency; the
+                   generated create_new_es_index dag will fail immediately if
+                   any of these dags are running.
+    """
+
+    dag_id: str = field(init=False)
+    environment: str
+    blocking_dags: list
+
+    def __post_init__(self):
+        self.dag_id = f"create_new_{self.environment}_es_index"
+
+
+CREATE_NEW_INDEX_CONFIGS = [
+    CreateNewIndex(
+        environment="staging",
+        blocking_dags=[RECREATE_STAGING_INDEX_DAG_ID],
+    ),
+    CreateNewIndex(
+        environment="production",
+        blocking_dags=(
+            # Block on all the data refreshes
+            [data_refresh.dag_id for data_refresh in DATA_REFRESH_CONFIGS.values()]
+            + [  # Block on the filtered index creation DAGs
+                data_refresh.filtered_index_dag_id
+                for data_refresh in DATA_REFRESH_CONFIGS.values()
+            ]
+        ),
+    ),
+]
diff --git a/catalog/dags/es/create_new_es_index/utils.py b/catalog/dags/es/create_new_es_index/utils.py
new file mode 100644
index 00000000000..4b16fda3e02
--- /dev/null
+++ b/catalog/dags/es/create_new_es_index/utils.py
@@ -0,0 +1,53 @@
+def merge_configurations(base_configuration, update_configuration):
+    """
+    Return a new dictionary which is the result of merging the given
+    update_configuration dictionary into the base_configuration dictionary,
+    according to the following merge policy:
+
+    A leaf key in the `update_configuration` overwrites the entire value
+    present in the `base_configuration` at that key. This is a naive
+    overwrite, such that a list value for example is completely
+    overwritten rather than appended to. For example:
+
+    merge_configurations(
+        {
+            "parent_key": {
+                "leaf_key": ["value1", "value2"]
+            },
+            "other_key": True,
+        },
+        {
+            "parent_key": {
+                "leaf_key": ["value3",]
+            },
+        }
+    )
+
+    will return:
+
+    {
+        "parent_key": {
+            "leaf_key": ["value3",]
+        },
+        "other_key": True
+    }
+    """
+    merge_configuration = base_configuration.copy()
+    for key, val in update_configuration.items():
+        if (
+            # The key is not present in the base configuration, so
+            # we are adding a new value rather than updating an existing
+            # one and can simply add it.
+            key not in base_configuration
+            or
+            # This is a leaf node.
+            not isinstance(val, dict)
+        ):
+            merge_configuration[key] = val
+        else:
+            # Recurse down to the leaf nodes.
+            merge_configuration[key] = merge_configurations(
+                base_configuration[key], update_configuration[key]
+            )
+
+    return merge_configuration
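
The snippet below is not part of the patch itself; it is a hedged sketch of a DagRun
conf for the generated staging DAG (dag_id create_new_staging_es_index), included to
illustrate how the params interact with the merge policy above. The suffix and the
specific index settings are illustrative assumptions only.

    # Passed via the Airflow UI "Trigger DAG w/ config" form, or with
    # `airflow dags trigger create_new_staging_es_index --conf '<json>'`.
    conf = {
        "media_type": "image",
        # Hypothetical suffix; omit it to fall back to the ts_nodash timestamp.
        "index_suffix": "replica-test",
        # With override_config False (the default), only the leaf keys below
        # override the corresponding settings of the index currently aliased
        # to "image"; the remaining settings and the mappings are carried over
        # from that source index.
        "index_config": {
            "settings": {
                "index": {"number_of_replicas": 0, "refresh_interval": "-1"}
            }
        },
        "override_config": False,
    }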