Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce instance mappings #132

Merged
merged 31 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8c08fed
introduce instance mappings
danyilq Jul 31, 2023
ead493b
Implement InstanceMappings and default and cloud implementations
farshidz Jul 31, 2023
97489cd
Remove index name from control url method
farshidz Jul 31, 2023
705082e
Update httprequests module to use InstanceMappings
farshidz Jul 31, 2023
24a06d1
Fix class name
farshidz Jul 31, 2023
e83f0fe
fix minimum version check and add cloud error
danyilq Aug 1, 2023
9fca70b
delete enrich and improve error message
danyilq Aug 1, 2023
661d440
fix existing and add new tests
danyilq Aug 1, 2023
8ee578f
Add docstring for InstanceMappings
farshidz Aug 2, 2023
c26885c
move version_check and refactor
danyilq Aug 2, 2023
a4d93e2
refactor tests to work with cloud
danyilq Aug 2, 2023
acc162e
add readiness check for index that we want to clean documents in
danyilq Aug 2, 2023
fb766fa
unsupported operation + change hardcoded url for refresh_urls
danyilq Aug 3, 2023
e7f514d
adapt tests behaviour for cloud and partially refactor code
danyilq Aug 3, 2023
7b2491c
post debug changes to tests and new run env in tox
danyilq Aug 6, 2023
c6f57cd
update logging for wait_for_index_status
danyilq Aug 7, 2023
76b7b8f
improve testing
danyilq Aug 7, 2023
e99c7af
use http instead of client
danyilq Aug 7, 2023
5940880
Merge branch 'mainline' into danyil/instance-mappings
danyilq Aug 7, 2023
6f47bc5
follow up on all of the comments
danyilq Aug 7, 2023
a24d93e
Merge remote-tracking branch 'origin/danyil/instance-mappings' into d…
danyilq Aug 7, 2023
4e4d91a
final changes
danyilq Aug 8, 2023
32fc98a
improve test
danyilq Aug 8, 2023
b56e53e
consistent hash
danyilq Aug 8, 2023
89c1fb8
consistent hash
danyilq Aug 8, 2023
0472dff
better namings
danyilq Aug 8, 2023
a711a95
bump pymarqo version
danyilq Aug 8, 2023
836b4b7
adapt to 32 characters index name limit
danyilq Aug 8, 2023
e7b381b
reduce test_demos.py index name length
danyilq Aug 8, 2023
53db2d3
raise error if index_name is too long
danyilq Aug 8, 2023
2be58c2
update version
danyilq Aug 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/marqo/cloud_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from marqo.marqo_logging import mq_logger


def cloud_wait_for_index_status(req, index_name, status):
creation = req.get(f"indexes/{index_name}/status")
while creation['index_status'] != status:
def cloud_wait_for_index_status(req , index_name: str, status):
""" Wait for index to be created on Marqo Cloud by checking
its status every 10 seconds until it reaches the expected value"""
danyilq marked this conversation as resolved.
Show resolved Hide resolved
current_status = req.get(f"indexes/{index_name}/status")
while current_status['index_status'] != status:
time.sleep(10)
creation = req.get(f"indexes/{index_name}/status")
mq_logger.info(f"Index creation status: {creation['index_status']}")
mq_logger.info("Index created successfully")
current_status = req.get(f"indexes/{index_name}/status")
mq_logger.info(f"Current index status: {current_status['index_status']}")
mq_logger.info(f"Index achieved status {status} successfully")
return True
10 changes: 5 additions & 5 deletions src/marqo/index.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import json
import logging
import pprint
import time

