From 3eb4259364e45059810f7243291254c52827d758 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 31 Jan 2023 13:17:25 -0800 Subject: [PATCH 1/6] Configure monitor stage to enable/disable itself based on the configured log level --- morpheus/stages/general/monitor_stage.py | 25 +++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/morpheus/stages/general/monitor_stage.py b/morpheus/stages/general/monitor_stage.py index d882676d71..b218587c70 100644 --- a/morpheus/stages/general/monitor_stage.py +++ b/morpheus/stages/general/monitor_stage.py @@ -25,6 +25,7 @@ import cudf from morpheus.cli.register_stage import register_stage +from morpheus.cli.utils import parse_log_level from morpheus.config import Config from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage @@ -165,7 +166,8 @@ class MonitorStage(SinglePortStage): determine_count_fn : typing.Callable[[typing.Any], int] Custom function for determining the count in a message. Gets called for each message. Allows for correct counting of batched and sliced messages. - + log_level : str, default = "INFO" + Enable this stage when the configured log level is at `log_level` or lower. """ stage_count: int = 0 @@ -175,7 +177,8 @@ def __init__(self, smoothing: float = 0.05, unit: str = "messages", delayed_start: bool = False, - determine_count_fn: typing.Callable[[typing.Any], int] = None): + determine_count_fn: typing.Callable[[typing.Any], int] = None, + log_level: str = "INFO"): super().__init__(c) self._progress: MorpheusTqdm = None @@ -190,6 +193,11 @@ def __init__(self, self._determine_count_fn = determine_count_fn + if isinstance(log_level, str): + log_level = parse_log_level(None, None, log_level) + + self._log_level = log_level + @property def name(self) -> str: return "monitor" @@ -213,12 +221,13 @@ def on_start(self): """ Starts the pipeline stage's progress bar. """ - # Set the monitor interval to 0 to use prevent using tqdms monitor - tqdm.monitor_interval = 0 + if logger.isEnabledFor(self._log_level): + # Set the monitor interval to 0 to use prevent using tqdms monitor + tqdm.monitor_interval = 0 - # Start the progress bar if we dont have a delayed start - if (not self._delayed_start): - self._ensure_progress_bar() + # Start the progress bar if we dont have a delayed start + if (not self._delayed_start): + self._ensure_progress_bar() def stop(self): """ @@ -241,6 +250,8 @@ def _ensure_progress_bar(self): self._progress.reset() def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: + if not logger.isEnabledFor(self._log_level): + return input_stream def sink_on_completed(): # Set the name to complete. This refreshes the display From bbdddac24d2ff00e9452d95c345aed4eb6d58c38 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 31 Jan 2023 15:13:27 -0800 Subject: [PATCH 2/6] Fix monitor tests --- tests/conftest.py | 16 +++++++++++++ tests/test_monitor_stage.py | 48 +++++++++++++++++++++++++++++++------ 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9ca54369e0..75da6637fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -485,6 +485,22 @@ def _wrap_set_log_level(log_level: int): set_log_level(old_level) +@pytest.fixture(scope="function") +def reset_loglevel(): + """ + Fixture restores the log level after running the given test. + """ + import mrc + + from morpheus.utils.logger import set_log_level + + old_level = mrc.logging.get_level() + + yield + + set_log_level(old_level) + + @pytest.fixture(scope="function") def loglevel_debug(): """ diff --git a/tests/test_monitor_stage.py b/tests/test_monitor_stage.py index 26a21eee3e..de6a5724dc 100755 --- a/tests/test_monitor_stage.py +++ b/tests/test_monitor_stage.py @@ -15,6 +15,8 @@ # limitations under the License. import inspect +import logging +import os from unittest import mock import pytest @@ -22,13 +24,17 @@ import cudf from morpheus.messages import MultiMessage +from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.input.file_source_stage import FileSourceStage +from morpheus.utils.logger import set_log_level +from utils import TEST_DIRS def test_constructor(config): # Intentionally not making assumptions about the defaults other than they exist # and still create a valid stage. - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m.name == "monitor" # Just ensure that we get a valid non-empty tuple @@ -50,7 +56,7 @@ def two_x(x): def test_on_start(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._progress is None m.on_start() @@ -63,7 +69,7 @@ def test_on_start(mock_morph_tqdm, config): def test_stop(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._progress is None # Calling on_stop is a noop if we are stopped @@ -79,7 +85,7 @@ def test_stop(mock_morph_tqdm, config): def test_refresh(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._progress is None m.on_start() @@ -99,7 +105,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config): mock_segment.make_node_full.return_value = mock_stream mock_input = mock.MagicMock() - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) m._build_single(mock_segment, mock_input) m.on_start() @@ -123,7 +129,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config): def test_auto_count_fn(config): - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) assert m._auto_count_fn(None) is None assert m._auto_count_fn([]) is None @@ -148,7 +154,7 @@ def test_auto_count_fn(config): def test_progress_sink(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm - m = MonitorStage(config) + m = MonitorStage(config, log_level=logging.WARNING) m.on_start() m._progress_sink(None) @@ -158,3 +164,31 @@ def test_progress_sink(mock_morph_tqdm, config): m._progress_sink(MultiMessage(None, 0, 12)) assert inspect.isfunction(m._determine_count_fn) mock_morph_tqdm.update.assert_called_once_with(n=12) + + +@pytest.mark.usefixtures("reset_loglevel") +@pytest.mark.parametrize('morpheus_log_level', + [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]) +@mock.patch('mrc.Builder.make_node_full') +@mock.patch('mrc.Builder.make_edge') +def test_log_level(mock_make_edge, mock_make_node_full, config, morpheus_log_level): + """ + Test ensures the monitor stage doesn't add itself to the MRC pipeline if not configured for the current log-level + """ + input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") + + set_log_level(morpheus_log_level) + monitor_stage_level = logging.INFO + + should_be_included = (morpheus_log_level <= monitor_stage_level) + + pipe = LinearPipeline(config) + pipe.set_source(FileSourceStage(config, filename=input_file)) + + ms = MonitorStage(config, log_level=monitor_stage_level) + + pipe.add_stage(ms) + pipe.run() + + expected_call_count = 1 if should_be_included else 0 + assert mock_make_node_full.call_count == expected_call_count From 0eaaa22e4cdadbabd9bbaebeca91908c3a04d8de Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 7 Feb 2023 08:17:06 -0800 Subject: [PATCH 3/6] wip --- morpheus/stages/general/monitor_stage.py | 20 ++++++++++++-------- morpheus/utils/logger.py | 3 +++ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/morpheus/stages/general/monitor_stage.py b/morpheus/stages/general/monitor_stage.py index b218587c70..dbefbb8738 100644 --- a/morpheus/stages/general/monitor_stage.py +++ b/morpheus/stages/general/monitor_stage.py @@ -25,12 +25,12 @@ import cudf from morpheus.cli.register_stage import register_stage -from morpheus.cli.utils import parse_log_level from morpheus.config import Config from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair +from morpheus.utils.logger import LogLevels logger = logging.getLogger(__name__) @@ -166,7 +166,7 @@ class MonitorStage(SinglePortStage): determine_count_fn : typing.Callable[[typing.Any], int] Custom function for determining the count in a message. Gets called for each message. Allows for correct counting of batched and sliced messages. - log_level : str, default = "INFO" + log_level : `morpheus.utils.logger.LogLevels`, default = 'INFO' Enable this stage when the configured log level is at `log_level` or lower. """ stage_count: int = 0 @@ -178,7 +178,7 @@ def __init__(self, unit: str = "messages", delayed_start: bool = False, determine_count_fn: typing.Callable[[typing.Any], int] = None, - log_level: str = "INFO"): + log_level: LogLevels = LogLevels.INFO): super().__init__(c) self._progress: MorpheusTqdm = None @@ -193,10 +193,8 @@ def __init__(self, self._determine_count_fn = determine_count_fn - if isinstance(log_level, str): - log_level = parse_log_level(None, None, log_level) - self._log_level = log_level + self._enabled = None # defined on first call to _is_enabled @property def name(self) -> str: @@ -217,11 +215,17 @@ def accepted_types(self) -> typing.Tuple: def supports_cpp_node(self): return False + def _is_enabled(self) -> bool: + if self._enabled is None: + self._enabled = logger.isEnabledFor(self._log_level) + + return self._enabled + def on_start(self): """ Starts the pipeline stage's progress bar. """ - if logger.isEnabledFor(self._log_level): + if self._is_enabled(): # Set the monitor interval to 0 to use prevent using tqdms monitor tqdm.monitor_interval = 0 @@ -250,7 +254,7 @@ def _ensure_progress_bar(self): self._progress.reset() def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: - if not logger.isEnabledFor(self._log_level): + if not self._is_enabled(): return input_stream def sink_on_completed(): diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py index 4ff6a37e83..20d0742fdc 100644 --- a/morpheus/utils/logger.py +++ b/morpheus/utils/logger.py @@ -18,6 +18,7 @@ import logging.handlers import multiprocessing import os +from enum import Enum import appdirs import click @@ -25,6 +26,8 @@ from tqdm import tqdm +LogLevels = Enum('LogLevels', logging._nameToLevel) + class TqdmLoggingHandler(logging.Handler): """ Console log handler used by Morpheus, provides colorized output and sends From 6c99cd1d1913ca737d2f892cf7ecb3bd941781cf Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 14 Feb 2023 09:24:34 -0800 Subject: [PATCH 4/6] Formatting fix --- morpheus/utils/logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py index 20d0742fdc..e60fb4b8cf 100644 --- a/morpheus/utils/logger.py +++ b/morpheus/utils/logger.py @@ -25,9 +25,9 @@ import mrc from tqdm import tqdm - LogLevels = Enum('LogLevels', logging._nameToLevel) + class TqdmLoggingHandler(logging.Handler): """ Console log handler used by Morpheus, provides colorized output and sends From c9a990c42647e155c7e103151871b536ef2a9c60 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 14 Feb 2023 11:27:35 -0800 Subject: [PATCH 5/6] Store the enum value not the enum --- morpheus/stages/general/monitor_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/general/monitor_stage.py b/morpheus/stages/general/monitor_stage.py index dbefbb8738..985494b622 100644 --- a/morpheus/stages/general/monitor_stage.py +++ b/morpheus/stages/general/monitor_stage.py @@ -193,7 +193,7 @@ def __init__(self, self._determine_count_fn = determine_count_fn - self._log_level = log_level + self._log_level = log_level.value self._enabled = None # defined on first call to _is_enabled @property From 67ce9735c964d7ffd1d1eab8035daf2a8633fbc1 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 14 Feb 2023 11:29:31 -0800 Subject: [PATCH 6/6] fix to allow int or enum --- morpheus/stages/general/monitor_stage.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/morpheus/stages/general/monitor_stage.py b/morpheus/stages/general/monitor_stage.py index 985494b622..dbcc35917b 100644 --- a/morpheus/stages/general/monitor_stage.py +++ b/morpheus/stages/general/monitor_stage.py @@ -193,7 +193,10 @@ def __init__(self, self._determine_count_fn = determine_count_fn - self._log_level = log_level.value + if isinstance(log_level, LogLevels): + log_level = log_level.value + + self._log_level = log_level self._enabled = None # defined on first call to _is_enabled @property