Skip to content

Commit

Permalink
Add AEP: Allow CalcJobs to be actively monitored and interrupted
Browse files Browse the repository at this point in the history
  • Loading branch information
sphuber committed Sep 20, 2022
1 parent 37c934f commit ca315ba
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 0 deletions.
212 changes: 212 additions & 0 deletions 008_calcjob_monitors/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# AEP 008: Allow `CalcJob`s to be actively monitored and interrupted

| AEP number | 008 |
|------------|-----------------------------------------------------------------|
| Title | Allow `CalcJob`s to be actively monitored and interrupted |
| Authors | [Sebastiaan P. Huber](mailto:sebastiaan.huber@epfl.ch) (sphuber)|
| Champions | [Sebastiaan P. Huber](mailto:sebastiaan.huber@epfl.ch) (sphuber)|
| Type | S - Standard |
| Created | 16-Sep-2022 |
| Status | submitted |

## Background
The `CalcJob` interface allows to run code, external to AiiDA, on a pre-configured compute resource.
Currently there is no way to control the termination of jobs, either manually or automatically, other than by killing the `CalcJob`.
This will cause the engine to terminate the job through the job scheduler associated with the compute resource.
The downside of this method is that a) it is not automated, and b) the calculation is killed in an abrupt manner, without giving it the opportunity to shut down cleanly.

There are a variety of use-cases where one would like a `CalcJob` to be monitored and have it shutdown, potentially in a clean manner, before it reaches the end.
This AEP describes new functionality that will allow users to define such monitors and attaching them to a `CalcJob`.

## Design requirements
The new functionality should satisfy the following requirements, in order of importance:

* **Functionality**: The `CalcJob` monitor interface should allow to retrieve files from the remote working directory of the job in order to inspect them and determine whether the job should be stopped
* **Functionality**: The `CalcJob` monitor interface should allow to write files to the remote working directory of the job. This is useful for codes that actively look for files in the running directory to determine whether to shut down cleanly.
* **Functionality**: The `CalcJob` monitor interface should allow to execute commands in the remote working directory of the job. This could be important to execute specific code that will pre-process output generated by the job. For example, this could be used to analyze large output files and produce smaller report files, which would allow to prevent retrieving large output files repeatedly.
* **Interface**: Defining new monitors and adding monitors to `CalcJob`s should be as simple as possible and should require as little custom setup and configuration as possible.
* **Performance**: The implementation should minimize the impact on the performance of AiiDA. It should limit network usage, opening transport connections to remote compute resources, the load on daemon workers, and the load on the database.
* **Functionality**: Ideally, the monitoring of jobs remains active even when AiiDA itself is not running.

## Implementation
The proposed implementation leverages the current design of the engine that uses an event loop to process tasks surrounding the `CalcJob`s it is running asynchronously.
After a `CalcJob` has been submitted to the scheduler, the engine will periodically poll its status through the scheduler.
This task is updated to also execute all monitors that were attached to the `CalcJob`, if any.
Each iteration of the job update task, the engine will loop over the monitors, call them one by one, and act based on their response, for example, kill the job.

### Base functionality

A `CalcJob` monitor is a function that implements the following signature:
```python
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport) -> str | None:
"""Retrieve and inspect files in working directory of job to determine whether the job should be killed.
:param node: The node representing the calculation job.
:param transport: The transport that can be used to retrieve files from remote working directory.
:returns: A string if the job should be killed, `None` otherwise.
"""
```
The `node` and the `transport` arguments are required.
The `node` is a reference to the calculation job node, which can be used to retrieve its input nodes, for example.
The `transport` can be used to retrieve files from the working directory of the calculation running on the remote computer.
This allows you to inspect the content and determine whether the job should be prematurely killed.
In addition, it can also be used to write files to the working directory, or execute commands on the remote computer.

A monitor can define additional keyword arguments that a user can use to modify its behavior.
The arguments can take any value, as long as it is JSON-serializable.
This is necessary because the arguments that are passed to a monitor are stored in the database in order to preserve provenance.

