diff --git a/otx/cli/tools/train.py b/otx/cli/tools/train.py
index 2b041edbad2..64afae41e54 100644
--- a/otx/cli/tools/train.py
+++ b/otx/cli/tools/train.py
@@ -30,6 +30,7 @@
 from otx.api.usecases.adapters.model_adapter import ModelAdapter
 from otx.cli.manager import ConfigManager
 from otx.cli.manager.config_manager import TASK_TYPE_TO_SUB_DIR_NAME
+from otx.cli.utils.experiment import ResourceTracker
 from otx.cli.utils.hpo import run_hpo
 from otx.cli.utils.importing import get_impl_class
 from otx.cli.utils.io import read_binary, read_label_schema, save_model_data
@@ -40,7 +41,6 @@
     get_parser_and_hprams_data,
 )
 from otx.cli.utils.report import get_otx_report
-from otx.cli.utils.experiment import run_process_to_check_resource
 from otx.core.data.adapter import get_dataset_adapter
@@ -141,7 +141,11 @@ def get_args():
     )
     parser.add_argument(
         "--track-resource-usage",
-        action="store_true",
+        type=str,
+        default=None,
+        help="Track resource utilization and peak memory usage and save the values to the output path. "
+        "The possible options are 'cpu', 'gpu', a comma-separated list of resource types, "
+        "or 'all' to track every available resource type.",
     )

     sub_parser = add_hyper_parameters_sub_parser(parser, hyper_parameters, return_sub_parser=True)
@@ -248,15 +252,20 @@ def train(exit_stack: Optional[ExitStack] = None):  # pylint: disable=too-many-b
             "if main process raises an error, all processes can be stuck."
         )

-    if args.track_resource_usage:
-        run_process_to_check_resource(config_manager.output_path, exit_stack)
-
     task = task_class(task_environment=environment, output_path=str(config_manager.output_path / "logs"))

     output_model = ModelEntity(dataset, environment.get_model_configuration())

+    resource_tracker = None
+    if args.track_resource_usage and not is_multigpu_child_process():
+        resource_tracker = ResourceTracker(args.track_resource_usage, args.gpus)
+        resource_tracker.start()
+
     task.train(dataset, output_model, train_parameters=TrainParameters())

+    if resource_tracker is not None:
+        resource_tracker.stop(config_manager.output_path / "resource_usage.yaml")
+
     model_path = config_manager.output_path / "models"
     save_model_data(output_model, str(model_path))
diff --git a/otx/cli/utils/experiment.py b/otx/cli/utils/experiment.py
index 065941a1000..2455fdc4b5c 100644
--- a/otx/cli/utils/experiment.py
+++ b/otx/cli/utils/experiment.py
@@ -1,84 +1,284 @@
-"""Utils function for experiments"""
+"""Utility functions for experiments."""
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+#

+import logging
 import multiprocessing as mp
-import pynvml
-import psutil
-import yaml
 import os
-from contextlib import ExitStack
-from typing import Union
+import time
+from abc import ABC, abstractmethod
 from pathlib import Path
+from statistics import mean
+from typing import Any, Dict, List, Optional, Union, no_type_check

+import psutil
+import yaml

-def run_process_to_check_resource(output_dir: Union[str, Path], exit_stack: ExitStack):
-    if isinstance(output_dir, str):
-        output_dir = Path(output_dir)
+try:
+    import pynvml
+except ImportError:
+    pynvml = None

-    gpu_used = os.environ.get("CUDA_VISIBLE_DEVICES", 0)
+logger = logging.getLogger(__name__)
+GIB = 1024**3
+AVAILABLE_RESOURCE_TYPE = ["cpu", "gpu"]

-    queue = mp.Queue()
-    mem_check_p = mp.Process(target=check_resource, args=(queue, gpu_used))
-    mem_check_p.start()
-    exit_stack.callback(mem_check_proc_callback, mem_check_p, queue, output_dir)

+class ResourceTracker:
+    """Class to track resource usage.
+
+    Args:
+        resource_type (str, optional): Which resources to track. Available values are "cpu", "gpu",
+            a comma-separated list of both, or "all". Defaults to "all".
+        gpu_ids (Optional[str]): GPU indices to record.
+    """

-def mem_check_proc_callback(mem_check_p, queue, output_dir):
-    queue.put(output_dir)
-    mem_check_p.join(10)
-    if mem_check_p.exitcode is None:
-        mem_check_p.terminate()
-    mem_check_p.close()
+    def __init__(self, resource_type: str = "all", gpu_ids: Optional[str] = None):
+        if resource_type == "all":
+            self._resource_type = AVAILABLE_RESOURCE_TYPE
+        else:
+            self._resource_type = [val for val in resource_type.split(",")]

+        gpu_ids_arr = None
+        if gpu_ids is not None:
+            gpu_ids_arr = [int(idx) for idx in gpu_ids.split(",")]
+            gpu_ids_arr[0] = 0

-def check_resource(queue: mp.Queue, gpu_idx: int = 0):
-    pynvml.nvmlInit()
-    handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
-    max_gpu_mem = 0
-    avg_gpu_util = 0
-    max_cpu_mem = 0
-    avg_cpu_util = 0
-    gib = 1024**3
-    target_process = psutil.Process().parent()
+        self._gpu_ids: Union[List[int], None] = gpu_ids_arr
+        self._mem_check_proc: Union[mp.Process, None] = None
+        self._queue: Union[mp.Queue, None] = None

-    num_counts = 0
-    while True:
-        # gpu util
-        gpu_info = pynvml.nvmlDeviceGetUtilizationRates(handle)
-        avg_gpu_util += gpu_info.gpu
-        num_counts += 1
+    def start(self):
+        """Run a child process that tracks resource usage."""
+        if self._mem_check_proc is not None:
+            logger.warning("Resource tracker is already running. Call stop before calling start again.")
+            return

-        # gpu mem
-        gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
-        mem_used = gpu_mem.used / gib
-        if max_gpu_mem < mem_used:
-            max_gpu_mem = mem_used
+        self._queue = mp.Queue()
+        self._mem_check_proc = mp.Process(
+            target=_check_resource, args=(self._queue, self._resource_type, self._gpu_ids)
+        )
+        self._mem_check_proc.start()

-        # cpu mem
-        # cpu_mem = psutil.virtual_memory()[3] / gib
-        # cpu_mem = target_process.memory_percent()
-        cpu_mem = target_process.memory_info().rss / gib
-        if max_cpu_mem < cpu_mem:
-            max_cpu_mem = cpu_mem
+    def stop(self, output_path: Union[str, Path]):
+        """Terminate the resource-tracking process and save the recorded values.

-        # cpu util
-        cpu_percent = target_process.cpu_percent()
-        avg_cpu_util += cpu_percent
+        Args:
+            output_path (Union[str, Path]): Output file path to save CPU & GPU utilization and max memory usage values.
+        """
+        if self._mem_check_proc is None or not self._mem_check_proc.is_alive():
+            return
+
+        if isinstance(output_path, str):
+            output_path = Path(output_path)
+
+        self._queue.put(output_path)
+        self._mem_check_proc.join(10)
+        if self._mem_check_proc.exitcode is None:
+            self._mem_check_proc.terminate()
+        self._mem_check_proc.close()
+
+        self._mem_check_proc = None
+        self._queue = None
+
+
+def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None, gpu_ids: Optional[List[int]] = None):
+    if resource_types is None:
+        resource_types = []
+
+    trackers: Dict[str, ResourceRecorder] = {}
+    for resource_type in resource_types:
+        if resource_type == "cpu":
+            trackers[resource_type] = CpuUsageRecorder()
+        elif resource_type == "gpu":
+            if pynvml is None:
+                logger.warning("No GPU can be found. Tracking GPU usage is skipped.")
+                continue
+            trackers[resource_type] = GpuUsageRecorder(gpu_ids)
+        else:
+            raise ValueError(
+                "Resource type {} isn't supported. Available resource types are 'cpu' and 'gpu'.".format(resource_type)
+            )
+
+    if not trackers:
+        logger.warning("There is no resource to record.")
+        return
+
+    while True:
+        for tracker in trackers.values():
+            tracker.record()

         if not queue.empty():
             break
-    pynvml.nvmlShutdown()
+        time.sleep(0.01)

+    output_path = Path(queue.get())
-    with (output_path / "resource_usage.yaml").open("w") as f:
-        yaml.dump(
-            {
-                "max_cpu_mem(GiB)" : round(max_cpu_mem, 2),
-                "avg_cpu_util(%)" : round(avg_cpu_util / num_counts, 2),
-                "max_gpu_mem(GiB)" : round(max_gpu_mem, 2),
-                "avg_gpu_util(%)" : round(avg_gpu_util / num_counts, 2),
-            },
-            f,
-            default_flow_style=False
-        )
+    resource_record = {resource_type: tracker.report() for resource_type, tracker in trackers.items()}
+    with output_path.open("w") as f:
+        yaml.dump(resource_record, f, default_flow_style=False)
+
+
+class ResourceRecorder(ABC):
+    """Base class for each resource recorder."""
+
+    @abstractmethod
+    def record(self):
+        """Record resource usage."""
+        raise NotImplementedError
+
+    @abstractmethod
+    def report(self):
+        """Aggregate all recorded resource usage."""
+        raise NotImplementedError
+
+
+class CpuUsageRecorder(ResourceRecorder):
+    """CPU usage recorder class. Records the maximum memory usage and the average CPU utilization."""
+
+    def __init__(self):
+        self._record_count: int = 0
+        self._max_mem: Union[int, float] = 0
+        self._avg_util: Union[int, float] = 0
+        self._first_record = True
+
+    def record(self):
+        """Record CPU usage."""
+        # cpu mem
+        memory_info = psutil.virtual_memory()
+        cpu_mem = (memory_info.total - memory_info.available) / GIB
+        if self._max_mem < cpu_mem:
+            self._max_mem = cpu_mem
+
+        # cpu util
+        cpu_percent = psutil.cpu_percent()
+        if self._first_record:
+            self._first_record = False
+        else:
+            self._avg_util += cpu_percent
+            self._record_count += 1
+
+    def report(self) -> Dict[str, str]:
+        """Aggregate CPU usage."""
+        if self._record_count == 0:
+            return {}
+
+        return {
+            "max_memory_usage": f"{round(self._max_mem, 2)} GiB",
+            "avg_util": f"{round(self._avg_util / self._record_count, 2)} %",
+        }
+
+
+class GpuUsageRecorder(ResourceRecorder):
+    """GPU usage recorder class.
+
+    Args:
+        gpu_ids (Optional[List[int]]): GPU indices to record. If not given, the first GPU is recorded.
+    """
+
+    def __init__(self, gpu_ids: Optional[List[int]] = None):
+        if gpu_ids is None:
+            gpu_ids = [0]
+
+        self._record: Dict[int, Dict[str, Union[int, List[int]]]] = {}
+        self._gpu_handlers: Dict[int, Any] = {}
+
+        pynvml.nvmlInit()
+        gpu_to_track = self._get_gpu_to_track(gpu_ids)
+        for gpu_idx in gpu_to_track:
+            self._record[gpu_idx] = {"max_mem": 0, "util_record": []}
+            self._gpu_handlers[gpu_idx] = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
+
+    def _get_gpu_to_track(self, gpu_ids: List[int]) -> List[int]:
+        if "CUDA_VISIBLE_DEVICES" in os.environ:
+            available_gpus = [int(idx) for idx in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
+        else:
+            available_gpus = list(range(pynvml.nvmlDeviceGetCount()))
+        return [available_gpus[gpu_idx] for gpu_idx in gpu_ids]
+
+    def record(self):
+        """Record GPU usage."""
+        for gpu_idx, record in self._record.items():
+            # gpu util
+            gpu_info = pynvml.nvmlDeviceGetUtilizationRates(self._gpu_handlers[gpu_idx])
+            record["util_record"].append(gpu_info.gpu)
+
+            # gpu mem
+            gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(self._gpu_handlers[gpu_idx])
+            mem_used = gpu_mem.used / GIB
+            if record["max_mem"] < mem_used:
+                record["max_mem"] = mem_used
+
+    @no_type_check
+    def report(self) -> Dict[str, str]:
+        """Aggregate GPU usage."""
+        if not list(self._record.values())[0]["util_record"]:  # record() hasn't been called yet
+            return {}
+
+        total_max_mem = 0
+        total_avg_util = 0
+        gpus_record = self._record.copy()
+        for gpu_idx in list(gpus_record.keys()):
+            max_mem = gpus_record[gpu_idx]["max_mem"]
+            if total_max_mem < max_mem:
+                total_max_mem = max_mem
+
+            # Skip the warm-up records: start averaging once utilization exceeds 20% of its maximum
+            max_util = max(gpus_record[gpu_idx]["util_record"])
+            for idx, util in enumerate(gpus_record[gpu_idx]["util_record"]):
+                if util * 5 > max_util:
+                    break
+            avg_util = mean(gpus_record[gpu_idx]["util_record"][idx:])
+            total_avg_util += avg_util
+
+            gpus_record[f"gpu_{gpu_idx}"] = {
+                "avg_util": f"{round(avg_util, 2)} %",
+                "max_mem": f"{round(max_mem, 2)} GiB",
+            }
+            del gpus_record[gpu_idx]
+
+        gpus_record["total_avg_util"] = f"{round(total_avg_util / len(gpus_record), 2)} %"
+        gpus_record["total_max_mem"] = f"{round(total_max_mem, 2)} GiB"
+
+        return gpus_record
+
+    def __del__(self):
+        """Shut down NVML."""
+        pynvml.nvmlShutdown()
+
+
+def set_arguments_to_cmd(command: List[str], key: str, value: Optional[str] = None, start_idx: int = 0, after_params: bool = False):
+    """Add a key-value argument pair at the proper position in the command.
+
+    Args:
+        command (List[str]): list including an otx command entry and arguments.
+        key (str): argument key.
+        value (str or None): argument value.
+        start_idx (int, optional): search for the insert position starting from this index. Defaults to 0.
+        after_params (bool): whether the argument should be placed after `params` or not.
+ """ + if key in command: + if value is not None: + command[command.index(key) + 1] = value + return + + delimiters = ["demo", "deploy", "eval", "explain", "export", "find", "train", "optimize", "build", "run"] + if not after_params: + delimiters.append("params") + + for i in range(start_idx, len(command)): + if command[i] in delimiters: + if value is not None: + command.insert(i, value) + command.insert(i, key) + break + else: + command.append(key) + if value is not None: + command.append(value) diff --git a/requirements/base.txt b/requirements/base.txt index 9432e7faf22..39d61bfdf87 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -10,3 +10,4 @@ scipy>=1.8 bayesian-optimization>=1.2.0 tensorboard>=2.11.0 multiprocess +pynvml>=11.0.0 diff --git a/tests/unit/cli/utils/test_experiment.py b/tests/unit/cli/utils/test_experiment.py new file mode 100644 index 00000000000..e5a61a8e445 --- /dev/null +++ b/tests/unit/cli/utils/test_experiment.py @@ -0,0 +1,305 @@ +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from otx.cli.utils import experiment as target_file +from otx.cli.utils.experiment import ResourceTracker, _check_resource, CpuUsageRecorder, GpuUsageRecorder, GIB +from tests.test_suite.e2e_test_system import e2e_pytest_unit + + +class TestResourceTracker: + @pytest.fixture(autouse=True) + def _set_up(self, mocker): + self.mock_mp = mocker.patch.object(target_file, "mp") + + self.mock_proc = mocker.MagicMock() + self.mock_mp.Process.return_value = self.mock_proc + + self.mock_queue = mocker.MagicMock() + self.mock_mp.Queue.return_value = self.mock_queue + + @e2e_pytest_unit + @pytest.mark.parametrize("resource_type", ("cpu", "gpu", "all", "cpu,gpu")) + @pytest.mark.parametrize("gpu_ids", (None, "0", "0,3")) + def test_init(self, resource_type, gpu_ids): + ResourceTracker(resource_type, gpu_ids) + + @e2e_pytest_unit + @pytest.mark.parametrize("resource_type", ("cpu", "gpu", "all", "cpu,gpu")) + @pytest.mark.parametrize("gpu_ids", (None, "0", "0,3")) + def test_start(self, resource_type, gpu_ids): + # prepare + if resource_type == "all": + expected_resource_type = target_file.AVAILABLE_RESOURCE_TYPE + else: + expected_resource_type = [val for val in resource_type.split(",")] + + expected_gpu_ids = None + if gpu_ids is not None: + expected_gpu_ids = [int(idx) for idx in gpu_ids.split(",")] + expected_gpu_ids[0] = 0 + + # run + resource_tracker = ResourceTracker(resource_type, gpu_ids) + resource_tracker.start() + + self.mock_proc.start.assert_called_once() # check that a process to track resource usages starts + # check proper resource type and gpu_ids vaues are passed to a process to run + assert self.mock_mp.Process.call_args.kwargs["args"][1] == expected_resource_type + assert self.mock_mp.Process.call_args.kwargs["args"][2] == expected_gpu_ids + + @e2e_pytest_unit + def test_start_multiple_times(self): + resource_tracker = ResourceTracker() + + # run multiple times + resource_tracker.start() + resource_tracker.start() + + self.mock_proc.start.assert_called_once() # check that a process starts once + + @e2e_pytest_unit + def test_stop(self): + output_path = Path("fake") + + resource_tracker = ResourceTracker() + resource_tracker.start() + resource_tracker.stop(output_path) + + # check that code to terminate a process is executed properly + self.mock_queue.put.assert_called_once_with(output_path) + self.mock_proc.join.assert_called() + self.mock_proc.close.assert_called() + + @e2e_pytest_unit + def test_stop_not_exit_normally(self): + 
output_path = Path("fake") + self.mock_proc.exitcode = None + + resource_tracker = ResourceTracker() + resource_tracker.start() + resource_tracker.stop(output_path) + + # check that code to terminate a process is executed properly + self.mock_queue.put.assert_called_once_with(output_path) + self.mock_proc.join.assert_called() + # check that code to terminate a process forcibly if process doesn't exit normally + self.mock_proc.terminate.assert_called() + self.mock_proc.close.assert_called() + + @e2e_pytest_unit + def test_stop_before_start(self): + resource_tracker = ResourceTracker() + resource_tracker.stop("fake") + + # check that code to make a process done isn't called + self.mock_queue.put.assert_not_called() + self.mock_proc.join.assert_not_called() + self.mock_proc.close.assert_not_called() + + +class MockQueue: + def __init__(self, output_path: str): + self.output_path = output_path + + def empty(self): + return False + + def get(self): + return self.output_path + + +@pytest.mark.parametrize("resource_types", (["cpu"], ["gpu"], ["cpu", "gpu"])) +@e2e_pytest_unit +def test_check_resource(mocker, resource_types, tmp_path): + # prepare + gpu_ids = [0, 1] + output_file = f"{tmp_path}/fake.yaml" + mock_queue = MockQueue(output_file) + + mock_cpu_recorder = mocker.MagicMock() + mocker.patch.object(target_file, "CpuUsageRecorder", return_value=mock_cpu_recorder) + mock_gpu_recorder = mocker.MagicMock() + mock_gpu_recorder_cls = mocker.patch.object(target_file, "GpuUsageRecorder", return_value=mock_gpu_recorder) + + mocker.patch.object(target_file, "yaml") + mocker.patch.object(target_file, "time") + + # run + _check_resource(mock_queue, resource_types, gpu_ids) + + # check the recorders record properly + if "cpu" in resource_types: + mock_cpu_recorder.record.assert_called_once() + if "gpu" in resource_types: + mock_gpu_recorder.record.assert_called_once() + mock_gpu_recorder_cls.assert_called_once_with(gpu_ids) + + assert Path(output_file).exists() # check a file is saved well + + +def test_check_resource_wrong_resource_type(mocker, tmp_path): + # prepare + resource_types = ["wrong"] + output_file = f"{tmp_path}/fake.yaml" + mock_queue = MockQueue(output_file) + + mocker.patch.object(target_file, "CpuUsageRecorder") + mocker.patch.object(target_file, "GpuUsageRecorder") + mocker.patch.object(target_file, "yaml") + mocker.patch.object(target_file, "time") + + # check that ValueError is raised. 
+ with pytest.raises(ValueError): + _check_resource(mock_queue, resource_types) + + +class TestCpuUsageRecorder: + @pytest.fixture(autouse=True) + def _set_up(self, mocker): + self.mock_psutil = mocker.patch.object(target_file, "psutil") + self.mock_virtual_memory = mocker.MagicMock() + self.mock_psutil.virtual_memory.return_value = self.mock_virtual_memory + self.set_mem_usage(0) + self.set_cpu_util(0) + + def set_mem_usage(self, mem_usage: int): + self.mock_virtual_memory.total = mem_usage + self.mock_virtual_memory.available = 0 + + def set_cpu_util(self, cpu_util: int): + self.mock_psutil.cpu_percent.return_value = cpu_util + + @e2e_pytest_unit + def test_init(self): + CpuUsageRecorder() + + @e2e_pytest_unit + def test_record_report(self): + cpu_usage_recorder = CpuUsageRecorder() + + # record cpu usage + cpu_usage_recorder.record() + self.set_mem_usage(4 * GIB) + self.set_cpu_util(40) + cpu_usage_recorder.record() + self.set_mem_usage(6 * GIB) + self.set_cpu_util(60) + cpu_usage_recorder.record() + report = cpu_usage_recorder.report() + + # check right values are returned when calling report + assert float(report["max_memory_usage"].split()[0]) == pytest.approx(6) + assert float(report["avg_util"].split()[0]) == pytest.approx(50) + + @e2e_pytest_unit + def test_report_wo_record(self): + cpu_usage_recorder = CpuUsageRecorder() + report = cpu_usage_recorder.report() + + assert report == {} # if report is called without calling record, empty dict should be returned + + +class TestGpuUsageRecorder: + @pytest.fixture(autouse=True) + def _set_up(self, mocker): + self.mock_pynvml = mocker.patch.object(target_file, "pynvml") + self.mock_pynvml.nvmlDeviceGetCount.return_value = 8 + self.mock_nvmlDeviceGetHandleByIndex = mocker.MagicMock(side_effect=lambda val: val) + self.mock_pynvml.nvmlDeviceGetHandleByIndex = self.mock_nvmlDeviceGetHandleByIndex + + self.gpu_usage = {} + self.mock_pynvml.nvmlDeviceGetMemoryInfo.side_effect = self.mock_nvmlDeviceGetMemoryInfo + self.mock_pynvml.nvmlDeviceGetUtilizationRates.side_effect = self.mock_nvmlDeviceGetUtilizationRates + + self.mock_os = mocker.patch.object(target_file, "os") + self.mock_os.environ = {} + + def mock_nvmlDeviceGetMemoryInfo(self, gpu_idx: int): + gpu_mem = MagicMock() + gpu_mem.used = self.gpu_usage.get(gpu_idx, {}).get("mem", 0) + return gpu_mem + + def mock_nvmlDeviceGetUtilizationRates(self, gpu_idx: int): + gpu_util = MagicMock() + gpu_util.gpu = self.gpu_usage.get(gpu_idx, {}).get("util", 0) + return gpu_util + + def set_mem_usage(self, gpu_idx: int, mem_usage: int): + if gpu_idx in self.gpu_usage: + self.gpu_usage[gpu_idx]["mem"] = mem_usage + else: + self.gpu_usage[gpu_idx] = {"mem": mem_usage} + + def set_gpu_util(self, gpu_idx: int, gpu_util: int): + if gpu_idx in self.gpu_usage: + self.gpu_usage[gpu_idx]["util"] = gpu_util + else: + self.gpu_usage[gpu_idx] = {"util": gpu_util} + + @e2e_pytest_unit + @pytest.mark.parametrize("gpu_to_track", ([0], [0, 4])) + def test_init(self, mocker, gpu_to_track): + mocker.patch.object(GpuUsageRecorder, "_get_gpu_to_track", return_value=gpu_to_track) + + GpuUsageRecorder() + + self.mock_pynvml.nvmlInit.assert_called_once() # check nvml is initialized + # check proper gpu handler is gotten + for i, gpu_idx in enumerate(gpu_to_track): + self.mock_nvmlDeviceGetHandleByIndex.call_args_list[i].args == (gpu_idx,) + + @e2e_pytest_unit + @pytest.mark.parametrize("gpu_ids", ([0], [1, 2, 5])) + def test_get_gpu_to_track_no_cuda_env_var(self, gpu_ids): + gpu_usage_recorder = GpuUsageRecorder() + + 
assert gpu_usage_recorder._get_gpu_to_track(gpu_ids) == gpu_ids # check right gpu indices are returned + + @e2e_pytest_unit + @pytest.mark.parametrize("gpu_ids", ([0], [1, 2, 5])) + def test_get_gpu_to_track_cuda_env_var(self, gpu_ids): + cuda_visible_devices = [1, 2, 5, 7, 9, 10] + self.mock_os.environ = {"CUDA_VISIBLE_DEVICES": ",".join(list(map(str, cuda_visible_devices)))} + gpu_to_track = [cuda_visible_devices[i] for i in gpu_ids] + + gpu_usage_recorder = GpuUsageRecorder() + + assert gpu_usage_recorder._get_gpu_to_track(gpu_ids) == gpu_to_track # check right gpu indices are returned + + @e2e_pytest_unit + def test_record_report(self): + gpu_ids = [0, 1] + gpu_usage_recorder = GpuUsageRecorder(gpu_ids) + + # first record + self.set_mem_usage(0, 4 * GIB) + self.set_mem_usage(1, 6 * GIB) + self.set_gpu_util(0, 40) + self.set_gpu_util(1, 60) + gpu_usage_recorder.record() + + # second record + self.set_mem_usage(0, 6 * GIB) + self.set_mem_usage(1, 8 * GIB) + self.set_gpu_util(0, 60) + self.set_gpu_util(1, 80) + gpu_usage_recorder.record() + + report = gpu_usage_recorder.report() + + # check right values are returned + assert float(report["gpu_0"]["avg_util"].split()[0]) == pytest.approx(50) + assert float(report["gpu_0"]["max_mem"].split()[0]) == pytest.approx(6) + assert float(report["gpu_1"]["avg_util"].split()[0]) == pytest.approx(70) + assert float(report["gpu_1"]["max_mem"].split()[0]) == pytest.approx(8) + assert float(report["total_avg_util"].split()[0]) == pytest.approx(60) + assert float(report["total_max_mem"].split()[0]) == pytest.approx(8) + + @e2e_pytest_unit + def test_report_wo_record(self): + gpu_usage_recorder = GpuUsageRecorder() + report = gpu_usage_recorder.report() + + assert report == {} # if report is called without calling record, empty dict should be returned
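
Usage note: the snippet below is a minimal sketch of driving the new ResourceTracker directly, mirroring what train.py now does when --track-resource-usage is passed. It assumes otx is installed and pynvml can see at least one GPU; the output directory, gpu ids, and the sleep are illustrative placeholders, not part of this diff.

# Minimal usage sketch of ResourceTracker (illustrative only).
import time
from pathlib import Path

from otx.cli.utils.experiment import ResourceTracker

output_dir = Path("outputs")  # placeholder output directory
output_dir.mkdir(parents=True, exist_ok=True)

tracker = ResourceTracker(resource_type="cpu,gpu", gpu_ids="0")
tracker.start()  # spawns the child process that samples CPU/GPU usage

time.sleep(5)  # placeholder for the real workload, e.g. task.train(...)

# Writes max memory usage and average utilization per tracked resource as YAML,
# using the same file name that train.py writes above.
tracker.stop(output_dir / "resource_usage.yaml")

# The equivalent CLI form added in this PR:
#   otx train ... --track-resource-usage all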