From 817f98ef70a25aeb65761731065ffe2d42d6c31b Mon Sep 17 00:00:00 2001 From: Alibi Zhenis Date: Thu, 18 May 2023 10:56:01 +0500 Subject: [PATCH 1/7] adding executeAPI Signed-off-by: Alibi Zhenis --- .../ml_commons/ml_commons_client.py | 17 +++ opensearch_py_ml/ml_commons/model_execute.py | 104 ++++++++++++++++++ tests/ml_commons/test_ml_commons_client.py | 31 +++++- 3 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 opensearch_py_ml/ml_commons/model_execute.py diff --git a/opensearch_py_ml/ml_commons/ml_commons_client.py b/opensearch_py_ml/ml_commons/ml_commons_client.py index 850f21d6..543eda95 100644 --- a/opensearch_py_ml/ml_commons/ml_commons_client.py +++ b/opensearch_py_ml/ml_commons/ml_commons_client.py @@ -12,6 +12,7 @@ from opensearchpy import OpenSearch from opensearch_py_ml.ml_commons.ml_common_utils import ML_BASE_URI, TIMEOUT +from opensearch_py_ml.ml_commons.model_execute import ModelExecute from opensearch_py_ml.ml_commons.model_uploader import ModelUploader @@ -24,6 +25,22 @@ class MLCommonClient: def __init__(self, os_client: OpenSearch): self._client = os_client self._model_uploader = ModelUploader(os_client) + self._model_execute = ModelExecute(os_client) + + def execute(self, algorithm_name: str, input_json: dict) -> dict: + """ + This method executes ML algorithms that can be only executed directly (i.e. do not support train and + predict APIs), like anomaly localization and metrics correlation. The algorithm has to be supported by ML Commons. + Refer to https://opensearch.org/docs/2.7/ml-commons-plugin/api/#execute + + :param algorithm_name: Name of the algorithm + :type algorithm_name: string + :param input_json: Dictionary of parameters + :type input_json: dict + :return: returns the API response + :rtype: dict + """ + return self._model_execute._execute(algorithm_name, input_json) def upload_model( self, diff --git a/opensearch_py_ml/ml_commons/model_execute.py b/opensearch_py_ml/ml_commons/model_execute.py new file mode 100644 index 00000000..f5035ff9 --- /dev/null +++ b/opensearch_py_ml/ml_commons/model_execute.py @@ -0,0 +1,104 @@ +# SPDX-License-Identifier: Apache-2.0 +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# Any modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +from opensearchpy import OpenSearch + +from opensearch_py_ml.ml_commons.ml_common_utils import ML_BASE_URI + + +class ModelExecute: + """ + Class for executing algorithms using ML Commons execute API. + """ + + API_ENDPOINT = "models/_execute" + + def __init__(self, os_client: OpenSearch): + self._client = os_client + + def _execute(self, algorithm_name: str, input_json: dict) -> dict: + """ + This method executes ML algorithms that can be only executed directly (i.e. do not support train and + predict APIs), like anomaly localization and metrics correlation. The algorithm has to be supported by ML Commons. + Refer to https://opensearch.org/docs/2.7/ml-commons-plugin/api/#execute + + :param algorithm_name: Name of the algorithm + :type algorithm_name: string + :param input_json: Dictionary of parameters + :type input_json: dict + :return: returns the API response + :rtype: dict + """ + + if algorithm_name.lower() not in [ + "anomaly_localization", + "metrics_correlation", + ]: + raise ValueError("Algorithm must be supported by ML Commons") + + if algorithm_name.upper() == "METRICS_CORRELATION": + if self._validate_json_mcorr(input_json=input_json): + return self._client.transport.perform_request( + method="POST", + url=f"{ML_BASE_URI}/_execute/METRICS_CORRELATION", + body=input_json, + ) + elif algorithm_name.lower() == "anomaly_localization": + if self._validate_json_localization(input_json=input_json): + return self._client.transport.perform_request( + method="POST", + url=f"{ML_BASE_URI}/_execute/anomaly_localization", + body=input_json, + ) + + def _validate_json_localization(self, input_json: dict) -> bool: + mandatory_fields = [ + "index_name", + "attribute_field_names", + "aggregations", + "time_field_name", + "start_time", + "end_time", + "min_time_interval", + "num_outputs", + ] + optional_fields = ["filter_query", "anomaly_star"] + + for key in input_json: + if key not in mandatory_fields and key not in optional_fields: + raise ValueError(f"Parameter {key} is not supported") + + for mand_key in mandatory_fields: + if not input_json.get(mand_key): + raise ValueError(f"{mand_key} can not be empty") + + return True + + def _validate_json_mcorr(self, input_json: dict) -> bool: + for key in input_json: + if key != "metrics": + raise ValueError(f"Parameter {key} is not supported") + + if "metrics" not in input_json: + raise ValueError("Metrics field is missing") + if len(input_json["metrics"]) == 0: + raise ValueError("metrics parameter can't be empty") + + arr = input_json["metrics"] + num_timesteps = len(arr[0]) + for row in arr: + if len(row) != num_timesteps: + raise ValueError( + "All metrics need to have an equal amount of time steps" + ) + + if num_timesteps * len(arr) > 10000: + raise ValueError( + "The total number of data points must not be higher than 10000" + ) + + return True diff --git a/tests/ml_commons/test_ml_commons_client.py b/tests/ml_commons/test_ml_commons_client.py index 431a72d0..a8934a03 100644 --- a/tests/ml_commons/test_ml_commons_client.py +++ b/tests/ml_commons/test_ml_commons_client.py @@ -34,7 +34,9 @@ MODEL_CONFIG_FILE_NAME = "ml-commons_model_config.json" TEST_FOLDER = os.path.join(PROJECT_DIR, "test_model_files") -TESTDATA_SYNTHETIC_QUERY_ZIP = os.path.join(PROJECT_DIR, "..", "synthetic_queries.zip") +TESTDATA_SYNTHETIC_QUERY_ZIP = os.path.join( + PROJECT_DIR, "../../../../..", "synthetic_queries.zip" +) MODEL_PATH = os.path.join(TEST_FOLDER, MODEL_FILE_ZIP_NAME) MODEL_CONFIG_FILE_PATH = os.path.join(TEST_FOLDER, MODEL_CONFIG_FILE_NAME) @@ -73,6 +75,33 @@ def test_init(): assert type(ml_client._model_uploader) == ModelUploader +def test_execute(): + raised = False + try: + input_json = { + "index_name": "rca-index", + "attribute_field_names": ["attribute"], + "aggregations": [{"sum": {"sum": {"field": "value"}}}], + "time_field_name": "timestamp", + "start_time": 1620630000000, + "end_time": 1621234800000, + "min_time_interval": 86400000, + "num_outputs": 10, + } + ml_client.execute(algorithm_name="anomaly_localization", input_json=input_json) + except: # noqa: E722 + raised = True + assert raised == False, "Raised Exception during anomaly localization execution" + + raised = False + try: + input_json = {"metrics": [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]} + ml_client.execute(algorithm_name="METRICS_CORRELATION", input_json=input_json) + except: # noqa: E722 + raised = True + assert raised == False, "Raised Exception during metrics correlation execution" + + def test_integration_pretrained_model_upload_unload_delete(): raised = False try: From 9ce6338d317f97024a17f95a5d661902f6df33f3 Mon Sep 17 00:00:00 2001 From: Alibi Zhenis Date: Thu, 18 May 2023 10:58:31 +0500 Subject: [PATCH 2/7] updating documentation Signed-off-by: Alibi Zhenis --- docs/source/reference/api/ml_commons_execute_api.rst | 6 ++++++ docs/source/reference/mlcommons.rst | 6 ++++++ 2 files changed, 12 insertions(+) create mode 100644 docs/source/reference/api/ml_commons_execute_api.rst diff --git a/docs/source/reference/api/ml_commons_execute_api.rst b/docs/source/reference/api/ml_commons_execute_api.rst new file mode 100644 index 00000000..bea00e05 --- /dev/null +++ b/docs/source/reference/api/ml_commons_execute_api.rst @@ -0,0 +1,6 @@ +Execute +================== + +.. currentmodule:: opensearch_py_ml + +.. autofunction:: opensearch_py_ml.ml_commons.MLCommonClient.execute diff --git a/docs/source/reference/mlcommons.rst b/docs/source/reference/mlcommons.rst index 258dc853..1d8bc465 100644 --- a/docs/source/reference/mlcommons.rst +++ b/docs/source/reference/mlcommons.rst @@ -68,3 +68,9 @@ Delete Task api/ml_commons_delete_task_api +Execute +~~~~~~~~~~~~~~~~~ +.. toctree:: + :maxdepth: 2 + + api/ml_commons_execute_api From ec6f1eaeabef178f9cd258ecb4b20e8761d854ab Mon Sep 17 00:00:00 2001 From: Alibi Zhenis Date: Sun, 28 May 2023 14:36:09 +0500 Subject: [PATCH 3/7] changing code and test Signed-off-by: Alibi Zhenis --- opensearch_py_ml/ml_commons/model_execute.py | 81 ++++---------------- tests/ml_commons/test_ml_commons_client.py | 25 ++---- 2 files changed, 19 insertions(+), 87 deletions(-) diff --git a/opensearch_py_ml/ml_commons/model_execute.py b/opensearch_py_ml/ml_commons/model_execute.py index f5035ff9..fa0b49fc 100644 --- a/opensearch_py_ml/ml_commons/model_execute.py +++ b/opensearch_py_ml/ml_commons/model_execute.py @@ -5,6 +5,8 @@ # Any modifications Copyright OpenSearch Contributors. See # GitHub history for details. +import json + from opensearchpy import OpenSearch from opensearch_py_ml.ml_commons.ml_common_utils import ML_BASE_URI @@ -23,82 +25,27 @@ def __init__(self, os_client: OpenSearch): def _execute(self, algorithm_name: str, input_json: dict) -> dict: """ This method executes ML algorithms that can be only executed directly (i.e. do not support train and - predict APIs), like anomaly localization and metrics correlation. The algorithm has to be supported by ML Commons. + predict APIs), like anomaly localization and metrics correlation. + The input json must be a dictionary or a deserializable Python object. + The algorithm has to be supported by ML Commons. Refer to https://opensearch.org/docs/2.7/ml-commons-plugin/api/#execute :param algorithm_name: Name of the algorithm :type algorithm_name: string - :param input_json: Dictionary of parameters + :param input_json: Dictionary of parameters or a deserializable string, byte, or bytearray :type input_json: dict :return: returns the API response :rtype: dict """ - if algorithm_name.lower() not in [ - "anomaly_localization", - "metrics_correlation", - ]: - raise ValueError("Algorithm must be supported by ML Commons") - - if algorithm_name.upper() == "METRICS_CORRELATION": - if self._validate_json_mcorr(input_json=input_json): - return self._client.transport.perform_request( - method="POST", - url=f"{ML_BASE_URI}/_execute/METRICS_CORRELATION", - body=input_json, - ) - elif algorithm_name.lower() == "anomaly_localization": - if self._validate_json_localization(input_json=input_json): - return self._client.transport.perform_request( - method="POST", - url=f"{ML_BASE_URI}/_execute/anomaly_localization", - body=input_json, - ) - - def _validate_json_localization(self, input_json: dict) -> bool: - mandatory_fields = [ - "index_name", - "attribute_field_names", - "aggregations", - "time_field_name", - "start_time", - "end_time", - "min_time_interval", - "num_outputs", - ] - optional_fields = ["filter_query", "anomaly_star"] - - for key in input_json: - if key not in mandatory_fields and key not in optional_fields: - raise ValueError(f"Parameter {key} is not supported") - - for mand_key in mandatory_fields: - if not input_json.get(mand_key): - raise ValueError(f"{mand_key} can not be empty") - - return True - - def _validate_json_mcorr(self, input_json: dict) -> bool: - for key in input_json: - if key != "metrics": - raise ValueError(f"Parameter {key} is not supported") - - if "metrics" not in input_json: - raise ValueError("Metrics field is missing") - if len(input_json["metrics"]) == 0: - raise ValueError("metrics parameter can't be empty") - - arr = input_json["metrics"] - num_timesteps = len(arr[0]) - for row in arr: - if len(row) != num_timesteps: - raise ValueError( - "All metrics need to have an equal amount of time steps" - ) - - if num_timesteps * len(arr) > 10000: - raise ValueError( - "The total number of data points must not be higher than 10000" + if self._is_json(input_json): + return self._client.transport.perform_request( + method="POST", + url=f"{ML_BASE_URI}/_execute/{algorithm_name}", + body=input_json, ) + def _is_json(self, input_json: dict) -> bool: + if not isinstance(input_json, dict): + json.loads(input_json) return True diff --git a/tests/ml_commons/test_ml_commons_client.py b/tests/ml_commons/test_ml_commons_client.py index a8934a03..6bed27af 100644 --- a/tests/ml_commons/test_ml_commons_client.py +++ b/tests/ml_commons/test_ml_commons_client.py @@ -78,28 +78,13 @@ def test_init(): def test_execute(): raised = False try: - input_json = { - "index_name": "rca-index", - "attribute_field_names": ["attribute"], - "aggregations": [{"sum": {"sum": {"field": "value"}}}], - "time_field_name": "timestamp", - "start_time": 1620630000000, - "end_time": 1621234800000, - "min_time_interval": 86400000, - "num_outputs": 10, - } - ml_client.execute(algorithm_name="anomaly_localization", input_json=input_json) - except: # noqa: E722 - raised = True - assert raised == False, "Raised Exception during anomaly localization execution" - - raised = False - try: - input_json = {"metrics": [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]} - ml_client.execute(algorithm_name="METRICS_CORRELATION", input_json=input_json) + input_json = {"operation": "max", "input_data": [1.0, 2.0, 3.0]} + ml_client.execute( + algorithm_name="local_sample_calculator", input_json=input_json + ) except: # noqa: E722 raised = True - assert raised == False, "Raised Exception during metrics correlation execution" + assert raised == False, "Raised Exception during execute API testing" def test_integration_pretrained_model_upload_unload_delete(): From 25cbdaa85151f75ed9d8f3c7a7bf4af3c982fe2d Mon Sep 17 00:00:00 2001 From: Alibi Zhenis Date: Sun, 28 May 2023 14:41:41 +0500 Subject: [PATCH 4/7] changing code Signed-off-by: Alibi Zhenis --- opensearch_py_ml/ml_commons/model_execute.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/opensearch_py_ml/ml_commons/model_execute.py b/opensearch_py_ml/ml_commons/model_execute.py index fa0b49fc..36bda99d 100644 --- a/opensearch_py_ml/ml_commons/model_execute.py +++ b/opensearch_py_ml/ml_commons/model_execute.py @@ -38,14 +38,11 @@ def _execute(self, algorithm_name: str, input_json: dict) -> dict: :rtype: dict """ - if self._is_json(input_json): - return self._client.transport.perform_request( - method="POST", - url=f"{ML_BASE_URI}/_execute/{algorithm_name}", - body=input_json, - ) - - def _is_json(self, input_json: dict) -> bool: if not isinstance(input_json, dict): - json.loads(input_json) - return True + input_json = json.loads(input_json) + + return self._client.transport.perform_request( + method="POST", + url=f"{ML_BASE_URI}/_execute/{algorithm_name}", + body=input_json, + ) From d05fda76c6750918f6a7071b5a7efbf36f93532b Mon Sep 17 00:00:00 2001 From: Alibi Zhenis Date: Mon, 5 Jun 2023 13:23:27 +0800 Subject: [PATCH 5/7] fixing tests Signed-off-by: Alibi Zhenis --- tests/ml_commons/test_ml_commons_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/ml_commons/test_ml_commons_client.py b/tests/ml_commons/test_ml_commons_client.py index df8591e1..406d17b0 100644 --- a/tests/ml_commons/test_ml_commons_client.py +++ b/tests/ml_commons/test_ml_commons_client.py @@ -33,9 +33,7 @@ MODEL_CONFIG_FILE_NAME = "ml-commons_model_config.json" TEST_FOLDER = os.path.join(PROJECT_DIR, "test_model_files") -TESTDATA_SYNTHETIC_QUERY_ZIP = os.path.join( - PROJECT_DIR, "../../../../..", "synthetic_queries.zip" -) +TESTDATA_SYNTHETIC_QUERY_ZIP = os.path.join(PROJECT_DIR, "..", "synthetic_queries.zip") MODEL_PATH = os.path.join(TEST_FOLDER, MODEL_FILE_ZIP_NAME) MODEL_CONFIG_FILE_PATH = os.path.join(TEST_FOLDER, MODEL_CONFIG_FILE_NAME) @@ -395,3 +393,6 @@ def test_integration_model_train_register_full_cycle(): except: # noqa: E722 raised = True assert raised == False, "Raised Exception in deleting model" + + +test_integration_model_train_register_full_cycle() From 0d0300b7bd0b38d77fd23c294b140f19fc0a9aba Mon Sep 17 00:00:00 2001 From: Alibi Zhenis Date: Sat, 10 Jun 2023 12:26:12 +0800 Subject: [PATCH 6/7] changing tests Signed-off-by: Alibi Zhenis --- tests/ml_commons/test_ml_commons_client.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/ml_commons/test_ml_commons_client.py b/tests/ml_commons/test_ml_commons_client.py index 406d17b0..792aa14f 100644 --- a/tests/ml_commons/test_ml_commons_client.py +++ b/tests/ml_commons/test_ml_commons_client.py @@ -81,7 +81,21 @@ def test_execute(): ) except: # noqa: E722 raised = True - assert raised == False, "Raised Exception during execute API testing" + assert ( + raised == False + ), "Raised Exception during execute API testing with dictionary" + + raised = False + try: + input_json = '{"operation": "max", "input_data": [1.0, 2.0, 3.0]}' + ml_client.execute( + algorithm_name="local_sample_calculator", input_json=input_json + ) + except: # noqa: E722 + raised = True + assert ( + raised == False + ), "Raised Exception during execute API testing with JSON string" def test_DEPRECATED_integration_pretrained_model_upload_unload_delete(): From 3f786723f93b34943c9dc9723a92378c12b344ae Mon Sep 17 00:00:00 2001 From: Alibi Zhenis Date: Tue, 13 Jun 2023 21:49:35 +0800 Subject: [PATCH 7/7] checking output value in integration tests Signed-off-by: Alibi Zhenis --- tests/ml_commons/test_ml_commons_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/ml_commons/test_ml_commons_client.py b/tests/ml_commons/test_ml_commons_client.py index 792aa14f..0dc5bfd2 100644 --- a/tests/ml_commons/test_ml_commons_client.py +++ b/tests/ml_commons/test_ml_commons_client.py @@ -76,9 +76,10 @@ def test_execute(): raised = False try: input_json = {"operation": "max", "input_data": [1.0, 2.0, 3.0]} - ml_client.execute( + result = ml_client.execute( algorithm_name="local_sample_calculator", input_json=input_json ) + assert result["output"]["result"] == 3 except: # noqa: E722 raised = True assert ( @@ -88,9 +89,10 @@ def test_execute(): raised = False try: input_json = '{"operation": "max", "input_data": [1.0, 2.0, 3.0]}' - ml_client.execute( + result = ml_client.execute( algorithm_name="local_sample_calculator", input_json=input_json ) + assert result["output"]["result"] == 3 except: # noqa: E722 raised = True assert (