Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add create_new_es_index DAGs #3537

Merged
merged 19 commits into from
Jan 23, 2024
Merged

Add create_new_es_index DAGs #3537

merged 19 commits into from
Jan 23, 2024

Conversation

stacimc
Copy link
Collaborator

@stacimc stacimc commented Dec 16, 2023

Fixes

Fixes #2372 by @AetherUnbound

Description

Screenshot 2024-01-08 at 12 52 36 PM Screenshot 2024-01-03 at 4 21 13 PM

This PR adds the create_new_<environment>_es_index DAGs for staging and production, as described in this IP.

The DAGs can be used to create a new ES index, based on an existing index configuration, with multiple customization options. See the linked description in the IP for params and more.

It also updates some existing DAGs to prevent concurrency issues (see cases given in Testing Instructions).

Testing Instructions

Make sure you have the following in your catalog/.env:

ELASTICSEARCH_HTTP_PRODUCTION=http://es:9200
ELASTICSEARCH_HTTP_STAGING=http://es:9200

Testing the conf options

Defaults

Trigger the create_new_staging_es_index DAG and use all default options (make no changes to the conf). All steps should pass. In Elasticvue, you should see a new audio index with a timestamp suffix, something like "audio-20240108t215049". As it uses the audio index as a source, it should have 5000 docs.

Click the wheel icon and Show info to see the index configuration:
Screenshot 2024-01-08 at 1 54 50 PM

Compare it to the index configuration for the main audio index. They should be identical, except the new index has no aliases applied. The "provided_name", "creation_date", "uuid", and "version" are also index-specific and will differ.

Using a different source index

Trigger the DAG again, but this time make these conf updates:

  • Set media_type to "image"
  • Set source_index to "image-init-filtered" (Check the name of your filtered image index in Elasticvue first. If you're not using a freshly recreated environment, it might have a different name)
  • Set index_suffix to "filtered-test"

All tasks should pass. Now check Elasticvue. You should have a new index named image-filtered-test (using the suffix you provided). It should have no aliases. It should have 4656 docs (or however many docs are in your local filtered image index). Again, use the Show info button to check the index configuration and ensure it matches that of the filtered image index.

Using an alias as source_index

You can also pass an alias (rather than a name) to source_index. Use these conf updates for the next DagRun:

  • Set media_type to "image"
  • Set source_index to "image-filtered" (the alias for the filtered image index)
  • Set index_suffix to "filtered-alias-test"

The results should be the same as the previous test.

Update index configuration

Try making some updates to the index configuration. Refer to the DAG docs in this PR for an explanation of the merging policy and how to make updates. Here's an example that I used for the index_config DAG conf option:

{
    "mappings": {
        "properties": {
            "creator": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "ignore_above": 100
                    }
                }
            }
        }
    },
    "settings": {
        "analysis": {
            "filter": {
                "stem_overrides": {
                  "type": "stemmer_override",
                  "rules": [
                    "foo => bar"
                  ]
                }
            }
        }
    }
}

Run the DAG and then check the new index in Elasticvue. Go to Show info and verify only the expected changes were made. Note that the rules for the stem_overrides get totally overwritten.

...
        "creator": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword", # Note this was not changed
              "ignore_above": 100 # Note this was updated
            }
          }
        },
...
            "stem_overrides": {
              "type": "stemmer_override", # Note this was not changed
              "rules": [
                "foo => bar" # Note this entire list was overwritten
              ]
            },

Completely override index configuration

This is an option that lets you completely overwrite the index configuration, rather than merging changes into the config of the source index.

Trigger the DAG with the following conf updates:

  • Set index_suffix to "test-override"
  • Set index_config to a complete index configuration. Here's a very simplified one I used:
  {
        "mappings": {
            "dynamic": "false",
            "properties": {
                "authority_boost": {
                    "type": "rank_feature"
                }
            }
        },
        "settings": {
            "index": {
                "number_of_replicas": "1",
                "number_of_shards": "1",
                "refresh_interval": "-1",
                "routing": {
                    "allocation": {
                        "include": {
                            "_tier_preference": "data_content"
                        }
                    }
                }
            }
        }
    }
  • Turn on the override_config option

Run the DAG. See that the get_current_index_configuration and merge_index_configurations steps are skipped. Check your new index in Elasticvue and see that it uses only the supplied config.

Testing the query option

This option lets you supply an Elasticsearch query that is used to filter documents from the source index while adding them to the new index. I used these DAG confs to test creating a new index based on the full audio index, but filtering out records that have 'bird' in the tags, title, or description:

