Update dfp_training stage to support ControlMessages or MultiDFPMessages (#1155)

Closes #1148

This addresses the relevant issues from #1148. Additionally, in order to make the .fit interface work in a more unified way, we've merged and simplified the _fit_centralized and _fit_distributed code paths and eliminated large portions of dead code.
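
When the stage receives a ControlMessage, it now attaches the trained model to the output message metadata as a base64-encoded pickle under the "model" key (see the dfp_training.py diff below). As a rough illustrative sketch only, not code from this commit, a downstream consumer could recover the model along these lines; the helper name model_from_control_message is hypothetical:

```python
import base64
import pickle

from morpheus.messages import ControlMessage


def model_from_control_message(control_message: ControlMessage):
    """Recover the trained AutoEncoder that DFPTraining attached to a ControlMessage.

    Mirrors the stage's serialization: the model is stored as a base64-encoded
    pickle under the "model" metadata key.
    """
    pickled_model_base64_str = control_message.get_metadata("model")
    if pickled_model_base64_str is None:
        return None

    pickled_model_bytes = base64.b64decode(pickled_model_base64_str)
    return pickle.loads(pickled_model_bytes)
```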

Authors:
  - Devin Robison (https://github.com/drobison00)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - Christopher Harris (https://github.com/cwharris)

URL: #1155
drobison00 authored Sep 21, 2023
1 parent 3d9020d commit a88276e
Showing 13 changed files with 387 additions and 615 deletions.
@@ -97,7 +97,7 @@ def on_data(control_message: ControlMessage):
run_validation = True

logger.debug("Training AE model for user: '%s'...", user_id)
- model.fit(train_df, epochs=epochs, val_data=validation_df, run_validation=run_validation)
+ model.fit(train_df, epochs=epochs, validation_data=validation_df, run_validation=run_validation)
logger.debug("Training AE model for user: '%s'... Complete.", user_id)

dfp_mm = DFPMessageMeta(cudf.from_pandas(final_df), user_id=user_id)
@@ -12,20 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Training stage for the DFP pipeline."""

+ import base64
import logging
+ import pickle
import typing

import mrc
from mrc.core import operators as ops
from sklearn.model_selection import train_test_split

from morpheus.config import Config
+ from morpheus.messages import ControlMessage
from morpheus.messages.multi_ae_message import MultiAEMessage
from morpheus.models.dfencoder import AutoEncoder
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

from ..messages.multi_dfp_message import DFPMessageMeta
from ..messages.multi_dfp_message import MultiDFPMessage

logger = logging.getLogger(f"morpheus.{__name__}")
@@ -87,10 +90,43 @@ def supports_cpp_node(self):

def accepted_types(self) -> typing.Tuple:
"""Indicate which input message types this stage accepts."""
- return (MultiDFPMessage, )
+ return (
+ ControlMessage,
+ MultiDFPMessage,
+ )

+ def _dfp_multimessage_from_control_message(self,
+ control_message: ControlMessage) -> typing.Union[MultiDFPMessage, None]:
+ """Create a MultiDFPMessage from a ControlMessage."""
+ ctrl_msg_user_id = control_message.get_metadata("user_id")
+ message_meta = control_message.payload()
+
+ if (ctrl_msg_user_id is None or message_meta is None):
+ return None
+
+ with message_meta.mutable_dataframe() as dfm:
+ msg_meta_df = dfm.to_pandas()
+
+ msg_meta = DFPMessageMeta(msg_meta_df, user_id=str(ctrl_msg_user_id))
+ message = MultiDFPMessage(meta=msg_meta, mess_offset=0, mess_count=len(msg_meta_df))
+
+ return message
+
+ @typing.overload
+ def on_data(self, message: ControlMessage) -> ControlMessage:
+ ...
+
+ @typing.overload
+ def on_data(self, message: MultiDFPMessage) -> MultiAEMessage:
+ ...
+
+ def on_data(self, message):
+ """Train the model and attach it to the output message."""
+ received_control_message = False
+ if (isinstance(message, ControlMessage)):
+ message = self._dfp_multimessage_from_control_message(message)
+ received_control_message = True

if (message is None or message.mess_count == 0):
return None

@@ -111,18 +147,30 @@ def on_data(self, message: MultiDFPMessage) -> MultiAEMessage:
run_validation = True

logger.debug("Training AE model for user: '%s'...", user_id)
- model.fit(train_df, epochs=self._epochs, val_data=validation_df, run_validation=run_validation)
+ model.fit(train_df, epochs=self._epochs, validation_data=validation_df, run_validation=run_validation)
logger.debug("Training AE model for user: '%s'... Complete.", user_id)

- output_message = MultiAEMessage(meta=message.meta,
- mess_offset=message.mess_offset,
- mess_count=message.mess_count,
- model=model)
+ if (received_control_message):
+ output_message = ControlMessage(message.meta)
+ output_message.set_metadata("user_id", user_id)
+
+ pickled_model_bytes = pickle.dumps(model)
+ pickled_model_base64_str = base64.b64encode(pickled_model_bytes).decode('utf-8')
+ output_message.set_metadata("model", pickled_model_base64_str)
+ else:
+ output_message = MultiAEMessage(meta=message.meta,
+ mess_offset=message.mess_offset,
+ mess_count=message.mess_count,
+ model=model)

return output_message

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
stream = builder.make_node(self.unique_name, ops.map(self.on_data), ops.filter(lambda x: x is not None))
builder.make_edge(input_stream[0], stream)

- return stream, MultiAEMessage
+ return_type = input_stream[1]
+ if (return_type == MultiDFPMessage):
+ return_type = MultiAEMessage
+
+ return stream, return_type
38 changes: 20 additions & 18 deletions models/training-tuning-scripts/dfp-models/hammah-20211017-script.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+ # pylint: disable=invalid-name
"""
Example Usage:
python hammah-20211017-script.py \
@@ -24,13 +25,14 @@
import dill
import pandas as pd
import torch

from morpheus.models.dfencoder import AutoEncoder
from morpheus.utils.seed import manual_seed


def main():
- X_train = pd.read_csv(args.trainingdata)
- X_val = pd.read_csv(args.valdata)
+ x_train = pd.read_csv(args.trainingdata)
+ x_val = pd.read_csv(args.valdata)

features = [
'eventSource',
@@ -67,31 +69,31 @@ def main():
'responseElementsreservationId',
'requestParametersgroupName'
] # NO userIdentitysessionContextsessionIssuerarn,userIdentityuserName
- for i in list(X_train):
+ for i in list(x_train):
if i not in features:
- X_train = X_train.drop(i, axis=1)
- for i in list(X_val):
+ x_train = x_train.drop(i, axis=1)
+ for i in list(x_val):
if i not in features:
- X_val = X_val.drop(i, axis=1)
+ x_val = x_val.drop(i, axis=1)

- X_train = X_train.dropna(axis=1, how='all')
- X_val = X_val.dropna(axis=1, how='all')
+ x_train = x_train.dropna(axis=1, how='all')
+ x_val = x_val.dropna(axis=1, how='all')

- for i in list(X_val):
- if i not in list(X_train):
- X_val = X_val.drop([i], axis=1)
+ for i in list(x_val):
+ if i not in list(x_train):
+ x_val = x_val.drop([i], axis=1)

- for i in list(X_train):
- if i not in list(X_val):
- X_train = X_train.drop([i], axis=1)
+ for i in list(x_train):
+ if i not in list(x_val):
+ x_train = x_train.drop([i], axis=1)
manual_seed(42)
model = AutoEncoder(
encoder_layers=[512, 500], # layers of the encoding part
decoder_layers=[512], # layers of the decoding part
activation='relu', # activation function
- swap_p=0.2, # noise parameter
- lr=0.01, # learning rate
- lr_decay=.99, # learning decay
+ swap_probability=0.2, # noise parameter
+ learning_rate=0.01, # learning rate
+ learning_rate_decay=.99, # learning decay
batch_size=512,
logger='ipynb',
verbose=False,
@@ -100,7 +102,7 @@ def main():
min_cats=1 # cut off for minority categories
)

- model.fit(X_train, epochs=25, val=X_val)
+ model.fit(x_train, epochs=25, validation_data=x_val)

torch.save(model.state_dict(), args.trainingdata[:-4] + ".pkl")
with open(args.trainingdata[:-4] + 'dill' + '.pkl', 'wb') as f:
4 changes: 2 additions & 2 deletions morpheus/controllers/mlflow_model_writer_controller.py
@@ -219,8 +219,8 @@ def on_data(self, message: MultiAEMessage):
# Log all params in one dict to avoid round trips
mlflow.log_params({
"Algorithm": "Denosing Autoencoder",
"Epochs": model.lr_decay.state_dict().get("last_epoch", "unknown"),
"Learning rate": model.lr,
"Epochs": model.learning_rate_decay.state_dict().get("last_epoch", "unknown"),
"Learning rate": model.learning_rate,
"Batch size": model.batch_size,
"Start Epoch": message.get_meta(self._timestamp_column_name).min(),
"End Epoch": message.get_meta(self._timestamp_column_name).max(),
8 changes: 4 additions & 4 deletions morpheus/models/dfencoder/__init__.py
@@ -57,9 +57,9 @@
from .ae_module import CompleteLayer
from .autoencoder import AutoEncoder
from .dataframe import EncoderDataFrame
- from .dataloader import DatasetFromDataframe
- from .dataloader import DatasetFromPath
+ from .dataloader import DataframeDataset
from .dataloader import DFEncoderDataLoader
+ from .dataloader import FileSystemDataset
from .distributed_ae import DistributedAutoEncoder
from .logging import BasicLogger
from .logging import IpynbLogger
@@ -74,8 +74,8 @@
"CompleteLayer",
"AutoEncoder",
"EncoderDataFrame",
"DatasetFromDataframe",
"DatasetFromPath",
"DataframeDataset",
"FileSystemDataset",
"DFEncoderDataLoader",
"DistributedAutoEncoder",
"BasicLogger",