Skip to content

Commit

Permalink
[Issue 3166] cleanup to delete all indexes using prefix (#3372)
Browse files Browse the repository at this point in the history
## Summary
Fixes #{[3166](#3166)}

### Time to review: __5 mins__

## Changes proposed
> cleanup func to  delete all old indexes
  • Loading branch information
babebe authored Jan 7, 2025
1 parent 89581c1 commit 70c1fc0
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 14 deletions.
26 changes: 16 additions & 10 deletions api/src/adapters/search/opensearch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def create_index(
*,
shard_count: int = 1,
replica_count: int = 1,
analysis: dict | None = None
analysis: dict | None = None,
) -> None:
"""
Create an empty search index
Expand Down Expand Up @@ -78,7 +78,7 @@ def bulk_upsert(
records: Iterable[dict[str, Any]],
primary_key_field: str,
*,
refresh: bool = True
refresh: bool = True,
) -> None:
"""
Bulk upsert records to an index
Expand Down Expand Up @@ -148,9 +148,20 @@ def alias_exists(self, alias_name: str) -> bool:
existing_index_mapping = self._client.cat.aliases(alias_name, format="json")
return len(existing_index_mapping) > 0

def swap_alias_index(
self, index_name: str, alias_name: str, *, delete_prior_indexes: bool = False
) -> None:
def cleanup_old_indices(self, index_prefix: str, indexes_to_keep: list[str]) -> None:
"""
Cleanup old indexes now that they aren't connected to the alias
"""
resp = self._client.cat.indices(f"{index_prefix}-*", format="json", h=["index"])

old_indexes = [
index["index"] for index in resp if index["index"] not in indexes_to_keep
] # omit the newly created one

for index in old_indexes:
self.delete_index(index)

def swap_alias_index(self, index_name: str, alias_name: str) -> None:
"""
For a given index, set it to the given alias. If any existing index(es) are
attached to the alias, remove them from the alias.
Expand All @@ -174,11 +185,6 @@ def swap_alias_index(

self._client.indices.update_aliases({"actions": actions})

# Cleanup old indexes now that they aren't connected to the alias
if delete_prior_indexes:
for index in existing_indexes:
self.delete_index(index)

def search_raw(self, index_name: str, search_query: dict) -> dict:
# Simple wrapper around search if you don't want the request or response
# object handled in any special way.
Expand Down
6 changes: 5 additions & 1 deletion api/src/search/backend/load_opportunities_to_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,13 @@ def full_refresh(self) -> None:

# handle aliasing of endpoints
self.search_client.swap_alias_index(
self.index_name, self.config.alias_name, delete_prior_indexes=True
self.index_name,
self.config.alias_name,
)

# cleanup old indexes
self.search_client.cleanup_old_indices(self.config.index_prefix, [self.index_name])

def fetch_opportunities(self) -> Iterator[Sequence[Opportunity]]:
"""
Fetch the opportunities in batches. The iterator returned
Expand Down
31 changes: 29 additions & 2 deletions api/tests/src/adapters/search/test_opensearch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,15 @@ def test_swap_alias_index(search_client, generic_index):
search_client.bulk_upsert(tmp_index, tmp_index_records, primary_key_field="id")

# Set the alias
search_client.swap_alias_index(tmp_index, alias_name, delete_prior_indexes=True)
search_client.swap_alias_index(tmp_index, alias_name)

# Can search by this alias and get records from the tmp index
resp = search_client.search(alias_name, {}, include_scores=False)
assert resp.records == tmp_index_records

# Swap the index to the generic one + delete the tmp one
search_client.swap_alias_index(generic_index, alias_name, delete_prior_indexes=True)
search_client.swap_alias_index(generic_index, alias_name)
search_client.cleanup_old_indices("test-tmp-index", [generic_index])

resp = search_client.search(alias_name, {}, include_scores=False)
assert resp.records == records
Expand Down Expand Up @@ -213,3 +214,29 @@ def test_get_connection_parameters():
"connection_class": opensearchpy.RequestsHttpConnection,
"pool_maxsize": 10,
}


def test_cleanup_old_indices(search_client):
index_name_1 = f"test-index-{uuid.uuid4().int}" # old index
index_name_2 = f"test-index-{uuid.uuid4().int}" # old index
index_name_3 = f"partial-refresh-index-{uuid.uuid4().int}" # old index
index_name_4 = f"test-index-{uuid.uuid4().int}" # new index

search_client.create_index(index_name_1)
search_client.create_index(index_name_2)
search_client.create_index(index_name_3)
search_client.create_index(index_name_4)

# check all indexes were created
assert search_client.index_exists(index_name_1) is True
assert search_client.index_exists(index_name_2) is True
assert search_client.index_exists(index_name_3) is True
assert search_client.index_exists(index_name_4) is True

# expect old index with same prefix to be deleted and others to remain
search_client.cleanup_old_indices("test-index", [index_name_4])

assert search_client.index_exists(index_name_1) is False
assert search_client.index_exists(index_name_2) is False
assert search_client.index_exists(index_name_3) is True
assert search_client.index_exists(index_name_4) is True
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ def test_load_opportunities_to_index(
)
search_client.create_index(index_name)
search_client.swap_alias_index(
index_name, load_opportunities_to_index.config.alias_name, delete_prior_indexes=True
index_name,
load_opportunities_to_index.config.alias_name,
)

# Load a bunch of records into the DB
Expand Down

0 comments on commit 70c1fc0

Please sign in to comment.