{
    "index_config": {},
    "index_suffix": "test-query",
    "media_type": "audio",
    "override_config": true,
    "query": {
        "bool": {
            "must_not": [
                {
                    "multi_match": {
                        "query": "'bird",
                        "fields": ["tags.name", "title", "description"]
                    }
                }
            ]
        }
    },
    "source_index": null
}

All tasks should pass. The new index should have documents filtered appropriately. In my case this resulted in 4938 docs.

Testing concurrency concerns

When testing what happens when two DAGs are started right after one another, it's easiest to trigger them through the Airflow CLI (rather than scrambling around in the UI to try to trigger them as fast as possible). First, make sure that all the relevant DAGs are turned on in your local Airflow. Then to trigger them, run just catalog/shell and then trigger them with a command like this:

# For example, this would trigger the staging db restore and then immediately trigger the new staging index DAG
> airflow dags trigger staging_database_restore && airflow dags trigger create_new_staging_es_index

Use this to test the scenarios below. A general tip for remembering how the concurrency checks should work: scheduled DAGs (data refreshes, staging database restore) will wait if a conflicting DAG is running, and resume once it completes. DAGs that are only run manually (create_new_es_index, create_filtered_index, recreate_full_staging_index) fail immediately if a conflicting DAG is running.

Staging DAG

  • staging_database_restore
    • If the staging_database_restore DAG starts first, the create_new_staging_es_index DAG should fail immediately. The staging_database_restore should continue (it will fail locally but that's okay as long as it gets past the Sensor step.)
    • If the create_new_staging_es_index DAG starts first it should run to completion. The staging_database_restore DAG should wait on it, then proceeds when it completes.
  • recreate_full_staging_index
    • Whichever DAG starts first should run to completion. Whichever starts second should fail immediately.

Production DAG

  • audio_data_refresh and image_data_refresh
    • If either data refresh starts first, it should run to completion; meanwhile the create_new_production_es_index DAG should fail immediately.
    • If the create_new_production_es_index starts first, it should run to completion. The data refreshes should wait for it to complete before resuming.
  • create_filtered_audio_index and create_filtered_image_index
    • Whichever DAG starts first should run to completion. Whichever starts second (or third, or fourth) should fail immediately.

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (if applicable).

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@stacimc stacimc self-assigned this Dec 16, 2023
@stacimc stacimc force-pushed the add/create-new-es-index-dag branch from a6aec62 to 99e1cc5 Compare December 16, 2023 01:14
@openverse-bot openverse-bot added the 🚦 status: awaiting triage Has not been triaged & therefore, not ready for work label Dec 16, 2023
@github-actions github-actions bot added the 🧱 stack: catalog Related to the catalog and Airflow DAGs label Dec 16, 2023
@obulat obulat added 🟨 priority: medium Not blocking but should be addressed soon 🌟 goal: addition Addition of new feature 💻 aspect: code Concerns the software code in the repository and removed 🚦 status: awaiting triage Has not been triaged & therefore, not ready for work labels Dec 28, 2023
@stacimc stacimc force-pushed the add/create-new-es-index-dag branch from 99e1cc5 to 234d4eb Compare January 4, 2024 00:18
Copy link

github-actions bot commented Jan 4, 2024

Full-stack documentation: https://docs.openverse.org/_preview/3537

Please note that GitHub pages takes a little time to deploy newly pushed code, if the links above don't work or you see old versions, wait 5 minutes and try again.

You can check the GitHub pages deployment action list to see the current status of the deployments.

Changed files 🔄:

@stacimc stacimc marked this pull request as ready for review January 8, 2024 23:14
@stacimc stacimc requested review from a team as code owners January 8, 2024 23:14
@stacimc stacimc force-pushed the add/create-new-es-index-dag branch from 19b756a to db83bf0 Compare January 9, 2024 19:00
Copy link
Member

@krysal krysal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks exciting! I have yet to review and test it in more detail, but regarding the new environment variables, at least I don't see them in the env.template file. Shouldn't they be added there?

@stacimc
Copy link
Collaborator Author

stacimc commented Jan 12, 2024

regarding the new environment variables, at least I don't see them in the env.template file. Shouldn't they be added there?

Oh thank you for this! I wanted to use the existing AIRFLOW_CONN_ELASTICSEARCH_HTTP_<x> variables, but couldn't figure out why it was breaking and had to duplicate them. Your comment reminded me to look again, and I realize it's because we do some encoding of those env vars that we don't want in this case.

I had forgotten about that detail but now it's fixed and we can reuse the existing conn ids :)

Copy link
Member

@krysal krysal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! I tested several cases and all worked as expected. Thanks for the detailed testing instructions as always ⭐ I left some minor suggestions but nothing blocking. It's very cool to see this DAG come to live after so much planning and work put on the conceptual definition!

Last thing I want to mention is if we can change the catalog/dags/es folder for the full name, elasticsearch. For someone new, having the full name would be much more easy to grasp.

catalog/dags/es/create_new_es_index/create_new_es_index.py Outdated Show resolved Hide resolved
),
"index_config": Param(
default={},
type=["object"],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this param is optional, it is better to indicate it here so it prevents adding the red * in the form and avoids confusion for thinking maybe it's required (same for query).

Suggested change
type=["object"],
type=["object", "null"],

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

query is unambiguously optional, but index_config isn't really, although I think it's on the line. It's just that the empty config, which is also the default, is valid (although not a common expected use case as far as I know).

Comment on lines 83 to 87
# Do not automatically apply any aliases to the new index
current_index_config.pop("aliases")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well spotted!

@stacimc
Copy link
Collaborator Author

stacimc commented Jan 17, 2024

Last thing I want to mention is if we can change the catalog/dags/es folder for the full name, elasticsearch. For someone new, having the full name would be much more easy to grasp.

I'm not opposed, and I do see your point. But es is consistent with our existing docker/es naming, and changing it to elasticsearch causes a naming collision with the elasticsearch library which we'd have to work around.

Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so fantastic! It's incredible to see us start building the power to control Elasticsearch directly in Airflow, without having to go through the ingestion server 🚀 I have a number of questions, nothing blocking but a few changes might be good before we merge.



@task_group(group_id="prevent_concurrency")
def prevent_concurrency_with_dags(external_dag_ids: list[str]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is excellent and so simple!

catalog/dags/data_refresh/create_filtered_index_dag.py Outdated Show resolved Hide resolved
@@ -0,0 +1,53 @@
def merge_configurations(base_configuration, update_configuration):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will likely be useful for #3240 too - would you mind putting it under a common/es.py module or something similar?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could -- I was planning on just pulling things out as they're needed by the new DAGs rather than trying to anticipate exactly what would be needed here. This is an example of something I wouldn't have guessed we would use for the filtered index DAGs, actually -- maybe the tasks for triggering the reindex, but I didn't think we had any plans for merging configurations elsewhere.

Comment on lines 57 to 62
[data_refresh.dag_id for data_refresh in DATA_REFRESH_CONFIGS.values()]
+ [ # Block on the filtered index creation DAGs
data_refresh.filtered_index_dag_id
for data_refresh in DATA_REFRESH_CONFIGS.values()
]
),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loveeee this 🤩



@task
def get_current_index_configuration(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another function I could totally see moving into a common.es module!

catalog/dags/es/create_new_es_index/create_new_es_index.py Outdated Show resolved Hide resolved
catalog/dags/es/create_new_es_index/create_new_es_index.py Outdated Show resolved Hide resolved
catalog/dags/es/create_new_es_index/create_new_es_index.py Outdated Show resolved Hide resolved
catalog/dags/es/create_new_es_index/create_new_es_index.py Outdated Show resolved Hide resolved
@openverse-bot
Copy link
Collaborator

Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR:

@obulat
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend1 days, this PR was ready for review 8 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekday(s)2.

@stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  1. Specifically, Saturday and Sunday.

  2. For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the operation that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

@krysal krysal removed the request for review from obulat January 19, 2024 17:12
@stacimc stacimc force-pushed the add/create-new-es-index-dag branch from 2f204c7 to 37821b7 Compare January 22, 2024 19:11
Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really close! I think just one more change is necessary given the new variable. That variable will also be queried at DAG parsing time as well, though again I'm not sure the best way to avoid it given we want it as part of the config. Maybe since it's called as a parameter to an Airflow tasks, we can default it to the jinja rendering for calling that Variable, e.g. "{{ var.json.ES_INDEX_THROTTLING_RATE }}"?

Comment on lines 24 to 27
@task
def get_es_host(environment: str):
conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}")
return conn.host
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least this was an easy change!

@stacimc stacimc force-pushed the add/create-new-es-index-dag branch from 37821b7 to 6c7c193 Compare January 23, 2024 20:14
Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome! 🥳

@stacimc stacimc merged commit 8e21893 into main Jan 23, 2024
40 checks passed
@stacimc stacimc deleted the add/create-new-es-index-dag branch January 23, 2024 23:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
💻 aspect: code Concerns the software code in the repository 🌟 goal: addition Addition of new feature 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Build ES index creation DAG
5 participants