From c096b5b23f97b1cd03f6a001509eba7a7423c189 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 12 Sep 2023 10:27:20 +0900
Subject: [PATCH 01/14] add argument to track resource usage

---
 src/otx/cli/tools/train.py      |  14 +++
 src/otx/cli/utils/experiment.py | 216 ++++++++++++++++++++++++++++++++
 2 files changed, 230 insertions(+)
 create mode 100644 src/otx/cli/utils/experiment.py

diff --git a/src/otx/cli/tools/train.py b/src/otx/cli/tools/train.py
index da5a3f9ffd6..5ea7cb3491b 100644
--- a/src/otx/cli/tools/train.py
+++ b/src/otx/cli/tools/train.py
@@ -42,6 +42,7 @@
     get_parser_and_hprams_data,
 )
 from otx.cli.utils.report import get_otx_report
+from otx.cli.utils.experiment import ResourceTracker
 from otx.core.data.adapter import get_dataset_adapter
 
 
@@ -159,6 +160,11 @@ def get_args():
         default=None,
         help="Encryption key required to train the encrypted dataset. It is not required the non-encrypted dataset",
     )
+    parser.add_argument(
+        "--track-resource-usage",
+        action="store_true",
+        help="Track CPU & GPU utilization and max memory usage and save them at the output path as a file.",
+    )
 
     sub_parser = add_hyper_parameters_sub_parser(parser, hyper_parameters, return_sub_parser=True)
     # TODO: Temporary solution for cases where there is no template input
@@ -273,10 +279,18 @@ def train(exit_stack: Optional[ExitStack] = None):  # pylint: disable=too-many-b
 
     output_model = ModelEntity(dataset, environment.get_model_configuration())
 
+    resource_tracker = None
+    if args.track_resource_usage and not is_multigpu_child_process():
+        resource_tracker = ResourceTracker(config_manager.output_path, args.gpus)
+        resource_tracker.start()
+
     task.train(
         dataset, output_model, train_parameters=TrainParameters(), seed=args.seed, deterministic=args.deterministic
     )
 
+    if resource_tracker is not None:
+        resource_tracker.stop()
+
     model_path = config_manager.output_path / "models"
     save_model_data(output_model, str(model_path))
 
diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
new file mode 100644
index 00000000000..30f210eb3b1
--- /dev/null
+++ b/src/otx/cli/utils/experiment.py
@@ -0,0 +1,216 @@
+"""Utils function for experiments"""
+
+import multiprocessing as mp
+import psutil
+import yaml
+import logging
+import os
+from abc import ABC, abstractmethod
+from typing import Union, Optional, List, Dict, Any
+from pathlib import Path
+
+try:
+    import pynvml
+except ImportError:
+    pynvml = None
+
+logger = logging.getLogger(__name__)
+GIB = 1024**3
+
+
+class ResourceTracker:
+    """Class to track resource usage.
+
+    Args:
+        output_dir (Union[str, Path]): Output directory path where CPU & GPU utilization and max memory usage values
+            are saved.
+        gpu_ids (Optional[str]): GPU indices to record.
+    """
+    def __init__(self, output_dir: Union[str, Path], gpu_ids: Optional[str] = None):
+        self._output_dir = output_dir if isinstance(output_dir, Path) else Path(output_dir)
+        self._gpu_ids = gpu_ids
+        self._mem_check_proc: Union[mp.Process, None] = None
+        self._queue: Union[mp.Queue, None] = None
+
+    def start(self):
+        """Run a process which tracks resource usage"""
+        self._queue = mp.Queue()
+        self._mem_check_proc = mp.Process(target=_check_resource, args=(self._queue, ["cpu", "gpu"], self._gpu_ids))
+        self._mem_check_proc.start()
+
+    def stop(self):
+        """Terminate a process to record resource usage."""
+        if self._mem_check_proc is None or not self._mem_check_proc.is_alive():
+            return
+
+        self._queue.put(self._output_dir)
+        self._mem_check_proc.join(10)
+        if self._mem_check_proc.exitcode is None:
+            self._mem_check_proc.terminate()
+        self._mem_check_proc.close()
+
+        self._mem_check_proc = None
+        self._queue = None
+
+
+def _check_resource(
+    queue: mp.Queue,
+    resource_types: Optional[Union[str, List[str]]] = None,
+    gpu_ids: Optional[List[int]] = None
+):
+    if resource_types is None:
+        resource_types = []
+
+    trackers: Dict[str, ResourceRecorder] = {}
+    for resource_type in resource_types:
+        if resource_type == "cpu":
+            trackers[resource_type] = CpuUsageRecorder(psutil.Process().parent())
+        elif resource_type == "gpu":
+            if pynvml is None:
+                logger.warning("GPU can't be found. Tracking GPU usage is skipped.")
+                continue
+            trackers[resource_type] = GpuUsageRecorder(gpu_ids)
+        else:
+            logger.warning(
+                "Resource type {} isn't supported now. Current available types are cpu and gpu.".format(resource_type)
+            )
+
+    if not trackers:
+        logger.warning("There is no resource to record.")
+        return
+
+    while True:
+        for tracker in trackers.values():
+            tracker.record()
+
+        if not queue.empty():
+            break
+
+    output_path = Path(queue.get())
+
+    resource_record = {resource_type : tracker.report() for resource_type, tracker in trackers.items()}
+    with (output_path / "resource_usage.yaml").open("w") as f:
+        yaml.dump(resource_record, f, default_flow_style=False)
+
+
+class ResourceRecorder(ABC):
+    """Base class for each resource recorder."""
+    @abstractmethod
+    def record(self):
+        """Record a resource usage."""
+        raise NotImplementedError
+
+    @abstractmethod
+    def report(self):
+        """Aggregate all resource usages."""
+        raise NotImplementedError
+
+
+class CpuUsageRecorder(ResourceRecorder):
+    """CPU usage recorder class.
+
+    Args:
+        target_process Optional[psutil.Process]: Process to track.
+    """
+    def __init__(self, target_process: Optional[psutil.Process] = None):
+        self._target_process = psutil.Process() if target_process is None else target_process
+        self._record_count: int = 0
+        self._max_mem: Union[int, float] = 0
+        self._avg_util: Union[int, float] = 0
+
+    def record(self):
+        """Record CPU usage."""
+        # cpu mem
+        cpu_mem = self._target_process.memory_info().rss / GIB
+        if self._max_mem < cpu_mem:
+            self._max_mem = cpu_mem
+
+        # cpu util
+        cpu_percent = self._target_process.cpu_percent()
+        if self._record_count != 0:  # a value at the first time is meaningless
+            self._avg_util += cpu_percent
+
+        self._record_count += 1
+
+    def report(self) -> Dict[str, str]:
+        """Aggregate CPU usage."""
+        if self._record_count == 0:
+            return {}
+
+        return {
+            "max_memory_usage" : f"{round(self._max_mem, 2)} GiB",
+            "avg_util" : f"{round(self._avg_util / self._record_count, 2)} %"
+        }
+
+
+class GpuUsageRecorder(ResourceRecorder):
+    """GPU usage recorder class.
+
+    Args:
+        gpu_ids Optional[List[int]]: GPU indices to record. If not given, first GPU is recorded.
+    """
+    def __init__(self, gpu_ids: Optional[List[int]] = None):
+        if gpu_ids is None:
+            gpu_ids = [0]
+
+        self._record_count: int = 0
+        self._record: Dict[str, Union[int, float]] = {}
+        self._gpu_handlers: Dict[int, Any] = {}
+
+        pynvml.nvmlInit()
+        gpu_to_track = self._get_gpu_to_track(gpu_ids)
+        for gpu_idx in gpu_to_track:
+            self._record[gpu_idx] = {"max_mem" : 0, "avg_util" : 0}
+            self._gpu_handlers[gpu_idx] = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
+
+    def _get_gpu_to_track(self, gpu_ids: List[int]) -> List[int]:
+        avaiable_gpus = os.environ.get("CUDA_VISIBLE_DEVICES")
+        if avaiable_gpus is None:
+            avaiable_gpus = list(range(pynvml.nvmlDeviceGetCount()))
+        else:
+            avaiable_gpus = [int(idx) for idx in avaiable_gpus.split(',')]
+        return [avaiable_gpus[gpu_idx] for gpu_idx in gpu_ids]
+
+    def record(self):
+        """Record GPU usage."""
+        for gpu_idx, record in self._record.items():
+            # gpu util
+            gpu_info = pynvml.nvmlDeviceGetUtilizationRates(self._gpu_handlers[gpu_idx])
+            record["avg_util"] += gpu_info.gpu
+
+            # gpu mem
+            gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(self._gpu_handlers[gpu_idx])
+            mem_used = gpu_mem.used / GIB
+            if record["max_mem"] < mem_used:
+                record["max_mem"] = mem_used
+
+        self._record_count += 1
+
+    def report(self) -> Dict[str, str]:
+        """Aggregate GPU usage."""
+        if self._record_count == 0:
+            return {}
+
+        total_max_mem = 0
+        total_avg_util = 0
+        gpus_record = self._record.copy()
+        for gpu_idx in list(gpus_record.keys()):
+            max_mem = gpus_record[gpu_idx]['max_mem']
+            if total_max_mem < max_mem:
+                total_max_mem = max_mem
+            avg_util = gpus_record[gpu_idx]['avg_util'] / self._record_count
+            total_avg_util = avg_util
+
+            gpus_record[gpu_idx]["avg_util"] = f"{round(avg_util, 2)} %"
+            gpus_record[gpu_idx]["max_mem"] = f"{round(max_mem, 2)} GiB"
+            gpus_record[f"gpu_{gpu_idx}"] = gpus_record[gpu_idx]
+            del gpus_record[gpu_idx]
+
+        gpus_record["total_avg_util"] = f"{round(total_avg_util / len(gpus_record), 2)} %"
+        gpus_record["total_max_mem"] = f"{round(total_max_mem, 2)} GiB"
+
+        return gpus_record
+
+    def __del__(self):
+        """Shutdown nvml."""
+        pynvml.nvmlShutdown()

