From e57df858218ced0b9789f9ea1abeafc03f3873fe Mon Sep 17 00:00:00 2001
From: Bhargav Suryadevara
Date: Wed, 30 Aug 2023 16:32:44 -0500
Subject: [PATCH 01/16] added elasticsearch sink

---
 .../modules/core/write_to_elasticsearch.md    |  45 +++++
 docs/source/modules/index.md                  |   2 +-
 morpheus/modules/__init__.py                  |   6 +-
 morpheus/modules/write_to_elasticsearch.py    |  89 ++++++++++
 .../controllers/elasticsearch_controller.py   | 166 ++++++++++++++++++
 morpheus/utils/module_ids.py                  |   2 +-
 .../test_elasticsearch_controller.py          | 156 ++++++++++++++++
 7 files changed, 462 insertions(+), 4 deletions(-)
 create mode 100644 docs/source/modules/core/write_to_elasticsearch.md
 create mode 100644 morpheus/modules/write_to_elasticsearch.py
 create mode 100644 morpheus/utils/controllers/elasticsearch_controller.py
 create mode 100644 tests/utils/controllers/test_elasticsearch_controller.py

diff --git a/docs/source/modules/core/write_to_elasticsearch.md b/docs/source/modules/core/write_to_elasticsearch.md
new file mode 100644
index 0000000000..7689b6e3fc
--- /dev/null
+++ b/docs/source/modules/core/write_to_elasticsearch.md
@@ -0,0 +1,45 @@

## Write to Elasticsearch Module

This module reads an input data stream, converts each row of data to a document format suitable for Elasticsearch, and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API.

### Configurable Parameters

| Parameter              | Type | Description                                                                                             | Example Value                                | Default Value |
|------------------------|------|---------------------------------------------------------------------------------------------------------|----------------------------------------------|---------------|
| `index`                | str  | Elasticsearch index.                                                                                    | "my_index"                                   | `[Required]`  |
| `connection_kwargs`    | dict | Keyword arguments used to configure the Elasticsearch connection.                                        | {"hosts": [{"host": "localhost", ...}]}      | `[Required]`  |
| `raise_on_exception`   | bool | Raise or suppress exceptions when writing to Elasticsearch.                                              | true                                         | `false`       |
| `pickled_func_config`  | dict | Pickled custom function configuration to update connection_kwargs as needed for the client connection.   | See below                                    | None          |
| `refresh_period_secs`  | int  | Time in seconds to refresh the client connection.                                                        | 3600                                         | `2400`        |

### Example JSON Configuration

```json
{
    "index": "test_index",
    "connection_kwargs": {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]},
    "raise_on_exception": true,
    "pickled_func_config": {
        "pickled_func_str": "pickled function as a string",
        "encoding": "latin1"
    },
    "refresh_period_secs": 2400
}
```
diff --git a/docs/source/modules/index.md b/docs/source/modules/index.md
index 042c35104c..d80fbe367a 100644
--- a/docs/source/modules/index.md
+++ b/docs/source/modules/index.md
@@ -31,10 +31,10 @@ limitations under the License.
./core/filter_detections.md
./core/from_control_message.md
./core/mlflow_model_writer.md
-./core/multiplexer.md
./core/payload_batcher.md
./core/serialize.md
./core/to_control_message.md
+./core/write_to_elasticsearch.md
./core/write_to_file.md
```
diff --git a/morpheus/modules/__init__.py b/morpheus/modules/__init__.py
index 8d5d36e95a..fb5bb53025 100644
--- a/morpheus/modules/__init__.py
+++ b/morpheus/modules/__init__.py
@@ -14,6 +14,7 @@
"""
Morpheus module definitions, each module is automatically registered when imported
"""
+from morpheus._lib import modules
# When segment modules are imported, they're added to the module registry.
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
from morpheus.modules import file_batcher @@ -26,8 +27,8 @@ from morpheus.modules import payload_batcher from morpheus.modules import serialize from morpheus.modules import to_control_message +from morpheus.modules import write_to_elasticsearch from morpheus.modules import write_to_file -from morpheus._lib import modules __all__ = [ "file_batcher", @@ -41,5 +42,6 @@ "payload_batcher", "serialize", "to_control_message", - "write_to_file" + "write_to_file", + "write_to_elasticsearch" ] diff --git a/morpheus/modules/write_to_elasticsearch.py b/morpheus/modules/write_to_elasticsearch.py new file mode 100644 index 0000000000..2211ad7159 --- /dev/null +++ b/morpheus/modules/write_to_elasticsearch.py @@ -0,0 +1,89 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import mrc +from mrc.core import operators as ops + +from morpheus.messages import ControlMessage +from morpheus.utils.controllers.elasticsearch_controller import ElasticsearchController +from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE +from morpheus.utils.module_ids import WRITE_TO_ELASTICSEARCH +from morpheus.utils.module_utils import register_module + +logger = logging.getLogger(__name__) + + +@register_module(WRITE_TO_ELASTICSEARCH, MORPHEUS_MODULE_NAMESPACE) +def write_to_elasticsearch(builder: mrc.Builder): + """ + This module reads input data stream, converts each row of data to a document format suitable for Elasticsearch, + and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API. + + Parameters + ---------- + builder : mrc.Builder + An mrc Builder object. + + Notes + ----- + Configurable Parameters: + - index (str): Elastic search index. + - connection_kwargs (dict): Elastic search connection kwrags configuration. + - pickled_func_config (str): Pickled custom function configuration to updated connection_kwargs as needed + to established client connection. Custom function should return connection_kwargs; default: None. + - refresh_period_secs (int): Time in seconds to refresh the client connection; default: 2400. 
+ - raise_on_exception (bool): It is used to raise or supress exceptions when writing to Elasticsearch; + deafult: False + """ + + config = builder.get_current_module_config() + + index = config.get("index", None) + connection_kwargs = config.get("connection_kwargs") + raise_on_exception = config.get("raise_on_exception", False) + pickled_func_config = config.get("pickled_func_config", None) + refresh_period_secs = config.get("refresh_period_secs", 2400) + + controller = ElasticsearchController(index=index, + connection_kwargs=connection_kwargs, + raise_on_exception=raise_on_exception, + refresh_period_secs=refresh_period_secs, + pickled_func_config=pickled_func_config) + + def on_data(message: ControlMessage): + + controller.refresh_client() + + meta = message.payload() + rows = meta.df.to_pandas().to_dict('records') + + actions = [] + for row in rows: + action = {"_index": index, "_source": row} + actions.append(action) + + controller.parallel_bulk_write(actions) # Parallel bulk upload to Elasticsearch + + return message + + def on_complete(): + controller.close_client() # Close client + + node = builder.make_node(WRITE_TO_ELASTICSEARCH, ops.map(on_data), ops.on_completed(on_complete)) + + # Register input and output port for a module. + builder.register_module_input("input", node) + builder.register_module_output("output", node) diff --git a/morpheus/utils/controllers/elasticsearch_controller.py b/morpheus/utils/controllers/elasticsearch_controller.py new file mode 100644 index 0000000000..4f8c4f9872 --- /dev/null +++ b/morpheus/utils/controllers/elasticsearch_controller.py @@ -0,0 +1,166 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import pickle +import time + +from elasticsearch import ConnectionError as ESConnectionError +from elasticsearch import Elasticsearch +from elasticsearch.helpers import parallel_bulk + +logger = logging.getLogger(__name__) + + +class ElasticsearchController: + """ + ElasticsearchController to perform read and write operations using Elasticsearch service. + + Parameters + ---------- + index : str + The name of the index to interact with. + connection_kwargs : dict + Keyword arguments to configure the Elasticsearch connection. + raise_on_exception : bool, optional + Whether to raise exceptions on Elasticsearch errors. + refresh_period_secs : int, optional + The refresh period in seconds for client refreshing. + pickled_func_config : dict, optional + Configuration for a pickled function to modify connection parameters. 
+ """ + + def __init__(self, + index: str, + connection_kwargs: dict, + raise_on_exception: bool = False, + refresh_period_secs: int = 2400, + pickled_func_config: dict = None): + + self._index = index + self._connection_kwargs = connection_kwargs + self._raise_on_exception = raise_on_exception + self._refresh_period_secs = refresh_period_secs + self._apply_derive_params_func(pickled_func_config) + + logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs) + + # Create ElasticSearch client + self._client = Elasticsearch(**self._connection_kwargs) + + # Check if the client is connected + is_connected = self._client.ping() + + if is_connected: + logger.debug("Elasticsearch client is connected.") + else: + logger.debug("Elasticsearch client is not connected.") + raise ESConnectionError("Elasticsearch client is not connected.") + + self._last_refresh_time = time.time() + + logger.debug("Elasticsearch cluster info: %s", self._client.info) + logger.debug("Creating Elasticsearch client... Done!") + + @property + def client(self) -> Elasticsearch: + """ + Get the active Elasticsearch client instance. + + Returns + ------- + Elasticsearch + The active Elasticsearch client. + """ + return self._client + + def _apply_derive_params_func(self, pickled_func_config) -> None: + if pickled_func_config: + pickled_func_str = pickled_func_config["pickled_func_str"] + encoding = pickled_func_config["encoding"] + func = pickle.loads(bytes(pickled_func_str, encoding)) + self._connection_kwargs = func(self._connection_kwargs) + + def refresh_client(self, force=False) -> None: + """ + Refresh the Elasticsearch client instance. + + Parameters + ---------- + force : bool, optional + Force a client refresh. + """ + if force or self._client is None or time.time() - self._last_refresh_time >= self._refresh_period_secs: + if not force and self._client: + try: + # Close the existing client + self.close_client() + except Exception as ex: + logger.warning("Ignoring client close error: %s", ex) + logger.debug("Refreshing Elasticsearch client....") + self._client = Elasticsearch(**self._connection_kwargs) + logger.debug("Refreshing Elasticsearch client.... Done!") + + self._last_refresh_time = time.time() + + def parallel_bulk_write(self, actions) -> None: + """ + Perform parallel bulk writes to Elasticsearch. + + Parameters + ---------- + actions : list + List of actions to perform in parallel. + """ + + for success, info in parallel_bulk(self._client, actions=actions, raise_on_exception=self._raise_on_exception): + if not success: + logger.error("Error writing to ElasticSearch: %s", str(info)) + + def search_documents(self, query: dict, index: str = None, **kwargs) -> dict: + """ + Search for documents in Elasticsearch based on the given query. + + Parameters + ---------- + query : dict + The query DSL (Domain Specific Language) for the search. + index : str, optional + The name of the index to search. If not provided, the instance's index will be used. + **kwargs + Additional keyword arguments that are supported by the Elasticsearch search method. + + Returns + ------- + dict + The search result returned by Elasticsearch. 
+ """ + if index is None: + index = self._index + + try: + result = self._client.search(index=index, query=query, **kwargs) + return result + except Exception as exc: + logger.error("Error searching documents: %s", exc) + if self._raise_on_exception: + raise RuntimeError(f"Error searching documents: {exc}") from exc + + return {} + + def close_client(self): + """ + Close the Elasticsearch client connection. + """ + self._client.close() diff --git a/morpheus/utils/module_ids.py b/morpheus/utils/module_ids.py index dfb704b45e..2626859871 100644 --- a/morpheus/utils/module_ids.py +++ b/morpheus/utils/module_ids.py @@ -21,9 +21,9 @@ FILTER_DETECTIONS = "FilterDetections" FROM_CONTROL_MESSAGE = "FromControlMessage" MLFLOW_MODEL_WRITER = "MLFlowModelWriter" -MULTIPLEXER = "Multiplexer" SERIALIZE = "Serialize" TO_CONTROL_MESSAGE = "ToControlMessage" WRITE_TO_FILE = "WriteToFile" FILTER_CM_FAILED = "FilterCmFailed" PAYLOAD_BATCHER = "PayloadBatcher" +WRITE_TO_ELASTICSEARCH = "WriteToElasticsearch" diff --git a/tests/utils/controllers/test_elasticsearch_controller.py b/tests/utils/controllers/test_elasticsearch_controller.py new file mode 100644 index 0000000000..9c5ee1caa4 --- /dev/null +++ b/tests/utils/controllers/test_elasticsearch_controller.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pickle +import time +from unittest.mock import Mock +from unittest.mock import patch + +import pytest +from elasticsearch import Elasticsearch + +from morpheus.utils.controllers.elasticsearch_controller import ElasticsearchController + +# pylint: disable=W0621 + + +# Define a mock function for _apply_derive_params_function +def mock_derive_params(kwargs): + kwargs["retry_on_status"] = 3 + kwargs["retry_on_timeout"] = 3 * 10 + + return kwargs + + +@pytest.fixture(scope="module") +def mock_es_controller(connection_kwargs): + with patch("morpheus.utils.controllers.elasticsearch_controller.Elasticsearch", Mock(spec=Elasticsearch)): + controller = ElasticsearchController("test_index", connection_kwargs, refresh_period_secs=10) + yield controller + + +@pytest.fixture(scope="module") +def connection_kwargs(): + kwargs = {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]} + yield kwargs + + +@pytest.mark.use_python +def test_constructor(mock_es_controller, connection_kwargs): + assert mock_es_controller._index == "test_index" + assert mock_es_controller._connection_kwargs == connection_kwargs + assert mock_es_controller._raise_on_exception is False + assert mock_es_controller._refresh_period_secs == 10 + assert mock_es_controller._client is not None + + +@pytest.mark.use_python +def test_apply_derive_params_func(mock_es_controller): + pickled_func_str = str(pickle.dumps(mock_derive_params), encoding="latin1") + + # Create pickled_func_config + pickled_func_config = { + "pickled_func_str": pickled_func_str, # Pickled function string + "encoding": "latin1" + } + + # Apply the mock function and check if connection_kwargs is updated + mock_es_controller._apply_derive_params_func(pickled_func_config) + assert mock_es_controller._connection_kwargs == { + "hosts": [{ + "host": "localhost", "port": 9200, "scheme": "http" + }], "retry_on_status": 3, "retry_on_timeout": 30 + } + + +@pytest.mark.use_python +def test_refresh_client_force(mock_es_controller): + # Simulate a force refresh + mock_es_controller.refresh_client(force=True) + + assert mock_es_controller._client is not None + assert mock_es_controller._last_refresh_time > 0 + assert isinstance(mock_es_controller._client, Mock) + + +@pytest.mark.use_python +def test_refresh_client_not_needed(mock_es_controller): + # Set last refresh time to a recent time + current_time = time.time() + mock_es_controller._last_refresh_time = current_time + + # Simulate a refresh not needed scenario + mock_es_controller.refresh_client() + # Assert client is not None + assert mock_es_controller._client is not None + # Assert last_refresh_time is unchanged + assert mock_es_controller._last_refresh_time == current_time + # Assert client type remains the same + assert isinstance(mock_es_controller._client, Mock) + + +@pytest.mark.use_python +def test_refresh_client_needed(mock_es_controller): + # Set last refresh time to a recent time + current_time = time.time() + mock_es_controller._refresh_period_secs = 1 + mock_es_controller._last_refresh_time = current_time + time.sleep(1) + + # Simulate a refresh needed scenario + mock_es_controller.refresh_client() + + # Assert client is not None + assert mock_es_controller._client is not None + # Assert last_refresh_time is changed + assert mock_es_controller._last_refresh_time > current_time + # Assert client type remains the same + assert isinstance(mock_es_controller._client, Mock) + + +@pytest.mark.use_python +@patch("morpheus.utils.controllers.elasticsearch_controller.parallel_bulk", 
return_value=[(True, None)]) +def test_parallel_bulk_write(mock_parallel_bulk, mock_es_controller): + # Define your mock actions + mock_actions = [{"_index": "test_index", "_id": 1, "_source": {"field1": "value1"}}] + + mock_es_controller.parallel_bulk_write(actions=mock_actions) + mock_parallel_bulk.assert_called_once() + + +def test_search_documents_success(mock_es_controller): + mock_es_controller._client.search.return_value = {"hits": {"total": 1, "hits": [{"_source": {"field1": "value1"}}]}} + query = {"match": {"field1": "value1"}} + result = mock_es_controller.search_documents(query=query) + assert isinstance(result, dict) + assert "hits" in result + assert "total" in result["hits"] + assert result["hits"]["total"] == 1 + + +def test_search_documents_failure_supress_errors(mock_es_controller): + mock_es_controller._client.search.side_effect = ConnectionError("Connection error") + query = {"match": {"field1": "value1"}} + result = mock_es_controller.search_documents(query=query) + assert isinstance(result, dict) + assert not result + + +def test_search_documents_failure_raise_error(mock_es_controller): + mock_es_controller._raise_on_exception = True + query = {"match": {"field1": "value1"}} + with pytest.raises(RuntimeError): + mock_es_controller.search_documents(query=query) From efab6e336d4b3b0b089b13ebe132ac521d6c1e35 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 30 Aug 2023 22:32:09 -0500 Subject: [PATCH 02/16] added elasticsearch dependecy --- docker/conda/environments/cuda11.8_dev.yml | 1 + morpheus/utils/controllers/__init__.py | 12 ++++++++++++ 2 files changed, 13 insertions(+) create mode 100644 morpheus/utils/controllers/__init__.py diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml index e9b0b43018..5ab9e7e8ad 100644 --- a/docker/conda/environments/cuda11.8_dev.yml +++ b/docker/conda/environments/cuda11.8_dev.yml @@ -43,6 +43,7 @@ dependencies: - dill - docker-py=5.0 - docutils + - elasticsearch==8.9.0 - faker=12.3.0 - flake8 - flatbuffers=2.0 diff --git a/morpheus/utils/controllers/__init__.py b/morpheus/utils/controllers/__init__.py new file mode 100644 index 0000000000..a1dd01f33d --- /dev/null +++ b/morpheus/utils/controllers/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
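
A minimal usage sketch of the `ElasticsearchController` introduced in PATCH 01 (together with the `elasticsearch==8.9.0` dependency from PATCH 02). This is illustrative only: it assumes an Elasticsearch instance is reachable at `localhost:9200`, and `"my_docs"` is a stand-in index name. Later patches in this series relocate the controller to `morpheus/controllers/` and rework its constructor, so the import path and signature below reflect the code as of this patch.

```python
# Illustrative sketch (assumes a reachable Elasticsearch at localhost:9200 and a
# stand-in index name); uses the controller API as introduced in PATCH 01.
from morpheus.utils.controllers.elasticsearch_controller import ElasticsearchController

connection_kwargs = {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]}

controller = ElasticsearchController(index="my_docs",
                                     connection_kwargs=connection_kwargs,
                                     raise_on_exception=True)

# Each bulk action targets an index and carries one row as its "_source" document,
# mirroring what the write_to_elasticsearch module builds per DataFrame row.
actions = [{"_index": "my_docs", "_source": {"field1": "value1"}},
           {"_index": "my_docs", "_source": {"field1": "value2"}}]
controller.parallel_bulk_write(actions)

# Query the documents back, then release the client connection.
result = controller.search_documents(query={"match": {"field1": "value1"}})
print(result.get("hits", {}))

controller.close_client()
```

The action format above mirrors the documents the `write_to_elasticsearch` module assembles from each incoming row before calling `parallel_bulk_write`.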
From d3004a5e4324ce1aac67adfed4752acd08890b39 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 31 Aug 2023 10:50:17 -0500 Subject: [PATCH 03/16] updated docs --- docs/source/modules/core/multiplexer.md | 35 ---------------------- morpheus/modules/write_to_elasticsearch.py | 11 ------- 2 files changed, 46 deletions(-) delete mode 100644 docs/source/modules/core/multiplexer.md diff --git a/docs/source/modules/core/multiplexer.md b/docs/source/modules/core/multiplexer.md deleted file mode 100644 index cd4dc64611..0000000000 --- a/docs/source/modules/core/multiplexer.md +++ /dev/null @@ -1,35 +0,0 @@ - - -## Multiplexer Module - -The multiplexer receives data packets from one or more input ports and interleaves them into a single output. - -### Configurable Parameters - -| Parameter | Type | Description | Example Value | Default Value | -|---------------------------|-----------|-----------------------------------------------------------------------------------------------------------|---------------|---------------| -| `input_ports`| list[string] | Input ports data streams to be combined | `["intput_1", "input_2"]` | `None` | -| `c` | string | Output port where the combined streams to be passed | `output` | `None` | - -### Example JSON Configuration - -```json -{ - "input_ports": ["intput_1", "input_2"], - "output_port": "output" -} diff --git a/morpheus/modules/write_to_elasticsearch.py b/morpheus/modules/write_to_elasticsearch.py index 2211ad7159..7879b47187 100644 --- a/morpheus/modules/write_to_elasticsearch.py +++ b/morpheus/modules/write_to_elasticsearch.py @@ -36,17 +36,6 @@ def write_to_elasticsearch(builder: mrc.Builder): ---------- builder : mrc.Builder An mrc Builder object. - - Notes - ----- - Configurable Parameters: - - index (str): Elastic search index. - - connection_kwargs (dict): Elastic search connection kwrags configuration. - - pickled_func_config (str): Pickled custom function configuration to updated connection_kwargs as needed - to established client connection. Custom function should return connection_kwargs; default: None. - - refresh_period_secs (int): Time in seconds to refresh the client connection; default: 2400. 
- - raise_on_exception (bool): It is used to raise or supress exceptions when writing to Elasticsearch; - deafult: False """ config = builder.get_current_module_config() From 9be0de079f63a1967c19db7f77c7d8fced67b347 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 31 Aug 2023 11:36:37 -0500 Subject: [PATCH 04/16] Moved controllers module one level up --- morpheus/{utils => }/controllers/__init__.py | 0 .../{utils => }/controllers/elasticsearch_controller.py | 0 morpheus/modules/write_to_elasticsearch.py | 2 +- .../controllers/test_elasticsearch_controller.py | 6 +++--- 4 files changed, 4 insertions(+), 4 deletions(-) rename morpheus/{utils => }/controllers/__init__.py (100%) rename morpheus/{utils => }/controllers/elasticsearch_controller.py (100%) rename tests/{utils => }/controllers/test_elasticsearch_controller.py (94%) diff --git a/morpheus/utils/controllers/__init__.py b/morpheus/controllers/__init__.py similarity index 100% rename from morpheus/utils/controllers/__init__.py rename to morpheus/controllers/__init__.py diff --git a/morpheus/utils/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py similarity index 100% rename from morpheus/utils/controllers/elasticsearch_controller.py rename to morpheus/controllers/elasticsearch_controller.py diff --git a/morpheus/modules/write_to_elasticsearch.py b/morpheus/modules/write_to_elasticsearch.py index 7879b47187..42a62875df 100644 --- a/morpheus/modules/write_to_elasticsearch.py +++ b/morpheus/modules/write_to_elasticsearch.py @@ -17,8 +17,8 @@ import mrc from mrc.core import operators as ops +from morpheus.controllers.elasticsearch_controller import ElasticsearchController from morpheus.messages import ControlMessage -from morpheus.utils.controllers.elasticsearch_controller import ElasticsearchController from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_ids import WRITE_TO_ELASTICSEARCH from morpheus.utils.module_utils import register_module diff --git a/tests/utils/controllers/test_elasticsearch_controller.py b/tests/controllers/test_elasticsearch_controller.py similarity index 94% rename from tests/utils/controllers/test_elasticsearch_controller.py rename to tests/controllers/test_elasticsearch_controller.py index 9c5ee1caa4..8ca4976894 100644 --- a/tests/utils/controllers/test_elasticsearch_controller.py +++ b/tests/controllers/test_elasticsearch_controller.py @@ -22,7 +22,7 @@ import pytest from elasticsearch import Elasticsearch -from morpheus.utils.controllers.elasticsearch_controller import ElasticsearchController +from morpheus.controllers.elasticsearch_controller import ElasticsearchController # pylint: disable=W0621 @@ -37,7 +37,7 @@ def mock_derive_params(kwargs): @pytest.fixture(scope="module") def mock_es_controller(connection_kwargs): - with patch("morpheus.utils.controllers.elasticsearch_controller.Elasticsearch", Mock(spec=Elasticsearch)): + with patch("morpheus.controllers.elasticsearch_controller.Elasticsearch", Mock(spec=Elasticsearch)): controller = ElasticsearchController("test_index", connection_kwargs, refresh_period_secs=10) yield controller @@ -122,7 +122,7 @@ def test_refresh_client_needed(mock_es_controller): @pytest.mark.use_python -@patch("morpheus.utils.controllers.elasticsearch_controller.parallel_bulk", return_value=[(True, None)]) +@patch("morpheus.controllers.elasticsearch_controller.parallel_bulk", return_value=[(True, None)]) def test_parallel_bulk_write(mock_parallel_bulk, mock_es_controller): # Define your mock 
actions mock_actions = [{"_index": "test_index", "_id": 1, "_source": {"field1": "value1"}}] From 5a7fdedcb9c033150b3180439fb1882c43b665b8 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 6 Sep 2023 23:27:05 -0500 Subject: [PATCH 05/16] updated write to elasticsearch tests --- .../controllers/elasticsearch_controller.py | 64 ++++----- morpheus/modules/write_to_elasticsearch.py | 16 ++- .../test_elasticsearch_controller.py | 135 ++++++++++-------- 3 files changed, 110 insertions(+), 105 deletions(-) diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index 4f8c4f9872..d450c216d7 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -29,8 +29,6 @@ class ElasticsearchController: Parameters ---------- - index : str - The name of the index to interact with. connection_kwargs : dict Keyword arguments to configure the Elasticsearch connection. raise_on_exception : bool, optional @@ -42,13 +40,13 @@ class ElasticsearchController: """ def __init__(self, - index: str, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400, pickled_func_config: dict = None): - self._index = index + self._client = None + self._last_refresh_time = None self._connection_kwargs = connection_kwargs self._raise_on_exception = raise_on_exception self._refresh_period_secs = refresh_period_secs @@ -56,35 +54,11 @@ def __init__(self, logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs) - # Create ElasticSearch client - self._client = Elasticsearch(**self._connection_kwargs) - - # Check if the client is connected - is_connected = self._client.ping() - - if is_connected: - logger.debug("Elasticsearch client is connected.") - else: - logger.debug("Elasticsearch client is not connected.") - raise ESConnectionError("Elasticsearch client is not connected.") - - self._last_refresh_time = time.time() + self.refresh_client(force=True) logger.debug("Elasticsearch cluster info: %s", self._client.info) logger.debug("Creating Elasticsearch client... Done!") - @property - def client(self) -> Elasticsearch: - """ - Get the active Elasticsearch client instance. - - Returns - ------- - Elasticsearch - The active Elasticsearch client. - """ - return self._client - def _apply_derive_params_func(self, pickled_func_config) -> None: if pickled_func_config: pickled_func_str = pickled_func_config["pickled_func_str"] @@ -92,7 +66,7 @@ def _apply_derive_params_func(self, pickled_func_config) -> None: func = pickle.loads(bytes(pickled_func_str, encoding)) self._connection_kwargs = func(self._connection_kwargs) - def refresh_client(self, force=False) -> None: + def refresh_client(self, force=False) -> bool: """ Refresh the Elasticsearch client instance. @@ -101,18 +75,32 @@ def refresh_client(self, force=False) -> None: force : bool, optional Force a client refresh. 
""" - if force or self._client is None or time.time() - self._last_refresh_time >= self._refresh_period_secs: - if not force and self._client: + + is_refreshed = False + time_now = time.time() + if force or self._client is None or time_now - self._last_refresh_time >= self._refresh_period_secs: + if self._client: try: # Close the existing client self.close_client() except Exception as ex: logger.warning("Ignoring client close error: %s", ex) logger.debug("Refreshing Elasticsearch client....") + + # Create Elasticsearch client self._client = Elasticsearch(**self._connection_kwargs) - logger.debug("Refreshing Elasticsearch client.... Done!") + # Check if the client is connected + if self._client.ping(): + logger.debug("Elasticsearch client is connected.") + else: + raise ESConnectionError("Elasticsearch client is not connected.") + + logger.debug("Refreshing Elasticsearch client.... Done!") self._last_refresh_time = time.time() + is_refreshed = True + + return is_refreshed def parallel_bulk_write(self, actions) -> None: """ @@ -128,16 +116,16 @@ def parallel_bulk_write(self, actions) -> None: if not success: logger.error("Error writing to ElasticSearch: %s", str(info)) - def search_documents(self, query: dict, index: str = None, **kwargs) -> dict: + def search_documents(self, index: str, query: dict, **kwargs) -> dict: """ Search for documents in Elasticsearch based on the given query. Parameters ---------- + index : str + The name of the index to search. query : dict - The query DSL (Domain Specific Language) for the search. - index : str, optional - The name of the index to search. If not provided, the instance's index will be used. + The DSL query for the search. **kwargs Additional keyword arguments that are supported by the Elasticsearch search method. @@ -146,8 +134,6 @@ def search_documents(self, query: dict, index: str = None, **kwargs) -> dict: dict The search result returned by Elasticsearch. 
""" - if index is None: - index = self._index try: result = self._client.search(index=index, query=query, **kwargs) diff --git a/morpheus/modules/write_to_elasticsearch.py b/morpheus/modules/write_to_elasticsearch.py index 42a62875df..9112990360 100644 --- a/morpheus/modules/write_to_elasticsearch.py +++ b/morpheus/modules/write_to_elasticsearch.py @@ -41,13 +41,20 @@ def write_to_elasticsearch(builder: mrc.Builder): config = builder.get_current_module_config() index = config.get("index", None) + + if index is None: + raise ValueError("Index must not be None.") + connection_kwargs = config.get("connection_kwargs") + + if not isinstance(connection_kwargs, dict): + raise ValueError(f"Expects `connection_kwargs` as a dictionary, but it is of type {type(connection_kwargs)}") + raise_on_exception = config.get("raise_on_exception", False) pickled_func_config = config.get("pickled_func_config", None) refresh_period_secs = config.get("refresh_period_secs", 2400) - controller = ElasticsearchController(index=index, - connection_kwargs=connection_kwargs, + controller = ElasticsearchController(connection_kwargs=connection_kwargs, raise_on_exception=raise_on_exception, refresh_period_secs=refresh_period_secs, pickled_func_config=pickled_func_config) @@ -68,10 +75,7 @@ def on_data(message: ControlMessage): return message - def on_complete(): - controller.close_client() # Close client - - node = builder.make_node(WRITE_TO_ELASTICSEARCH, ops.map(on_data), ops.on_completed(on_complete)) + node = builder.make_node(WRITE_TO_ELASTICSEARCH, ops.map(on_data), ops.on_completed(controller.close_client)) # Register input and output port for a module. builder.register_module_input("input", node) diff --git a/tests/controllers/test_elasticsearch_controller.py b/tests/controllers/test_elasticsearch_controller.py index 8ca4976894..d0a69cc9f7 100644 --- a/tests/controllers/test_elasticsearch_controller.py +++ b/tests/controllers/test_elasticsearch_controller.py @@ -16,16 +16,13 @@ import pickle import time -from unittest.mock import Mock +import typing from unittest.mock import patch import pytest -from elasticsearch import Elasticsearch from morpheus.controllers.elasticsearch_controller import ElasticsearchController -# pylint: disable=W0621 - # Define a mock function for _apply_derive_params_function def mock_derive_params(kwargs): @@ -35,30 +32,38 @@ def mock_derive_params(kwargs): return kwargs -@pytest.fixture(scope="module") -def mock_es_controller(connection_kwargs): - with patch("morpheus.controllers.elasticsearch_controller.Elasticsearch", Mock(spec=Elasticsearch)): - controller = ElasticsearchController("test_index", connection_kwargs, refresh_period_secs=10) - yield controller +@pytest.fixture(scope="function", autouse=True) +def patch_elasticsearch(): + with patch("morpheus.controllers.elasticsearch_controller.Elasticsearch", autospec=True): + yield -@pytest.fixture(scope="module") -def connection_kwargs(): +@pytest.fixture(scope="module", name="connection_kwargs") +def connection_kwargs_fixture(): kwargs = {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]} yield kwargs +@pytest.fixture(scope="module", name="create_controller") +def create_controller_fixture(connection_kwargs): + + def inner_create_controller(*, connection_kwargs=connection_kwargs, refresh_period_secs=10, **controller_kwargs): + return ElasticsearchController(connection_kwargs=connection_kwargs, + refresh_period_secs=refresh_period_secs, + **controller_kwargs) + + yield inner_create_controller + + @pytest.mark.use_python 
-def test_constructor(mock_es_controller, connection_kwargs): - assert mock_es_controller._index == "test_index" - assert mock_es_controller._connection_kwargs == connection_kwargs - assert mock_es_controller._raise_on_exception is False - assert mock_es_controller._refresh_period_secs == 10 - assert mock_es_controller._client is not None +def test_constructor(create_controller: typing.Callable[..., ElasticsearchController], connection_kwargs: dict): + assert create_controller(raise_on_exception=True)._raise_on_exception is True + assert create_controller(refresh_period_secs=1.5)._refresh_period_secs == 1.5 + assert create_controller()._connection_kwargs == connection_kwargs @pytest.mark.use_python -def test_apply_derive_params_func(mock_es_controller): +def test_apply_derive_params_func(create_controller: typing.Callable[..., ElasticsearchController]): pickled_func_str = str(pickle.dumps(mock_derive_params), encoding="latin1") # Create pickled_func_config @@ -67,9 +72,11 @@ def test_apply_derive_params_func(mock_es_controller): "encoding": "latin1" } + controller = create_controller() # Apply the mock function and check if connection_kwargs is updated - mock_es_controller._apply_derive_params_func(pickled_func_config) - assert mock_es_controller._connection_kwargs == { + controller._apply_derive_params_func(pickled_func_config) + + assert controller._connection_kwargs == { "hosts": [{ "host": "localhost", "port": 9200, "scheme": "http" }], "retry_on_status": 3, "retry_on_timeout": 30 @@ -77,80 +84,88 @@ def test_apply_derive_params_func(mock_es_controller): @pytest.mark.use_python -def test_refresh_client_force(mock_es_controller): - # Simulate a force refresh - mock_es_controller.refresh_client(force=True) +def test_refresh_client_force(create_controller: typing.Callable[..., ElasticsearchController]): + controller = create_controller(refresh_period_secs=1) + + client = controller._client + is_refreshed = controller.refresh_client(force=True) - assert mock_es_controller._client is not None - assert mock_es_controller._last_refresh_time > 0 - assert isinstance(mock_es_controller._client, Mock) + controller._client.close.assert_called_once() + assert client.ping.call_count == 2 + assert is_refreshed is True + assert controller._last_refresh_time > 0 @pytest.mark.use_python -def test_refresh_client_not_needed(mock_es_controller): +def test_refresh_client_not_needed(create_controller: typing.Callable[..., ElasticsearchController]): # Set last refresh time to a recent time current_time = time.time() - mock_es_controller._last_refresh_time = current_time + controller = create_controller() + controller._last_refresh_time = current_time + client = controller._client # Simulate a refresh not needed scenario - mock_es_controller.refresh_client() - # Assert client is not None - assert mock_es_controller._client is not None - # Assert last_refresh_time is unchanged - assert mock_es_controller._last_refresh_time == current_time - # Assert client type remains the same - assert isinstance(mock_es_controller._client, Mock) + is_refreshed = controller.refresh_client() + + client.close.assert_not_called() + assert client.ping.call_count == 1 + assert is_refreshed is False + assert controller._last_refresh_time == current_time @pytest.mark.use_python -def test_refresh_client_needed(mock_es_controller): - # Set last refresh time to a recent time - current_time = time.time() - mock_es_controller._refresh_period_secs = 1 - mock_es_controller._last_refresh_time = current_time - time.sleep(1) +def 
test_refresh_client_needed(create_controller: typing.Callable[..., ElasticsearchController]): + + # Set a 1 second refresh period + controller = create_controller(refresh_period_secs=1) + client = controller._client - # Simulate a refresh needed scenario - mock_es_controller.refresh_client() + is_refreshed = False + # Now "sleep" for more than 1 second to trigger a new client + with patch("time.time", return_value=time.time() + 1): + is_refreshed = controller.refresh_client() - # Assert client is not None - assert mock_es_controller._client is not None - # Assert last_refresh_time is changed - assert mock_es_controller._last_refresh_time > current_time - # Assert client type remains the same - assert isinstance(mock_es_controller._client, Mock) + client.close.assert_called_once() + assert client.ping.call_count == 2 + assert is_refreshed is True @pytest.mark.use_python @patch("morpheus.controllers.elasticsearch_controller.parallel_bulk", return_value=[(True, None)]) -def test_parallel_bulk_write(mock_parallel_bulk, mock_es_controller): +def test_parallel_bulk_write(mock_parallel_bulk, create_controller: typing.Callable[..., ElasticsearchController]): # Define your mock actions mock_actions = [{"_index": "test_index", "_id": 1, "_source": {"field1": "value1"}}] - mock_es_controller.parallel_bulk_write(actions=mock_actions) + create_controller().parallel_bulk_write(actions=mock_actions) mock_parallel_bulk.assert_called_once() -def test_search_documents_success(mock_es_controller): - mock_es_controller._client.search.return_value = {"hits": {"total": 1, "hits": [{"_source": {"field1": "value1"}}]}} +def test_search_documents_success(create_controller: typing.Callable[..., ElasticsearchController]): + controller = create_controller() + controller._client.search.return_value = {"hits": {"total": 1, "hits": [{"_source": {"field1": "value1"}}]}} query = {"match": {"field1": "value1"}} - result = mock_es_controller.search_documents(query=query) + result = controller.search_documents(index="test_index", query=query) + assert isinstance(result, dict) assert "hits" in result assert "total" in result["hits"] assert result["hits"]["total"] == 1 -def test_search_documents_failure_supress_errors(mock_es_controller): - mock_es_controller._client.search.side_effect = ConnectionError("Connection error") +def test_search_documents_failure_supress_errors(create_controller: typing.Callable[..., ElasticsearchController]): + controller = create_controller() + controller._client.search.side_effect = ConnectionError("Connection error") query = {"match": {"field1": "value1"}} - result = mock_es_controller.search_documents(query=query) + result = controller.search_documents(index="test_index", query=query) + assert isinstance(result, dict) assert not result -def test_search_documents_failure_raise_error(mock_es_controller): - mock_es_controller._raise_on_exception = True +def test_search_documents_failure_raise_error(create_controller: typing.Callable[..., ElasticsearchController]): + controller = create_controller(raise_on_exception=True) + controller._client.search.side_effect = Exception query = {"match": {"field1": "value1"}} + with pytest.raises(RuntimeError): - mock_es_controller.search_documents(query=query) + controller.search_documents(index="test_index", query=query) From 6c02b719b21a098cfba1a37b115c1ed6100d6311 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 6 Sep 2023 23:52:27 -0500 Subject: [PATCH 06/16] updated write to elasticsearch tests --- 
morpheus/controllers/elasticsearch_controller.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index d450c216d7..151a7b3a9b 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -31,11 +31,11 @@ class ElasticsearchController: ---------- connection_kwargs : dict Keyword arguments to configure the Elasticsearch connection. - raise_on_exception : bool, optional + raise_on_exception : bool, optional, default: False Whether to raise exceptions on Elasticsearch errors. - refresh_period_secs : int, optional + refresh_period_secs : int, optional, default: 2400 The refresh period in seconds for client refreshing. - pickled_func_config : dict, optional + pickled_func_config : dict, optional, default: None Configuration for a pickled function to modify connection parameters. """ @@ -59,7 +59,7 @@ def __init__(self, logger.debug("Elasticsearch cluster info: %s", self._client.info) logger.debug("Creating Elasticsearch client... Done!") - def _apply_derive_params_func(self, pickled_func_config) -> None: + def _apply_derive_params_func(self, pickled_func_config: dict) -> None: if pickled_func_config: pickled_func_str = pickled_func_config["pickled_func_str"] encoding = pickled_func_config["encoding"] @@ -72,7 +72,7 @@ def refresh_client(self, force=False) -> bool: Parameters ---------- - force : bool, optional + force : bool, optional, default: False Force a client refresh. """ @@ -145,7 +145,7 @@ def search_documents(self, index: str, query: dict, **kwargs) -> dict: return {} - def close_client(self): + def close_client(self) -> None: """ Close the Elasticsearch client connection. 
""" From 347f5f2a8ba8d38ad2dc8e4070109c9efb28a941 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 7 Sep 2023 13:49:14 -0500 Subject: [PATCH 07/16] updated write to elasticsearch tests --- morpheus/cli/commands.py | 3 + .../controllers/elasticsearch_controller.py | 52 ++++++-- morpheus/modules/write_to_elasticsearch.py | 12 +- .../output/write_to_elasticsearch_stage.py | 119 ++++++++++++++++++ .../test_elasticsearch_controller.py | 48 ++++--- 5 files changed, 206 insertions(+), 28 deletions(-) create mode 100644 morpheus/stages/output/write_to_elasticsearch_stage.py diff --git a/morpheus/cli/commands.py b/morpheus/cli/commands.py index fae791cf0f..14687cf219 100644 --- a/morpheus/cli/commands.py +++ b/morpheus/cli/commands.py @@ -675,6 +675,9 @@ def post_pipeline(ctx: click.Context, *args, **kwargs): add_command("preprocess", "morpheus.stages.preprocess.preprocess_nlp_stage.PreprocessNLPStage", modes=NLP_ONLY) add_command("serialize", "morpheus.stages.postprocess.serialize_stage.SerializeStage", modes=ALL) add_command("timeseries", "morpheus.stages.postprocess.timeseries_stage.TimeSeriesStage", modes=AE_ONLY) +add_command("to-elasticsearch", + "morpheus.stages.output.write_to_elasticsearch_stage.WriteToElasticsearchStage", + modes=ALL) add_command("to-file", "morpheus.stages.output.write_to_file_stage.WriteToFileStage", modes=ALL) add_command("to-kafka", "morpheus.stages.output.write_to_kafka_stage.WriteToKafkaStage", modes=ALL) add_command("to-http", "morpheus.stages.output.http_client_sink_stage.HttpClientSinkStage", modes=ALL) diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index 151a7b3a9b..33c928363a 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -13,9 +13,10 @@ # limitations under the License. import logging -import pickle import time +import typing +import pandas as pd from elasticsearch import ConnectionError as ESConnectionError from elasticsearch import Elasticsearch from elasticsearch.helpers import parallel_bulk @@ -35,22 +36,21 @@ class ElasticsearchController: Whether to raise exceptions on Elasticsearch errors. refresh_period_secs : int, optional, default: 2400 The refresh period in seconds for client refreshing. - pickled_func_config : dict, optional, default: None - Configuration for a pickled function to modify connection parameters. + connection_kwargs_update_func : typing.Callable, optional, default: None + Custom function to update connection parameters. """ def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400, - pickled_func_config: dict = None): + connection_kwargs_update_func: typing.Callable = None): self._client = None self._last_refresh_time = None - self._connection_kwargs = connection_kwargs self._raise_on_exception = raise_on_exception self._refresh_period_secs = refresh_period_secs - self._apply_derive_params_func(pickled_func_config) + self._connection_kwargs = self._apply_custom_func(connection_kwargs_update_func, connection_kwargs) logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs) @@ -59,14 +59,14 @@ def __init__(self, logger.debug("Elasticsearch cluster info: %s", self._client.info) logger.debug("Creating Elasticsearch client... 
Done!") - def _apply_derive_params_func(self, pickled_func_config: dict) -> None: - if pickled_func_config: - pickled_func_str = pickled_func_config["pickled_func_str"] - encoding = pickled_func_config["encoding"] - func = pickle.loads(bytes(pickled_func_str, encoding)) - self._connection_kwargs = func(self._connection_kwargs) + def _apply_custom_func(self, func, connection_kwargs): + # Apply custom function if it's available + if func: + connection_kwargs = func(connection_kwargs) - def refresh_client(self, force=False) -> bool: + return connection_kwargs + + def refresh_client(self, force: bool = False) -> bool: """ Refresh the Elasticsearch client instance. @@ -74,6 +74,11 @@ def refresh_client(self, force=False) -> bool: ---------- force : bool, optional, default: False Force a client refresh. + + Returns + ------- + bool + Returns true if client is refreshed, otherwise false. """ is_refreshed = False @@ -145,6 +150,27 @@ def search_documents(self, index: str, query: dict, **kwargs) -> dict: return {} + def df_to_parallel_bulk_write(self, index: str, df: pd.DataFrame) -> None: + """ + Convert dataframe entries to an actions and parallel bulk writes to Elasticsearch. + + Parameters + ---------- + index : str + The name of the index to write. + df : pd.DataFrame + DataFrame entries that require writing to Elasticsearch. + """ + + rows = df.to_dict('records') + + actions = [] + for row in rows: + action = {"_index": index, "_source": row} + actions.append(action) + + self.parallel_bulk_write(actions) # Parallel bulk upload to Elasticsearch + def close_client(self) -> None: """ Close the Elasticsearch client connection. diff --git a/morpheus/modules/write_to_elasticsearch.py b/morpheus/modules/write_to_elasticsearch.py index 9112990360..c80fc366ed 100644 --- a/morpheus/modules/write_to_elasticsearch.py +++ b/morpheus/modules/write_to_elasticsearch.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import pickle import mrc from mrc.core import operators as ops @@ -54,10 +55,19 @@ def write_to_elasticsearch(builder: mrc.Builder): pickled_func_config = config.get("pickled_func_config", None) refresh_period_secs = config.get("refresh_period_secs", 2400) + connection_kwargs_update_func = None + + if pickled_func_config: + pickled_func_str = pickled_func_config.get("pickled_func_str") + encoding = pickled_func_config.get("encoding") + + if pickled_func_str and encoding: + connection_kwargs_update_func = pickle.loads(bytes(pickled_func_str, encoding)) + controller = ElasticsearchController(connection_kwargs=connection_kwargs, raise_on_exception=raise_on_exception, refresh_period_secs=refresh_period_secs, - pickled_func_config=pickled_func_config) + connection_kwargs_update_func=connection_kwargs_update_func) def on_data(message: ControlMessage): diff --git a/morpheus/stages/output/write_to_elasticsearch_stage.py b/morpheus/stages/output/write_to_elasticsearch_stage.py new file mode 100644 index 0000000000..f44bff52ac --- /dev/null +++ b/morpheus/stages/output/write_to_elasticsearch_stage.py @@ -0,0 +1,119 @@ +# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Write to Elasticsearch stage.""" + +import typing + +import mrc +import mrc.core.operators as ops +import yaml + +import cudf + +from morpheus.cli.register_stage import register_stage +from morpheus.config import Config +from morpheus.controllers.elasticsearch_controller import ElasticsearchController +from morpheus.messages import MessageMeta +from morpheus.pipeline.single_port_stage import SinglePortStage +from morpheus.pipeline.stream_pair import StreamPair + + +@register_stage("to-elasticsearch", ignore_args=["connection_kwargs_update_func"]) +class WriteToElasticsearchStage(SinglePortStage): + """ + + This class writes the messages as documents to Elasticsearch. + + Parameters + ---------- + config : `morpheus.config.Config` + Pipeline configuration instance. + index : str + Logical namespace that holds a collection of documents. + connection_conf_file : str + YAML configuration file for Elasticsearch connection kwargs settings. + raise_on_exception : bool, optional, default: False + Whether to raise exceptions on Elasticsearch errors. + refresh_period_secs : int, optional, default: 2400 + The refresh period in seconds for client refreshing. + connection_kwargs_update_func : typing.Callable, optional, default: None + Custom function to update connection parameters. + """ + + def __init__(self, + config: Config, + index: str, + connection_conf_file: str, + raise_on_exception: bool = False, + refresh_period_secs: int = 2400, + connection_kwargs_update_func: typing.Callable = None): + + super().__init__(config) + + self._index = index + + with open(connection_conf_file, "r", encoding="utf-8") as file: + connection_kwargs = yaml.safe_load(file) + + self._controller = ElasticsearchController(connection_kwargs=connection_kwargs, + raise_on_exception=raise_on_exception, + refresh_period_secs=refresh_period_secs, + connection_kwargs_update_func=connection_kwargs_update_func) + + @property + def name(self) -> str: + """Returns the name of this stage.""" + return "to-elasticsearch" + + def accepted_types(self) -> typing.Tuple: + """ + Returns accepted input types for this stage. + + Returns + ------- + typing.Tuple(`morpheus.pipeline.messages.MessageMeta`, ) + Accepted input types. 
+ + """ + return (MessageMeta, ) + + def supports_cpp_node(self): + """Indicates whether this stage supports a C++ node.""" + return False + + def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: + + stream = input_stream[0] + + def on_data(meta: MessageMeta): + + self._controller.refresh_client() + + df = meta.copy_dataframe() + if isinstance(df, cudf.DataFrame): + df = df.to_pandas() + + self._controller.df_to_parallel_bulk_write(index=self._index, df=df) + + return meta + + to_elasticsearch = builder.make_node(self.unique_name, + ops.map(on_data), + ops.on_completed(self._controller.close_client)) + + builder.make_edge(stream, to_elasticsearch) + stream = to_elasticsearch + + # Return input unchanged to allow passthrough + return stream, input_stream[1] diff --git a/tests/controllers/test_elasticsearch_controller.py b/tests/controllers/test_elasticsearch_controller.py index d0a69cc9f7..54f498bc5d 100644 --- a/tests/controllers/test_elasticsearch_controller.py +++ b/tests/controllers/test_elasticsearch_controller.py @@ -14,11 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pickle import time import typing from unittest.mock import patch +import pandas as pd import pytest from morpheus.controllers.elasticsearch_controller import ElasticsearchController @@ -63,25 +63,21 @@ def test_constructor(create_controller: typing.Callable[..., ElasticsearchContro @pytest.mark.use_python -def test_apply_derive_params_func(create_controller: typing.Callable[..., ElasticsearchController]): - pickled_func_str = str(pickle.dumps(mock_derive_params), encoding="latin1") +def test_apply_custom_func(create_controller: typing.Callable[..., ElasticsearchController], connection_kwargs: dict): - # Create pickled_func_config - pickled_func_config = { - "pickled_func_str": pickled_func_str, # Pickled function string - "encoding": "latin1" - } - - controller = create_controller() - # Apply the mock function and check if connection_kwargs is updated - controller._apply_derive_params_func(pickled_func_config) - - assert controller._connection_kwargs == { + expected_connection_kwargs = { "hosts": [{ "host": "localhost", "port": 9200, "scheme": "http" }], "retry_on_status": 3, "retry_on_timeout": 30 } + controller = create_controller(connection_kwargs_update_func=mock_derive_params, + connection_kwargs=connection_kwargs.copy()) + + # Ensure the original connection_kwargs is not modified + assert connection_kwargs != controller._connection_kwargs + assert expected_connection_kwargs == controller._connection_kwargs + @pytest.mark.use_python def test_refresh_client_force(create_controller: typing.Callable[..., ElasticsearchController]): @@ -140,6 +136,30 @@ def test_parallel_bulk_write(mock_parallel_bulk, create_controller: typing.Calla mock_parallel_bulk.assert_called_once() +@pytest.mark.use_python +@patch("morpheus.controllers.elasticsearch_controller.parallel_bulk", return_value=[(True, None)]) +def test_df_to_parallel_bulk_write(mock_parallel_bulk: typing.Callable, + create_controller: typing.Callable[..., ElasticsearchController]): + data = {"field1": ["value1", "value2"], "field2": ["value3", "value4"]} + df = pd.DataFrame(data) + + expected_actions = [{ + "_index": "test_index", "_source": { + "field1": "value1", "field2": "value3" + } + }, { + "_index": "test_index", "_source": { + "field1": "value2", "field2": "value4" + } + }] + + controller = create_controller() + 
controller.df_to_parallel_bulk_write(index="test_index", df=df) + mock_parallel_bulk.assert_called_once_with(controller._client, + actions=expected_actions, + raise_on_exception=controller._raise_on_exception) + + def test_search_documents_success(create_controller: typing.Callable[..., ElasticsearchController]): controller = create_controller() controller._client.search.return_value = {"hits": {"total": 1, "hits": [{"_source": {"field1": "value1"}}]}} From 0b9bcc1905f7c9ebeaf34b6340fbd30360a192f5 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 7 Sep 2023 15:29:57 -0500 Subject: [PATCH 08/16] updated write to elasticsearch tests --- .../stages/output/write_to_elasticsearch_stage.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/morpheus/stages/output/write_to_elasticsearch_stage.py b/morpheus/stages/output/write_to_elasticsearch_stage.py index f44bff52ac..66b0b894bc 100644 --- a/morpheus/stages/output/write_to_elasticsearch_stage.py +++ b/morpheus/stages/output/write_to_elasticsearch_stage.py @@ -13,6 +13,7 @@ # limitations under the License. """Write to Elasticsearch stage.""" +import logging import typing import mrc @@ -28,6 +29,8 @@ from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair +logger = logging.getLogger(__name__) + @register_stage("to-elasticsearch", ignore_args=["connection_kwargs_update_func"]) class WriteToElasticsearchStage(SinglePortStage): @@ -63,8 +66,14 @@ def __init__(self, self._index = index - with open(connection_conf_file, "r", encoding="utf-8") as file: - connection_kwargs = yaml.safe_load(file) + try: + with open(connection_conf_file, "r", encoding="utf-8") as file: + connection_kwargs = yaml.safe_load(file) + except FileNotFoundError as exc: + raise FileNotFoundError( + f"The specified connection configuration file '{connection_conf_file}' does not exist.") from exc + except Exception as exc: + raise RuntimeError(f"An error occurred while loading the configuration file: {exc}") from exc self._controller = ElasticsearchController(connection_kwargs=connection_kwargs, raise_on_exception=raise_on_exception, @@ -103,6 +112,7 @@ def on_data(meta: MessageMeta): df = meta.copy_dataframe() if isinstance(df, cudf.DataFrame): df = df.to_pandas() + logger.debug("Converted cudf of size: %s to pandas dataframe.", len(df)) self._controller.df_to_parallel_bulk_write(index=self._index, df=df) From 2dae8a8d914ae19fa88d2748cf82ddc1f2618266 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 7 Sep 2023 15:34:03 -0500 Subject: [PATCH 09/16] updated write to elasticsearch tests --- morpheus/stages/output/write_to_elasticsearch_stage.py | 2 +- tests/controllers/test_elasticsearch_controller.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/morpheus/stages/output/write_to_elasticsearch_stage.py b/morpheus/stages/output/write_to_elasticsearch_stage.py index 66b0b894bc..a50cd9f93b 100644 --- a/morpheus/stages/output/write_to_elasticsearch_stage.py +++ b/morpheus/stages/output/write_to_elasticsearch_stage.py @@ -105,7 +105,7 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea stream = input_stream[0] - def on_data(meta: MessageMeta): + def on_data(meta: MessageMeta) -> MessageMeta: self._controller.refresh_client() diff --git a/tests/controllers/test_elasticsearch_controller.py b/tests/controllers/test_elasticsearch_controller.py index 54f498bc5d..c8d0d836ff 100644 --- 
a/tests/controllers/test_elasticsearch_controller.py +++ b/tests/controllers/test_elasticsearch_controller.py @@ -20,6 +20,7 @@ import pandas as pd import pytest +from elasticsearch import Elasticsearch from morpheus.controllers.elasticsearch_controller import ElasticsearchController @@ -33,19 +34,19 @@ def mock_derive_params(kwargs): @pytest.fixture(scope="function", autouse=True) -def patch_elasticsearch(): +def patch_elasticsearch() -> Elasticsearch: with patch("morpheus.controllers.elasticsearch_controller.Elasticsearch", autospec=True): yield @pytest.fixture(scope="module", name="connection_kwargs") -def connection_kwargs_fixture(): +def connection_kwargs_fixture() -> dict: kwargs = {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]} yield kwargs @pytest.fixture(scope="module", name="create_controller") -def create_controller_fixture(connection_kwargs): +def create_controller_fixture(connection_kwargs) -> typing.Callable[..., ElasticsearchController]: def inner_create_controller(*, connection_kwargs=connection_kwargs, refresh_period_secs=10, **controller_kwargs): return ElasticsearchController(connection_kwargs=connection_kwargs, From 8ec3060a98b92435a6948700f0248195da455d47 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 7 Sep 2023 23:33:59 -0500 Subject: [PATCH 10/16] Update morpheus/controllers/elasticsearch_controller.py Co-authored-by: Christopher Harris --- morpheus/controllers/elasticsearch_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index 33c928363a..9113d7cf0e 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -152,7 +152,7 @@ def search_documents(self, index: str, query: dict, **kwargs) -> dict: def df_to_parallel_bulk_write(self, index: str, df: pd.DataFrame) -> None: """ - Convert dataframe entries to an actions and parallel bulk writes to Elasticsearch. + Converts DataFrames to actions and parallel bulk writes to Elasticsearch. Parameters ---------- From 8e05f42f8a5063d5a8e746bd2055e4395e97700a Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Thu, 7 Sep 2023 23:34:14 -0500 Subject: [PATCH 11/16] Update morpheus/controllers/elasticsearch_controller.py Co-authored-by: Christopher Harris --- morpheus/controllers/elasticsearch_controller.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index 9113d7cf0e..f7529c5b11 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -162,12 +162,7 @@ def df_to_parallel_bulk_write(self, index: str, df: pd.DataFrame) -> None: DataFrame entries that require writing to Elasticsearch. 
""" - rows = df.to_dict('records') - - actions = [] - for row in rows: - action = {"_index": index, "_source": row} - actions.append(action) + actions = [{"_index": index, "_source": row} for row in df.to_dict("records")] self.parallel_bulk_write(actions) # Parallel bulk upload to Elasticsearch From f623e2d794ed845ff36ba2d2bd29ae4dd4ac5105 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Fri, 8 Sep 2023 14:43:36 -0500 Subject: [PATCH 12/16] added write_to_elasticsearch stage test --- .../controllers/elasticsearch_controller.py | 22 +--- morpheus/modules/write_to_elasticsearch.py | 16 +-- .../output/write_to_elasticsearch_stage.py | 8 +- .../test_elasticsearch_controller.py | 25 ---- .../test_write_to_elasticsearch_stage_pipe.py | 110 ++++++++++++++++++ 5 files changed, 125 insertions(+), 56 deletions(-) create mode 100644 tests/test_write_to_elasticsearch_stage_pipe.py diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index f7529c5b11..4eb62f9724 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -14,7 +14,6 @@ import logging import time -import typing import pandas as pd from elasticsearch import ConnectionError as ESConnectionError @@ -36,21 +35,19 @@ class ElasticsearchController: Whether to raise exceptions on Elasticsearch errors. refresh_period_secs : int, optional, default: 2400 The refresh period in seconds for client refreshing. - connection_kwargs_update_func : typing.Callable, optional, default: None - Custom function to update connection parameters. """ - def __init__(self, - connection_kwargs: dict, - raise_on_exception: bool = False, - refresh_period_secs: int = 2400, - connection_kwargs_update_func: typing.Callable = None): + def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400): self._client = None self._last_refresh_time = None self._raise_on_exception = raise_on_exception self._refresh_period_secs = refresh_period_secs - self._connection_kwargs = self._apply_custom_func(connection_kwargs_update_func, connection_kwargs) + + if connection_kwargs is not None and not connection_kwargs: + raise ValueError("Connection kwargs cannot be none or empty.") + + self._connection_kwargs = connection_kwargs logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs) @@ -59,13 +56,6 @@ def __init__(self, logger.debug("Elasticsearch cluster info: %s", self._client.info) logger.debug("Creating Elasticsearch client... Done!") - def _apply_custom_func(self, func, connection_kwargs): - # Apply custom function if it's available - if func: - connection_kwargs = func(connection_kwargs) - - return connection_kwargs - def refresh_client(self, force: bool = False) -> bool: """ Refresh the Elasticsearch client instance. 
diff --git a/morpheus/modules/write_to_elasticsearch.py b/morpheus/modules/write_to_elasticsearch.py index c80fc366ed..521a800ed9 100644 --- a/morpheus/modules/write_to_elasticsearch.py +++ b/morpheus/modules/write_to_elasticsearch.py @@ -55,33 +55,25 @@ def write_to_elasticsearch(builder: mrc.Builder): pickled_func_config = config.get("pickled_func_config", None) refresh_period_secs = config.get("refresh_period_secs", 2400) - connection_kwargs_update_func = None - if pickled_func_config: pickled_func_str = pickled_func_config.get("pickled_func_str") encoding = pickled_func_config.get("encoding") if pickled_func_str and encoding: connection_kwargs_update_func = pickle.loads(bytes(pickled_func_str, encoding)) + connection_kwargs = connection_kwargs_update_func(connection_kwargs) controller = ElasticsearchController(connection_kwargs=connection_kwargs, raise_on_exception=raise_on_exception, - refresh_period_secs=refresh_period_secs, - connection_kwargs_update_func=connection_kwargs_update_func) + refresh_period_secs=refresh_period_secs) def on_data(message: ControlMessage): controller.refresh_client() - meta = message.payload() - rows = meta.df.to_pandas().to_dict('records') - - actions = [] - for row in rows: - action = {"_index": index, "_source": row} - actions.append(action) + df = message.payload().df.to_pandas() - controller.parallel_bulk_write(actions) # Parallel bulk upload to Elasticsearch + controller.df_to_parallel_bulk_write(index=index, df=df) return message diff --git a/morpheus/stages/output/write_to_elasticsearch_stage.py b/morpheus/stages/output/write_to_elasticsearch_stage.py index a50cd9f93b..d0bb062d6c 100644 --- a/morpheus/stages/output/write_to_elasticsearch_stage.py +++ b/morpheus/stages/output/write_to_elasticsearch_stage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2023, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -75,10 +75,12 @@ def __init__(self, except Exception as exc: raise RuntimeError(f"An error occurred while loading the configuration file: {exc}") from exc + if connection_kwargs_update_func: + connection_kwargs = connection_kwargs_update_func(connection_kwargs) + self._controller = ElasticsearchController(connection_kwargs=connection_kwargs, raise_on_exception=raise_on_exception, - refresh_period_secs=refresh_period_secs, - connection_kwargs_update_func=connection_kwargs_update_func) + refresh_period_secs=refresh_period_secs) @property def name(self) -> str: diff --git a/tests/controllers/test_elasticsearch_controller.py b/tests/controllers/test_elasticsearch_controller.py index c8d0d836ff..f8b448400f 100644 --- a/tests/controllers/test_elasticsearch_controller.py +++ b/tests/controllers/test_elasticsearch_controller.py @@ -25,14 +25,6 @@ from morpheus.controllers.elasticsearch_controller import ElasticsearchController -# Define a mock function for _apply_derive_params_function -def mock_derive_params(kwargs): - kwargs["retry_on_status"] = 3 - kwargs["retry_on_timeout"] = 3 * 10 - - return kwargs - - @pytest.fixture(scope="function", autouse=True) def patch_elasticsearch() -> Elasticsearch: with patch("morpheus.controllers.elasticsearch_controller.Elasticsearch", autospec=True): @@ -63,23 +55,6 @@ def test_constructor(create_controller: typing.Callable[..., ElasticsearchContro assert create_controller()._connection_kwargs == connection_kwargs -@pytest.mark.use_python -def test_apply_custom_func(create_controller: typing.Callable[..., ElasticsearchController], connection_kwargs: dict): - - expected_connection_kwargs = { - "hosts": [{ - "host": "localhost", "port": 9200, "scheme": "http" - }], "retry_on_status": 3, "retry_on_timeout": 30 - } - - controller = create_controller(connection_kwargs_update_func=mock_derive_params, - connection_kwargs=connection_kwargs.copy()) - - # Ensure the original connection_kwargs is not modified - assert connection_kwargs != controller._connection_kwargs - assert expected_connection_kwargs == controller._connection_kwargs - - @pytest.mark.use_python def test_refresh_client_force(create_controller: typing.Callable[..., ElasticsearchController]): controller = create_controller(refresh_period_secs=1) diff --git a/tests/test_write_to_elasticsearch_stage_pipe.py b/tests/test_write_to_elasticsearch_stage_pipe.py new file mode 100644 index 0000000000..45bda15a30 --- /dev/null +++ b/tests/test_write_to_elasticsearch_stage_pipe.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import typing +from unittest.mock import patch + +import pandas as pd +import pytest +import yaml + +import cudf + +from morpheus.config import Config +from morpheus.pipeline.pipeline import Pipeline +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.write_to_elasticsearch_stage import WriteToElasticsearchStage + + +def connection_kwargs_func(kwargs): + kwargs["retry_on_status"] = 3 + kwargs["retry_on_timeout"] = 3 * 10 + + return kwargs + + +@pytest.fixture(scope="function", name="connection_conf_file") +def connection_conf_file_fixture(tmp_path): + connection_kwargs = {"hosts": [{"host": "localhost", "port": 9201, "scheme": "http"}]} + + connection_conf_file = tmp_path / "connection_kwargs_conf.yaml" + with connection_conf_file.open(mode="w") as file: + yaml.dump(connection_kwargs, file) + + yield connection_conf_file + + +@pytest.mark.use_python +@pytest.mark.parametrize("conf_file, exception", [("connection_conf.yaml", FileNotFoundError), (None, Exception)]) +def test_constructor_invalid_conf_file(config: Config, + conf_file: str, + exception: typing.Union[Exception, FileNotFoundError]): + with pytest.raises(exception): + WriteToElasticsearchStage(config, index="t_index", connection_conf_file=conf_file) + + +@pytest.mark.use_python +@patch("morpheus.controllers.elasticsearch_controller.Elasticsearch") +def test_constructor_with_custom_func(config: Config, connection_conf_file: str): + expected_connection_kwargs = { + "hosts": [{ + "host": "localhost", "port": 9201, "scheme": "http" + }], "retry_on_status": 3, "retry_on_timeout": 30 + } + + stage = WriteToElasticsearchStage(config, + index="t_index", + connection_conf_file=connection_conf_file, + connection_kwargs_update_func=connection_kwargs_func) + + assert stage._controller._connection_kwargs == expected_connection_kwargs + + +@pytest.mark.use_python +@patch("morpheus.stages.output.write_to_elasticsearch_stage.ElasticsearchController") +def test_write_to_elasticsearch_stage_pipe(mock_controller: typing.Any, + connection_conf_file: str, + config: Config, + filter_probs_df: typing.Union[cudf.DataFrame, pd.DataFrame]): + mock_df_to_parallel_bulk_write = mock_controller.return_value.df_to_parallel_bulk_write + mock_refresh_client = mock_controller.return_value.refresh_client + + # Create a pipeline + pipe = Pipeline(config) + + # Add the source stage and the WriteToElasticsearchStage to the pipeline + source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df])) + + write_to_elasticsearch_stage = pipe.add_stage( + WriteToElasticsearchStage(config, index="t_index", connection_conf_file=connection_conf_file)) + + # Connect the stages in the pipeline + pipe.add_edge(source_stage, write_to_elasticsearch_stage) + + # Run the pipeline + pipe.run() + + if isinstance(filter_probs_df, cudf.DataFrame): + filter_probs_df = filter_probs_df.to_pandas() + + expected_index = mock_df_to_parallel_bulk_write.call_args[1]["index"] + expected_df = mock_df_to_parallel_bulk_write.call_args[1]["df"] + + mock_refresh_client.assert_called_once() + mock_df_to_parallel_bulk_write.assert_called_once() + + assert expected_index == "t_index" + assert expected_df.equals(filter_probs_df) From c6d937ec4d8c72b94fe73751ee29d149aa7b68d5 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 13:40:25 -0500 Subject: [PATCH 13/16] moved refresh_client call to controller impl --- morpheus/controllers/elasticsearch_controller.py | 2 ++ morpheus/modules/write_to_elasticsearch.py | 2 -- 
2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index 4eb62f9724..0de58b4a00 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -107,6 +107,8 @@ def parallel_bulk_write(self, actions) -> None: List of actions to perform in parallel. """ + self.refresh_client() + for success, info in parallel_bulk(self._client, actions=actions, raise_on_exception=self._raise_on_exception): if not success: logger.error("Error writing to ElasticSearch: %s", str(info)) diff --git a/morpheus/modules/write_to_elasticsearch.py b/morpheus/modules/write_to_elasticsearch.py index 521a800ed9..9d64a5e186 100644 --- a/morpheus/modules/write_to_elasticsearch.py +++ b/morpheus/modules/write_to_elasticsearch.py @@ -69,8 +69,6 @@ def write_to_elasticsearch(builder: mrc.Builder): def on_data(message: ControlMessage): - controller.refresh_client() - df = message.payload().df.to_pandas() controller.df_to_parallel_bulk_write(index=index, df=df) From 719ba1a70bf7f4c67e0465a61943b7446bc9a900 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 13 Sep 2023 13:48:10 -0500 Subject: [PATCH 14/16] updates to tests --- tests/controllers/test_elasticsearch_controller.py | 4 ---- tests/test_write_to_elasticsearch_stage_pipe.py | 13 ++++--------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/tests/controllers/test_elasticsearch_controller.py b/tests/controllers/test_elasticsearch_controller.py index f8b448400f..dac43d5aee 100644 --- a/tests/controllers/test_elasticsearch_controller.py +++ b/tests/controllers/test_elasticsearch_controller.py @@ -70,10 +70,7 @@ def test_refresh_client_force(create_controller: typing.Callable[..., Elasticsea @pytest.mark.use_python def test_refresh_client_not_needed(create_controller: typing.Callable[..., ElasticsearchController]): - # Set last refresh time to a recent time - current_time = time.time() controller = create_controller() - controller._last_refresh_time = current_time client = controller._client # Simulate a refresh not needed scenario @@ -82,7 +79,6 @@ def test_refresh_client_not_needed(create_controller: typing.Callable[..., Elast client.close.assert_not_called() assert client.ping.call_count == 1 assert is_refreshed is False - assert controller._last_refresh_time == current_time @pytest.mark.use_python diff --git a/tests/test_write_to_elasticsearch_stage_pipe.py b/tests/test_write_to_elasticsearch_stage_pipe.py index 45bda15a30..4d7b1f1297 100644 --- a/tests/test_write_to_elasticsearch_stage_pipe.py +++ b/tests/test_write_to_elasticsearch_stage_pipe.py @@ -24,7 +24,7 @@ import cudf from morpheus.config import Config -from morpheus.pipeline.pipeline import Pipeline +from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.write_to_elasticsearch_stage import WriteToElasticsearchStage @@ -83,16 +83,11 @@ def test_write_to_elasticsearch_stage_pipe(mock_controller: typing.Any, mock_refresh_client = mock_controller.return_value.refresh_client # Create a pipeline - pipe = Pipeline(config) + pipe = LinearPipeline(config) # Add the source stage and the WriteToElasticsearchStage to the pipeline - source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df])) - - write_to_elasticsearch_stage = pipe.add_stage( - WriteToElasticsearchStage(config, index="t_index", 
connection_conf_file=connection_conf_file)) - - # Connect the stages in the pipeline - pipe.add_edge(source_stage, write_to_elasticsearch_stage) + pipe.add_stage(InMemorySourceStage(config, [filter_probs_df])) + pipe.add_stage(WriteToElasticsearchStage(config, index="t_index", connection_conf_file=connection_conf_file)) # Run the pipeline pipe.run() From b284048865938837336e13521d179fb631fab982 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 18 Sep 2023 11:23:46 -0500 Subject: [PATCH 15/16] Added refresh client call to search documents --- morpheus/controllers/elasticsearch_controller.py | 1 + 1 file changed, 1 insertion(+) diff --git a/morpheus/controllers/elasticsearch_controller.py b/morpheus/controllers/elasticsearch_controller.py index 0de58b4a00..1c4bca4dfa 100644 --- a/morpheus/controllers/elasticsearch_controller.py +++ b/morpheus/controllers/elasticsearch_controller.py @@ -133,6 +133,7 @@ def search_documents(self, index: str, query: dict, **kwargs) -> dict: """ try: + self.refresh_client() result = self._client.search(index=index, query=query, **kwargs) return result except Exception as exc: From f62bdb73181d6109a46800c6a7c7e8fd8526f0e7 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Mon, 18 Sep 2023 12:55:39 -0500 Subject: [PATCH 16/16] updated elasticsearch sink tests --- tests/test_write_to_elasticsearch_stage_pipe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_write_to_elasticsearch_stage_pipe.py b/tests/test_write_to_elasticsearch_stage_pipe.py index 4d7b1f1297..07c9929526 100644 --- a/tests/test_write_to_elasticsearch_stage_pipe.py +++ b/tests/test_write_to_elasticsearch_stage_pipe.py @@ -86,7 +86,7 @@ def test_write_to_elasticsearch_stage_pipe(mock_controller: typing.Any, pipe = LinearPipeline(config) # Add the source stage and the WriteToElasticsearchStage to the pipeline - pipe.add_stage(InMemorySourceStage(config, [filter_probs_df])) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(WriteToElasticsearchStage(config, index="t_index", connection_conf_file=connection_conf_file)) # Run the pipeline