Add create_new_es_index DAGs #3537
Merged
Commits (19)
a1659aa  Add create_new_es_index DAG (stacimc)
74a39f1  Use new utility (stacimc)
3570b65  Add tests for merge_configurations (stacimc)
76838f3  Add merge index tests (stacimc)
2e48665  Add docs (stacimc)
4f793ee  Configure timeouts (stacimc)
de0023d  Prevent concurrency with other DAGs (stacimc)
8ecfd33  Fix typo, trigger rule (stacimc)
9cc9b1d  Set es_host in types (stacimc)
e94a0a6  Use existing env variables (stacimc)
790b580  Fix warning (stacimc)
b5fd105  Get es_host in Airflow task (stacimc)
ca32860  Add requests_per_second using Variable but configurable by environment (stacimc)
ebd5901  Use more precise error (stacimc)
fb0d334  Update docstring (stacimc)
ebe828b  Move import to local imports section (stacimc)
4f8c9e5  Add default value for variable (stacimc)
6c7c193  Rename module to elasticsearch_cluster to disambiguate (stacimc)
16aadf7  Deserialize variable to be safe (stacimc)
catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py (195 additions, 0 deletions)
import logging
from datetime import timedelta

from airflow.decorators import task, task_group
from airflow.models.connection import Connection
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
from airflow.sensors.python import PythonSensor

from common.constants import REFRESH_POKE_INTERVAL
from elasticsearch_cluster.create_new_es_index.utils import merge_configurations


logger = logging.getLogger(__name__)


# Index settings that should not be copied over from the base configuration when
# creating a new index.
EXCLUDED_INDEX_SETTINGS = ["provided_name", "creation_date", "uuid", "version"]

GET_FINAL_INDEX_CONFIG_TASK_NAME = "get_final_index_configuration"
GET_CURRENT_INDEX_CONFIG_TASK_NAME = "get_current_index_configuration"


@task
def get_es_host(environment: str):
    conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}")
    return conn.host


@task
def get_index_name(media_type: str, index_suffix: str):
    return f"{media_type}-{index_suffix}".lower()


@task.branch
def check_override_config(override):
    if override:
        # Skip the steps to fetch the current index configuration
        # and merge changes in.
        return GET_FINAL_INDEX_CONFIG_TASK_NAME

    return GET_CURRENT_INDEX_CONFIG_TASK_NAME


@task
def get_current_index_configuration(
    source_index: str,
    es_host: str,
):
    """
    Return the configuration for the current index, identified by the
    `source_index` param. `source_index` may be either an index name
    or an alias, but must uniquely identify one existing index or an
    error will be raised.
    """
    es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn

    response = es_conn.indices.get(
        index=source_index,
        # Return empty dict instead of throwing error if no index can be
        # found. We raise our own error instead.
        ignore_unavailable=True,
    )

    if len(response) != 1:
        raise ValueError(f"Index {source_index} could not be uniquely identified.")

    # The response has the form:
    #   { index_name: index_configuration }
    # However, since `source_index` can be an alias rather than the index name,
    # we do not necessarily know the index_name so we cannot access the configuration
    # directly by key. We instead get the first value from the dict, knowing that we
    # have already ensured in a previous check that there is exactly one value in the
    # response.
    config = next(iter(response.values()))
    return config


@task
def merge_index_configurations(new_index_config, current_index_config):
    """
    Merge the `new_index_config` into the `current_index_config`, and
    return an index configuration in the appropriate format for being
    passed to the `create_index` API.
    """
    # Do not automatically apply any aliases to the new index
    current_index_config.pop("aliases")

    # Remove fields from the current_index_config that should not be copied
    # over into the new index (such as uuid)
    for setting in EXCLUDED_INDEX_SETTINGS:
        current_index_config.get("settings", {}).get("index", {}).pop(setting)

    # Merge the new configuration values into the current configuration
    return merge_configurations(current_index_config, new_index_config)


@task
def get_final_index_configuration(
    override_config: bool,
    index_config,
    merged_config,
    index_name: str,
):
    """
    Resolve the final index configuration to be used in the `create_index`
    task.

    Required arguments:

    override_config: Whether the index_config should be used instead of
                     the merged_config
    index_config:    The new index configuration which was passed in via
                     DAG params
    merged_config:   The result of merging the index_config with the current
                     index configuration. This may be None if the merge
                     tasks were skipped using the override param.
    index_name:      Name of the index to update.
    """
    config = index_config if override_config else merged_config

    # Apply the desired index name
    config["index"] = index_name
    return config


@task
def create_index(index_config, es_host: str):
    es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn

    new_index = es_conn.indices.create(**index_config)

    return new_index


@task_group(group_id="trigger_and_wait_for_reindex")
def trigger_and_wait_for_reindex(
    index_name: str,
    source_index: str,
    query: dict,
    timeout: timedelta,
    requests_per_second: int,
    es_host: str,
):
    @task
    def trigger_reindex(
        index_name: str,
        source_index: str,
        query: dict,
        requests_per_second: int,
        es_host: str,
    ):
        es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn

        source = {"index": source_index}
        # An empty query is not accepted; only pass it
        # if a query was actually supplied
        if query:
            source["query"] = query

        response = es_conn.reindex(
            source=source,
            dest={"index": index_name},
            # Parallelize indexing
            slices="auto",
            # Do not hold the slot while awaiting completion
            wait_for_completion=False,
            # Immediately refresh the index after completion to make
            # the data available for search
            refresh=True,
            # Throttle
            requests_per_second=requests_per_second,
        )

        return response["task"]

    def _wait_for_reindex(task_id: str, es_host: str):
        es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn

        response = es_conn.tasks.get(task_id=task_id)
        return response.get("completed")

    trigger_reindex_task = trigger_reindex(
        index_name, source_index, query, requests_per_second, es_host
    )

    wait_for_reindex = PythonSensor(
        task_id="wait_for_reindex",
        python_callable=_wait_for_reindex,
        timeout=timeout,
        poke_interval=REFRESH_POKE_INTERVAL,
        op_kwargs={"task_id": trigger_reindex_task, "es_host": es_host},
    )

    trigger_reindex_task >> wait_for_reindex
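
The merge_index_configurations task delegates to merge_configurations from elasticsearch_cluster/create_new_es_index/utils.py, which is not included in this file. As a rough sketch of the expected behavior, assuming a recursive dictionary merge in which values from the new configuration take precedence (this is an illustration, not the actual helper):

# Illustrative sketch only: the real merge_configurations lives in utils.py and
# may differ in detail. This version recursively overlays `overrides` on `base`.
def merge_configurations(base: dict, overrides: dict) -> dict:
    merged = dict(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge them recursively.
            merged[key] = merge_configurations(merged[key], value)
        else:
            # Otherwise the new value wins outright.
            merged[key] = value
    return merged


current = {"settings": {"index": {"number_of_shards": 18, "refresh_interval": "5s"}}}
new = {"settings": {"index": {"refresh_interval": "-1"}}}
print(merge_configurations(current, new))
# {'settings': {'index': {'number_of_shards': 18, 'refresh_interval': '-1'}}}

Under this assumption, settings supplied via DAG params override the corresponding settings copied from the source index, while everything else is carried over unchanged.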
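
For readers unfamiliar with the asynchronous reindex pattern used by trigger_and_wait_for_reindex, here is a minimal standalone sketch using the elasticsearch Python client directly, outside Airflow. The host, index names, throttle value, and 60-second poll interval are illustrative assumptions, not values from this PR:

# Standalone sketch of "trigger reindex, then poll the Tasks API until done".
# ES_HOST, the index names, and the numbers below are placeholders.
import time

from elasticsearch import Elasticsearch

ES_HOST = "http://localhost:9200"  # assumption: a local cluster for illustration
es = Elasticsearch(ES_HOST)

# Kick off an asynchronous reindex; the response contains a task handle rather
# than blocking until the copy finishes.
task_id = es.reindex(
    source={"index": "image-old"},
    dest={"index": "image-new"},
    slices="auto",
    wait_for_completion=False,
    refresh=True,
    requests_per_second=20_000,  # throttle, illustrative value
)["task"]

# Poll the Tasks API until the reindex task reports completion, mirroring what
# the PythonSensor's poke callable does in the DAG.
while not es.tasks.get(task_id=task_id).get("completed"):
    time.sleep(60)

The task group implements the same idea, but hands the polling off to a PythonSensor with a configurable timeout so no worker slot is held for the duration of the reindex itself.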
This is excellent and so simple!