From 8798c28066ae95b557e519453b01bb9442d05c2f Mon Sep 17 00:00:00 2001 From: Al Rigazzi Date: Wed, 11 May 2022 16:36:23 +0200 Subject: [PATCH] Set ML models and scripts from driver scripts (#185) Adds utilities to set ML models and ML scripts directly from driver scripts, as opposed to only from application code. [ committed by @al-rigazzi ] [ reviewed by @Spartee ] --- smartsim/_core/control/controller.py | 54 ++- smartsim/_core/control/manifest.py | 44 ++ smartsim/_core/entrypoints/colocated.py | 124 +++++- smartsim/_core/launcher/colocated.py | 68 +++- smartsim/_core/utils/__init__.py | 2 +- smartsim/_core/utils/redis.py | 106 +++++ smartsim/database/orchestrator.py | 23 +- smartsim/entity/__init__.py | 1 + smartsim/entity/dbobject.py | 204 ++++++++++ smartsim/entity/ensemble.py | 157 +++++++ smartsim/entity/model.py | 182 ++++++++- smartsim/ml/tf/utils.py | 2 + tests/backends/test_dbmodel.py | 384 ++++++++++++++++++ tests/backends/test_dbscript.py | 356 ++++++++++++++++ tests/test_configs/run_dbmodel_smartredis.py | 26 ++ tests/test_configs/run_dbscript_smartredis.py | 31 ++ tests/test_configs/torchscript.py | 4 + 17 files changed, 1738 insertions(+), 30 deletions(-) create mode 100644 smartsim/entity/dbobject.py create mode 100644 tests/backends/test_dbmodel.py create mode 100644 tests/backends/test_dbscript.py create mode 100644 tests/test_configs/run_dbmodel_smartredis.py create mode 100644 tests/test_configs/run_dbscript_smartredis.py create mode 100644 tests/test_configs/torchscript.py diff --git a/smartsim/_core/control/controller.py b/smartsim/_core/control/controller.py index d3bf2ecfb..459e4365a 100644 --- a/smartsim/_core/control/controller.py +++ b/smartsim/_core/control/controller.py @@ -30,8 +30,9 @@ import threading import time +from ..._core.utils.redis import db_is_active, set_ml_model, set_script from ...database import Orchestrator -from ...entity import DBNode, EntityList, SmartSimEntity +from ...entity import DBNode, DBModel, DBObject, 
DBScript, EntityList, SmartSimEntity from ...error import LauncherError, SmartSimError, SSInternalError, SSUnsupportedError from ...log import get_logger from ...status import STATUS_RUNNING, TERMINAL_STATUSES @@ -40,6 +41,10 @@ from ..utils import check_cluster_status, create_cluster from .jobmanager import JobManager +from smartredis import Client +from smartredis.error import RedisConnectionError, RedisReplyError + + logger = get_logger(__name__) # job manager lock @@ -286,9 +291,12 @@ def _launch(self, manifest): raise SmartSimError(msg) self._launch_orchestrator(orchestrator) - for rc in manifest.ray_clusters: + for rc in manifest.ray_clusters: # cov-wlm rc._update_workers() + if self.orchestrator_active: + self._set_dbobjects(manifest) + # create all steps prior to launch steps = [] all_entity_lists = manifest.ensembles + manifest.ray_clusters @@ -297,7 +305,7 @@ def _launch(self, manifest): batch_step = self._create_batch_job_step(elist) steps.append((batch_step, elist)) else: - # if ensemble is to be run as seperate job steps, aka not in a batch + # if ensemble is to be run as separate job steps, aka not in a batch job_steps = [(self._create_job_step(e), e) for e in elist.entities] steps.extend(job_steps) @@ -586,3 +594,43 @@ def reload_saved_db(self, checkpoint_file): finally: JM_LOCK.release() + + def _set_dbobjects(self, manifest): + if not manifest.has_db_objects: + return + + db_addresses = self._jobs.get_db_host_addresses() + + hosts = list(set([address.split(":")[0] for address in db_addresses])) + ports = list(set([address.split(":")[-1] for address in db_addresses])) + + if not db_is_active(hosts=hosts, + ports=ports, + num_shards=len(db_addresses)): + raise SSInternalError("Cannot set DB Objects, DB is not running") + + client = Client(address=db_addresses[0], cluster=len(db_addresses) > 1) + + for model in manifest.models: + if not model.colocated: + for db_model in model._db_models: + set_ml_model(db_model, client) + for db_script in 
model._db_scripts: + set_script(db_script, client) + + for ensemble in manifest.ensembles: + for db_model in ensemble._db_models: + set_ml_model(db_model, client) + for db_script in ensemble._db_scripts: + set_script(db_script, client) + for entity in ensemble: + if not entity.colocated: + # Set models which could belong only + # to the entities and not to the ensemble + # but avoid duplicates + for db_model in entity._db_models: + if db_model not in ensemble._db_models: + set_ml_model(db_model, client) + for db_script in entity._db_scripts: + if db_script not in ensemble._db_scripts: + set_script(db_script, client) diff --git a/smartsim/_core/control/manifest.py b/smartsim/_core/control/manifest.py index 00ad10913..652f6f625 100644 --- a/smartsim/_core/control/manifest.py +++ b/smartsim/_core/control/manifest.py @@ -194,3 +194,47 @@ def __str__(self): s += "\n" return s + + @property + def has_db_objects(self): + """Check if any entity has DBObjects to set + """ + + def has_db_models(entity): + if hasattr(entity, "_db_models"): + return len(entity._db_models) > 0 + def has_db_scripts(entity): + if hasattr(entity, "_db_scripts"): + return len(entity._db_scripts) > 0 + + + has_db_objects = False + for model in self.models: + has_db_objects |= hasattr(model, "_db_models") + + # Check if any model has either a DBModel or a DBScript + # we update has_db_objects so that as soon as one check + # returns True, we can exit + has_db_objects |= any([has_db_models(model) | has_db_scripts(model) for model in self.models]) + if has_db_objects: + return True + + # If there are no ensembles, there can be no outstanding model + # to check for DBObjects, return current value of DBObjects, which + # should be False + ensembles = self.ensembles + if not ensembles: + return has_db_objects + + # First check if there is any ensemble DBObject, if so, return True + has_db_objects |= any([has_db_models(ensemble) | has_db_scripts(ensemble) for ensemble in ensembles]) + if has_db_objects: + 
return True + for ensemble in ensembles: + # Last case, check if any model within an ensemble has DBObjects attached + has_db_objects |= any([has_db_models(model) | has_db_scripts(model) for model in ensemble]) + if has_db_objects: + return True + + # `has_db_objects` should be False here + return has_db_objects \ No newline at end of file diff --git a/smartsim/_core/entrypoints/colocated.py b/smartsim/_core/entrypoints/colocated.py index eaebeb3d7..1cd12f7f9 100644 --- a/smartsim/_core/entrypoints/colocated.py +++ b/smartsim/_core/entrypoints/colocated.py @@ -37,6 +37,8 @@ from pathlib import Path from subprocess import PIPE, STDOUT +from smartredis import Client +from smartredis.error import RedisConnectionError, RedisReplyError from smartsim._core.utils.network import current_ip from smartsim.error import SSInternalError from smartsim.log import get_logger @@ -55,8 +57,107 @@ def handle_signal(signo, frame): cleanup() +def launch_db_model(client: Client, db_model: List[str]): + """Parse options to launch model on local cluster -def main(network_interface: str, db_cpus: int, command: List[str]): + :param client: SmartRedis client connected to local DB + :type client: Client + :param db_model: List of arguments defining the model + :type db_model: List[str] + :return: Name of model + :rtype: str + """ + parser = argparse.ArgumentParser("Set ML model on DB") + parser.add_argument("--name", type=str) + parser.add_argument("--file", type=str) + parser.add_argument("--backend", type=str) + parser.add_argument("--device", type=str) + parser.add_argument("--devices_per_node", type=int) + parser.add_argument("--batch_size", type=int, default=0) + parser.add_argument("--min_batch_size", type=int, default=0) + parser.add_argument("--tag", type=str, default="") + parser.add_argument("--inputs", nargs="+", default=None) + parser.add_argument("--outputs", nargs="+", default=None) + + # Unused if we use SmartRedis + parser.add_argument("--min_batch_timeout", type=int, 
default=None) + args = parser.parse_args(db_model) + + if args.inputs: + inputs = list(args.inputs) + if args.outputs: + outputs = list(args.outputs) + + if args.devices_per_node == 1: + client.set_model_from_file(args.name, + args.file, + args.backend, + args.device, + args.batch_size, + args.min_batch_size, + args.tag, + inputs, + outputs) + else: + for device_num in range(args.devices_per_node): + client.set_model_from_file(args.name, + args.file, + args.backend, + args.device+f":{device_num}", + args.batch_size, + args.min_batch_size, + args.tag, + inputs, + outputs) + + return args.name + +def launch_db_script(client: Client, db_script: List[str]): + """Parse options to launch script on local cluster + + :param client: SmartRedis client connected to local DB + :type client: Client + :param db_script: List of arguments defining the script + :type db_script: List[str] + :return: Name of script + :rtype: str + """ + parser = argparse.ArgumentParser("Set script on DB") + parser.add_argument("--name", type=str) + parser.add_argument("--func", type=str) + parser.add_argument("--file", type=str) + parser.add_argument("--backend", type=str) + parser.add_argument("--device", type=str) + parser.add_argument("--devices_per_node", type=int) + args = parser.parse_args(db_script) + if args.func: + func = args.func.replace("\\n", "\n") + + if args.devices_per_node == 1: + client.set_script(args.name, + func, + args.device) + else: + for device_num in range(args.devices_per_node): + client.set_script(args.name, + func, + args.device+f":{device_num}") + elif args.file: + if args.devices_per_node == 1: + client.set_script_from_file(args.name, + args.file, + args.device) + else: + for device_num in range(args.devices_per_node): + client.set_script_from_file(args.name, + args.file, + args.device+f":{device_num}") + + + return args.name + + +def main(network_interface: str, db_cpus: int, command: List[str], db_models: List[List[str]], db_scripts: List[List[str]]): global DBPID try: 
@@ -102,6 +203,23 @@ def main(network_interface: str, db_cpus: int, command: List[str]): f"\tCommand: {' '.join(cmd)}\n\n" ))) + if db_models or db_scripts: + try: + client = Client(cluster=False) + for i, db_model in enumerate(db_models): + logger.debug("Uploading model") + model_name = launch_db_model(client, db_model) + logger.debug(f"Added model {model_name} ({i+1}/{len(db_models)})") + for i, db_script in enumerate(db_scripts): + logger.debug("Uploading script") + script_name = launch_db_script(client, db_script) + logger.debug(f"Added script {script_name} ({i+1}/{len(db_scripts)})") + # Make sure we don't keep this around + del client + except (RedisConnectionError, RedisReplyError): + raise SSInternalError("Failed to set model or script, could not connect to database") + + for line in iter(p.stdout.readline, b""): print(line.decode("utf-8").rstrip(), flush=True) @@ -144,6 +262,8 @@ def cleanup(): parser.add_argument("+lockfile", type=str, help="Filename to create for single proc per host") parser.add_argument("+db_cpus", type=int, default=2, help="Number of CPUs to use for DB") parser.add_argument("+command", nargs="+", help="Command to run") + parser.add_argument("+db_model", nargs="+", action="append", default=[], help="Model to set on DB") + parser.add_argument("+db_script", nargs="+", action="append", default=[], help="Script to set on DB") args = parser.parse_args() tmp_lockfile = Path(tempfile.gettempdir()) / args.lockfile @@ -160,7 +280,7 @@ def cleanup(): for sig in SIGNALS: signal.signal(sig, handle_signal) - main(args.ifname, args.db_cpus, args.command) + main(args.ifname, args.db_cpus, args.command, args.db_model, args.db_script) # gracefully exit the processes in the distributed application that # we do not want to have start a colocated process. 
Only one process diff --git a/smartsim/_core/launcher/colocated.py b/smartsim/_core/launcher/colocated.py index 223c0943f..0431f3ef0 100644 --- a/smartsim/_core/launcher/colocated.py +++ b/smartsim/_core/launcher/colocated.py @@ -25,7 +25,9 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import sys + from ..config import CONFIG +from ...error import SSUnsupportedError from ..utils.helpers import create_lockfile_name @@ -65,6 +67,7 @@ def write_colocated_launch_script(file_name, db_log, colocated_settings): f.write(f"{colocated_cmd}\n") f.write(f"DBPID=$!\n\n") + if colocated_settings["limit_app_cpus"]: cpus = colocated_settings["cpus"] f.write( @@ -129,7 +132,7 @@ def _build_colocated_wrapper_cmd(port=6780, # add extra redisAI configurations for arg, value in rai_args.items(): if value: - # RAI wants arguments for inference in all capps + # RAI wants arguments for inference in all caps # ex. THREADS_PER_QUEUE=1 db_cmd.append(f"{arg.upper()} {str(value)}") @@ -142,7 +145,7 @@ def _build_colocated_wrapper_cmd(port=6780, ]) for db_arg, value in extra_db_args.items(): # replace "_" with "-" in the db_arg because we use kwargs - # for the extra configurations and Python doesn't allow a hypon + # for the extra configurations and Python doesn't allow a hyphen # in a variable name. All redis and KeyDB configuration options # use hyphens in their names. 
db_arg = db_arg.replace("_", "-") @@ -150,9 +153,70 @@ def _build_colocated_wrapper_cmd(port=6780, f"--{db_arg}", value ]) + + db_models = kwargs.get("db_models", None) + if db_models: + db_model_cmd = _build_db_model_cmd(db_models) + db_cmd.extend(db_model_cmd) + + db_scripts = kwargs.get("db_scripts", None) + if db_scripts: + db_script_cmd = _build_db_script_cmd(db_scripts) + db_cmd.extend(db_script_cmd) + # run colocated db in the background db_cmd.append("&") cmd.extend(db_cmd) return " ".join(cmd) + +def _build_db_model_cmd(db_models): + cmd = [] + for db_model in db_models: + cmd.append("+db_model") + cmd.append(f"--name={db_model.name}") + + # Here db_model.file is guaranteed to exist + # because we don't allow the user to pass a serialized DBModel + cmd.append(f"--file={db_model.file}") + + cmd.append(f"--backend={db_model.backend}") + cmd.append(f"--device={db_model.device}") + cmd.append(f"--devices_per_node={db_model.devices_per_node}") + if db_model.batch_size: + cmd.append(f"--batch_size={db_model.batch_size}") + if db_model.min_batch_size: + cmd.append(f"--min_batch_size={db_model.min_batch_size}") + if db_model.min_batch_timeout: + cmd.append(f"--min_batch_timeout={db_model.min_batch_timeout}") + if db_model.tag: + cmd.append(f"--tag={db_model.tag}") + if db_model.inputs: + cmd.append("--inputs="+",".join(db_model.inputs)) + if db_model.outputs: + cmd.append("--outputs="+",".join(db_model.outputs)) + + return cmd + + +def _build_db_script_cmd(db_scripts): + cmd = [] + for db_script in db_scripts: + cmd.append("+db_script") + cmd.append(f"--name={db_script.name}") + if db_script.func: + # Notice that here db_script.func is guaranteed to be a str + # because we don't allow the user to pass a serialized function + sanitized_func = db_script.func.replace("\n", "\\n") + if not (sanitized_func.startswith("'") and sanitized_func.endswith("'") + or (sanitized_func.startswith('"') and sanitized_func.endswith('"'))): + sanitized_func = "\"" + sanitized_func + 
"\"" + cmd.append(f"--func={sanitized_func}") + elif db_script.file: + cmd.append(f"--file={db_script.file}") + cmd.append(f"--device={db_script.device}") + cmd.append(f"--devices_per_node={db_script.devices_per_node}") + + return cmd + \ No newline at end of file diff --git a/smartsim/_core/utils/__init__.py b/smartsim/_core/utils/__init__.py index 211f30e6b..05cfb446e 100644 --- a/smartsim/_core/utils/__init__.py +++ b/smartsim/_core/utils/__init__.py @@ -1,2 +1,2 @@ from .helpers import colorize, delete_elements, init_default, installed_redisai_backends -from .redis import check_cluster_status, create_cluster +from .redis import check_cluster_status, create_cluster, db_is_active diff --git a/smartsim/_core/utils/redis.py b/smartsim/_core/utils/redis.py index eabb87eb0..5659ee0e2 100644 --- a/smartsim/_core/utils/redis.py +++ b/smartsim/_core/utils/redis.py @@ -30,9 +30,12 @@ import redis from rediscluster import RedisCluster from rediscluster.exceptions import ClusterDownError, RedisClusterException +from smartredis import Client +from smartredis.error import RedisReplyError logging.getLogger("rediscluster").setLevel(logging.WARNING) +from ...entity import DBModel, DBScript from ...error import SSInternalError from ...log import get_logger from ..config import CONFIG @@ -110,3 +113,106 @@ def check_cluster_status(hosts, ports, trials=10): # cov-wlm trials -= 1 if trials == 0: raise SSInternalError("Cluster setup could not be verified") + + +def db_is_active(hosts, ports, num_shards): + """Check if a DB is running + + if the DB is clustered, check cluster status, otherwise + just ping DB. 
+ + :param hosts: list of hosts + :type hosts: list[str] + :param ports: list of ports + :type ports: list[int] + :param num_shards: Number of DB shards + :type num_shards: int + :return: Whether DB is running + :rtype: bool + """ + # if single shard + if num_shards < 2: + host = hosts[0] + port = ports[0] + try: + client = redis.Redis(host=host, port=port, db=0) + if client.ping(): + return True + return False + except redis.RedisError: + return False + # if a cluster + else: + try: + check_cluster_status(hosts, ports, trials=1) + return True + # we expect this to fail if the cluster is not active + except SSInternalError: + return False + + +def set_ml_model(db_model: DBModel, client: Client): + logger.debug(f"Adding DBModel named {db_model.name}") + devices = db_model._enumerate_devices() + + for device in devices: + try: + if db_model.is_file: + client.set_model_from_file( + name=db_model.name, + model_file=str(db_model.file), + backend=db_model.backend, + device=device, + batch_size=db_model.batch_size, + min_batch_size=db_model.min_batch_size, + tag=db_model.tag, + inputs=db_model.inputs, + outputs=db_model.outputs + ) + else: + client.set_model( + name=db_model.name, + model=db_model.model, + backend=db_model.backend, + device=device, + batch_size=db_model.batch_size, + min_batch_size=db_model.min_batch_size, + tag=db_model.tag, + inputs=db_model.inputs, + outputs=db_model.outputs + ) + except RedisReplyError as error: # pragma: no cover + logger.error("Error while setting model on orchestrator.") + raise error + + +def set_script(db_script: DBScript, client: Client): + logger.debug(f"Adding DBScript named {db_script.name}") + + devices = db_script._enumerate_devices() + + for device in devices: + try: + if db_script.is_file: + client.set_script_from_file( + name=db_script.name, + file=str(db_script.file), + device=device + ) + else: + if isinstance(db_script.script, str): + client.set_script( + name=db_script.name, + script=db_script.script, + device=device 
+ ) + else: + client.set_function( + name=db_script.name, + function=db_script.script, + device=device + ) + + except RedisReplyError as error: # pragma: no cover + logger.error("Error while setting model on orchestrator.") + raise error \ No newline at end of file diff --git a/smartsim/database/orchestrator.py b/smartsim/database/orchestrator.py index 57bdd20ef..f45329b07 100644 --- a/smartsim/database/orchestrator.py +++ b/smartsim/database/orchestrator.py @@ -34,7 +34,7 @@ from smartredis import Client from smartredis.error import RedisReplyError -from .._core.utils import check_cluster_status +from .._core.utils import db_is_active from .._core.config import CONFIG from .._core.utils.helpers import is_valid_cmd from .._core.utils.network import get_ip_from_host @@ -261,25 +261,8 @@ def is_active(self): if not self._hosts: return False - # if single shard - if self.num_shards < 2: - host = self._hosts[0] - port = self.ports[0] - try: - client = redis.Redis(host=host, port=port, db=0) - if client.ping(): - return True - return False - except redis.RedisError: - return False - # if a cluster - else: - try: - check_cluster_status(self._hosts, self.ports, trials=1) - return True - # we expect this to fail if the cluster is not active - except SSInternalError: - return False + return db_is_active(self._hosts, self.ports, self.num_shards) + @property def _rai_module(self): diff --git a/smartsim/entity/__init__.py b/smartsim/entity/__init__.py index 0c8c54c48..3595b6c96 100644 --- a/smartsim/entity/__init__.py +++ b/smartsim/entity/__init__.py @@ -3,3 +3,4 @@ from .entity import SmartSimEntity from .entityList import EntityList from .model import Model +from .dbobject import * diff --git a/smartsim/entity/dbobject.py b/smartsim/entity/dbobject.py new file mode 100644 index 000000000..4f3dfa009 --- /dev/null +++ b/smartsim/entity/dbobject.py @@ -0,0 +1,204 @@ +from pathlib import Path +from .._core.utils.helpers import init_default + +__all__ = ["DBObject", "DBModel", 
"DBScript"] + +class DBObject: + def __init__(self, name, func, file_path, device, devices_per_node): + self.name = name + self.func = func + if file_path: + self.file = self._check_filepath(file_path) + else: + # Need to have this explicitly to check on it + self.file = None + self.device = self._check_device(device) + self.devices_per_node = devices_per_node + + @property + def is_file(self): + if self.func: + return False + return True + + @staticmethod + def _check_tensor_args(inputs, outputs): + inputs = init_default([], inputs, (list, str)) + outputs = init_default([], outputs, (list, str)) + if isinstance(inputs, str): + inputs = [inputs] + if isinstance(outputs, str): + outputs = [outputs] + return inputs, outputs + + @staticmethod + def _check_backend(backend): + backend = backend.upper() + all_backends = ["TF", "TORCH", "ONNX"] + if backend in all_backends: + return backend + else: + raise ValueError( + f"Backend type {backend} unsupported. Options are {all_backends}") + + @staticmethod + def _check_filepath(file): + file_path = Path(file).resolve() + if not file_path.is_file(): + raise FileNotFoundError(file_path) + return file_path + + @staticmethod + def _check_device(device): + device = device.upper() + if not device.startswith("CPU") and not device.startswith("GPU"): + raise ValueError("Device argument must start with either CPU or GPU") + return device + + def _enumerate_devices(self): + """Enumerate devices for a DBObject + + :param dbobject: DBObject to enumerate + :type dbobject: DBObject + :return: list of device names + :rtype: list[str] + """ + devices = [] + if ":" in self.device and self.devices_per_node > 1: + msg = "Cannot set devices_per_node>1 if a device numeral is specified, " + msg += f"the device was set to {self.device} and devices_per_node=={self.devices_per_node}" + raise ValueError(msg) + if self.device in ["CPU", "GPU"] and self.devices_per_node > 1: + for device_num in range(self.devices_per_node): + 
devices.append(f"{self.device}:{str(device_num)}") + else: + devices = [self.device] + + return devices + +class DBScript(DBObject): + + def __init__(self, + name, + script=None, + script_path=None, + device="CPU", + devices_per_node=1 + ): + """TorchScript code representation + + Device selection is either "GPU" or "CPU". If many devices are + present, a number can be passed for specification e.g. "GPU:1". + + Setting ``devices_per_node=N``, with N greater than one will result + in the model being stored in the first N devices of type ``device``. + + One of either script (in memory representation) or script_path (file) + must be provided + + :param name: key to store script under + :type name: str + :param script: TorchScript code + :type script: str, optional + :param script_path: path to TorchScript code, defaults to None + :type script_path: str, optional + :param device: device for script execution, defaults to "CPU" + :type device: str, optional + """ + super().__init__(name, script, script_path, device, devices_per_node) + if not script and not script_path: + raise ValueError("Either script or script_path must be provided") + + @property + def script(self): + return self.func + + def __str__(self): + desc_str = "Name: " + self.name + "\n" + if self.func: + desc_str += "Func: " + self.func + "\n" + if self.file: + desc_str += "File path: " + str(self.file) + "\n" + devices_str = self.device + ("s per node\n" if self.devices_per_node > 1 else " per node\n") + desc_str += "Devices: " + str(self.devices_per_node) + " " + devices_str + return desc_str + + +class DBModel(DBObject): + def __init__(self, + name, + backend, + model=None, + model_file=None, + device="CPU", + devices_per_node=1, + batch_size=0, + min_batch_size=0, + min_batch_timeout=0, + tag="", + inputs=None, + outputs=None): + """A TF, TF-lite, PT, or ONNX model to load into the DB at runtime + + One of either model (in memory representation) or model_file (file) + must be provided + + :param name: 
key to store model under + :type name: str + :param model: model in memory + :type model: str, optional + :param model_file: serialized model + :type model_file: file path to model, optional + :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) + :type backend: str + :param device: name of device for execution, defaults to "CPU" + :type device: str, optional + :param batch_size: batch size for execution, defaults to 0 + :type batch_size: int, optional + :param min_batch_size: minimum batch size for model execution, defaults to 0 + :type min_batch_size: int, optional + :param min_batch_timeout: time to wait for minimum batch size, defaults to 0 + :type min_batch_timeout: int, optional + :param tag: additional tag for model information, defaults to "" + :type tag: str, optional + :param inputs: model inputs (TF only), defaults to None + :type inputs: list[str], optional + :param outputs: model outputs (TF only), defaults to None + :type outputs: list[str], optional + """ + super().__init__(name, model, model_file, device, devices_per_node) + self.backend = self._check_backend(backend) + if not model and not model_file: + raise ValueError("Either model or model_file must be provided") + self.batch_size = batch_size + self.min_batch_size = min_batch_size + self.min_batch_timeout = min_batch_timeout + self.tag = tag + self.inputs, self.outputs = self._check_tensor_args(inputs, outputs) + + @property + def model(self): + return self.func + + def __str__(self): + desc_str = "Name: " + self.name + "\n" + if self.model: + desc_str += "Model stored in memory\n" + if self.file: + desc_str += "File path: " + str(self.file) + "\n" + devices_str = self.device + ("s per node\n" if self.devices_per_node > 1 else " per node\n") + desc_str += "Devices: " + str(self.devices_per_node) + " " + devices_str + desc_str += "Backend: " + str(self.backend) + "\n" + if self.batch_size: + desc_str += "Batch size: " + str(self.batch_size) + "\n" + if self.min_batch_size: + desc_str += 
"Min batch size: " + str(self.min_batch_size) + "\n" + if self.min_batch_timeout: + desc_str += "Min batch time out: " + str(self.min_batch_timeout) + "\n" + if self.tag: + desc_str += "Tag: " + self.tag + "\n" + if self.inputs: + desc_str += "Inputs: " + str(self.inputs) + "\n" + if self.outputs: + desc_str += "Outputs: " + str(self.outputs) + "\n" + return desc_str diff --git a/smartsim/entity/ensemble.py b/smartsim/entity/ensemble.py index 972579f3e..9f1794327 100644 --- a/smartsim/entity/ensemble.py +++ b/smartsim/entity/ensemble.py @@ -36,6 +36,7 @@ ) from ..log import get_logger from ..settings.base import BatchSettings, RunSettings +from .dbobject import DBModel, DBScript from .entityList import EntityList from .model import Model from .strategies import create_all_permutations, random_permutations, step_values @@ -90,6 +91,8 @@ def __init__( self._key_prefixing_enabled = True self.batch_settings = init_default({}, batch_settings, BatchSettings) self.run_settings = init_default({}, run_settings, RunSettings) + self._db_models = [] + self._db_scripts = [] super().__init__(name, getcwd(), perm_strat=perm_strat, **kwargs) @property @@ -186,6 +189,12 @@ def add_model(self, model): raise EntityExistsError( f"Model {model.name} already exists in ensemble {self.name}" ) + + if self._db_models: + self._extend_entity_db_models(model, self._db_models) + if self._db_scripts: + self._extend_entity_db_scripts(model, self._db_scripts) + self.entities.append(model) def register_incoming_entity(self, incoming_entity): @@ -298,3 +307,151 @@ def _read_model_parameters(self): + "Must be list, int, or string." 
) return param_names, parameters + + + def add_ml_model(self, + name, + backend, + model=None, + model_path=None, + device="CPU", + devices_per_node=1, + batch_size=0, + min_batch_size=0, + tag="", + inputs=None, + outputs=None): + """A TF, TF-lite, PT, or ONNX model to load into the DB at runtime + + Each ML Model added will be loaded into an + orchestrator (converged or not) prior to the execution + of every entity belonging to this ensemble + + One of either model (in memory representation) or model_path (file) + must be provided + + :param name: key to store model under + :type name: str + :param model: model in memory + :type model: str, optional # TODO figure out what to type hint this as + :param model_path: serialized model + :type model_path: file path to model + :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) + :type backend: str + :param device: name of device for execution, defaults to "CPU" + :type device: str, optional + :param batch_size: batch size for execution, defaults to 0 + :type batch_size: int, optional + :param min_batch_size: minimum batch size for model execution, defaults to 0 + :type min_batch_size: int, optional + :param tag: additional tag for model information, defaults to "" + :type tag: str, optional + :param inputs: model inputs (TF only), defaults to None + :type inputs: list[str], optional + :param outputs: model outputs (TF only), defaults to None + :type outputs: list[str], optional + """ + db_model = DBModel( + name=name, + backend=backend, + model=model, + model_file=model_path, + device=device, + devices_per_node=devices_per_node, + batch_size=batch_size, + min_batch_size=min_batch_size, + tag=tag, + inputs=inputs, + outputs=outputs + ) + self._db_models.append(db_model) + for entity in self: + self._extend_entity_db_models(entity, [db_model]) + + + def add_script(self, name, script=None, script_path=None, device="CPU", devices_per_node=1): + """TorchScript to launch with every entity belonging to this ensemble 
+ + Each script added to the model will be loaded into an + orchestrator (converged or not) prior to the execution + of every entity belonging to this ensemble + + Device selection is either "GPU" or "CPU". If many devices are + present, a number can be passed for specification e.g. "GPU:1". + + Setting ``devices_per_node=N``, with N greater than one will result + in the model being stored in the first N devices of type ``device``. + + One of either script (in memory string representation) or script_path (file) + must be provided + + :param name: key to store script under + :type name: str + :param script: TorchScript code + :type script: str, optional + :param script_path: path to TorchScript code + :type script_path: str, optional + :param device: device for script execution, defaults to "CPU" + :type device: str, optional + :param devices_per_node: number of devices on each host + :type devices_per_node: int + """ + db_script = DBScript( + name=name, + script=script, + script_path=script_path, + device=device, + devices_per_node=devices_per_node + ) + self._db_scripts.append(db_script) + for entity in self: + self._extend_entity_db_scripts(entity, [db_script]) + + + def add_function(self, name, function=None, device="CPU", devices_per_node=1): + """TorchScript function to launch with every entity belonging to this ensemble + + Each script function to the model will be loaded into a + non-converged orchestrator prior to the execution + of every entity belonging to this ensemble. + + For converged orchestrators, the :meth:`add_script` method should be used. + + Device selection is either "GPU" or "CPU". If many devices are + present, a number can be passed for specification e.g. "GPU:1". + + Setting ``devices_per_node=N``, with N greater than one will result + in the model being stored in the first N devices of type ``device``. 
+ + :param name: key to store function under + :type name: str + :param script: TorchScript code + :type script: str, optional + :param script_path: path to TorchScript code + :type script_path: str, optional + :param device: device for script execution, defaults to "CPU" + :type device: str, optional + :param devices_per_node: number of devices on each host + :type devices_per_node: int + """ + db_script = DBScript( + name=name, + script=function, + device=device, + devices_per_node=devices_per_node + ) + self._db_scripts.append(db_script) + for entity in self: + self._extend_entity_db_scripts(entity, [db_script]) + + def _extend_entity_db_models(self, model, db_models): + entity_db_models = [db_model.name for db_model in model._db_models] + for db_model in db_models: + if not db_model.name in entity_db_models: + model._append_db_model(db_model) + + def _extend_entity_db_scripts(self, model, db_scripts): + entity_db_scripts = [db_script.name for db_script in model._db_scripts] + for db_script in db_scripts: + if not db_script.name in entity_db_scripts: + model._append_db_script(db_script) \ No newline at end of file diff --git a/smartsim/entity/model.py b/smartsim/entity/model.py index 5b467559a..c4a969cf6 100644 --- a/smartsim/entity/model.py +++ b/smartsim/entity/model.py @@ -29,6 +29,7 @@ from ..error import EntityExistsError, SSUnsupportedError from .entity import SmartSimEntity from .files import EntityFiles +from .dbobject import DBScript, DBModel class Model(SmartSimEntity): @@ -54,6 +55,8 @@ def __init__(self, name, params, path, run_settings, params_as_args=None): self.params_as_args = params_as_args self.incoming_entities = [] self._key_prefixing_enabled = False + self._db_models = [] + self._db_scripts = [] self.files = None @property @@ -195,8 +198,12 @@ def colocate_db(self, colo_db_config["extra_db_args"] = dict([ (k,str(v)) for k,v in kwargs.items() if k not in colo_db_config["rai_args"] ]) - self.run_settings.colocated_db_settings = colo_db_config 
+ self._check_db_objects_colo() + colo_db_config["db_models"] = self._db_models + colo_db_config["db_scripts"] = self._db_scripts + + self.run_settings.colocated_db_settings = colo_db_config def params_to_args(self): """Convert parameters to command line arguments and update run settings.""" @@ -213,6 +220,137 @@ def params_to_args(self): ) self.run_settings.add_exe_args(cat_arg_and_value(param, self.params[param])) + def add_ml_model(self, + name, + backend, + model=None, + model_path=None, + device="CPU", + devices_per_node=1, + batch_size=0, + min_batch_size=0, + tag="", + inputs=None, + outputs=None): + """A TF, TF-lite, PT, or ONNX model to load into the DB at runtime + + Each ML Model added will be loaded into an + orchestrator (converged or not) prior to the execution + of this Model instance + + One of either model (in memory representation) or model_path (file) + must be provided + + :param name: key to store model under + :type name: str + :param model: model in memory + :type model: byte string, optional + :param model_path: serialized model + :type model_path: file path to model + :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) + :type backend: str + :param device: name of device for execution, defaults to "CPU" + :type device: str, optional + :param batch_size: batch size for execution, defaults to 0 + :type batch_size: int, optional + :param min_batch_size: minimum batch size for model execution, defaults to 0 + :type min_batch_size: int, optional + :param tag: additional tag for model information, defaults to "" + :type tag: str, optional + :param inputs: model inputs (TF only), defaults to None + :type inputs: list[str], optional + :param outputs: model outputs (TF only), defaults to None + :type outputs: list[str], optional + """ + db_model = DBModel( + name=name, + backend=backend, + model=model, + model_file=model_path, + device=device, + devices_per_node=devices_per_node, + batch_size=batch_size, + min_batch_size=min_batch_size,
tag=tag, + inputs=inputs, + outputs=outputs + ) + self._append_db_model(db_model) + + + + def add_script(self, name, script=None, script_path=None, device="CPU", devices_per_node=1): + """TorchScript to launch with this Model instance + + Each script added to the model will be loaded into an + orchestrator (converged or not) prior to the execution + of this Model instance + + Device selection is either "GPU" or "CPU". If many devices are + present, a number can be passed for specification e.g. "GPU:1". + + Setting ``devices_per_node=N``, with N greater than one will result + in the model being stored in the first N devices of type ``device``. + + One of either script (in memory string representation) or script_path (file) + must be provided + + :param name: key to store script under + :type name: str + :param script: TorchScript code + :type script: str, optional + :param script_path: path to TorchScript code + :type script_path: str, optional + :param device: device for script execution, defaults to "CPU" + :type device: str, optional + :param devices_per_node: number of devices on each host + :type devices_per_node: int + """ + db_script = DBScript( + name=name, + script=script, + script_path=script_path, + device=device, + devices_per_node=devices_per_node + ) + self._append_db_script(db_script) + + + + def add_function(self, name, function=None, device="CPU", devices_per_node=1): + """TorchScript function to launch with this Model instance + + Each script function to the model will be loaded into a + non-converged orchestrator prior to the execution + of this Model instance. + + For converged orchestrators, the :meth:`add_script` method should be used. + + Device selection is either "GPU" or "CPU". If many devices are + present, a number can be passed for specification e.g. "GPU:1". + + Setting ``devices_per_node=N``, with N greater than one will result + in the model being stored in the first N devices of type ``device``. 
+ + :param name: key to store function under + :type name: str + :param script: TorchScript code + :type script: str or byte string, optional + :param script_path: path to TorchScript code + :type script_path: str, optional + :param device: device for script execution, defaults to "CPU" + :type device: str, optional + :param devices_per_node: number of devices on each host + :type devices_per_node: int + """ + db_script = DBScript( + name=name, + script=function, + device=device, + devices_per_node=devices_per_node + ) + self._append_db_script(db_script) + def __eq__(self, other): if self.name == other.name: return True @@ -221,5 +359,45 @@ def __eq__(self, other): def __str__(self): # pragma: no cover entity_str = "Name: " + self.name + "\n" entity_str += "Type: " + self.type + "\n" - entity_str += str(self.run_settings) + entity_str += str(self.run_settings) + "\n" + if self._db_models: + entity_str += "DB Models: \n" + str(len(self._db_models)) + "\n" + if self._db_scripts: + entity_str += "DB Scripts: \n" + str(len(self._db_scripts)) + "\n" return entity_str + + + def _append_db_model(self, db_model): + if not db_model.is_file and self.colocated: + err_msg = "ML model can not be set from memory for colocated databases.\n" + err_msg += f"Please store the ML model named {db_model.name} in binary format " + err_msg += "and add it to the SmartSim Model as file." + raise SSUnsupportedError(err_msg) + + self._db_models.append(db_model) + + def _append_db_script(self, db_script): + if db_script.func and self.colocated: + if not isinstance(db_script.func, str): + err_msg = "Functions can not be set from memory for colocated databases.\n" + err_msg += f"Please convert the function named {db_script.name} to a string or store " + err_msg += "it as a text file and add it to the SmartSim Model with add_script." 
+ raise SSUnsupportedError(err_msg) + self._db_scripts.append(db_script) + + def _check_db_objects_colo(self): + + for db_model in self._db_models: + if not db_model.is_file: + err_msg = "ML model can not be set from memory for colocated databases.\n" + err_msg += f"Please store the ML model named {db_model.name} in binary format " + err_msg += "and add it to the SmartSim Model as file." + raise SSUnsupportedError(err_msg) + + for db_script in self._db_scripts: + if db_script.func: + if not isinstance(db_script.func, str): + err_msg = "Functions can not be set from memory for colocated databases.\n" + err_msg += f"Please convert the function named {db_script.name} to a string or store it " + err_msg += "as a text file and add it to the SmartSim Model with add_script." + raise SSUnsupportedError(err_msg) diff --git a/smartsim/ml/tf/utils.py b/smartsim/ml/tf/utils.py index a43cc8b8d..496c28d9f 100644 --- a/smartsim/ml/tf/utils.py +++ b/smartsim/ml/tf/utils.py @@ -64,6 +64,8 @@ def serialize_model(model): :param model: TensorFlow or Keras model :type model: tf.Module + :return: serialized model, model input layer names, model output layer names + :rtype: str, list[str], list[str] """ full_model = tf.function(lambda x: model(x)) diff --git a/tests/backends/test_dbmodel.py b/tests/backends/test_dbmodel.py new file mode 100644 index 000000000..84229f952 --- /dev/null +++ b/tests/backends/test_dbmodel.py @@ -0,0 +1,384 @@ +import sys +import pytest + +from smartsim import Experiment, status +import smartsim +from smartsim._core.utils import installed_redisai_backends +from smartsim.error.errors import SSUnsupportedError + +should_run = True + +try: + import tensorflow.keras as keras + from tensorflow.keras.layers import Conv2D, Input +except ImportError: + should_run = False + +should_run &= "tensorflow" in installed_redisai_backends() + +class Net(keras.Model): + def __init__(self): + super(Net, self).__init__(name="cnn") + self.conv = Conv2D(1, 3, 1) + + def call(self, 
x): + y = self.conv(x) + return y + + +def save_tf_cnn(path, file_name): + """Create a Keras CNN for testing purposes + + """ + from smartsim.ml.tf import freeze_model + n = Net() + input_shape = (3,3,1) + n.build(input_shape=(None,*input_shape)) + inputs = Input(input_shape) + outputs = n(inputs) + model = keras.Model(inputs=inputs, outputs=outputs, name=n.name) + + return freeze_model(model, path, file_name) + + +def create_tf_cnn(): + """Create a Keras CNN for testing purposes + + """ + from smartsim.ml.tf import serialize_model + n = Net() + input_shape = (3,3,1) + inputs = Input(input_shape) + outputs = n(inputs) + model = keras.Model(inputs=inputs, outputs=outputs, name=n.name) + + return serialize_model(model) + + +@pytest.mark.skipif(not should_run, reason="Test needs TF to run") +def test_db_model(fileutils): + """Test DB Models on remote DB""" + + exp_name = "test-db-model" + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbmodel_smartredis.py") + + exp = Experiment(exp_name, exp_path=test_dir, launcher="local") + # create colocated model + run_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + smartsim_model = exp.create_model("smartsim_model", run_settings) + smartsim_model.set_path(test_dir) + + db = exp.create_database(port=6780, interface="lo") + exp.generate(db) + + model, inputs, outputs = create_tf_cnn() + model_file2, inputs2, outputs2 = save_tf_cnn(test_dir, "model2.pb") + + smartsim_model.add_ml_model("cnn", "TF", model=model, device="CPU", inputs=inputs, outputs=outputs, tag="test") + smartsim_model.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2, tag="test") + + for db_model in smartsim_model._db_models: + print(db_model) + + # Assert we have added both models + assert(len(smartsim_model._db_models) == 2) + + exp.start(db, smartsim_model, block=True) + statuses = 
exp.get_status(smartsim_model) + exp.stop(db) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs TF to run") +def test_db_model_ensemble(fileutils): + """Test DBModels on remote DB, with an ensemble""" + + exp_name = "test-db-model-ensemble" + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbmodel_smartredis.py") + + exp = Experiment(exp_name, exp_path=test_dir, launcher="local") + # create colocated model + run_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + smartsim_ensemble = exp.create_ensemble("smartsim_model", run_settings=run_settings, replicas=2) + smartsim_ensemble.set_path(test_dir) + + smartsim_model = exp.create_model("smartsim_model", run_settings) + smartsim_model.set_path(test_dir) + + db = exp.create_database(port=6780, interface="lo") + exp.generate(db) + + model, inputs, outputs = create_tf_cnn() + model_file2, inputs2, outputs2 = save_tf_cnn(test_dir, "model2.pb") + + smartsim_ensemble.add_ml_model("cnn", "TF", model=model, device="CPU", inputs=inputs, outputs=outputs) + + for entity in smartsim_ensemble: + entity.disable_key_prefixing() + entity.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2) + + # Ensemble must add all available DBModels to new entity + smartsim_ensemble.add_model(smartsim_model) + smartsim_model.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2) + + # Assert we have added one model to the ensemble + assert(len(smartsim_ensemble._db_models) == 1) + # Assert we have added two models to each entity + assert(all([len(entity._db_models)==2 for entity in smartsim_ensemble])) + + exp.start(db, smartsim_ensemble, block=True) + statuses = exp.get_status(smartsim_ensemble) + exp.stop(db) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + 
+ +@pytest.mark.skipif(not should_run, reason="Test needs TF to run") +def test_colocated_db_model(fileutils): + """Test DB Models on colocated DB""" + + exp_name = "test-colocated-db-model" + exp = Experiment(exp_name, launcher="local") + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbmodel_smartredis.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + colo_model.colocate_db( + port=6780, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + model_file, inputs, outputs = save_tf_cnn(test_dir, "model1.pb") + model_file2, inputs2, outputs2 = save_tf_cnn(test_dir, "model2.pb") + + colo_model.add_ml_model("cnn", "TF", model_path=model_file, device="CPU", inputs=inputs, outputs=outputs) + colo_model.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2) + + # Assert we have added both models + assert(len(colo_model._db_models) == 2) + + exp.start(colo_model, block=True) + statuses = exp.get_status(colo_model) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs TF to run") +def test_colocated_db_model_ensemble(fileutils): + """Test DBModel on colocated ensembles, first colocating DB, + then adding DBModel. 
+ """ + + exp_name = "test-colocated-db-model-ensemble" + + # get test setup + test_dir = fileutils.make_test_dir() + exp = Experiment(exp_name, launcher="local", exp_path=test_dir) + sr_test_script = fileutils.get_test_conf_path("run_dbmodel_smartredis.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_ensemble = exp.create_ensemble("colocated_ens", run_settings=colo_settings, replicas=2) + colo_ensemble.set_path(test_dir) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + colo_model.colocate_db( + port=6780, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + model_file, inputs, outputs = save_tf_cnn(test_dir, "model1.pb") + model_file2, inputs2, outputs2 = save_tf_cnn(test_dir, "model2.pb") + + for i, entity in enumerate(colo_ensemble): + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + # Test that models added individually do not conflict with ensemble ones + entity.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2) + + # Test adding a model from ensemble + colo_ensemble.add_ml_model("cnn", "TF", model_path=model_file, device="CPU", inputs=inputs, outputs=outputs, tag="test") + + # Ensemble should add all available DBModels to new model + colo_ensemble.add_model(colo_model) + colo_model.colocate_db( + port=6780+len(colo_ensemble), + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + colo_model.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2) + + + exp.start(colo_ensemble, block=True) + statuses = exp.get_status(colo_ensemble) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs TF to run") +def test_colocated_db_model_ensemble_reordered(fileutils): + """Test
DBModel on colocated ensembles, first adding the DBModel to the + ensemble, then colocating DB. + """ + + exp_name = "test-colocated-db-model-ensemble-reordered" + + # get test setup + test_dir = fileutils.make_test_dir() + exp = Experiment(exp_name, launcher="local", exp_path=test_dir) + sr_test_script = fileutils.get_test_conf_path("run_dbmodel_smartredis.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_ensemble = exp.create_ensemble("colocated_ens", run_settings=colo_settings, replicas=2) + colo_ensemble.set_path(test_dir) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + + model_file, inputs, outputs = save_tf_cnn(test_dir, "model1.pb") + model_file2, inputs2, outputs2 = save_tf_cnn(test_dir, "model2.pb") + + # Test adding a model from ensemble + colo_ensemble.add_ml_model("cnn", "TF", model_path=model_file, device="CPU", inputs=inputs, outputs=outputs) + + for i, entity in enumerate(colo_ensemble): + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + # Test that models added individually do not conflict with ensemble ones + entity.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2) + + + # Ensemble should add all available DBModels to new model + colo_ensemble.add_model(colo_model) + colo_model.colocate_db( + port=6780+len(colo_ensemble), + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + colo_model.add_ml_model("cnn2", "TF", model_path=model_file2, device="CPU", inputs=inputs2, outputs=outputs2) + + exp.start(colo_ensemble, block=True) + statuses = exp.get_status(colo_ensemble) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs TF to run") +def test_colocated_db_model_errors(fileutils): + """Test error when colocated db model
has no file.""" + + exp_name = "test-colocated-db-model-error" + exp = Experiment(exp_name, launcher="local") + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbmodel_smartredis.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + colo_model.colocate_db( + port=6780, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + model, inputs, outputs = create_tf_cnn() + + with pytest.raises(SSUnsupportedError): + colo_model.add_ml_model("cnn", "TF", model=model, device="CPU", inputs=inputs, outputs=outputs) + + + colo_ensemble = exp.create_ensemble("colocated_ens", run_settings=colo_settings, replicas=2) + colo_ensemble.set_path(test_dir) + for i, entity in enumerate(colo_ensemble): + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + with pytest.raises(SSUnsupportedError): + colo_ensemble.add_ml_model("cnn", "TF", model=model, device="CPU", inputs=inputs, outputs=outputs) + + # Check errors for reverse order of DBModel addition and DB colocation + # create colocated model + colo_settings2 = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + # Reverse order of DBModel and model + colo_ensemble2 = exp.create_ensemble("colocated_ens", run_settings=colo_settings2, replicas=2) + colo_ensemble2.set_path(test_dir) + colo_ensemble2.add_ml_model("cnn", "TF", model=model, device="CPU", inputs=inputs, outputs=outputs) + for i, entity in enumerate(colo_ensemble2): + with pytest.raises(SSUnsupportedError): + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + with pytest.raises(SSUnsupportedError): + colo_ensemble.add_model(colo_model) diff --git a/tests/backends/test_dbscript.py 
b/tests/backends/test_dbscript.py new file mode 100644 index 000000000..78f6ed032 --- /dev/null +++ b/tests/backends/test_dbscript.py @@ -0,0 +1,356 @@ +import sys +import pytest + +from smartsim import Experiment, status +from smartsim._core.utils import installed_redisai_backends +from smartsim.error.errors import SSUnsupportedError + +should_run = True + +try: + import torch +except ImportError: + should_run = False + +should_run &= "torch" in installed_redisai_backends() + +def timestwo(x): + return 2*x + + +@pytest.mark.skipif(not should_run, reason="Test needs Torch to run") +def test_db_script(fileutils): + """Test DB scripts on remote DB""" + + exp_name = "test-db-script" + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbscript_smartredis.py") + torch_script = fileutils.get_test_conf_path("torchscript.py") + + exp = Experiment(exp_name, exp_path=test_dir, launcher="local") + # create colocated model + run_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + smartsim_model = exp.create_model("smartsim_model", run_settings) + smartsim_model.set_path(test_dir) + + db = exp.create_database(port=6780, interface="lo") + exp.generate(db) + + torch_script_str = "def negate(x):\n\treturn torch.neg(x)\n" + + smartsim_model.add_script("test_script1", script_path=torch_script, device="CPU") + smartsim_model.add_script("test_script2", script=torch_script_str, device="CPU") + smartsim_model.add_function("test_func", function=timestwo, device="CPU") + + # Assert we have all three models + assert(len(smartsim_model._db_scripts) == 3) + + exp.start(db, smartsim_model, block=True) + statuses = exp.get_status(smartsim_model) + exp.stop(db) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs Torch to run") +def test_db_script_ensemble(fileutils): + """Test DB scripts on remote DB""" + + exp_name = 
"test-db-script" + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbscript_smartredis.py") + torch_script = fileutils.get_test_conf_path("torchscript.py") + + exp = Experiment(exp_name, exp_path=test_dir, launcher="local") + # create colocated model + run_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + ensemble = exp.create_ensemble("dbscript_ensemble", run_settings=run_settings, replicas=2) + ensemble.set_path(test_dir) + + smartsim_model = exp.create_model("smartsim_model", run_settings) + smartsim_model.set_path(test_dir) + + db = exp.create_database(port=6780, interface="lo") + exp.generate(db) + + torch_script_str = "def negate(x):\n\treturn torch.neg(x)\n" + + ensemble.add_script("test_script1", script_path=torch_script, device="CPU") + + for entity in ensemble: + entity.disable_key_prefixing() + entity.add_script("test_script2", script=torch_script_str, device="CPU") + + ensemble.add_function("test_func", function=timestwo, device="CPU") + + # Ensemble must add all available DBScripts to new entity + ensemble.add_model(smartsim_model) + smartsim_model.add_script("test_script2", script=torch_script_str, device="CPU") + + # Assert we have added both models to the ensemble + assert(len(ensemble._db_scripts) == 2) + # Assert we have added all three models to entities in ensemble + assert(all([len(entity._db_scripts) == 3 for entity in ensemble])) + + exp.start(db, ensemble, block=True) + statuses = exp.get_status(ensemble) + exp.stop(db) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs Torch to run") +def test_colocated_db_script(fileutils): + """Test DB Scripts on colocated DB""" + + exp_name = "test-colocated-db-script" + exp = Experiment(exp_name, launcher="local") + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = 
fileutils.get_test_conf_path("run_dbscript_smartredis.py") + torch_script = fileutils.get_test_conf_path("torchscript.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + colo_model.colocate_db( + port=6780, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + torch_script_str = "def negate(x):\n\treturn torch.neg(x)\n" + + colo_model.add_script("test_script1", script_path=torch_script, device="CPU") + colo_model.add_script("test_script2", script=torch_script_str, device="CPU") + + # Assert we have added both models + assert(len(colo_model._db_scripts) == 2) + + for db_script in colo_model._db_scripts: + print(db_script) + + exp.start(colo_model, block=True) + statuses = exp.get_status(colo_model) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs Torch to run") +def test_colocated_db_script_ensemble(fileutils): + """Test DB Scripts on colocated DB from ensemble, first colocating DB, + then adding script. 
+ """ + + exp_name = "test-colocated-db-script" + exp = Experiment(exp_name, launcher="local") + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbscript_smartredis.py") + torch_script = fileutils.get_test_conf_path("torchscript.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_ensemble = exp.create_ensemble("colocated_ensemble", run_settings=colo_settings, replicas=2) + colo_ensemble.set_path(test_dir) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + + for i, entity in enumerate(colo_ensemble): + entity.disable_key_prefixing() + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + entity.add_script("test_script1", script_path=torch_script, device="CPU") + + colo_model.colocate_db( + port=6780+len(colo_ensemble), + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + torch_script_str = "def negate(x):\n\treturn torch.neg(x)\n" + + colo_ensemble.add_script("test_script2", script=torch_script_str, device="CPU") + + colo_ensemble.add_model(colo_model) + colo_model.add_script("test_script1", script_path=torch_script, device="CPU") + + # Assert we have added one model to the ensemble + assert(len(colo_ensemble._db_scripts) == 1) + # Assert we have added both models to each entity + assert(all([len(entity._db_scripts)==2 for entity in colo_ensemble])) + + + exp.start(colo_ensemble, block=True) + statuses = exp.get_status(colo_ensemble) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs Torch to run") +def test_colocated_db_script_ensemble_reordered(fileutils): + """Test DB Scripts on colocated DB from ensemble, first adding the + script to the ensemble, then colocating the DB""" + + exp_name = "test-colocated-db-script" + 
exp = Experiment(exp_name, launcher="local") + + # get test setup + test_dir = fileutils.make_test_dir() + sr_test_script = fileutils.get_test_conf_path("run_dbscript_smartredis.py") + torch_script = fileutils.get_test_conf_path("torchscript.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_ensemble = exp.create_ensemble("colocated_ensemble", run_settings=colo_settings, replicas=2) + colo_ensemble.set_path(test_dir) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + + torch_script_str = "def negate(x):\n\treturn torch.neg(x)\n" + colo_ensemble.add_script("test_script2", script=torch_script_str, device="CPU") + + for i, entity in enumerate(colo_ensemble): + entity.disable_key_prefixing() + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + entity.add_script("test_script1", script_path=torch_script, device="CPU") + + colo_model.colocate_db( + port=6780+len(colo_ensemble), + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + colo_ensemble.add_model(colo_model) + colo_model.add_script("test_script1", script_path=torch_script, device="CPU") + + # Assert we have added one model to the ensemble + assert(len(colo_ensemble._db_scripts) == 1) + # Assert we have added both models to each entity + assert(all([len(entity._db_scripts)==2 for entity in colo_ensemble])) + + + exp.start(colo_ensemble, block=True) + statuses = exp.get_status(colo_ensemble) + assert all([stat == status.STATUS_COMPLETED for stat in statuses]) + + +@pytest.mark.skipif(not should_run, reason="Test needs Torch to run") +def test_db_script_errors(fileutils): + """Test DB Scripts error when setting a serialized function on colocated DB""" + + exp_name = "test-colocated-db-script" + exp = Experiment(exp_name, launcher="local") + + # get test setup + test_dir = fileutils.make_test_dir() + 
sr_test_script = fileutils.get_test_conf_path("run_dbscript_smartredis.py") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_model = exp.create_model("colocated_model", colo_settings) + colo_model.set_path(test_dir) + colo_model.colocate_db( + port=6780, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + with pytest.raises(SSUnsupportedError): + colo_model.add_function("test_func", function=timestwo, device="CPU") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_ensemble = exp.create_ensemble("colocated_ensemble", run_settings=colo_settings, replicas=2) + colo_ensemble.set_path(test_dir) + + for i, entity in enumerate(colo_ensemble): + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + with pytest.raises(SSUnsupportedError): + colo_ensemble.add_function("test_func", function=timestwo, device="CPU") + + # create colocated model + colo_settings = exp.create_run_settings( + exe=sys.executable, + exe_args=sr_test_script + ) + + colo_ensemble = exp.create_ensemble("colocated_ensemble", run_settings=colo_settings, replicas=2) + colo_ensemble.set_path(test_dir) + + colo_ensemble.add_function("test_func", function=timestwo, device="CPU") + + for i, entity in enumerate(colo_ensemble): + with pytest.raises(SSUnsupportedError): + entity.colocate_db( + port=6780+i, + db_cpus=1, + limit_app_cpus=False, + debug=True, + ifname="lo" + ) + + + with pytest.raises(SSUnsupportedError): + colo_ensemble.add_model(colo_model) diff --git a/tests/test_configs/run_dbmodel_smartredis.py b/tests/test_configs/run_dbmodel_smartredis.py new file mode 100644 index 000000000..e94dd73dd --- /dev/null +++ b/tests/test_configs/run_dbmodel_smartredis.py @@ -0,0 +1,26 @@ +import numpy as np +from smartredis import Client + +def main(): + # address should be 
set as we are launching through + # SmartSim. + client = Client(cluster=False) + + array = np.ones((1, 3, 3, 1)).astype(np.single) + client.put_tensor("test_array", array) + assert client.poll_model("cnn", 500, 30) + client.run_model("cnn", ["test_array"], ["test_output"]) + returned = client.get_tensor("test_output") + + assert returned.shape == (1, 1, 1, 1) + + array = np.ones((1, 3, 3, 1)).astype(np.single) + assert client.poll_model("cnn2", 500, 30) + client.run_model("cnn2", ["test_array"], ["test_output"]) + returned = client.get_tensor("test_output") + + assert returned.shape == (1, 1, 1, 1) + print(f"Test worked!") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/test_configs/run_dbscript_smartredis.py b/tests/test_configs/run_dbscript_smartredis.py new file mode 100644 index 000000000..e88a9540c --- /dev/null +++ b/tests/test_configs/run_dbscript_smartredis.py @@ -0,0 +1,31 @@ +import numpy as np +from smartredis import Client +from pytest import approx + +def main(): + # address should be set as we are launching through + # SmartSim. 
+ client = Client(cluster=False) + + array = np.ones((1, 3, 3, 1)).astype(np.single) + client.put_tensor("test_array", array) + assert client.poll_model("test_script1", 500, 30) + client.run_script("test_script1", "average", ["test_array"], ["test_output"]) + returned = client.get_tensor("test_output") + assert returned == approx(np.mean(array)) + + assert client.poll_model("test_script2", 500, 30) + client.run_script("test_script2", "negate", ["test_array"], ["test_output"]) + returned = client.get_tensor("test_output") + + assert returned == approx(-array) + + if client.model_exists("test_func"): + client.run_script("test_func", "timestwo", ["test_array"], ["test_output"]) + returned = client.get_tensor("test_output") + assert returned == approx(2*array) + + print(f"Test worked!") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/test_configs/torchscript.py b/tests/test_configs/torchscript.py new file mode 100644 index 000000000..ca7ccee71 --- /dev/null +++ b/tests/test_configs/torchscript.py @@ -0,0 +1,4 @@ +# import torch + +def average(x): + return torch.tensor(torch.mean(x)).unsqueeze(0)