Skip to content

Commit

Permalink
Set es_host in types
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Jan 9, 2024
1 parent 5505c05 commit db83bf0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
12 changes: 3 additions & 9 deletions catalog/dags/es/create_new_es_index/create_new_es_index_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@
```
"""
import logging
import os


from airflow import DAG
from airflow.models.param import Param
Expand Down Expand Up @@ -187,10 +185,6 @@ def create_new_es_index_dag(config: CreateNewIndex):
},
)

# TODO: separate variables were necessary because we can't just get the value of
# Airflow connection vars, they get interpreted as Connection objects
es_host = os.getenv(f"ELASTICSEARCH_HTTP_{config.environment.upper()}")

with dag:
prevent_concurrency = prevent_concurrency_with_dags(config.blocking_dags)

Expand All @@ -207,7 +201,7 @@ def create_new_es_index_dag(config: CreateNewIndex):
task_id=es.GET_CURRENT_INDEX_CONFIG_TASK_NAME
)(
source_index="{{ params.source_index or params.media_type }}",
es_host=es_host,
es_host=config.es_host,
)

merged_index_config = es.merge_index_configurations(
Expand All @@ -227,15 +221,15 @@ def create_new_es_index_dag(config: CreateNewIndex):
)

create_new_index = es.create_index(
index_config=final_index_config, es_host=es_host
index_config=final_index_config, es_host=config.es_host
)

reindex = es.trigger_and_wait_for_reindex(
index_name=index_name,
source_index="{{ params.source_index or params.media_type }}",
query="{{ params.query }}",
timeout=config.reindex_timeout,
es_host=es_host,
es_host=config.es_host,
)

# Set up dependencies
Expand Down
18 changes: 13 additions & 5 deletions catalog/dags/es/create_new_es_index/create_new_es_index_types.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from dataclasses import dataclass, field
from datetime import timedelta

Expand All @@ -19,21 +21,27 @@ class CreateNewIndex:
Required Constructor Arguments:
environment: str representation of the environment in which to create
the new index
blocking_dags: list of dags with which to prevent concurrency; the
generated create_new_es_index dag will fail immediately if
any of these dags are running.
environment: str representation of the environment in which to create
the new index
blocking_dags: list of dags with which to prevent concurrency; the
generated create_new_es_index dag will fail immediately if
any of these dags are running.
reindex_timeout: timedelta expressing maximum amount of time the reindexing
step may take
"""

dag_id: str = field(init=False)
es_host: str = field(init=False)
environment: str
blocking_dags: list
reindex_timeout: timedelta = timedelta(hours=12)

def __post_init__(self):
self.dag_id = f"create_new_{self.environment}_es_index"

# Get the appropriate connection information for this environment
self.es_host = os.getenv(f"ELASTICSEARCH_HTTP_{self.environment.upper()}")


CREATE_NEW_INDEX_CONFIGS = {
STAGING: CreateNewIndex(
Expand Down

0 comments on commit db83bf0

Please sign in to comment.