Skip to content

Commit

Permalink
MC-35 marqo GA changes (#115)
Browse files Browse the repository at this point in the history
* MC-35 marqo GA changes

* remove server batch (#113)

* MC-35 add tests and changes after review

* resolve url only when needed for index + improve tests

* improves for edgecases

* use correct expiration time

* add error handling for indices

* timeout for slow refresh + tests

---------
  • Loading branch information
danyilq authored Jul 11, 2023
1 parent b51ec24 commit 73d8406
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 39 deletions.
24 changes: 15 additions & 9 deletions src/marqo/_httprequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
'put': session.put
}


class HttpRequests:
def __init__(self, config: Config) -> None:
self.config = config
Expand All @@ -32,19 +33,21 @@ def _operation(self, method: HTTP_OPERATIONS) -> Callable:

return OPERATION_MAPPING[method]

def _construct_path(self, path: str) -> str:
def _construct_path(self, path: str, index_name="") -> str:
"""Augment the URL request path based if telemetry is required."""
url = f"{self.config.get_url(index_name=index_name)}/{path}"
if self.config.use_telemetry:
delimeter= "?" if "?" not in f"{self.config.url}/{path}" else "&"
return f"{self.config.url}/{path}{delimeter}telemetry=True"
return f"{self.config.url}/{path}"
return url + f"{delimeter}telemetry=True"
return url

def send_request(
self,
http_operation: HTTP_OPERATIONS,
path: str,
body: Optional[Union[Dict[str, Any], List[Dict[str, Any]], List[str], str]] = None,
content_type: Optional[str] = None,
index_name: str = ""
) -> Any:
req_headers = copy.deepcopy(self.headers)

Expand All @@ -56,7 +59,7 @@ def send_request(

try:
response = self._operation(http_operation)(
url=self._construct_path(path),
url=self._construct_path(path, index_name),
timeout=self.config.timeout,
headers=req_headers,
data=body,
Expand All @@ -68,40 +71,43 @@ def send_request(
except requests.exceptions.ConnectionError as err:
raise BackendCommunicationError(str(err)) from err


def get(
self, path: str,
body: Optional[Union[Dict[str, Any], List[Dict[str, Any]], List[str], str]] = None,
index_name: str = ""
) -> Any:
content_type = None
if body is not None:
content_type = 'application/json'
return self.send_request('get', path=path, body=body, content_type=content_type)
return self.send_request('get', path=path, body=body, content_type=content_type,index_name=index_name)

def post(
self,
path: str,
body: Optional[Union[Dict[str, Any], List[Dict[str, Any]], List[str], str]] = None,
content_type: Optional[str] = 'application/json',
index_name: str = ""
) -> Any:
return self.send_request('post', path, body, content_type)
return self.send_request('post', path, body, content_type, index_name=index_name)

def put(
self,
path: str,
body: Optional[Union[Dict[str, Any], List[Dict[str, Any]], List[str], str]] = None,
content_type: Optional[str] = None,
index_name: str = ""
) -> Any:
if body is not None:
content_type = 'application/json'
return self.send_request('put', path, body, content_type)
return self.send_request('put', path, body, content_type, index_name=index_name)

def delete(
self,
path: str,
body: Optional[Union[Dict[str, Any], List[Dict[str, Any]], List[str]]] = None,
index_name: str = ""
) -> Any:
return self.send_request('delete', path, body)
return self.send_request('delete', path, body, index_name=index_name)

@staticmethod
def __to_json(
Expand Down
19 changes: 11 additions & 8 deletions src/marqo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ def create_index(
sentences_per_chunk=2,
sentence_overlap=0,
image_preprocessing_method=None,
settings_dict=None
settings_dict=None,
inference_node_type=None,
storage_node_type=None,
inference_node_count=1,
) -> Dict[str, Any]:
"""Create the index.
"""Create the index. Please refer to the marqo cloud to see options for inference and storage node types.
Args:
index_name: name of the index.
Expand All @@ -61,6 +64,9 @@ def create_index(
settings_dict: if specified, overwrites all other setting
parameters, and is passed directly as the index's
index_settings
inference_node_type:
storage_node_type:
inference_node_count;
Returns:
Response body, containing information about index creation result
"""
Expand All @@ -70,7 +76,8 @@ def create_index(
model=model, normalize_embeddings=normalize_embeddings,
sentences_per_chunk=sentences_per_chunk, sentence_overlap=sentence_overlap,
image_preprocessing_method=image_preprocessing_method,
settings_dict=settings_dict
settings_dict=settings_dict, inference_node_type=inference_node_type, storage_node_type=storage_node_type,
inference_node_count=inference_node_count
)

def delete_index(self, index_name: str) -> Dict[str, Any]:
Expand Down Expand Up @@ -101,7 +108,7 @@ def get_index(self, index_name: str) -> Index:
"""
ix = Index(self.config, index_name)
# verify it exists:
self.http.get(path=f"indexes/{index_name}/stats")
self.http.get(path=f"indexes/{index_name}/stats", index_name=index_name)
return ix

def index(self, index_name: str) -> Index:
Expand Down Expand Up @@ -170,19 +177,15 @@ def get_marqo(self):
def health(self):
return self.http.get(path="health")


def eject_model(self, model_name:str, model_device:str):
return self.http.delete(path=f"models?model_name={model_name}&model_device={model_device}")


def get_loaded_models(self):
return self.http.get(path="models")


def get_cuda_info(self):
return self.http.get(path="device/cuda")


def get_cpu_info(self):
return self.http.get(path="device/cpu")

Expand Down
18 changes: 17 additions & 1 deletion src/marqo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from marqo import enums, utils
import urllib3
import warnings
from marqo.marqo_url_resolver import MarqoUrlResolver


class Config:
Expand All @@ -24,9 +25,11 @@ def __init__(
"""
self.cluster_is_remote = False
self.cluster_is_s2search = False
self.cluster_is_marqo = False
self.marqo_url_resolver = None
self.api_key = api_key
self.url = self.set_url(url)
self.timeout = timeout
self.api_key = api_key
# suppress warnings until we figure out the dependency issues:
# warnings.filterwarnings("ignore")
self.use_telemetry = use_telemetry
Expand All @@ -43,5 +46,18 @@ def set_url(self, url):
self.cluster_is_remote = True
if "s2search.io" in lowered_url:
self.cluster_is_s2search = True
if "api.marqo.ai" in lowered_url:
self.cluster_is_marqo = True
self.marqo_url_resolver = MarqoUrlResolver(api_key=self.api_key, expiration_time=15)
self.url = url
return self.url

def get_url(self, index_name=None,):
"""Get the URL, and infers whether that url is marqo cloud,
and if it is targeting a specific index resolves the index-specific url"""
if not self.cluster_is_marqo:
return self.url
if self.cluster_is_marqo and not index_name:
return self.url + "/api"
# calls resolver to get index-specific url for when cluster is marqo and index_name is not None
return self.marqo_url_resolver[index_name]
4 changes: 2 additions & 2 deletions src/marqo/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ def get_cloud_default_index_settings():
"patch_method": None
}
},
"number_of_shards": 2,
"number_of_replicas": 1,
"number_of_shards": 1,
"number_of_replicas": 0,
}
24 changes: 24 additions & 0 deletions src/marqo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,27 @@ class BackendTimeoutError(InternalError):
def __init__(self, message: str,) -> None:
self.message = f"Timeout error communicating with Marqo: {message}"


class MarqoCloudIndexNotReadyError(MarqoError):
"""Error when Marqo index is not ready"""
code = "index_not_ready_cloud"
status_code = HTTPStatus.NOT_FOUND

def __init__(self, index_name: str,) -> None:
self.message = f"The Python client could not resolve the endpoint for the index name {index_name}. " \
f"This could be due to the index is still in the process of being created," \
f" or that the client's cache has not yet been updated.\n" \
f"- Please try again in a couple of minutes, or you can query the index status" \
f" with mq.index({index_name}).get_status function to see when index is ready.\n" \
f"- If the problem persists, please contact marqo support at support@marqo.ai"


class MarqoCloudIndexNotFoundError(MarqoError):
"""Error when Marqo index is not ready"""
code = "index_not_found_cloud"
status_code = HTTPStatus.NOT_FOUND

def __init__(self, index_name: str,) -> None:
self.message = f"The index name {index_name} does not exist in the Marqo cloud or client's cache" \
f" has not yet been updated. Please check the index name and try again.\n" \
f"- If the problem persists, please contact marqo support at support@marqo.ai"
52 changes: 39 additions & 13 deletions src/marqo/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import json
import logging
import pprint
import time

from marqo import defaults
import typing
from urllib import parse
Expand Down Expand Up @@ -54,7 +56,10 @@ def create(config: Config, index_name: str,
sentences_per_chunk=2,
sentence_overlap=0,
image_preprocessing_method=None,
settings_dict: dict = None
settings_dict: dict = None,
inference_node_type: str = None,
storage_node_type: str = None,
inference_node_count: int = 1,
) -> Dict[str, Any]:
"""Create the index.
Expand All @@ -70,6 +75,9 @@ def create(config: Config, index_name: str,
settings_dict: if specified, overwrites all other setting
parameters, and is passed directly as the index's
index_settings
inference_node_type: inference type for the index
storage_node_type: storage type for the index
inference_node_count: number of inference nodes for the index
Returns:
Response body, containing information about index creation result
"""
Expand All @@ -91,7 +99,19 @@ def create(config: Config, index_name: str,
cl_text_preprocessing['split_length'] = sentences_per_chunk
cl_img_preprocessing = cl_ix_defaults['image_preprocessing']
cl_img_preprocessing['patch_method'] = image_preprocessing_method
return req.post(f"indexes/{index_name}", body=cl_settings)
if not config.cluster_is_marqo:
return req.post(f"indexes/{index_name}", body=cl_settings)
cl_settings['inference_type'] = inference_node_type
cl_settings['storage_class'] = storage_node_type
cl_settings['inference_node_count'] = inference_node_count
response = req.post(f"indexes/{index_name}", body=cl_settings)
index = Index(config, index_name)
creation = index.get_status()
while creation['index_status'] != 'READY':
time.sleep(10)
creation = index.get_status()
mq_logger.info(f"Index creation status: {creation['index_status']}")
return response

return req.post(f"indexes/{index_name}", body={
"index_defaults": {
Expand All @@ -111,7 +131,11 @@ def create(config: Config, index_name: str,

def refresh(self):
"""refreshes the index"""
return self.http.post(path=F"indexes/{self.index_name}/refresh")
return self.http.post(path=F"indexes/{self.index_name}/refresh", index_name=self.index_name,)

def get_status(self):
"""gets the status of the index"""
return self.http.get(path=F"indexes/{self.index_name}/status")

def search(self, q: Union[str, dict], searchable_attributes: Optional[List[str]] = None,
limit: int = 10, offset: int = 0, search_method: Union[SearchMethods.TENSOR, str] = SearchMethods.TENSOR,
Expand Down Expand Up @@ -184,7 +208,8 @@ def search(self, q: Union[str, dict], searchable_attributes: Optional[List[str]]
body["modelAuth"] = model_auth
res = self.http.post(
path=path_with_query_str,
body=body
body=body,
index_name=self.index_name,
)

num_results = len(res["hits"])
Expand Down Expand Up @@ -214,7 +239,7 @@ def get_document(self, document_id: str, expose_facets=None) -> Dict[str, Any]:
url_string = f"indexes/{self.index_name}/documents/{document_id}"
if expose_facets is not None:
url_string += f"?expose_facets={expose_facets}"
return self.http.get(url_string)
return self.http.get(url_string, index_name=self.index_name,)

def get_documents(self, document_ids: List[str], expose_facets=None) -> Dict[str, Any]:
"""Gets a selection of documents based on their IDs.
Expand All @@ -233,7 +258,8 @@ def get_documents(self, document_ids: List[str], expose_facets=None) -> Dict[str
url_string += f"?expose_facets={expose_facets}"
return self.http.get(
url_string,
body=document_ids
body=document_ids,
index_name=self.index_name,
)

def add_documents(
Expand Down Expand Up @@ -340,7 +366,9 @@ def _add_docs_organiser(
# ADD DOCS TIMER-LOGGER (2)
start_time_client_request = timer()

res = self.http.post(path=path_with_query_str, body=documents)
res = self.http.post(
path=path_with_query_str, body=documents, index_name=self.index_name,
)

end_time_client_request = timer()
total_client_request_time = end_time_client_request - start_time_client_request
Expand Down Expand Up @@ -374,13 +402,11 @@ def delete_documents(self, ids: List[str], auto_refresh: bool = None) -> Dict[st
base_path = f"indexes/{self.index_name}/documents/delete-batch"
path_with_refresh = base_path if auto_refresh is None else base_path + f"?refresh={str(auto_refresh).lower()}"

return self.http.post(
path=path_with_refresh, body=ids
)
return self.http.post(path=path_with_refresh, body=ids, index_name=self.index_name,)

def get_stats(self) -> Dict[str, Any]:
"""Get stats about the index"""
return self.http.get(path=f"indexes/{self.index_name}/stats")
return self.http.get(path=f"indexes/{self.index_name}/stats", index_name=self.index_name,)

@staticmethod
def _maybe_datetime(the_date: Optional[Union[datetime, str]]) -> Optional[datetime]:
Expand Down Expand Up @@ -434,7 +460,7 @@ def verbosely_add_docs(i, docs):
errors_detected = False

t0 = timer()
res = self.http.post(path=path_with_query_str, body=docs)
res = self.http.post(path=path_with_query_str, body=docs, index_name=self.index_name,)

total_batch_time = timer() - t0
num_docs = len(docs)
Expand Down Expand Up @@ -491,4 +517,4 @@ def verbosely_add_docs(i, docs):

def get_settings(self) -> dict:
"""Get all settings of the index"""
return self.http.get(path=f"indexes/{self.index_name}/settings")
return self.http.get(path=f"indexes/{self.index_name}/settings", index_name=self.index_name,)
Loading

0 comments on commit 73d8406

Please sign in to comment.