Add new argument to track CPU&GPU utilization and memory usage (#2500)
* add argument to track resource usage
* fix bug
* fix a bug in a multi gpu case
* use total cpu usage
* add unit test
* add mark to unit test
* cover edge case
* add pynvml in requirement
* align with pre-commit
* add license comment
* update changelog
* refine argument help
* align with pre-commit
* add version to requirement and raise an error if not supported values are given
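As a rough usage sketch (not part of the commit itself), the ResourceTracker class added in the diff below might be driven from experiment code like this; the import path is omitted, and the workload call and output location are placeholders:

import time
from pathlib import Path

# Assumption: ResourceTracker comes from the experiment utils module changed in
# this commit; the exact import path is not shown here.

tracker = ResourceTracker(resource_type="cpu,gpu", gpu_ids="0")  # "all" is the default
tracker.start()  # spawn a child process that periodically samples CPU/GPU usage

time.sleep(5)  # placeholder for the real workload, e.g. a training run

# Stop sampling and write the aggregated report; the path is illustrative.
tracker.stop(Path("outputs") / "resource_usage.yaml")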
Showing 2 changed files with 266 additions and 66 deletions.
@@ -1,84 +1,284 @@
-"""Utils function for experiments"""
+"""Utils function for experiments."""
 # Copyright (C) 2023 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 #

 import logging
 import multiprocessing as mp
-import pynvml
-import psutil
-import yaml
 import os
-from contextlib import ExitStack
-from typing import Union
+import time
+from abc import ABC, abstractmethod
+from pathlib import Path
+from statistics import mean
+from typing import Any, Dict, List, Optional, Union, no_type_check

+import psutil
+import yaml

-def run_process_to_check_resource(output_dir: Union[str, Path], exit_stack: ExitStack):
-    if isinstance(output_dir, str):
-        output_dir = Path(output_dir)
+try:
+    import pynvml
+except ImportError:
+    pynvml = None

-    gpu_used = os.environ.get("CUDA_VISIBLE_DEVICES", 0)
+logger = logging.getLogger(__name__)
+GIB = 1024**3
+AVAILABLE_RESOURCE_TYPE = ["cpu", "gpu"]

-    queue = mp.Queue()
-    mem_check_p = mp.Process(target=check_resource, args=(queue, gpu_used))
-    mem_check_p.start()

-    exit_stack.callback(mem_check_proc_callback, mem_check_p, queue, output_dir)
+class ResourceTracker:
+    """Class to track resource usage.
+    Args:
+        resource_type (str, optional): Which resource to track. Available values are cpu, gpu or all now.
+            Defaults to "all".
+        gpu_ids (Optional[str]): GPU indices to record.
+    """

-def mem_check_proc_callback(mem_check_p, queue, output_dir):
-    queue.put(output_dir)
-    mem_check_p.join(10)
-    if mem_check_p.exitcode is None:
-        mem_check_p.terminate()
-    mem_check_p.close()
+    def __init__(self, resource_type: str = "all", gpu_ids: Optional[str] = None):
+        if resource_type == "all":
+            self._resource_type = AVAILABLE_RESOURCE_TYPE
+        else:
+            self._resource_type = [val for val in resource_type.split(",")]

+        gpu_ids_arr = None
+        if gpu_ids is not None:
+            gpu_ids_arr = [int(idx) for idx in gpu_ids.split(",")]
+            gpu_ids_arr[0] = 0

-def check_resource(queue: mp.Queue, gpu_idx: int = 0):
-    pynvml.nvmlInit()
-    handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)
-    max_gpu_mem = 0
-    avg_gpu_util = 0
-    max_cpu_mem = 0
-    avg_cpu_util = 0
-    gib = 1024**3
-    target_process = psutil.Process().parent()
+        self._gpu_ids: Union[List[int], None] = gpu_ids_arr
+        self._mem_check_proc: Union[mp.Process, None] = None
+        self._queue: Union[mp.Queue, None] = None

-    num_counts = 0
-    while True:
-        # gpu util
-        gpu_info = pynvml.nvmlDeviceGetUtilizationRates(handle)
-        avg_gpu_util += gpu_info.gpu
-        num_counts += 1
+    def start(self):
+        """Run a process which tracks resource usage."""
+        if self._mem_check_proc is not None:
+            logger.warning("Resource tracker started already. Please execute start after executing stop.")
+            return

-        # gpu mem
-        gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
-        mem_used = gpu_mem.used / gib
-        if max_gpu_mem < mem_used:
-            max_gpu_mem = mem_used
+        self._queue = mp.Queue()
+        self._mem_check_proc = mp.Process(
+            target=_check_resource, args=(self._queue, self._resource_type, self._gpu_ids)
+        )
+        self._mem_check_proc.start()

-        # cpu mem
-        # cpu_mem = psutil.virtual_memory()[3] / gib
-        # cpu_mem = target_process.memory_percent()
-        cpu_mem = target_process.memory_info().rss / gib
-        if max_cpu_mem < cpu_mem:
-            max_cpu_mem = cpu_mem
+    def stop(self, output_path: Union[str, Path]):
+        """Terminate a process to record resource usage.
-        # cpu util
-        cpu_percent = target_process.cpu_percent()
-        avg_cpu_util += cpu_percent
+        Args:
+            output_path (Union[str, Path]): Output file path to save CPU & GPU utilization and max memory usage values.
+        """
+        if self._mem_check_proc is None or not self._mem_check_proc.is_alive():
+            return

+        if isinstance(output_path, str):
+            output_path = Path(output_path)

+        self._queue.put(output_path)
+        self._mem_check_proc.join(10)
+        if self._mem_check_proc.exitcode is None:
+            self._mem_check_proc.terminate()
+        self._mem_check_proc.close()

+        self._mem_check_proc = None
+        self._queue = None


+def _check_resource(queue: mp.Queue, resource_types: Optional[List[str]] = None, gpu_ids: Optional[List[int]] = None):
+    if resource_types is None:
+        resource_types = []

+    trackers: Dict[str, ResourceRecorder] = {}
+    for resource_type in resource_types:
+        if resource_type == "cpu":
+            trackers[resource_type] = CpuUsageRecorder()
+        elif resource_type == "gpu":
+            if pynvml is None:
+                logger.warning("GPU can't be found. Tracking GPU usage is skipped.")
+                continue
+            trackers[resource_type] = GpuUsageRecorder(gpu_ids)
+        else:
+            raise ValueError(
+                "Resource type {} isn't supported now. Current available types are cpu and gpu.".format(resource_type)
+            )

+    if not trackers:
+        logger.warning("There is no resource to record.")
+        return

+    while True:
+        for tracker in trackers.values():
+            tracker.record()

         if not queue.empty():
             break

-    pynvml.nvmlShutdown()
+        time.sleep(0.01)

     output_path = Path(queue.get())

-    with (output_path / "resource_usage.yaml").open("w") as f:
-        yaml.dump(
-            {
-                "max_cpu_mem(GiB)" : round(max_cpu_mem, 2),
-                "avg_cpu_util(%)" : round(avg_cpu_util / num_counts, 2),
-                "max_gpu_mem(GiB)" : round(max_gpu_mem, 2),
-                "avg_gpu_util(%)" : round(avg_gpu_util / num_counts, 2),
-            },
-            f,
-            default_flow_style=False
-        )
+    resource_record = {resource_type: tracker.report() for resource_type, tracker in trackers.items()}
+    with output_path.open("w") as f:
+        yaml.dump(resource_record, f, default_flow_style=False)


+class ResourceRecorder(ABC):
+    """Base class for each resource recorder."""

+    @abstractmethod
+    def record(self):
+        """Record a resource usage."""
+        raise NotImplementedError

+    @abstractmethod
+    def report(self):
+        """Aggregate all resource usages."""
+        raise NotImplementedError


+class CpuUsageRecorder(ResourceRecorder):
+    """CPU usage recorder class.
+    Args:
+        target_process Optional[psutil.Process]: Process to track.
+    """

+    def __init__(self):
+        self._record_count: int = 0
+        self._max_mem: Union[int, float] = 0
+        self._avg_util: Union[int, float] = 0
+        self._first_record = True

+    def record(self):
+        """Record CPU usage."""
+        # cpu mem
+        memory_info = psutil.virtual_memory()
+        cpu_mem = (memory_info.total - memory_info.available) / GIB
+        if self._max_mem < cpu_mem:
+            self._max_mem = cpu_mem

+        # cpu util
+        cpu_percent = psutil.cpu_percent()
+        if self._first_record:
+            self._first_record = False
+        else:
+            self._avg_util += cpu_percent
+            self._record_count += 1

+    def report(self) -> Dict[str, str]:
+        """Aggregate CPU usage."""
+        if self._record_count == 0:
+            return {}

+        return {
+            "max_memory_usage": f"{round(self._max_mem, 2)} GiB",
+            "avg_util": f"{round(self._avg_util / self._record_count, 2)} %",
+        }


+class GpuUsageRecorder(ResourceRecorder):
+    """GPU usage recorder class.
+    Args:
+        gpu_ids Optional[List[int]]: GPU indices to record. If not given, first GPU is recorded.
+    """

+    def __init__(self, gpu_ids: Optional[List[int]] = None):
+        if gpu_ids is None:
+            gpu_ids = [0]

+        self._record: Dict[int, Dict[str, Union[int, List[int]]]] = {}
+        self._gpu_handlers: Dict[int, Any] = {}

+        pynvml.nvmlInit()
+        gpu_to_track = self._get_gpu_to_track(gpu_ids)
+        for gpu_idx in gpu_to_track:
+            self._record[gpu_idx] = {"max_mem": 0, "util_record": []}
+            self._gpu_handlers[gpu_idx] = pynvml.nvmlDeviceGetHandleByIndex(gpu_idx)

+    def _get_gpu_to_track(self, gpu_ids: List[int]) -> List[int]:
+        if "CUDA_VISIBLE_DEVICES" in os.environ:
+            avaiable_gpus = [int(idx) for idx in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
+        else:
+            avaiable_gpus = list(range(pynvml.nvmlDeviceGetCount()))
+        return [avaiable_gpus[gpu_idx] for gpu_idx in gpu_ids]

+    def record(self):
+        """Record GPU usage."""
+        for gpu_idx, record in self._record.items():
+            # gpu util
+            gpu_info = pynvml.nvmlDeviceGetUtilizationRates(self._gpu_handlers[gpu_idx])
+            record["util_record"].append(gpu_info.gpu)

+            # gpu mem
+            gpu_mem = pynvml.nvmlDeviceGetMemoryInfo(self._gpu_handlers[gpu_idx])
+            mem_used = gpu_mem.used / GIB
+            if record["max_mem"] < mem_used:
+                record["max_mem"] = mem_used

+    @no_type_check
+    def report(self) -> Dict[str, str]:
+        """Aggregate GPU usage."""
+        if not list(self._record.values())[0]["util_record"]:  # record isn't called
+            return {}

+        total_max_mem = 0
+        total_avg_util = 0
+        gpus_record = self._record.copy()
+        for gpu_idx in list(gpus_record.keys()):
+            max_mem = gpus_record[gpu_idx]["max_mem"]
+            if total_max_mem < max_mem:
+                total_max_mem = max_mem

+            # Count utilization after it becomes bigger than 20% of max utilization
+            max_util = max(gpus_record[gpu_idx]["util_record"])
+            for idx, util in enumerate(gpus_record[gpu_idx]["util_record"]):
+                if util * 5 > max_util:
+                    break
+            avg_util = mean(gpus_record[gpu_idx]["util_record"][idx:])
+            total_avg_util += avg_util

+            gpus_record[f"gpu_{gpu_idx}"] = {
+                "avg_util": f"{round(avg_util, 2)} %",
+                "max_mem": f"{round(max_mem, 2)} GiB",
+            }
+            del gpus_record[gpu_idx]

+        gpus_record["total_avg_util"] = f"{round(total_avg_util / len(gpus_record), 2)} %"
+        gpus_record["total_max_mem"] = f"{round(total_max_mem, 2)} GiB"

+        return gpus_record

+    def __del__(self):
+        """Shutdown nvml."""
+        pynvml.nvmlShutdown()


+def set_arguments_to_cmd(command: List[str], key: str, value: Optional[str] = None, start_idx: int = 0, after_params: bool = False):
+    """Add arguments at proper position in command.
+    Args:
+        command (List[str]): list including an otx command entry and arguments.
+        key (str): argument key.
+        value (str or None): argument value.
+        start_idx (int, optional): find a position to put arguments in after this value. Defaults to 0.
+        after_params (bool): whether argument should be after `params` or not.
+    """
+    if key in command:
+        if value is not None:
+            command[command.index(key) + 1] = value
+        return

+    delimiters = ["demo", "deploy", "eval", "explain", "export", "find", "train", "optimize", "build", "run"]
+    if not after_params:
+        delimiters.append("params")

+    for i in range(start_idx, len(command)):
+        if command[i] in delimiters:
+            if value is not None:
+                command.insert(i, value)
+            command.insert(i, key)
+            break
+    else:
+        command.append(key)
+        if value is not None:
+            command.append(value)
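As a small usage note, the hypothetical calls below show how set_arguments_to_cmd rearranges an otx-style command list, assuming the function defined above; the command and argument values are made up for illustration:

command = ["otx", "train", "template.yaml", "params", "--learning_parameters.num_iters", "8"]

# Key not yet present: it is inserted just before the first delimiter found at or
# after start_idx ("params" here, because after_params is left False).
set_arguments_to_cmd(command, "--workspace", "my_ws", start_idx=2)
# command == ["otx", "train", "template.yaml", "--workspace", "my_ws", "params",
#             "--learning_parameters.num_iters", "8"]

# Key already present: only its value is replaced in place.
set_arguments_to_cmd(command, "--workspace", "other_ws", start_idx=2)
# command == ["otx", "train", "template.yaml", "--workspace", "other_ws", "params",
#             "--learning_parameters.num_iters", "8"]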