Elasticsearch Sink Module (#1163)
- Added an Elasticsearch controller to establish the client connection and read from and write to Elasticsearch.
- Added tests

Closes #902

Authors:
  - Bhargav Suryadevara (https://github.com/bsuryadevara)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1163
bsuryadevara authored Sep 19, 2023
1 parent aab0d96 commit 9174cec
Showing 12 changed files with 702 additions and 39 deletions.
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -43,6 +43,7 @@ dependencies:
 - dill
 - docker-py=5.0
 - docutils
+- elasticsearch==8.9.0
 - faker=12.3.0
 - feedparser=6.0.10
 - flake8
35 changes: 0 additions & 35 deletions docs/source/modules/core/multiplexer.md

This file was deleted.

45 changes: 45 additions & 0 deletions docs/source/modules/core/write_to_elasticsearch.md
@@ -0,0 +1,45 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

## Write to Elasticsearch Module

This module reads an input data stream, converts each row of data to a document format suitable for Elasticsearch, and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API.

### Configurable Parameters

| Parameter               | Type | Description                                                                                           | Example Value                             | Default Value |
|-------------------------|------|-------------------------------------------------------------------------------------------------------|-------------------------------------------|---------------|
| `index`                 | str  | Elasticsearch index to write documents to.                                                             | "my_index"                                | `[Required]`  |
| `connection_kwargs`     | dict | Keyword arguments used to configure the Elasticsearch connection.                                      | {"hosts": [{"host": "localhost", ...}]}   | `[Required]`  |
| `raise_on_exception`    | bool | Whether to raise or suppress exceptions when writing to Elasticsearch.                                 | true                                      | `false`       |
| `pickled_func_config`   | dict | Pickled custom function configuration to update `connection_kwargs` as needed for the client connection. | See below                                 | `None`        |
| `refresh_period_secs`   | int  | Time in seconds after which the client connection is refreshed.                                        | 3600                                      | `2400`        |

### Example JSON Configuration

```json
{
"index": "test_index",
"connection_kwargs": {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]},
"raise_on_exception": true,
"pickled_func_config": {
"pickled_func_str": "pickled function as a string",
"encoding": "latin1"
},
"refresh_period_secs": 2400
}
```
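
The `"pickled function as a string"` placeholder above stands in for a pickled Python callable. As a minimal sketch (not part of this commit; the updater function is hypothetical), one way to produce a value that round-trips with the module's `pickle.loads(bytes(pickled_func_str, encoding))` call:

```python
import pickle


# Hypothetical module-level updater: pickle serializes functions by reference,
# so this function must be importable wherever the config is unpickled.
def update_connection_kwargs(connection_kwargs: dict) -> dict:
    connection_kwargs["request_timeout"] = 60  # illustrative tweak only
    return connection_kwargs


# latin1 maps bytes 0-255 one-to-one onto characters, so the pickled bytes can
# be embedded in JSON and recovered exactly via `bytes(pickled_func_str, "latin1")`.
pickled_func_config = {
    "pickled_func_str": pickle.dumps(update_connection_kwargs).decode("latin1"),
    "encoding": "latin1",
}
```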
2 changes: 1 addition & 1 deletion docs/source/modules/index.md
@@ -31,10 +31,10 @@ limitations under the License.
 ./core/filter_detections.md
 ./core/from_control_message.md
 ./core/mlflow_model_writer.md
-./core/multiplexer.md
 ./core/payload_batcher.md
 ./core/serialize.md
 ./core/to_control_message.md
+./core/write_to_elasticsearch.md
 ./core/write_to_file.md
 ```

3 changes: 3 additions & 0 deletions morpheus/cli/commands.py
@@ -675,6 +675,9 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
 add_command("preprocess", "morpheus.stages.preprocess.preprocess_nlp_stage.PreprocessNLPStage", modes=NLP_ONLY)
 add_command("serialize", "morpheus.stages.postprocess.serialize_stage.SerializeStage", modes=ALL)
 add_command("timeseries", "morpheus.stages.postprocess.timeseries_stage.TimeSeriesStage", modes=AE_ONLY)
+add_command("to-elasticsearch",
+            "morpheus.stages.output.write_to_elasticsearch_stage.WriteToElasticsearchStage",
+            modes=ALL)
 add_command("to-file", "morpheus.stages.output.write_to_file_stage.WriteToFileStage", modes=ALL)
 add_command("to-kafka", "morpheus.stages.output.write_to_kafka_stage.WriteToKafkaStage", modes=ALL)
 add_command("to-http", "morpheus.stages.output.http_client_sink_stage.HttpClientSinkStage", modes=ALL)
166 changes: 166 additions & 0 deletions morpheus/controllers/elasticsearch_controller.py
@@ -0,0 +1,166 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time

import pandas as pd
from elasticsearch import ConnectionError as ESConnectionError
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

logger = logging.getLogger(__name__)


class ElasticsearchController:
    """
    ElasticsearchController to perform read and write operations using Elasticsearch service.

    Parameters
    ----------
    connection_kwargs : dict
        Keyword arguments to configure the Elasticsearch connection.
    raise_on_exception : bool, optional, default: False
        Whether to raise exceptions on Elasticsearch errors.
    refresh_period_secs : int, optional, default: 2400
        The refresh period in seconds for client refreshing.
    """

    def __init__(self, connection_kwargs: dict, raise_on_exception: bool = False, refresh_period_secs: int = 2400):

        self._client = None
        self._last_refresh_time = None
        self._raise_on_exception = raise_on_exception
        self._refresh_period_secs = refresh_period_secs

        # Reject a missing or empty connection configuration up front.
        if not connection_kwargs:
            raise ValueError("Connection kwargs cannot be None or empty.")

        self._connection_kwargs = connection_kwargs

        logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs)

        self.refresh_client(force=True)

        logger.debug("Elasticsearch cluster info: %s", self._client.info())
        logger.debug("Creating Elasticsearch client... Done!")

    def refresh_client(self, force: bool = False) -> bool:
        """
        Refresh the Elasticsearch client instance.

        Parameters
        ----------
        force : bool, optional, default: False
            Force a client refresh.

        Returns
        -------
        bool
            Returns true if client is refreshed, otherwise false.
        """

        is_refreshed = False
        time_now = time.time()
        if force or self._client is None or time_now - self._last_refresh_time >= self._refresh_period_secs:
            if self._client:
                try:
                    # Close the existing client
                    self.close_client()
                except Exception as ex:
                    logger.warning("Ignoring client close error: %s", ex)
            logger.debug("Refreshing Elasticsearch client....")

            # Create Elasticsearch client
            self._client = Elasticsearch(**self._connection_kwargs)

            # Check if the client is connected
            if self._client.ping():
                logger.debug("Elasticsearch client is connected.")
            else:
                raise ESConnectionError("Elasticsearch client is not connected.")

            logger.debug("Refreshing Elasticsearch client.... Done!")
            self._last_refresh_time = time.time()
            is_refreshed = True

        return is_refreshed

    def parallel_bulk_write(self, actions: list) -> None:
        """
        Perform parallel bulk writes to Elasticsearch.

        Parameters
        ----------
        actions : list
            List of actions to perform in parallel.
        """

        self.refresh_client()

        for success, info in parallel_bulk(self._client, actions=actions, raise_on_exception=self._raise_on_exception):
            if not success:
                logger.error("Error writing to Elasticsearch: %s", str(info))

    def search_documents(self, index: str, query: dict, **kwargs) -> dict:
        """
        Search for documents in Elasticsearch based on the given query.

        Parameters
        ----------
        index : str
            The name of the index to search.
        query : dict
            The DSL query for the search.
        **kwargs
            Additional keyword arguments that are supported by the Elasticsearch search method.

        Returns
        -------
        dict
            The search result returned by Elasticsearch.
        """

        try:
            self.refresh_client()
            result = self._client.search(index=index, query=query, **kwargs)
            return result
        except Exception as exc:
            logger.error("Error searching documents: %s", exc)
            if self._raise_on_exception:
                raise RuntimeError(f"Error searching documents: {exc}") from exc

            return {}

    def df_to_parallel_bulk_write(self, index: str, df: pd.DataFrame) -> None:
        """
        Convert a DataFrame to bulk actions and write them to Elasticsearch in parallel.

        Parameters
        ----------
        index : str
            The name of the index to write.
        df : pd.DataFrame
            DataFrame entries that require writing to Elasticsearch.
        """

        actions = [{"_index": index, "_source": row} for row in df.to_dict("records")]

        self.parallel_bulk_write(actions)  # Parallel bulk upload to Elasticsearch

    def close_client(self) -> None:
        """
        Close the Elasticsearch client connection.
        """
        self._client.close()
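
As a usage sketch for the controller above (illustrative only: the host, port, index name, and sample data are assumptions, and a reachable Elasticsearch cluster is required):

```python
import pandas as pd

from morpheus.controllers.elasticsearch_controller import ElasticsearchController

controller = ElasticsearchController(
    connection_kwargs={"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]},
    raise_on_exception=True)

# Each DataFrame row becomes one document in the target index.
controller.df_to_parallel_bulk_write(index="test_index",
                                     df=pd.DataFrame({"user": ["a", "b"], "score": [1, 2]}))

# Read documents back with an Elasticsearch DSL query.
results = controller.search_documents(index="test_index", query={"match_all": {}})

controller.close_client()
```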
6 changes: 4 additions & 2 deletions morpheus/modules/__init__.py
@@ -14,6 +14,7 @@
 """
 Morpheus module definitions, each module is automatically registered when imported
 """
+from morpheus._lib import modules
 # When segment modules are imported, they're added to the module registry.
 # To avoid flake8 warnings about unused code, the noqa flag is used during import.
 from morpheus.modules import file_batcher
@@ -26,8 +27,8 @@
 from morpheus.modules import payload_batcher
 from morpheus.modules import serialize
 from morpheus.modules import to_control_message
+from morpheus.modules import write_to_elasticsearch
 from morpheus.modules import write_to_file
-from morpheus._lib import modules

 __all__ = [
     "file_batcher",
@@ -41,5 +42,6 @@
     "payload_batcher",
     "serialize",
     "to_control_message",
-    "write_to_file"
+    "write_to_file",
+    "write_to_elasticsearch"
 ]
82 changes: 82 additions & 0 deletions morpheus/modules/write_to_elasticsearch.py
@@ -0,0 +1,82 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import pickle

import mrc
from mrc.core import operators as ops

from morpheus.controllers.elasticsearch_controller import ElasticsearchController
from morpheus.messages import ControlMessage
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import WRITE_TO_ELASTICSEARCH
from morpheus.utils.module_utils import register_module

logger = logging.getLogger(__name__)


@register_module(WRITE_TO_ELASTICSEARCH, MORPHEUS_MODULE_NAMESPACE)
def write_to_elasticsearch(builder: mrc.Builder):
    """
    This module reads an input data stream, converts each row of data to a document format suitable for Elasticsearch,
    and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API.

    Parameters
    ----------
    builder : mrc.Builder
        An mrc Builder object.
    """

    config = builder.get_current_module_config()

    index = config.get("index", None)

    if index is None:
        raise ValueError("Index must not be None.")

    connection_kwargs = config.get("connection_kwargs")

    if not isinstance(connection_kwargs, dict):
        raise ValueError(f"Expected `connection_kwargs` to be a dictionary, but it is of type {type(connection_kwargs)}")

    raise_on_exception = config.get("raise_on_exception", False)
    pickled_func_config = config.get("pickled_func_config", None)
    refresh_period_secs = config.get("refresh_period_secs", 2400)

    if pickled_func_config:
        pickled_func_str = pickled_func_config.get("pickled_func_str")
        encoding = pickled_func_config.get("encoding")

        if pickled_func_str and encoding:
            connection_kwargs_update_func = pickle.loads(bytes(pickled_func_str, encoding))
            connection_kwargs = connection_kwargs_update_func(connection_kwargs)

    controller = ElasticsearchController(connection_kwargs=connection_kwargs,
                                         raise_on_exception=raise_on_exception,
                                         refresh_period_secs=refresh_period_secs)

    def on_data(message: ControlMessage):

        df = message.payload().df.to_pandas()

        controller.df_to_parallel_bulk_write(index=index, df=df)

        return message

    node = builder.make_node(WRITE_TO_ELASTICSEARCH, ops.map(on_data), ops.on_completed(controller.close_client))

    # Register input and output port for a module.
    builder.register_module_input("input", node)
    builder.register_module_output("output", node)
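
A hedged sketch of loading this module into a pipeline (not part of this diff; `LinearModulesStage` and the `module_id`/`namespace`/`module_name` config keys are assumptions based on how other Morpheus modules are typically loaded, and may differ by release):

```python
from morpheus.config import Config
from morpheus.stages.general.linear_modules_stage import LinearModulesStage
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import WRITE_TO_ELASTICSEARCH

module_config = {
    "module_id": WRITE_TO_ELASTICSEARCH,
    "namespace": MORPHEUS_MODULE_NAMESPACE,
    "module_name": "write_to_elasticsearch",
    # Parameters consumed by write_to_elasticsearch() above.
    "index": "test_index",
    "connection_kwargs": {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]},
}

config = Config()

# Wiring into an existing LinearPipeline, e.g.:
# pipeline.add_stage(LinearModulesStage(config, module_config,
#                                       input_port_name="input",
#                                       output_port_name="output"))
```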