Skip to content

Commit

Permalink
Respond to reviews
Browse files Browse the repository at this point in the history
- Refactor some of the tests to separate creation of experiment
and db
- Pin colocated database cpus based on the custom_pinning option
instead of two different inputs
  • Loading branch information
ashao committed Jun 29, 2023
1 parent d4150af commit b2b4ebe
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 117 deletions.
4 changes: 2 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ def pytest_sessionfinish(session, exitstatus):
returning the exit status to the system.
"""
if exitstatus == 0:
# shutil.rmtree(test_dir)
pass
shutil.rmtree(test_dir)
else:
# kill all spawned processes in case of error
kill_all_test_spawned_processes()
Expand Down Expand Up @@ -598,6 +597,7 @@ def coloutils():
return ColoUtils

class ColoUtils:
@staticmethod
def setup_test_colo(fileutils, db_type, exp, db_args):
"""Setup things needed for setting up the colo pinning tests"""
# get test setup
Expand Down
17 changes: 4 additions & 13 deletions smartsim/_core/launcher/colocated.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def _build_colocated_wrapper_cmd(
extra_db_args: t.Optional[t.Dict[str, str]] = None,
port: int = 6780,
ifname: t.Optional[t.Union[str, t.List[str]]] = None,
limit_db_cpus: bool = False,
db_cpu_list: t.Optional[str] = None,
custom_pinning: t.Optional[str] = None,
**kwargs: t.Any,
) -> str:
"""Build the command use to run a colocated DB application
Expand All @@ -99,8 +98,6 @@ def _build_colocated_wrapper_cmd(
:type port: int
:param ifname: network interface(s) to bind DB to
:type ifname: str | list[str], optional
:param limit_db_cpus: If True, limit the cpus that the database can run on
:type limit_db_cpus: bool, optional
:param db_cpu_list: The list of CPUs that the database should be limited to
:type db_cpu_list: str, optional
:return: the command to run
Expand All @@ -114,16 +111,11 @@ def _build_colocated_wrapper_cmd(
# the lock on the file.
lockfile = create_lockfile_name()




# create the command that will be used to launch the
# database with the python entrypoint for starting
# up the backgrounded db process

cmd = []
cmd.extend(
[
cmd = [
sys.executable,
"-m",
"smartsim._core.entrypoints.colocated",
Expand All @@ -132,7 +124,6 @@ def _build_colocated_wrapper_cmd(
"+db_cpus",
str(cpus),
]
)
# Add in the interface if using TCP/IP
if ifname:
if isinstance(ifname, str):
Expand All @@ -142,9 +133,9 @@ def _build_colocated_wrapper_cmd(
# collect DB binaries and libraries from the config

db_cmd = []
if limit_db_cpus and db_cpu_list:
if custom_pinning:
db_cmd.extend([
'taskset', '-c', db_cpu_list
'taskset', '-c', custom_pinning
])
db_cmd.extend([
CONFIG.database_exe,
Expand Down
5 changes: 2 additions & 3 deletions smartsim/_core/launcher/step/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ def get_colocated_launch_script(self) -> str:
else:
db_log_file = "/dev/null"

if sys.platform == 'darwin':
if isinstance(self, LocalStep) and db_settings["limit_db_cpus"] is True:
if sys.platform == 'darwin' and db_settings["db_cpu_list"]:
logger.warning(
"DB pinning is not supported on MacOS, setting limit_db_cpus = False")
db_settings["limit_db_cpus"] = False
Expand All @@ -93,7 +92,7 @@ def get_colocated_launch_script(self) -> str:
# entity currently being prepped to launch
write_colocated_launch_script(script_path, db_log_file, db_settings)
return script_path

def add_to_batch(self, step: Step) -> None:
"""Add a job step to this batch
Expand Down
90 changes: 44 additions & 46 deletions smartsim/entity/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@

from __future__ import annotations

import itertools
import typing as t
import warnings
import collections.abc

from .._core.utils.helpers import cat_arg_and_value, init_default
from ..error import EntityExistsError, SSUnsupportedError
Expand Down Expand Up @@ -154,10 +156,11 @@ def attach_generator_files(

def colocate_db(self, *args: t.Any, **kwargs: t.Any) -> None:
"""An alias for ``Model.colocate_db_tcp``"""
logger.warning(
warnings.warn(
(
"`colocate_db` has been deprecated and will be removed in a \n"
"future release. Please use `colocate_db_tcp` or `colocate_db_uds`."
"future release. Please use `colocate_db_tcp` or `colocate_db_uds`.",
FutureWarning
)
)
self.colocate_db_tcp(*args, **kwargs)
Expand All @@ -167,8 +170,7 @@ def colocate_db_uds(
unix_socket: str = "/tmp/redis.socket",
socket_permissions: int = 755,
db_cpus: int = 1,
limit_db_cpus: bool = True,
db_cpu_list: t.Optional[str] = None,
custom_pinning: t.Optional[t.Iterable[t.Union(int, t.Iterable[int])]] = None,
debug: bool = False,
**kwargs: t.Any,
) -> None:
Expand Down Expand Up @@ -200,11 +202,9 @@ def colocate_db_uds(
:type socket_permissions: int, optional
:param db_cpus: number of cpus to use for orchestrator, defaults to 1
:type db_cpus: int, optional
:param limit_db_cpus: whether to limit the number of cpus used by the database defaults to True
:type limit_db_cpus: bool, optional
:param db_cpu_list: lists the cpus which the database can be run on. Follows `taskset -c` syntax
e.g. '0-2,5' specifies processors 0, 1, 2, and 5
:type db_cpu_list: str, optional
:param custom_pinning: CPUs to pin the orchestrator to. Passing an empty iterable
disables pinning
:type custom_pinning: iterable of ints or iterable of ints, optional
:param debug: launch Model with extra debug information about the colocated db
:type debug: bool, optional
:param kwargs: additional keyword arguments to pass to the orchestrator database
Expand All @@ -219,8 +219,7 @@ def colocate_db_uds(

common_options = {
"cpus": db_cpus,
"limit_db_cpus": limit_db_cpus,
"db_cpu_list": db_cpu_list,
"custom_pinning": custom_pinning,
"debug": debug,
}
self._set_colocated_db_settings(uds_options, common_options, **kwargs)
Expand All @@ -230,8 +229,7 @@ def colocate_db_tcp(
port: int = 6379,
ifname: str = "lo",
db_cpus: int = 1,
limit_db_cpus: bool = True,
db_cpu_list: t.Optional[str] = None,
custom_pinning: t.Optional[t.Iterable[t.Union(int, t.Iterable[int])]] = None,
debug: bool = False,
**kwargs: t.Any,
) -> None:
Expand Down Expand Up @@ -263,11 +261,9 @@ def colocate_db_tcp(
:type ifname: str, optional
:param db_cpus: number of cpus to use for orchestrator, defaults to 1
:type db_cpus: int, optional
:param limit_db_cpus: whether to limit the number of cpus used by the database, defaults to True
:type limit_db_cpus: bool, optional
:param db_cpu_list: lists the cpus which the database can be run on. Follows `taskset -c` syntax
e.g. '0-2,5' specifies processors 0, 1, 2, and 5
:type db_cpu_list: str, optional
:param custom_pinning: CPUs to pin the orchestrator to. Passing an empty iterable
disables pinning
:type custom_pinning: iterable of ints or iterable of ints, optional
:param debug: launch Model with extra debug information about the colocated db
:type debug: bool, optional
:param kwargs: additional keyword arguments to pass to the orchestrator database
Expand All @@ -278,8 +274,7 @@ def colocate_db_tcp(
tcp_options = {"port": port, "ifname": ifname}
common_options = {
"cpus": db_cpus,
"limit_db_cpus": limit_db_cpus,
"db_cpu_list": db_cpu_list,
"custom_pinning": custom_pinning,
"debug": debug,
}
self._set_colocated_db_settings(tcp_options, common_options, **kwargs)
Expand All @@ -305,36 +300,15 @@ def _set_colocated_db_settings(

if "limit_app_cpus" in common_options:
raise SSUnsupportedError(
"Pinning of app CPUs via limit_app_cpus is no supported. Modify RunSettings " +
"Pinning of app CPUs via limit_app_cpus is no supported. Modify RunSettings "
"instead using the correct binding option for your launcher."
)

# TODO list which db settings can be extras
cpus = common_options["cpus"]
# Deal with cases where the database should be pinned to cpus
# (1) if the user set a db_cpu_list, but not limit_db_cpus
if common_options["db_cpu_list"] and not common_options["limit_db_cpus"]:
logger.warning(
"limit_db_cpus is False, but db_cpu_list is not None. Setting limit_db_cpus=True"
)
common_options["limit_db_cpus"] = True
# (2) limit_db_cpus, but not db_cpu_list is specified automatically set
# pin to the cpus 0:db_cpus-1
if common_options["limit_db_cpus"] and not common_options["db_cpu_list"]:
if cpus > 1:
cpu_list = f"0-{cpus-1}"
elif cpus == 1:
cpu_list = "0"
else:
raise ValueError("db_cpus must be a positive number")

logger.warning(
(
"limit_db_cpus is True, but db_cpu_list was not specified. Automatically "
f"pinning to processors {cpu_list}"
)
)
common_options["db_cpu_list"] = cpu_list
common_options["custom_pinning"] = self._create_pinning_string(
common_options["custom_pinning"],
common_options["cpus"]
)

colo_db_config = {}
colo_db_config.update(connection_options)
Expand All @@ -359,6 +333,30 @@ def _set_colocated_db_settings(

self.run_settings.colocated_db_settings = colo_db_config

@staticmethod
def _create_pinning_string(
pin_ids: t.Optional[t.Iterable[t.Union(int, t.Iterable[int])]],
cpus: int
):
"""Create a comma-separated string CPU ids. By default, None returns
0,1,...,cpus-1; an empty iterable will disable pinning altogether,
and an iterable constructs a comma separate string (e.g. 0,2,5)
"""
if pin_ids is None:
return ','.join(str(i) for i in range(cpus))
elif not pin_ids:
return None
elif isinstance(pin_ids, collections.abc.Iterable):
pin_list = []
for i in pin_ids:
if isinstance(i, collections.abc.Iterable):
pin_list.extend([str(j) for j in i])
else:
pin_list.append(str(i))
return ','.join(pin_list)
else:
raise TypeError("pin_ids must be an iterable of ints")

def params_to_args(self) -> None:
"""Convert parameters to command line arguments and update run settings."""
if self.params_as_args is not None:
Expand Down
9 changes: 0 additions & 9 deletions tests/backends/test_dbmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ def test_colocated_db_model_pytorch(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port(),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -412,7 +411,6 @@ def test_colocated_db_model_ensemble(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port(),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand All @@ -424,7 +422,6 @@ def test_colocated_db_model_ensemble(fileutils, wlmutils):
entity.colocate_db(
port=wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -454,7 +451,6 @@ def test_colocated_db_model_ensemble(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port() + len(colo_ensemble),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -508,7 +504,6 @@ def test_colocated_db_model_ensemble_reordered(fileutils, wlmutils):
entity.colocate_db(
wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand All @@ -527,7 +522,6 @@ def test_colocated_db_model_ensemble_reordered(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port() + len(colo_ensemble),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -564,7 +558,6 @@ def test_colocated_db_model_errors(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port(),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand All @@ -584,7 +577,6 @@ def test_colocated_db_model_errors(fileutils, wlmutils):
entity.colocate_db(
port=wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -613,7 +605,6 @@ def test_colocated_db_model_errors(fileutils, wlmutils):
entity.colocate_db(
port=wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down
8 changes: 0 additions & 8 deletions tests/backends/test_dbscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ def test_colocated_db_script(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port(),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -204,7 +203,6 @@ def test_colocated_db_script_ensemble(fileutils, wlmutils):
entity.colocate_db(
port=wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand All @@ -214,7 +212,6 @@ def test_colocated_db_script_ensemble(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port() + len(colo_ensemble),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -268,7 +265,6 @@ def test_colocated_db_script_ensemble_reordered(fileutils, wlmutils):
entity.colocate_db(
port=wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand All @@ -278,7 +274,6 @@ def test_colocated_db_script_ensemble_reordered(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port() + len(colo_ensemble),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down Expand Up @@ -315,7 +310,6 @@ def test_db_script_errors(fileutils, wlmutils):
colo_model.colocate_db(
port=wlmutils.get_test_port(),
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand All @@ -335,7 +329,6 @@ def test_db_script_errors(fileutils, wlmutils):
entity.colocate_db(
port=wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand All @@ -358,7 +351,6 @@ def test_db_script_errors(fileutils, wlmutils):
entity.colocate_db(
port=wlmutils.get_test_port() + i,
db_cpus=1,
limit_db_cpus=False,
debug=True,
ifname="lo",
)
Expand Down
Loading

0 comments on commit b2b4ebe

Please sign in to comment.