diff --git a/mlonmcu/artifact.py b/mlonmcu/artifact.py index d9fe2f5ad..ea56750b8 100644 --- a/mlonmcu/artifact.py +++ b/mlonmcu/artifact.py @@ -18,7 +18,7 @@ # """Artifacts defintions internally used to refer to intermediate results.""" -from enum import Enum +from enum import IntFlag, auto from pathlib import Path from mlonmcu.setup import utils @@ -28,24 +28,25 @@ # TODO: decide if inheritance based scheme would fit better -class ArtifactFormat(Enum): # TODO: ArtifactType, ArtifactKind? +# class ArtifactFormat(Enum): # TODO: ArtifactType, ArtifactKind? +class ArtifactFormat(IntFlag): """Enumeration of artifact types.""" - UNKNOWN = 0 - SOURCE = 1 - TEXT = 2 - MLF = 3 - MODEL = 4 - IMAGE = 5 - DATA = 6 - NUMPY = 7 - PARAMS = 8 - JSON = 9 # TODO: how about YAML or more general: DICT? - PATH = 10 # NOT A DIRECTORY? - RAW = 11 - BIN = 11 - SHARED_OBJECT = 12 # Here: the parent tar archive - ARCHIVE = 13 + UNKNOWN = auto() + SOURCE = auto() + TEXT = auto() + MLF = auto() + MODEL = auto() + IMAGE = auto() + DATA = auto() + NUMPY = auto() + PARAMS = auto() + JSON = auto() # TODO: how about YAML or more general: DICT? + PATH = auto() # NOT A DIRECTORY? + RAW = auto() + BIN = RAW + SHARED_OBJECT = auto() # Here: the parent tar archive + ARCHIVE = auto() def lookup_artifacts(artifacts, name=None, fmt=None, flags=None, first_only=False): @@ -100,12 +101,27 @@ def __init__( self.optional = optional self.validate() + def serialize(self): + return { + "name": self.name, + "content": self.content, + "path": str(self.path) if self.path else None, + "data": self.data, + "raw": self.raw, + "fmt": self.fmt.value, + "flags": list(self.flags), + "archive": self.archive, + "optional": self.optional, + } + + # TODO: unserialize + def __repr__(self): return f"Artifact({self.name}, fmt={self.fmt}, flags={self.flags})" @property def exported(self): - """Returns true if the artifact was writtem to disk.""" + """Returns true if the artifact was written to disk.""" return bool(self.path is not None) def validate(self): diff --git a/mlonmcu/cli/build.py b/mlonmcu/cli/build.py index b8293b842..bd1e2b8b7 100644 --- a/mlonmcu/cli/build.py +++ b/mlonmcu/cli/build.py @@ -22,7 +22,7 @@ from mlonmcu.cli.common import kickoff_runs from mlonmcu.cli.load import handle as handle_load, add_load_options from mlonmcu.context.context import MlonMcuContext -from mlonmcu.session.run import RunStage +from mlonmcu.session.run import RunStage, RunInitializer from mlonmcu.platform.lookup import get_platforms_targets, get_platforms_backends from .helper.parse import ( extract_backend_names, @@ -73,9 +73,12 @@ def _handle(args, context, require_target=False): session = context.sessions[-1] new_runs = [] for run in session.runs: + if isinstance(run, RunInitializer) and run.frozen: + new_runs.append(run) + continue for target_name in targets: for backend_name in backends: - new_run = run.copy() + new_run = run.copy(session=session) if backend_name is not None: platform_name = None for platform in platforms: diff --git a/mlonmcu/cli/common.py b/mlonmcu/cli/common.py index b761b092e..0145d4c2d 100644 --- a/mlonmcu/cli/common.py +++ b/mlonmcu/cli/common.py @@ -22,6 +22,7 @@ import logging import argparse +from mlonmcu.config import str2bool from mlonmcu.platform import get_platforms from mlonmcu.session.postprocess import SUPPORTED_POSTPROCESSES from mlonmcu.feature.features import get_available_feature_names @@ -93,6 +94,14 @@ def add_flow_options(parser): choices=get_available_feature_names(), help="Enabled features for target/framework/backend (choices: %(choices)s)", ) + flow_parser.add_argument( + "--initializer", + type=str, + metavar="INITIALIZER", + nargs="+", + # action="append", + help="List of yml files for initializing runs", + ) flow_parser.add_argument( "-c", "--config", @@ -154,6 +163,11 @@ def add_gen_args(parser, number): action="store_true", help="Display progress bar (default: %(default)s)", ) + flow_parser.add_argument( + "--noop", + action="store_true", + help="Skip processing of runs, just initialize (default: %(default)s)", + ) flow_parser.add_argument( "--resume", action="store_true", @@ -218,16 +232,18 @@ def kickoff_runs(args, until, context): assert len(context.sessions) > 0 session = context.sessions[-1] # session.label = args.label - config = extract_config(args) + config, config_gen = extract_config(args) # TODO: move into context/session per_stage = True print_report = True if "runs_per_stage" in config: - per_stage = bool(config["runs_per_stage"]) + value = config["runs_per_stage"] + per_stage = str2bool(value) if isinstance(value, str) else value elif "runs_per_stage" in context.environment.vars: per_stage = bool(context.environment.vars["runs_per_stage"]) if "print_report" in config: - print_report = bool(config["print_report"]) + value = config["print_report"] + print_report = str2bool(value) if isinstance(value, str) else value elif "print_report" in context.environment.vars: print_report = bool(context.environment.vars["print_report"]) with session: @@ -239,6 +255,7 @@ def kickoff_runs(args, until, context): progress=args.progress, context=context, export=True, + noop=args.noop, ) if not success: logger.error("At least one error occured!") diff --git a/mlonmcu/cli/compile.py b/mlonmcu/cli/compile.py index c01ce233e..79c42e651 100644 --- a/mlonmcu/cli/compile.py +++ b/mlonmcu/cli/compile.py @@ -24,7 +24,7 @@ add_build_options, ) from mlonmcu.context.context import MlonMcuContext -from mlonmcu.session.run import RunStage +from mlonmcu.session.run import RunStage, RunInitializer from mlonmcu.platform.lookup import get_platforms_targets from .helper.parse import extract_target_names, extract_platform_names, extract_config_and_feature_names @@ -54,13 +54,16 @@ def _handle(args, context): session = context.sessions[-1] new_runs = [] for run in session.runs: - if run.target is None: + if isinstance(run, RunInitializer) and run.frozen: + new_runs.append(run) + continue + if not run.has_target(): # assert run.compile_platform is None targets_ = targets else: targets_ = [None] for target_name in targets_: - new_run = run.copy() + new_run = run.copy(session=session) if target_name is not None: platform_name = None for platform in platforms: diff --git a/mlonmcu/cli/load.py b/mlonmcu/cli/load.py index d28d45434..b5772d818 100644 --- a/mlonmcu/cli/load.py +++ b/mlonmcu/cli/load.py @@ -18,6 +18,8 @@ # """Command line subcommand for the load stage.""" +from pathlib import Path + from mlonmcu.cli.common import ( add_common_options, add_context_options, @@ -30,7 +32,7 @@ from mlonmcu.context.context import MlonMcuContext from mlonmcu.models import SUPPORTED_FRONTENDS from mlonmcu.models.lookup import apply_modelgroups -from mlonmcu.session.run import RunStage +from mlonmcu.session.run import RunStage, RunInitializer def add_load_options(parser): @@ -62,11 +64,19 @@ def _handle(args, context): config = context.environment.vars new_config, features, gen_config, gen_features = extract_config_and_feature_names(args, context=context) config.update(new_config) + session = context.get_session(label=args.label, resume=args.resume, config=config) + initializers = args.initializer + if initializers is not None: + for initializer_file in initializers: + initializer_file = Path(initializer_file).resolve() + initializer = RunInitializer.from_file(initializer_file) + session.add_run(initializer, ignore_idx=True) frontends = extract_frontend_names(args, context=context) postprocesses = extract_postprocess_names(args, context=context) - session = context.get_session(label=args.label, resume=args.resume, config=config) models = apply_modelgroups(args.models, context=context) for model in models: + if model == "_": + continue for f in gen_features: for c in gen_config: all_config = {**config, **c} diff --git a/mlonmcu/context/context.py b/mlonmcu/context/context.py index 822ddb1e0..355c5701a 100644 --- a/mlonmcu/context/context.py +++ b/mlonmcu/context/context.py @@ -28,7 +28,7 @@ from mlonmcu.utils import ask_user from mlonmcu.logging import get_logger, set_log_file -from mlonmcu.session.run import Run +from mlonmcu.session.run import Run, ArchivedRun from mlonmcu.session.session import Session from mlonmcu.setup.cache import TaskCache import mlonmcu.setup.utils as utils @@ -186,9 +186,9 @@ def load_recent_sessions(env: Environment, count: int = None) -> List[Session]: run_directory = runs_directory / str(rid) # run_file = run_directory / "run.txt" # run = Run.from_file(run_file) # TODO: actually implement run restore - run = Run() # TODO: fix - run.archived = True - run.dir = run_directory + run = ArchivedRun.from_dir(run_directory) + # run.archived = True + # run.dir = run_directory runs.append(run) session = Session(idx=sid, archived=True, dir=session_directory) session.runs = runs @@ -307,7 +307,12 @@ def __init__(self, name: str = None, path: str = None, deps_lock: str = "write") self.cache = TaskCache() self.export_paths = set() - def create_session(self, label="", config=None): + def create_session(self, label="", config=None, custom_dir=None): + if custom_dir is not None: + logger.debug("Creating a new session with idx %s", idx) + session_dir = Path(custom_dir) + session = Session(idx=None, label=label, dir=session_dir, config=config) + return session try: lock = self.latest_session_link_lock.acquire(timeout=10) except filelock.Timeout as err: @@ -574,3 +579,13 @@ def __exit__(self, exception_type, exception_value, traceback): logger.debug("Releasing lock on context") self.deps_lock.release() return False + + def get_read_only_context(self): + return MlonMcuContextMinimal(self) + + +class MlonMcuContextMinimal: + + def __init__(self, context: MlonMcuContext): + self.environment = context.environment + self.cache = context.cache diff --git a/mlonmcu/environment/environment.py b/mlonmcu/environment/environment.py index ccb8a5e94..87600fe96 100644 --- a/mlonmcu/environment/environment.py +++ b/mlonmcu/environment/environment.py @@ -368,90 +368,14 @@ def __init__(self): PathConfig("./models"), ], } - self.repos = { - "tensorflow": RepoConfig("https://github.com/tensorflow/tensorflow.git", ref="v2.5.2"), - "tflite_micro_compiler": RepoConfig( - "https://github.com/cpetig/tflite_micro_compiler.git", ref="master" - ), # TODO: freeze ref? - "tvm": RepoConfig( - "https://github.com/tum-ei-eda/tvm.git", ref="tumeda" - ), # TODO: use upstream repo with suitable commit? - "utvm_staticrt_codegen": RepoConfig( - "https://github.com/tum-ei-eda/utvm_staticrt_codegen.git", ref="master" - ), # TODO: freeze ref? - "etiss": RepoConfig("https://github.com/tum-ei-eda/etiss.git", ref="master"), # TODO: freeze ref? - } - self.frameworks = [ - FrameworkConfig( - "tflm", - enabled=True, - backends=[ - BackendConfig("tflmc", enabled=True, features=[]), - BackendConfig("tflmi", enabled=True, features=[]), - ], - features=[ - FrameworkFeatureConfig("muriscvnn", framework="tflm", supported=False), - ], - ), - FrameworkConfig( - "utvm", - enabled=True, - backends=[ - BackendConfig( - "tvmaot", - enabled=True, - features=[ - BackendFeatureConfig("unpacked_api", backend="tvmaot", supported=True), - ], - ), - BackendConfig("tvmrt", enabled=True, features=[]), - BackendConfig("tvmcg", enabled=True, features=[]), - ], - features=[ - FrameworkFeatureConfig("memplan", framework="utvm", supported=False), - ], - ), - ] - self.frontends = [ - FrontendConfig("saved_model", enabled=False), - FrontendConfig("ipynb", enabled=False), - FrontendConfig( - "tflite", - enabled=True, - features=[ - FrontendFeatureConfig("packing", frontend="tflite", supported=False), - ], - ), - ] - self.vars = { - "TEST": "abc", - } + self.repos = {} + self.frameworks = [] + self.frontends = [] + self.vars = {} self.flags = {} - self.platforms = [ - PlatformConfig( - "mlif", - enabled=True, - features=[PlatformFeatureConfig("debug", platform="mlif", supported=True)], - ) - ] + self.platforms = [] self.toolchains = {} - self.targets = [ - TargetConfig( - "etiss_pulpino", - features=[ - TargetFeatureConfig("debug", target="etiss_pulpino", supported=True), - TargetFeatureConfig("attach", target="etiss_pulpino", supported=True), - TargetFeatureConfig("trace", target="etiss_pulpino", supported=True), - ], - ), - TargetConfig( - "host_x86", - features=[ - TargetFeatureConfig("debug", target="host_x86", supported=True), - TargetFeatureConfig("attach", target="host_x86", supported=True), - ], - ), - ] + self.targets = [] class UserEnvironment(DefaultEnvironment): diff --git a/mlonmcu/platform/espidf/espidf.py b/mlonmcu/platform/espidf/espidf.py index 4a04ca9f9..c4c06b0d8 100644 --- a/mlonmcu/platform/espidf/espidf.py +++ b/mlonmcu/platform/espidf/espidf.py @@ -125,7 +125,7 @@ def init_directory(self, path=None, context=None): if self.project_dir is not None: self.project_dir.mkdir(exist_ok=True) logger.debug("Project directory already initialized") - return + return self.project_dir dir_name = self.name if path is not None: self.project_dir = Path(path) @@ -146,6 +146,7 @@ def init_directory(self, path=None, context=None): self.project_dir = Path(self.tempdir.name) / dir_name logger.debug("Temporary project directory: %s", self.project_dir) self.project_dir.mkdir(exist_ok=True) + return self.project_dir def get_supported_targets(self): text = self.invoke_idf_exe("--list-targets", live=self.print_outputs) diff --git a/mlonmcu/platform/microtvm/microtvm_base_platform.py b/mlonmcu/platform/microtvm/microtvm_base_platform.py index 28f14afbb..f47fef194 100644 --- a/mlonmcu/platform/microtvm/microtvm_base_platform.py +++ b/mlonmcu/platform/microtvm/microtvm_base_platform.py @@ -123,7 +123,7 @@ def init_directory(self, path=None, context=None): if self.project_dir is not None: self.project_dir.mkdir(exist_ok=True) logger.debug("Project directory already initialized") - return + return self.project_dir dir_name = self.name if path is not None: self.project_dir = Path(path) @@ -144,6 +144,7 @@ def init_directory(self, path=None, context=None): self.project_dir = Path(self.tempdir.name) / dir_name logger.debug("Temporary project directory: %s", self.project_dir) self.project_dir.mkdir(exist_ok=True) + return self.project_dir @property def project_template(self): diff --git a/mlonmcu/platform/mlif/mlif.py b/mlonmcu/platform/mlif/mlif.py index a22241d8e..ca2e30c45 100644 --- a/mlonmcu/platform/mlif/mlif.py +++ b/mlonmcu/platform/mlif/mlif.py @@ -143,7 +143,7 @@ def init_directory(self, path=None, context=None): if self.build_dir is not None: self.build_dir.mkdir(exist_ok=True) logger.debug("Build directory already initialized") - return + return self.build_dir dir_name = self.name if path is not None: self.build_dir = Path(path) @@ -164,6 +164,7 @@ def init_directory(self, path=None, context=None): self.build_dir = Path(self.tempdir.name) / dir_name logger.info("Temporary build directory: %s", self.build_dir) self.build_dir.mkdir(exist_ok=True) + return self.build_dir def create_target(self, name): assert name in self.get_supported_targets(), f"{name} is not a valid MLIF target" @@ -333,7 +334,7 @@ def get_cmake_args(self): return cmakeArgs def prepare(self): - self.init_directory() + pass # TODO: is this used? def prepare_environment(self): env = os.environ.copy() diff --git a/mlonmcu/platform/platform.py b/mlonmcu/platform/platform.py index ece818976..b0dbf4b38 100644 --- a/mlonmcu/platform/platform.py +++ b/mlonmcu/platform/platform.py @@ -58,7 +58,7 @@ def __init__(self, name, features=None, config=None): self.config = filter_config(self.config, self.name, self.DEFAULTS, self.OPTIONAL, self.REQUIRED) self.artifacts = [] - def init_directory(self, path=None, context=None): + def init_directory(self, path=None, context=None) -> Path: raise NotImplementedError @property diff --git a/mlonmcu/platform/tvm/tvm_base_platform.py b/mlonmcu/platform/tvm/tvm_base_platform.py index 15e230753..5161414de 100644 --- a/mlonmcu/platform/tvm/tvm_base_platform.py +++ b/mlonmcu/platform/tvm/tvm_base_platform.py @@ -54,7 +54,7 @@ def init_directory(self, path=None, context=None): if self.project_dir is not None: self.project_dir.mkdir(exist_ok=True) logger.debug("Project directory already initialized") - return + return self.project_dir dir_name = self.name if path is not None: self.project_dir = Path(path) @@ -75,6 +75,7 @@ def init_directory(self, path=None, context=None): self.project_dir = Path(self.tempdir.name) / dir_name logger.debug("Temporary project directory: %s", self.project_dir) self.project_dir.mkdir(exist_ok=True) + return self.project_dir @property def tvmc_custom_script(self): diff --git a/mlonmcu/platform/zephyr/zephyr.py b/mlonmcu/platform/zephyr/zephyr.py index f3103b2c1..aa07aa6a0 100644 --- a/mlonmcu/platform/zephyr/zephyr.py +++ b/mlonmcu/platform/zephyr/zephyr.py @@ -120,7 +120,7 @@ def init_directory(self, path=None, context=None): if self.project_dir is not None: self.project_dir.mkdir(exist_ok=True) logger.debug("Project directory already initialized") - return + return self.project_dir dir_name = self.name if path is not None: self.project_dir = Path(path) @@ -141,6 +141,7 @@ def init_directory(self, path=None, context=None): self.project_dir = Path(self.tempdir.name) / dir_name logger.debug("Temporary project directory: %s", self.project_dir) self.project_dir.mkdir(exist_ok=True) + return self.project_dir def get_supported_targets(self): with tempfile.TemporaryDirectory() as temp: diff --git a/mlonmcu/session/progress.py b/mlonmcu/session/progress.py new file mode 100644 index 000000000..8431b2306 --- /dev/null +++ b/mlonmcu/session/progress.py @@ -0,0 +1,46 @@ +# +# Copyright (c) 2024 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Progress bar utilities for MLonMCU session.""" +from tqdm import tqdm + +from mlonmcu.logging import get_logger + +logger = get_logger() + + +def init_progress(total, msg="Processing..."): + """Helper function to initialize a progress bar for the session.""" + return tqdm( + total=total, + desc=msg, + ncols=100, + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}s]", + leave=None, + ) + + +def update_progress(pbar, count=1): + """Helper function to update the progress bar for the session.""" + pbar.update(count) + + +def close_progress(pbar): + """Helper function to close the session progressbar, if available.""" + if pbar: + pbar.close() diff --git a/mlonmcu/session/rpc.py b/mlonmcu/session/rpc.py new file mode 100644 index 000000000..0be290f18 --- /dev/null +++ b/mlonmcu/session/rpc.py @@ -0,0 +1,340 @@ +# +# Copyright (c) 2024 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Definition of MLonMCU rpc utilities.""" +import struct +import socket +from dataclasses import dataclass +from typing import Optional, List +from threading import Thread + +from mlonmcu.session.run import RunInitializer, RunResult, RunStage +import mlonmcu.session.rpc_utils as base + + +@dataclass +class RemoteConfig: + tracker: str = "localhost:9000" + key: str = "default" + + @property + def tracker_host(self): + return self.tracker.split(":")[0] + + @property + def tracker_port(self): + return int(self.tracker.split(":")[1]) + + +class RPCSession(object): + """RPC Client session module + + Do not directly create the object, call connect + """ + + # def __init__(self, sess): + # self._sess = sess + def __init__(self, url, port, key="", session_timeout=0): + # self._sess = sess + # print("__init__") + self.url = url + self.port = port + self.key = key + self.session_timeout = session_timeout + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # print("_sock.connect") + self._sock.connect((url, port)) + # print("_sock.connected") + + def __del__(self): + self.close() + + def close(self): + """Close the server connection.""" + if self._sock: + self._sock.close() + self._sock = None + + def execute(self, run_initializers: List[RunInitializer], until: RunStage, parallel: int = 1) -> RunResult: + # print("execute") + # TODO: move imports + import codecs + import pickle + import cloudpickle # TODO: update requirements.txt + run_initializers = [codecs.encode(cloudpickle.dumps(x), "base64").decode("utf8") for x in run_initializers] + msg = {"operation": "execute", "run_initializers": run_initializers, "until": until, "parallel": parallel} + # print("msg", msg) + assert self._sock is not None + # TODO: pickle? + base.sendjson(self._sock, msg) + response = base.recvjson(self._sock) + # print("response", response) + # <- {"results": [result0,...]} + assert response is not None + success = response.get("success", None) + assert success is not None + results = response.get("results", None) + assert results is not None + # print("success", success) + assert success, "Session failed!" + # print("r", results) + results = [pickle.loads(codecs.decode(x.encode("utf-8"), "base64")) for x in results] + # print("results", results) + return results + + def upload(self, data, target=None): + """Upload file to remote runtime temp folder + + Parameters + ---------- + data : str or bytearray + The file name or binary in local to upload. + + target : str, optional + The path in remote + """ + raise NotImplementedError + + def download(self, path): + """Download file from remote temp folder. + + Parameters + ---------- + path : str + The relative location to remote temp folder. + + Returns + ------- + blob : bytearray + The result blob from the file. + """ + raise NotImplementedError + + def remove(self, path): + """Remove file from remote temp folder. + + Parameters + ---------- + path: str + The relative location to remote temp folder. + """ + raise NotImplementedError + + def listdir(self, path): + """ls files from remote temp folder. + + Parameters + ---------- + path: str + The relative location to remote temp folder. + + Returns + ------- + dirs: str + The files in the given directory with split token ','. + """ + raise NotImplementedError + + +class TrackerSession: + """Tracker client session. + + Parameters + ---------- + addr : tuple + The address tuple + """ + + def __init__(self, addr): + self._addr = addr + self._sock = None + self._connect() + + def __del__(self): + self.close() + + def _connect(self): + timeout = 10 + self._sock = base.connect_with_retry(self._addr, timeout=timeout) + # TODO: implement magic + # self._sock.sendall(struct.pack(" timeout: + raise RuntimeError(f"Failed to connect to server {str(addr)}") + logger.warning( + f"Cannot connect to tracker {str(addr)}, retry in {retry_period:g} secs..." + ) + time.sleep(retry_period) diff --git a/mlonmcu/session/run.py b/mlonmcu/session/run.py index 7abae9859..6b0a2ed69 100644 --- a/mlonmcu/session/run.py +++ b/mlonmcu/session/run.py @@ -18,9 +18,10 @@ # """Definition of a MLonMCU Run which represents a single benchmark instance for a given set of options.""" import itertools -import os import copy +import shutil import tempfile +from typing import Union, Optional from pathlib import Path from enum import IntEnum from collections import defaultdict @@ -71,6 +72,208 @@ def add_any(new, base=None, append=True): return ret +class RunInitializer: + + @staticmethod + def from_file(src: Union[str, Path]): + if not isinstance(src, Path): + assert isinstance(src, str) + src = Path(src) + fmt = src.suffix + assert len(fmt) > 0 + fmt = fmt[1:].lower() + if fmt in ["yml", "yaml"]: + import yaml + + with open(src, "r") as f: + data = yaml.safe_load(f) + else: + raise ValueError(f"Unsupported format: {fmt}") + assert "runs" in data + runs = data["runs"] + if len(runs) == 0: + raise RuntimeError("Empty run initalizer") + elif len(runs) == 1: + run = runs[0] + else: + raise NotImplementedError("multiple runs per initializer not supported") + initializer = RunInitializer(**run) + initializer.frozen = True + return initializer + + def __init__( + self, + idx=None, + model_name=None, + framework_name=None, + frontend_names=None, + backend_name=None, + target_name=None, + platform_names=None, + feature_names=None, + config=None, + postprocess_names=None, + comment=None, + # from_stage=None, + # frozen=False, + ): + self.idx = idx + self.model_name = model_name + self.frontend_names = frontend_names + self.framework_name = framework_name + self.backend_name = backend_name + self.platform_names = platform_names + self.postprocess_names = postprocess_names + self.comment = comment + self.target_name = target_name + self.config = config + self.feature_names = feature_names + self.frozen = False + + def _serialize(self): + return { + "runs": [ + { + "idx": self.idx, + "model_name": self.model_name, + "frontend_names": self.frontend_names, + "framework_name": self.framework_name, + "backend_name": self.backend_name, + "platform_names": self.platform_names, + "postprocess_names": self.postprocess_names, + "comment": self.comment, + "target_name": self.target_name, + "config": self.config, + "feature_names": self.feature_names, + # not: frozen + } + ] + } + + def save(self, dest: Union[str, Path], fmt: Optional[str] = None): + if not isinstance(dest, Path): + assert isinstance(dest, str) + dest = Path(dest) + data = self._serialize() + if fmt is None: + fmt = dest.suffix + assert fmt + if fmt[0] == ".": + fmt = fmt[1:] + if fmt.lower() in ["yml", "yaml"]: + import yaml + + with open(dest, "w") as f: + yaml.dump(data, f, allow_unicode=True) + else: + raise ValueError(f"Unsupported format: {fmt}") + + def realize(self, context=None): + assert context is not None + run = Run( + idx=self.idx, + config=self.config, + ) + if self.comment: + run.comment = self.comment + if self.feature_names: + run.add_features_by_name(self.feature_names, context=context) + if self.frontend_names: + run.add_frontends_by_name(self.frontend_names, context=context) + if self.model_name: + run.add_model_by_name(self.model_name, context=context) + if self.platform_names: + run.add_platforms_by_name(self.platform_names, context=context) + if self.backend_name: + run.add_backend_by_name(self.backend_name, context=context) + if self.target_name: + run.add_target_by_name(self.target_name, context=context) + if self.postprocess_names: + run.add_postprocesses_by_name(self.postprocess_names, context=context) + return run + + def has_target(self): + return self.target_name is not None + + def add_model_by_name(self, model_name, context=None): + assert self.model_name is None + self.model_name = model_name + + def add_frontend_by_name(self, frontend_name, context=None): + self.add_frontends_by_name([frontend_name]) + + def add_frontends_by_name(self, frontend_names, context=None): + if self.frontend_names is None: + self.frontend_names = [] + self.frontend_names.extend(frontend_names) + + def add_backend_by_name(self, backend_name, context=None): + assert self.backend_name is None + self.backend_name = backend_name + + def add_target_by_name(self, target_name, context=None): + # assert self.target_name is None + self.target_name = target_name + + def add_platform_by_name(self, platform_name, context=None): + self.add_platforms_by_name([platform_name]) + + def add_platforms_by_name(self, platform_names, context=None): + if self.platform_names is None: + self.platform_names = [] + self.platform_names.extend(platform_names) + + def add_postprocess_by_name(self, postprocess_name, context=None): + self.add_postprocesses_by_name([postprocess_name]) + + def add_postprocesses_by_name(self, postprocess_names, context=None): + if self.postprocess_names is None: + self.postprocess_names = [] + self.postprocess_names.extend(postprocess_names) + + def add_feature_by_name(self, feature_name, context=None): + self.add_features_by_name([feature_name]) + + def add_features_by_name(self, feature_names, context=None): + if self.feature_names is None: + self.feature_names = [] + self.feature_names.extend(feature_names) + + def copy(self, session=None): + """Create a new runinitializer based on this instance.""" + new = copy.deepcopy(self) + assert session is not None, "Run.copy() needs session" + if session: + new_idx = session.request_run_idx() + new.idx = new_idx + # self.init_directory() + return new + + +class RunResult: + # def __init__(self, run: "Run", session: "Session"): + def __init__(self, run: "Run"): + self.idx = run.idx + self.dir = run.dir + self.failing = run.failing + self.failed_stage = run.failed_stage + self.reason = run.reason + # self.report = run.get_report(session=session) + self.report = run.get_report() + # self.artifacts_per_stage = {} + # self.stage = RunStage.NOP # max executed stage + # self.completed = {stage: stage == RunStage.NOP for stage in RunStage} + # self.directories = {} + + def get_report(self, session=None): + # TODO: read only?! + # if session is not None: + # pre = self.report.pre_df + # pre["Session"] = session.idx + # self.report.pre_df = pre + return self.report + + class Run: """A run is single model/backend/framework/target combination with a given set of features and configs.""" @@ -106,7 +309,6 @@ def __init__( config=None, # TODO: All config combined or explicit run-config? postprocesses=None, archived=False, - session=None, comment="", ): self.idx = idx @@ -117,14 +319,12 @@ def __init__( self.platforms = platforms if platforms is not None else [] self.artifacts_per_stage = {} self.archived = archived - self.session = session self.postprocesses = postprocesses if postprocesses else [] self.comment = comment # self.stage = RunStage.NOP # max executed stage self.completed = {stage: stage == RunStage.NOP for stage in RunStage} self.directories = {} - # self.init_directory() self.target = target self.cache_hints = [] self.config = config if config else {} @@ -134,13 +334,14 @@ def __init__( self.run_config = filter_config(self.config, "run", self.DEFAULTS, self.OPTIONAL, self.REQUIRED) self.sub_names = [] self.sub_parents = {} - self.result = None + # self.result = None self.failing = False # -> RunStatus self.reason = None self.failed_stage = None # self.lock = threading.Lock() # FIXME: use mutex instead of boolean self.locked = False self.report = None + self.dir = None def process_features(self, features): """Utility which handles postprocess_features.""" @@ -155,6 +356,9 @@ def process_features(self, features): self.run_config = filter_config(tmp_run_config, "run", self.DEFAULTS, self.OPTIONAL, self.REQUIRED) return features + def has_target(self): + return self.target is not None + @property def tune_enabled(self): """Get tune_enabled property.""" @@ -287,33 +491,42 @@ def unlock(self): # self.lock.release() self.locked = False - def init_directory(self): + def init_directory(self, session=None, parent=None): """Initialize the temporary directory for this run.""" - if self.session is None: + if parent is not None: + if not isinstance(parent, Path): + assert isinstance(parent, str) + parent = Path(parent) + assert parent.is_dir() + self.tempdir = None + self.dir = parent / str(self.idx) + elif session is not None: + logger.warning( + "session argument of run.init_directory is deprecated. Please use parent argument in the future." + ) + self.tempdir = None + self.dir = session.runs_dir / str(self.idx) + else: assert not self.archived self.tempdir = tempfile.TemporaryDirectory() self.dir = Path(self.tempdir.name) - else: - self.tempdir = None - self.dir = self.session.runs_dir / str(self.idx) - if not self.dir.is_dir(): - os.mkdir(self.dir) - # This is not a good idea, but else we would need a mutex/lock on the shared build_dir - # A solution would be to split up the framework runtime libs from the mlif... - for platform in self.platforms: # TODO: only do this if needed! (not for every platform) - # The stage_subdirs setting is ignored here because platforms can be multi-stage! - # platform.init_directory(path=Path(self.dir) / platform.name) - if platform in self.directories: - continue - platform_dir = Path(self.dir) / platform.name - if platform.init_directory(path=platform_dir): - self.directories[platform.name] = platform_dir - # if target not in self.directories: - # target_dir = Path(self.dir) /target.name - # if target.init_directory(path=target_dir) - # self.directories[target.name] = target_dir + self.dir.mkdir(exist_ok=True) + # This is not a good idea, but else we would need a mutex/lock on the shared build_dir + # A solution would be to split up the framework runtime libs from the mlif... + for platform in self.platforms: # TODO: only do this if needed! (not for every platform) + # The stage_subdirs setting is ignored here because platforms can be multi-stage! + # platform.init_directory(path=Path(self.dir) / platform.name) + if platform in self.directories: + continue + platform_dir = Path(self.dir) / platform.name + if platform.init_directory(path=platform_dir): + self.directories[platform.name] = platform_dir + # if target not in self.directories: + # target_dir = Path(self.dir) /target.name + # if target.init_directory(path=target_dir) + # self.directories[target.name] = target_dir - # TODO: other components + # TODO: other components def __deepcopy__(self, memo): cls = self.__class__ @@ -326,11 +539,12 @@ def __deepcopy__(self, memo): setattr(result, k, copy.deepcopy(v, memo)) return result - def copy(self): + def copy(self, session=None): """Create a new run based on this instance.""" new = copy.deepcopy(self) - if self.session: - new_idx = self.session.request_run_idx() + assert session is not None, "Run.copy() needs session" + if session: + new_idx = session.request_run_idx() new.idx = new_idx # self.init_directory() return new @@ -494,8 +708,7 @@ def add_model_by_name(self, model_name, context=None): if model is None: if reasons: logger.error("Lookup of model '%s' was not successfull. Reasons: %s", model_name, reasons) - else: - raise RuntimeError(f"Model with name '{model_name}' not found.") + raise RuntimeError(f"Model with name '{model_name}' not found.") self.add_model(model) def add_frontend_by_name(self, frontend_name, context=None): @@ -615,7 +828,7 @@ def frontend(self): @property def artifacts(self): sub = "default" - ret = sum(list(itertools.chain([subs[sub] for stage, subs in self.artifacts_per_stage.items()])), []) + ret = sum(list(itertools.chain([subs[sub] for stage, subs in self.artifacts_per_stage.items() if sub in subs])), []) return ret def get_all_sub_artifacts(self, sub, stage=None): @@ -1091,8 +1304,9 @@ def write_run_file(self): @property def prefix(self): """Get prefix property.""" + session = None # TODO: fix return ( - (f"[session-{self.session.idx}] [run-{self.idx}]" if self.session else f"[run-{self.idx}]") + (f"[session-{session.idx}] [run-{self.idx}]" if session else f"[run-{self.idx}]") if self.idx is not None else "" ) @@ -1180,7 +1394,7 @@ def config_helper(obj, prefix=None): ret.update(config_helper(postprocess)) return ret - def get_report(self): + def get_report(self, session=None): """Returns teh complete report of this run.""" if self.completed[RunStage.POSTPROCESS]: if self.report is not None: @@ -1190,8 +1404,8 @@ def get_report(self): # TODO: config or args for stuff like (session id) and run id as well as detailed features and configs report = Report() pre = {} - if self.session is not None: - pre["Session"] = self.session.idx + if session is not None: + pre["Session"] = session.idx if self.idx is not None: pre["Run"] = self.idx if self.model: @@ -1295,5 +1509,119 @@ def export(self, path=None, optional=False): self.write_run_file() + def result(self): + return RunResult(self) # TODO: session? + + # Everything in Run is serializable except PlatformTargets and PlatformBackends... + # def __getstate__(self): + # state = self.__dict__.copy() + # print("state", state) + # # Don't pickle baz + # # del state["baz"] + # # del state["platforms"] + # del state["target"] + # return state + + # def __setstate__(self, state): + # self.__dict__.update(state) + # # Add baz back since it doesn't exist in the pickle + # # self.baz = 0 + + def save(self, dest: Union[str, Path], fmt: Optional[str] = None): + raise NotImplementedError + # if not isinstance(dest, Path): + # assert isinstance(dest, str) + # dest = Path(dest) + # # data = self._serialize() + # if fmt is None: + # fmt = dest.suffix + # assert fmt + # if fmt[0] == ".": + # fmt = fmt[1:] + # if fmt.lower() in ["pkl", "pickle"]: + # import pickle + # with open(dest, "wb") as f: + # pickle.dump(self, f) + # else: + # raise ValueError(f"Unsupported format: {fmt}") + + def save_artifacts(self, dest: Union[str, Path], fmt: Optional[str] = None): + if not isinstance(dest, Path): + assert isinstance(dest, str) + dest = Path(dest) + artifacts = self.artifacts + if fmt is None: + fmt = dest.suffix + assert fmt + if fmt[0] == ".": + fmt = fmt[1:] + if fmt.lower() in ["pkl", "pickle"]: + data = {"artifacts": artifacts} + import pickle + + with open(dest, "wb") as f: + pickle.dump(data, f) + elif fmt.lower() in ["yml", "yaml"]: + data = {"artifacts": [artifact.serialize() for artifact in artifacts]} + import yaml + + with open(dest, "w") as f: + yaml.dump(data, f, allow_unicode=True) + else: + raise ValueError(f"Unsupported format: {fmt}") + + def cleanup_artifacts(self, dirs: bool = False): + artifacts = self.artifacts + for artifact in artifacts: + if artifact.fmt in [ArtifactFormat.ARCHIVE]: + logger.warning("Can not cleanup unzipped archives!") + if not artifact.exported: + continue + assert artifact.path.is_absolute() + if artifact.path.is_file(): + artifact.path.unlink() + elif artifact.path.is_dir() and dirs: + shutil.rmtree(artifact.path) + else: + assert False + + def cleanup_directories(self): + for directory in self.directories.values(): + assert directory.is_absolute() + if not directory.is_dir(): + continue + shutil.rmtree(directory) + + def initializer(self): + return RunInitializer( + idx=self.idx, + model_name=self.model.name, + framework_name=self.framework.name, + frontend_names=[frontend.name for frontend in self.frontends], + backend_name=self.backend.name, + target_name=self.target.name, + platform_names=[platform.name for platform in self.platforms], + feature_names=[feature.name for feature in self.features], + config={**self.config}, + postprocess_names=[postprocess.name for postprocess in self.postprocesses], + comment=self.comment, + ) + + +class ArchivedRun(Run): + + def __init__(self, TODO): + pass + + @staticmethod + def from_file(path: Union[Path, str]): + # TODO: yml, yaml, txt, tar, zip + pass + + @staticmethod + def from_dir(path: Union[Path, str]): + pass -# TODO: implement close()? and use closing contextlib? + @staticmethod + def restore(self): + pass diff --git a/mlonmcu/session/schedule.py b/mlonmcu/session/schedule.py new file mode 100644 index 000000000..c8832e100 --- /dev/null +++ b/mlonmcu/session/schedule.py @@ -0,0 +1,691 @@ +# +# Copyright (c) 2024 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Definition of MLonMCU session schedulers.""" +import random +import concurrent.futures +from pathlib import Path +from typing import List, Optional + +# from mlonmcu.context.context import MlonMcuContext +from mlonmcu.session.run import Run, RunInitializer, RunResult, RunStage +from mlonmcu.logging import get_logger +from mlonmcu.setup import utils + +from .postprocess.postprocess import SessionPostprocess +from .progress import init_progress, update_progress, close_progress +from .rpc import connect_tracker, RemoteConfig + +logger = get_logger() # TODO: rename to get_mlonmcu_logger + + +def chunks(lst, n): + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(lst), n): + yield lst[i:i + n] + + +# class SessionExecutor(ABC): +# def submit_runs(self, ?): +# raise NotImplementedError + + +class ProcessPoolSessionExecutor(concurrent.futures.ProcessPoolExecutor): + + def __init__(self, max_workers: Optional[int] = None): + super().__init__(max_workers) + + def submit_runs( + self, + runs, + until=None, + skip=None, + export=False, + context=None, + runs_dir=None, + save=True, + cleanup=False, + ): + fn = _process_pickable + args = [ + runs, + ] + kwargs = { + "until": until, + "skip": skip, + "export": export, + "context": context, + "runs_dir": runs_dir, + "save": save, + "cleanup": cleanup, + } + return self.submit(fn, *args, **kwargs) + + +class ThreadPoolSessionExecutor(concurrent.futures.ThreadPoolExecutor): + def __init__(self, max_workers: Optional[int] = None): + super().__init__(max_workers) + + def submit_runs( + self, + runs, + until=None, + skip=None, + export=False, + context=None, + runs_dir=None, + save=True, + cleanup=False, + ): + fn = _process_default + args = [ + runs, + ] + kwargs = { + "until": until, + "skip": skip, + "export": export, + "context": context, + "runs_dir": runs_dir, + "save": save, + "cleanup": cleanup, + } + return self.submit(fn, *args, **kwargs) + + +class CmdlineSessionExecutor(concurrent.futures.ProcessPoolExecutor): + + def __init__(self, max_workers: Optional[int] = None, parallel_jobs: int = 1): + super().__init__(max_workers) + self.parallel_jobs = parallel_jobs + + # def submit_batch(self, fn, /, *args, **kwargs): + # fn = + # def get_process(self): + # TODO: move args + def submit_runs( + self, + runs, + until=None, + skip=None, + export=False, + context=None, + runs_dir=None, + save=True, + cleanup=False, + ): + fn = _process_cmdline + args = [ + runs, + ] + kwargs = { + "until": until, + "skip": skip, + "export": export, + "context": context, + "runs_dir": runs_dir, + "save": save, + "cleanup": cleanup, + "parallel_jobs": self.parallel_jobs, + } + return self.submit(fn, *args, **kwargs) + + +class ContextSessionExecutor(concurrent.futures.ProcessPoolExecutor): + + def __init__(self, max_workers: Optional[int] = None, parallel_jobs: int = 1): + super().__init__(max_workers) + self.parallel_jobs = parallel_jobs + + def submit_runs( + self, + runs, + until=None, + skip=None, + export=False, + context=None, + runs_dir=None, + save=True, + cleanup=False, + ): + fn = _process_context + # TODO: do not realize jobs (just dump initializer) + args = [ + runs, + ] + kwargs = { + "until": until, + "skip": skip, + "export": export, + # "context": context, + "runs_dir": runs_dir, + "save": save, + "cleanup": cleanup, + "parallel_jobs": self.parallel_jobs, + } + return self.submit(fn, *args, **kwargs) + + +class RPCSessionExecutor(concurrent.futures.ProcessPoolExecutor): + + def __init__( + self, + max_workers: Optional[int] = None, + remote_config: Optional[RemoteConfig] = None, + blocking: bool = True, + # batch_size? + parallel_jobs: int = 1 + ): + super().__init__(max_workers) + self.remote_config = remote_config + self.blocking = blocking + self.parallel_jobs = parallel_jobs + + def submit_runs( + self, + runs, + until=None, + skip=None, + export=False, + context=None, + runs_dir=None, + save=True, + cleanup=False, + ): + fn = _process_rpc + args = [ + runs, + ] + kwargs = { + "until": until, + # "skip": skip, + # "export": export, + # "context": context, + # "runs_dir": runs_dir, + # "save": save, + # "cleanup": cleanup, + "parallel_jobs": self.parallel_jobs, + "rpc_config": self.remote_config, + } + return self.submit(fn, *args, **kwargs) + + +def _handle_context(context, allow_none: bool = False, minimal: bool = False): + # TODO: handle (thread_pool, process_pool, remote, hybrid) + if context is None: + assert allow_none, "Missing context" + return None + if minimal: + return context.get_read_only_context() + return context + + +def _process_default(runs, until, skip, export, context, runs_dir, save, cleanup): + """Helper function to invoke the run.""" + assert isinstance(runs, list) + rets = [] + for run in runs: + run.process(until=until, skip=skip, export=export) + ret = run.result() + rets.append(ret) + if save: + # run.save(run.dir / "run.pkl") + # run.save_artifacts(run.dir / "artifacts.pkl") + run.save_artifacts(run.dir / "artifacts.yml") + if cleanup: + run.cleanup_artifacts(dirs=True) + run.cleanup_directories() + return rets + + +def _process_pickable(run_initializers, until, skip, export, context, runs_dir, save, cleanup): + """Helper function to invoke the run.""" + rets = [] + for run_initializer in run_initializers: + run = run_initializer.realize(context=context) + run.init_directory(parent=runs_dir) + run_initializer.save(run.dir / "initializer.yml") + used_stages = _used_stages([run], until) + assert skip is None + skip = [stage for stage in RunStage if stage not in used_stages] + run.process(until=until, skip=skip, export=export) + ret = run.result() + rets.append(ret) + save = True + if save: + # run.save(run.dir / "run.pkl") + # run.save_artifacts(run.dir / "artifacts.pkl") + run.save_artifacts(run.dir / "artifacts.yml") + if cleanup: + run.cleanup_artifacts(dirs=True) + run.cleanup_directories() + return rets + + +def _process_cmdline(run_initializers, until, skip, export, context, runs_dir, save, cleanup, parallel_jobs): + """Helper function to invoke the run.""" + rets = [] + # parallel_jobs = 1 + args = ["-m", "mlonmcu.cli.main", "flow", until.name.lower(), "_", "--parallel", str(parallel_jobs), "-c", "runs_per_stage=0", "-c", "session.use_init_stage=1", "--initializer"] + for run_initializer in run_initializers: + run = run_initializer.realize(context=context) + run.init_directory(parent=runs_dir) + run_initializer.save(run.dir / "initializer.yml") + args.append(run.dir / "initializer.yml") + res = None # TODO + rets.append(res) + # print("args", args) + # out = utils.python(*args) + _ = utils.python(*args) + # print("out", out) + return rets + + +def _process_context(run_initializers, until, skip, export, runs_dir, save, cleanup, parallel_jobs): + """Helper function to invoke the run.""" + rets = [] + # TODO: allow overriding home + from mlonmcu.context.context import MlonMcuContext + with MlonMcuContext(deps_lock="read") as context: + with context.get_session(resume=False) as session: + for run_initializer in run_initializers: + session.add_run(run_initializer, ignore_idx=True) + rets.append(None) + success = session.process_runs( + until=until, + per_stage=False, + print_report=False, + num_workers=parallel_jobs, + progress=False, + context=context, + export=True, + ) + assert success + return rets + + +def _process_rpc(run_initializers, until, parallel_jobs, rpc_config): + """Helper function to invoke the run.""" + # print("_process_rpc") + # TODO: allow overriding home + assert rpc_config.key is not None + tracker = connect_tracker(rpc_config.tracker_host, rpc_config.tracker_port, check=True) + # print("tracker", tracker) + server = tracker.request_server(key=rpc_config.key) + # print("server", server) + results = server.execute(run_initializers=run_initializers, until=until, parallel=parallel_jobs) + # print("results", results) + tracker.free_server(server) + # -> msg: {"action": "execute", "initializers": run_initializers, "until": until, "parallel": parallel_jobs} + # <- msg: {"action": "response", "results": results} + return results + + +def _postprocess_default(runs, report, dest, progress=False): + session_postprocesses = [] + num_failing = 0 + for run in runs: + for postprocess in run.postprocesses: + if isinstance(postprocess, SessionPostprocess): + if postprocess.name not in [p.name for p in session_postprocesses]: + session_postprocesses.append(postprocess) + if progress: + pbar = init_progress(len(session_postprocesses), msg="Postprocessing session") + for postprocess in session_postprocesses: + try: + artifacts = postprocess.post_session(report) + except Exception as e: + logger.exception(e) + num_failing += 1 + break + if progress: + update_progress(pbar) + if artifacts is not None: + for artifact in artifacts: + # Postprocess has an artifact: write to disk! + logger.debug("Writing postprocess artifact to disk: %s", artifact.name) + artifact.export(dest) + if progress: + close_progress(pbar) + return num_failing + + +def _postprocess_pickable(runs, report, dest, progress=False): + logger.error("Session Postprocesses are not supported in pickable mode!") + return 0 + + +# TODO: alternative _process functions + + +def _used_stages(runs, until): + """Determines the stages which are used by at least one run.""" + used = [] + for stage_index in list(range(RunStage.LOAD, until + 1)) + [RunStage.POSTPROCESS]: + stage = RunStage(stage_index) + if any(run.has_stage(stage) for run in runs): + used.append(stage) + return used + + +class SessionScheduler: + """TODO""" + + def __init__( + self, + runs: List[Run], + until: RunStage = RunStage.DONE, + per_stage: bool = False, + progress: bool = False, + executor: str = "thread_pool", + num_workers: int = 1, + shuffle: bool = False, + batch_size: int = 1, + parallel_jobs: int = 1, + remote_config: Optional[RemoteConfig] = None, + use_init_stage: bool = False, + prefix: Optional[str] = None, + runs_dir: Optional[Path] = None, + session=None, # TODO: typing + ): + self.runs = runs + self.results = [None] * len(runs) + self.until = until + self.per_stage = per_stage + self.progress = progress + self.executor = executor + self.num_workers = num_workers + self.parallel_jobs = parallel_jobs + self._executor_cls, self._executor_kwargs = self._handle_executor(executor, remote_config) + self.shuffle = shuffle + self.batch_size = batch_size + self.prefix = session.prefix if session is not None else prefix + self.runs_dir = session.runs_dir if session is not None else runs_dir + self.use_init_stage = use_init_stage + self._futures = [] + # TODO: contextmanager? + self.num_failures = 0 + self.stage_failures = {} + # worker_run_idx = [] + # self._future_run_idx = {} + self._future_batch_idx = {} + self._batch_run_idxs = {} + self._check() + self.used_stages, self.skipped_stages = self.prepare() + # self._process, self._postprocess = self._pick_process() + _, self._postprocess = self._pick_process() + + def _reset_futures(self): + self._futures = [] + # self._future_run_idx = {} + self._future_batch_idx = {} + self._batch_run_idxs = {} + + def _handle_executor(self, name: str, remote_config: Optional[RemoteConfig] = None): + # TODO: handle (thread_pool, process_pool, remote, hybrid) + EXECUTOR_LOOKUP = { + "thread_pool": ThreadPoolSessionExecutor, + "process_pool": ProcessPoolSessionExecutor, + "popen_pool": None, # TODO + "cmdline": CmdlineSessionExecutor, + "context": ContextSessionExecutor, + "rpc": RPCSessionExecutor, + } + ret = EXECUTOR_LOOKUP.get(name, None) + assert ret is not None, f"Executor not found: {name}" + kwargs = {"max_workers": self.num_workers} + if name in ["cmdline", "context", "rpc"]: + kwargs["parallel_jobs"] = self.parallel_jobs + if name in ["rpc"]: + kwargs["blocking"] = True + kwargs["remote_config"] = remote_config + else: + assert self.parallel_jobs == 1 + return ret, kwargs + + @property + def _prefix(self): + return f"{self.prefix} " if self.prefix else "" + + @property + def use_batches(self): + return self.batch_size > 1 + + def _pick_process(self): + ret = None + ret2 = _postprocess_default + needs_pickable = self.executor in ["process_pool", "cmdline", "context", "rpc"] + if needs_pickable: + # ret = _process_pickable + ret2 = _postprocess_pickable + # elif self.executor == "cmdline": + # ret = _process_cmdline + return ret, ret2 + + def _check(self): + has_initializer = False + for run in self.runs: + if isinstance(run, RunInitializer): + has_initializer = True + break + if has_initializer: + assert self.executor in ["process_pool", "cmdline", "context", "rpc"] or self.use_init_stage + # raise RuntimeError("RunInitializer needs init stage or process_pool executor") # TODO: change default + if self.executor in ["process_pool", "cmdline", "context", "rpc"]: + # assert not self.progress, "progress bar not supported if session.process_pool=1" + assert not self.per_stage, "per stage not supported if session.process_pool=1" + assert not self.use_init_stage, "use_init_stage not supported if session.process_pool=1" + + def prepare(self): + if self.executor in ["process_pool", "cmdline", "context", "rpc"] or self.use_init_stage: + return None, None # TODO + used_stages = _used_stages(self.runs, self.until) + skipped_stages = [stage for stage in RunStage if stage not in used_stages] + return used_stages, skipped_stages + + @property + def num_runs(self): + return len(self.runs) + + @property + def num_success(self): + return self.num_runs - self.num_failures + + def reset(self): + raise NotImplementedError(".reset() not implemented") + + def _join_futures(self, pbar): + """Helper function to collect all worker threads.""" + for f in concurrent.futures.as_completed(self._futures): + res = None + failing = False + batch_res = None + try: + batch_res = f.result() + assert isinstance(batch_res, list) + except Exception as e: + failing = True + logger.exception(e) + logger.error("An exception was thrown by a worker during simulation") + if self.progress: + update_progress(pbar) + batch_index = self._future_batch_idx[f] + run_idxs = self._batch_run_idxs[batch_index] + if failing: + assert batch_res is None + self.num_failures += len(run_idxs) + failed_stage = None + if failed_stage in self.stage_failures: + self.stage_failures[failed_stage] += run_idxs + else: + self.stage_failures[failed_stage] = [*run_idxs] + else: + assert len(batch_res) == len(run_idxs) + for res_idx, res in enumerate(batch_res): + if res is not None: + assert isinstance(res, RunResult), "Expected RunResult type" + if False: # Does not work if offloaded + run_index = res.idx + assert run_index == run_idxs[res_idx] + else: + run_index = run_idxs[res_idx] + res.idx = run_index + # run = res + # self.runs[run_index] = res + self.results[run_index] = res + else: + assert False, "Should not be used?" + run = self.runs[run_index] + if res.failing: + self.num_failures += 1 + failed_stage = RunStage(run.next_stage).name if isinstance(run, Run) else None # TODO + if failed_stage in self.stage_failures: + self.stage_failures[failed_stage].append(run_index) + else: + self.stage_failures[failed_stage] = [run_index] + self._reset_futures() + if self.progress: + close_progress(pbar) + + def initialize(self, context): + runs = [] + if self.progress: + pbar = init_progress(self.num_runs, msg="Initializing all runs") + for run_initializer in self.runs: + assert isinstance(run_initializer, RunInitializer) + run = run_initializer.realize(context=context) + run.init_directory(parent=self.runs_dir) + run_initializer.save(run.dir / "initializer.yml") + runs.append(run) + if self.progress: + update_progress(pbar) + self.runs = runs + if self.used_stages is None: + assert self.skipped_stages is None + self.used_stages = _used_stages(self.runs, self.until) + self.skipped_stages = [stage for stage in RunStage if stage not in self.used_stages] + if self.progress: + close_progress(pbar) + + def process( + self, + export=False, + context=None, + ): + pbar = None # Outer progress bar + pbar2 = None # Inner progress bar + context_ = _handle_context(context, minimal=True) + + # TODO: expose + save = True + cleanup = False # incompatible with per_stage + + if self.use_init_stage: + self.initialize(context) + + run_it = [*self.runs] + if self.shuffle: + run_it = sorted(run_it, key=lambda _: random.random()) + batches = list(chunks(run_it, self.batch_size)) + # TODO: per stage batching? + with self._executor_cls(**self._executor_kwargs) as executor: + if self.per_stage: + assert self.used_stages is not None + if self.progress: + pbar2 = init_progress(len(self.used_stages), msg="Processing stages") + for stage in self.used_stages: + run_stage = RunStage(stage).name + if self.progress: + pbar = init_progress(len(batches), msg=f"Processing stage {run_stage}") + else: + logger.info("%sProcessing stage %s", self._prefix, run_stage) + for b, runs in enumerate(batches): + runs_ = [run for run in runs if not run.failing] + idxs = [run.idx for run in runs_] + assert len(runs) > 0 + if len(runs_) < len(runs): + logger.warning("Skiping stage '%s' for failed run", run_stage) + else: + f = executor.submit_runs( + runs_, + until=stage, + skip=self.skipped_stages, + export=export, + context=context_, + runs_dir=self.runs_dir, + save=save, + cleanup=cleanup, + ) + self._futures.append(f) + # self._future_run_idx[f] = i + self._future_batch_idx[f] = b + self._batch_run_idxs[b] = idxs + self._join_futures(pbar) + if self.progress: + update_progress(pbar2) + if self.progress: + close_progress(pbar2) + else: + if self.progress: + pbar = init_progress(len(batches), msg="Processing batches" if self.use_batches else "Processing all runs") + else: + logger.info(self.prefix + "Processing all stages") + for b, runs in enumerate(batches): + idxs = [run.idx for run in runs] + assert len(runs) > 0 + f = executor.submit_runs( + runs, + until=self.until, + skip=self.skipped_stages, + export=export, + context=context_, + runs_dir=self.runs_dir, + save=save, + cleanup=cleanup, + ) + self._futures.append(f) + # self._future_run_idx[f] = i + self._future_batch_idx[f] = b + self._batch_run_idxs[b] = idxs + self._join_futures(pbar) + return self.runs, self.results + # return num_failures == 0 + + def postprocess(self, report, dest): + logger.info("Postprocessing session report") + # Warning: currently we only support one instance of the same type of postprocess, + # also it will be applied to all rows! + num_failing = self._postprocess(self.runs, report, dest, progress=self.progress) + self.num_failures += num_failing + return report + + def print_summary(self): + if self.num_failures == 0: + logger.info("All runs completed successfuly!") + elif self.num_failures == self.num_runs: + logger.error("All runs have failed to complete!") + else: + logger.warning("%d out or %d runs completed successfully!", self.num_success, self.num_runs) + summary = "\n".join( + [ + f"\t{stage}: \t{len(failed)} failed run(s): " + " ".join([str(idx) for idx in failed]) + for stage, failed in self.stage_failures.items() + if len(failed) > 0 + ] + ) + logger.info("Summary:\n%s", summary) diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index 6c25167de..47d0b0911 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -21,21 +21,20 @@ import shutil import filelock import tempfile -import multiprocessing +from typing import Optional, Union from datetime import datetime from enum import Enum from pathlib import Path -import concurrent.futures -from tqdm import tqdm - -from mlonmcu.session.run import Run +from mlonmcu.session.run import Run, RunInitializer, RunResult from mlonmcu.logging import get_logger from mlonmcu.report import Report from mlonmcu.config import filter_config +from mlonmcu.config import str2bool -from .postprocess.postprocess import SessionPostprocess from .run import RunStage +from .rpc import RemoteConfig +from .schedule import SessionScheduler logger = get_logger() # TODO: rename to get_mlonmcu_logger @@ -54,6 +53,15 @@ class Session: DEFAULTS = { "report_fmt": "csv", + # "process_pool": False, + "executor": "thread_pool", + "use_init_stage": False, + # "cleanup_runs": False, + "shuffle": False, + "batch_size": 1, # TODO: auto + "parallel_jobs": 1, + "rpc_tracker": None, + "rpc_key": None, } def __init__(self, label="", idx=None, archived=False, dir=None, config=None): @@ -68,6 +76,7 @@ def __init__(self, label="", idx=None, archived=False, dir=None, config=None): self.opened_at = None self.closed_at = None self.runs = [] + self.results = [] self.report = None self.next_run_idx = 0 self.archived = archived @@ -100,23 +109,98 @@ def report_fmt(self): """get report_fmt property.""" return str(self.config["report_fmt"]) + # @property + # def process_pool(self): + # """get process_pool property.""" + # value = self.config["process_pool"] + # return str2bool(value) if not isinstance(value, (bool, int)) else value + + @property + def executor(self): + """get executor property.""" + return str(self.config["executor"]) + + @property + def use_init_stage(self): + """get use_init_stage property.""" + value = self.config["use_init_stage"] + return str2bool(value) if not isinstance(value, (bool, int)) else value + + @property + def shuffle(self): + """get shuffle property.""" + value = self.config["shuffle"] + return str2bool(value) if not isinstance(value, (bool, int)) else value + + @property + def batch_size(self): + """get batch_size property.""" + return int(self.config["batch_size"]) + + @property + def parallel_jobs(self): + """get parallel_jobs property.""" + return int(self.config["parallel_jobs"]) + + @property + def rpc_tracker(self): + """get rpc_tracker property.""" + return self.config["rpc_tracker"] + + @property + def rpc_key(self): + """get rpc_key property.""" + return self.config["rpc_key"] + + @property + def needs_initializer(self): + """TODO""" + return self.executor in ["process_pool", "cmdline", "context", "rpc"] or self.use_init_stage + def create_run(self, *args, **kwargs): """Factory method to create a run and add it to this session.""" idx = len(self.runs) logger.debug("Creating a new run with id %s", idx) - run = Run(*args, idx=idx, session=self, **kwargs) + if self.needs_initializer: + run = RunInitializer(*args, idx=idx, **kwargs) + else: + run = Run(*args, idx=idx, **kwargs) self.runs.append(run) return run + def add_run(self, run: Union[Run, RunInitializer], ignore_idx: bool = True): + """TODO.""" + if ignore_idx: + idx = len(self.runs) + else: + raise NotImplementedError + if isinstance(run, RunInitializer): + self.runs.append(run) + elif isinstance(run, Run): + raise NotImplementedError + else: + assert False + logger.debug("Importing run with id %s", idx) + # def update_run(self): # TODO TODO # pass - def get_reports(self): + def get_reports(self, results: Optional[RunResult] = None): """Returns a full report which includes all runs in this session.""" if self.report: return self.report - reports = [run.get_report() for run in self.runs] + if results is None: + logger.warning("Use of session.get_reports without args is deprecated. Please pass list of RunResults!") + + reports = [run.get_report(session=self) for run in self.runs if not isinstance(run, RunInitializer)] + else: + assert isinstance(results, list) + if len(results) == 0: + logger.warning("The session results are empty") + # TODO: warn if None! + reports = [res.get_report(session=self) for res in results if res is not None] + merged = Report() merged.add(reports) return merged @@ -126,14 +210,19 @@ def enumerate_runs(self): # Find start index max_idx = -1 for run in self.runs: - if run.archived: + if not isinstance(run, RunInitializer) and run.archived: max_idx = max(max_idx, run.idx) run_idx = max_idx + 1 last_run_idx = None for run in self.runs: - if not run.archived: + if isinstance(run, RunInitializer): run.idx = run_idx - run.init_directory() + run_idx += 1 + last_run_idx = run.idx + elif not run.archived: + run.idx = run_idx + # run.init_directory(session=self) + run.init_directory(parent=self.runs_dir) run_idx += 1 last_run_idx = run.idx self.next_run_idx = run_idx @@ -162,186 +251,49 @@ def process_runs( progress=False, export=False, context=None, + noop=False, ): """Process a runs in this session until a given stage.""" # TODO: Add configurable callbacks for stage/run complete assert self.active, "Session needs to be opened first" + assert len(self.runs), "List of runs is empty" self.enumerate_runs() self.report = None assert num_workers > 0, "num_workers can not be < 1" - workers = [] - # results = [] - workers = [] - pbar = None # Outer progress bar - pbar2 = None # Inner progress bar - num_runs = len(self.runs) - num_failures = 0 - stage_failures = {} - worker_run_idx = [] - - def _init_progress(total, msg="Processing..."): - """Helper function to initialize a progress bar for the session.""" - return tqdm( - total=total, - desc=msg, - ncols=100, - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}s]", - leave=None, + remote_config = None + if self.rpc_tracker: + assert self.executor == "rpc" + remote_config = RemoteConfig( + self.rpc_tracker, + self.rpc_key ) - - def _update_progress(pbar, count=1): - """Helper function to update the progress bar for the session.""" - pbar.update(count) - - def _close_progress(pbar): - """Helper function to close the session progressbar, if available.""" - if pbar: - pbar.close() - - def _process(pbar, run, until, skip): - """Helper function to invoke the run.""" - run.process(until=until, skip=skip, export=export) - if progress: - _update_progress(pbar) - - def _join_workers(workers): - """Helper function to collect all worker threads.""" - nonlocal num_failures - results = [] - for i, w in enumerate(workers): - try: - results.append(w.result()) - except Exception as e: - logger.exception(e) - logger.error("An exception was thrown by a worker during simulation") - run_index = worker_run_idx[i] - run = self.runs[run_index] - if run.failing: - num_failures += 1 - failed_stage = RunStage(run.next_stage).name - if failed_stage in stage_failures: - stage_failures[failed_stage].append(run_index) - else: - stage_failures[failed_stage] = [run_index] - if progress: - _close_progress(pbar) - return results - - def _used_stages(runs, until): - """Determines the stages which are used by at least one run.""" - used = [] - for stage_index in list(range(RunStage.LOAD, until + 1)) + [RunStage.POSTPROCESS]: - stage = RunStage(stage_index) - if any(run.has_stage(stage) for run in runs): - used.append(stage) - return used - - used_stages = _used_stages(self.runs, until) - skipped_stages = [stage for stage in RunStage if stage not in used_stages] - - with concurrent.futures.ThreadPoolExecutor(num_workers) as executor: - if per_stage: - if progress: - pbar2 = _init_progress(len(used_stages), msg="Processing stages") - for stage in used_stages: - run_stage = RunStage(stage).name - if progress: - pbar = _init_progress(len(self.runs), msg=f"Processing stage {run_stage}") - else: - logger.info("%s Processing stage %s", self.prefix, run_stage) - for i, run in enumerate(self.runs): - if i == 0: - total_threads = min(len(self.runs), num_workers) - cpu_count = multiprocessing.cpu_count() - if (stage == RunStage.COMPILE) and run.compile_platform: - total_threads *= run.compile_platform.num_threads - if total_threads > 2 * cpu_count: - if pbar2: - print() - logger.warning( - "The chosen configuration leads to a maximum of %d threads being" - + " processed which heavily exceeds the available CPU resources (%d)." - + " It is recommended to lower the value of 'mlif.num_threads'!", - total_threads, - cpu_count, - ) - if run.failing: - logger.warning("Skiping stage '%s' for failed run", run_stage) - else: - worker_run_idx.append(i) - workers.append(executor.submit(_process, pbar, run, until=stage, skip=skipped_stages)) - _join_workers(workers) - workers = [] - worker_run_idx = [] - if progress: - _update_progress(pbar2) - if progress: - _close_progress(pbar2) - else: - if progress: - pbar = _init_progress(len(self.runs), msg="Processing all runs") - else: - logger.info(self.prefix + "Processing all stages") - for i, run in enumerate(self.runs): - if i == 0: - total_threads = min(len(self.runs), num_workers) - cpu_count = multiprocessing.cpu_count() - if ( - (until >= RunStage.COMPILE) - and run.compile_platform is not None - and run.compile_platform.name == "mlif" - ): - total_threads *= ( - run.compile_platform.num_threads - ) # TODO: This should also be used for non-mlif platforms - if total_threads > 2 * cpu_count: - if pbar2: - print() - logger.warning( - "The chosen configuration leads to a maximum of %d being processed which" - + " heavily exceeds the available CPU resources (%d)." - + " It is recommended to lower the value of 'mlif.num_threads'!", - total_threads, - cpu_count, - ) - worker_run_idx.append(i) - workers.append(executor.submit(_process, pbar, run, until=until, skip=skipped_stages)) - _join_workers(workers) - if num_failures == 0: - logger.info("All runs completed successfuly!") - elif num_failures == num_runs: - logger.error("All runs have failed to complete!") else: - num_success = num_runs - num_failures - logger.warning("%d out or %d runs completed successfully!", num_success, num_runs) - summary = "\n".join( - [ - f"\t{stage}: \t{len(failed)} failed run(s): " + " ".join([str(idx) for idx in failed]) - for stage, failed in stage_failures.items() - if len(failed) > 0 - ] - ) - logger.info("Summary:\n%s", summary) - - report = self.get_reports() - logger.info("Postprocessing session report") - # Warning: currently we only support one instance of the same type of postprocess, - # also it will be applied to all rows! - session_postprocesses = [] - for run in self.runs: - for postprocess in run.postprocesses: - if isinstance(postprocess, SessionPostprocess): - if postprocess.name not in [p.name for p in session_postprocesses]: - session_postprocesses.append(postprocess) - for postprocess in session_postprocesses: - artifacts = postprocess.post_session(report) - if artifacts is not None: - for artifact in artifacts: - # Postprocess has an artifact: write to disk! - logger.debug("Writting postprocess artifact to disk: %s", artifact.name) - artifact.export(self.dir) + assert self.executor != "rpc" + scheduler = SessionScheduler( + self.runs, + until, + executor=self.executor, + per_stage=per_stage, + progress=progress, + num_workers=num_workers, + use_init_stage=self.use_init_stage, + session=self, + shuffle=self.shuffle, + batch_size=self.batch_size, + parallel_jobs=self.parallel_jobs, + remote_config=remote_config, + ) + if noop: + logger.info(self.prefix + "Skipping processing of runs") + scheduler.initialize(context=context) + return 0 + + self.runs, self.results = scheduler.process(export=export, context=context) + report = self.get_reports(results=self.results) + scheduler.print_summary() + report = scheduler.postprocess(report, dest=self.dir) report_file = Path(self.dir) / f"report.{self.report_fmt}" report.export(report_file) results_dir = context.environment.paths["results"].path @@ -352,7 +304,7 @@ def _used_stages(runs, until): if print_report: logger.info("Report:\n%s", str(report.df)) - return num_failures == 0 + return scheduler.num_failures == 0 def discard(self): """Discard a run and remove its directory.""" @@ -388,7 +340,7 @@ def failing(self): return False def open(self): - """Open this run.""" + """Open this session.""" self.status = SessionStatus.OPEN self.opened_at = datetime.now() if dir is None: diff --git a/mlonmcu/target/riscv/etiss.py b/mlonmcu/target/riscv/etiss.py index ad3fc6b4e..c6c740690 100644 --- a/mlonmcu/target/riscv/etiss.py +++ b/mlonmcu/target/riscv/etiss.py @@ -534,71 +534,73 @@ def _handle_exit(code, out=None): metrics = Metrics() self.parse_stdout(out, metrics, exit_code=exit_code) - get_metrics_args = [elf] etiss_ini = os.path.join(directory, "custom.ini") - if os.path.exists(etiss_ini): - get_metrics_args.extend(["--ini", etiss_ini]) - if trace_file: - get_metrics_args.extend(["--trace", trace_file]) - get_metrics_args.extend(["--out", metrics_file]) - if self.print_outputs: - out += execute(self.metrics_script.resolve(), *get_metrics_args, live=True) - else: - out += execute( - self.metrics_script.resolve(), - *get_metrics_args, - live=False, - cwd=directory, - print_func=lambda *args, **kwargs: None, - ) - - metrics_file = os.path.join(directory, "metrics.csv") - with open(metrics_file, "r") as handle: - metrics_content = handle.read() - lines = metrics_content.splitlines() - reader = csv.DictReader(lines) - data = list(reader)[0] - - def get_rom_sizes(data): - assert "rom_rodata" in data - rom_ro = int(data["rom_rodata"]) - assert "rom_code" in data - rom_code = int(data["rom_code"]) - assert "rom_misc" in data - rom_misc = int(data["rom_misc"]) - - rom_total = rom_ro + rom_code + rom_misc - return rom_total, rom_ro, rom_code, rom_misc - - def get_ram_sizes(data): - assert "ram_data" in data - ram_data = int(data["ram_data"]) - assert "ram_zdata" in data - ram_zdata = int(data["ram_zdata"]) - ram_total = ram_data + ram_zdata + needs_get_metrics = self.trace_memory + if needs_get_metrics: + get_metrics_args = [elf] + if os.path.exists(etiss_ini): + get_metrics_args.extend(["--ini", etiss_ini]) + if trace_file: + get_metrics_args.extend(["--trace", trace_file]) + get_metrics_args.extend(["--out", metrics_file]) + if self.print_outputs: + out += execute(self.metrics_script.resolve(), *get_metrics_args, live=True) + else: + out += execute( + self.metrics_script.resolve(), + *get_metrics_args, + live=False, + cwd=directory, + print_func=lambda *args, **kwargs: None, + ) + + metrics_file = os.path.join(directory, "metrics.csv") + with open(metrics_file, "r") as handle: + metrics_content = handle.read() + lines = metrics_content.splitlines() + reader = csv.DictReader(lines) + data = list(reader)[0] + + # def get_rom_sizes(data): + # assert "rom_rodata" in data + # rom_ro = int(data["rom_rodata"]) + # assert "rom_code" in data + # rom_code = int(data["rom_code"]) + # assert "rom_misc" in data + # rom_misc = int(data["rom_misc"]) + + # rom_total = rom_ro + rom_code + rom_misc + # return rom_total, rom_ro, rom_code, rom_misc + + def get_ram_sizes(data): + assert "ram_data" in data + ram_data = int(data["ram_data"]) + assert "ram_zdata" in data + ram_zdata = int(data["ram_zdata"]) + ram_total = ram_data + ram_zdata + if self.trace_memory: + assert "ram_stack" in data + ram_stack = int(data["ram_stack"]) + assert "ram_heap" in data + ram_heap = int(data["ram_heap"]) + ram_total += ram_stack + ram_heap + else: + ram_stack = None + ram_heap = None + return ram_total, ram_data, ram_zdata, ram_stack, ram_heap + + # rom_total, rom_ro, rom_code, rom_misc = get_rom_sizes(data) + ram_total, ram_data, ram_zdata, ram_stack, ram_heap = get_ram_sizes(data) + # metrics.add("Total ROM", rom_total) + metrics.add("Total RAM", ram_total) + # metrics.add("ROM read-only", rom_ro) + # metrics.add("ROM code", rom_code) + # metrics.add("ROM misc", rom_misc) + metrics.add("RAM data", ram_data) + metrics.add("RAM zero-init data", ram_zdata) if self.trace_memory: - assert "ram_stack" in data - ram_stack = int(data["ram_stack"]) - assert "ram_heap" in data - ram_heap = int(data["ram_heap"]) - ram_total += ram_stack + ram_heap - else: - ram_stack = None - ram_heap = None - return ram_total, ram_data, ram_zdata, ram_stack, ram_heap - - rom_total, rom_ro, rom_code, rom_misc = get_rom_sizes(data) - ram_total, ram_data, ram_zdata, ram_stack, ram_heap = get_ram_sizes(data) - metrics.add("Total ROM", rom_total) - metrics.add("Total RAM", ram_total) - metrics.add("ROM read-only", rom_ro) - metrics.add("ROM code", rom_code) - metrics.add("ROM misc", rom_misc) - metrics.add("RAM data", ram_data) - metrics.add("RAM zero-init data", ram_zdata) - if self.trace_memory: - metrics.add("RAM stack", ram_stack) - metrics.add("RAM heap", ram_heap) + metrics.add("RAM stack", ram_stack) + metrics.add("RAM heap", ram_heap) ini_content = open(etiss_ini, "r").read() ini_artifact = Artifact("custom.ini", content=ini_content, fmt=ArtifactFormat.TEXT) diff --git a/mlonmcu_eval.sh b/mlonmcu_eval.sh new file mode 100755 index 000000000..f3142cb58 --- /dev/null +++ b/mlonmcu_eval.sh @@ -0,0 +1,238 @@ +#!/bin/bash + +myPid="none" + +timestamp=$(date +"%Y%m%dT%H%M%S") + +# scenario=tvm +# scenario=tflm_8x +# scenario=tvm_2x +scenario=dummy +out_dir=eval_out/${scenario} +mkdir -p $out_dir +out_file=$out_dir/$timestamp.csv + +control_c() { + echo "Handling keyboardinterrupt!" + if [[ "$myPid" != "none" ]] + then + echo "Killing $myPid" + # kill -9 $myPid + pkill -TERM -P $myPid + exit + fi +} + +trap control_c SIGINT + +function monitor() { + DEST=$1 + INTERVAL=5 + # /usr/bin/time -f "Elapsed:%E, CPU: %P" "$@" & + /usr/bin/time -f "Elapsed:%e; CPU:%P" "$@" & + myPid=$! + OUT=./$myPid.metrics.csv + echo "TS;MEM;CPU;DISK;1M;5M;10M" > $OUT + while kill -0 "$myPid" 2> /dev/null + do + TS=$(date +%s) + MEM=$(free | grep Mem | tr -s ' ' | cut -d' ' -f3) + CPU=$(ps -A -o pcpu | tail -n+2 | paste -sd+ | bc) + # DISK=$(df . | tail -1 | cut -d' ' -f3) + DISK=$(df /data | tail -1 | cut -d' ' -f3) + UPTIME=$(uptime | grep -oP '(?<=average:).*' | tr -d '[:blank:]' | tr ',' ';') + echo "$TS;$MEM;$CPU;$DISK;$UPTIME" >> $OUT + sleep $INTERVAL + done + echo "Monitoring Results:" + cat $OUT + MAX_MEM=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=0}{if ($2>(0+a)) a=$2 fi}END{print a}') + MIN_MEM=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=99999999999}{if ($2<(0+a)) a=$2 fi}END{print a}') + AVG_MEM=$(cat $OUT | tail -n +2 | awk -F ';' '{ sum += $2 } END { if (NR > 0) print sum / NR }') + MAX_CPU=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=0}{if ($3>(0+a)) a=$3 fi}END{print a}') + MIN_CPU=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=9999999999}{if ($3<(0+a)) a=$3 fi}END{print a}') + AVG_CPU=$(cat $OUT | tail -n +2 | awk -F ';' '{ sum += $3 } END { if (NR > 0) print sum / NR }') + MAX_DISK=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=0}{if ($4>(0+a)) a=$4 fi}END{print a}') + MIN_DISK=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=9999999999}{if ($4(0+a)) a=$5 fi}END{print a}') + MAX_5M=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=0}{if ($6>(0+a)) a=$6 fi}END{print a}') + MAX_10M=$(cat $OUT | tail -n +2 | awk -F';' 'BEGIN{a=0}{if ($7>(0+a)) a=$7 fi}END{print a}') + echo "Max. MEM: $MAX_MEM" + echo "Min. MEM: $MIN_MEM" + echo "Avg. MEM: $AVG_MEM" + echo "Max. CPU: $MAX_CPU" + echo "Min. CPU: $MIN_CPU" + echo "Avg. CPU: $AVG_CPU" + echo "Max. DISK: $MAX_DISK" + echo "Min. DISK: $MIN_DISK" + echo "Max. 1M: $MAX_1M" + echo "Max. 5M: $MAX_5M" + echo "Max. 10M: $MAX_10M" +} + +function monitor2() { + monitor $@ 2>&1 | tee /tmp/mlonmcu_eval_out.txt + ELAPSED=$(cat /tmp/mlonmcu_eval_out.txt | grep "Elapsed" | cut -d";" -f1 | cut -d":" -f2 | xargs) + CPU=$(cat /tmp/mlonmcu_eval_out.txt | grep "Elapsed:" | cut -d";" -f2 | cut -d":" -f2 | cut -d'%' -f1) + MAX_1M=$(cat /tmp/mlonmcu_eval_out.txt | grep "Max. 1M:" | cut -d":" -f2 | xargs) + echo ";$ELAPSED;$CPU;$MAX_1M" >> $out_file +} + +function run_mlonmcu() { + EXECUTOR=$1 + P=$2 + P_=0 # R? + N=$3 + B=1 + MULTIPLIER=$4 + shift 4 + ARGS=$@ + RPC_TRACKER=gpu2.eda.cit.tum.de:9000 + RPC_KEY=default + if [[ "$EXECUTOR" == "rpc" ]] + then + P_=$P + RPC_ARGS="-c session.rpc_tracker=$RPC_TRACKER -c session.rpc_key=$RPC_KEY -c session.parallel_jobs=$P_" + P=$(nproc) + B=$P_ + fi + + SESSION_ARGS="-c session.use_init_stage=0 -c print_report=0 -c runs_per_stage=0 --parallel $P -c mlif.num_threads=$N -c tvmaot.num_threads=$N -c session.executor=$EXECUTOR -c session.batch_size=$B --progress -c run.export_optional=1" + HOST=$(hostname -f) + CMD="python3 -m mlonmcu.cli.main $ARGS $SESSION_ARGS $RPC_ARGS" + CORES=$(grep ^cpu\\scores /proc/cpuinfo | uniq | awk '{print $4}') + THREADS=$(grep -c ^processor /proc/cpuinfo) + RAM=$(grep MemTotal /proc/meminfo | awk '{print $2 / 1024 / 1024, "GiB"}') + for i in $(eval echo "{1..$MULTIPLIER}") + do + CMD="$CMD --config-gen _" + done + echo "======================" + echo "Host: $HOST (${CORES}C${THREADS}T) RAM: $RAM" + echo "Scenario: EXECUTOR=$EXECUTOR MULTIPLIER=$MULTIPLIER ARGS=$ARGS" + echo "Config: P=$P P_=$P_ B=$B N=$N" + echo -n "$timestamp" >> $out_file + echo -n ";$HOST;$CORES;$THREADS;$EXECUTOR;$MULTIPLIER" >> $out_file + echo -n ";$P;$P_;$B;$N" >> $out_file + echo "> $CMD" + monitor2 $CMD + echo "Cooldown..." + sleep 30 + echo "----------------------" + + # --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ --config-gen _ + # monitor python3 -m mlonmcu.cli.main flow run aww --target etiss --backend tvmaot --platform mlif --config-gen _ --config-gen _ -c session.use_init_stage=0 -c runs_per_stage=0 --progress --parallel $P -c mlif.num_threads=$N -c tvmaot.num_threads=$N -c session.executor=$EXECUTOR + +} + +echo "TS;Host;Cores;Threads;Executor;Mult;P;P_;B;N;Elapsed;CPU;Load" > $out_file + +# Framework: TVM +run_mlonmcu process_pool 1 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 1 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 1 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 1 8 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 2 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 2 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 2 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 2 8 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 4 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 4 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 4 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 4 8 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 8 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 8 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 8 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu process_pool 8 8 64 flow run aww --target etiss --backend tvmaot --platform mlif + +run_mlonmcu rpc 1 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 1 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 1 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 1 8 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 2 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 2 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 2 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 2 8 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 4 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 4 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 4 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu rpc 4 8 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 8 1 64 flow run aww --target etiss --backend tvmaot --platform mlif +run_mlonmcu rpc 8 2 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu rpc 8 4 64 flow run aww --target etiss --backend tvmaot --platform mlif +# run_mlonmcu rpc 8 8 64 flow run aww --target etiss --backend tvmaot --platform mlif + +# Framework: TFLM +# run_mlonmcu process_pool 1 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 1 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 1 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 1 8 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 1 16 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 1 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 2 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 2 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 2 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 2 8 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 2 16 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 2 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 4 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 4 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 4 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 4 8 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 4 16 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 4 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 8 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 8 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 8 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 8 8 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 8 16 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 8 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 16 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 16 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 16 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 16 8 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 16 16 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 16 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 32 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 32 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 32 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 32 8 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 32 16 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu process_pool 32 32 64 flow run aww --target etiss --backend tflmi --platform mlif + +# run_mlonmcu rpc 1 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 1 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 1 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 1 8 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 1 16 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 1 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 2 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 2 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 2 4 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 2 8 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 2 16 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 2 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 4 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 4 2 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 4 4 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 4 8 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 4 16 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 4 32 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 8 1 64 flow run aww --target etiss --backend tflmi --platform mlif +# run_mlonmcu rpc 8 2 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 8 4 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 8 8 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 8 16 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 8 32 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 16 1 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 16 2 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 16 4 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 16 8 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 16 16 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 16 32 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 32 1 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 32 2 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 32 4 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 32 8 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 32 16 64 flow run aww --target etiss --backend tflmi --platform mlif +## run_mlonmcu rpc 32 32 64 flow run aww --target etiss --backend tflmi --platform mlif