Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce instance mappings #132

Merged
merged 31 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8c08fed
introduce instance mappings
danyilq Jul 31, 2023
ead493b
Implement InstanceMappings and default and cloud implementations
farshidz Jul 31, 2023
97489cd
Remove index name from control url method
farshidz Jul 31, 2023
705082e
Update httprequests module to use InstanceMappings
farshidz Jul 31, 2023
24a06d1
Fix class name
farshidz Jul 31, 2023
e83f0fe
fix minimum version check and add cloud error
danyilq Aug 1, 2023
9fca70b
delete enrich and improve error message
danyilq Aug 1, 2023
661d440
fix existing and add new tests
danyilq Aug 1, 2023
8ee578f
Add docstring for InstanceMappings
farshidz Aug 2, 2023
c26885c
move version_check and refactor
danyilq Aug 2, 2023
a4d93e2
refactor tests to work with cloud
danyilq Aug 2, 2023
acc162e
add readiness check for index that we want to clean documents in
danyilq Aug 2, 2023
fb766fa
unsupported operation + change hardcoded url for refresh_urls
danyilq Aug 3, 2023
e7f514d
adapt tests behaviour for cloud and partially refactor code
danyilq Aug 3, 2023
7b2491c
post debug changes to tests and new run env in tox
danyilq Aug 6, 2023
c6f57cd
update logging for wait_for_index_status
danyilq Aug 7, 2023
76b7b8f
improve testing
danyilq Aug 7, 2023
e99c7af
use http instead of client
danyilq Aug 7, 2023
5940880
Merge branch 'mainline' into danyil/instance-mappings
danyilq Aug 7, 2023
6f47bc5
follow up on all of the comments
danyilq Aug 7, 2023
a24d93e
Merge remote-tracking branch 'origin/danyil/instance-mappings' into d…
danyilq Aug 7, 2023
4e4d91a
final changes
danyilq Aug 8, 2023
32fc98a
improve test
danyilq Aug 8, 2023
b56e53e
consistent hash
danyilq Aug 8, 2023
89c1fb8
consistent hash
danyilq Aug 8, 2023
0472dff
better namings
danyilq Aug 8, 2023
a711a95
bump pymarqo version
danyilq Aug 8, 2023
836b4b7
adapt to 32 characters index name limit
danyilq Aug 8, 2023
e7b381b
reduce test_demos.py index name length
danyilq Aug 8, 2023
53db2d3
raise error if index_name is too long
danyilq Aug 8, 2023
2be58c2
update version
danyilq Aug 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[build-system]
requires = ["setuptools>=42"]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"

[tool.pytest.ini_options]
markers = "ignore_during_cloud_tests: Mark tests to be ignored when running with env set to cloud_tests"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"tox"
],
name="marqo",
version="1.1.1",
version="1.1.2",
author="marqo org",
author_email="org@marqo.io",
description="Tensor search for humans",
Expand Down
6 changes: 4 additions & 2 deletions src/marqo/_httprequests.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def _operation(self, method: HTTP_OPERATIONS) -> Callable:

def _construct_path(self, path: str, index_name="") -> str:
"""Augment the URL request path based if telemetry is required."""
url = f"{self.config.get_url(index_name=index_name)}/{path}"
base_url = self.config.instance_mapping.get_index_base_url(index_name=index_name) if index_name \
else self.config.instance_mapping.get_control_base_url()
url = f"{base_url}/{path}"
if self.config.use_telemetry:
delimeter= "?" if "?" not in f"{self.config.url}/{path}" else "&"
delimeter= "?" if "?" not in f"{base_url}/{path}" else "&"
return url + f"{delimeter}telemetry=True"
return url

Expand Down
139 changes: 81 additions & 58 deletions src/marqo/client.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import base64
import os
from typing import Any, Dict, List, Optional, Union

from pydantic import error_wrappers
from requests.exceptions import RequestException
from typing_extensions import deprecated