Expand Down Expand Up @@ -55,14 +54,16 @@ def __init__(
if config.is_marqo_cloud:
try:
if self.get_status()["index_status"] != IndexStatus.CREATED:
logging.warning(f"Index {index_name} is not ready. Status: {self.get_status()}, operations may fail.")
mq_logger.warning(f"Index {index_name} is not ready. Status: {self.get_status()}. Common operations, "
f"such as search and add_documents, may fail until the index is ready. "
f"Please check `mq.index('{index_name}').get_status()` for the index's status. "
f"Skipping version check.")
skip_version_check = True
except Exception as e:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
skip_version_check = True
mq_logger.warning(f"Failed to get index status for index {index_name}. Skipping version check. Error: {e}")
if not skip_version_check:
self._marqo_minimum_supported_version_check()
else:
logging.warning("Version check is skipped because index is not ready yet.")

def delete(self) -> Dict[str, Any]:
"""Delete the index.
Expand Down Expand Up @@ -435,7 +436,6 @@ def _add_docs_organiser(
f"docs (server unbatched), for an average of {(res['processingTimeMs'] / (1000 * num_docs)):.3f}s per doc.")
if 'errors' in res and res['errors']:
mq_logger.info(error_detected_message)

if errors_detected:
mq_logger.info(error_detected_message)
total_add_docs_time = timer() - t0
Expand Down
96 changes: 51 additions & 45 deletions tests/marqo_test.py
danyilq marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ def wrapper(self, *args, **kwargs):
return decorator


def create_settings_hash(settings_dict, kwargs):
    """Return a short, deterministic hash of an index's settings.

    The two mappings are merged (entries in ``kwargs`` win on key clashes),
    each ``key``/``value`` pair is rendered as text and concatenated in
    insertion order, and the CRC-32 of that text is returned as lowercase
    hexadecimal without the ``0x`` prefix.

    Args:
        settings_dict: Mapping of index settings, or ``None`` for no settings.
        kwargs: Mapping of additional keyword settings, or ``None`` for none.

    Returns:
        A string of 1 to 8 hexadecimal characters. Leading zeros are not
        padded, so the length can vary.
    """
    # Treat a missing mapping as empty instead of failing on ``**None``.
    combined_dict = {**(settings_dict or {}), **(kwargs or {})}
    combined_str = ''.join(f"{key}{value}" for key, value in combined_dict.items())
    # A CRC-32 fits in 32 bits, so its hex form is never longer than
    # 8 characters and needs no further truncation.
    crc32_hash = zlib.crc32(combined_str.encode()) & 0xffffffff
    short_hash = format(crc32_hash, "x")
    print(f"Created index with settings hash: {short_hash} for settings: {combined_dict}")
    return short_hash


class MarqoTestCase(TestCase):

@classmethod
Expand All @@ -115,10 +125,7 @@ def setUpClass(cls) -> None:
api_key = os.environ.get("MARQO_API_KEY", None)
if (api_key):
local_marqo_settings["api_key"] = api_key
cls.index_suffix = os.environ.get("MARQO_INDEX_SUFFIX", None)
if not cls.index_suffix:
os.environ["MARQO_INDEX_SUFFIX"] = str(uuid.uuid4())[:8]
cls.index_suffix = os.environ["MARQO_INDEX_SUFFIX"]
cls.index_suffix = os.environ.get("MARQO_INDEX_SUFFIX", "")
cls.client_settings = local_marqo_settings
cls.authorized_url = cls.client_settings["url"]
cls.generic_test_index_name = 'test-index'
danyilq marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -133,32 +140,35 @@ def tearDownClass(cls) -> None:
"""
client = marqo.Client(**cls.client_settings)
for index in client.get_indexes()['results']:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
if not client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
if index.index_name.startswith(cls.generic_test_index_name):
if not client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')

def setUp(self) -> None:
self.client = Client(**self.client_settings)
for index in self.client.get_indexes()['results']:
if not self.client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
else:
self.cleanup_documents_from_all_indices()
if self.client.config.is_marqo_cloud:
self.cleanup_documents_from_all_indices()
else:
for index in self.client.get_indexes()['results']:
if index.index_name.startswith(self.generic_test_index_name):
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')

def tearDown(self) -> None:
for index in self.client.get_indexes()['results']:
if not self.client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
else:
self.cleanup_documents_from_all_indices()
if self.client.config.is_marqo_cloud:
self.cleanup_documents_from_all_indices()
else:
for index in self.client.get_indexes()['results']:
if index.index_name.startswith(self.generic_test_index_name):
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')

def warm_request(self, func, *args, **kwargs):
'''
Expand All @@ -170,33 +180,28 @@ def warm_request(self, func, *args, **kwargs):
func(*args, **kwargs)

def create_cloud_index(self, index_name, settings_dict=None, **kwargs):
def create_settings_hash():
combined_dict = {**settings_dict, **kwargs}
combined_str = ''.join(f"{key}{value}" for key, value in combined_dict.items())
crc32_hash = zlib.crc32(combined_str.encode())
short_hash = hex(crc32_hash & 0xffffffff)[2:][
:10] # Take the first 10 characters of the hexadecimal representation
print(f"Created index with settings hash: {short_hash} for settings: {combined_dict}")
return short_hash

client = marqo.Client(**self.client_settings)
settings_dict = settings_dict if settings_dict else {}
index_name = f"{index_name}-{self.index_suffix}"
if settings_dict or kwargs:
index_name = f"{index_name}-{create_settings_hash()}"
index_name = f"{index_name}-{create_settings_hash(settings_dict, kwargs)}"
settings_dict.update({
"inference_type": "marqo.CPU", "storage_class": "marqo.basic", "model": "hf/all_datasets_v4_MiniLM-L6"
})
while True:
try:
if client.http.get(f"/indexes/{index_name}/status")["index_status"] == "READY":
break
except Exception as e:
pass
try:
status = client.http.get(f"/indexes/{index_name}/status")["index_status"]
if status == "CREATING":
while status == "CREATING":
time.sleep(10)
status = client.http.get(f"/indexes/{index_name}/status")["index_status"]
if status != "READY":
pandu-k marked this conversation as resolved.
Show resolved Hide resolved
self.client.create_index(index_name, settings_dict=settings_dict, **kwargs)
except Exception as e:
self.client.create_index(index_name, settings_dict=settings_dict, **kwargs)
return index_name

def create_test_index(self, index_name, settings_dict=None, **kwargs):
def create_test_index(self, index_name: str, settings_dict: dict = None, **kwargs):
"""Create a test index with the given name and settings and triggers specific logic if index is cloud index"""
client = marqo.Client(**self.client_settings)
if client.config.is_marqo_cloud:
index_name = self.create_cloud_index(index_name, settings_dict, **kwargs)
Expand All @@ -208,9 +213,10 @@ def cleanup_documents_from_all_indices(self):
client = marqo.Client(**self.client_settings)
indexes = client.get_indexes()
pandu-k marked this conversation as resolved.
Show resolved Hide resolved
for index in indexes['results']:
if self.index_suffix in index.index_name.split('-'):
if index.index_name.startswith(self.generic_test_index_name) and \
self.index_suffix in index.index_name.split('-'):
if client.http.get(f"/indexes/{index.index_name}/status")["index_status"] == "READY":
docs_to_delete = [i['_id'] for i in index.search("")['hits']]
docs_to_delete = [i['_id'] for i in index.search("", limit=100)['hits']]
while docs_to_delete:
index.delete_documents(docs_to_delete, auto_refresh=True)
docs_to_delete = [i['_id'] for i in index.search("")['hits']]
docs_to_delete = [i['_id'] for i in index.search("", limit=100)['hits']]
Empty file added tests/scripts/__init__.py
Empty file.
8 changes: 8 additions & 0 deletions tests/scripts/create_test_suffix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import os
import uuid


def set_index_suffix():
    """Ensure the ``MARQO_INDEX_SUFFIX`` environment variable is populated.

    If the variable is unset or empty, it is set to the first 8 characters of
    a freshly generated UUID4 string. An existing non-empty value is left
    untouched.

    Returns:
        The suffix stored in ``MARQO_INDEX_SUFFIX`` after the call.
    """
    index_suffix = os.environ.get("MARQO_INDEX_SUFFIX")
    if not index_suffix:
        index_suffix = str(uuid.uuid4())[:8]
        os.environ["MARQO_INDEX_SUFFIX"] = index_suffix
    return index_suffix
14 changes: 7 additions & 7 deletions tests/scripts/delete_all_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
import marqo


def cleanup_documents_from_all_indices():
def delete_all_test_indices():
local_marqo_settings = {
"url": os.environ.get("MARQO_URL", 'http://localhost:8882'),
}
suffix = os.environ.get("MARQO_INDEX_SUFFIX", None)
api_key = os.environ.get("MARQO_API_KEY", None)
if api_key:
local_marqo_settings["api_key"] = api_key
client = marqo.Client(**local_marqo_settings)
indexes = client.get_indexes()
for index in indexes['results']:
pandu-k marked this conversation as resolved.
Show resolved Hide resolved
if client.config.is_marqo_cloud:
if index.get_status()["index_status"] == marqo.enums.IndexStatus.CREATED:
index.delete()
else:
index.delete()
if index.index_name.startswith('test-index'):
if suffix is not None and suffix in index.index_name.split('-'):
if index.get_status()["index_status"] == marqo.enums.IndexStatus.CREATED:
index.delete()
danyilq marked this conversation as resolved.
Show resolved Hide resolved


if __name__ == '__main__':
cleanup_documents_from_all_indices()
delete_all_test_indices()
18 changes: 18 additions & 0 deletions tests/scripts/run_cloud_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import sys
from create_test_suffix import set_index_suffix
from delete_all_indexes import delete_all_test_indices

if __name__ == '__main__':
    # Populate MARQO_INDEX_SUFFIX (if not already set) before pytest starts.
    set_index_suffix()

    # pytest is imported only when the script is actually executed.
    import pytest

    # Run the suite, deselecting tests marked `ignore_cloud_tests`; any extra
    # command-line arguments are forwarded to pytest unchanged.
    pytest_args = ['tests/', '-m', 'not ignore_cloud_tests'] + sys.argv[1:]
    try:
        exit_code = pytest.main(pytest_args)
    finally:
        # Run the index cleanup even if pytest itself raised.
        delete_all_test_indices()

    # Propagate pytest's result so a failing run is not reported as success.
    sys.exit(int(exit_code))
Loading
Loading