Skip to content

Commit

Permalink
Add new argument to track CPU&GPU utilization and memory usage (#2500)
Browse files Browse the repository at this point in the history
* add argument to track resource usage

* fix bug

* fix a bug in a multi gpu case

* use total cpu usage

* add unit test

* add mark to unit test

* cover edge case

* add pynvml in requirement

* align with pre-commit

* add license comment

* update changelog

* refine argument help

* align with pre-commit

* add version to requirement and raise an error if not supported values are given
  • Loading branch information
eunwoosh committed Oct 19, 2023
1 parent df41ae6 commit c9f730f
Show file tree
Hide file tree
Showing 4 changed files with 581 additions and 66 deletions.
19 changes: 14 additions & 5 deletions otx/cli/tools/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from otx.api.usecases.adapters.model_adapter import ModelAdapter
from otx.cli.manager import ConfigManager
from otx.cli.manager.config_manager import TASK_TYPE_TO_SUB_DIR_NAME
from otx.cli.utils.experiment import ResourceTracker
from otx.cli.utils.hpo import run_hpo
from otx.cli.utils.importing import get_impl_class
from otx.cli.utils.io import read_binary, read_label_schema, save_model_data
Expand All @@ -40,7 +41,6 @@
get_parser_and_hprams_data,
)
from otx.cli.utils.report import get_otx_report
from otx.cli.utils.experiment import run_process_to_check_resource
from otx.core.data.adapter import get_dataset_adapter


Expand Down Expand Up @@ -141,7 +141,11 @@ def get_args():
)
parser.add_argument(
"--track-resource-usage",
action="store_true",
type=str,
default=None,
help="Track resources utilization and max memory usage and save values at the output path. "
"The possible options are 'cpu', 'gpu' or you can set to a comma-separated list of resource types. "
"And 'all' is also available for choosing all resource types.",
)

sub_parser = add_hyper_parameters_sub_parser(parser, hyper_parameters, return_sub_parser=True)
Expand Down Expand Up @@ -248,15 +252,20 @@ def train(exit_stack: Optional[ExitStack] = None): # pylint: disable=too-many-b
"if main process raises an error, all processes can be stuck."
)

if args.track_resource_usage:
run_process_to_check_resource(config_manager.output_path, exit_stack)

task = task_class(task_environment=environment, output_path=str(config_manager.output_path / "logs"))

output_model = ModelEntity(dataset, environment.get_model_configuration())

resource_tracker = None
if args.track_resource_usage and not is_multigpu_child_process():
resource_tracker = ResourceTracker(args.track_resource_usage, args.gpus)
resource_tracker.start()

task.train(dataset, output_model, train_parameters=TrainParameters())

if resource_tracker is not None:
resource_tracker.stop(config_manager.output_path / "resource_usage.yaml")

model_path = config_manager.output_path / "models"
save_model_data(output_model, str(model_path))

Expand Down
322 changes: 261 additions & 61 deletions otx/cli/utils/experiment.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,284 @@
"""Utils function for experiments"""
"""Utils function for experiments."""
# Copyright (C) 2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
#

import logging
import multiprocessing as mp
import pynvml
import psutil
import yaml
import os
from contextlib import ExitStack
from typing import Union
import time
from abc import ABC, abstractmethod
from pathlib import Path
from statistics import mean
from typing import Any, Dict, List, Optional, Union, no_type_check

import psutil
import yaml

def run_process_to_check_resource(output_dir: Union[str, Path], exit_stack: ExitStack):
if isinstance(output_dir, str):
output_dir = Path(output_dir)
try:
import pynvml
except ImportError:
pynvml = None

gpu_used = os.environ.get("CUDA_VISIBLE_DEVICES", 0)
logger = logging.getLogger(__name__)
GIB = 1024**3
AVAILABLE_RESOURCE_TYPE = ["cpu", "gpu"]

queue = mp.Queue()
mem_check_p = mp.Process(target=check_resource, args=(queue, gpu_used))
mem_check_p.start()

exit_stack.callback(mem_check_proc_callback, mem_check_p, queue, output_dir)
class ResourceTracker:
"""Class to track resources usage.
Args:
resource_type (str, optional): Which resource to track. Available values are cpu, gpu or all now.
Defaults to "all".
gpu_ids (Optional[str]): GPU indices to record.
"""

def mem_check_proc_callback(mem_check_p, queue, output_dir):
queue.put(output_dir)
mem_check_p.join(10)
if mem_check_p.exitcode is None:
mem_check_p.terminate()
mem_check_p.close()
def __init__(self, resource_type: str = "all", gpu_ids: Optional[str] = None):
if resource_type == "all":
self._resource_type = AVAILABLE_RESOURCE_TYPE
else:
self._resource_type = [val for val in resource_type.split(",")]