It is recommended to write out each supported keyword argument and not use the ``**kwargs`` catch-all, for example:
```python
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport, custom_keyword: bool = False) -> str | None:
"""Retrieve and inspect files in working directory of job to determine whether the job should be killed.
:param node: The node representing the calculation job.
:param transport: The transport that can be used to retrieve files from remote working directory.
:param custom_keyword: Optional keyword, when set to ``True`` will do something different.
:returns: A string if the job should be killed, `None` otherwise.
"""
```
This will allow the engine to validate the arguments provided by a user.
If unsupported arguments are provided to a monitor, the calculation job will not start and the user will be notified of the mistake.

In order to attach a monitor to a `CalcJob` it first has to be registered with an entry point in the `aiida.calculations.monitors` group, for example:
```python
[project.entry-points.'aiida.calculations.monitors']
'core.always_kill' = 'aiida.calculations.monitors.base:always_kill'
```

Once registered, the entry point can be used to attach the corresponding monitor to a `CalcJob`.
The `CalcJob` input namespace contains the `monitors` namespace that accepts a dictionary of `Dict` nodes.
```python
builder = load_code('bash@localhost').get_builder()
builder.x = Int(1)
builder.y = Int(2)
builder.monitors = {'always_kill': Dict({'entry_point': 'core.always_kill'})}
run.get_node(builder)
```
Each `Dict` node corresponds to a monitor that will be attached and has a single required keyword `entry_point`, which should correspond to the entry point name of the monitor.

If during the runtime of the `CalcJob`, any of the monitors returns a string, the `CalcJob` will send the kill command through the scheduler, the current output files will be retrieved and the parser, if defined in the inputs, will be called.
The `CalcJob` will eventually terminate and the `CalcJob.exit_codes.STOPPED_BY_MONITOR` exit code will be set.

### Advanced functionality

#### Monitor result
The default operation for a monitor is to return a string, in which case the engine will kill the job through the scheduler and the `CalcJob` will proceed by retrieving and parsing the results.
This behavior can be further customized through the `MonitorResult` object.
This simple class has various attributes that can be used to control the response of the engine:

* `action`: Instance of the `MonitorResultAction` enum, where the default is `MonitorResultAction.kill`. The available options are:
* `kill`: Kill the job through the scheduler and proceed to the retrieve step of the `CalcJob`.
* `disable-self`: Disable the current monitor for this `CalcJob` and do not call it anymore.
* `disable-all`: Disable all monitors configured for this `CalcJob`.
* `retrieve`: Boolean, `True` by default. If `False`, skip the retrieval of the output files.
* `parse`: Boolean, `True` by default. If `False`, skip the parsing of the retrieved output files, if a parser was defined in the inputs of the `CalcJob`. Is ignored if `MonitorResult.retrieve == True`.
* `override_exit_code`: Boolean, `True` by default. If `False`, the exit code returned by the parser, if available, will not be overridden by the `CalcJob.exit_codes.STOPPED_BY_MONITOR` exit code.

#### Monitor execution order
By default, the monitors are executed in alphabetical order based on their keys in the `monitors` input namespace.
The order can be controlled using the `priority` key in the `monitors` input.
```python
builder.monitors = {
'monitor_one': Dict({'entry_point': 'entry_point_one', 'priority': 100})
'monitor_one': Dict({'entry_point': 'entry_point_one'})
}
```
Higher priorities will be executed first.
It is not necessary to define a priority for all monitors, in the absence of a priority, a priority of 0 is assumed.
For monitors with identical priority, the order remains alphabetical based on their key in the `monitors` input namespace.

