Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce instance mappings #132

Merged
merged 31 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8c08fed
introduce instance mappings
danyilq Jul 31, 2023
ead493b
Implement InstanceMappings and default and cloud implementations
farshidz Jul 31, 2023
97489cd
Remove index name from control url method
farshidz Jul 31, 2023
705082e
Update httprequests module to use InstanceMappings
farshidz Jul 31, 2023
24a06d1
Fix class name
farshidz Jul 31, 2023
e83f0fe
fix minimum version check and add cloud error
danyilq Aug 1, 2023
9fca70b
delete enrich and improve error message
danyilq Aug 1, 2023
661d440
fix existing and add new tests
danyilq Aug 1, 2023
8ee578f
Add docstring for InstanceMappings
farshidz Aug 2, 2023
c26885c
move version_check and refactor
danyilq Aug 2, 2023
a4d93e2
refactor tests to work with cloud
danyilq Aug 2, 2023
acc162e
add readiness check for index that we want to clean documents in
danyilq Aug 2, 2023
fb766fa
unsupported operation + change hardcoded url for refresh_urls
danyilq Aug 3, 2023
e7f514d
adapt tests behaviour for cloud and partially refactor code
danyilq Aug 3, 2023
7b2491c
post debug changes to tests and new run env in tox
danyilq Aug 6, 2023
c6f57cd
update logging for wait_for_index_status
danyilq Aug 7, 2023
76b7b8f
improve testing
danyilq Aug 7, 2023
e99c7af
use http instead of client
danyilq Aug 7, 2023
5940880
Merge branch 'mainline' into danyil/instance-mappings
danyilq Aug 7, 2023
6f47bc5
follow up on all of the comments
danyilq Aug 7, 2023
a24d93e
Merge remote-tracking branch 'origin/danyil/instance-mappings' into d…
danyilq Aug 7, 2023
4e4d91a
final changes
danyilq Aug 8, 2023
32fc98a
improve test
danyilq Aug 8, 2023
b56e53e
consistent hash
danyilq Aug 8, 2023
89c1fb8
consistent hash
danyilq Aug 8, 2023
0472dff
better namings
danyilq Aug 8, 2023
a711a95
bump pymarqo version
danyilq Aug 8, 2023
836b4b7
adapt to 32 characters index name limit
danyilq Aug 8, 2023
e7b381b
reduce test_demos.py index name length
danyilq Aug 8, 2023
53db2d3
raise error if index_name is too long
danyilq Aug 8, 2023
2be58c2
update version
danyilq Aug 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions src/marqo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from requests.exceptions import RequestException
from typing_extensions import deprecated

from marqo.cloud_helpers import cloud_wait_for_index_status
from marqo.default_instance_mappings import DefaultInstanceMappings
from marqo.index import Index
from marqo.config import Config
Expand Down Expand Up @@ -42,7 +43,12 @@ def __init__(
Parameters
----------
url:
The url to the S2Search API (ex: http://localhost:8882)
The url to the Marqo API (ex: http://localhost:8882) If MARQO_CLOUD_URL environment variable is set, when
matching url is passed, the client will use the Marqo Cloud instance mappings.
instance_mappings:
An instance of InstanceMappings that maps index names to urls
api_key:
The api key to use for authentication with the Marqo API
"""
if url is not None and instance_mappings is not None:
raise ValueError("Cannot specify both url and instance_mappings")
Expand Down Expand Up @@ -120,6 +126,7 @@ def delete_index(self, index_name: str) -> Dict[str, Any]:
"""
try:
res = self.http.delete(path=f"indexes/{index_name}")
cloud_wait_for_index_status(self.http, index_name, enums.IndexStatus.DELETED)
except errors.MarqoWebError as e:
return e.message

Expand Down Expand Up @@ -194,7 +201,7 @@ def _base64url_encode(
@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use `client.index(index_name).get_marqo()` instead. "
"Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details."
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_marqo(self):
if self.config.is_marqo_cloud:
Expand All @@ -204,14 +211,14 @@ def get_marqo(self):
@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use `client.index(index_name).health()` instead. "
"Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details."
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def health(self):
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("health")
mq_logger.warning('The `client.health()` API has been deprecated and will be removed in '
'Marqo 2.0.0. Use `client.index(index_name).health()` instead. '
'Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details.')
'Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details.')
try:
return self.http.get(path="health")
except (MarqoWebError, RequestException, TypeError, KeyError) as e:
Expand All @@ -224,7 +231,7 @@ def health(self):
@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).eject_model() instead. "
"Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details."
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def eject_model(self, model_name: str, model_device: str):
if self.config.is_marqo_cloud:
Expand All @@ -234,7 +241,7 @@ def eject_model(self, model_name: str, model_device: str):
@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).get_loaded_models() instead. "
"Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details."
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_loaded_models(self):
if self.config.is_marqo_cloud:
Expand All @@ -244,7 +251,7 @@ def get_loaded_models(self):
@deprecated(
danyilq marked this conversation as resolved.
Show resolved Hide resolved
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).get_cuda_info() instead. "
"Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details."
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_cuda_info(self):
if self.config.is_marqo_cloud:
Expand All @@ -254,7 +261,7 @@ def get_cuda_info(self):
@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).get_cpu_info() instead. "
"Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details."
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_cpu_info(self):
if self.config.is_marqo_cloud:
Expand All @@ -266,5 +273,5 @@ def raise_error_for_cloud(function_name: str = None):
raise errors.BadRequestError(
f"The `mq.{function_name}()` API is not supported on Marqo Cloud. "
f"Please Use `mq.index('your-index-name').{function_name}()` instead. "
"Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details.")
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details.")

