Add create_new_es_index DAG
stacimc committed Dec 16, 2023
1 parent 5fe9fea commit 99e1cc5
Showing 6 changed files with 460 additions and 1 deletion.
2 changes: 1 addition & 1 deletion catalog/dags/data_refresh/create_filtered_index_dag.py
@@ -108,7 +108,7 @@ def prevent_concurrency_with_data_refresh(**context):
)

with DAG(
-    dag_id=f"create_filtered_{media_type}_index",
+    dag_id=data_refresh.filtered_index_dag_id,
default_args=DAG_DEFAULT_ARGS,
schedule=None,
start_date=datetime(2023, 4, 1),
2 changes: 2 additions & 0 deletions catalog/dags/data_refresh/data_refresh_types.py
@@ -53,6 +53,7 @@ class DataRefresh:
"""

dag_id: str = field(init=False)
+    filtered_index_dag_id: str = field(init=False)
media_type: str
start_date: datetime = datetime(2020, 1, 1)
schedule: str | None = "0 0 * * 1" # Mondays 00:00 UTC
@@ -69,6 +70,7 @@ class DataRefresh:

def __post_init__(self):
self.dag_id = f"{self.media_type}_data_refresh"
+        self.filtered_index_dag_id = f"create_filtered_{self.media_type}_index"


DATA_REFRESH_CONFIGS = {
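
For context, a minimal sketch (separate from the diff, assuming only what is shown above) of how the derived DAG ids resolve:

from dataclasses import dataclass, field

@dataclass
class DataRefresh:
    media_type: str
    dag_id: str = field(init=False)
    filtered_index_dag_id: str = field(init=False)

    def __post_init__(self):
        self.dag_id = f"{self.media_type}_data_refresh"
        self.filtered_index_dag_id = f"create_filtered_{self.media_type}_index"

refresh = DataRefresh(media_type="image")
print(refresh.dag_id)                 # image_data_refresh
print(refresh.filtered_index_dag_id)  # create_filtered_image_index

This is what lets create_filtered_index_dag.py above reference data_refresh.filtered_index_dag_id rather than re-deriving the name inline.
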
206 changes: 206 additions & 0 deletions catalog/dags/es/create_new_es_index/create_new_es_index.py
@@ -0,0 +1,206 @@
import logging

from airflow.decorators import task, task_group
from airflow.exceptions import AirflowException, AirflowSensorTimeout
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.python import PythonSensor
from airflow.utils.state import State
from es.create_new_es_index.utils import merge_configurations

from common.sensors.utils import get_most_recent_dag_run


logger = logging.getLogger(__name__)


# Index settings that should not be copied over from the base configuration when
# creating a new index.
EXCLUDED_INDEX_SETTINGS = ["provided_name", "creation_date", "uuid", "version"]


# TODO: This can be removed in favor of the utility in
# https://github.com/WordPress/openverse/pull/3482
@task(retries=0)
def prevent_concurrency_with_dag(external_dag_id: str, **context):
"""
    Prevent concurrency with the given external DAG by failing
immediately if that DAG is running.
"""

wait_for_dag = ExternalTaskSensor(
task_id=f"prevent_concurrency_with_{external_dag_id}",
external_dag_id=external_dag_id,
# Wait for the whole DAG, not just a part of it
external_task_id=None,
check_existence=False,
execution_date_fn=lambda _: get_most_recent_dag_run(external_dag_id),
mode="reschedule",
retries=0,
# Any "finished" state is sufficient for us to continue
allowed_states=[State.SUCCESS, State.FAILED],
)
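    # A timeout of 0 makes the sensor fail immediately (raising
    # AirflowSensorTimeout) if its first poke finds the external DAG running.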
wait_for_dag.timeout = 0
try:
wait_for_dag.execute(context)
except AirflowSensorTimeout:
raise ValueError(f"Concurrency check with {external_dag_id} failed.")


@task_group(group_id="prevent_concurrency")
def prevent_concurrency_with_dags(external_dag_ids: list[str]):
"""Fail immediately if any of the given external dags are in progress."""
# TODO: Double check if these need to be chained or if they can
# run concurrently
for dag_id in external_dag_ids:
prevent_concurrency_with_dag.override(
task_id=f"prevent_concurrency_with_{dag_id}"
)(dag_id)


@task
def get_index_name(media_type: str, index_suffix: str):
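    # e.g. media_type="image", index_suffix="20231216T000000" -> "image-20231216t000000"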
return f"{media_type}-{index_suffix}".lower()


@task.branch
def check_override_config(override):
if override:
# Skip the steps to fetch the current index configuration
# and merge changes in.
return "get_final_index_configuration"

return "get_current_index_configuration"


@task
def get_current_index_configuration(
source_index: str,
es_host: str,
):
"""
Return the configuration for the current index, identified by the
`source_index` param. `source_index` may be either an index name
or an alias, but must uniquely identify one existing index or an
error will be raised.
"""
logger.info(es_host)
hook = ElasticsearchPythonHook(es_host)
es_conn = hook.get_conn

response = es_conn.indices.get(
index=source_index,
# Return empty dict instead of throwing error if no index can be
# found. This lets us raise our own error.
ignore_unavailable=True,
)

if len(response) != 1:
raise AirflowException(
f"Index {source_index} could not be uniquely identified."
)

# The response has the form:
# { index_name: index_configuration }
# However, since `source_index` can be an alias rather than the index name,
# we can not reliably know the value of the key. We instead get the first
# value, knowing that we have already ensured in a previous check that there
# is exactly one value in the response.
config = next(iter(response.values()))
return config
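
# For example, a call for the "image" alias might return (index name
# illustrative only):
#     {"image-20231204": {"aliases": {}, "settings": {...}, "mappings": {...}}}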


@task
def merge_index_configurations(new_index_config, current_index_config):
"""
Merge the `new_index_config` into the `current_index_config`, and
return an index configuration in the appropriate format for being
passed to the `create_index` API.
"""
# Do not automatically apply any aliases to the new index
    current_index_config.pop("aliases", None)

# Remove fields from the current_index_config that should not be copied
# over into the new index
for setting in EXCLUDED_INDEX_SETTINGS:
        current_index_config.get("settings", {}).get("index", {}).pop(setting, None)

# Merge the new configuration values into the current configuration
return merge_configurations(current_index_config, new_index_config)
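
# NOTE: `merge_configurations` is defined in es/create_new_es_index/utils.py,
# one of the changed files not shown on this page. A minimal sketch of a
# recursive merge consistent with its use above (the actual implementation may
# differ) could look like:
#
#     def merge_configurations(base_config: dict, new_config: dict) -> dict:
#         for key, value in new_config.items():
#             if isinstance(value, dict) and isinstance(base_config.get(key), dict):
#                 merge_configurations(base_config[key], value)
#             else:
#                 # A value at a leaf key overrides the existing value
#                 base_config[key] = value
#         return base_config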


@task
def get_final_index_configuration(
override_config: bool,
# The new config which was passed in via DAG params
index_config,
# The result of merging the index_config with the current index config.
# This may be None if the merge tasks were skipped.
merged_config,
index_name: str,
):
"""
Resolve the final index configuration to be used in the `create_index`
task.
"""
config = index_config if override_config else merged_config

# Apply the index name
config["index"] = index_name
return config


@task
def create_index(index_config, es_host: str):
hook = ElasticsearchPythonHook(es_host)
es_conn = hook.get_conn

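    # The config includes the "index" key applied upstream, so this unpacks as
    # indices.create(index=..., settings=..., mappings=...).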
new_index = es_conn.indices.create(**index_config)

return new_index


@task_group(group_id="trigger_and_wait_for_reindex")
def trigger_and_wait_for_reindex(
index_name: str, source_index: str, query: dict, es_host: str
):
@task
def trigger_reindex(index_name: str, source_index: str, query: dict, es_host: str):
hook = ElasticsearchPythonHook(es_host)
es_conn = hook.get_conn

source = {"index": source_index}
# An empty query is not accepted; only pass it
# if a query was actually supplied
if query:
source["query"] = query

response = es_conn.reindex(
source=source,
dest={"index": index_name},
# Parallelize indexing
slices="auto",
# Do not hold the slot while awaiting completion
wait_for_completion=False,
# Immediately refresh the index after completion to make
# the data available for search
refresh=True,
)

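        # With wait_for_completion=False, the response holds the id of the
        # asynchronous reindex task rather than the reindex results.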
return response["task"]

def _wait_for_reindex(task_id: str, es_host: str):
hook = ElasticsearchPythonHook(es_host)
es_conn = hook.get_conn

response = es_conn.tasks.get(task_id=task_id)

return response.get("completed")

trigger_reindex_task = trigger_reindex(index_name, source_index, query, es_host)

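    # Poll the Elasticsearch tasks API until the async reindex task completes.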
PythonSensor(
task_id="wait_for_reindex",
python_callable=_wait_for_reindex,
op_kwargs={"task_id": trigger_reindex_task, "es_host": es_host},
)
150 changes: 150 additions & 0 deletions catalog/dags/es/create_new_es_index/create_new_es_index_dag.py
@@ -0,0 +1,150 @@
"""
# Create New Index DAG
TODO: Docstring
"""
import logging
import os

from airflow import DAG
from airflow.models.param import Param
from es.create_new_es_index import create_new_es_index as es
from es.create_new_es_index.create_new_es_index_types import (
CREATE_NEW_INDEX_CONFIGS,
CreateNewIndex,
)

from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES


logger = logging.getLogger(__name__)


def create_new_es_index_dag(config: CreateNewIndex):
dag = DAG(
dag_id=config.dag_id,
default_args=DAG_DEFAULT_ARGS,
schedule=None,
max_active_runs=1,
catchup=False,
doc_md=__doc__,
tags=["elasticsearch"],
render_template_as_native_obj=True,
params={
"media_type": Param(
default=AUDIO,
enum=MEDIA_TYPES,
description="The media type for which to create the index.",
),
"index_config": Param(
default={},
type=["object"],
description=(
"A JSON object containing the configuration for the new index."
" The values in this object will be merged with the existing"
" configuration, where the value specified at a leaf key in the"
" object will override the existing value (see Merging policy in"
" the DAG docs). This can also be the entire index configuration,"
" in which case the existing configuration will be replaced entirely"
" (see override_config parameter below)."
),
),
"index_suffix": Param(
default=None,
type=["string", "null"],
description=(
"The name suffix of the new index to create. This will be a string,"
" and will be used to name the index in Elasticsearch of the form"
" {media_type}-{index_suffix}. If not provided, the suffix will be a"
" timestamp of the form YYYYMMDDHHMMSS."
),
),
"source_index": Param(
default=None,
type=["string", "null"],
description=(
"The existing index on Elasticsearch to use as the basis for the new"
" index. If not provided, the index aliased to media_type will be used"
" (e.g. image for the image media type)."
),
),
"query": Param(
default={},
type=["object"],
description=(
"An optional Elasticsearch query to use to filter the documents to be"
" copied to the new index. If not provided, all documents will be"
" copied."
),
),
"override_config": Param(
default=False,
type="boolean",
description=(
" A boolean value which can be toggled to replace the existing index"
" configuration entirely with the new configuration. If True, the"
" index_config parameter will be used as the entire configuration. If"
" False, the index_config parameter will be merged with the existing"
" configuration."
),
),
},
)

    # TODO: Separate variables were necessary because we can't simply read the
    # value of Airflow connection vars; they get interpreted as Connection objects.
es_host = os.getenv(f"ELASTICSEARCH_HTTP_{config.environment.upper()}")

with dag:
prevent_concurrency = es.prevent_concurrency_with_dags(config.blocking_dags)

index_name = es.get_index_name(
media_type="{{ params.media_type }}",
index_suffix="{{ params.index_suffix or ts_nodash }}",
)

check_override = es.check_override_config(
override="{{ params.override_config }}"
)

current_index_config = es.get_current_index_configuration(
source_index="{{ params.source_index or params.media_type }}",
es_host=es_host,
)

merged_index_config = es.merge_index_configurations(
new_index_config="{{ params.index_config }}",
current_index_config=current_index_config,
)

final_index_config = es.get_final_index_configuration(
override_config="{{ params.override_config }}",
index_config="{{ params.index_config }}",
# May resolve to None if tasks were skipped
merged_config=merged_index_config,
index_name=index_name,
)

create_new_index = es.create_index(
index_config=final_index_config, es_host=es_host
)

reindex = es.trigger_and_wait_for_reindex(
index_name=index_name,
source_index="{{ params.source_index or params.media_type }}",
query="{{ params.query }}",
es_host=es_host,
)

# Set up dependencies
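        # check_override branches to get_current_index_configuration (merge
        # path) or directly to get_final_index_configuration (override path).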
prevent_concurrency >> index_name
index_name >> check_override >> [current_index_config, final_index_config]
current_index_config >> merged_index_config >> final_index_config
final_index_config >> create_new_index >> reindex

return dag


for config in CREATE_NEW_INDEX_CONFIGS:
# Generate the DAG for this environment
globals()[config.dag_id] = create_new_es_index_dag(config)
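
CreateNewIndex and CREATE_NEW_INDEX_CONFIGS are imported from create_new_es_index_types.py, one of the changed files not shown on this page. A hypothetical sketch of their shape, inferred only from the usages above (config.dag_id, config.environment, config.blocking_dags):

from dataclasses import dataclass, field

@dataclass
class CreateNewIndex:
    environment: str
    blocking_dags: list[str]
    dag_id: str = field(init=False)

    def __post_init__(self):
        # Hypothetical naming scheme, for illustration only
        self.dag_id = f"create_new_{self.environment}_es_index"

CREATE_NEW_INDEX_CONFIGS = [
    # Hypothetical blocking lists, for illustration only
    CreateNewIndex(environment="staging", blocking_dags=["image_data_refresh"]),
    CreateNewIndex(environment="production", blocking_dags=["image_data_refresh"]),
]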