Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward-merge branch-23.07 to branch-23.11 [resolved conflicts] #1246

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/conda/recipes/morpheus/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ outputs:
- docker-py 5.0.*
- grpcio # Version determined from cudf
- libmrc
- mlflow >=2.2.1,<3
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- mlflow>=2.2.1,<3
- mrc
- networkx 3.1.*
- numpydoc 1.4.*
Expand Down
4 changes: 4 additions & 0 deletions ci/scripts/github/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ function update_conda_env() {

if [[ "${SKIP_CONDA_ENV_UPDATE}" == "" ]]; then
rapids-logger "Checking for updates to conda env"

# Remove default/conflicting channels from base image
rm /opt/conda/.condarc

# Update the packages
rapids-mamba-retry env update -n morpheus --prune -q --file ${ENV_YAML}
fi
Expand Down
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dependencies:
- jupyterlab
- libgrpc>=1.49
- librdkafka=1.9.2
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- mlflow>=2.2.1,<3
dagardner-nv marked this conversation as resolved.
Show resolved Hide resolved
- mrc=23.11
- networkx=3.1
Expand Down
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies:
- dgl=1.0.2
- dill=0.3.6
- distributed>=2023.1.1
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- mlflow>=2.2.1,<3
- papermill=2.3.4
- s3fs>=2023.6
1 change: 1 addition & 0 deletions models/mlflow/docker/conda/mlflow-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ channels:
dependencies:
- boto3
- onnx
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- psycopg2<3
- pymysql
- python=3.11
46 changes: 25 additions & 21 deletions morpheus/loaders/sql_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import engine

import cudf

Expand All @@ -40,30 +41,27 @@ def _parse_query_data(
Parameters
----------
query_data : Dict[str, Union[str, Optional[Dict[str, Any]]]]
The dictionary containing the connection string, query, and params (optional).
The dictionary containing the query, and params (optional).

Returns
-------
Dict[str, Union[str, Optional[Dict[str, Any]]]]
A dictionary containing parsed connection string, query, and params (if present).
"""

return {
"connection_string": query_data["connection_string"],
"query": query_data["query"],
"params": query_data.get("params", None)
}
return {"query": query_data["query"], "params": query_data.get("params", None)}


def _read_sql(connection_string: str, query: str, params: typing.Optional[typing.Dict[str, typing.Any]] = None) -> \
typing.Dict[str, pd.DataFrame]:
def _read_sql(engine_obj: engine.Engine,
query: str,
params: typing.Optional[typing.Dict[str, typing.Any]] = None) -> typing.Dict[str, pd.DataFrame]:
"""
Creates a DataFrame from a SQL query.

Parameters
----------
connection_string : str
Connection string to the database.
engine_obj : engine.Engine
SQL engine instance.
query : str
SQL query.
params : Optional[Dict[str, Any]], default=None
Expand All @@ -75,14 +73,10 @@ def _read_sql(connection_string: str, query: str, params: typing.Optional[typing
A dictionary containing a DataFrame of the SQL query result.
"""

# TODO(Devin): PERFORMANCE OPTIMIZATION
# TODO(Devin): Add connection pooling -- Probably needs to go on the actual loader
engine = create_engine(connection_string)

if (params is None):
df = pd.read_sql(query, engine)
df = pd.read_sql(query, engine_obj)
else:
df = pd.read_sql(query, engine, params=params)
df = pd.read_sql(query, engine_obj, params=params)

return {"df": df}

Expand Down Expand Up @@ -132,14 +126,24 @@ def sql_loader(control_message: ControlMessage, task: typing.Dict[str, typing.An

with CMDefaultFailureContextManager(control_message):
final_df = None
engine_registry = {}

sql_config = task["sql_config"]
queries = sql_config["queries"]

for query_data in queries:
aggregate_df = functools.partial(_aggregate_df, df_aggregate=final_df)
execution_chain = ExecutionChain(function_chain=[_parse_query_data, _read_sql, aggregate_df])
final_df = execution_chain(query_data=query_data)
try:
for query_data in queries:
conn_str = query_data.pop("connection_string")
if conn_str not in engine_registry:
engine_registry[conn_str] = create_engine(conn_str)

aggregate_df = functools.partial(_aggregate_df, df_aggregate=final_df)
read_sql = functools.partial(_read_sql, engine_obj=engine_registry[conn_str])
execution_chain = ExecutionChain(function_chain=[_parse_query_data, read_sql, aggregate_df])
final_df = execution_chain(query_data=query_data)
finally:
# Dispose all open connections.
for engine_obj in engine_registry.values():
engine_obj.dispose()

control_message.payload(MessageMeta(final_df))

Expand Down
7 changes: 4 additions & 3 deletions morpheus/utils/control_message_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ def cm_default_failure_context_manager(raise_on_failure: bool = False) -> typing
def decorator(func):

@wraps(func)
def wrapper(control_messsage: ControlMessage, *args, **kwargs):
with CMDefaultFailureContextManager(control_message=control_messsage,
def wrapper(control_message: ControlMessage, *args, **kwargs):
ret_cm = control_message
with CMDefaultFailureContextManager(control_message=control_message,
raise_on_failure=raise_on_failure) as ctx_mgr:
cm_ensure_payload_not_null(control_message=control_messsage)
cm_ensure_payload_not_null(control_message=control_message)
ret_cm = func(ctx_mgr.control_message, *args, **kwargs)

return ret_cm
Expand Down