From 123c8486a2e4595c3a771eb7270cfe9a79b38ac1 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Tue, 14 Feb 2023 12:02:32 -0800 Subject: [PATCH] Control MonitorStage output with the log-level (#659) * Add new `log_level` constructor argument to `MonitorStage` * `MonitorStage` only adds itself to the MRC pipeline if the `morpheus.stages.general.monitor_stage` logger is configured at or below `log_level` Allows pipeline authors to control the inclusion of progress output via the Morpheus logger. By default monitor stage output will only be present for `INFO` level and below. Marking this as a breaking change for pipelines authors who expect to see monitor stage output when configuring Morpheus with `WARNING` level and higher. Requires #676 to be merged, and forward-merged into 23.03 first. fixes #553 Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/659 --- morpheus/stages/general/monitor_stage.py | 32 ++++++++++++---- morpheus/utils/logger.py | 3 ++ tests/conftest.py | 16 ++++++++ tests/test_monitor_stage.py | 48 ++++++++++++++++++++---- 4 files changed, 85 insertions(+), 14 deletions(-) diff --git a/morpheus/stages/general/monitor_stage.py b/morpheus/stages/general/monitor_stage.py index d882676d71..dbcc35917b 100644 --- a/morpheus/stages/general/monitor_stage.py +++ b/morpheus/stages/general/monitor_stage.py @@ -30,6 +30,7 @@ from morpheus.messages import MultiMessage from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair +from morpheus.utils.logger import LogLevels logger = logging.getLogger(__name__) @@ -165,7 +166,8 @@ class MonitorStage(SinglePortStage): determine_count_fn : typing.Callable[[typing.Any], int] Custom function for determining the count in a message. Gets called for each message. Allows for correct counting of batched and sliced messages. - + log_level : `morpheus.utils.logger.LogLevels`, default = 'INFO' + Enable this stage when the configured log level is at `log_level` or lower. """ stage_count: int = 0 @@ -175,7 +177,8 @@ def __init__(self, smoothing: float = 0.05, unit: str = "messages", delayed_start: bool = False, - determine_count_fn: typing.Callable[[typing.Any], int] = None): + determine_count_fn: typing.Callable[[typing.Any], int] = None, + log_level: LogLevels = LogLevels.INFO): super().__init__(c) self._progress: MorpheusTqdm = None @@ -190,6 +193,12 @@ def __init__(self, self._determine_count_fn = determine_count_fn + if isinstance(log_level, LogLevels): + log_level = log_level.value + + self._log_level = log_level + self._enabled = None # defined on first call to _is_enabled + @property def name(self) -> str: return "monitor" @@ -209,16 +218,23 @@ def accepted_types(self) -> typing.Tuple: def supports_cpp_node(self): return False + def _is_enabled(self) -> bool: + if self._enabled is None: + self._enabled = logger.isEnabledFor(self._log_level) + + return self._enabled + def on_start(self): """ Starts the pipeline stage's progress bar. """ - # Set the monitor interval to 0 to use prevent using tqdms monitor - tqdm.monitor_interval = 0 + if self._is_enabled(): + # Set the monitor interval to 0 to use prevent using tqdms monitor + tqdm.monitor_interval = 0 - # Start the progress bar if we dont have a delayed start - if (not self._delayed_start): - self._ensure_progress_bar() + # Start the progress bar if we dont have a delayed start + if (not self._delayed_start): + self._ensure_progress_bar() def stop(self): """ @@ -241,6 +257,8 @@ def _ensure_progress_bar(self): self._progress.reset() def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: + if not self._is_enabled(): + return input_stream def sink_on_completed(): # Set the name to complete. This refreshes the display diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py index 4ff6a37e83..e60fb4b8cf 100644 --- a/morpheus/utils/logger.py +++ b/morpheus/utils/logger.py @@ -18,12 +18,15 @@ import logging.handlers import multiprocessing import os +from enum import Enum import appdirs import click import mrc from tqdm import tqdm +LogLevels = Enum('LogLevels', logging._nameToLevel) + class TqdmLoggingHandler(logging.Handler): """ diff --git a/tests/conftest.py b/tests/conftest.py index 9ca54369e0..75da6637fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -485,6 +485,22 @@ def _wrap_set_log_level(log_level: int): set_log_level(old_level) +@pytest.fixture(scope="function") +def reset_loglevel(): + """ + Fixture restores the log level after running the given test. + """ + import mrc + + from morpheus.utils.logger import set_log_level + + old_level = mrc.logging.get_level() + + yield + + set_log_level(old_level) + + @pytest.fixture(scope="function") def loglevel_debug(): """ diff --git a/tests/test_monitor_stage.py b/tests/test_monitor_stage.py index 26a21eee3e..de6a5724dc 100755 --- a/tests/test_monitor_stage.py +++ b/tests/test_monitor_stage.py @@ -15,6 +15,8 @@ # limitations under the License. import inspect +import logging +import os from unittest import mock import pytest @@ -22,13 +24,17 @@ import cudf from morpheus.messages import MultiMessage +from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.input.file_source_stage import FileSourceStage +from morpheus.utils.logger import set_log_level +from utils import TEST_DIRS def test_constructor(config): # Intentionally not making assumptions about the defaults other than they exist # and still create a valid stage. - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m.name == "monitor" # Just ensure that we get a valid non-empty tuple @@ -50,7 +56,7 @@ def two_x(x): def test_on_start(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._progress is None m.on_start() @@ -63,7 +69,7 @@ def test_on_start(mock_morph_tqdm, config): def test_stop(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._progress is None # Calling on_stop is a noop if we are stopped @@ -79,7 +85,7 @@ def test_stop(mock_morph_tqdm, config): def test_refresh(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._progress is None m.on_start() @@ -99,7 +105,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config): mock_segment.make_node_full.return_value = mock_stream mock_input = mock.MagicMock() - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) m._build_single(mock_segment, mock_input) m.on_start() @@ -123,7 +129,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config): def test_auto_count_fn(config): - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._auto_count_fn(None) is None assert m._auto_count_fn([]) is None @@ -148,7 +154,7 @@ def test_auto_count_fn(config): def test_progress_sink(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) m.on_start() m._progress_sink(None) @@ -158,3 +164,31 @@ def test_progress_sink(mock_morph_tqdm, config): m._progress_sink(MultiMessage(None, 0, 12)) assert inspect.isfunction(m._determine_count_fn) mock_morph_tqdm.update.assert_called_once_with(n=12) + + +@pytest.mark.usefixtures("reset_loglevel") +@pytest.mark.parametrize('morpheus_log_level', + [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]) +@mock.patch('mrc.Builder.make_node_full') +@mock.patch('mrc.Builder.make_edge') +def test_log_level(mock_make_edge, mock_make_node_full, config, morpheus_log_level): + """ + Test ensures the monitor stage doesn't add itself to the MRC pipeline if not configured for the current log-level + """ + input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") + + set_log_level(morpheus_log_level) + monitor_stage_level = logging.INFO + + should_be_included = (morpheus_log_level <= monitor_stage_level) + + pipe = LinearPipeline(config) + pipe.set_source(FileSourceStage(config, filename=input_file)) + + ms = MonitorStage(config, log_level=monitor_stage_level) + + pipe.add_stage(ms) + pipe.run() + + expected_call_count = 1 if should_be_included else 0 + assert mock_make_node_full.call_count == expected_call_count