From 575aedb34178c2c5b171c8f633c29aca92285641 Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Mon, 5 Feb 2024 00:39:57 -0800 Subject: [PATCH 01/10] Timer function wrapper and instant timer --- tests/util/timer.py | 123 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 tests/util/timer.py diff --git a/tests/util/timer.py b/tests/util/timer.py new file mode 100644 index 00000000..96f481f2 --- /dev/null +++ b/tests/util/timer.py @@ -0,0 +1,123 @@ +import logging +import time +import functools + +import torch +import psutil +import os + + +def get_gpu_memory(device_index=None): + return torch.cuda.memory_allocated(device_index) + + +def get_gpu_max_memory(device_index=None): + return torch.cuda.max_memory_allocated(device_index) + + +def get_device_memory(): + process = psutil.Process(os.getpid()) + return process.memory_info().rss + + +def get_cpu_swap_memory(): + return psutil.swap_memory().used + + +class FunctionTimer: + def __init__(self, host_timer=False): + self.host_timer = host_timer + self.log = {} + + def __call__(self, func): + def _host_timer_wrapper(*args, **kwargs): + before_memory = get_device_memory() + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + after_memory = get_device_memory() + self.log[func.__name__] = { + "timer": end_time - start_time, + "cpu_memory": (before_memory - after_memory) >> 20, + } + return result + + def _device_timer_wrapper(*args, **kwargs): + before_memory = get_gpu_memory() + start_event = torch.cuda.Event(enable_timing=True) + start_event.record() + result = func(*args, **kwargs) + end_event = torch.cuda.Event(enable_timing=True) + end_event.record() + after_memory = get_gpu_memory() + torch.cuda.current_stream().wait_event(end_event) + torch.cuda.synchronize() + self.log[func.__name__] = { + "timer": start_event.elapsed_time(end_event), + "gpu_memory": (before_memory - after_memory) >> 20, + } + return result + + @functools.wraps(func) + def wrapper(*args, **kwargs): + if self.host_timer: + return _host_timer_wrapper(*args, **kwargs) + return _device_timer_wrapper(*args, **kwargs) + + def get_log(self): + return self.log + + +class Timer: + def __init__(self, host_timer=False): + self.host_timer = host_timer + self.timers = { + "cpu": {}, + "gpu": {}, + } + self.timer_info = {} # synchronized. 
+ self.is_synchromized = False + + def start_timer(self, name, host_timer=False): + if host_timer: + if name in self.timers["cpu"]: + logging.warning(f"timer for {name} already exist") + return + start_time = time.time() + self.timers["cpu"][name] = [start_time] + else: + self.is_synchromized = False + + if name in self.timers["gpu"]: + logging.warning(f"timer for {name} already exist") + return + start_event = torch.cuda.Event(enable_timing=True) + start_event.record() + self.timers["gpu"][name] = [start_event] + + def stop_timer(self, name): + if name in self.timers["cpu"]: + end_time = time.time() + self.timers["cpu"][name].append(end_time) + if name in self.timers["gpu"]: + self.is_synchromized = False + end_event = torch.cuda.Event(enable_timing=True) + end_event.record() + self.timers["gpu"][name].append(end_event) + + def _calculate_elapse_time(self): + for name, timer in self.timers["cpu"].items(): + assert len(timer) == 2 + self.timer_info[name] = (timer[1] - timer[0]) * 1000 + if not self.is_synchromized: + for name, events in self.timers["gpu"].items(): + assert len(events) == 2 + torch.cuda.current_stream().wait_event(events[1]) + torch.cuda.synchronize() + self.timer_info[name] = events[0].elapsed_time(events[1]) + self.is_synchromized = True + + def get_timer_info(self): + if not self.is_synchromized: + self._calculate_elapse_time() + return self.timer_info From f6e29817bf61317dfbc877a4eb7eb4bd8b97e42a Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Mon, 5 Feb 2024 00:45:46 -0800 Subject: [PATCH 02/10] Timer function wrapper and instant timer --- tests/util/timer.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/util/timer.py b/tests/util/timer.py index 96f481f2..346464ac 100644 --- a/tests/util/timer.py +++ b/tests/util/timer.py @@ -69,8 +69,7 @@ def get_log(self): class Timer: - def __init__(self, host_timer=False): - self.host_timer = host_timer + def __init__(self): self.timers = { "cpu": {}, "gpu": {}, @@ -86,11 +85,10 @@ def start_timer(self, name, host_timer=False): start_time = time.time() self.timers["cpu"][name] = [start_time] else: - self.is_synchromized = False - if name in self.timers["gpu"]: logging.warning(f"timer for {name} already exist") return + self.is_synchromized = False start_event = torch.cuda.Event(enable_timing=True) start_event.record() self.timers["gpu"][name] = [start_event] @@ -117,7 +115,7 @@ def _calculate_elapse_time(self): self.timer_info[name] = events[0].elapsed_time(events[1]) self.is_synchromized = True - def get_timer_info(self): + def get_info(self): if not self.is_synchromized: self._calculate_elapse_time() return self.timer_info From 45ea001bb1ad47f58387810928fadc834a3a12ae Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Tue, 6 Feb 2024 01:14:28 -0800 Subject: [PATCH 03/10] comments --- tests/util/timer.py | 74 +++++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/tests/util/timer.py b/tests/util/timer.py index 346464ac..78ee69ca 100644 --- a/tests/util/timer.py +++ b/tests/util/timer.py @@ -29,40 +29,54 @@ def __init__(self, host_timer=False): self.host_timer = host_timer self.log = {} - def __call__(self, func): - def _host_timer_wrapper(*args, **kwargs): - before_memory = get_device_memory() - start_time = time.time() - result = func(*args, **kwargs) - end_time = time.time() - after_memory = get_device_memory() - self.log[func.__name__] = { - "timer": end_time - start_time, - "cpu_memory": (before_memory - after_memory) >> 20, 
- } - return result - - def _device_timer_wrapper(*args, **kwargs): - before_memory = get_gpu_memory() - start_event = torch.cuda.Event(enable_timing=True) - start_event.record() - result = func(*args, **kwargs) - end_event = torch.cuda.Event(enable_timing=True) - end_event.record() - after_memory = get_gpu_memory() - torch.cuda.current_stream().wait_event(end_event) - torch.cuda.synchronize() - self.log[func.__name__] = { - "timer": start_event.elapsed_time(end_event), - "gpu_memory": (before_memory - after_memory) >> 20, - } - return result + def __call__(self, label_or_func): + if isinstance(label_or_func, str): + label = label_or_func + def decorator(func): + return self._wrap_function(func, label) + + return decorator + + else: + func = label_or_func + return self._wrap_function(func, func.__name__) + + def _wrap_function(self, func, label): @functools.wraps(func) def wrapper(*args, **kwargs): if self.host_timer: - return _host_timer_wrapper(*args, **kwargs) - return _device_timer_wrapper(*args, **kwargs) + return self._device_timer_wrapper(func, label, *args, **kwargs) + # device timer. + return self._device_timer_wrapper(func, label, *args, **kwargs) + + def _host_timer_wrapper(self, func, label, *args, **kwargs): + before_memory = get_device_memory() + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + after_memory = get_device_memory() + self.log[label] = { + "timer": end_time - start_time, + "cpu_memory": (before_memory - after_memory) >> 20, + } + return result + + def _device_timer_wrapper(self, func, label, *args, **kwargs): + before_memory = get_gpu_memory() + start_event = torch.cuda.Event(enable_timing=True) + start_event.record() + result = func(*args, **kwargs) + end_event = torch.cuda.Event(enable_timing=True) + end_event.record() + after_memory = get_gpu_memory() + torch.cuda.current_stream().wait_event(end_event) + torch.cuda.synchronize() + self.log[label] = { + "timer": start_event.elapsed_time(end_event), + "gpu_memory": (before_memory - after_memory) >> 20, + } + return result def get_log(self): return self.log From 60c3696f21ef70a953cdfa8bc01fcabb9e10750a Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Sun, 18 Feb 2024 23:46:22 -0500 Subject: [PATCH 04/10] fix --- analog/analog.py | 3 + analog/analysis/influence_function.py | 3 + analog/logging/log_saver.py | 3 + analog/logging/logger.py | 7 + analog/timer/__init__.py | 1 + analog/timer/timer.py | 186 ++++++++++++++++++ .../mnist_influence/compute_influences.py | 3 + tests/util/timer.py | 135 ------------- 8 files changed, 206 insertions(+), 135 deletions(-) create mode 100644 analog/timer/__init__.py create mode 100644 analog/timer/timer.py delete mode 100644 tests/util/timer.py diff --git a/analog/analog.py b/analog/analog.py index b7e05973..d74ea089 100644 --- a/analog/analog.py +++ b/analog/analog.py @@ -17,6 +17,7 @@ from analog.lora import LoRAHandler from analog.lora.utils import is_lora from analog.state import AnaLogState +from analog.timer.timer import DeviceFunctionTimer from analog.utils import ( get_logger, get_rank, @@ -237,6 +238,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: """ self.logger.update() + @DeviceFunctionTimer.timer def build_log_dataset(self): """ Constructs the log dataset from the stored logs. 
This dataset can then be used @@ -249,6 +251,7 @@ def build_log_dataset(self): log_dataset = LogDataset(log_dir=self.log_dir, config=self.influence_config) return log_dataset + @DeviceFunctionTimer.timer def build_log_dataloader( self, batch_size: int = 16, num_workers: int = 0, pin_memory: bool = False ): diff --git a/analog/analysis/influence_function.py b/analog/analysis/influence_function.py index 64df43aa..24d54f27 100644 --- a/analog/analysis/influence_function.py +++ b/analog/analysis/influence_function.py @@ -5,6 +5,7 @@ from einops import einsum, rearrange, reduce from analog.config import InfluenceConfig from analog.state import AnaLogState +from analog.timer.timer import DeviceFunctionTimer from analog.utils import get_logger, nested_dict from analog.analysis.utils import synchronize_device @@ -24,6 +25,7 @@ def __init__(self, config: InfluenceConfig, state: AnaLogState): self.influence_scores = pd.DataFrame() self.flatten = config.flatten + @DeviceFunctionTimer.timer @torch.no_grad() def precondition( self, @@ -212,6 +214,7 @@ def flatten_log(self, src): to_cat.append(log.view(bsz, -1)) return torch.cat(to_cat, dim=1) + @DeviceFunctionTimer.timer def compute_influence_all( self, src_log: Tuple[str, Dict[str, Dict[str, torch.Tensor]]], diff --git a/analog/logging/log_saver.py b/analog/logging/log_saver.py index 26b25408..e3326597 100644 --- a/analog/logging/log_saver.py +++ b/analog/logging/log_saver.py @@ -1,6 +1,7 @@ from concurrent.futures import ThreadPoolExecutor import torch +from analog.timer.timer import HostFunctionTimer, DeviceFunctionTimer from analog.utils import nested_dict, to_numpy, get_rank from analog.logging.mmap import MemoryMapHandler @@ -21,6 +22,7 @@ def __init__(self, config, state): self.buffer = nested_dict() self.buffer_size = 0 + @DeviceFunctionTimer.timer def buffer_write(self, binfo): """ Add log state on exit. @@ -85,6 +87,7 @@ def _flush_serialized(self, log_dir) -> str: del buffer_list return log_dir + @DeviceFunctionTimer.timer def flush(self) -> None: """ For the DefaultHandler, there's no batch operation needed since each add operation writes to the file. diff --git a/analog/logging/logger.py b/analog/logging/logger.py index 684bb156..edff8987 100644 --- a/analog/logging/logger.py +++ b/analog/logging/logger.py @@ -10,6 +10,7 @@ from analog.logging.option import LogOption from analog.logging.log_saver import LogSaver from analog.logging.utils import compute_per_sample_gradient +from analog.timer.timer import DeviceFunctionTimer from analog.utils import get_logger @@ -42,6 +43,7 @@ def __init__( self.grad_hooks = [] self.tensor_hooks = [] + @DeviceFunctionTimer.timer def log(self, data_id: Any, mask: Optional[torch.Tensor] = None): """ Add log state on exit. 
@@ -59,6 +61,7 @@ def log(self, data_id: Any, mask: Optional[torch.Tensor] = None): return log + @DeviceFunctionTimer.timer def update(self): # Update statistics for stat in self.opt.statistic["grad"]: @@ -82,6 +85,7 @@ def update(self): self.log_saver.buffer_write(binfo=self.binfo) self.log_saver.flush() + @DeviceFunctionTimer.timer def _forward_hook_fn( self, module: nn.Module, inputs: Tuple[torch.Tensor], module_name: str ) -> None: @@ -131,6 +135,7 @@ def _forward_hook_fn( cpu_offload=self.cpu_offload, ) + @DeviceFunctionTimer.timer def _backward_hook_fn( self, module: nn.Module, @@ -172,6 +177,7 @@ def _backward_hook_fn( cpu_offload=self.cpu_offload, ) + @DeviceFunctionTimer.timer def _grad_hook_fn( self, module: nn.Module, @@ -270,6 +276,7 @@ def _tensor_backward_hook_fn(self, grad: torch.Tensor, tensor_name: str) -> None cpu_offload=self.cpu_offload, ) + @DeviceFunctionTimer.timer def register_all_module_hooks(self) -> None: """ Register all module hooks. diff --git a/analog/timer/__init__.py b/analog/timer/__init__.py new file mode 100644 index 00000000..6187990c --- /dev/null +++ b/analog/timer/__init__.py @@ -0,0 +1 @@ +from .timer import FunctionTimer, Timer diff --git a/analog/timer/timer.py b/analog/timer/timer.py new file mode 100644 index 00000000..dd663d62 --- /dev/null +++ b/analog/timer/timer.py @@ -0,0 +1,186 @@ +import logging +import time +import functools + +import torch +import psutil +import os + + +def get_gpu_memory(device_index=None): + return torch.cuda.memory_allocated(device_index) + + +def get_gpu_max_memory(device_index=None): + return torch.cuda.max_memory_allocated(device_index) + + +def get_host_memory(): + process = psutil.Process(os.getpid()) + return process.memory_info().rss + + +def get_cpu_swap_memory(): + return psutil.swap_memory().used + + +class FunctionTimer: + log = {} + + @classmethod + def _wrap_function(cls, func, label, host_timer): + @functools.wraps(func) + def wrapper(*args, **kwargs): + if host_timer: + return cls._host_timer_wrapper(func, label, *args, **kwargs) + else: + return cls._device_timer_wrapper(func, label, *args, **kwargs) + + return wrapper + + @classmethod + def _host_timer_wrapper(cls, func, label, *args, **kwargs): + before_memory = get_host_memory() + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + after_memory = get_host_memory() + if label not in cls.log: + cls.log[label] = [ + { + "time_delta": end_time - start_time, + "memory_delta": (before_memory - after_memory) >> 20, + } + ] + else: + cls.log[label].append( + { + "time_delta": end_time - start_time, + "memory_delta": (before_memory - after_memory) >> 20, + } + ) + return result + + @classmethod + def _device_timer_wrapper(cls, func, label, *args, **kwargs): + before_memory = get_gpu_memory() + start_event = torch.cuda.Event(enable_timing=True) + start_event.record() + result = func(*args, **kwargs) + end_event = torch.cuda.Event(enable_timing=True) + end_event.record() + after_memory = get_gpu_memory() + torch.cuda.current_stream().wait_event(end_event) + torch.cuda.synchronize() + if label not in cls.log: + cls.log[label] = [ + { + "time_delta": start_event.elapsed_time(end_event) + / 1000, # turn to seconds + "memory_delta": (before_memory - after_memory) >> 20, + } + ] + else: + cls.log[label].append( + { + "time_delta": start_event.elapsed_time(end_event) + / 1000, # turn to seconds + "memory_delta": (before_memory - after_memory) >> 20, + } + ) + return result + + @classmethod + def timer(cls, label_or_func=None): + 
host_timer = getattr( + cls, "host_timer", False + ) # Fallback to False if not defined + + def decorator(func): + label = label_or_func if isinstance(label_or_func, str) else func.__name__ + return cls._wrap_function(func, label, host_timer) + + if callable(label_or_func): + return decorator(label_or_func) + return decorator + + @classmethod + def get_log(cls): + return cls.log + + @classmethod + def print_log(cls): + print("Function Timer Logs:") + for label, details in cls.log.items(): + print(f" {label}:") + sum_time = 0 + for log in details: + for key, value in log.items(): + if key == "time_delta": + sum_time += value + print(f" operation costs {sum_time} seconds") + + +class HostFunctionTimer(FunctionTimer): + host_timer = True + + +class DeviceFunctionTimer(FunctionTimer): + if torch.cuda.is_available(): + host_timer = False + else: + logging.warning("CUDA is not set, setting the timer is set to host timer.") + host_timer = True + + +class Timer: + def __init__(self): + self.timers = { + "cpu": {}, + "gpu": {}, + } + self.timer_info = {} # synchronized. + self.is_synchronized = False + + def start_timer(self, name, host_timer=False): + if host_timer: + if name in self.timers["cpu"]: + logging.warning(f"timer for {name} already exist") + return + start_time = time.time() + self.timers["cpu"][name] = [start_time] + else: + if name in self.timers["gpu"]: + logging.warning(f"timer for {name} already exist") + return + self.is_synchronized = False + start_event = torch.cuda.Event(enable_timing=True) + start_event.record() + self.timers["gpu"][name] = [start_event] + + def stop_timer(self, name): + if name in self.timers["cpu"]: + end_time = time.time() + self.timers["cpu"][name].append(end_time) + if name in self.timers["gpu"]: + self.is_synchronized = False + end_event = torch.cuda.Event(enable_timing=True) + end_event.record() + self.timers["gpu"][name].append(end_event) + + def _calculate_elapse_time(self): + for name, timer in self.timers["cpu"].items(): + assert len(timer) == 2 + self.timer_info[name] = (timer[1] - timer[0]) * 1000 + if not self.is_synchronized: + for name, events in self.timers["gpu"].items(): + assert len(events) == 2 + torch.cuda.current_stream().wait_event(events[1]) + torch.cuda.synchronize() + self.timer_info[name] = events[0].elapsed_time(events[1]) + self.is_synchronized = True + + def get_info(self): + if not self.is_synchronized: + self._calculate_elapse_time() + return self.timer_info diff --git a/examples/mnist_influence/compute_influences.py b/examples/mnist_influence/compute_influences.py index b84b8b11..10120a6b 100644 --- a/examples/mnist_influence/compute_influences.py +++ b/examples/mnist_influence/compute_influences.py @@ -11,6 +11,8 @@ construct_mlp, ) +from analog.timer import FunctionTimer + parser = argparse.ArgumentParser("MNIST Influence Analysis") parser.add_argument("--data", type=str, default="mnist", help="mnist or fmnist") parser.add_argument("--eval-idxs", type=int, nargs="+", default=[0]) @@ -79,6 +81,7 @@ ) _, top_influential_data = torch.topk(if_scores, k=10) +FunctionTimer.print_log() # Save if_scores = if_scores.cpu().numpy().tolist()[0] torch.save(if_scores, "if_analog.pt") diff --git a/tests/util/timer.py b/tests/util/timer.py deleted file mode 100644 index 78ee69ca..00000000 --- a/tests/util/timer.py +++ /dev/null @@ -1,135 +0,0 @@ -import logging -import time -import functools - -import torch -import psutil -import os - - -def get_gpu_memory(device_index=None): - return torch.cuda.memory_allocated(device_index) - - -def 
get_gpu_max_memory(device_index=None): - return torch.cuda.max_memory_allocated(device_index) - - -def get_device_memory(): - process = psutil.Process(os.getpid()) - return process.memory_info().rss - - -def get_cpu_swap_memory(): - return psutil.swap_memory().used - - -class FunctionTimer: - def __init__(self, host_timer=False): - self.host_timer = host_timer - self.log = {} - - def __call__(self, label_or_func): - if isinstance(label_or_func, str): - label = label_or_func - - def decorator(func): - return self._wrap_function(func, label) - - return decorator - - else: - func = label_or_func - return self._wrap_function(func, func.__name__) - - def _wrap_function(self, func, label): - @functools.wraps(func) - def wrapper(*args, **kwargs): - if self.host_timer: - return self._device_timer_wrapper(func, label, *args, **kwargs) - # device timer. - return self._device_timer_wrapper(func, label, *args, **kwargs) - - def _host_timer_wrapper(self, func, label, *args, **kwargs): - before_memory = get_device_memory() - start_time = time.time() - result = func(*args, **kwargs) - end_time = time.time() - after_memory = get_device_memory() - self.log[label] = { - "timer": end_time - start_time, - "cpu_memory": (before_memory - after_memory) >> 20, - } - return result - - def _device_timer_wrapper(self, func, label, *args, **kwargs): - before_memory = get_gpu_memory() - start_event = torch.cuda.Event(enable_timing=True) - start_event.record() - result = func(*args, **kwargs) - end_event = torch.cuda.Event(enable_timing=True) - end_event.record() - after_memory = get_gpu_memory() - torch.cuda.current_stream().wait_event(end_event) - torch.cuda.synchronize() - self.log[label] = { - "timer": start_event.elapsed_time(end_event), - "gpu_memory": (before_memory - after_memory) >> 20, - } - return result - - def get_log(self): - return self.log - - -class Timer: - def __init__(self): - self.timers = { - "cpu": {}, - "gpu": {}, - } - self.timer_info = {} # synchronized. 
- self.is_synchromized = False - - def start_timer(self, name, host_timer=False): - if host_timer: - if name in self.timers["cpu"]: - logging.warning(f"timer for {name} already exist") - return - start_time = time.time() - self.timers["cpu"][name] = [start_time] - else: - if name in self.timers["gpu"]: - logging.warning(f"timer for {name} already exist") - return - self.is_synchromized = False - start_event = torch.cuda.Event(enable_timing=True) - start_event.record() - self.timers["gpu"][name] = [start_event] - - def stop_timer(self, name): - if name in self.timers["cpu"]: - end_time = time.time() - self.timers["cpu"][name].append(end_time) - if name in self.timers["gpu"]: - self.is_synchromized = False - end_event = torch.cuda.Event(enable_timing=True) - end_event.record() - self.timers["gpu"][name].append(end_event) - - def _calculate_elapse_time(self): - for name, timer in self.timers["cpu"].items(): - assert len(timer) == 2 - self.timer_info[name] = (timer[1] - timer[0]) * 1000 - if not self.is_synchromized: - for name, events in self.timers["gpu"].items(): - assert len(events) == 2 - torch.cuda.current_stream().wait_event(events[1]) - torch.cuda.synchronize() - self.timer_info[name] = events[0].elapsed_time(events[1]) - self.is_synchromized = True - - def get_info(self): - if not self.is_synchromized: - self._calculate_elapse_time() - return self.timer_info From 81ce797a4dc892ed6d5c0495f197b400439bed77 Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Mon, 19 Feb 2024 00:17:55 -0500 Subject: [PATCH 05/10] requirements --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 8a703764..cda9499b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,5 @@ numpy pandas torch einops - +psutil pyyaml - From b1d86d31a2176d2d9ee28318d4bb6a76ba58fdb3 Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Tue, 20 Feb 2024 16:47:16 -0500 Subject: [PATCH 06/10] fix --- analog/timer/timer.py | 42 ++++++++++++++++-------------------------- requirements.txt | 2 +- 2 files changed, 17 insertions(+), 27 deletions(-) diff --git a/analog/timer/timer.py b/analog/timer/timer.py index dd663d62..923255b6 100644 --- a/analog/timer/timer.py +++ b/analog/timer/timer.py @@ -3,8 +3,6 @@ import functools import torch -import psutil -import os def get_gpu_memory(device_index=None): @@ -15,15 +13,6 @@ def get_gpu_max_memory(device_index=None): return torch.cuda.max_memory_allocated(device_index) -def get_host_memory(): - process = psutil.Process(os.getpid()) - return process.memory_info().rss - - -def get_cpu_swap_memory(): - return psutil.swap_memory().used - - class FunctionTimer: log = {} @@ -40,36 +29,30 @@ def wrapper(*args, **kwargs): @classmethod def _host_timer_wrapper(cls, func, label, *args, **kwargs): - before_memory = get_host_memory() start_time = time.time() result = func(*args, **kwargs) end_time = time.time() - after_memory = get_host_memory() if label not in cls.log: cls.log[label] = [ { "time_delta": end_time - start_time, - "memory_delta": (before_memory - after_memory) >> 20, } ] else: cls.log[label].append( { "time_delta": end_time - start_time, - "memory_delta": (before_memory - after_memory) >> 20, } ) return result @classmethod def _device_timer_wrapper(cls, func, label, *args, **kwargs): - before_memory = get_gpu_memory() start_event = torch.cuda.Event(enable_timing=True) start_event.record() result = func(*args, **kwargs) end_event = torch.cuda.Event(enable_timing=True) end_event.record() - 
after_memory = get_gpu_memory() torch.cuda.current_stream().wait_event(end_event) torch.cuda.synchronize() if label not in cls.log: @@ -77,7 +60,6 @@ def _device_timer_wrapper(cls, func, label, *args, **kwargs): { "time_delta": start_event.elapsed_time(end_event) / 1000, # turn to seconds - "memory_delta": (before_memory - after_memory) >> 20, } ] else: @@ -85,7 +67,6 @@ def _device_timer_wrapper(cls, func, label, *args, **kwargs): { "time_delta": start_event.elapsed_time(end_event) / 1000, # turn to seconds - "memory_delta": (before_memory - after_memory) >> 20, } ) return result @@ -110,15 +91,24 @@ def get_log(cls): @classmethod def print_log(cls): - print("Function Timer Logs:") + print( + "###########################################################################" + ) + print( + "################################ TIMER LOG ################################" + ) + header = f"{'Label':<50} | {'Total Time (sec)':>20}" + print(header) + print("-" * len(header)) for label, details in cls.log.items(): - print(f" {label}:") sum_time = 0 - for log in details: - for key, value in log.items(): - if key == "time_delta": - sum_time += value - print(f" operation costs {sum_time} seconds") + for log_entry in details: + time_delta = log_entry.get("time_delta", 0) + sum_time += time_delta + # truncate 47 letters if the label is longer than 50. + display_label = (label[:47] + "...") if len(label) > 50 else label + row = f"{display_label:<50} | {sum_time:>20.4f}" + print(row) class HostFunctionTimer(FunctionTimer): diff --git a/requirements.txt b/requirements.txt index cda9499b..89411e3a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ numpy pandas torch einops -psutil + pyyaml From a7db7a9e4b83b8ac7608d6c5ccb796e8246df973 Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Sun, 25 Feb 2024 00:23:26 -0800 Subject: [PATCH 07/10] profiler --- analog/analog.py | 2 +- analog/analysis/influence_function.py | 2 +- analog/logging/log_saver.py | 2 +- analog/logging/logger.py | 2 +- analog/timer/__init__.py | 1 - analog/timer/timer.py | 176 ------------------ .../mnist_influence/compute_influences.py | 2 +- 7 files changed, 5 insertions(+), 182 deletions(-) delete mode 100644 analog/timer/__init__.py delete mode 100644 analog/timer/timer.py diff --git a/analog/analog.py b/analog/analog.py index d74ea089..cc4c0b7a 100644 --- a/analog/analog.py +++ b/analog/analog.py @@ -17,7 +17,7 @@ from analog.lora import LoRAHandler from analog.lora.utils import is_lora from analog.state import AnaLogState -from analog.timer.timer import DeviceFunctionTimer +from analog.monitor_util.timer import DeviceFunctionTimer from analog.utils import ( get_logger, get_rank, diff --git a/analog/analysis/influence_function.py b/analog/analysis/influence_function.py index 24d54f27..2834cbc1 100644 --- a/analog/analysis/influence_function.py +++ b/analog/analysis/influence_function.py @@ -5,7 +5,7 @@ from einops import einsum, rearrange, reduce from analog.config import InfluenceConfig from analog.state import AnaLogState -from analog.timer.timer import DeviceFunctionTimer +from analog.monitor_util.timer import DeviceFunctionTimer from analog.utils import get_logger, nested_dict from analog.analysis.utils import synchronize_device diff --git a/analog/logging/log_saver.py b/analog/logging/log_saver.py index e3326597..e95d5ced 100644 --- a/analog/logging/log_saver.py +++ b/analog/logging/log_saver.py @@ -1,7 +1,7 @@ from concurrent.futures import ThreadPoolExecutor import torch -from analog.timer.timer import 
HostFunctionTimer, DeviceFunctionTimer +from analog.monitor_util.timer import DeviceFunctionTimer from analog.utils import nested_dict, to_numpy, get_rank from analog.logging.mmap import MemoryMapHandler diff --git a/analog/logging/logger.py b/analog/logging/logger.py index edff8987..a3d6b3eb 100644 --- a/analog/logging/logger.py +++ b/analog/logging/logger.py @@ -10,7 +10,7 @@ from analog.logging.option import LogOption from analog.logging.log_saver import LogSaver from analog.logging.utils import compute_per_sample_gradient -from analog.timer.timer import DeviceFunctionTimer +from analog.monitor_util.timer import DeviceFunctionTimer from analog.utils import get_logger diff --git a/analog/timer/__init__.py b/analog/timer/__init__.py deleted file mode 100644 index 6187990c..00000000 --- a/analog/timer/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .timer import FunctionTimer, Timer diff --git a/analog/timer/timer.py b/analog/timer/timer.py deleted file mode 100644 index 923255b6..00000000 --- a/analog/timer/timer.py +++ /dev/null @@ -1,176 +0,0 @@ -import logging -import time -import functools - -import torch - - -def get_gpu_memory(device_index=None): - return torch.cuda.memory_allocated(device_index) - - -def get_gpu_max_memory(device_index=None): - return torch.cuda.max_memory_allocated(device_index) - - -class FunctionTimer: - log = {} - - @classmethod - def _wrap_function(cls, func, label, host_timer): - @functools.wraps(func) - def wrapper(*args, **kwargs): - if host_timer: - return cls._host_timer_wrapper(func, label, *args, **kwargs) - else: - return cls._device_timer_wrapper(func, label, *args, **kwargs) - - return wrapper - - @classmethod - def _host_timer_wrapper(cls, func, label, *args, **kwargs): - start_time = time.time() - result = func(*args, **kwargs) - end_time = time.time() - if label not in cls.log: - cls.log[label] = [ - { - "time_delta": end_time - start_time, - } - ] - else: - cls.log[label].append( - { - "time_delta": end_time - start_time, - } - ) - return result - - @classmethod - def _device_timer_wrapper(cls, func, label, *args, **kwargs): - start_event = torch.cuda.Event(enable_timing=True) - start_event.record() - result = func(*args, **kwargs) - end_event = torch.cuda.Event(enable_timing=True) - end_event.record() - torch.cuda.current_stream().wait_event(end_event) - torch.cuda.synchronize() - if label not in cls.log: - cls.log[label] = [ - { - "time_delta": start_event.elapsed_time(end_event) - / 1000, # turn to seconds - } - ] - else: - cls.log[label].append( - { - "time_delta": start_event.elapsed_time(end_event) - / 1000, # turn to seconds - } - ) - return result - - @classmethod - def timer(cls, label_or_func=None): - host_timer = getattr( - cls, "host_timer", False - ) # Fallback to False if not defined - - def decorator(func): - label = label_or_func if isinstance(label_or_func, str) else func.__name__ - return cls._wrap_function(func, label, host_timer) - - if callable(label_or_func): - return decorator(label_or_func) - return decorator - - @classmethod - def get_log(cls): - return cls.log - - @classmethod - def print_log(cls): - print( - "###########################################################################" - ) - print( - "################################ TIMER LOG ################################" - ) - header = f"{'Label':<50} | {'Total Time (sec)':>20}" - print(header) - print("-" * len(header)) - for label, details in cls.log.items(): - sum_time = 0 - for log_entry in details: - time_delta = log_entry.get("time_delta", 0) - sum_time += 
time_delta - # truncate 47 letters if the label is longer than 50. - display_label = (label[:47] + "...") if len(label) > 50 else label - row = f"{display_label:<50} | {sum_time:>20.4f}" - print(row) - - -class HostFunctionTimer(FunctionTimer): - host_timer = True - - -class DeviceFunctionTimer(FunctionTimer): - if torch.cuda.is_available(): - host_timer = False - else: - logging.warning("CUDA is not set, setting the timer is set to host timer.") - host_timer = True - - -class Timer: - def __init__(self): - self.timers = { - "cpu": {}, - "gpu": {}, - } - self.timer_info = {} # synchronized. - self.is_synchronized = False - - def start_timer(self, name, host_timer=False): - if host_timer: - if name in self.timers["cpu"]: - logging.warning(f"timer for {name} already exist") - return - start_time = time.time() - self.timers["cpu"][name] = [start_time] - else: - if name in self.timers["gpu"]: - logging.warning(f"timer for {name} already exist") - return - self.is_synchronized = False - start_event = torch.cuda.Event(enable_timing=True) - start_event.record() - self.timers["gpu"][name] = [start_event] - - def stop_timer(self, name): - if name in self.timers["cpu"]: - end_time = time.time() - self.timers["cpu"][name].append(end_time) - if name in self.timers["gpu"]: - self.is_synchronized = False - end_event = torch.cuda.Event(enable_timing=True) - end_event.record() - self.timers["gpu"][name].append(end_event) - - def _calculate_elapse_time(self): - for name, timer in self.timers["cpu"].items(): - assert len(timer) == 2 - self.timer_info[name] = (timer[1] - timer[0]) * 1000 - if not self.is_synchronized: - for name, events in self.timers["gpu"].items(): - assert len(events) == 2 - torch.cuda.current_stream().wait_event(events[1]) - torch.cuda.synchronize() - self.timer_info[name] = events[0].elapsed_time(events[1]) - self.is_synchronized = True - - def get_info(self): - if not self.is_synchronized: - self._calculate_elapse_time() - return self.timer_info diff --git a/examples/mnist_influence/compute_influences.py b/examples/mnist_influence/compute_influences.py index 10120a6b..bf974cff 100644 --- a/examples/mnist_influence/compute_influences.py +++ b/examples/mnist_influence/compute_influences.py @@ -11,7 +11,7 @@ construct_mlp, ) -from analog.timer import FunctionTimer +from analog.monitor_util import FunctionTimer parser = argparse.ArgumentParser("MNIST Influence Analysis") parser.add_argument("--data", type=str, default="mnist", help="mnist or fmnist") From eca583dedc542cbfa90e65a192ed4fcc8d58aec1 Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Sun, 25 Feb 2024 00:24:10 -0800 Subject: [PATCH 08/10] profiler --- analog/monitor_util/__init__.py | 2 + analog/monitor_util/timer.py | 178 ++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 analog/monitor_util/__init__.py create mode 100644 analog/monitor_util/timer.py diff --git a/analog/monitor_util/__init__.py b/analog/monitor_util/__init__.py new file mode 100644 index 00000000..45761224 --- /dev/null +++ b/analog/monitor_util/__init__.py @@ -0,0 +1,2 @@ +from .timer import FunctionTimer, Timer +from .profiler import memory_profiler diff --git a/analog/monitor_util/timer.py b/analog/monitor_util/timer.py new file mode 100644 index 00000000..fed99c20 --- /dev/null +++ b/analog/monitor_util/timer.py @@ -0,0 +1,178 @@ +import logging +import time +import functools + +import torch + + +def get_gpu_memory(device_index=None): + return torch.cuda.memory_allocated(device_index) + + +def 
get_gpu_max_memory(device_index=None): + return torch.cuda.max_memory_allocated(device_index) + + +class FunctionTimer: + log = {} + + @classmethod + def _wrap_function(cls, func, label, host_timer): + @functools.wraps(func) + def wrapper(*args, **kwargs): + if host_timer: + return cls._host_timer_wrapper(func, label, *args, **kwargs) + else: + return cls._device_timer_wrapper(func, label, *args, **kwargs) + + return wrapper + + @classmethod + def _host_timer_wrapper(cls, func, label, *args, **kwargs): + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + if label not in cls.log: + cls.log[label] = [ + { + "time_delta": end_time - start_time, + } + ] + else: + cls.log[label].append( + { + "time_delta": end_time - start_time, + } + ) + return result + + @classmethod + def _device_timer_wrapper(cls, func, label, *args, **kwargs): + start_event = torch.cuda.Event(enable_timing=True) + start_event.record() + result = func(*args, **kwargs) + end_event = torch.cuda.Event(enable_timing=True) + end_event.record() + torch.cuda.current_stream().wait_event(end_event) + torch.cuda.synchronize() + if label not in cls.log: + cls.log[label] = [ + { + "time_delta": start_event.elapsed_time(end_event) + / 1000, # turn to seconds + } + ] + else: + cls.log[label].append( + { + "time_delta": start_event.elapsed_time(end_event) + / 1000, # turn to seconds + } + ) + return result + + @classmethod + def timer(cls, label_or_func=None): + host_timer = getattr( + cls, "host_timer", False + ) # Fallback to False if not defined + + def decorator(func): + label = label_or_func if isinstance(label_or_func, str) else func.__name__ + return cls._wrap_function(func, label, host_timer) + + if callable(label_or_func): + return decorator(label_or_func) + return decorator + + @classmethod + def get_log(cls): + return cls.log + + @classmethod + def print_log(cls): + print( + "###########################################################################" + ) + print( + "################################ TIMER LOG ################################" + ) + header = f"{'Label':<50} | {'Total Time (sec)':>20}" + print(header) + print("-" * len(header)) + for label, details in cls.log.items(): + sum_time = 0 + for log_entry in details: + time_delta = log_entry.get("time_delta", 0) + sum_time += time_delta + # truncate 47 letters if the label is longer than 50. + display_label = (label[:47] + "...") if len(label) > 50 else label + row = f"{display_label:<50} | {sum_time:>20.4f}" + print(row) + + +class HostFunctionTimer(FunctionTimer): + host_timer = True + + +class DeviceFunctionTimer(FunctionTimer): + if torch.cuda.is_available(): + host_timer = False + else: + logging.warning( + "CUDA is not set, setting the monitor_util is set to host monitor_util." + ) + host_timer = True + + +class Timer: + def __init__(self): + self.timers = { + "cpu": {}, + "gpu": {}, + } + self.timer_info = {} # synchronized. 
+ self.is_synchronized = False + + def start_timer(self, name, host_timer=False): + if host_timer: + if name in self.timers["cpu"]: + logging.warning(f"monitor_util for {name} already exist") + return + start_time = time.time() + self.timers["cpu"][name] = [start_time] + else: + if name in self.timers["gpu"]: + logging.warning(f"monitor_util for {name} already exist") + return + self.is_synchronized = False + start_event = torch.cuda.Event(enable_timing=True) + start_event.record() + self.timers["gpu"][name] = [start_event] + + def stop_timer(self, name): + if name in self.timers["cpu"]: + end_time = time.time() + self.timers["cpu"][name].append(end_time) + if name in self.timers["gpu"]: + self.is_synchronized = False + end_event = torch.cuda.Event(enable_timing=True) + end_event.record() + self.timers["gpu"][name].append(end_event) + + def _calculate_elapse_time(self): + for name, timer in self.timers["cpu"].items(): + assert len(timer) == 2 + self.timer_info[name] = (timer[1] - timer[0]) * 1000 + if not self.is_synchronized: + for name, events in self.timers["gpu"].items(): + assert len(events) == 2 + torch.cuda.current_stream().wait_event(events[1]) + torch.cuda.synchronize() + self.timer_info[name] = events[0].elapsed_time(events[1]) + self.is_synchronized = True + + def get_info(self): + if not self.is_synchronized: + self._calculate_elapse_time() + return self.timer_info From ff1ad6f70d1b53ef23fbe6f4579b7909abbac7b7 Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Sun, 25 Feb 2024 00:25:23 -0800 Subject: [PATCH 09/10] profiler --- analog/monitor_util/profiler.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 analog/monitor_util/profiler.py diff --git a/analog/monitor_util/profiler.py b/analog/monitor_util/profiler.py new file mode 100644 index 00000000..647fe440 --- /dev/null +++ b/analog/monitor_util/profiler.py @@ -0,0 +1,26 @@ +import torch +import functools +from torch.profiler import profile, ProfilerActivity + + +def memory_profiler(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + activities = [ProfilerActivity.CPU] + if device.type == "cuda": + activities.append(ProfilerActivity.CUDA) + + with profile(activities=activities, profile_memory=True) as prof: + result = func(*args, **kwargs) + + print( + prof.key_averages().table( + sort_by="self_cuda_memory_usage" + if device.type == "cuda" + else "self_cpu_memory_usage" + ) + ) + return result + + return wrapper From 8326b59882297cb89ad6702ec83e45d8189605cc Mon Sep 17 00:00:00 2001 From: Minsoo Patrick Kang Date: Sun, 25 Feb 2024 00:34:56 -0800 Subject: [PATCH 10/10] black test --- analog/monitor_util/profiler.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/analog/monitor_util/profiler.py b/analog/monitor_util/profiler.py index 647fe440..30066d16 100644 --- a/analog/monitor_util/profiler.py +++ b/analog/monitor_util/profiler.py @@ -16,9 +16,11 @@ def wrapper(*args, **kwargs): print( prof.key_averages().table( - sort_by="self_cuda_memory_usage" - if device.type == "cuda" - else "self_cpu_memory_usage" + sort_by=( + "self_cuda_memory_usage" + if device.type == "cuda" + else "self_cpu_memory_usage" + ) ) ) return result
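
Usage sketch for the timer utilities added in PATCH 08/10 — a minimal, illustrative example. The function names, labels, and tensor shapes are hypothetical; only the analog.monitor_util imports and calls come from the patches, and the script assumes a CPU-only run is acceptable (DeviceFunctionTimer falls back to host timing when CUDA is unavailable).

import torch

from analog.monitor_util import FunctionTimer, Timer
from analog.monitor_util.timer import HostFunctionTimer, DeviceFunctionTimer


@HostFunctionTimer.timer("matmul/forward")  # explicit label, wall-clock (time.time) timing
def heavy_matmul(a, b):  # hypothetical workload
    return a @ b


@DeviceFunctionTimer.timer  # label defaults to the function name; CUDA events when available
def normalize(x):  # hypothetical workload
    return x / x.norm()


x = torch.randn(512, 512)
heavy_matmul(x, x)
normalize(x)

# Instant timer for an arbitrary code region.
t = Timer()
t.start_timer("matmul_block", host_timer=True)
heavy_matmul(x, x)
t.stop_timer("matmul_block")
print(t.get_info())        # {"matmul_block": <elapsed milliseconds>}

FunctionTimer.print_log()  # cumulative per-label table, reported in seconds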
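
Usage sketch for the memory_profiler decorator added in PATCH 09/10 — again illustrative: the model, batch, and loss below are hypothetical stand-ins; the decorator itself wraps the call in torch.profiler with profile_memory=True and prints a per-operator table.

import torch
import torch.nn as nn

from analog.monitor_util import memory_profiler


@memory_profiler
def forward_backward(model, batch):  # hypothetical training step
    loss = model(batch).square().mean()
    loss.backward()
    return loss.item()


model = nn.Linear(128, 10)
batch = torch.randn(32, 128)
forward_backward(model, batch)  # prints the profile sorted by self memory usage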