Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch Sink Module #1163

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e57df85
added elasticsearch sink
bsuryadevara Aug 30, 2023
c43ddd0
Merge remote-tracking branch 'upstream/branch-23.11' into 902-fea-add…
bsuryadevara Aug 30, 2023
efab6e3
added elasticsearch dependecy
bsuryadevara Aug 31, 2023
6a8631c
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Aug 31, 2023
d3004a5
updated docs
bsuryadevara Aug 31, 2023
963c28d
Merge branch '902-fea-add-elasticsearch-sink-module' of github.com:bs…
bsuryadevara Aug 31, 2023
2fe5d64
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Aug 31, 2023
9be0de0
Moved controllers module one level up
bsuryadevara Aug 31, 2023
4ea1067
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Sep 5, 2023
386fc62
Merge remote-tracking branch 'upstream/branch-23.11' into 902-fea-add…
bsuryadevara Sep 7, 2023
5a7fded
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
ff58148
Merge branch '902-fea-add-elasticsearch-sink-module' of github.com:bs…
bsuryadevara Sep 7, 2023
6c02b71
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
347f5f2
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
0b9bcc1
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
2dae8a8
updated write to elasticsearch tests
bsuryadevara Sep 7, 2023
8ec3060
Update morpheus/controllers/elasticsearch_controller.py
bsuryadevara Sep 8, 2023
8e05f42
Update morpheus/controllers/elasticsearch_controller.py
bsuryadevara Sep 8, 2023
f623e2d
added write_to_elasticsearch stage test
bsuryadevara Sep 8, 2023
a2b0c0e
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Sep 8, 2023
c6d937e
moved refresh_client call to controller impl
bsuryadevara Sep 13, 2023
719ba1a
updates to tests
bsuryadevara Sep 13, 2023
b26e418
Merge branch 'branch-23.11' into 902-fea-add-elasticsearch-sink-module
bsuryadevara Sep 18, 2023
b284048
Added refresh client call to search documents
bsuryadevara Sep 18, 2023
d471e96
Merge branch '902-fea-add-elasticsearch-sink-module' of github.com:bs…
bsuryadevara Sep 18, 2023
f62bdb7
updated elasticsearch sink tests
bsuryadevara Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies:
- dill
- docker-py=5.0
- docutils
- elasticsearch==8.9.0
- faker=12.3.0
- flake8
- flatbuffers=2.0
Expand Down
35 changes: 0 additions & 35 deletions docs/source/modules/core/multiplexer.md

This file was deleted.

45 changes: 45 additions & 0 deletions docs/source/modules/core/write_to_elasticsearch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

## Write to Elasticsearch Module

This module reads an input data stream, converts each row of data to a document format suitable for Elasticsearch, and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API.

### Configurable Parameters