gpu_ids_arr = None
if gpu_ids is not None:
gpu_ids_arr = [int(idx) for idx in gpu_ids.split(",")]
gpu_ids_arr[0] = 0

def check_resource(queue: mp.Queue, gpu_idx: int = 0):
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
max_gpu_mem = 0
avg_gpu_util = 0
max_cpu_mem = 0
avg_cpu_util = 0
gib = 1024**3
target_process = psutil.Process().parent()
self._gpu_ids: Union[List[int], None] = gpu_ids_arr
self._mem_check_proc: Union[mp.Process, None] = None
self._queue: Union[mp.Queue, None] = None

num_counts = 0
while True:
# gpu util
gpu_info = pynvml.nvmlDeviceGetUtilizationRates(handle)
avg_gpu_util += gpu_info.gpu
num_counts += 1
def start(self):
"""Run a process which tracks resources usage."""
if self._mem_check_proc is not None:
logger.warning("Resource tracker started already. Please execute start after executing stop.")
return

# gpu mem
gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
mem_used = gpu_mem.used / gib
if max_gpu_mem < mem_used:
max_gpu_mem = mem_used
self._queue = mp.Queue()
self._mem_check_proc = mp.Process(
target=_check_resource, args=(self._queue, self._resource_type, self._gpu_ids)
)
self._mem_check_proc.start()

# cpu mem
# cpu_mem = psutil.virtual_memory()[3] / gib
# cpu_mem = target_process.memory_percent()
cpu_mem = target_process.memory_info().rss / gib
if max_cpu_mem < cpu_mem:
max_cpu_mem = cpu_mem
def stop(self, output_path: Union[str, Path]):
"""Terminate a process to record resources usage.
# cpu util
cpu_percent = target_process.cpu_percent()
avg_cpu_util += cpu_percent
Args:
output_path (Union[str, Path]): Output file path to save CPU & GPU utilization and max meory usage values.
"""
if self._mem_check_proc is None or not self._mem_check_proc.is_alive():
return

if isinstance(output_path, str):
output_path = Path(output_path)

self._queue.put(output_path)
self._mem_check_proc.join(10)
if self._mem_check_proc.exitcode is None:
self._mem_check_proc.terminate()
self._mem_check_proc.close()

self._mem_check_proc = None
self._queue = None


def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None, gpu_ids: Optional[List[int]] = None):
if resource_types is None:
resource_types = []

trackers: Dict[str, ResourceRecorder] = {}
for resource_type in resource_types:
if resource_type == "cpu":
trackers[resource_type] = CpuUsageRecorder()
elif resource_type == "gpu":
if pynvml is None:
logger.warning("GPU can't be found. Tracking GPU usage is skipped.")
continue
trackers[resource_type] = GpuUsageRecorder(gpu_ids)
else:
raise ValueError(
"Resource type {} isn't supported now. Current available types are cpu and gpu.".format(resource_type)
)

if not trackers:
logger.warning("There is no resource to record.")
return

while True:
for tracker in trackers.values():
tracker.record()

if not queue.empty():
break

pynvml.nvmlShutdown()
time.sleep(0.01)

output_path = Path(queue.get())

with (output_path / "resource_usage.yaml").open("w") as f:
yaml.dump(
{
"max_cpu_mem(GiB)" : round(max_cpu_mem, 2),
"avg_cpu_util(%)" : round(avg_cpu_util / num_counts, 2),
"max_gpu_mem(GiB)" : round(max_gpu_mem, 2),
"avg_gpu_util(%)" : round(avg_gpu_util / num_counts, 2),
},
f,
default_flow_style=False
)
resource_record = {resource_type: tracker.report() for resource_type, tracker in trackers.items()}
with output_path.open("w") as f:
yaml.dump(resource_record, f, default_flow_style=False)


class ResourceRecorder(ABC):
"""Base calss for each resource recorder."""

@abstractmethod
def record(self):
"""Record a resource usage."""
raise NotImplementedError

@abstractmethod
def report(self):
"""Aggregate all resource usages."""
raise NotImplementedError


class CpuUsageRecorder(ResourceRecorder):
"""CPU usage recorder class.
Args:
target_process Optional[psutil.Process]: Process to track.
"""

def __init__(self):
self._record_count: int = 0
self._max_mem: Union[int, float] = 0
self._avg_util: Union[int, float] = 0
self._first_record = True