From f26c98b384ec297de7903e25e48e54e68ab68e6a Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 12 Sep 2023 14:00:24 +0900
Subject: [PATCH 02/14] fix bug

---
 src/otx/cli/utils/experiment.py | 35 +++++++++++++++++++++------------
 1 file changed, 22 insertions(+), 13 deletions(-)

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index 30f210eb3b1..1d0b7b702fa 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -5,8 +5,10 @@ import psutil
 import yaml
 import logging
 import os
+import time
 from abc import ABC, abstractmethod
 from typing import Union, Optional, List, Dict, Any
+from statistics import mean
 from pathlib import Path
 
 try:
@@ -28,6 +30,10 @@ class ResourceTracker:
     """
     def __init__(self, output_dir: Union[str, Path], gpu_ids: Optional[str] = None):
         self._output_dir = output_dir if isinstance(output_dir, Path) else Path(output_dir)
+        if gpu_ids is not None:
+            gpu_ids = [int(idx) for idx in gpu_ids.split(',')]
+            if len(gpu_ids) == 1:  # First GPU will be used in a single GPU training case.
+                gpu_ids = [0]
         self._gpu_ids = gpu_ids
         self._mem_check_proc: Union[mp.Process, None] = None
         self._queue: Union[mp.Queue, None] = None
@@ -86,6 +92,8 @@ def _check_resource(
         if not queue.empty():
             break
 
+        time.sleep(0.01)
+
     output_path = Path(queue.get())
 
     resource_record = {resource_type : tracker.report() for resource_type, tracker in trackers.items()}
@@ -153,14 +161,13 @@ def __init__(self, gpu_ids: Optional[List[int]] = None):
         if gpu_ids is None:
             gpu_ids = [0]
 
-        self._record_count: int = 0
         self._record: Dict[str, Union[int, float]] = {}
         self._gpu_handlers: Dict[int, Any] = {}
 
         pynvml.nvmlInit()
         gpu_to_track = self._get_gpu_to_track(gpu_ids)
         for gpu_idx in gpu_to_track:
-            self._record[gpu_idx] = {"max_mem" : 0, "avg_util" : 0}
+            self._record[gpu_idx] = {"max_mem" : 0, "util_record" : []}
             self._gpu_handlers[gpu_idx] = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
 
     def _get_gpu_to_track(self, gpu_ids: List[int]) -> List[int]:
@@ -176,7 +183,7 @@ def record(self):
         for gpu_idx, record in self._record.items():
             # gpu util
             gpu_info = pynvml.nvmlDeviceGetUtilizationRates(self._gpu_handlers[gpu_idx])
-            record["avg_util"] += gpu_info.gpu
+            record["util_record"].append(gpu_info.gpu)
 
             # gpu mem
             gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(self._gpu_handlers[gpu_idx])
@@ -184,13 +191,8 @@ def record(self):
             if record["max_mem"] < mem_used:
                 record["max_mem"] = mem_used
 
-        self._record_count += 1
-
     def report(self) -> Dict[str, str]:
         """Aggregate GPU usage."""
-        if self._record_count == 0:
-            return {}
-
         total_max_mem = 0
         total_avg_util = 0
         gpus_record = self._record.copy()
@@ -198,12 +200,19 @@ def report(self) -> Dict[str, str]:
             max_mem = gpus_record[gpu_idx]['max_mem']
             if total_max_mem < max_mem:
                 total_max_mem = max_mem
-            avg_util = gpus_record[gpu_idx]['avg_util'] / self._record_count
-            total_avg_util = avg_util
 
-            gpus_record[gpu_idx]["avg_util"] = f"{round(avg_util, 2)} %"
-            gpus_record[gpu_idx]["max_mem"] = f"{round(max_mem, 2)} GiB"
-            gpus_record[f"gpu_{gpu_idx}"] = gpus_record[gpu_idx]
+            # Count utilization after it becomes bigger than 20% of max utilization
+            max_util = max(gpus_record[gpu_idx]['util_record'])
+            for idx, util in enumerate(gpus_record[gpu_idx]['util_record']):
+                if util * 5 > max_util:
+                    break
+            avg_util = mean(gpus_record[gpu_idx]['util_record'][idx:])
+            total_avg_util += avg_util
+
+            gpus_record[f"gpu_{gpu_idx}"] = {
+                "avg_util" : f"{round(avg_util, 2)} %",
+                "max_mem" : f"{round(max_mem, 2)} GiB",
+            }
             del gpus_record[gpu_idx]
 
         gpus_record["total_avg_util"] = f"{round(total_avg_util / len(gpus_record), 2)} %"

From 6579cff3a914f1b609190a4e1111aa502da7dd92 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 12 Sep 2023 14:24:05 +0900
Subject: [PATCH 03/14] fix a bug in a multi gpu case

---
 src/otx/cli/utils/experiment.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index 1d0b7b702fa..f932bed9adf 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -32,8 +32,7 @@ def __init__(self, output_dir: Union[str, Path], gpu_ids: Optional[str] = None):
         self._output_dir = output_dir if isinstance(output_dir, Path) else Path(output_dir)
         if gpu_ids is not None:
             gpu_ids = [int(idx) for idx in gpu_ids.split(',')]
-            if len(gpu_ids) == 1:  # First GPU will be used in a single GPU training case.
-                gpu_ids = [0]
+            gpu_ids[0] = 0
         self._gpu_ids = gpu_ids
         self._mem_check_proc: Union[mp.Process, None] = None
         self._queue: Union[mp.Queue, None] = None

From dc55d49f77af28f9e70ef4b6a3eeb4423527373a Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Tue, 12 Sep 2023 15:12:12 +0900
Subject: [PATCH 04/14] use total cpu usage

---
 src/otx/cli/utils/experiment.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index f932bed9adf..4c82c791d53 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -69,7 +69,7 @@ def _check_resource(
     trackers: Dict[str, ResourceRecorder] = {}
     for resource_type in resource_types:
         if resource_type == "cpu":
-            trackers[resource_type] = CpuUsageRecorder(psutil.Process().parent())
+            trackers[resource_type] = CpuUsageRecorder()
         elif resource_type == "gpu":
             if pynvml is None:
                 logger.warning("GPU can't be found. Tracking GPU usage is skipped.")
@@ -119,8 +119,7 @@ class CpuUsageRecorder(ResourceRecorder):
     Args:
         target_process Optional[psutil.Process]: Process to track.
     """