| Parameter | Type | Description | Example Value | Default Value |
|-------------------------|--------------|---------------------------------------------------------------------------------------------------------|-------------------------------|---------------|
| `index` | str | Elasticsearch index. | "my_index" | `[Required]` |
| `connection_kwargs` | dict | Elasticsearch connection kwargs configuration. | {"hosts": ["host": "localhost", ...} | `[Required]` |
| `raise_on_exception` | bool | Raise or suppress exceptions when writing to Elasticsearch. | true | `false` |
| `pickled_func_config` | str | Pickled custom function configuration to update connection_kwargs as needed for the client connection. | See below | None |
| `refresh_period_secs` | int | Time in seconds to refresh the client connection. | 3600 | `2400` |

### Example JSON Configuration

```json
{
"index": "test_index",
"connection_kwargs": {"hosts": [{"host": "localhost", "port": 9200, "scheme": "http"}]},
"raise_on_exception": true,
"pickled_func_config": {
"pickled_func_str": "pickled function as a string",
"encoding": "latin1"
},
"refresh_period_secs": 2400
}
```
2 changes: 1 addition & 1 deletion docs/source/modules/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ limitations under the License.
./core/filter_detections.md
./core/from_control_message.md
./core/mlflow_model_writer.md
./core/multiplexer.md
./core/payload_batcher.md
./core/serialize.md
./core/to_control_message.md
./core/write_to_elasticsearch.md
./core/write_to_file.md
```

Expand Down
12 changes: 12 additions & 0 deletions morpheus/controllers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
166 changes: 166 additions & 0 deletions morpheus/controllers/elasticsearch_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import pickle
import time

from elasticsearch import ConnectionError as ESConnectionError
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

logger = logging.getLogger(__name__)


class ElasticsearchController:
"""
ElasticsearchController to perform read and write operations using Elasticsearch service.

Parameters
----------
index : str
The name of the index to interact with.
connection_kwargs : dict
Keyword arguments to configure the Elasticsearch connection.
raise_on_exception : bool, optional
Whether to raise exceptions on Elasticsearch errors.
refresh_period_secs : int, optional
The refresh period in seconds for client refreshing.
pickled_func_config : dict, optional
Configuration for a pickled function to modify connection parameters.
"""

def __init__(self,
index: str,
connection_kwargs: dict,
raise_on_exception: bool = False,
refresh_period_secs: int = 2400,
pickled_func_config: dict = None):

self._index = index
self._connection_kwargs = connection_kwargs
self._raise_on_exception = raise_on_exception
self._refresh_period_secs = refresh_period_secs
self._apply_derive_params_func(pickled_func_config)

logger.debug("Creating Elasticsearch client with configuration: %s", connection_kwargs)

# Create ElasticSearch client
self._client = Elasticsearch(**self._connection_kwargs)

# Check if the client is connected
is_connected = self._client.ping()

if is_connected:
logger.debug("Elasticsearch client is connected.")
else:
logger.debug("Elasticsearch client is not connected.")
mdemoret-nv marked this conversation as resolved.
Show resolved Hide resolved
raise ESConnectionError("Elasticsearch client is not connected.")
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved

self._last_refresh_time = time.time()

logger.debug("Elasticsearch cluster info: %s", self._client.info)
logger.debug("Creating Elasticsearch client... Done!")
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved

@property
def client(self) -> Elasticsearch:
"""
Get the active Elasticsearch client instance.

Returns
-------
Elasticsearch
The active Elasticsearch client.
"""
return self._client
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved

def _apply_derive_params_func(self, pickled_func_config) -> None:
if pickled_func_config:
pickled_func_str = pickled_func_config["pickled_func_str"]
encoding = pickled_func_config["encoding"]
func = pickle.loads(bytes(pickled_func_str, encoding))
self._connection_kwargs = func(self._connection_kwargs)

def refresh_client(self, force=False) -> None:
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved
"""
Refresh the Elasticsearch client instance.

Parameters
----------
force : bool, optional
Force a client refresh.
"""
if force or self._client is None or time.time() - self._last_refresh_time >= self._refresh_period_secs:
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved
if not force and self._client:
try:
# Close the existing client
self.close_client()
except Exception as ex:
logger.warning("Ignoring client close error: %s", ex)
logger.debug("Refreshing Elasticsearch client....")
self._client = Elasticsearch(**self._connection_kwargs)
logger.debug("Refreshing Elasticsearch client.... Done!")

self._last_refresh_time = time.time()

def parallel_bulk_write(self, actions) -> None:
"""
Perform parallel bulk writes to Elasticsearch.

Parameters
----------
actions : list
List of actions to perform in parallel.
"""

for success, info in parallel_bulk(self._client, actions=actions, raise_on_exception=self._raise_on_exception):
if not success:
logger.error("Error writing to ElasticSearch: %s", str(info))

def search_documents(self, query: dict, index: str = None, **kwargs) -> dict:
"""
Search for documents in Elasticsearch based on the given query.

Parameters
----------
query : dict
The query DSL (Domain Specific Language) for the search.
index : str, optional
The name of the index to search. If not provided, the instance's index will be used.
**kwargs
Additional keyword arguments that are supported by the Elasticsearch search method.

Returns
-------
dict
The search result returned by Elasticsearch.
"""
if index is None:
bsuryadevara marked this conversation as resolved.
Show resolved Hide resolved
index = self._index

try:
result = self._client.search(index=index, query=query, **kwargs)
return result
except Exception as exc:
logger.error("Error searching documents: %s", exc)
if self._raise_on_exception:
raise RuntimeError(f"Error searching documents: {exc}") from exc

return {}

def close_client(self):
"""
Close the Elasticsearch client connection.
"""
self._client.close()
mdemoret-nv marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 4 additions & 2 deletions morpheus/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""
Morpheus module definitions, each module is automatically registered when imported
"""
from morpheus._lib import modules
# When segment modules are imported, they're added to the module registry.
# To avoid flake8 warnings about unused code, the noqa flag is used during import.
from morpheus.modules import file_batcher
Expand All @@ -26,8 +27,8 @@
from morpheus.modules import payload_batcher
from morpheus.modules import serialize
from morpheus.modules import to_control_message
from morpheus.modules import write_to_elasticsearch
from morpheus.modules import write_to_file
from morpheus._lib import modules

__all__ = [
"file_batcher",
Expand All @@ -41,5 +42,6 @@
"payload_batcher",
"serialize",
"to_control_message",
"write_to_file"
"write_to_file",
"write_to_elasticsearch"
]
78 changes: 78 additions & 0 deletions morpheus/modules/write_to_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import mrc
from mrc.core import operators as ops

from morpheus.controllers.elasticsearch_controller import ElasticsearchController
from morpheus.messages import ControlMessage
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import WRITE_TO_ELASTICSEARCH
from morpheus.utils.module_utils import register_module

logger = logging.getLogger(__name__)


@register_module(WRITE_TO_ELASTICSEARCH, MORPHEUS_MODULE_NAMESPACE)
def write_to_elasticsearch(builder: mrc.Builder):
"""
This module reads input data stream, converts each row of data to a document format suitable for Elasticsearch,
and writes the documents to the specified Elasticsearch index using the Elasticsearch bulk API.

Parameters
----------
builder : mrc.Builder
An mrc Builder object.
"""

config = builder.get_current_module_config()

index = config.get("index", None)
connection_kwargs = config.get("connection_kwargs")
raise_on_exception = config.get("raise_on_exception", False)
pickled_func_config = config.get("pickled_func_config", None)
refresh_period_secs = config.get("refresh_period_secs", 2400)

controller = ElasticsearchController(index=index,
connection_kwargs=connection_kwargs,
raise_on_exception=raise_on_exception,
refresh_period_secs=refresh_period_secs,
pickled_func_config=pickled_func_config)

def on_data(message: ControlMessage):

controller.refresh_client()

meta = message.payload()
rows = meta.df.to_pandas().to_dict('records')

actions = []
for row in rows:
action = {"_index": index, "_source": row}
actions.append(action)

controller.parallel_bulk_write(actions) # Parallel bulk upload to Elasticsearch

return message

def on_complete():
controller.close_client() # Close client
mdemoret-nv marked this conversation as resolved.
Show resolved Hide resolved

node = builder.make_node(WRITE_TO_ELASTICSEARCH, ops.map(on_data), ops.on_completed(on_complete))

# Register input and output port for a module.
builder.register_module_input("input", node)
builder.register_module_output("output", node)
2 changes: 1 addition & 1 deletion morpheus/utils/module_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
FILTER_DETECTIONS = "FilterDetections"
FROM_CONTROL_MESSAGE = "FromControlMessage"
MLFLOW_MODEL_WRITER = "MLFlowModelWriter"
MULTIPLEXER = "Multiplexer"
SERIALIZE = "Serialize"
TO_CONTROL_MESSAGE = "ToControlMessage"
WRITE_TO_FILE = "WriteToFile"
FILTER_CM_FAILED = "FilterCmFailed"
PAYLOAD_BATCHER = "PayloadBatcher"
WRITE_TO_ELASTICSEARCH = "WriteToElasticsearch"
Loading