def record(self):
"""Record CPU usage."""
# cpu mem
memory_info = psutil.virtual_memory()
cpu_mem = (memory_info.total - memory_info.available) / GIB
if self._max_mem < cpu_mem:
self._max_mem = cpu_mem

# cpu util
cpu_percent = psutil.cpu_percent()
if self._first_record:
self._first_record = False
else:
self._avg_util += cpu_percent
self._record_count += 1

def report(self) -> Dict[str, str]:
"""Aggregate CPU usage."""
if self._record_count == 0:
return {}

return {
"max_memory_usage": f"{round(self._max_mem, 2)} GiB",
"avg_util": f"{round(self._avg_util / self._record_count, 2)} %",
}


class GpuUsageRecorder(ResourceRecorder):
"""GPU usage recorder class.
Args:
gpu_ids Optional[List[int]]: GPU indices to record. If not given, first GPU is recorded.
"""

def __init__(self, gpu_ids: Optional[List[int]] = None):
if gpu_ids is None:
gpu_ids = [0]

self._record: Dict[int, Dict[str, Union[int, List[int]]]] = {}
self._gpu_handlers: Dict[int, Any] = {}

pynvml.nvmlInit()
gpu_to_track = self._get_gpu_to_track(gpu_ids)
for gpu_idx in gpu_to_track:
self._record[gpu_idx] = {"max_mem": 0, "util_record": []}
self._gpu_handlers[gpu_idx] = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)

def _get_gpu_to_track(self, gpu_ids: List[int]) -> List[int]:
if "CUDA_VISIBLE_DEVICES" in os.environ:
avaiable_gpus = [int(idx) for idx in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
else:
avaiable_gpus = list(range(pynvml.nvmlDeviceGetCount()))
return [avaiable_gpus[gpu_idx] for gpu_idx in gpu_ids]

def record(self):
"""Record GPU usage."""
for gpu_idx, record in self._record.items():
# gpu util
gpu_info = pynvml.nvmlDeviceGetUtilizationRates(self._gpu_handlers[gpu_idx])
record["util_record"].append(gpu_info.gpu)

# gpu mem
gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(self._gpu_handlers[gpu_idx])
mem_used = gpu_mem.used / GIB
if record["max_mem"] < mem_used:
record["max_mem"] = mem_used

@no_type_check
def report(self) -> Dict[str, str]:
"""Aggregate GPU usage."""
if not list(self._record.values())[0]["util_record"]: # record isn't called
return {}

total_max_mem = 0
total_avg_util = 0
gpus_record = self._record.copy()
for gpu_idx in list(gpus_record.keys()):
max_mem = gpus_record[gpu_idx]["max_mem"]
if total_max_mem < max_mem:
total_max_mem = max_mem

# Count utilization after it becomes bigger than 20% of max utilization
max_util = max(gpus_record[gpu_idx]["util_record"])
for idx, util in enumerate(gpus_record[gpu_idx]["util_record"]):
if util * 5 > max_util:
break
avg_util = mean(gpus_record[gpu_idx]["util_record"][idx:])
total_avg_util += avg_util

gpus_record[f"gpu_{gpu_idx}"] = {
"avg_util": f"{round(avg_util, 2)} %",
"max_mem": f"{round(max_mem, 2)} GiB",
}
del gpus_record[gpu_idx]

gpus_record["total_avg_util"] = f"{round(total_avg_util / len(gpus_record), 2)} %"
gpus_record["total_max_mem"] = f"{round(total_max_mem, 2)} GiB"

return gpus_record

def __del__(self):
"""Shutdown nvml."""
pynvml.nvmlShutdown()


def set_arguments_to_cmd(command: List[str], key: str, value: Optional[str] = None, start_idx: int = 0, after_params: bool = False):
"""Add arguments at proper position in command.
Args:
keys (str): arguement key.
value (str or None): argument value.
command (List[str]): list includng a otx command entry and arguments.
start_idx (int, optional): find a position to put arguments in after this value. Defaults to 0.
after_params (bool): whether argument should be after `param` or not.
"""
if key in command:
if value is not None:
command[command.index(key) + 1] = value
return

delimiters = ["demo", "deploy", "eval", "explain", "export", "find", "train", "optimize", "build", "run"]
if not after_params:
delimiters.append("params")

for i in range(start_idx, len(command)):
if command[i] in delimiters:
if value is not None:
command.insert(i, value)
command.insert(i, key)
break
else:
command.append(key)
if value is not None:
command.append(value)
Loading

0 comments on commit c9f730f

Please sign in to comment.