13 changes: 13 additions & 0 deletions src/marqo/cloud_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import time

from marqo.marqo_logging import mq_logger


def cloud_wait_for_index_status(req, index_name, status):
danyilq marked this conversation as resolved.
Show resolved Hide resolved
creation = req.get(f"indexes/{index_name}/status")
while creation['index_status'] != status:
time.sleep(10)
creation = req.get(f"indexes/{index_name}/status")
mq_logger.info(f"Index creation status: {creation['index_status']}")
mq_logger.info("Index created successfully")
return True
4 changes: 4 additions & 0 deletions src/marqo/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ class Devices:
cpu = "cpu"
cuda = "cuda"


class IndexStatus:
CREATED = "READY"
DELETED = "DELETED"
35 changes: 22 additions & 13 deletions src/marqo/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from requests import RequestException

from marqo import defaults
from marqo.cloud_helpers import cloud_wait_for_index_status
from marqo.enums import IndexStatus
import typing
from urllib import parse
from datetime import datetime
Expand Down Expand Up @@ -49,12 +51,27 @@ def __init__(
self.index_name = index_name
self.created_at = self._maybe_datetime(created_at)
self.updated_at = self._maybe_datetime(updated_at)
self._marqo_minimum_supported_version_check()
skip_version_check = False
if config.is_marqo_cloud:
try:
if self.get_status()["index_status"] != IndexStatus.CREATED:
logging.warning(f"Index {index_name} is not ready. Status: {self.get_status()}, operations may fail.")
danyilq marked this conversation as resolved.
Show resolved Hide resolved
skip_version_check = True
except Exception as e:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
skip_version_check = True
if not skip_version_check:
self._marqo_minimum_supported_version_check()
else:
logging.warning("Version check is skipped because index is not ready yet.")
danyilq marked this conversation as resolved.
Show resolved Hide resolved

def delete(self) -> Dict[str, Any]:
"""Delete the index.
"""
return self.http.delete(path=f"indexes/{self.index_name}")
response = self.http.delete(path=f"indexes/{self.index_name}")
if self.config.is_marqo_cloud:
cloud_wait_for_index_status(self.http, self.index_name, IndexStatus.DELETED)
danyilq marked this conversation as resolved.
Show resolved Hide resolved
return response


@staticmethod
def create(config: Config, index_name: str,
Expand Down Expand Up @@ -93,20 +110,12 @@ def create(config: Config, index_name: str,
Returns:
Response body, containing information about index creation result
"""
def cloud_wait_for_index_ready():
creation = req.get(f"indexes/{index_name}/status")
while creation['index_status'] != 'READY':
time.sleep(10)
creation = req.get(f"indexes/{index_name}/status")
mq_logger.info(f"Index creation status: {creation['index_status']}")
mq_logger.info("Index created successfully")
return True
req = HttpRequests(config)

if settings_dict is not None and settings_dict:
response = req.post(f"indexes/{index_name}", body=settings_dict)
if config.is_marqo_cloud:
cloud_wait_for_index_ready()
cloud_wait_for_index_status(req, index_name, IndexStatus.CREATED)
return response

if config.api_key is not None:
Expand All @@ -130,7 +139,7 @@ def cloud_wait_for_index_ready():
cl_settings['number_of_replicas'] = replicas_count
cl_settings['number_of_shards'] = storage_node_count
response = req.post(f"indexes/{index_name}", body=cl_settings)
cloud_wait_for_index_ready()
cloud_wait_for_index_status(req, index_name, IndexStatus.CREATED)
return response

return req.post(f"indexes/{index_name}", body={
Expand Down Expand Up @@ -566,7 +575,7 @@ def get_settings(self) -> dict:

def health(self) -> dict:
"""Check the health of an index"""
return self.http.get(path=f"indexes/{self.index_name}/health", index_name=self.index_name)
return self.http.get(path="health", index_name=self.index_name)

def get_loaded_models(self):
return self.http.get(path="models", index_name=self.index_name)
Expand Down
98 changes: 67 additions & 31 deletions tests/marqo_test.py
danyilq marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import logging
import time
import uuid
from collections import defaultdict
from functools import wraps
import json
Expand All @@ -18,7 +19,8 @@
from marqo._httprequests import HTTP_OPERATIONS
from marqo.version import __marqo_version__ as py_marqo_support_version
from marqo.client import Client
from marqo.errors import InternalError
from marqo.errors import InternalError, MarqoApiError
import zlib


class MockHTTPTraffic(BaseModel):
Expand Down Expand Up @@ -90,7 +92,7 @@ def wrapper(self, *args, **kwargs):
for index_name, docs in index_to_documents.items():
if len(docs) == 0:
continue
self.client.create_index(index_name=index_name)
self.create_test_index(index_name=index_name)
self.client.index(index_name).add_documents(docs, non_tensor_fields=[])
if self.IS_MULTI_INSTANCE:
self.warm_request(self.client.bulk_search, [{
Expand All @@ -113,10 +115,14 @@ def setUpClass(cls) -> None:
api_key = os.environ.get("MARQO_API_KEY", None)
if (api_key):
local_marqo_settings["api_key"] = api_key

cls.index_suffix = os.environ.get("MARQO_INDEX_SUFFIX", None)
if not cls.index_suffix:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
os.environ["MARQO_INDEX_SUFFIX"] = str(uuid.uuid4())[:8]
pandu-k marked this conversation as resolved.
Show resolved Hide resolved
cls.index_suffix = os.environ["MARQO_INDEX_SUFFIX"]
cls.client_settings = local_marqo_settings
cls.authorized_url = cls.client_settings["url"]
cls.generic_test_index_name = 'test-index'
danyilq marked this conversation as resolved.
Show resolved Hide resolved
cls.generic_test_index_name_2 = cls.generic_test_index_name + '-2'

# class property to indicate if test is being run on multi
cls.IS_MULTI_INSTANCE = (True if os.environ.get("IS_MULTI_INSTANCE", False) in ["True", "TRUE", "true", True] else False)
Expand All @@ -126,15 +132,34 @@ def tearDownClass(cls) -> None:
"""Delete commonly used test indexes after all tests are run
"""
client = marqo.Client(**cls.client_settings)
commonly_used_ix_name = 'my-test-index-1'
indexes_to_tear_down = [cls.generic_test_index_name, commonly_used_ix_name]
for ix_name in indexes_to_tear_down:
for index in client.get_indexes()['results']:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
if not client.config.is_marqo_cloud:
try:
client.delete_index(ix_name)
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')

def setUp(self) -> None:
self.client = Client(**self.client_settings)
for index in self.client.get_indexes()['results']:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
if not self.client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
else:
self.cleanup_documents_from_all_indices()
danyilq marked this conversation as resolved.
Show resolved Hide resolved

def tearDown(self) -> None:
for index in self.client.get_indexes()['results']:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
if not self.client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
else:
self.cleanup_documents_from_all_indices()
danyilq marked this conversation as resolved.
Show resolved Hide resolved

def warm_request(self, func, *args, **kwargs):
'''
Takes in a function object, func, and executes the function 5 times to warm search results.
Expand All @@ -144,37 +169,48 @@ def warm_request(self, func, *args, **kwargs):
for i in range(5):
func(*args, **kwargs)

def create_cloud_index(self, index_name, index_defaults=None, **kwargs):
def create_cloud_index(self, index_name, settings_dict=None, **kwargs):
def create_settings_hash():
danyilq marked this conversation as resolved.
Show resolved Hide resolved
combined_dict = {**settings_dict, **kwargs}
combined_str = ''.join(f"{key}{value}" for key, value in combined_dict.items())
crc32_hash = zlib.crc32(combined_str.encode())
short_hash = hex(crc32_hash & 0xffffffff)[2:][
:10] # Take the first 10 characters of the hexadecimal representation
print(f"Created index with settings hash: {short_hash} for settings: {combined_dict}")
danyilq marked this conversation as resolved.
Show resolved Hide resolved
return short_hash

client = marqo.Client(**self.client_settings)
index_settings = index_defaults if index_defaults else {}
index_settings.update({
settings_dict = settings_dict if settings_dict else {}
index_name = f"{index_name}-{self.index_suffix}"
if settings_dict or kwargs:
index_name = f"{index_name}-{create_settings_hash()}"
danyilq marked this conversation as resolved.
Show resolved Hide resolved
settings_dict.update({
"inference_type": "marqo.CPU", "storage_class": "marqo.basic", "model": "hf/all_datasets_v4_MiniLM-L6"
})
while True:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
if client.http.get(f"/indexes/{index_name}/status") == "READY":
break
try:
client.create_index(index_name, settings_dict=index_settings)
except marqo.errors.MarqoWebError as e:
if e.status_code == 409:
continue

def create_index(self, index_name, index_defaults=None, **kwargs):
if client.http.get(f"/indexes/{index_name}/status")["index_status"] == "READY":
break
except Exception as e:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
pass
self.client.create_index(index_name, settings_dict=settings_dict, **kwargs)
return index_name

def create_test_index(self, index_name, settings_dict=None, **kwargs):
danyilq marked this conversation as resolved.
Show resolved Hide resolved
client = marqo.Client(**self.client_settings)
if client.config.is_marqo_cloud:
self.create_cloud_index(index_name, index_defaults, **kwargs)
return
client.create_index(index_name, settings_dict=index_defaults, **kwargs)
index_name = self.create_cloud_index(index_name, settings_dict, **kwargs)
else:
client.create_index(index_name, settings_dict=settings_dict, **kwargs)
return index_name

def delete_documents(self, index_name):
def cleanup_documents_from_all_indices(self):
client = marqo.Client(**self.client_settings)
indexes = client.get_indexes()
pandu-k marked this conversation as resolved.
Show resolved Hide resolved
if index_name in indexes['results']:
if client.http.get(f"/indexes/{index_name}/status") == "READY":
try:
idx = client.index(index_name)
docs_to_delete = [i['_id'] for i in idx.search("")['hits']]
if docs_to_delete:
idx.delete_documents(docs_to_delete)
except marqo.errors.MarqoCloudIndexNotFoundError:
pass
for index in indexes['results']:
if self.index_suffix in index.index_name.split('-'):
danyilq marked this conversation as resolved.
Show resolved Hide resolved
if client.http.get(f"/indexes/{index.index_name}/status")["index_status"] == "READY":
docs_to_delete = [i['_id'] for i in index.search("")['hits']]
danyilq marked this conversation as resolved.
Show resolved Hide resolved
while docs_to_delete:
index.delete_documents(docs_to_delete, auto_refresh=True)
docs_to_delete = [i['_id'] for i in index.search("")['hits']]
24 changes: 24 additions & 0 deletions tests/scripts/delete_all_indexes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os

import marqo


def cleanup_documents_from_all_indices():
danyilq marked this conversation as resolved.
Show resolved Hide resolved
local_marqo_settings = {
"url": os.environ.get("MARQO_URL", 'http://localhost:8882'),
}
api_key = os.environ.get("MARQO_API_KEY", None)
if api_key:
local_marqo_settings["api_key"] = api_key
client = marqo.Client(**local_marqo_settings)
indexes = client.get_indexes()
for index in indexes['results']:
pandu-k marked this conversation as resolved.
Show resolved Hide resolved
if client.config.is_marqo_cloud:
if index.get_status()["index_status"] == marqo.enums.IndexStatus.CREATED:
danyilq marked this conversation as resolved.
Show resolved Hide resolved
index.delete()
else:
index.delete()


if __name__ == '__main__':
cleanup_documents_from_all_indices()
Loading
Loading