Skip to content

Commit

Permalink
CLI: show daemon worker load even when under threshold
Browse files Browse the repository at this point in the history
The extra query performed in `verdi process list` to get the load on the
daemon runners is slow - and usually results in no output for the user
in case that the load is below the threshold. With this change, the load
is shown even when below the threshold. Furthermore, the query is only
performed if the daemon is running and skipped otherwise.
  • Loading branch information
ltalirz authored and sphuber committed Aug 12, 2021
1 parent 922034f commit c5f0a6c
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 70 deletions.
23 changes: 14 additions & 9 deletions aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def process_list(
# pylint: disable=too-many-locals
from tabulate import tabulate
from aiida.cmdline.utils.common import print_last_process_state_change, check_worker_load
from aiida.engine.daemon.client import get_daemon_client

relationships = {}

Expand All @@ -77,15 +78,19 @@ def process_list(
echo.echo(tabulated)
echo.echo(f'\nTotal results: {len(projected)}\n')
print_last_process_state_change()
# Second query to get active process count
# Currently this is slow but will be fixed wiith issue #2770
# We place it at the end so that the user can Ctrl+C after getting the process table.
builder = CalculationQueryBuilder()
filters = builder.get_filters(process_state=('created', 'waiting', 'running'))
query_set = builder.get_query_set(filters=filters)
projected = builder.get_projected(query_set, projections=['pk'])
worker_slot_use = len(projected) - 1
check_worker_load(worker_slot_use)

if not get_daemon_client().is_daemon_running:
echo.echo_warning('the daemon is not running', bold=True)
else:
# Second query to get active process count
# Currently this is slow but will be fixed with issue #2770
# We place it at the end so that the user can Ctrl+C after getting the process table.
builder = CalculationQueryBuilder()
filters = builder.get_filters(process_state=('created', 'waiting', 'running'))
query_set = builder.get_query_set(filters=filters)
projected = builder.get_projected(query_set, projections=['pk'])
worker_slot_use = len(projected) - 1
check_worker_load(worker_slot_use)


@verdi_process.command('show')
Expand Down
17 changes: 7 additions & 10 deletions aiida/cmdline/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,15 @@ def format_local_time(timestamp, format_str='%Y-%m-%d %H:%M:%S'):
def print_last_process_state_change(process_type=None):
"""
Print the last time that a process of the specified type has changed its state.
This function will also print a warning if the daemon is not running.
:param process_type: optional process type for which to get the latest state change timestamp.
Valid process types are either 'calculation' or 'work'.
"""
from aiida.cmdline.utils.echo import echo_info, echo_warning
from aiida.cmdline.utils.echo import echo_info
from aiida.common import timezone
from aiida.common.utils import str_timedelta
from aiida.engine.daemon.client import get_daemon_client
from aiida.engine.utils import get_process_state_change_timestamp

client = get_daemon_client()

timestamp = get_process_state_change_timestamp(process_type)

if timestamp is None:
Expand All @@ -72,9 +68,6 @@ def print_last_process_state_change(process_type=None):
relative = str_timedelta(timedelta, negative_to_zero=True, max_num_fields=1)
echo_info(f'last time an entry changed state: {relative} ({formatted})')

if not client.is_daemon_running:
echo_warning('the daemon is not running', bold=True)


def get_node_summary(node):
"""Return a multi line string with a pretty formatted summary of a Node.
Expand Down Expand Up @@ -476,8 +469,8 @@ def get_num_workers():
raise CircusCallError
try:
return response['numprocesses']
except KeyError:
raise CircusCallError('Circus did not return the number of daemon processes')
except KeyError as exc:
raise CircusCallError('Circus did not return the number of daemon processes') from exc


def check_worker_load(active_slots):
Expand Down Expand Up @@ -511,3 +504,7 @@ def check_worker_load(active_slots):
echo.echo('') # New line
echo.echo_warning(f'{percent_load * 100:.0f}% of the available daemon worker slots have been used!')
echo.echo_warning("Increase the number of workers with 'verdi daemon incr'.\n")
else:
echo.echo_info(f'Using {percent_load * 100:.0f}% of the available daemon worker slots')
else:
echo.echo_info('No active daemon workers')
2 changes: 2 additions & 0 deletions aiida/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# yapf: disable
# pylint: disable=wildcard-import

from .daemon import *
from .exceptions import *
from .launch import *
from .persistence import *
Expand All @@ -30,6 +31,7 @@
'CalcJob',
'CalcJobOutputPort',
'CalcJobProcessSpec',
'DaemonClient',
'ExitCode',
'ExitCodesNamespace',
'FunctionProcess',
Expand Down
14 changes: 14 additions & 0 deletions aiida/engine/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,17 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Module with resources for the daemon."""

# AUTO-GENERATED

# yapf: disable
# pylint: disable=wildcard-import

from .client import *

__all__ = (
'DaemonClient',
)

# yapf: enable
7 changes: 3 additions & 4 deletions aiida/engine/daemon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""
Controls the daemon
"""

"""Client to interact with the daemon."""
import enum
import os
import shutil
Expand All @@ -31,6 +28,8 @@
# see https://github.com/python/typing/issues/182
JsonDictType = Dict[str, Any]

__all__ = ('DaemonClient',)


class ControllerProtocol(enum.Enum):
"""
Expand Down
78 changes: 31 additions & 47 deletions tests/cmdline/commands/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,60 +288,44 @@ def test_report(self):
self.assertEqual(get_result_lines(result)[0], 'No log messages recorded for this entry')


class TestVerdiProcessListWarning(AiidaTestCase):
"""Tests for the `verdi process list` active slots warning."""

def setUp(self):
super().setUp()
self.cli_runner = CliRunner()
# Override the call to the circus client to retrieve the number of workers
# As we don't have a running circus client, this will normally fail, so here we simulate the
# response by redefining the function to get the final value we want.
import aiida.cmdline.utils.common
self.real_get_num_workers = aiida.cmdline.utils.common.get_num_workers
aiida.cmdline.utils.common.get_num_workers = lambda: 1

def tearDown(self):
# Reset the redefined function
import aiida.cmdline.utils.common
aiida.cmdline.utils.common.get_num_workers = self.real_get_num_workers
super().tearDown()

def test_list_worker_slot_warning(self):
"""
Test that the if the number of used worker process slots exceeds a threshold,
that the warning message is displayed to the user when running `verdi process list`
"""
from aiida.engine import ProcessState
from aiida.manage.configuration import get_config

# Get the number of allowed processes per worker:
config = get_config()
worker_process_slots = config.get_option('daemon.worker_process_slots', config.current_profile.name)
limit = int(worker_process_slots * 0.9)
@pytest.mark.usefixtures('clear_database_before_test')
def test_list_worker_slot_warning(run_cli_command, monkeypatch):
"""
Test that the if the number of used worker process slots exceeds a threshold,
that the warning message is displayed to the user when running `verdi process list`
"""
from aiida.cmdline.utils import common
from aiida.engine import ProcessState, DaemonClient
from aiida.manage.configuration import get_config

# Create additional active nodes such that we have 90% of the active slot limit
for _ in range(limit):
calc = WorkFunctionNode()
calc.set_process_state(ProcessState.RUNNING)
calc.store()
monkeypatch.setattr(common, 'get_num_workers', lambda: 1)
monkeypatch.setattr(DaemonClient, 'is_daemon_running', lambda: True)

# Default cmd should not throw the warning as we are below the limit
result = self.cli_runner.invoke(cmd_process.process_list)
self.assertClickResultNoException(result)
warning_phrase = 'of the available daemon worker slots have been used!'
self.assertTrue(all([warning_phrase not in line for line in get_result_lines(result)]))
# Get the number of allowed processes per worker:
config = get_config()
worker_process_slots = config.get_option('daemon.worker_process_slots', config.current_profile.name)
limit = int(worker_process_slots * 0.9)

# Add one more running node to put us over the limit
# Create additional active nodes such that we have 90% of the active slot limit
for _ in range(limit):
calc = WorkFunctionNode()
calc.set_process_state(ProcessState.RUNNING)
calc.store()

# Now the warning should fire
result = self.cli_runner.invoke(cmd_process.process_list)
self.assertClickResultNoException(result)
warning_phrase = '% of the available daemon worker slots have been used!'
self.assertTrue(any([warning_phrase in line for line in get_result_lines(result)]))
# Default cmd should not throw the warning as we are below the limit
result = run_cli_command(cmd_process.process_list)
warning_phrase = 'of the available daemon worker slots have been used!'
assert all([warning_phrase not in line for line in result.output_lines])

# Add one more running node to put us over the limit
calc = WorkFunctionNode()
calc.set_process_state(ProcessState.RUNNING)
calc.store()

# Now the warning should fire
result = run_cli_command(cmd_process.process_list)
warning_phrase = '% of the available daemon worker slots have been used!'
assert any([warning_phrase in line for line in result.output_lines])


class TestVerdiProcessCallRoot(AiidaTestCase):
Expand Down

0 comments on commit c5f0a6c

Please sign in to comment.