-    def __init__(self, target_process: Optional[psutil.Process] = None):
-        self._target_process = psutil.Process() if target_process is None else target_process
+    def __init__(self):
         self._record_count: int = 0
         self._max_mem: Union[int, float] = 0
         self._avg_util: Union[int, float] = 0
@@ -128,12 +127,13 @@ def record(self):
         """Record CPU usage."""
         # cpu mem
-        cpu_mem = self._target_process.memory_info().rss / GIB
+        memory_info = psutil.virtual_memory()
+        cpu_mem = (memory_info.total - memory_info.available) / GIB
         if self._max_mem < cpu_mem:
             self._max_mem = cpu_mem
 
         # cpu util
-        cpu_percent = self._target_process.cpu_percent()
+        cpu_percent = psutil.cpu_percent()
         if self._record_count != 0:  # a value at the first time is meaningless
             self._avg_util += cpu_percent

From 4b9c332ca6b669b398cdb96daf311fd031d3ff46 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 11:50:14 +0900
Subject: [PATCH 05/14] add unit test

---
 src/otx/cli/tools/train.py              |   7 +-
 src/otx/cli/utils/experiment.py         |  51 ++--
 tests/unit/cli/utils/test_experiment.py | 300 ++++++++++++++++++++++++
 3 files changed, 341 insertions(+), 17 deletions(-)
 create mode 100644 tests/unit/cli/utils/test_experiment.py

diff --git a/src/otx/cli/tools/train.py b/src/otx/cli/tools/train.py
index 5ea7cb3491b..dafda8aad33 100644
--- a/src/otx/cli/tools/train.py
+++ b/src/otx/cli/tools/train.py
@@ -162,7 +162,8 @@ def get_args():
     )
     parser.add_argument(
         "--track-resource-usage",
-        action="store_true",
+        type=str,
+        default=None,
         help="Track CPU & GPU utilization and max memory usage and save them at the output path as a file.",
     )
 
@@ -281,7 +282,7 @@ def train(exit_stack: Optional[ExitStack] = None):  # pylint: disable=too-many-b
 
     resource_tracker = None
     if args.track_resource_usage and not is_multigpu_child_process():
-        resource_tracker = ResourceTracker(config_manager.output_path, args.gpus)
+        resource_tracker = ResourceTracker(args.track_resource_usage, args.gpus)
         resource_tracker.start()
 
     task.train(
@@ -289,7 +290,7 @@ def train(exit_stack: Optional[ExitStack] = None):  # pylint: disable=too-many-b
     )
 
     if resource_tracker is not None:
-        resource_tracker.stop()
+        resource_tracker.stop(config_manager.output_path / "resource_usage.yaml")
 
     model_path = config_manager.output_path / "models"
     save_model_data(output_model, str(model_path))

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index 4c82c791d53..043847ad211 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -18,37 +18,58 @@
 
 logger = logging.getLogger(__name__)
 GIB = 1024**3
+AVAILABLE_RESOURCE_TYPE = ["cpu", "gpu"]
 
 
 class ResourceTracker:
     """Class to track resource usage.
 
     Args:
-        output_dir (Union[str, Path]): Output directory path where CPU & GPU utilization and max memory usage values
-            are saved.
+        resource_type (str, optional): Which resource to track. Available values are cpu, gpu or all now.
+            Defaults to "all".
         gpu_ids (Optional[str]): GPU indices to record.
     """
-    def __init__(self, output_dir: Union[str, Path], gpu_ids: Optional[str] = None):
-        self._output_dir = output_dir if isinstance(output_dir, Path) else Path(output_dir)
+    def __init__(self, resource_type: str = "all", gpu_ids: Optional[str] = None):
+        if resource_type == "all":
+            resource_type = AVAILABLE_RESOURCE_TYPE
+        else:
+            resource_type = [val for val in resource_type.split(',')]
+        self._resource_type: List[str] = resource_type
+
         if gpu_ids is not None:
             gpu_ids = [int(idx) for idx in gpu_ids.split(',')]
             gpu_ids[0] = 0
+
         self._gpu_ids = gpu_ids
         self._mem_check_proc: Union[mp.Process, None] = None
         self._queue: Union[mp.Queue, None] = None
 
     def start(self):
-        """Run a process which tracks resource usage"""
+        """Run a process which tracks resource usage."""
+        if self._mem_check_proc is not None:
+            logger.warning("Resource tracker started already. Please execute start after executing stop.")
+            return
+
         self._queue = mp.Queue()
-        self._mem_check_proc = mp.Process(target=_check_resource, args=(self._queue, ["cpu", "gpu"], self._gpu_ids))
+        self._mem_check_proc = mp.Process(
+            target=_check_resource,
+            args=(self._queue, self._resource_type, self._gpu_ids)
+        )
         self._mem_check_proc.start()
 
-    def stop(self):
-        """Terminate a process to record resource usage."""
+    def stop(self, output_path: Union[str, Path]):
+        """Terminate a process to record resource usage.
+
+        Args:
+            output_path (Union[str, Path]): Output file path to save CPU & GPU utilization and max memory usage values.
+        """
         if self._mem_check_proc is None or not self._mem_check_proc.is_alive():
             return
 
-        self._queue.put(self._output_dir)
+        if isinstance(output_path, str):
+            output_path = Path(output_path)
+
+        self._queue.put(output_path)
         self._mem_check_proc.join(10)
         if self._mem_check_proc.exitcode is None:
             self._mem_check_proc.terminate()
@@ -60,7 +81,7 @@ def stop(self):
 
 def _check_resource(
     queue: mp.Queue,
-    resource_types: Optional[Union[str, List[str]]] = None,
+    resource_types: Optional[List[str]] = None,
     gpu_ids: Optional[List[int]] = None
 ):
     if resource_types is None:
@@ -96,7 +117,7 @@ def _check_resource(
     output_path = Path(queue.get())
 
     resource_record = {resource_type : tracker.report() for resource_type, tracker in trackers.items()}
-    with (output_path / "resource_usage.yaml").open("w") as f:
+    with output_path.open("w") as f:
         yaml.dump(resource_record, f, default_flow_style=False)
 
 
@@ -123,6 +144,7 @@ def __init__(self):
         self._record_count: int = 0
         self._max_mem: Union[int, float] = 0
         self._avg_util: Union[int, float] = 0
+        self._first_record = True
 
     def record(self):
         """Record CPU usage."""
@@ -134,10 +156,11 @@ def record(self):
 
         # cpu util
         cpu_percent = psutil.cpu_percent()
-        if self._record_count != 0:  # a value at the first time is meaningless
-            self._avg_util += cpu_percent
-
-        self._record_count += 1
+        if self._first_record:
+            self._first_record = False
+        else:
+            self._avg_util += cpu_percent
+            self._record_count += 1
 
     def report(self) -> Dict[str, str]:
         """Aggregate CPU usage."""

diff --git a/tests/unit/cli/utils/test_experiment.py b/tests/unit/cli/utils/test_experiment.py
new file mode 100644
index 00000000000..5a2d001bffb
--- /dev/null
+++ b/tests/unit/cli/utils/test_experiment.py
@@ -0,0 +1,300 @@
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+from otx.cli.utils import experiment as target_file
+from otx.cli.utils.experiment import ResourceTracker, _check_resource, CpuUsageRecorder, GpuUsageRecorder, GIB
+from tests.test_suite.e2e_test_system import e2e_pytest_unit
+
+
+class TestResourceTracker:
+    @pytest.fixture(autouse=True)
+    def _set_up(self, mocker):
+        self.mock_mp = mocker.patch.object(target_file, "mp")
+
+        self.mock_proc = mocker.MagicMock()
+        self.mock_mp.Process.return_value = self.mock_proc
+
+        self.mock_queue = mocker.MagicMock()
+        self.mock_mp.Queue.return_value = self.mock_queue
+
+    @pytest.mark.parametrize("resource_type", ("cpu", "gpu", "all", "cpu,gpu"))
+    @pytest.mark.parametrize("gpu_ids", (None, "0", "0,3"))
+    def test_init(self, resource_type, gpu_ids):
+        ResourceTracker(resource_type, gpu_ids)
+
+    @pytest.mark.parametrize("resource_type", ("cpu", "gpu", "all", "cpu,gpu"))
+    @pytest.mark.parametrize("gpu_ids", (None, "0", "0,3"))
+    def test_start(self, resource_type, gpu_ids):
+        # prepare
+        if resource_type == "all":
+            expected_resource_type = target_file.AVAILABLE_RESOURCE_TYPE
+        else:
+            expected_resource_type = [val for val in resource_type.split(',')]
+
+        expected_gpu_ids = None
+        if gpu_ids is not None:
+            expected_gpu_ids = [int(idx) for idx in gpu_ids.split(',')]
+            expected_gpu_ids[0] = 0
+
+        # run
+        resource_tracker = ResourceTracker(resource_type, gpu_ids)
+        resource_tracker.start()
+
+        self.mock_proc.start.assert_called_once()  # check that a process to track resource usage starts
+        # check proper resource type and gpu_ids values are passed to a process to run
+        assert self.mock_mp.Process.call_args.kwargs['args'][1] == expected_resource_type
+        assert self.mock_mp.Process.call_args.kwargs['args'][2] == expected_gpu_ids
+
+    def test_start_multiple_times(self):
+        resource_tracker = ResourceTracker()
+
+        # run multiple times
+        resource_tracker.start()
+        resource_tracker.start()
+
+        self.mock_proc.start.assert_called_once()  # check that a process starts once
+
+    def test_stop(self):
+        output_path = Path("fake")
+
+        resource_tracker = ResourceTracker()
+        resource_tracker.start()
+        resource_tracker.stop(output_path)
+
+        # check that code to terminate a process is executed properly
+        self.mock_queue.put.assert_called_once_with(output_path)
+        self.mock_proc.join.assert_called()
+        self.mock_proc.close.assert_called()
+
+    def test_stop_not_exit_normally(self):
+        output_path = Path("fake")
+        self.mock_proc.exitcode = None
+
+        resource_tracker = ResourceTracker()
+        resource_tracker.start()
+        resource_tracker.stop(output_path)
+
+        # check that code to terminate a process is executed properly
+        self.mock_queue.put.assert_called_once_with(output_path)
+        self.mock_proc.join.assert_called()
+        # check that a process is terminated forcibly if it doesn't exit normally
+        self.mock_proc.terminate.assert_called()
+        self.mock_proc.close.assert_called()
+
+    def test_stop_before_start(self):
+        resource_tracker = ResourceTracker()
+        resource_tracker.stop("fake")
+
+        # check that the process teardown code isn't called
+        self.mock_queue.put.assert_not_called()
+        self.mock_proc.join.assert_not_called()
+        self.mock_proc.close.assert_not_called()
+
+
+class MockQueue:
+    def __init__(self, output_path: str):
+        self.output_path = output_path
+
+    def empty(self):
+        return False
+
+    def get(self):
+        return self.output_path
+
+
+@pytest.mark.parametrize("resource_types", (["cpu"], ["gpu"], ["cpu", "gpu"]))
+def test_check_resource(mocker, resource_types, tmp_path):
+    # prepare
+    gpu_ids = [0,1]
+    output_file = f"{tmp_path}/fake.yaml"
+    mock_queue = MockQueue(output_file)
+
+    mock_cpu_recorder = mocker.MagicMock()
+    mocker.patch.object(target_file, "CpuUsageRecorder", return_value=mock_cpu_recorder)
+    mock_gpu_recorder = mocker.MagicMock()
+    mock_gpu_recorder_cls = mocker.patch.object(target_file, "GpuUsageRecorder", return_value=mock_gpu_recorder)
+
+    mocker.patch.object(target_file, "yaml")
+    mocker.patch.object(target_file, "time")
+
+    # run
+    _check_resource(mock_queue, resource_types, gpu_ids)
+
+    # check the recorders record properly
+    if "cpu" in resource_types:
+        mock_cpu_recorder.record.assert_called_once()
+    if "gpu" in resource_types:
+        mock_gpu_recorder.record.assert_called_once()
+        mock_gpu_recorder_cls.assert_called_once_with(gpu_ids)
+
+    assert Path(output_file).exists()  # check a file is saved well
+
+
+@pytest.mark.parametrize("resource_types", (["wrong"], None))
+def test_check_resource_wrong_resource_type(mocker, resource_types, tmp_path):
+    # prepare
+    output_file = f"{tmp_path}/fake.yaml"
+    mock_queue = MockQueue(output_file)
+
+    mock_cpu_recorder = mocker.MagicMock()
+    mock_cpu_recorder_cls = mocker.patch.object(target_file, "CpuUsageRecorder", return_value=mock_cpu_recorder)
+    mock_gpu_recorder = mocker.MagicMock()
+    mock_gpu_recorder_cls = mocker.patch.object(target_file, "GpuUsageRecorder", return_value=mock_gpu_recorder)
+
+    mocker.patch.object(target_file, "yaml")
+    mocker.patch.object(target_file, "time")
+
+    # run
+    _check_resource(mock_queue, resource_types)
+
+    # check the recorders aren't called
+    mock_cpu_recorder.record.assert_not_called()
+    mock_cpu_recorder_cls.assert_not_called()
+    mock_gpu_recorder.record.assert_not_called()
+    mock_gpu_recorder_cls.assert_not_called()
+
+    assert not Path(output_file).exists()  # check a file isn't saved
+
+
+class TestCpuUsageRecorder:
+    @pytest.fixture(autouse=True)
+    def _set_up(self, mocker):
+        self.mock_psutil = mocker.patch.object(target_file, "psutil")
+        self.mock_virtual_memory = mocker.MagicMock()
+        self.mock_psutil.virtual_memory.return_value = self.mock_virtual_memory
+        self.set_mem_usage(0)
+        self.set_cpu_util(0)
+
+    def set_mem_usage(self, mem_usage: int):
+        self.mock_virtual_memory.total = mem_usage
+        self.mock_virtual_memory.available = 0
+
+    def set_cpu_util(self, cpu_util: int):
+        self.mock_psutil.cpu_percent.return_value = cpu_util
+
+    def test_init(self):
+        CpuUsageRecorder()
+
+    def test_record_report(self):
+        cpu_usage_recorder = CpuUsageRecorder()
+
+        # record cpu usage
+        cpu_usage_recorder.record()
+        self.set_mem_usage(4 * GIB)
+        self.set_cpu_util(40)
+        cpu_usage_recorder.record()
+        self.set_mem_usage(6 * GIB)
+        self.set_cpu_util(60)
+        cpu_usage_recorder.record()
+        report = cpu_usage_recorder.report()
+
+        # check right values are returned when calling report
+        assert float(report["max_memory_usage"].split()[0]) == pytest.approx(6)
+        assert float(report["avg_util"].split()[0]) == pytest.approx(50)
+
+    def test_report_wo_record(self):
+        cpu_usage_recorder = CpuUsageRecorder()
+        report = cpu_usage_recorder.report()
+
+        assert report == {}  # if report is called without calling record, empty dict should be returned
+
+
+class TestGpuUsageRecorder:
+    @pytest.fixture(autouse=True)
+    def _set_up(self, mocker):
+        self.mock_pynvml = mocker.patch.object(target_file, "pynvml")
+        self.mock_pynvml.nvmlDeviceGetCount.return_value = 8
+        self.mock_nvmlDeviceGetHandleByIndex = mocker.MagicMock(side_effect=lambda val : val)
+        self.mock_pynvml.nvmlDeviceGetHandleByIndex = self.mock_nvmlDeviceGetHandleByIndex
+
+        self.gpu_usage = {}
+        self.mock_pynvml.nvmlDeviceGetMemoryInfo.side_effect = self.mock_nvmlDeviceGetMemoryInfo
+        self.mock_pynvml.nvmlDeviceGetUtilizationRates.side_effect = self.mock_nvmlDeviceGetUtilizationRates
+
+        self.mock_os = mocker.patch.object(target_file, "os")
+        self.mock_os.environ = {}
+
+    def mock_nvmlDeviceGetMemoryInfo(self, gpu_idx: int):
+        gpu_mem = MagicMock()
+        gpu_mem.used = self.gpu_usage.get(gpu_idx, {}).get("mem", 0)
+        return gpu_mem
+
+    def mock_nvmlDeviceGetUtilizationRates(self, gpu_idx: int):
+        gpu_util = MagicMock()
+        gpu_util.gpu = self.gpu_usage.get(gpu_idx, {}).get("util", 0)
+        return gpu_util
+
+    def set_mem_usage(self, gpu_idx: int, mem_usage: int):
+        if gpu_idx in self.gpu_usage:
+            self.gpu_usage[gpu_idx]["mem"] = mem_usage
+        else:
+            self.gpu_usage[gpu_idx] = {"mem" : mem_usage}
+
+    def set_gpu_util(self, gpu_idx: int, gpu_util: int):
+        if gpu_idx in self.gpu_usage:
+            self.gpu_usage[gpu_idx]["util"] = gpu_util
+        else:
+            self.gpu_usage[gpu_idx] = {"util" : gpu_util}
+
+    @pytest.mark.parametrize("gpu_to_track", ([0], [0,4]))
+    def test_init(self, mocker, gpu_to_track):
+        mocker.patch.object(GpuUsageRecorder, "_get_gpu_to_track", return_value=gpu_to_track)
+
+        GpuUsageRecorder()
+
+        self.mock_pynvml.nvmlInit.assert_called_once()  # check nvml is initialized
+        # check proper gpu handler is obtained
+        for i, gpu_idx in enumerate(gpu_to_track):
+            assert self.mock_nvmlDeviceGetHandleByIndex.call_args_list[i].args == (gpu_idx,)
+
+    @pytest.mark.parametrize("gpu_ids", ([0], [1, 2, 5]))
+    def test_get_gpu_to_track_no_cuda_env_var(self, gpu_ids):
+        gpu_usage_recorder = GpuUsageRecorder()
+
+        assert gpu_usage_recorder._get_gpu_to_track(gpu_ids) == gpu_ids  # check right gpu indices are returned
+
+    @pytest.mark.parametrize("gpu_ids", ([0], [1, 2, 5]))
+    def test_get_gpu_to_track_cuda_env_var(self, gpu_ids):
+        cuda_visible_devices = [1, 2, 5, 7, 9, 10]
+        self.mock_os.environ = {"CUDA_VISIBLE_DEVICES" : ",".join(list(map(str, cuda_visible_devices)))}
+        gpu_to_track = [cuda_visible_devices[i] for i in gpu_ids]
+
+        gpu_usage_recorder = GpuUsageRecorder()
+
+        assert gpu_usage_recorder._get_gpu_to_track(gpu_ids) == gpu_to_track  # check right gpu indices are returned
+
+    def test_record_report(self):
+        gpu_ids = [0,1]
+        gpu_usage_recorder = GpuUsageRecorder(gpu_ids)
+
+        # first record
+        self.set_mem_usage(0, 4 * GIB)
+        self.set_mem_usage(1, 6 * GIB)
+        self.set_gpu_util(0, 40)
+        self.set_gpu_util(1, 60)
+        gpu_usage_recorder.record()
+
+        # second record
+        self.set_mem_usage(0, 6 * GIB)
+        self.set_mem_usage(1, 8 * GIB)
+        self.set_gpu_util(0, 60)
+        self.set_gpu_util(1, 80)
+        gpu_usage_recorder.record()
+
+        report = gpu_usage_recorder.report()
+
+        # check right values are returned
+        assert float(report["gpu_0"]["avg_util"].split()[0]) == pytest.approx(50)
+        assert float(report["gpu_0"]["max_mem"].split()[0]) == pytest.approx(6)
+        assert float(report["gpu_1"]["avg_util"].split()[0]) == pytest.approx(70)
+        assert float(report["gpu_1"]["max_mem"].split()[0]) == pytest.approx(8)
+        assert float(report["total_avg_util"].split()[0]) == pytest.approx(60)
+        assert float(report["total_max_mem"].split()[0]) == pytest.approx(8)
+
+    def test_report_wo_record(self):
+        gpu_usage_recorder = GpuUsageRecorder()
+        report = gpu_usage_recorder.report()
+
+        assert report == {}  # if report is called without calling record, empty dict should be returned

From 85a7af1e0e7076182961da9724f7e0445d71fb6b Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 11:52:28 +0900
Subject: [PATCH 06/14] add mark to unit test

---
 tests/unit/cli/utils/test_experiment.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/tests/unit/cli/utils/test_experiment.py b/tests/unit/cli/utils/test_experiment.py
index 5a2d001bffb..7c946434574 100644
--- a/tests/unit/cli/utils/test_experiment.py
+++ b/tests/unit/cli/utils/test_experiment.py
@@ -19,11 +19,13 @@ def _set_up(self, mocker):
         self.mock_queue = mocker.MagicMock()
         self.mock_mp.Queue.return_value = self.mock_queue
 
+    @e2e_pytest_unit
     @pytest.mark.parametrize("resource_type", ("cpu", "gpu", "all", "cpu,gpu"))
     @pytest.mark.parametrize("gpu_ids", (None, "0", "0,3"))
     def test_init(self, resource_type, gpu_ids):
         ResourceTracker(resource_type, gpu_ids)
 
+    @e2e_pytest_unit
     @pytest.mark.parametrize("resource_type", ("cpu", "gpu", "all", "cpu,gpu"))
     @pytest.mark.parametrize("gpu_ids", (None, "0", "0,3"))
     def test_start(self, resource_type, gpu_ids):
@@ -47,6 +49,7 @@ def test_start(self, resource_type, gpu_ids):
         assert self.mock_mp.Process.call_args.kwargs['args'][1] == expected_resource_type
         assert self.mock_mp.Process.call_args.kwargs['args'][2] == expected_gpu_ids
 
+    @e2e_pytest_unit
     def test_start_multiple_times(self):
         resource_tracker = ResourceTracker()
 
@@ -56,6 +59,7 @@ def test_start_multiple_times(self):
 
         self.mock_proc.start.assert_called_once()  # check that a process starts once
 
+    @e2e_pytest_unit
     def test_stop(self):
         output_path = Path("fake")
 
@@ -68,6 +72,7 @@ def test_stop(self):
         self.mock_proc.join.assert_called()
         self.mock_proc.close.assert_called()
 
+    @e2e_pytest_unit
     def test_stop_not_exit_normally(self):
         output_path = Path("fake")
         self.mock_proc.exitcode = None
@@ -83,6 +88,7 @@ def test_stop_not_exit_normally(self):
         self.mock_proc.terminate.assert_called()
         self.mock_proc.close.assert_called()
 
+    @e2e_pytest_unit
     def test_stop_before_start(self):
         resource_tracker = ResourceTracker()
         resource_tracker.stop("fake")
@@ -105,6 +111,7 @@ def get(self):
 
 
 @pytest.mark.parametrize("resource_types", (["cpu"], ["gpu"], ["cpu", "gpu"]))
+@e2e_pytest_unit
 def test_check_resource(mocker, resource_types, tmp_path):
     # prepare
     gpu_ids = [0,1]
@@ -133,6 +140,7 @@ def test_check_resource(mocker, resource_types, tmp_path):
 
 
 @pytest.mark.parametrize("resource_types", (["wrong"], None))
+@e2e_pytest_unit
 def test_check_resource_wrong_resource_type(mocker, resource_types, tmp_path):
     # prepare
     output_file = f"{tmp_path}/fake.yaml"
@@ -174,9 +182,11 @@ def set_mem_usage(self, mem_usage: int):
     def set_cpu_util(self, cpu_util: int):
         self.mock_psutil.cpu_percent.return_value = cpu_util
 
+    @e2e_pytest_unit
     def test_init(self):
         CpuUsageRecorder()
 
+    @e2e_pytest_unit
     def test_record_report(self):
         cpu_usage_recorder = CpuUsageRecorder()
 
@@ -194,6 +204,7 @@ def test_record_report(self):
         assert float(report["max_memory_usage"].split()[0]) == pytest.approx(6)
         assert float(report["avg_util"].split()[0]) == pytest.approx(50)
 
+    @e2e_pytest_unit
     def test_report_wo_record(self):
         cpu_usage_recorder = CpuUsageRecorder()
         report = cpu_usage_recorder.report()
@@ -238,6 +249,7 @@ def set_gpu_util(self, gpu_idx: int, gpu_util: int):
         else:
             self.gpu_usage[gpu_idx] = {"util" : gpu_util}
 
+    @e2e_pytest_unit
     @pytest.mark.parametrize("gpu_to_track", ([0], [0,4]))
     def test_init(self, mocker, gpu_to_track):
         mocker.patch.object(GpuUsageRecorder, "_get_gpu_to_track", return_value=gpu_to_track)
@@ -249,12 +261,14 @@ def test_init(self, mocker, gpu_to_track):
         # check proper gpu handler is obtained
         for i, gpu_idx in enumerate(gpu_to_track):
             assert self.mock_nvmlDeviceGetHandleByIndex.call_args_list[i].args == (gpu_idx,)
 
+    @e2e_pytest_unit
     @pytest.mark.parametrize("gpu_ids", ([0], [1, 2, 5]))
     def test_get_gpu_to_track_no_cuda_env_var(self, gpu_ids):
         gpu_usage_recorder = GpuUsageRecorder()
 
         assert gpu_usage_recorder._get_gpu_to_track(gpu_ids) == gpu_ids  # check right gpu indices are returned
 
+    @e2e_pytest_unit
    @pytest.mark.parametrize("gpu_ids", ([0], [1, 2, 5]))
     def test_get_gpu_to_track_cuda_env_var(self, gpu_ids):
         cuda_visible_devices = [1, 2, 5, 7, 9, 10]
@@ -265,6 +279,7 @@ def test_get_gpu_to_track_cuda_env_var(self, gpu_ids):
 
         assert gpu_usage_recorder._get_gpu_to_track(gpu_ids) == gpu_to_track  # check right gpu indices are returned
 
+    @e2e_pytest_unit
     def test_record_report(self):
         gpu_ids = [0,1]
         gpu_usage_recorder = GpuUsageRecorder(gpu_ids)
@@ -293,6 +308,7 @@ def test_record_report(self):
         assert float(report["total_avg_util"].split()[0]) == pytest.approx(60)
         assert float(report["total_max_mem"].split()[0]) == pytest.approx(8)
 
+    @e2e_pytest_unit
     def test_report_wo_record(self):
         gpu_usage_recorder = GpuUsageRecorder()
         report = gpu_usage_recorder.report()

From d9dc81ef1784f8299eceae2bff683a0fc00a3f29 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 11:59:48 +0900
Subject: [PATCH 07/14] cover edge case

---
 src/otx/cli/utils/experiment.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index 043847ad211..0bf34c8f526 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -215,6 +215,9 @@ def record(self):
 
     def report(self) -> Dict[str, str]:
         """Aggregate GPU usage."""
+        if not list(self._record.values())[0]["util_record"]:  # record isn't called
+            return {}
+
         total_max_mem = 0
         total_avg_util = 0
         gpus_record = self._record.copy()

From 7014a44a62424bdcfc01ff549b3c725bdf6ddec9 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 13:30:41 +0900
Subject: [PATCH 08/14] add pynvml in requirement

---
 requirements/base.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/requirements/base.txt b/requirements/base.txt
index 0bb5aa8f044..c616ad8c38a 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -10,3 +10,4 @@ scipy>=1.8
 bayesian-optimization>=1.2.0
 tensorboard>=2.11.0
 multiprocess
+pynvml

From 60c04b7b48847d428d4132b89ede0f19b75e04a9 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 14:09:36 +0900
Subject: [PATCH 09/14] align with pre-commit

---
 src/otx/cli/tools/train.py              |  4 +-
 src/otx/cli/utils/experiment.py         | 70 ++++++++++++------------
 tests/unit/cli/utils/test_experiment.py | 28 +++++-----
 3 files changed, 51 insertions(+), 51 deletions(-)

diff --git a/src/otx/cli/tools/train.py b/src/otx/cli/tools/train.py
index dafda8aad33..f0ac3e6bd8a 100644
--- a/src/otx/cli/tools/train.py
+++ b/src/otx/cli/tools/train.py
@@ -31,6 +31,7 @@
 from otx.api.usecases.adapters.model_adapter import ModelAdapter
 from otx.cli.manager import ConfigManager
 from otx.cli.manager.config_manager import TASK_TYPE_TO_SUB_DIR_NAME
+from otx.cli.utils.experiment import ResourceTracker
 from otx.cli.utils.hpo import run_hpo
 from otx.cli.utils.importing import get_impl_class
 from otx.cli.utils.io import read_binary, read_label_schema, save_model_data
@@ -42,7 +43,6 @@
     get_parser_and_hprams_data,
 )
 from otx.cli.utils.report import get_otx_report
