Skip to content

Commit

Permalink
Control MonitorStage output with the log-level (nv-morpheus#659)
Browse files Browse the repository at this point in the history
* Add new `log_level` constructor argument to `MonitorStage`
* `MonitorStage` only adds itself to the MRC pipeline if the `morpheus.stages.general.monitor_stage` logger is configured at or below `log_level`

Allows pipeline authors to control the inclusion of progress output via the Morpheus logger. By default monitor stage output will only be present for `INFO` level and below.

Marking this as a breaking change for pipelines authors who expect to see monitor stage output when configuring Morpheus with `WARNING` level and higher.

Requires nv-morpheus#676 to be merged, and forward-merged into 23.03 first.

fixes nv-morpheus#553

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#659
  • Loading branch information
dagardner-nv authored and jjacobelli committed Mar 7, 2023
1 parent 91b96da commit 123c848
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 14 deletions.
32 changes: 25 additions & 7 deletions morpheus/stages/general/monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from morpheus.messages import MultiMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.logger import LogLevels

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -165,7 +166,8 @@ class MonitorStage(SinglePortStage):
determine_count_fn : typing.Callable[[typing.Any], int]
Custom function for determining the count in a message. Gets called for each message. Allows for
correct counting of batched and sliced messages.
log_level : `morpheus.utils.logger.LogLevels`, default = 'INFO'
Enable this stage when the configured log level is at `log_level` or lower.
"""
stage_count: int = 0

Expand All @@ -175,7 +177,8 @@ def __init__(self,
smoothing: float = 0.05,
unit: str = "messages",
delayed_start: bool = False,
determine_count_fn: typing.Callable[[typing.Any], int] = None):
determine_count_fn: typing.Callable[[typing.Any], int] = None,
log_level: LogLevels = LogLevels.INFO):
super().__init__(c)

self._progress: MorpheusTqdm = None
Expand All @@ -190,6 +193,12 @@ def __init__(self,

self._determine_count_fn = determine_count_fn

if isinstance(log_level, LogLevels):
log_level = log_level.value

self._log_level = log_level
self._enabled = None # defined on first call to _is_enabled

@property
def name(self) -> str:
return "monitor"
Expand All @@ -209,16 +218,23 @@ def accepted_types(self) -> typing.Tuple:
def supports_cpp_node(self):
return False

def _is_enabled(self) -> bool:
if self._enabled is None:
self._enabled = logger.isEnabledFor(self._log_level)

return self._enabled

def on_start(self):
"""
Starts the pipeline stage's progress bar.
"""
# Set the monitor interval to 0 to use prevent using tqdms monitor
tqdm.monitor_interval = 0
if self._is_enabled():
# Set the monitor interval to 0 to use prevent using tqdms monitor
tqdm.monitor_interval = 0

# Start the progress bar if we dont have a delayed start
if (not self._delayed_start):
self._ensure_progress_bar()
# Start the progress bar if we dont have a delayed start
if (not self._delayed_start):
self._ensure_progress_bar()

def stop(self):
"""
Expand All @@ -241,6 +257,8 @@ def _ensure_progress_bar(self):
self._progress.reset()

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
if not self._is_enabled():
return input_stream

def sink_on_completed():
# Set the name to complete. This refreshes the display
Expand Down
3 changes: 3 additions & 0 deletions morpheus/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import logging.handlers
import multiprocessing
import os
from enum import Enum

import appdirs
import click
import mrc
from tqdm import tqdm

LogLevels = Enum('LogLevels', logging._nameToLevel)


class TqdmLoggingHandler(logging.Handler):
"""
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,22 @@ def _wrap_set_log_level(log_level: int):
set_log_level(old_level)


@pytest.fixture(scope="function")
def reset_loglevel():
"""
Fixture restores the log level after running the given test.
"""
import mrc

from morpheus.utils.logger import set_log_level

old_level = mrc.logging.get_level()

yield

set_log_level(old_level)


@pytest.fixture(scope="function")
def loglevel_debug():
"""
Expand Down
48 changes: 41 additions & 7 deletions tests/test_monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@
# limitations under the License.

import inspect
import logging
import os
from unittest import mock

import pytest

import cudf

from morpheus.messages import MultiMessage
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import set_log_level
from utils import TEST_DIRS


def test_constructor(config):
# Intentionally not making assumptions about the defaults other than they exist
# and still create a valid stage.
m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m.name == "monitor"

# Just ensure that we get a valid non-empty tuple
Expand All @@ -50,7 +56,7 @@ def two_x(x):
def test_on_start(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m._progress is None

m.on_start()
Expand All @@ -63,7 +69,7 @@ def test_on_start(mock_morph_tqdm, config):
def test_stop(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m._progress is None

# Calling on_stop is a noop if we are stopped
Expand All @@ -79,7 +85,7 @@ def test_stop(mock_morph_tqdm, config):
def test_refresh(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
assert m._progress is None

m.on_start()
Expand All @@ -99,7 +105,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config):
mock_segment.make_node_full.return_value = mock_stream
mock_input = mock.MagicMock()

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
m._build_single(mock_segment, mock_input)
m.on_start()

Expand All @@ -123,7 +129,7 @@ def test_build_single(mock_morph_tqdm, mock_operators, config):


def test_auto_count_fn(config):
m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)

assert m._auto_count_fn(None) is None
assert m._auto_count_fn([]) is None
Expand All @@ -148,7 +154,7 @@ def test_auto_count_fn(config):
def test_progress_sink(mock_morph_tqdm, config):
mock_morph_tqdm.return_value = mock_morph_tqdm

m = MonitorStage(config)
m = MonitorStage(config, log_level=logging.WARNING)
m.on_start()

m._progress_sink(None)
Expand All @@ -158,3 +164,31 @@ def test_progress_sink(mock_morph_tqdm, config):
m._progress_sink(MultiMessage(None, 0, 12))
assert inspect.isfunction(m._determine_count_fn)
mock_morph_tqdm.update.assert_called_once_with(n=12)


@pytest.mark.usefixtures("reset_loglevel")
@pytest.mark.parametrize('morpheus_log_level',
[logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
@mock.patch('mrc.Builder.make_node_full')
@mock.patch('mrc.Builder.make_edge')
def test_log_level(mock_make_edge, mock_make_node_full, config, morpheus_log_level):
"""
Test ensures the monitor stage doesn't add itself to the MRC pipeline if not configured for the current log-level
"""
input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv")

set_log_level(morpheus_log_level)
monitor_stage_level = logging.INFO

should_be_included = (morpheus_log_level <= monitor_stage_level)

pipe = LinearPipeline(config)
pipe.set_source(FileSourceStage(config, filename=input_file))

ms = MonitorStage(config, log_level=monitor_stage_level)

pipe.add_stage(ms)
pipe.run()

expected_call_count = 1 if should_be_included else 0
assert mock_make_node_full.call_count == expected_call_count

0 comments on commit 123c848

Please sign in to comment.