-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add create_new_es_index DAGs #3537
Conversation
a6aec62
to
99e1cc5
Compare
99e1cc5
to
234d4eb
Compare
Full-stack documentation: https://docs.openverse.org/_preview/3537 Please note that GitHub pages takes a little time to deploy newly pushed code, if the links above don't work or you see old versions, wait 5 minutes and try again. You can check the GitHub pages deployment action list to see the current status of the deployments. Changed files 🔄: |
19b756a
to
db83bf0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks exciting! I have yet to review and test it in more detail, but regarding the new environment variables, at least I don't see them in the env.template
file. Shouldn't they be added there?
Oh thank you for this! I wanted to use the existing I had forgotten about that detail but now it's fixed and we can reuse the existing conn ids :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! I tested several cases and all worked as expected. Thanks for the detailed testing instructions as always ⭐ I left some minor suggestions but nothing blocking. It's very cool to see this DAG come to live after so much planning and work put on the conceptual definition!
Last thing I want to mention is if we can change the catalog/dags/es
folder for the full name, elasticsearch
. For someone new, having the full name would be much more easy to grasp.
), | ||
"index_config": Param( | ||
default={}, | ||
type=["object"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this param is optional, it is better to indicate it here so it prevents adding the red * in the form and avoids confusion for thinking maybe it's required (same for query
).
type=["object"], | |
type=["object", "null"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
query
is unambiguously optional, but index_config
isn't really, although I think it's on the line. It's just that the empty config, which is also the default, is valid (although not a common expected use case as far as I know).
# Do not automatically apply any aliases to the new index | ||
current_index_config.pop("aliases") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well spotted!
I'm not opposed, and I do see your point. But |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is so fantastic! It's incredible to see us start building the power to control Elasticsearch directly in Airflow, without having to go through the ingestion server 🚀 I have a number of questions, nothing blocking but a few changes might be good before we merge.
|
||
|
||
@task_group(group_id="prevent_concurrency") | ||
def prevent_concurrency_with_dags(external_dag_ids: list[str]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is excellent and so simple!
@@ -0,0 +1,53 @@ | |||
def merge_configurations(base_configuration, update_configuration): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will likely be useful for #3240 too - would you mind putting it under a common/es.py
module or something similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could -- I was planning on just pulling things out as they're needed by the new DAGs rather than trying to anticipate exactly what would be needed here. This is an example of something I wouldn't have guessed we would use for the filtered index DAGs, actually -- maybe the tasks for triggering the reindex, but I didn't think we had any plans for merging configurations elsewhere.
catalog/dags/es/create_new_es_index/create_new_es_index_types.py
Outdated
Show resolved
Hide resolved
[data_refresh.dag_id for data_refresh in DATA_REFRESH_CONFIGS.values()] | ||
+ [ # Block on the filtered index creation DAGs | ||
data_refresh.filtered_index_dag_id | ||
for data_refresh in DATA_REFRESH_CONFIGS.values() | ||
] | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Loveeee this 🤩
|
||
|
||
@task | ||
def get_current_index_configuration( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another function I could totally see moving into a common.es
module!
Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR: @obulat Excluding weekend1 days, this PR was ready for review 8 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekday(s)2. @stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings. Footnotes
|
2f204c7
to
37821b7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really close! I think just one more change is necessary given the new variable. That variable will also be queried at DAG parsing time as well, though again I'm not sure the best way to avoid it given we want it as part of the config. Maybe since it's called as a parameter to an Airflow tasks, we can default it to the jinja rendering for calling that Variable, e.g. "{{ var.json.ES_INDEX_THROTTLING_RATE }}"
?
@task | ||
def get_es_host(environment: str): | ||
conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}") | ||
return conn.host |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least this was an easy change!
catalog/dags/es/create_new_es_index/create_new_es_index_types.py
Outdated
Show resolved
Hide resolved
37821b7
to
6c7c193
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome! 🥳
Fixes
Fixes #2372 by @AetherUnbound
Description
This PR adds the
create_new_<environment>_es_index
DAGs for staging and production, as described in this IP.The DAGs can be used to create a new ES index, based on an existing index configuration, with multiple customization options. See the linked description in the IP for params and more.
It also updates some existing DAGs to prevent concurrency issues (see cases given in Testing Instructions).
Testing Instructions
Make sure you have the following in your
catalog/.env
:Testing the conf options
Defaults
Trigger the
create_new_staging_es_index
DAG and use all default options (make no changes to the conf). All steps should pass. In Elasticvue, you should see a newaudio
index with a timestamp suffix, something like "audio-20240108t215049". As it uses theaudio
index as a source, it should have 5000 docs.Click the wheel icon and
![Screenshot 2024-01-08 at 1 54 50 PM](https://private-user-images.githubusercontent.com/63313398/295045603-a16374f2-8b3e-42ca-b5b2-e3f5789e7d3d.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3Mzg5ODE1ODEsIm5iZiI6MTczODk4MTI4MSwicGF0aCI6Ii82MzMxMzM5OC8yOTUwNDU2MDMtYTE2Mzc0ZjItOGIzZS00MmNhLWI1YjItZTNmNTc4OWU3ZDNkLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNTAyMDglMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjUwMjA4VDAyMjEyMVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTE3ZjUyNmZjNDY5YTlhYjI5OWFkODU5MTI4NzY3NDUzODc0YzNlNzAyY2ZkMDUwNTgzN2RkNjQzMDk2M2ExMTcmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0In0.EOuBiYZrHqMclX2mbhtwPo7xjs6MckJdGhqxxn_wsJw)
Show info
to see the index configuration:Compare it to the index configuration for the main
audio
index. They should be identical, except the new index has no aliases applied. The "provided_name", "creation_date", "uuid", and "version" are also index-specific and will differ.Using a different source index
Trigger the DAG again, but this time make these conf updates:
media_type
to "image"source_index
to "image-init-filtered" (Check the name of your filtered image index in Elasticvue first. If you're not using a freshly recreated environment, it might have a different name)index_suffix
to "filtered-test"All tasks should pass. Now check Elasticvue. You should have a new index named
image-filtered-test
(using the suffix you provided). It should have no aliases. It should have 4656 docs (or however many docs are in your local filtered image index). Again, use theShow info
button to check the index configuration and ensure it matches that of the filtered image index.Using an alias as
source_index
You can also pass an alias (rather than a name) to
source_index
. Use these conf updates for the next DagRun:media_type
to "image"source_index
to "image-filtered" (the alias for the filtered image index)index_suffix
to "filtered-alias-test"The results should be the same as the previous test.
Update index configuration
Try making some updates to the index configuration. Refer to the DAG docs in this PR for an explanation of the merging policy and how to make updates. Here's an example that I used for the
index_config
DAG conf option:Run the DAG and then check the new index in Elasticvue. Go to
Show info
and verify only the expected changes were made. Note that therules
for the stem_overrides get totally overwritten.Completely override index configuration
This is an option that lets you completely overwrite the index configuration, rather than merging changes into the config of the source index.
Trigger the DAG with the following conf updates:
index_suffix
to "test-override"index_config
to a complete index configuration. Here's a very simplified one I used:override_config
optionRun the DAG. See that the
get_current_index_configuration
andmerge_index_configurations
steps are skipped. Check your new index in Elasticvue and see that it uses only the supplied config.Testing the
query
optionThis option lets you supply an Elasticsearch query that is used to filter documents from the source index while adding them to the new index. I used these DAG confs to test creating a new index based on the full audio index, but filtering out records that have 'bird' in the tags, title, or description:
All tasks should pass. The new index should have documents filtered appropriately. In my case this resulted in 4938 docs.
Testing concurrency concerns
When testing what happens when two DAGs are started right after one another, it's easiest to trigger them through the Airflow CLI (rather than scrambling around in the UI to try to trigger them as fast as possible). First, make sure that all the relevant DAGs are turned on in your local Airflow. Then to trigger them, run
just catalog/shell
and then trigger them with a command like this:Use this to test the scenarios below. A general tip for remembering how the concurrency checks should work: scheduled DAGs (data refreshes, staging database restore) will wait if a conflicting DAG is running, and resume once it completes. DAGs that are only run manually (create_new_es_index, create_filtered_index, recreate_full_staging_index) fail immediately if a conflicting DAG is running.
Staging DAG
staging_database_restore
staging_database_restore
DAG starts first, thecreate_new_staging_es_index
DAG should fail immediately. Thestaging_database_restore
should continue (it will fail locally but that's okay as long as it gets past the Sensor step.)create_new_staging_es_index
DAG starts first it should run to completion. Thestaging_database_restore
DAG should wait on it, then proceeds when it completes.recreate_full_staging_index
Production DAG
audio_data_refresh
andimage_data_refresh
create_new_production_es_index
DAG should fail immediately.create_new_production_es_index
starts first, it should run to completion. The data refreshes should wait for it to complete before resuming.create_filtered_audio_index
andcreate_filtered_image_index
Checklist
Update index.md
).main
) or a parent feature branch.Developer Certificate of Origin
Developer Certificate of Origin