-from otx.cli.utils.experiment import ResourceTracker
 from otx.core.data.adapter import get_dataset_adapter
 
 
@@ -164,7 +164,7 @@ def get_args():
         "--track-resource-usage",
         type=str,
         default=None,
-        help="Track CPU & GPU utilization and max memory usage and save them at the output path as a file.",
+        help="Track CPU & GPU utilization and max memory usage and save values at the output path.",
     )
 
     sub_parser = add_hyper_parameters_sub_parser(parser, hyper_parameters, return_sub_parser=True)

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index 0bf34c8f526..207ba402d86 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -1,15 +1,16 @@
-"""Utils function for experiments"""
+"""Utils function for experiments."""
 
-import multiprocessing as mp
-import psutil
-import yaml
 import logging
+import multiprocessing as mp
 import os
 import time
 from abc import ABC, abstractmethod
-from typing import Union, Optional, List, Dict, Any
-from statistics import mean
 from pathlib import Path
+from statistics import mean
+from typing import Any, Dict, List, Optional, Union, no_type_check
+
+import psutil
+import yaml
 
 try:
     import pynvml
@@ -29,18 +30,19 @@ class ResourceTracker:
             Defaults to "all".
         gpu_ids (Optional[str]): GPU indices to record.
     """
+
     def __init__(self, resource_type: str = "all", gpu_ids: Optional[str] = None):
         if resource_type == "all":
-            resource_type = AVAILABLE_RESOURCE_TYPE
+            self._resource_type = AVAILABLE_RESOURCE_TYPE
         else:
-            resource_type = [val for val in resource_type.split(',')]
-        self._resource_type: List[str] = resource_type
+            self._resource_type = [val for val in resource_type.split(",")]
 
+        gpu_ids_arr = None
         if gpu_ids is not None:
-            gpu_ids = [int(idx) for idx in gpu_ids.split(',')]
-            gpu_ids[0] = 0
+            gpu_ids_arr = [int(idx) for idx in gpu_ids.split(",")]
+            gpu_ids_arr[0] = 0
 
-        self._gpu_ids = gpu_ids
+        self._gpu_ids: Union[List[int], None] = gpu_ids_arr
         self._mem_check_proc: Union[mp.Process, None] = None
         self._queue: Union[mp.Queue, None] = None
@@ -52,8 +54,7 @@ def start(self):
 
         self._queue = mp.Queue()
         self._mem_check_proc = mp.Process(
-            target=_check_resource,
-            args=(self._queue, self._resource_type, self._gpu_ids)
+            target=_check_resource, args=(self._queue, self._resource_type, self._gpu_ids)
         )
         self._mem_check_proc.start()
 
@@ -79,11 +80,7 @@ def stop(self, output_path: Union[str, Path]):
         self._mem_check_proc = None
         self._queue = None
 
 
-def _check_resource(
-    queue: mp.Queue,
-    resource_types: Optional[List[str]] = None,
-    gpu_ids: Optional[List[int]] = None
-):
+def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None, gpu_ids: Optional[List[int]] = None):
     if resource_types is None:
         resource_types = []
@@ -101,7 +98,7 @@ def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None
     if not trackers:
         logger.warning("There is no resource to record.")
         return
-    
+
     while True:
         for tracker in trackers.values():
             tracker.record()
@@ -116,13 +113,14 @@ def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None
 
     output_path = Path(queue.get())
 
-    resource_record = {resource_type : tracker.report() for resource_type, tracker in trackers.items()}
+    resource_record = {resource_type: tracker.report() for resource_type, tracker in trackers.items()}
     with output_path.open("w") as f:
         yaml.dump(resource_record, f, default_flow_style=False)
 
 
 class ResourceRecorder(ABC):
     """Base class for each resource recorder."""
+
     @abstractmethod
     def record(self):
         """Record a resource usage."""
@@ -140,6 +138,7 @@ class CpuUsageRecorder(ResourceRecorder):
     Args:
         target_process Optional[psutil.Process]: Process to track.
     """