from marqo.cloud_helpers import cloud_wait_for_index_status
from marqo.default_instance_mappings import DefaultInstanceMappings
from marqo.index import Index
from marqo.config import Config
from marqo.instance_mappings import InstanceMappings
from marqo.marqo_cloud_instance_mappings import MarqoCloudInstanceMappings
from marqo.models import BulkSearchBody, BulkSearchQuery
from marqo._httprequests import HttpRequests
from marqo import utils, enums
from marqo import errors
from marqo.version import minimum_supported_marqo_version
from marqo.marqo_logging import mq_logger
from marqo.errors import MarqoWebError
# we want to avoid name conflicts with marqo.version
from packaging import version as versioning_helpers
from json import JSONDecodeError

# A dictionary to cache the marqo url and version for compatibility check
marqo_url_and_version_cache = {}


class Client:
Expand All @@ -30,7 +33,8 @@ class Client:
"""

def __init__(
self, url: str = "http://localhost:8882",
self, url: Optional[str] = "http://localhost:8882",
instance_mappings: Optional[InstanceMappings] = None,
main_user: str = None, main_password: str = None,
return_telemetry: bool = False,
api_key: str = None
Expand All @@ -39,17 +43,31 @@ def __init__(
Parameters
----------
url:
The url to the S2Search API (ex: http://localhost:8882)
The url to the Marqo API (ex: http://localhost:8882) If MARQO_CLOUD_URL environment variable is set, when
matching url is passed, the client will use the Marqo Cloud instance mappings.
instance_mappings:
An instance of InstanceMappings that maps index names to urls
api_key:
The api key to use for authentication with the Marqo API
"""
self.main_user = main_user
self.main_password = main_password
if (main_user is not None) and (main_password is not None):
self.url = utils.construct_authorized_url(url_base=url, username=main_user, password=main_password)
else:
self.url = url
self.config = Config(self.url, use_telemetry=return_telemetry, api_key=api_key)
if url is not None and instance_mappings is not None:
raise ValueError("Cannot specify both url and instance_mappings")

is_marqo_cloud = False
if url is not None:
if url.lower().startswith(os.environ.get("MARQO_CLOUD_URL", "https://api.marqo.ai")):
instance_mappings = MarqoCloudInstanceMappings(control_base_url=url, api_key=api_key)
is_marqo_cloud = True
else:
instance_mappings = DefaultInstanceMappings(url, main_user, main_password)
danyilq marked this conversation as resolved.
Show resolved Hide resolved

self.config = Config(
instance_mappings=instance_mappings,
is_marqo_cloud=is_marqo_cloud,
use_telemetry=return_telemetry,
api_key=api_key
)
self.http = HttpRequests(self.config)
self._marqo_minimum_supported_version_check()

