
Co-located Orchestrator #139

Merged: 85 commits merged into CrayLabs:develop on Feb 11, 2022
Conversation

@Spartee (Contributor) commented on Feb 5, 2022

This PR introduces a long-awaited feature: database colocation. With this feature, users can launch their workloads on HPC systems with a single Redis/KeyDB shard placed on each compute node their application uses. This is specifically geared towards tightly coupled, performant, online inference.

As the database is not clustered, locality must be taken into consideration when using this approach. Each MPI rank will be able to interact with the data local to the compute node it is running on. This approach is called locality-based inference. Typically, in SmartSim workflows that use online inference, such as the one described in https://arxiv.org/abs/2104.09355, each MPI rank performs inference with data local to that rank, hence the name.

A new method Model.colocate_db can be used to add a co-located orchestrator to any instantiated model. The full interface looks like

class Model:

    def colocate_db(self,
                    port=6379,
                    db_cpus=1,
                    limit_app_cpus=True,
                    ifname="lo",
                    debug=False,
                    **kwargs):
        """Colocate an Orchestrator instance with this Model at runtime.

        This method will initialize settings which add an unsharded (not connected)
        database to this Model instance. Only this Model will be able to communicate
        with this colocated database by using the loopback TCP interface or Unix
        Domain sockets (UDS coming soon).

        Extra parameters for the db can be passed through kwargs. This includes
        many performance, caching and inference settings.

        ex. kwargs = {
            maxclients: 100000,
            threads_per_queue: 1,
            inter_op_threads: 1,
            intra_op_threads: 1,
            server_threads: 2 # keydb only
        }

        Generally these don't need to be changed.

        :param port: port to use for orchestrator database, defaults to 6379
        :type port: int, optional
        :param db_cpus: number of cpus to use for orchestrator, defaults to 1
        :type db_cpus: int, optional
        :param limit_app_cpus: whether to limit the number of cpus used by the app, defaults to True
        :type limit_app_cpus: bool, optional
        :param ifname: interface to use for orchestrator, defaults to "lo"
        :type ifname: str, optional
        :param debug: launch Model with extra debug information about the colocated db
        :type debug: bool, optional
        :param kwargs: additional keyword arguments to pass to the orchestrator database
        :type kwargs: dict, optional
"""

This feature is accomplished by launching with a SmartSim entrypoint, a new concept introduced in this PR. A SmartSim entrypoint is a Python module with a main function for starting or performing a specific task. The entrypoint used to start a colocated database is invoked with python -m smartsim._core.entrypoints.colocated. The typical Orchestrator and Ray starter scripts have also been migrated to entrypoints.
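For illustration only, here is a minimal sketch of the general shape of an entrypoint module. The argument set and the redis-server invocation below are assumptions, and the real smartsim._core.entrypoints.colocated accepts different options and contains considerably more logic.

import argparse
import subprocess
import sys

def main(args: argparse.Namespace) -> int:
    # Start a single, unsharded redis-server bound to the loopback interface.
    # (Assumes redis-server is on PATH; the real entrypoint also handles
    # KeyDB, module loading, and cleanup.)
    cmd = ["redis-server", "--port", str(args.port), "--bind", "127.0.0.1"]
    return subprocess.call(cmd)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Start a colocated database")
    parser.add_argument("--port", type=int, default=6379)
    sys.exit(main(parser.parse_args()))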

During Step creation, a wrapper script is created that does the following (a rough sketch of the CPU-pinning split appears after the list):

  • starts the database as a daemon using the entrypoint (pinned to the last db_cpus cpus)
  • then starts the user job (optionally pinned to all cpus but the last db_cpus cpus)
  • lastly, terminates the database when the user job has completed.
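For illustration only, here is a minimal Python sketch of the split between database and application cores, assuming psutil for CPU discovery. The variable names are illustrative; this is not the actual wrapper script, which is generated as part of the Step.

import psutil

db_cpus = 1  # matches the colocate_db default
total_cpus = psutil.cpu_count(logical=False) or psutil.cpu_count()

# The database daemon is pinned to the last db_cpus cores on the node ...
db_cores = list(range(total_cpus - db_cpus, total_cpus))
# ... and, when limit_app_cpus=True, the application is restricted to the rest.
app_cores = list(range(total_cpus - db_cpus))

print(f"database cores: {db_cores}, application cores: {app_cores}")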

A full example of creating a colocated database model is as follows:

from smartsim import Experiment
exp = Experiment("colo-test", launcher="auto")

colo_settings = exp.create_run_settings(exe=my_app_binary)

colo_model = exp.create_model("colocated_model", colo_settings)
colo_model.colocate_db(
        port=6780,
        db_cpus=1,
        debug=False,
        limit_app_cpus=False,
        ifname=network_interface
)
exp.start(colo_model)

This PR builds on the refactoring work done in #134. The changes from that work are:

  • The Orchestrator class is now a generic class which can launch any orchestrator on supported launchers, with supported run commands. For example, instead of instantiating a SlurmOrchestrator(), users will instantiate Orchestrator(launcher='slurm') or even Orchestrator(launcher='auto') (see the short example after this list).
  • Orchestrator class has a new argument, single_cmd which is used to launch all shards with a single command, using the MPMD mechanism available for every run command (srun, jsrun, aprun, mpirun).
  • All old Orchestrator classes now call the parent Orchestrator __init__ function with the correct launcher arg.
  • A new function Experiment.create_database can be used to create an Orchestrator, similar to what can be done to create an ensemble, a model, and so on.
  • The Orchestrator.set_hosts function now sets the hosts only on each DBNode (or on each MPMD run settings object), not on the Orchestrator itself. The reason is that the host of each DBNode is used to launch the corresponding Redis server, whereas the name or address needed by the Orchestrator is the one tied to the interface the Redis server is bound to: if the two differ, setting the Orchestrator's host to the node name can result in an error. We now always rely on parsing the output from the redisstarter.py script.
  • Parsing redisstarter.py output for the MPMD case now has two possibilities: either look for N IP addresses in one output (where N is the number of shards), or look for N output files, each containing one IP address. Most run commands rely on the first mechanism; LSF relies on the second.
  • The size of the Orchestrator is no longer obtained as len(Orchestrator), as this would not work for MPMD instances; instead, we assign the shard count to it explicitly.
  • database_per_host is now removed, and only 1 db per node is supported.
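
As a short, hedged example of the refactored interface described above (assuming the smartsim.database import path; keyword arguments beyond launcher and port may differ from the final API):

from smartsim import Experiment
from smartsim.database import Orchestrator

exp = Experiment("orc-test", launcher="auto")

# Generic Orchestrator: the launcher is an argument rather than a subclass.
db = Orchestrator(launcher="auto", port=6780)

# Equivalently, via the new convenience method on the Experiment:
# db = exp.create_database(port=6780)

exp.start(db)
exp.stop(db)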

TODO

  • Decide where and how all the Orchestrator arguments should be documented, considering that some Orchestrators won't need all of them (e.g. LSF needs gpus_per_shard and cpus_per_shard).
  • Decide whether tests for old orchestrators should be kept around as long as they are only deprecated.
  • Implement colocation with other run settings
    - mpirun
    - jsrun
    - aprun
  • Create tests that launch co-located models in a batch.
  • Forbid users from running MPMD workloads with colocated models (raise an error)
  • Clean up output from Experiment._launch_summary with regards to colocated settings
  • Update documentation and examples

Sam Partee and others added 28 commits February 6, 2022 23:15
The scaling tests used the old Experiment.summary()
method to obtain job data. Since we decided to refactor
that to remove the pandas requirement, get_job_data
was introduced to obtain data from the Controller
instance.

We hide job data from the user intentionally, which
is why the Controller is a private variable in the
Experiment, but this method will be expanded upon
later to provide more job data (with filters) once
we have a reliable database that persists across
experiment runs.

Experiment._launch_summary() was also refactored to
use the logger instead of prints and to handle the
co-located Model case.

For the local launcher, the colocated db
process was not being killed correctly.

We now trap and clean up the database process
in the bash script so we can be sure that,
regardless of the exit code, the database will
be terminated.

Add SbatchSettings.set_cpus_per_task which seems
to have been mistakenly taken out

Adds one test for colocated models and removes
some tests that were unnecessary.

We spotted that we were overriding the repr methods
in the base classes of some entities, which we
shouldn't, so that was removed as well.

This option turns off the db log file, which results
in a massive performance boost on shared (networked)
file systems.

Launching colocated models with MpirunSettings is
now supported with this commit.

This commit introduces the interrupt strategy
needed to handle jobs and tasks during a keyboard
interrupt.

Follow-ups will include a parameter to kill
all tasks on interrupt. This commit is primarily
geared towards informing users of colocated tasks
that may still be running when an interrupt is
triggered.

A few changes were also made to the colocated launch
script, as it seems psutil does not support CPU pinning
on macOS.

Add orchestrator methods

@Spartee merged commit 56bfea2 into CrayLabs:develop on Feb 11, 2022
@Spartee mentioned this pull request on Feb 17, 2022
Labels

  • area: orchestrator (Issues related to the Orchestrator API, launch, and runtime)
  • area: settings (Issues related to Batch or Run settings)
  • type: feature (Issues that include a feature request or feature idea)