+
     def __init__(self):
         self._record_count: int = 0
         self._max_mem: Union[int, float] = 0
@@ -168,8 +167,8 @@ def report(self) -> Dict[str, str]:
             return {}
 
         return {
-            "max_memory_usage" : f"{round(self._max_mem, 2)} GiB",
-            "avg_util" : f"{round(self._avg_util / self._record_count, 2)} %"
+            "max_memory_usage": f"{round(self._max_mem, 2)} GiB",
+            "avg_util": f"{round(self._avg_util / self._record_count, 2)} %",
         }
@@ -179,25 +178,25 @@ class GpuUsageRecorder(ResourceRecorder):
     Args:
         gpu_ids Optional[List[int]]: GPU indices to record. If not given, first GPU is recorded.
     """
+
     def __init__(self, gpu_ids: Optional[List[int]] = None):
         if gpu_ids is None:
             gpu_ids = [0]
 
-        self._record: Dict[str, Union[int, float]] = {}
+        self._record: Dict[int, Dict[str, Union[int, List[int]]]] = {}
         self._gpu_handlers: Dict[int, Any] = {}
 
         pynvml.nvmlInit()
         gpu_to_track = self._get_gpu_to_track(gpu_ids)
         for gpu_idx in gpu_to_track:
-            self._record[gpu_idx] = {"max_mem" : 0, "util_record" : []}
+            self._record[gpu_idx] = {"max_mem": 0, "util_record": []}
             self._gpu_handlers[gpu_idx] = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
 
     def _get_gpu_to_track(self, gpu_ids: List[int]) -> List[int]:
-        avaiable_gpus = os.environ.get("CUDA_VISIBLE_DEVICES")
-        if avaiable_gpus is None:
-            avaiable_gpus = list(range(pynvml.nvmlDeviceGetCount()))
+        if "CUDA_VISIBLE_DEVICES" in os.environ:
+            avaiable_gpus = [int(idx) for idx in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
         else:
-            avaiable_gpus = [int(idx) for idx in avaiable_gpus.split(',')]
+            avaiable_gpus = list(range(pynvml.nvmlDeviceGetCount()))
         return [avaiable_gpus[gpu_idx] for gpu_idx in gpu_ids]
 
     def record(self):
@@ -213,6 +212,7 @@ def record(self):
             if record["max_mem"] < mem_used:
                 record["max_mem"] = mem_used
 
+    @no_type_check
     def report(self) -> Dict[str, str]:
         """Aggregate GPU usage."""
         if not list(self._record.values())[0]["util_record"]:  # record isn't called
@@ -222,21 +222,21 @@ def report(self) -> Dict[str, str]:
         total_avg_util = 0
         gpus_record = self._record.copy()
         for gpu_idx in list(gpus_record.keys()):
-            max_mem = gpus_record[gpu_idx]['max_mem']
+            max_mem = gpus_record[gpu_idx]["max_mem"]
             if total_max_mem < max_mem:
                 total_max_mem = max_mem
 
             # Count utilization after it becomes bigger than 20% of max utilization
-            max_util = max(gpus_record[gpu_idx]['util_record'])
-            for idx, util in enumerate(gpus_record[gpu_idx]['util_record']):
+            max_util = max(gpus_record[gpu_idx]["util_record"])
+            for idx, util in enumerate(gpus_record[gpu_idx]["util_record"]):
                 if util * 5 > max_util:
                     break
-            avg_util = mean(gpus_record[gpu_idx]['util_record'][idx:])
+            avg_util = mean(gpus_record[gpu_idx]["util_record"][idx:])
             total_avg_util += avg_util
 
             gpus_record[f"gpu_{gpu_idx}"] = {
-                "avg_util" : f"{round(avg_util, 2)} %",
-                "max_mem" : f"{round(max_mem, 2)} GiB",
+                "avg_util": f"{round(avg_util, 2)} %",
+                "max_mem": f"{round(max_mem, 2)} GiB",
             }
             del gpus_record[gpu_idx]

diff --git a/tests/unit/cli/utils/test_experiment.py b/tests/unit/cli/utils/test_experiment.py
index 7c946434574..7f0a695e119 100644
--- a/tests/unit/cli/utils/test_experiment.py
+++ b/tests/unit/cli/utils/test_experiment.py
@@ -33,11 +33,11 @@ def test_start(self, resource_type, gpu_ids):
         if resource_type == "all":
             expected_resource_type = target_file.AVAILABLE_RESOURCE_TYPE
         else:
-            expected_resource_type = [val for val in resource_type.split(',')]
+            expected_resource_type = [val for val in resource_type.split(",")]
 
         expected_gpu_ids = None
         if gpu_ids is not None:
-            expected_gpu_ids = [int(idx) for idx in gpu_ids.split(',')]
+            expected_gpu_ids = [int(idx) for idx in gpu_ids.split(",")]
             expected_gpu_ids[0] = 0
 
         # run
@@ -46,8 +46,8 @@ def test_start(self, resource_type, gpu_ids):
         self.mock_proc.start.assert_called_once()  # check that a process to track resource usage starts
         # check proper resource type and gpu_ids values are passed to a process to run
-        assert self.mock_mp.Process.call_args.kwargs['args'][1] == expected_resource_type
-        assert self.mock_mp.Process.call_args.kwargs['args'][2] == expected_gpu_ids
+        assert self.mock_mp.Process.call_args.kwargs["args"][1] == expected_resource_type
+        assert self.mock_mp.Process.call_args.kwargs["args"][2] == expected_gpu_ids
 
     @e2e_pytest_unit
     def test_start_multiple_times(self):
@@ -85,7 +85,7 @@ def test_stop_not_exit_normally(self):
         self.mock_queue.put.assert_called_once_with(output_path)
         self.mock_proc.join.assert_called()
         # check that a process is terminated forcibly if it doesn't exit normally
-        self.mock_proc.terminate.assert_called() 
+        self.mock_proc.terminate.assert_called()
         self.mock_proc.close.assert_called()
 
     @e2e_pytest_unit
@@ -114,7 +114,7 @@ def get(self):
 @e2e_pytest_unit
 def test_check_resource(mocker, resource_types, tmp_path):
     # prepare
-    gpu_ids = [0,1]
+    gpu_ids = [0, 1]
     output_file = f"{tmp_path}/fake.yaml"
     mock_queue = MockQueue(output_file)
 
@@ -135,7 +135,7 @@ def test_check_resource(mocker, resource_types, tmp_path):
     if "gpu" in resource_types:
         mock_gpu_recorder.record.assert_called_once()
         mock_gpu_recorder_cls.assert_called_once_with(gpu_ids)
-    
+
     assert Path(output_file).exists()  # check a file is saved well
 
 
@@ -162,7 +162,7 @@ def test_check_resource_wrong_resource_type(mocker, resource_types, tmp_path):
     mock_cpu_recorder_cls.assert_not_called()
     mock_gpu_recorder.record.assert_not_called()
     mock_gpu_recorder_cls.assert_not_called()
-    
+
     assert not Path(output_file).exists()  # check a file isn't saved
 
 
@@ -217,7 +217,7 @@ class TestGpuUsageRecorder:
     def _set_up(self, mocker):
         self.mock_pynvml = mocker.patch.object(target_file, "pynvml")
         self.mock_pynvml.nvmlDeviceGetCount.return_value = 8
-        self.mock_nvmlDeviceGetHandleByIndex = mocker.MagicMock(side_effect=lambda val : val)
+        self.mock_nvmlDeviceGetHandleByIndex = mocker.MagicMock(side_effect=lambda val: val)
         self.mock_pynvml.nvmlDeviceGetHandleByIndex = self.mock_nvmlDeviceGetHandleByIndex
 
         self.gpu_usage = {}
@@ -241,16 +241,16 @@ def set_mem_usage(self, gpu_idx: int, mem_usage: int):
         if gpu_idx in self.gpu_usage:
             self.gpu_usage[gpu_idx]["mem"] = mem_usage
         else:
-            self.gpu_usage[gpu_idx] = {"mem" : mem_usage}
+            self.gpu_usage[gpu_idx] = {"mem": mem_usage}
 
     def set_gpu_util(self, gpu_idx: int, gpu_util: int):
         if gpu_idx in self.gpu_usage:
             self.gpu_usage[gpu_idx]["util"] = gpu_util
         else:
-            self.gpu_usage[gpu_idx] = {"util" : gpu_util}
+            self.gpu_usage[gpu_idx] = {"util": gpu_util}
 
     @e2e_pytest_unit
-    @pytest.mark.parametrize("gpu_to_track", ([0], [0,4]))
+    @pytest.mark.parametrize("gpu_to_track", ([0], [0, 4]))
     def test_init(self, mocker, gpu_to_track):
         mocker.patch.object(GpuUsageRecorder, "_get_gpu_to_track", return_value=gpu_to_track)
 
@@ -272,7 +272,7 @@ def test_get_gpu_to_track_no_cuda_env_var(self, gpu_ids):
     @pytest.mark.parametrize("gpu_ids", ([0], [1, 2, 5]))
     def test_get_gpu_to_track_cuda_env_var(self, gpu_ids):
         cuda_visible_devices = [1, 2, 5, 7, 9, 10]
-        self.mock_os.environ = {"CUDA_VISIBLE_DEVICES" : ",".join(list(map(str, cuda_visible_devices)))}
+        self.mock_os.environ = {"CUDA_VISIBLE_DEVICES": ",".join(list(map(str, cuda_visible_devices)))}
         gpu_to_track = [cuda_visible_devices[i] for i in gpu_ids]
 
         gpu_usage_recorder = GpuUsageRecorder()
@@ -281,7 +281,7 @@ def test_get_gpu_to_track_cuda_env_var(self, gpu_ids):
 
     @e2e_pytest_unit
     def test_record_report(self):
-        gpu_ids = [0,1]
+        gpu_ids = [0, 1]
         gpu_usage_recorder = GpuUsageRecorder(gpu_ids)
 
         # first record

From c87e63219abd68f9f88b36582b30e27493b55b2b Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 14:12:09 +0900
Subject: [PATCH 10/14] add license comment

---
 src/otx/cli/utils/experiment.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index 207ba402d86..61abc77d1f0 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -1,4 +1,7 @@
 """Utils function for experiments."""
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+#
 
 import logging
 import multiprocessing as mp

From 6b37ffa990cd62cb1f9ce9055980115b98305782 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 14:38:45 +0900
Subject: [PATCH 11/14] update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59373a96b7f..e8db0295ead 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file.
 - Add a new object detector Lite-DINO()
 - Add Semi-SL Mean Teacher algorithm for Instance Segmentation task()
 - Official supports for YOLOX-X, YOLOX-L, YOLOX-S, ResNeXt101-ATSS ()
+- Add new argument to track resource usage in train command()
 
 ### Enhancements

From 1826f5f2bd5d1900457d2b79e690b8bdc3a23fe9 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 14:49:28 +0900
Subject: [PATCH 12/14] refine argument help

---
 src/otx/cli/tools/train.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/otx/cli/tools/train.py b/src/otx/cli/tools/train.py
index f0ac3e6bd8a..a6b40b03d02 100644
--- a/src/otx/cli/tools/train.py
+++ b/src/otx/cli/tools/train.py
@@ -164,7 +164,9 @@ def get_args():
         "--track-resource-usage",
         type=str,
         default=None,
-        help="Track CPU & GPU utilization and max memory usage and save values at the output path.",
+        help="Track resources utilization and max memory usage and save values at the output path. "
+            "The possible options are 'cpu', 'gpu' or you can set to a comma-separated list of resource types. "
+            "And 'all' is also available for choosing all resource types.",
     )
 
     sub_parser = add_hyper_parameters_sub_parser(parser, hyper_parameters, return_sub_parser=True)

From 3a10d56d75e94240e415b6bcdcd21bdc09454d00 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 14:59:48 +0900
Subject: [PATCH 13/14] align with pre-commit

---
 src/otx/cli/tools/train.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/otx/cli/tools/train.py b/src/otx/cli/tools/train.py
index a6b40b03d02..cb7c3fb4830 100644
--- a/src/otx/cli/tools/train.py
+++ b/src/otx/cli/tools/train.py
@@ -165,8 +165,8 @@ def get_args():
         type=str,
         default=None,
         help="Track resources utilization and max memory usage and save values at the output path. "
-            "The possible options are 'cpu', 'gpu' or you can set to a comma-separated list of resource types. "
-            "And 'all' is also available for choosing all resource types.",
+        "The possible options are 'cpu', 'gpu' or you can set to a comma-separated list of resource types. "
+        "And 'all' is also available for choosing all resource types.",
     )
 
     sub_parser = add_hyper_parameters_sub_parser(parser, hyper_parameters, return_sub_parser=True)

From 22ad518e43478297aafd7a5c9cf47d7adf1593c4 Mon Sep 17 00:00:00 2001
From: eunwoosh
Date: Wed, 13 Sep 2023 16:44:37 +0900
Subject: [PATCH 14/14] add version to requirement and raise an error if not
 supported values are given

---
 requirements/base.txt                   |  2 +-
 src/otx/cli/utils/experiment.py         |  2 +-
 tests/unit/cli/utils/test_experiment.py | 25 +++++++------------------
 3 files changed, 9 insertions(+), 20 deletions(-)

diff --git a/requirements/base.txt b/requirements/base.txt
index c616ad8c38a..7cd37e41b72 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -10,4 +10,4 @@ scipy>=1.8
 bayesian-optimization>=1.2.0
 tensorboard>=2.11.0
 multiprocess
-pynvml
+pynvml>=11.0.0

diff --git a/src/otx/cli/utils/experiment.py b/src/otx/cli/utils/experiment.py
index 61abc77d1f0..cc4af013f80 100644
--- a/src/otx/cli/utils/experiment.py
+++ b/src/otx/cli/utils/experiment.py
@@ -97,7 +97,7 @@ def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None
             if pynvml is None:
                 logger.warning("GPU can't be found. Tracking GPU usage is skipped.")
                 continue
             trackers[resource_type] = GpuUsageRecorder(gpu_ids)
         else:
-            logger.warning(
+            raise ValueError(
                 "Resource type {} isn't supported now. Current available types are cpu and gpu.".format(resource_type)
             )

diff --git a/tests/unit/cli/utils/test_experiment.py b/tests/unit/cli/utils/test_experiment.py
index 7f0a695e119..e5a61a8e445 100644
--- a/tests/unit/cli/utils/test_experiment.py
+++ b/tests/unit/cli/utils/test_experiment.py
@@ -139,31 +139,20 @@ def test_check_resource(mocker, resource_types, tmp_path):
     assert Path(output_file).exists()  # check a file is saved well
 
 
-@pytest.mark.parametrize("resource_types", (["wrong"], None))
-@e2e_pytest_unit
-def test_check_resource_wrong_resource_type(mocker, resource_types, tmp_path):
+def test_check_resource_wrong_resource_type(mocker, tmp_path):
     # prepare
+    resource_types = ["wrong"]
     output_file = f"{tmp_path}/fake.yaml"
     mock_queue = MockQueue(output_file)
 
-    mock_cpu_recorder = mocker.MagicMock()
-    mock_cpu_recorder_cls = mocker.patch.object(target_file, "CpuUsageRecorder", return_value=mock_cpu_recorder)
-    mock_gpu_recorder = mocker.MagicMock()
-    mock_gpu_recorder_cls = mocker.patch.object(target_file, "GpuUsageRecorder", return_value=mock_gpu_recorder)
-
+    mocker.patch.object(target_file, "CpuUsageRecorder")
+    mocker.patch.object(target_file, "GpuUsageRecorder")
     mocker.patch.object(target_file, "yaml")
     mocker.patch.object(target_file, "time")
 
-    # run
-    _check_resource(mock_queue, resource_types)
-
-    # check the recorders aren't called
-    mock_cpu_recorder.record.assert_not_called()
-    mock_cpu_recorder_cls.assert_not_called()
-    mock_gpu_recorder.record.assert_not_called()
-    mock_gpu_recorder_cls.assert_not_called()
-
-    assert not Path(output_file).exists()  # check a file isn't saved
+    # check that ValueError is raised.
+    with pytest.raises(ValueError):
+        _check_resource(mock_queue, resource_types)
 
 
 class TestCpuUsageRecorder: