Skip to content

Commit

Permalink
introduce instance mappings (#132)
Browse files Browse the repository at this point in the history
* introduce instance mappings
* Introduce changes for tests to make them compatible with Marqo Cloud

---------

Co-authored-by: Farshid Zavareh <farshid@marqo.ai>
  • Loading branch information
danyilq and farshidz authored Aug 9, 2023
1 parent a209023 commit 8b7e172
Show file tree
Hide file tree
Showing 40 changed files with 1,311 additions and 1,041 deletions.
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[build-system]
requires = ["setuptools>=42"]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"

[tool.pytest.ini_options]
markers = "ignore_during_cloud_tests: Mark tests to be ignored when running with env set to cloud_tests"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"tox"
],
name="marqo",
version="1.1.1",
version="1.2.0",
author="marqo org",
author_email="org@marqo.io",
description="Tensor search for humans",
Expand Down
6 changes: 4 additions & 2 deletions src/marqo/_httprequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def _operation(self, method: HTTP_OPERATIONS) -> Callable:

def _construct_path(self, path: str, index_name="") -> str:
"""Augment the URL request path based if telemetry is required."""
url = f"{self.config.get_url(index_name=index_name)}/{path}"
base_url = self.config.instance_mapping.get_index_base_url(index_name=index_name) if index_name \
else self.config.instance_mapping.get_control_base_url()
url = f"{base_url}/{path}"
if self.config.use_telemetry:
delimeter= "?" if "?" not in f"{self.config.url}/{path}" else "&"
delimeter= "?" if "?" not in f"{base_url}/{path}" else "&"
return url + f"{delimeter}telemetry=True"
return url

Expand Down
139 changes: 81 additions & 58 deletions src/marqo/client.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import base64
import os
from typing import Any, Dict, List, Optional, Union

from pydantic import error_wrappers
from requests.exceptions import RequestException
from typing_extensions import deprecated

from marqo.cloud_helpers import cloud_wait_for_index_status
from marqo.default_instance_mappings import DefaultInstanceMappings
from marqo.index import Index
from marqo.config import Config
from marqo.instance_mappings import InstanceMappings
from marqo.marqo_cloud_instance_mappings import MarqoCloudInstanceMappings
from marqo.models import BulkSearchBody, BulkSearchQuery
from marqo._httprequests import HttpRequests
from marqo import utils, enums
from marqo import errors
from marqo.version import minimum_supported_marqo_version
from marqo.marqo_logging import mq_logger
from marqo.errors import MarqoWebError
# we want to avoid name conflicts with marqo.version
from packaging import version as versioning_helpers
from json import JSONDecodeError

# A dictionary to cache the marqo url and version for compatibility check
marqo_url_and_version_cache = {}


class Client:
Expand All @@ -30,7 +33,8 @@ class Client:
"""

def __init__(
self, url: str = "http://localhost:8882",
self, url: Optional[str] = "http://localhost:8882",
instance_mappings: Optional[InstanceMappings] = None,
main_user: str = None, main_password: str = None,
return_telemetry: bool = False,
api_key: str = None
Expand All @@ -39,17 +43,31 @@ def __init__(
Parameters
----------
url:
The url to the S2Search API (ex: http://localhost:8882)
The url to the Marqo API (ex: http://localhost:8882) If MARQO_CLOUD_URL environment variable is set, when
matching url is passed, the client will use the Marqo Cloud instance mappings.
instance_mappings:
An instance of InstanceMappings that maps index names to urls
api_key:
The api key to use for authentication with the Marqo API
"""
self.main_user = main_user
self.main_password = main_password
if (main_user is not None) and (main_password is not None):
self.url = utils.construct_authorized_url(url_base=url, username=main_user, password=main_password)
else:
self.url = url
self.config = Config(self.url, use_telemetry=return_telemetry, api_key=api_key)
if url is not None and instance_mappings is not None:
raise ValueError("Cannot specify both url and instance_mappings")

is_marqo_cloud = False
if url is not None:
if url.lower().startswith(os.environ.get("MARQO_CLOUD_URL", "https://api.marqo.ai")):
instance_mappings = MarqoCloudInstanceMappings(control_base_url=url, api_key=api_key)
is_marqo_cloud = True
else:
instance_mappings = DefaultInstanceMappings(url, main_user, main_password)

self.config = Config(
instance_mappings=instance_mappings,
is_marqo_cloud=is_marqo_cloud,
use_telemetry=return_telemetry,
api_key=api_key
)
self.http = HttpRequests(self.config)
self._marqo_minimum_supported_version_check()

def create_index(
self, index_name: str,
Expand Down Expand Up @@ -108,6 +126,7 @@ def delete_index(self, index_name: str) -> Dict[str, Any]:
"""
try:
res = self.http.delete(path=f"indexes/{index_name}")
cloud_wait_for_index_status(self.http, index_name, enums.IndexStatus.DELETED)
except errors.MarqoWebError as e:
return e.message

Expand Down Expand Up @@ -161,15 +180,6 @@ def get_indexes(self) -> Dict[str, List[Index]]:
]
return response

def enrich(self, documents: List[Dict], enrichment: Dict, device: str = None, ):
"""Enrich documents"""
translated = utils.translate_device_string_for_url(device)
response = self.http.post(path=f'enrichment?device={translated}', body={
"documents": documents,
"enrichment": enrichment
})
return response

def bulk_search(self, queries: List[Dict[str, Any]], device: Optional[str] = None) -> Dict[str, Any]:
try:
parsed_queries = [BulkSearchBody(**q) for q in queries]
Expand All @@ -188,13 +198,24 @@ def _base64url_encode(
) -> str:
return base64.urlsafe_b64encode(data).decode('utf-8').replace('=', '')

@deprecated(
    "This method is deprecated and will be removed in Marqo 2.0.0. "
    "Please use `mq.index(index_name).get_marqo()` instead. "
    "Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_marqo(self):
    """Issue a GET against the root path of the control base URL.

    Deprecated in favour of the index-level ``get_marqo()``.

    Returns:
        Whatever ``self.http.get`` returns for the empty path.

    Raises:
        errors.BadRequestError: when the client is configured for Marqo
            Cloud, where this client-level call is rejected up front.
    """
    on_cloud = self.config.is_marqo_cloud
    if on_cloud:
        # Always raises; the request below is never reached on Marqo Cloud.
        self.raise_error_for_cloud("get_marqo")
    root_path = ""
    return self.http.get(path=root_path)

@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use `mq.index(index_name).health()` instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def health(self):
mq_logger.warning('The `client.health()` API has been deprecated and will be removed in '
'Marqo 2.0.0. Use `client.index(index_name).health()` instead. '
'Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details.')
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("health")
try:
return self.http.get(path="health")
except (MarqoWebError, RequestException, TypeError, KeyError) as e:
Expand All @@ -204,48 +225,50 @@ def health(self):
"Marqo 2.0.0. Please Use `client.index('your-index-name').health()` instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details.")

@deprecated(
    "This method is deprecated and will be removed in Marqo 2.0.0. "
    "Please use `mq.index(index_name).eject_model()` instead. "
    "Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def eject_model(self, model_name: str, model_device: str):
    """Send a DELETE request to ``models`` for the given model/device pair.

    Deprecated in favour of the index-level ``eject_model()``.

    Args:
        model_name: name of the model, sent as the ``model_name`` query
            parameter.
        model_device: device identifier, sent as the ``model_device`` query
            parameter.

    Returns:
        Whatever ``self.http.delete`` returns.

    Raises:
        errors.BadRequestError: when the client is configured for Marqo
            Cloud, where this client-level call is rejected up front.
    """
    from urllib.parse import urlencode

    if self.config.is_marqo_cloud:
        # Always raises; the request below is never reached on Marqo Cloud.
        self.raise_error_for_cloud("eject_model")

    # Encode the values so characters such as `&`, `#`, `=` or spaces cannot
    # break or truncate the query string. `/` and `:` are kept literal so
    # common values like "ViT-B/32" or "cuda:0" produce the same path as before.
    query = urlencode(
        {"model_name": model_name, "model_device": model_device},
        safe="/:",
    )
    return self.http.delete(path=f"models?{query}")

@deprecated(
    "This method is deprecated and will be removed in Marqo 2.0.0. "
    "Please use `mq.index(index_name).get_loaded_models()` instead. "
    "Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_loaded_models(self):
    """Issue a GET against the ``models`` path of the control base URL.

    Deprecated in favour of the index-level ``get_loaded_models()``.

    Returns:
        Whatever ``self.http.get`` returns for ``models``.

    Raises:
        errors.BadRequestError: when the client is configured for Marqo
            Cloud, where this client-level call is rejected up front.
    """
    if self.config.is_marqo_cloud:
        # Always raises; the request below is never reached on Marqo Cloud.
        self.raise_error_for_cloud("get_loaded_models")
    return self.http.get(path="models")

@deprecated(
    "This method is deprecated and will be removed in Marqo 2.0.0. "
    "Please use `mq.index(index_name).get_cuda_info()` instead. "
    "Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_cuda_info(self):
    """Issue a GET against the ``device/cuda`` path of the control base URL.

    Deprecated in favour of the index-level ``get_cuda_info()``.

    Returns:
        Whatever ``self.http.get`` returns for ``device/cuda``.

    Raises:
        errors.BadRequestError: when the client is configured for Marqo
            Cloud, where this client-level call is rejected up front.
    """
    if self.config.is_marqo_cloud:
        # Always raises; the request below is never reached on Marqo Cloud.
        self.raise_error_for_cloud("get_cuda_info")
    return self.http.get(path="device/cuda")

@deprecated(
    "This method is deprecated and will be removed in Marqo 2.0.0. "
    "Please use `mq.index(index_name).get_cpu_info()` instead. "
    "Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_cpu_info(self):
    """Issue a GET against the ``device/cpu`` path of the control base URL.

    Deprecated in favour of the index-level ``get_cpu_info()``.

    Returns:
        Whatever ``self.http.get`` returns for ``device/cpu``.

    Raises:
        errors.BadRequestError: when the client is configured for Marqo
            Cloud, where this client-level call is rejected up front.
    """
    if self.config.is_marqo_cloud:
        # Always raises; the request below is never reached on Marqo Cloud.
        self.raise_error_for_cloud("get_cpu_info")
    return self.http.get(path="device/cpu")

def _marqo_minimum_supported_version_check(self):
min_ver = minimum_supported_marqo_version()
skip_warning_message = (
f"Marqo encountered a problem trying to check the Marqo version found at `{self.url}`. "
f"The minimum supported Marqo version for this client is {min_ver}. "
f"If you are sure your Marqo version is compatible with this client, you can ignore this message. ")

# Skip the check if the url is previously labelled as "_skipped"
if self.url in marqo_url_and_version_cache and marqo_url_and_version_cache[self.url] == "_skipped":
mq_logger.warning(skip_warning_message)
return

# Skip the check for Marqo CloudV2 APIs right now
skip_version_check_url = ["https://api.marqo.ai", "https://cloud.marqo.ai"]
if self.url in skip_version_check_url:
marqo_url_and_version_cache[self.url] = "_skipped"
mq_logger.warning(skip_warning_message)
return

# Do version check
try:
if self.url not in marqo_url_and_version_cache:
marqo_url_and_version_cache[self.url] = self.get_marqo()["version"]
marqo_version = marqo_url_and_version_cache[self.url]
if versioning_helpers.parse(marqo_version) < versioning_helpers.parse(min_ver):
mq_logger.warning(f"Your Marqo Python client requires a minimum Marqo version of "
f"{minimum_supported_marqo_version()} to function properly, but your Marqo version is {marqo_version}. "
f"Please upgrade your Marqo instance to avoid potential errors. "
f"If you have already changed your Marqo instance but still get this warning, please restart your Marqo client Python interpreter.")
except (MarqoWebError, RequestException, TypeError, KeyError) as e:
mq_logger.warning(skip_warning_message)
marqo_url_and_version_cache[self.url] = "_skipped"
return
@staticmethod
def raise_error_for_cloud(function_name: str = None):
    """Raise ``BadRequestError`` for a client-level call unsupported on Marqo Cloud.

    Args:
        function_name: name of the client method that was invoked. It is
            interpolated into the message both as the rejected call and as
            the suggested index-level replacement.

    Raises:
        errors.BadRequestError: unconditionally.
    """
    rejected_call = f"The `mq.{function_name}()` API is not supported on Marqo Cloud. "
    replacement = f"Please Use `mq.index('your-index-name').{function_name}()` instead. "
    docs_pointer = "Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
    raise errors.BadRequestError(rejected_call + replacement + docs_pointer)

23 changes: 23 additions & 0 deletions src/marqo/cloud_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import time

from marqo.marqo_logging import mq_logger
from marqo._httprequests import HttpRequests
from marqo.enums import IndexStatus


def cloud_wait_for_index_status(
    req: HttpRequests,
    index_name: str,
    status: IndexStatus,
    poll_interval: float = 10,
    timeout: float = None,
):
    """Wait for an index to reach a given status by polling its status endpoint.

    The status is fetched once immediately; while it differs from ``status``
    the function sleeps ``poll_interval`` seconds and fetches it again.

    Args:
        req (HttpRequests): request helper used to GET
            ``indexes/{index_name}/status``
        index_name (str): name of the index
        status (IndexStatus): expected value of the ``index_status`` field of
            the status response
        poll_interval (float): seconds to sleep between two polls.
            Defaults to 10, the previously hard-coded interval.
        timeout (float): maximum number of seconds to keep polling. ``None``
            (the default) waits indefinitely, as before.

    Returns:
        True once the expected status has been observed.

    Raises:
        TimeoutError: if ``timeout`` is set and elapses before the index
            reports the expected status.
    """
    status_path = f"indexes/{index_name}/status"
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    current_status = req.get(status_path)
    while current_status['index_status'] != status:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Index {index_name} did not reach status {status} within {timeout} seconds; "
                f"last observed status: {current_status['index_status']}"
            )
        time.sleep(poll_interval)
        current_status = req.get(status_path)
        mq_logger.info(f"Current index status: {current_status['index_status']}")
    mq_logger.info(f"Index achieved status {status} successfully")
    return True
58 changes: 13 additions & 45 deletions src/marqo/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Optional, Union
from marqo import enums, utils
import urllib3
import warnings
from marqo.marqo_url_resolver import MarqoUrlResolver
from typing import Optional

from marqo.instance_mappings import InstanceMappings


class Config:
Expand All @@ -11,53 +9,23 @@ class Config:
"""

def __init__(
self,
url: str,
use_telemetry: bool = False,
timeout: Optional[int] = None,
api_key: str = None
self,
instance_mappings: Optional[InstanceMappings] = None,
is_marqo_cloud: bool = False,
use_telemetry: bool = False,
timeout: Optional[int] = None,
api_key: str = None
) -> None:
"""
Parameters
----------
url:
The url to the Marqo instance (ex: http://localhost:8882)
"""
self.cluster_is_remote = False
self.cluster_is_s2search = False
self.cluster_is_marqo = False
self.marqo_url_resolver = None
self.api_key = api_key
self.url = self.set_url(url)
self.instance_mapping = instance_mappings
self.is_marqo_cloud = is_marqo_cloud
self.use_telemetry = use_telemetry
self.timeout = timeout
self.api_key = api_key
# suppress warnings until we figure out the dependency issues:
# warnings.filterwarnings("ignore")
self.use_telemetry = use_telemetry

def set_url(self, url):
"""Set the URL, and infers whether that url is remote"""
lowered_url = url.lower()
local_host_markers = ["localhost", "0.0.0.0", "127.0.0.1"]
if any([marker in lowered_url for marker in local_host_markers]):
# urllib3.disable_warnings()
self.cluster_is_remote = False
else:
# warnings.resetwarnings()
self.cluster_is_remote = True
if "s2search.io" in lowered_url:
self.cluster_is_s2search = True
if "api.marqo.ai" in lowered_url:
self.cluster_is_marqo = True
self.marqo_url_resolver = MarqoUrlResolver(api_key=self.api_key, expiration_time=15)
self.url = url
return self.url

def get_url(self, index_name=None,):
"""Get the URL, and infers whether that url is marqo cloud,
and if it is targeting a specific index resolves the index-specific url"""
if not self.cluster_is_marqo:
return self.url
if self.cluster_is_marqo and not index_name:
return self.url + "/api"
# calls resolver to get index-specific url for when cluster is marqo and index_name is not None
return self.marqo_url_resolver[index_name]
27 changes: 27 additions & 0 deletions src/marqo/default_instance_mappings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Optional

from marqo import utils
from marqo.instance_mappings import InstanceMappings


class DefaultInstanceMappings(InstanceMappings):
    """Instance mappings backed by a single URL.

    The same base URL is returned for every index and for control requests.
    """

    # Substrings whose presence in the URL marks the instance as local.
    _LOCAL_HOST_MARKERS = ("localhost", "0.0.0.0", "127.0.0.1")

    def __init__(self, url: str, main_user: str = None, main_password: str = None):
        """Store the base URL and work out whether it points at a remote host.

        Args:
            url: base URL of the Marqo instance.
            main_user: optional username; credentials are embedded only when
                both ``main_user`` and ``main_password`` are given.
            main_password: optional password, see ``main_user``.
        """
        # Lower-case only a throwaway copy for the marker check. The stored
        # URL keeps its original casing because paths (and credentials) are
        # case-sensitive. The check runs on the URL before credentials are
        # embedded so a username/password cannot accidentally match a marker.
        lowered_url = url.lower()
        self._is_remote = not any(
            marker in lowered_url for marker in self._LOCAL_HOST_MARKERS
        )

        if main_user is not None and main_password is not None:
            self._url = utils.construct_authorized_url(url, main_user, main_password)
        else:
            self._url = url

    def get_index_base_url(self, index_name: str) -> str:
        """Return the base URL; ``index_name`` does not affect the result."""
        return self._url

    def get_control_base_url(self) -> str:
        """Return the base URL used for control requests."""
        return self._url

    def is_remote(self) -> bool:
        """Return False when the URL contains a local-host marker, else True."""
        return self._is_remote
4 changes: 4 additions & 0 deletions src/marqo/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ class Devices:
cpu = "cpu"
cuda = "cuda"


class IndexStatus:
    """String constants for index status values.

    `cloud_wait_for_index_status` compares one of these against the
    `index_status` field of the status response.
    """
    # NOTE: the constant is named CREATED but its value is "READY";
    # presumably this is the status awaited after index creation (confirm against callers).
    CREATED = "READY"
    # Status awaited by `Client.delete_index` after issuing the delete request.
    DELETED = "DELETED"
4 changes: 4 additions & 0 deletions src/marqo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def __init__(self, message: str):
error_type = "invalid_request"


class UnsupportedOperationError(__InvalidRequestError):
    """Invalid-request error carrying the code ``unsupported_operation``.

    NOTE(review): presumably raised when an operation is not available on the
    target instance; no raise site is visible here, so confirm against callers.
    """
    code = "unsupported_operation"


class IndexAlreadyExistsError(__InvalidRequestError):
code = "index_already_exists"
status_code = HTTPStatus.CONFLICT
Expand Down
Loading

0 comments on commit 8b7e172

Please sign in to comment.