Skip to content

Commit

Permalink
Domain Socket Support for co-located databases (#246)
Browse files Browse the repository at this point in the history
Co-located databases (i.e. databases deployed on the same node as the
application) can now be configured to accept connections over Unix domain
sockets (UDS) instead of the loopback interface. This yields performance
benefits in the form of lower-latency communication. Note that this PR depends
on changes that were added to SmartRedis.

[ committed by @ashao ]
[ reviewed by @MattToast ]
  • Loading branch information
ashao authored Jan 28, 2023
1 parent 473566f commit 7bb68f3
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 53 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ jobs:
if: contains( matrix.os, 'ubuntu' ) && matrix.py_v == 3.9 && matrix.rai == '1.2.5'
run: singularity pull docker://alrigazzi/smartsim-testing

# Note: The develop branch of SmartRedis is installed first so that any client
# changes the tests depend on are available.
- name: Install SmartSim (with ML backends)
run: python -m pip install .[dev,ml,ray]
run: |
python -m pip install git+https://github.com/CrayLabs/SmartRedis.git@develop#egg=smartredis
python -m pip install .[dev,ml,ray]
- name: Install ML Runtimes with Smart
if: contains( matrix.os, 'macos' )
Expand Down
19 changes: 15 additions & 4 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,21 @@ def _prep_entity_client_env(self, entity):
client_env["SSKEYOUT"] = entity.name

# Set address to local if it's a colocated model
if hasattr(entity, "colocated"):
if entity.colocated:
port = entity.run_settings.colocated_db_settings["port"]
client_env["SSDB"] = f"127.0.0.1:{str(port)}"
if hasattr(entity, "colocated") and entity.colocated:
port = entity.run_settings.colocated_db_settings.get("port",None)
socket = entity.run_settings.colocated_db_settings.get("unix_socket",None)
if socket and port:
raise SSInternalError(
"Co-located was configured for both TCP/IP and UDS"
)
if port:
client_env["SSDB"] = f"127.0.0.1:{str(port)}"
elif socket:
client_env["SSDB"] = f"unix://{socket}"
else:
raise SSInternalError(
"Colocated database was not configured for either TCP or UDS"
)
entity.run_settings.update_env(client_env)

def _save_orchestrator(self, orchestrator):
Expand Down
6 changes: 4 additions & 2 deletions smartsim/_core/entrypoints/colocated.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def main(
global DBPID

try:
ip_address = current_ip(network_interface)
ip_address = None
if network_interface:
ip_address = current_ip(network_interface)
lo_address = current_ip("lo")
except ValueError as e:
logger.warning(e)
Expand Down Expand Up @@ -269,7 +271,7 @@ def cleanup():
prefix_chars="+", description="SmartSim Process Launcher"
)
parser.add_argument(
"+ifname", type=str, help="Network Interface name", default="lo"
"+ifname", type=str, help="Network Interface name", default=""
)
parser.add_argument(
"+lockfile", type=str, help="Filename to create for single proc per host"
Expand Down
45 changes: 29 additions & 16 deletions smartsim/_core/launcher/colocated.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import sys

from ...error import SSUnsupportedError
from ...error import SSUnsupportedError, SSInternalError
from ..config import CONFIG
from ..utils.helpers import create_lockfile_name

Expand All @@ -45,7 +45,7 @@ def write_colocated_launch_script(file_name, db_log, colocated_settings):
:type colocated_settings: dict[str, Any]
"""

colocated_cmd = _build_colocated_wrapper_cmd(**colocated_settings, db_log=db_log)
colocated_cmd = _build_colocated_wrapper_cmd(db_log, **colocated_settings)

with open(file_name, "w") as f:
f.write("#!/bin/bash\n")
Expand Down Expand Up @@ -75,22 +75,18 @@ def write_colocated_launch_script(file_name, db_log, colocated_settings):


def _build_colocated_wrapper_cmd(
port=6780,
db_log,
cpus=1,
interface="lo",
rai_args=None,
extra_db_args=None,
db_log=None,
port=6780,
ifname=None,
**kwargs,
):
"""Build the command use to run a colocated db application
:param port: db port, defaults to 6780
:type port: int, optional
:param cpus: db cpus, defaults to 1
:type cpus: int, optional
:param interface: network interface, defaults to "lo"
:type interface: str, optional
:param rai_args: redisai args, defaults to None
:type rai_args: dict[str, str], optional
:param extra_db_args: extra redis args, defaults to None
Expand All @@ -113,26 +109,44 @@ def _build_colocated_wrapper_cmd(
sys.executable,
"-m",
"smartsim._core.entrypoints.colocated",
"+ifname",
interface,
"+lockfile",
lockfile,
"+db_cpus",
str(cpus),
"+command",
]

# Add in the interface if using TCP/IP
if ifname:
cmd.extend(["+ifname", ifname])
cmd.append("+command")
# collect DB binaries and libraries from the config
db_cmd = [CONFIG.database_exe, CONFIG.database_conf, "--loadmodule", CONFIG.redisai]

# add extra redisAI configurations
for arg, value in rai_args.items():
if value:
# RAI wants arguments for inference in all caps
# ex. THREADS_PER_QUEUE=1
db_cmd.append(f"{arg.upper()} {str(value)}")

# add port and log information
db_cmd.extend(["--port", str(port), "--logfile", db_log]) # usually /dev/null
db_cmd.extend(["--port", str(port)])

# Add socket and permissions for UDS
unix_socket = kwargs.get("unix_socket", None)
socket_permissions = kwargs.get("socket_permissions", None)

if unix_socket and socket_permissions:
db_cmd.extend(
[
"--unixsocket", str(unix_socket),
"--unixsocketperm", str(socket_permissions)
]
)
elif bool(unix_socket) ^ bool(socket_permissions):
raise SSInternalError(
"`unix_socket` and `socket_permissions` must both be defined or undefined."
)

db_cmd.extend(["--logfile", db_log]) # usually /dev/null, unless debug was specified
for db_arg, value in extra_db_args.items():
# replace "_" with "-" in the db_arg because we use kwargs
# for the extra configurations and Python doesn't allow a hyphen
Expand All @@ -157,7 +171,6 @@ def _build_colocated_wrapper_cmd(
cmd.extend(db_cmd)
return " ".join(cmd)


def _build_db_model_cmd(db_models):
cmd = []
for db_model in db_models:
Expand Down
123 changes: 102 additions & 21 deletions smartsim/entity/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .entity import SmartSimEntity
from .files import EntityFiles

import warnings

class Model(SmartSimEntity):
def __init__(self, name, params, path, run_settings, params_as_args=None):
Expand Down Expand Up @@ -128,21 +129,87 @@ def attach_generator_files(self, to_copy=None, to_symlink=None, to_configure=Non
to_configure = init_default([], to_configure, (list, str))
self.files = EntityFiles(to_configure, to_copy, to_symlink)

def colocate_db(
def colocate_db(self, *args, **kwargs):
    """Deprecated alias for ``colocate_db_tcp``.

    Emits a ``DeprecationWarning`` and then forwards every positional and
    keyword argument, unchanged, to ``colocate_db_tcp``.

    :param args: positional arguments forwarded to ``colocate_db_tcp``
    :param kwargs: keyword arguments forwarded to ``colocate_db_tcp``
    :return: whatever ``colocate_db_tcp`` returns
    """
    warnings.warn(
        (
            "`colocate_db` has been deprecated and will be removed in a \n"
            "future release. Please use `colocate_db_tcp` or `colocate_db_uds`."
        ),
        category=DeprecationWarning,
        # Attribute the warning to the caller's line instead of this shim so
        # users can locate the call they need to migrate.
        stacklevel=2,
    )
    return self.colocate_db_tcp(*args, **kwargs)

def colocate_db_uds(
    self,
    port=6379,
    unix_socket="/tmp/redis.socket",
    socket_permissions=755,
    db_cpus=1,
    limit_app_cpus=True,
    debug=False,
    **kwargs,
):
    """Colocate an Orchestrator instance with this Model over UDS.

    This method will initialize settings which add an unsharded
    database to this Model instance. Only this Model will be able to
    communicate with this colocated database by using Unix Domain sockets.

    Extra parameters for the db can be passed through kwargs. This includes
    many performance, caching and inference settings.

    .. highlight:: python
    .. code-block:: python

        example_kwargs = {
            "maxclients": 100000,
            "threads_per_queue": 1,
            "inter_op_parallelism": 1,
            "intra_op_parallelism": 1,
            "server_threads": 2 # keydb only
        }

    Generally these don't need to be changed.

    :param port: accepted but not used by this method; the stored database
                 port is always 0 when connecting over UDS
    :type port: int, optional
    :param unix_socket: path to where the socket file will be created
    :type unix_socket: str, optional
    :param socket_permissions: permissions for the socketfile; the value is
                               stored as given, so pass the digits as they
                               should appear (e.g. ``755``)
    :type socket_permissions: int, optional
    :param db_cpus: number of cpus to use for orchestrator, defaults to 1
    :type db_cpus: int, optional
    :param limit_app_cpus: whether to limit the number of cpus used by the app, defaults to True
    :type limit_app_cpus: bool, optional
    :param debug: launch Model with extra debug information about the co-located db
    :type debug: bool, optional
    :param kwargs: additional keyword arguments to pass to the orchestrator database
    :type kwargs: dict, optional
    """

    # Connection-specific settings for the UDS transport
    uds_options = {
        "unix_socket": unix_socket,
        "socket_permissions": socket_permissions,
        "port": 0,  # This is hardcoded to 0 as recommended by redis for UDS
    }
    # Settings shared with the TCP variant
    common_options = {
        "cpus": db_cpus,
        "limit_app_cpus": limit_app_cpus,
        "debug": debug,
    }
    self._set_colocated_db_settings(uds_options, common_options, **kwargs)

def colocate_db_tcp(
self,
port=6379,
ifname="lo",
db_cpus=1,
limit_app_cpus=True,
debug=False,
**kwargs,
):
"""Colocate an Orchestrator instance with this Model at runtime.
"""Colocate an Orchestrator instance with this Model over TCP/IP.
This method will initialize settings which add an unsharded (not connected)
This method will initialize settings which add an unsharded
database to this Model instance. Only this Model will be able to communicate
with this colocated database by using the loopback TCP interface or Unix
Domain sockets (UDS coming soon).
with this colocated database by using the loopback TCP interface.
Extra parameters for the db can be passed through kwargs. This includes
many performance, caching and inference settings.
Expand All @@ -162,39 +229,53 @@ def colocate_db(
:param port: port to use for orchestrator database, defaults to 6379
:type port: int, optional
:param ifname: interface to use for orchestrator, defaults to "lo"
:type ifname: str, optional
:param db_cpus: number of cpus to use for orchestrator, defaults to 1
:type db_cpus: int, optional
:param limit_app_cpus: whether to limit the number of cpus used by the app, defaults to True
:type limit_app_cpus: bool, optional
:param ifname: interface to use for orchestrator, defaults to "lo"
:type ifname: str, optional
:param debug: launch Model with extra debug information about the co-located db
:type debug: bool, optional
:param kwargs: additional keyword arguments to pass to the orchestrator database
:type kwargs: dict, optional
"""

tcp_options = {
"port":port,
"ifname":ifname
}
common_options = {
"cpus":db_cpus,
"limit_app_cpus":limit_app_cpus,
"debug":debug
}
self._set_colocated_db_settings( tcp_options, common_options, **kwargs)

def _set_colocated_db_settings(self, connection_options, common_options, **kwargs):
"""
Ingest the connection-specific options (UDS/TCP) and set the final settings
for the co-located database
"""

if hasattr(self.run_settings, "mpmd") and len(self.run_settings.mpmd) > 0:
raise SSUnsupportedError(
"Models co-located with databases cannot be run as a mpmd workload"
)

if hasattr(self.run_settings, "_prep_colocated_db"):
self.run_settings._prep_colocated_db(db_cpus)
self.run_settings._prep_colocated_db(common_options['db_cpus'])

# TODO list which db settings can be extras
colo_db_config = {
"port": int(port),
"cpus": int(db_cpus),
"interface": ifname,
"limit_app_cpus": limit_app_cpus,
"debug": debug,
# redisai arguments for inference settings
"rai_args": {
"threads_per_queue": kwargs.get("threads_per_queue", None),
"inter_op_parallelism": kwargs.get("inter_op_parallelism", None),
"intra_op_parallelism": kwargs.get("intra_op_parallelism", None),
},
colo_db_config = {}
colo_db_config.update(connection_options)
colo_db_config.update(common_options)
# redisai arguments for inference settings
colo_db_config['rai_args'] = {
"threads_per_queue": kwargs.get("threads_per_queue", None),
"inter_op_parallelism": kwargs.get("inter_op_parallelism", None),
"intra_op_parallelism": kwargs.get("intra_op_parallelism", None),
}
colo_db_config["extra_db_args"] = dict(
[
Expand Down
27 changes: 22 additions & 5 deletions tests/on_wlm/test_colocated_model.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import sys

import pytest
import warnings

from smartsim import Experiment, status

# retrieved from pytest fixtures
if pytest.test_launcher not in pytest.wlm_options:
pytestmark = pytest.mark.skip(reason="Not testing WLM integrations")


def test_launch_colocated_model(fileutils, wlmutils):
@pytest.mark.parametrize("db_type", ["uds","tcp","deprecated"])
def test_launch_colocated_model(fileutils, wlmutils, db_type):
"""Test the launch of a model with a colocated database"""

launcher = wlmutils.get_test_launcher()
Expand All @@ -27,9 +28,25 @@ def test_launch_colocated_model(fileutils, wlmutils):

colo_model = exp.create_model("colocated_model", colo_settings)
colo_model.set_path(test_dir)
colo_model.colocate_db(
port=6780, db_cpus=1, limit_app_cpus=False, debug=True, ifname=network_interface
)
db_args = {
"db_cpus":1,
"limit_app_cpus":False,
"debug":True,
}

if db_type in ["tcp", "deprecated"]:
colocate_fun = {
"tcp":colo_model.colocate_db_tcp,
"deprecated":colo_model.colocate_db
}
with warnings.catch_warnings(record=True) as w:
colocate_fun[db_type](port=6780, ifname="lo", **db_args)
if db_type == "deprecated":
assert len(w) == 1
assert issubclass(w[-1].category, DeprecationWarning)
assert "Please use `colocate_db_tcp` or `colocate_db_uds`" in str(w[-1].message)
elif db_type == "uds":
colo_model.colocate_db_uds(**db_args)

# assert model will launch with colocated db
assert colo_model.colocated
Expand Down
Loading

0 comments on commit 7bb68f3

Please sign in to comment.