def create_index(
self, index_name: str,
Expand Down Expand Up @@ -108,6 +126,7 @@ def delete_index(self, index_name: str) -> Dict[str, Any]:
"""
try:
res = self.http.delete(path=f"indexes/{index_name}")
cloud_wait_for_index_status(self.http, index_name, enums.IndexStatus.DELETED)
except errors.MarqoWebError as e:
return e.message

Expand Down Expand Up @@ -161,15 +180,6 @@ def get_indexes(self) -> Dict[str, List[Index]]:
]
return response

def enrich(self, documents: List[Dict], enrichment: Dict, device: str = None, ):
"""Enrich documents"""
translated = utils.translate_device_string_for_url(device)
response = self.http.post(path=f'enrichment?device={translated}', body={
"documents": documents,
"enrichment": enrichment
})
return response

def bulk_search(self, queries: List[Dict[str, Any]], device: Optional[str] = None) -> Dict[str, Any]:
try:
parsed_queries = [BulkSearchBody(**q) for q in queries]
Expand All @@ -188,13 +198,24 @@ def _base64url_encode(
) -> str:
return base64.urlsafe_b64encode(data).decode('utf-8').replace('=', '')

@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use `mq.index(index_name).get_marqo()` instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_marqo(self):
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("get_marqo")
return self.http.get(path="")

@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use `mq.index(index_name).health()` instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def health(self):
mq_logger.warning('The `client.health()` API has been deprecated and will be removed in '
'Marqo 2.0.0. Use `client.index(index_name).health()` instead. '
'Check `https://docs.marqo.ai/latest/API-Reference/indexes/` for more details.')
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("health")
try:
return self.http.get(path="health")
except (MarqoWebError, RequestException, TypeError, KeyError) as e:
Expand All @@ -204,48 +225,50 @@ def health(self):
"Marqo 2.0.0. Please Use `client.index('your-index-name').health()` instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details.")

@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).eject_model() instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def eject_model(self, model_name: str, model_device: str):
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("eject_model")
return self.http.delete(path=f"models?model_name={model_name}&model_device={model_device}")

@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).get_loaded_models() instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_loaded_models(self):
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("get_loaded_models")
return self.http.get(path="models")

@deprecated(
danyilq marked this conversation as resolved.
Show resolved Hide resolved
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).get_cuda_info() instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_cuda_info(self):
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("get_cuda_info")
return self.http.get(path="device/cuda")

@deprecated(
"This method is deprecated and will be removed in Marqo 2.0.0. "
"Please use 'mq.index(index_name).get_cpu_info() instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details."
)
def get_cpu_info(self):
if self.config.is_marqo_cloud:
self.raise_error_for_cloud("get_cpu_info")
return self.http.get(path="device/cpu")

def _marqo_minimum_supported_version_check(self):
min_ver = minimum_supported_marqo_version()
skip_warning_message = (
f"Marqo encountered a problem trying to check the Marqo version found at `{self.url}`. "
f"The minimum supported Marqo version for this client is {min_ver}. "
f"If you are sure your Marqo version is compatible with this client, you can ignore this message. ")

# Skip the check if the url is previously labelled as "_skipped"
if self.url in marqo_url_and_version_cache and marqo_url_and_version_cache[self.url] == "_skipped":
mq_logger.warning(skip_warning_message)
return

# Skip the check for Marqo CloudV2 APIs right now
skip_version_check_url = ["https://api.marqo.ai", "https://cloud.marqo.ai"]
if self.url in skip_version_check_url:
marqo_url_and_version_cache[self.url] = "_skipped"
mq_logger.warning(skip_warning_message)
return

# Do version check
try:
if self.url not in marqo_url_and_version_cache:
marqo_url_and_version_cache[self.url] = self.get_marqo()["version"]
marqo_version = marqo_url_and_version_cache[self.url]
if versioning_helpers.parse(marqo_version) < versioning_helpers.parse(min_ver):
mq_logger.warning(f"Your Marqo Python client requires a minimum Marqo version of "
f"{minimum_supported_marqo_version()} to function properly, but your Marqo version is {marqo_version}. "
f"Please upgrade your Marqo instance to avoid potential errors. "
f"If you have already changed your Marqo instance but still get this warning, please restart your Marqo client Python interpreter.")
except (MarqoWebError, RequestException, TypeError, KeyError) as e:
mq_logger.warning(skip_warning_message)
marqo_url_and_version_cache[self.url] = "_skipped"
return
@staticmethod
def raise_error_for_cloud(function_name: str = None):
raise errors.BadRequestError(
f"The `mq.{function_name}()` API is not supported on Marqo Cloud. "
f"Please Use `mq.index('your-index-name').{function_name}()` instead. "
"Check `https://docs.marqo.ai/1.1.0/API-Reference/indexes/` for more details.")

23 changes: 23 additions & 0 deletions src/marqo/cloud_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import time

from marqo.marqo_logging import mq_logger
from marqo._httprequests import HttpRequests
from marqo.enums import IndexStatus


def cloud_wait_for_index_status(req: HttpRequests, index_name: str, status: IndexStatus):
""" Wait for index to achieve some status on Marqo Cloud by checking
it's status every 10 seconds until it becomes expected value

Args:
req (HttpRequests): HttpRequests object
index_name (str): name of the index
status (IndexStatus): expected status of the index
"""
current_status = req.get(f"indexes/{index_name}/status")
while current_status['index_status'] != status:
time.sleep(10)
current_status = req.get(f"indexes/{index_name}/status")
mq_logger.info(f"Current index status: {current_status['index_status']}")
mq_logger.info(f"Index achieved status {status} successfully")
return True
58 changes: 13 additions & 45 deletions src/marqo/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Optional, Union
from marqo import enums, utils
import urllib3
import warnings
from marqo.marqo_url_resolver import MarqoUrlResolver
from typing import Optional

from marqo.instance_mappings import InstanceMappings


class Config:
Expand All @@ -11,53 +9,23 @@ class Config:
"""

def __init__(
self,
url: str,
use_telemetry: bool = False,
timeout: Optional[int] = None,
api_key: str = None
self,
instance_mappings: Optional[InstanceMappings] = None,
is_marqo_cloud: bool = False,
use_telemetry: bool = False,
timeout: Optional[int] = None,
api_key: str = None
) -> None:
"""
Parameters
----------
url:
The url to the Marqo instance (ex: http://localhost:8882)
"""
self.cluster_is_remote = False
self.cluster_is_s2search = False
self.cluster_is_marqo = False
self.marqo_url_resolver = None
self.api_key = api_key
self.url = self.set_url(url)
self.instance_mapping = instance_mappings
self.is_marqo_cloud = is_marqo_cloud
self.use_telemetry = use_telemetry
self.timeout = timeout
self.api_key = api_key
# suppress warnings until we figure out the dependency issues:
# warnings.filterwarnings("ignore")
self.use_telemetry = use_telemetry

def set_url(self, url):
"""Set the URL, and infers whether that url is remote"""
lowered_url = url.lower()
local_host_markers = ["localhost", "0.0.0.0", "127.0.0.1"]
if any([marker in lowered_url for marker in local_host_markers]):
# urllib3.disable_warnings()
self.cluster_is_remote = False
else:
# warnings.resetwarnings()
self.cluster_is_remote = True
if "s2search.io" in lowered_url:
self.cluster_is_s2search = True
if "api.marqo.ai" in lowered_url:
self.cluster_is_marqo = True
self.marqo_url_resolver = MarqoUrlResolver(api_key=self.api_key, expiration_time=15)
self.url = url
return self.url

def get_url(self, index_name=None,):
"""Get the URL, and infers whether that url is marqo cloud,
and if it is targeting a specific index resolves the index-specific url"""
if not self.cluster_is_marqo:
return self.url
if self.cluster_is_marqo and not index_name:
return self.url + "/api"
# calls resolver to get index-specific url for when cluster is marqo and index_name is not None
return self.marqo_url_resolver[index_name]
27 changes: 27 additions & 0 deletions src/marqo/default_instance_mappings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Optional

from marqo import utils
from marqo.instance_mappings import InstanceMappings


class DefaultInstanceMappings(InstanceMappings):
def __init__(self, url: str, main_user: str = None, main_password: str = None):
self._url = url.lower()

if main_user is not None and main_password is not None:
self._url = utils.construct_authorized_url(self._url, main_user, main_password)

local_host_markers = ["localhost", "0.0.0.0", "127.0.0.1"]
if any([marker in self._url for marker in local_host_markers]):
self._is_remote = False
else:
self._is_remote = True

def get_index_base_url(self, index_name: str) -> str:
return self._url

def get_control_base_url(self) -> str:
return self._url

def is_remote(self):
return self._is_remote
4 changes: 4 additions & 0 deletions src/marqo/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ class Devices:
cpu = "cpu"
cuda = "cuda"


class IndexStatus:
CREATED = "READY"
DELETED = "DELETED"
4 changes: 4 additions & 0 deletions src/marqo/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def __init__(self, message: str):
error_type = "invalid_request"


class UnsupportedOperationError(__InvalidRequestError):
code = "unsupported_operation"


class IndexAlreadyExistsError(__InvalidRequestError):
code = "index_already_exists"
status_code = HTTPStatus.CONFLICT
Expand Down
Loading
Loading