Add ability to specify hardware policies on dragon run requests
ankona committed Jul 17, 2024
1 parent 96b37c2 commit fc83fd1
Showing 16 changed files with 1,826 additions and 25 deletions.
1 change: 1 addition & 0 deletions doc/changelog.md
@@ -15,6 +15,7 @@ To be released at some future point in time

Description

- Add hardware pinning capability when using dragon
- Pin NumPy version to 1.x
- New launcher support for SGE (and similar derivatives)
- Fix test outputs being created in incorrect directory
28 changes: 28 additions & 0 deletions doc/dragon.rst
@@ -65,6 +65,34 @@ In the next sections, we detail how Dragon is integrated into SmartSim.

For more information on HPC launchers, visit the :ref:`Run Settings<run_settings_hpc_ex>` page.

Hardware Pinning
================

Dragon also enables users to specify hardware constraints using ``DragonRunSettings``:
both CPU and GPU affinity can be set on the ``DragonRunSettings`` object. The following
example demonstrates how to specify CPU and GPU affinity simultaneously. Note
that affinities are passed as lists of device indices.

.. code-block:: python

    # Because "dragon" was specified as the launcher during Experiment initialization,
    # create_run_settings will return a DragonRunSettings object
    rs = exp.create_run_settings(exe="mpi_app",
                                 exe_args=["--option", "value"],
                                 env_vars={"MYVAR": "VALUE"})

    # Request the first 8 CPUs (indices 0-7) for this job
    rs.set_cpu_affinity(list(range(8)))

    # Request the first two GPUs on the node for this job
    rs.set_gpu_affinity([0, 1])

.. note::

    SmartSim launches jobs in the order they are received on the first available
    host in a round-robin pattern. To ensure a process is launched on a node with
    specific features, configure a hostname constraint.

=================
The Dragon Server
=================
6 changes: 6 additions & 0 deletions doc/tutorials/online_analysis/lattice/online_analysis.ipynb
@@ -378,6 +378,7 @@
},
{
"cell_type": "code",
"id": "6f3ed63d-e324-443d-9b68-b2cf618d31c7",
"execution_count": 7,
"metadata": {},
"outputs": [
@@ -399,13 +400,15 @@
},
{
"cell_type": "markdown",
"id": "96c154fe-5ca8-4d89-91f8-8fd4e75cb80e",
"metadata": {},
"source": [
"We then apply the function `probe_points` to the `ux` and `uy` tensors computed in the last time step of the previous simulation. Note that all tensors are already on the DB, thus we can reference them by name. Finally, we download and plot the output (a 2D velocity field), which is stored as `probe_u` on the DB."
]
},
{
"cell_type": "code",
"id": "36e3b415-dcc1-4d25-9cce-52388146a4bb",
"execution_count": 8,
"metadata": {},
"outputs": [
@@ -432,6 +435,7 @@
},
{
"cell_type": "markdown",
"id": "9d7e4966-a0de-480c-9556-936197a5a5d2",
"metadata": {},
"source": [
"### Uploading a function inline\n",
@@ -453,6 +457,7 @@
},
{
"cell_type": "markdown",
"id": "1c4daf43-34d0-482a-b9b5-b3b6f1e173c4",
"metadata": {},
"source": [
"We then store the function on the DB under the key `norm_function`."
@@ -470,6 +475,7 @@
},
{
"cell_type": "markdown",
"id": "19409ac6-e118-44db-a847-2d905fdf0331",
"metadata": {},
"source": [
"Note that the key we used identifies a functional unit containing the function itself: this is similar to the key used to store the `probe` script above. When we want to run the function, we just call it with `run_script`, by indicating the `script` key as `\"norm_function\"` and the name of the function itself as `\"compute_norm\"`."
85 changes: 79 additions & 6 deletions smartsim/_core/launcher/dragon/dragonBackend.py
@@ -211,9 +211,12 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]:
def _initialize_hosts(self) -> None:
with self._queue_lock:
             self._hosts: t.List[str] = sorted(
-                dragon_machine.Node(node).hostname
-                for node in dragon_machine.System().nodes
+                node for node in dragon_machine.System().nodes
             )
+            self._nodes = [dragon_machine.Node(node) for node in self._hosts]
+            self._cpus = [node.num_cpus for node in self._nodes]
+            self._gpus = [node.num_gpus for node in self._nodes]

"""List of hosts available in allocation"""
self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
"""List of hosts on which steps can be launched"""
@@ -285,6 +288,34 @@ def current_time(self) -> float:
"""Current time for DragonBackend object, in seconds since the Epoch"""
return time.time()

def _can_honor_policy(
self, request: DragonRunRequest
) -> t.Tuple[bool, t.Optional[str]]:
"""Check if the policy can be honored with resources available
in the allocation.
:param request: DragonRunRequest containing policy information
:returns: Tuple indicating if the policy can be honored and
an optional error message"""
# ensure the policy can be honored
if request.policy:
if request.policy.cpu_affinity:
# make sure some node has enough CPUs
available = max(self._cpus)
requested = max(request.policy.cpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough CPUs available"

if request.policy.gpu_affinity:
# make sure some node has enough GPUs
available = max(self._gpus)
requested = max(request.policy.gpu_affinity)

if requested >= available:
return False, "Cannot satisfy request, not enough GPUs available"

return True, None
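
The check above can be exercised in isolation. The following is a minimal sketch (a hypothetical standalone helper, not part of this commit) assuming affinity values are device indices and the per-node device counts are known:

```python
import typing as t

def can_honor_affinity(
    cpu_affinity: t.List[int],
    gpu_affinity: t.List[int],
    cpus_per_node: t.List[int],
    gpus_per_node: t.List[int],
) -> t.Tuple[bool, t.Optional[str]]:
    # A request is satisfiable only if some node has more devices than the
    # largest requested index (index N requires at least N+1 devices)
    if cpu_affinity and max(cpu_affinity) >= max(cpus_per_node):
        return False, "Cannot satisfy request, not enough CPUs available"
    if gpu_affinity and max(gpu_affinity) >= max(gpus_per_node):
        return False, "Cannot satisfy request, not enough GPUs available"
    return True, None

# Two 8-CPU, 1-GPU nodes: CPU index 8 is out of range (valid indices are 0-7)
assert can_honor_affinity([0, 8], [], [8, 8], [1, 1])[0] is False
assert can_honor_affinity([0, 7], [0], [8, 8], [1, 1]) == (True, None)
```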

def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
"""Check if request can be honored with resources available in the allocation.
@@ -299,6 +330,11 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message

honorable, err = self._can_honor_policy(request)
if not honorable:
return False, err

return True, None

def _allocate_step(
@@ -391,6 +427,46 @@ def _stop_steps(self) -> None:
self._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED
self._group_infos[step_id].return_codes = [-9]

@staticmethod
def create_run_policy(
request: DragonRequest, node_name: str
) -> "dragon_policy.Policy":
"""Create a dragon Policy from the request and node name
:param request: DragonRunRequest containing policy information
:param node_name: Name of the node on which the process will run
:returns: dragon_policy.Policy object mapped from request properties"""
if isinstance(request, DragonRunRequest):
run_request: DragonRunRequest = request

affinity = dragon_policy.Policy.Affinity.DEFAULT
cpu_affinity: t.List[int] = []
gpu_affinity: t.List[int] = []

# Customize policy only if the client requested it, otherwise use default
if run_request.policy is not None:
# Affinities are not mutually exclusive. If specified, both are used
if run_request.policy.cpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
cpu_affinity = run_request.policy.cpu_affinity

if run_request.policy.gpu_affinity:
affinity = dragon_policy.Policy.Affinity.SPECIFIC
gpu_affinity = run_request.policy.gpu_affinity

if affinity != dragon_policy.Policy.Affinity.DEFAULT:
return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
affinity=affinity,
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)

return dragon_policy.Policy(
placement=dragon_policy.Policy.Placement.HOST_NAME,
host_name=node_name,
)

def _start_steps(self) -> None:
self._heartbeat()
with self._queue_lock:
@@ -412,10 +488,7 @@ def _start_steps(self) -> None:

policies = []
for node_name in hosts:
-                    local_policy = dragon_policy.Policy(
-                        placement=dragon_policy.Policy.Placement.HOST_NAME,
-                        host_name=node_name,
-                    )
+                    local_policy = self.create_run_policy(request, node_name)
policies.extend([local_policy] * request.tasks_per_node)
tmp_proc = dragon_process.ProcessTemplate(
target=request.exe,
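
The per-host policy expansion in `_start_steps` can be sketched independently (illustrative only; `make_policy` stands in for the backend's `create_run_policy`):

```python
import typing as t

def build_policies(
    hosts: t.List[str],
    tasks_per_node: int,
    make_policy: t.Callable[[str], object],
) -> t.List[object]:
    # One policy per task: each host's policy is repeated tasks_per_node
    # times, preserving host order
    policies: t.List[object] = []
    for node_name in hosts:
        policies.extend([make_policy(node_name)] * tasks_per_node)
    return policies

# With 2 hosts and 2 tasks per node, 4 policies are produced in host order
assert build_policies(["n1", "n2"], 2, lambda h: h) == ["n1", "n1", "n2", "n2"]
```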
6 changes: 6 additions & 0 deletions smartsim/_core/launcher/dragon/dragonLauncher.py
@@ -29,6 +29,8 @@
import os
import typing as t

from smartsim._core.schemas.dragonRequests import DragonRunPolicy

from ...._core.launcher.stepMapping import StepMap
from ....error import LauncherError, SmartSimError
from ....log import get_logger
@@ -168,6 +170,9 @@ def run(self, step: Step) -> t.Optional[str]:
merged_env = self._connector.merge_persisted_env(os.environ.copy())
nodes = int(run_args.get("nodes", None) or 1)
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)

policy = DragonRunPolicy.from_run_args(run_args)

response = _assert_schema_type(
self._connector.send_request(
DragonRunRequest(
@@ -181,6 +186,7 @@
current_env=merged_env,
output_file=out,
error_file=err,
policy=policy,
)
),
DragonRunResponse,
10 changes: 9 additions & 1 deletion smartsim/_core/launcher/step/dragonStep.py
@@ -30,7 +30,11 @@
import sys
import typing as t

-from ...._core.schemas.dragonRequests import DragonRunRequest, request_registry
+from ...._core.schemas.dragonRequests import (
+    DragonRunPolicy,
+    DragonRunRequest,
+    request_registry,
+)
from ....error.errors import SSUnsupportedError
from ....log import get_logger
from ....settings import (
@@ -166,8 +170,11 @@ def _write_request_file(self) -> str:
nodes = int(run_args.get("nodes", None) or 1)
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)

policy = DragonRunPolicy.from_run_args(run_args)

cmd = step.get_launch_cmd()
out, err = step.get_output_files()

request = DragonRunRequest(
exe=cmd[0],
exe_args=cmd[1:],
Expand All @@ -179,6 +186,7 @@ def _write_request_file(self) -> str:
current_env=os.environ,
output_file=out,
error_file=err,
policy=policy,
)
requests.append(request_registry.to_string(request))
with open(request_file, "w", encoding="utf-8") as script_file:
3 changes: 2 additions & 1 deletion smartsim/_core/launcher/step/step.py
@@ -26,6 +26,7 @@

from __future__ import annotations

import copy
import functools
import os.path as osp
import pathlib
@@ -51,7 +52,7 @@ def __init__(self, name: str, cwd: str, step_settings: SettingsBase) -> None:
self.entity_name = name
self.cwd = cwd
self.managed = False
-        self.step_settings = step_settings
+        self.step_settings = copy.deepcopy(step_settings)
self.meta: t.Dict[str, str] = {}

@property
41 changes: 40 additions & 1 deletion smartsim/_core/schemas/dragonRequests.py
@@ -26,9 +26,10 @@

import typing as t

-from pydantic import BaseModel, Field, PositiveInt
+from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt, ValidationError

import smartsim._core.schemas.utils as _utils
from smartsim.error.errors import SmartSimError

# Black and Pylint disagree about where to put the `...`
# pylint: disable=multiple-statements
@@ -39,6 +40,43 @@
class DragonRequest(BaseModel): ...


class DragonRunPolicy(BaseModel):
"""Policy specifying hardware constraints when running a Dragon job"""

cpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
"""List of CPU indices to which the job should be pinned"""
gpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
"""List of GPU indices to which the job should be pinned"""

@staticmethod
def from_run_args(
run_args: t.Dict[str, t.Union[int, str, float, None]]
) -> "DragonRunPolicy":
"""Create a DragonRunPolicy with hardware constraints passed from
a dictionary of run arguments
:param run_args: Dictionary of run arguments
:returns: DragonRunPolicy instance created from the run arguments"""
gpu_args = ""
if gpu_arg_value := run_args.get("gpu-affinity", None):
gpu_args = str(gpu_arg_value)

cpu_args = ""
if cpu_arg_value := run_args.get("cpu-affinity", None):
cpu_args = str(cpu_arg_value)

# run args converted to a string must be split back into a list[int]
gpu_affinity = [int(x.strip()) for x in gpu_args.split(",") if x]
cpu_affinity = [int(x.strip()) for x in cpu_args.split(",") if x]

try:
return DragonRunPolicy(
cpu_affinity=cpu_affinity,
gpu_affinity=gpu_affinity,
)
except ValidationError as ex:
raise SmartSimError("Unable to build DragonRunPolicy") from ex
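
The string-splitting logic in `from_run_args` can be isolated as a small helper (a sketch under the commit's assumptions, not part of its API):

```python
import typing as t

def parse_affinity(value: t.Union[int, str, float, None]) -> t.List[int]:
    # Run args arrive as comma-separated strings such as "0,1,2"; convert
    # to a list of ints, tolerating whitespace and empty/missing values
    if value is None:
        return []
    return [int(x.strip()) for x in str(value).split(",") if x.strip()]

assert parse_affinity("0, 1, 2") == [0, 1, 2]
assert parse_affinity(None) == []
assert parse_affinity("3") == [3]
```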


class DragonRunRequestView(DragonRequest):
exe: t.Annotated[str, Field(min_length=1)]
exe_args: t.List[t.Annotated[str, Field(min_length=1)]] = []
@@ -57,6 +95,7 @@ class DragonRunRequestView(DragonRequest):
@request_registry.register("run")
class DragonRunRequest(DragonRunRequestView):
current_env: t.Dict[str, t.Optional[str]] = {}
policy: t.Optional[DragonRunPolicy] = None

def __str__(self) -> str:
return str(DragonRunRequestView.parse_obj(self.dict(exclude={"current_env"})))