Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli: notify users of runner usage #4663

Merged
merged 2 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def process_list(
# pylint: disable=too-many-locals
from tabulate import tabulate
from aiida.cmdline.utils.common import print_last_process_state_change, check_worker_load
from aiida.engine.daemon.client import get_daemon_client

relationships = {}

Expand All @@ -77,15 +78,19 @@ def process_list(
echo.echo(tabulated)
echo.echo(f'\nTotal results: {len(projected)}\n')
print_last_process_state_change()
# Second query to get active process count
# Currently this is slow but will be fixed wiith issue #2770
# We place it at the end so that the user can Ctrl+C after getting the process table.
builder = CalculationQueryBuilder()
filters = builder.get_filters(process_state=('created', 'waiting', 'running'))
query_set = builder.get_query_set(filters=filters)
projected = builder.get_projected(query_set, projections=['pk'])
worker_slot_use = len(projected) - 1
check_worker_load(worker_slot_use)

if not get_daemon_client().is_daemon_running:
echo.echo_warning('the daemon is not running', bold=True)
else:
# Second query to get active process count
# Currently this is slow but will be fixed with issue #2770
# We place it at the end so that the user can Ctrl+C after getting the process table.
builder = CalculationQueryBuilder()
filters = builder.get_filters(process_state=('created', 'waiting', 'running'))
query_set = builder.get_query_set(filters=filters)
projected = builder.get_projected(query_set, projections=['pk'])
worker_slot_use = len(projected) - 1
check_worker_load(worker_slot_use)


@verdi_process.command('show')
Expand Down
17 changes: 7 additions & 10 deletions aiida/cmdline/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,15 @@ def format_local_time(timestamp, format_str='%Y-%m-%d %H:%M:%S'):
def print_last_process_state_change(process_type=None):
"""
Print the last time that a process of the specified type has changed its state.
This function will also print a warning if the daemon is not running.

:param process_type: optional process type for which to get the latest state change timestamp.
Valid process types are either 'calculation' or 'work'.
"""
from aiida.cmdline.utils.echo import echo_info, echo_warning
from aiida.cmdline.utils.echo import echo_info
from aiida.common import timezone
from aiida.common.utils import str_timedelta
from aiida.engine.daemon.client import get_daemon_client
from aiida.engine.utils import get_process_state_change_timestamp

client = get_daemon_client()

timestamp = get_process_state_change_timestamp(process_type)

if timestamp is None:
Expand All @@ -72,9 +68,6 @@ def print_last_process_state_change(process_type=None):
relative = str_timedelta(timedelta, negative_to_zero=True, max_num_fields=1)
echo_info(f'last time an entry changed state: {relative} ({formatted})')

if not client.is_daemon_running:
echo_warning('the daemon is not running', bold=True)


def get_node_summary(node):
"""Return a multi line string with a pretty formatted summary of a Node.
Expand Down Expand Up @@ -476,8 +469,8 @@ def get_num_workers():
raise CircusCallError
try:
return response['numprocesses']
except KeyError:
raise CircusCallError('Circus did not return the number of daemon processes')
except KeyError as exc:
raise CircusCallError('Circus did not return the number of daemon processes') from exc


def check_worker_load(active_slots):
Expand Down Expand Up @@ -511,3 +504,7 @@ def check_worker_load(active_slots):
echo.echo('') # New line
echo.echo_warning(f'{percent_load * 100:.0f}% of the available daemon worker slots have been used!')
echo.echo_warning("Increase the number of workers with 'verdi daemon incr'.\n")
else:
echo.echo_info(f'Using {percent_load * 100:.0f}% of the available daemon worker slots')
else:
echo.echo_info('No active daemon workers')
2 changes: 2 additions & 0 deletions aiida/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# yapf: disable
# pylint: disable=wildcard-import

from .daemon import *
from .exceptions import *
from .launch import *
from .persistence import *
Expand All @@ -30,6 +31,7 @@
'CalcJob',
'CalcJobOutputPort',
'CalcJobProcessSpec',
'DaemonClient',
'ExitCode',
'ExitCodesNamespace',
'FunctionProcess',
Expand Down
14 changes: 14 additions & 0 deletions aiida/engine/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,17 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Module with resources for the daemon."""

# AUTO-GENERATED

# yapf: disable
# pylint: disable=wildcard-import

from .client import *

__all__ = (
'DaemonClient',
)

# yapf: enable
7 changes: 3 additions & 4 deletions aiida/engine/daemon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""
Controls the daemon
"""

"""Client to interact with the daemon."""
import enum
import os
import shutil
Expand All @@ -31,6 +28,8 @@
# see https://github.com/python/typing/issues/182
JsonDictType = Dict[str, Any]

__all__ = ('DaemonClient',)


class ControllerProtocol(enum.Enum):
"""
Expand Down
78 changes: 31 additions & 47 deletions tests/cmdline/commands/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,60 +288,44 @@ def test_report(self):
self.assertEqual(get_result_lines(result)[0], 'No log messages recorded for this entry')


class TestVerdiProcessListWarning(AiidaTestCase):
"""Tests for the `verdi process list` active slots warning."""

def setUp(self):
super().setUp()
self.cli_runner = CliRunner()
# Override the call to the circus client to retrieve the number of workers
# As we don't have a running circus client, this will normally fail, so here we simulate the
# response by redefining the function to get the final value we want.
import aiida.cmdline.utils.common
self.real_get_num_workers = aiida.cmdline.utils.common.get_num_workers
aiida.cmdline.utils.common.get_num_workers = lambda: 1

def tearDown(self):
# Reset the redefined function
import aiida.cmdline.utils.common
aiida.cmdline.utils.common.get_num_workers = self.real_get_num_workers
super().tearDown()

def test_list_worker_slot_warning(self):
"""
Test that the if the number of used worker process slots exceeds a threshold,
that the warning message is displayed to the user when running `verdi process list`
"""
from aiida.engine import ProcessState
from aiida.manage.configuration import get_config

# Get the number of allowed processes per worker:
config = get_config()
worker_process_slots = config.get_option('daemon.worker_process_slots', config.current_profile.name)
limit = int(worker_process_slots * 0.9)
@pytest.mark.usefixtures('clear_database_before_test')
def test_list_worker_slot_warning(run_cli_command, monkeypatch):
"""
Test that the if the number of used worker process slots exceeds a threshold,
that the warning message is displayed to the user when running `verdi process list`
"""
from aiida.cmdline.utils import common
from aiida.engine import ProcessState, DaemonClient
from aiida.manage.configuration import get_config

# Create additional active nodes such that we have 90% of the active slot limit
for _ in range(limit):
calc = WorkFunctionNode()
calc.set_process_state(ProcessState.RUNNING)
calc.store()
monkeypatch.setattr(common, 'get_num_workers', lambda: 1)
monkeypatch.setattr(DaemonClient, 'is_daemon_running', lambda: True)

# Default cmd should not throw the warning as we are below the limit
result = self.cli_runner.invoke(cmd_process.process_list)
self.assertClickResultNoException(result)
warning_phrase = 'of the available daemon worker slots have been used!'
self.assertTrue(all([warning_phrase not in line for line in get_result_lines(result)]))
# Get the number of allowed processes per worker:
config = get_config()
worker_process_slots = config.get_option('daemon.worker_process_slots', config.current_profile.name)
limit = int(worker_process_slots * 0.9)

# Add one more running node to put us over the limit
# Create additional active nodes such that we have 90% of the active slot limit
for _ in range(limit):
calc = WorkFunctionNode()
calc.set_process_state(ProcessState.RUNNING)
calc.store()

# Now the warning should fire
result = self.cli_runner.invoke(cmd_process.process_list)
self.assertClickResultNoException(result)
warning_phrase = '% of the available daemon worker slots have been used!'
self.assertTrue(any([warning_phrase in line for line in get_result_lines(result)]))
# Default cmd should not throw the warning as we are below the limit
result = run_cli_command(cmd_process.process_list)
warning_phrase = 'of the available daemon worker slots have been used!'
assert all([warning_phrase not in line for line in result.output_lines])

# Add one more running node to put us over the limit
calc = WorkFunctionNode()
calc.set_process_state(ProcessState.RUNNING)
calc.store()

# Now the warning should fire
result = run_cli_command(cmd_process.process_list)
warning_phrase = '% of the available daemon worker slots have been used!'
assert any([warning_phrase in line for line in result.output_lines])


class TestVerdiProcessCallRoot(AiidaTestCase):
Expand Down