Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control MonitorStage output with the log-level #659

Merged
Merged
25 changes: 18 additions & 7 deletions morpheus/stages/general/monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cudf

from morpheus.cli.register_stage import register_stage
from morpheus.cli.utils import parse_log_level
from morpheus.config import Config
from morpheus.messages import MessageMeta
from morpheus.messages import MultiMessage
Expand Down Expand Up @@ -165,7 +166,8 @@ class MonitorStage(SinglePortStage):
determine_count_fn : typing.Callable[[typing.Any], int]
Custom function for determining the count in a message. Gets called for each message. Allows for
correct counting of batched and sliced messages.

log_level : str, default = "INFO"
Enable this stage when the configured log level is at `log_level` or lower.
"""
stage_count: int = 0

Expand All @@ -175,7 +177,8 @@ def __init__(self,
smoothing: float = 0.05,
unit: str = "messages",
delayed_start: bool = False,
determine_count_fn: typing.Callable[[typing.Any], int] = None):
determine_count_fn: typing.Callable[[typing.Any], int] = None,
log_level: str = "INFO"):
dagardner-nv marked this conversation as resolved.
Show resolved Hide resolved
super().__init__(c)

self._progress: MorpheusTqdm = None
Expand All @@ -190,6 +193,11 @@ def __init__(self,

self._determine_count_fn = determine_count_fn

if isinstance(log_level, str):
log_level = parse_log_level(None, None, log_level)

self._log_level = log_level

@property
def name(self) -> str:
return "monitor"
Expand All @@ -213,12 +221,13 @@ def on_start(self):
"""
Starts the pipeline stage's progress bar.
"""
# Set the monitor interval to 0 to use prevent using tqdms monitor
tqdm.monitor_interval = 0
if logger.isEnabledFor(self._log_level):
dagardner-nv marked this conversation as resolved.
Show resolved Hide resolved
# Set the monitor interval to 0 to use prevent using tqdms monitor
tqdm.monitor_interval = 0

# Start the progress bar if we dont have a delayed start
if (not self._delayed_start):
self._ensure_progress_bar()
# Start the progress bar if we dont have a delayed start
if (not self._delayed_start):
self._ensure_progress_bar()

def stop(self):
"""
Expand All @@ -241,6 +250,8 @@ def _ensure_progress_bar(self):
self._progress.reset()

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
if not logger.isEnabledFor(self._log_level):
dagardner-nv marked this conversation as resolved.
Show resolved Hide resolved
return input_stream

def sink_on_completed():
# Set the name to complete. This refreshes the display
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,22 @@ def _wrap_set_log_level(log_level: int):
set_log_level(old_level)


@pytest.fixture(scope="function")
def reset_loglevel():
"""
Fixture restores the log level after running the given test.
"""
import mrc

from morpheus.utils.logger import set_log_level

old_level = mrc.logging.get_level()

yield

set_log_level(old_level)


@pytest.fixture(scope="function")
def loglevel_debug():
"""
Expand Down
48 changes: 41 additions & 7 deletions tests/test_monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@
# limitations under the License.

import inspect
import logging
import os
from unittest import mock

import pytest

import cudf

from morpheus.messages import MultiMessage
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import set_log_level
from utils import TEST_DIRS


def test_constructor(config):
# Intentionally not making assumptions about the defaults other than they exist
# and still create a valid stage.
m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m.name == "monitor"

# Just ensure that we get a valid non-empty tuple
Expand All @@ -50,7 +56,7 @@ def two_x(x):
def test_on_start(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m._progress is None

m.on_start()
Expand All @@ -63,7 +69,7 @@ def test_on_start(mock_morph_tqdm, config):
def test_stop(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m._progress is None

# Calling on_stop is a noop if we are stopped
Expand All @@ -79,7 +85,7 @@ def test_stop(mock_morph_tqdm, config):
def test_refresh(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m._progress is None

m.on_start()
Expand All @@ -99,7 +105,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config):
mock_segment.make_node_full.return_value = mock_stream
mock_input = mock.MagicMock()

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
m._build_single(mock_segment, mock_input)
m.on_start()

Expand All @@ -123,7 +129,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config):


def test_auto_count_fn(config):
m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)

assert m._auto_count_fn(None) is None
assert m._auto_count_fn([]) is None
Expand All @@ -148,7 +154,7 @@ def test_auto_count_fn(config):
def test_progress_sink(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
m.on_start()

m._progress_sink(None)
Expand All @@ -158,3 +164,31 @@ def test_progress_sink(mock_morph_tqdm, config):
m._progress_sink(MultiMessage(None, 0, 12))
assert inspect.isfunction(m._determine_count_fn)
mock_morph_tqdm.update.assert_called_once_with(n=12)


@pytest.mark.usefixtures("reset_loglevel")
@pytest.mark.parametrize('morpheus_log_level',
[logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
@mock.patch('mrc.Builder.make_node_full')
@mock.patch('mrc.Builder.make_edge')
def test_log_level(mock_make_edge, mock_make_node_full, config, morpheus_log_level):
"""
Test ensures the monitor stage doesn't add itself to the MRC pipeline if not configured for the current log-level
"""
input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")

set_log_level(morpheus_log_level)
monitor_stage_level = logging.INFO

should_be_included = (morpheus_log_level <= monitor_stage_level)

pipe = LinearPipeline(config)
pipe.set_source(FileSourceStage(config, filename=input_file))

ms = MonitorStage(config, log_level=monitor_stage_level)

pipe.add_stage(ms)
pipe.run()

expected_call_count = 1 if should_be_included else 0
assert mock_make_node_full.call_count == expected_call_count