Co-located Orchestrator (#139)
This PR introduces a long-awaited feature: database colocation.
With this feature, users will be able to launch their workload on
HPC systems with a single Redis/KeyDB shard placed on each
compute node their application is using. This is specifically
geared toward tightly coupled, high-performance online inference.

Interface example of a co-located database with a ``Model``:

```python
from smartsim import Experiment
exp = Experiment("colo-test", launcher="auto")

colo_settings = exp.create_run_settings(exe=my_app_binary)

colo_model = exp.create_model("colocated_model", colo_settings)
colo_model.colocate_db(
    port=6780,
    db_cpus=1,
    debug=False,
    limit_app_cpus=False,
    ifname=network_interface,
)
exp.start(colo_model)
```
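
Because the shard is started on the same node as the application, the controller exports ``SSDB=127.0.0.1:<port>`` to the model (see the ``_prep_entity_client_env`` change further down). A minimal application-side sketch, assuming the SmartRedis Python client and a NumPy tensor purely for illustration:

```python
import os

import numpy as np
from smartredis import Client

# SSDB is set to 127.0.0.1:<port> for colocated models; fall back to the
# port used in the example above if run outside of SmartSim.
db_address = os.environ.get("SSDB", "127.0.0.1:6780")
client = Client(address=db_address, cluster=False)  # single local shard, no cluster

client.put_tensor("input", np.random.rand(1, 8).astype(np.float32))
result = client.get_tensor("input")
```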


[ committed by @Spartee @al-rigazzi @mellis13 ]
[ reviewed by @al-rigazzi ]
Sam Partee authored Feb 11, 2022
1 parent be73de2 commit 56bfea2
Showing 90 changed files with 3,010 additions and 2,452 deletions.
18 changes: 18 additions & 0 deletions conftest.py
@@ -192,6 +192,24 @@ def get_orchestrator(nodes=1, port=6780, batch=False):
db = Orchestrator(port=port, interface="lo")
return db


@pytest.fixture
def local_db(fileutils, wlmutils, request):
"""Yield fixture for startup and teardown of an local orchestrator"""

exp_name = request.function.__name__
exp = Experiment(exp_name, launcher="local")
test_dir = fileutils.make_test_dir(exp_name)
db = wlmutils.get_orchestrator()
db.set_path(test_dir)
exp.start(db)

yield db
# pass or fail, the teardown code below is run after the
# completion of a test case that uses this fixture
exp.stop(db)


@pytest.fixture
def db(fileutils, wlmutils, request):
"""Yield fixture for startup and teardown of an orchestrator"""
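
For context, a test consuming the new fixture might look like the hypothetical sketch below (the test name, tensor round-trip, and use of ``Orchestrator.get_address`` are illustrative, not part of this diff); the fixture starts the orchestrator before the test body and stops it afterwards, pass or fail:

```python
import numpy as np
from smartredis import Client


def test_local_db_round_trip(local_db):
    # the fixture has already launched the orchestrator in a fresh test dir
    address = local_db.get_address()[0]
    client = Client(address=address, cluster=False)

    tensor = np.ones((2, 2), dtype=np.float32)
    client.put_tensor("ones", tensor)
    assert np.array_equal(client.get_tensor("ones"), tensor)
    # exp.stop(db) from the fixture runs after this test completes
```
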
3 changes: 2 additions & 1 deletion setup.py
@@ -139,7 +139,8 @@ def has_ext_modules(_placeholder):
"tabulate>=0.8.9",
"redis-py-cluster==2.1.3",
"redis==3.5.3",
"tqdm>=4.50.2"
"tqdm>=4.50.2",
"filelock>=3.4.2"
]

# Add SmartRedis at specific version
1 change: 1 addition & 0 deletions smartsim/_core/_install/buildenv.py
@@ -391,6 +391,7 @@ def _torch_import_path():
"""Find through importing torch"""
try:
import torch as t

torch_paths = [Path(p) for p in t.__path__]
for _path in torch_paths:
torch_path = _path / "share/cmake/Torch"
10 changes: 5 additions & 5 deletions smartsim/_core/config/config.py
@@ -86,10 +86,10 @@
class Config:
def __init__(self):
# SmartSim/smartsim/_core
core_path = Path(os.path.abspath(__file__)).parent.parent
self.lib_path = Path(core_path, "lib").resolve()
self.bin_path = Path(core_path, "bin").resolve()
self.conf_path = Path(core_path, "config", "redis6.conf")
self.core_path = Path(os.path.abspath(__file__)).parent.parent
self.lib_path = Path(self.core_path, "lib").resolve()
self.bin_path = Path(self.core_path, "bin").resolve()
self.conf_path = Path(self.core_path, "config", "redis6.conf")

@property
def redisai(self) -> str:
@@ -140,7 +140,7 @@ def log_level(self) -> str:

@property
def jm_interval(self) -> int:
return os.environ.get("SMARTSIM_JM_INTERVAL", 10)
return int(os.environ.get("SMARTSIM_JM_INTERVAL", 10))

@property
def test_launcher(self) -> str:
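
One note on the ``jm_interval`` change: ``os.environ.get`` returns a string whenever the variable is set, so the added ``int()`` cast is what keeps the polling interval usable as a number. A small illustrative sketch (the variable name comes from the diff; the failure shown is hypothetical):

```python
import os

os.environ["SMARTSIM_JM_INTERVAL"] = "15"

raw = os.environ.get("SMARTSIM_JM_INTERVAL", 10)            # "15" -- a str, not an int
interval = int(os.environ.get("SMARTSIM_JM_INTERVAL", 10))  # 15  -- usable as a number

assert isinstance(raw, str) and isinstance(interval, int)
# time.sleep(raw) would raise TypeError, while time.sleep(interval) is fine
```
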
8 changes: 4 additions & 4 deletions smartsim/_core/config/keydb.conf
@@ -163,7 +163,7 @@ pidfile /var/run/redis_6379.pid
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel verbose
loglevel notice

# Specify the log file name. Also the empty string can be used to force
# Redis to log on the standard output. Note that if you use standard
@@ -183,7 +183,7 @@ logfile ""
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
databases 1

# By default Redis shows an ASCII art logo only when started to log to the
# standard output and if the standard output is a TTY. Basically this means
@@ -213,7 +213,7 @@ always-show-logo yes
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save ""
save ""

#save 900 1
#save 300 10
@@ -1551,7 +1551,7 @@ rdb-save-incremental-fsync yes
# Number of worker threads serving requests. This number should be related to the performance
# of your network hardware, not the number of cores on your machine. We don't recommend going
# above 4 at this time. By default this is set 1.
#server-threads 4
#server-threads 1

# Should KeyDB pin threads to CPUs? By default this is disabled, and KeyDB will not bind threads.
# When enabled, threads are bound to cores sequentially starting at core 0.
95 changes: 70 additions & 25 deletions smartsim/_core/control/controller.py
@@ -69,16 +69,24 @@ def start(self, manifest, block=True):
The controller will start the job-manager thread upon
execution of all jobs.
"""
self._launch(manifest)
try:
self._launch(manifest)

# start the job manager thread if not already started
if not self._jobs.actively_monitoring:
self._jobs.start()

# start the job manager thread if not already started
if not self._jobs.actively_monitoring:
self._jobs.start()
except KeyboardInterrupt:
self._jobs.signal_interrupt()
raise

# block until all non-database jobs are complete
if block:
# poll handles its own keyboard interrupt as
# it may be called separately
self.poll(5, True)


@property
def orchestrator_active(self):
JM_LOCK.acquire()
@@ -97,19 +105,25 @@ def poll(self, interval, verbose):
:param verbose: set verbosity
:type verbose: bool
"""
to_monitor = self._jobs.jobs
while len(to_monitor) > 0:
time.sleep(interval)

# acquire lock to avoid "dictionary changed during iteration" error
# without having to copy dictionary each time.
if verbose:
JM_LOCK.acquire()
try:
for job in to_monitor.values():
logger.info(job)
finally:
JM_LOCK.release()
try:
to_monitor = self._jobs.jobs
while len(to_monitor) > 0:
time.sleep(interval)

# acquire lock to avoid "dictionary changed during iteration" error
# without having to copy dictionary each time.
if verbose:
JM_LOCK.acquire()
try:
for job in to_monitor.values():
logger.info(job)
finally:
JM_LOCK.release()

except KeyboardInterrupt:
self._jobs.signal_interrupt()
raise


def finished(self, entity):
"""Return a boolean indicating wether a job has finished or not
@@ -179,6 +193,17 @@ def stop_entity_list(self, entity_list):
for entity in entity_list.entities:
self.stop_entity(entity)

def get_jobs(self):
"""Return a dictionary of completed job data
:returns: dict[str, Job]
"""
JM_LOCK.acquire()
try:
return self._jobs.completed
finally:
JM_LOCK.release()

def get_entity_status(self, entity):
"""Get the status of an entity
@@ -254,7 +279,9 @@ def _launch(self, manifest):
"""
orchestrator = manifest.db
if orchestrator:
if len(orchestrator) > 1 and isinstance(self._launcher, LocalLauncher):
if orchestrator.num_shards > 1 and isinstance(
self._launcher, LocalLauncher
):
raise SmartSimError(
"Local launcher does not support multi-host orchestrators"
)
@@ -280,8 +307,9 @@ def _launch(self, manifest):
steps.extend(job_steps)

# models themselves cannot be batch steps
job_steps = [(self._create_job_step(e), e) for e in manifest.models]
steps.extend(job_steps)
for model in manifest.models:
job_step = self._create_job_step(model)
steps.append((job_step, model))

# launch steps
for job_step in steps:
@@ -361,12 +389,17 @@ def _launch_step(self, job_step, entity):
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e

# a job step is a task if it is not managed by a workload manager (e.g. Slurm)
# but is rather started, monitored, and exited through the Popen interface
# in the taskmanager
is_task = not job_step.managed

if self._jobs.query_restart(entity.name):
logger.debug(f"Restarting {entity.name}")
self._jobs.restart_job(job_step.name, job_id, entity.name)
self._jobs.restart_job(job_step.name, job_id, entity.name, is_task)
else:
logger.debug(f"Launching {entity.name}")
self._jobs.add_job(job_step.name, job_id, entity)
self._jobs.add_job(job_step.name, job_id, entity, is_task)

def _create_batch_job_step(self, entity_list):
"""Use launcher to create batch job step
@@ -423,8 +456,15 @@ def _prep_entity_client_env(self, entity):
)
if entity.query_key_prefixing():
client_env["SSKEYOUT"] = entity.name

# Set address to localhost if it's a colocated model
if hasattr(entity, "colocated"):
if entity.colocated:
port = entity.run_settings.colocated_db_settings["port"]
client_env["SSDB"] = f"127.0.0.1:{str(port)}"
entity.run_settings.update_env(client_env)


def _save_orchestrator(self, orchestrator):
"""Save the orchestrator object via pickle
@@ -484,11 +524,16 @@ def _orchestrator_launch_wait(self, orchestrator):
raise SmartSimError(msg)
else:
logger.debug("Waiting for orchestrator instances to spin up...")
except KeyboardInterrupt as e:
except KeyboardInterrupt:

logger.info("Orchestrator launch cancelled - requesting to stop")
self.stop_entity_list(orchestrator)
raise SmartSimError("Orchestrator launch manually stopped") from e
# TODO stop all running jobs here?

# re-raise keyboard interrupt so the job manager will display
# any running and un-killed jobs as this method is only called
# during launch and we handle all keyboard interrupts during
# launch explicitly
raise

def reload_saved_db(self, checkpoint_file):
JM_LOCK.acquire()
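
The ``is_task`` plumbing above hinges on a simple distinction: unmanaged steps are started and watched through the ``Popen``-based task manager, while managed steps are tracked through the workload manager. A rough sketch of that rule, using assumed, illustrative step classes (the real ``Step`` types live elsewhere in ``smartsim._core``):

```python
# Illustrative stand-ins only; the actual Step classes are defined in the launcher code.
class LocalStep:
    managed = False  # launched with Popen, monitored by the TaskManager


class SlurmBatchStep:
    managed = True   # submitted to and monitored by the workload manager


def is_task(job_step) -> bool:
    """Mirror the `is_task = not job_step.managed` rule used in _launch_step."""
    return not job_step.managed


assert is_task(LocalStep()) is True
assert is_task(SlurmBatchStep()) is False
```
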
13 changes: 11 additions & 2 deletions smartsim/_core/control/job.py
@@ -36,7 +36,7 @@ class Job:
the controller class.
"""

def __init__(self, job_name, job_id, entity):
def __init__(self, job_name, job_id, entity, launcher, is_task):
"""Initialize a Job.
:param job_name: Name of the job step
Expand All @@ -45,6 +45,10 @@ def __init__(self, job_name, job_id, entity):
:type job_id: str
:param entity: The SmartSim entity associated with the job
:type entity: SmartSimEntity
:param launcher: Launcher job was started with
:type launcher: str
:param is_task: process monitored by the TaskManager (True) or the WLM (False)
:type is_task: bool
"""
self.name = job_name
self.jid = job_id
@@ -55,6 +59,8 @@ def __init__(self, job_name, job_id, entity):
self.output = None # only populated if it's system related (e.g. a command failed immediately)
self.error = None # same as output
self.hosts = [] # currently only used for DB jobs
self.launched_with = launcher
self.is_task = is_task
self.start_time = time.time()
self.history = History()

@@ -82,13 +88,15 @@ def record_history(self):
job_time = time.time() - self.start_time
self.history.record(self.jid, self.status, self.returncode, job_time)

def reset(self, new_job_name, new_job_id):
def reset(self, new_job_name, new_job_id, is_task):
"""Reset the job in order to be able to restart it.
:param new_job_name: name of the new job step
:type new_job_name: str
:param new_job_id: new job id to launch under
:type new_job_id: str
:param is_task: process monitored by the TaskManager (True) or the WLM (False)
:type is_task: bool
"""
self.name = new_job_name
self.jid = new_job_id
@@ -97,6 +105,7 @@ def reset(self, new_job_name, new_job_id):
self.output = None
self.error = None
self.hosts = []
self.is_task = is_task
self.start_time = time.time()
self.history.new_run()