#### Monitor execution frequency
By default, all monitors are executed during each scheduler update cycle.
This interval is controlled by the `minimum_scheduler_poll_interval` property of the `Computer`, which can be retrieved and set through the `get_minimum_job_poll_interval` and `set_minimum_job_poll_interval`, respectively.
The frequency of monitor execution can be reduced by setting a larger interval for the `minimum_poll_interval` key in the monitor input definition:
```python
builder.monitors = {
'monitor_one': Dict({'entry_point': 'entry_point_one', 'minimum_poll_interval': 600})
}
```
The engine will guarantee that the interval between calls of the monitor is at least the value specified by `minimum_poll_interval`.
Due to a number of other intervals that are part of the `CalcJob` pipeline, it is possible however, that the effective interval between monitor calls will be larger than that.

## Example monitors

### Stopping based on content in output file

To stop a calculation based on the content of a particular output file, first the file should be retrieved locally using the `transport`:
```python
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport) -> str | None:
"""Retrieve and inspect files in working directory of job to determine whether the job should be killed.
:param node: The node representing the calculation job.
:param transport: The transport that can be used to retrieve files from remote working directory.
:returns: A string if the job should be killed, `None` otherwise.
"""
with tempfile.NamedTemporaryFile('w+') as handle:
transport.getfile('some-file.txt', handle.name)
handle.seek(0)
output = handle.read()

if 'problem' in output:
return 'The calculation has encountered a problem so were aborting.'
```
Then the file content can be analyzed to determine any particular problems.
If a problem is detected and the calculation should be stopped, it suffices to return a string with a relevant message.

### Performing a clean stop

When a monitor instructs the engine to stop the calculation, the engine will signal the kill command to the scheduler.
This will cause the calculation to be forcefully interrupted.
In certain cases, this is undesirable as the calculation does not get the chance to shut down cleanly, and the output files cannot be reused for a restart.

Certain codes allow to shutdown cleanly prematurely through a sentinel file.
A sentinel file is a file with a particular filename or format that the code will periodically look for in the working directory and when found will shutdown graciously.
This functionality can be used by a monitor to shutdown a job cleanly:
```python
from aiida.engine import CalcJobMonitorResult
from aiida.orm import CalcJobNode
from aiida.transports import Transport

def monitor(node: CalcJobNode, transport: Transport, custom_keyword: bool = False) -> str | None:
"""Retrieve and inspect files in working directory of job to determine whether the job should be killed.
:param node: The node representing the calculation job.
:param transport: The transport that can be used to retrieve files from remote working directory.
:param custom_keyword: Optional keyword, when set to ``True`` will do something different.
:returns: A string if the job should be killed, `None` otherwise.
"""
with tempfile.NamedTemporaryFile('w+') as handle:
transport.getfile('some-file.txt', handle.name)
handle.seek(0)
output = handle.read()

if 'problem' in output:
with tempfile.NamedTemporaryFile('w+') as handle:
handle.write('stop')
transport.put(handle.name, 'EXIT')

return CalcJobMonitorResult(
action='disable-all',
override_exit_code=False
)
```
The sentinel file is written to the working directory using the `Transport.put` method.
In this example, it writes the `EXIT` file in the remote working directory.

After the sentinel file is written, the job needs to be given the time to shut down graciously and so shouldn't be interrupted.
This is why the monitor returns a `CalcJobMonitorResult` instance, instead of just a simple string.
This is because returning a string would have aborted the job immediately.
The `CalcJobMonitorResult` instead allows the monitor to instruct the engine to disable all monitors (using `action='disable-all'`).
In addition, we set `override_exit_code=False` such that the exit code of the parser is kept as if the code would have terminated nominally.

### Execute command and retrieve file
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Accepted AEPs can be easily browsed and read on the [online documentation](https
| 005 | draft | [New Export Format](005_exportformat/readme.md) |
| 006 | implemented | [Efficient object store for the AiiDA repository](006_efficient_object_store_for_repository/readme.md) |
| 007 | implemented | [Abstract and improve the file repository](007_improved_file_repository/readme.md) |
| 008 | implemented | [Allow `CalcJob`s to be actively monitored and interrupted](008_calcjob_monitors/readme.md) |

## Submitting an AEP

Expand Down

0 comments on commit ca315ba

Please sign in to comment.