From 2884fda01cfe17ef06242cd359df7572a041aa90 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Thu, 16 Feb 2023 06:28:53 -0800 Subject: [PATCH 01/12] WIP --- composer/callbacks/health_checker.py | 205 +++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 composer/callbacks/health_checker.py diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py new file mode 100644 index 0000000000..deb37540bf --- /dev/null +++ b/composer/callbacks/health_checker.py @@ -0,0 +1,205 @@ +# Copyright 2022 MosaicML Composer authors +# SPDX-License-Identifier: Apache-2.0 + +"""Log memory usage during training.""" +import logging +from collections import deque +from typing import Optional + +try: + import psutil +except ImportError: + psutil = None + +try: + import pynvml +except ImportError: + pynvml = None + +import os + +import numpy as np +import pynvml +from slack_sdk.webhook import WebhookClient + +from composer.core import Callback, State +from composer.core.time import Timestamp +from composer.loggers import Logger +from composer.utils import dist + +log = logging.getLogger(__name__) + +__all__ = ['HealthChecker'] + + +# modified from https://github.com/wandb/wandb/blob/main/wandb/sdk/internal/system/assets/gpu.py +def gpu_in_use_by_this_process(gpu_handle) -> bool: + + pid = os.getpid() + + if psutil is None: + return False + + try: + base_process = psutil.Process(pid=pid) + except psutil.NoSuchProcess: + # do not report any gpu metrics if the base process cant be found + return False + + our_processes = base_process.children(recursive=True) + our_processes.append(base_process) + + our_pids = {process.pid for process in our_processes} + + compute_pids = { + process.pid for process in pynvml.nvmlDeviceGetComputeRunningProcesses(gpu_handle) # type: ignore + } + graphics_pids = { + process.pid for process in pynvml.nvmlDeviceGetGraphicsRunningProcesses(gpu_handle) # type: ignore + } + + pids_using_device = compute_pids | graphics_pids + + return len(pids_using_device & our_pids) > 0 + + +class HealthChecker(Callback): + """Checks for GPU health. + + This callback checks for GPU health by measuring GPU utilization across all + involved GPUs/nodes and alerts if the range of utilizations exceeds + a certain threshold. + + For example, if utilization across GPUs for this run are + [30, 30, 45], then the range (45-30=15) would exceed a threshold of 10%. + + Only GPUs involved in this training run are checked. 
+ + """ + + def __init__( + self, + threshold=0.10, + sample_freq=5, + check_freq=120, + wait=120, + slack_webhook_url=None, + ) -> None: + self.threshold = threshold + self.sample_freq = sample_freq + self.check_freq = check_freq + self.wait = wait + self.slack_webhook_url = slack_webhook_url + + if not self.slack_webhook_url: + self.slack_webhook_url = os.environ.get('SLACK_WEBHOOK_URL', None) + + self.alerted = False + self.last_sample = 0 + self.last_check = 0 + + self.metrics = [GPUUtilization()] + + def init(self, state: State, logger: Logger) -> None: + pass + + def after_train_batch(self, state: State, logger: Logger): + if self.alerted: + # only alert once + return + + if self._sample(state.timestamp): + for metric in self.metrics: + metric.sample() + + if self._check(state.timestamp): + for metric in self.metrics: + if metric.check(): + self._alert(state, metric) + self.alerted = True + metric.clear() + + def _sample(self, timestamp: Timestamp) -> bool: + now = timestamp.total_wct.seconds + + if now < self.wait: + return False + + if now - self.last_sample > self.sample_freq: + self.last_sample = now + return True + + return False + + def _check(self, timestamp: Timestamp) -> bool: + now = timestamp.total_wct.seconds + + if now - self.last_check > self.check_freq: + self.last_check = now + return True + return False + + def _alert(self, state: State, metric) -> None: + message = 'Found a potential issue!' + + logging.warning(message) + if self.slack_webhook_url: + client = WebhookClient(url=self.slack_webhook_url) + client.send(text=message) + + +class GPUUtilization: + + def __init__(self, threshold=10) -> None: + self.samples = deque() + self.threshold = threshold + + def sample(self) -> None: + sample = self._sample() + if sample is not None: + self.samples.append(sample) + + def _sample(self) -> Optional[float]: + device_count = pynvml.nvmlDeviceGetCount() # type: ignore + for i in range(device_count): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) # type: ignore + if gpu_in_use_by_this_process(handle): + return pynvml.nvmlDeviceGetUtilizationRates(handle).gpu, # type: ignore + return None + + def check(self) -> bool: + sample = [s for s in self.samples if s is not None] + average_sample = np.mean(sample) if sample else None + all_samples = dist.all_gather_object(average_sample) + + return np.nanmax(all_samples) - np.nanmin(all_samples) > self.threshold + + def clear(self) -> None: + self.samples.clear() + + +class ECCErrors: + + def __init__(self, threshold=100) -> None: + self.samples = deque() + self.threshold = threshold + + def sample(self) -> None: + sample = self._sample() + if sample is not None: + self.samples.append(sample) + + def _sample(self) -> Optional[float]: + device_count = pynvml.nvmlDeviceGetCount() # type: ignore + for i in range(device_count): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) # type: ignore + if gpu_in_use_by_this_process(handle): + return pynvml.nvmlDeviceGetMemoryErrorCounter(handle, 0, 0, 2) # type: ignore + return None + + def check(self) -> bool: + sample = [s for s in self.samples if s is not None] + return np.nanmax(sample) - np.nanmin(sample) > self.threshold + + def clear(self) -> None: + self.samples.clear() From f6bd39213a487d514367731340243a839bb2ab13 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Thu, 23 Feb 2023 19:22:59 +0000 Subject: [PATCH 02/12] WIP --- composer/callbacks/__init__.py | 2 + composer/callbacks/health_checker.py | 148 +++++++++++++------------ tests/callbacks/test_health_checker.py | 44 ++++++++ 3 files 
changed, 123 insertions(+), 71 deletions(-) create mode 100644 tests/callbacks/test_health_checker.py diff --git a/composer/callbacks/__init__.py b/composer/callbacks/__init__.py index 6ce952ad40..d9f3ee837e 100644 --- a/composer/callbacks/__init__.py +++ b/composer/callbacks/__init__.py @@ -9,6 +9,7 @@ from composer.callbacks.checkpoint_saver import CheckpointSaver from composer.callbacks.early_stopper import EarlyStopper from composer.callbacks.export_for_inference import ExportForInferenceCallback +from composer.callbacks.health_checker import HealthChecker from composer.callbacks.image_visualizer import ImageVisualizer from composer.callbacks.lr_monitor import LRMonitor from composer.callbacks.memory_monitor import MemoryMonitor @@ -28,4 +29,5 @@ 'ExportForInferenceCallback', 'ThresholdStopper', 'ImageVisualizer', + 'HealthChecker', ] diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index deb37540bf..d22f528886 100644 --- a/composer/callbacks/health_checker.py +++ b/composer/callbacks/health_checker.py @@ -4,7 +4,9 @@ """Log memory usage during training.""" import logging from collections import deque -from typing import Optional +from typing import List, Optional + +import torch try: import psutil @@ -32,37 +34,6 @@ __all__ = ['HealthChecker'] -# modified from https://github.com/wandb/wandb/blob/main/wandb/sdk/internal/system/assets/gpu.py -def gpu_in_use_by_this_process(gpu_handle) -> bool: - - pid = os.getpid() - - if psutil is None: - return False - - try: - base_process = psutil.Process(pid=pid) - except psutil.NoSuchProcess: - # do not report any gpu metrics if the base process cant be found - return False - - our_processes = base_process.children(recursive=True) - our_processes.append(base_process) - - our_pids = {process.pid for process in our_processes} - - compute_pids = { - process.pid for process in pynvml.nvmlDeviceGetComputeRunningProcesses(gpu_handle) # type: ignore - } - graphics_pids = { - process.pid for process in pynvml.nvmlDeviceGetGraphicsRunningProcesses(gpu_handle) # type: ignore - } - - pids_using_device = compute_pids | graphics_pids - - return len(pids_using_device & our_pids) > 0 - - class HealthChecker(Callback): """Checks for GPU health. @@ -94,29 +65,27 @@ def __init__( if not self.slack_webhook_url: self.slack_webhook_url = os.environ.get('SLACK_WEBHOOK_URL', None) - self.alerted = False self.last_sample = 0 self.last_check = 0 - self.metrics = [GPUUtilization()] + self.metrics = [] + if self._is_available(): + self.metrics.append(GPUUtilization()) def init(self, state: State, logger: Logger) -> None: pass def after_train_batch(self, state: State, logger: Logger): - if self.alerted: - # only alert once - return - if self._sample(state.timestamp): for metric in self.metrics: metric.sample() if self._check(state.timestamp): for metric in self.metrics: - if metric.check(): - self._alert(state, metric) - self.alerted = True + message, alert = metric.check() + if alert and not metric.alerted: + self._alert(message) + metric.alerted = True metric.clear() def _sample(self, timestamp: Timestamp) -> bool: @@ -139,7 +108,7 @@ def _check(self, timestamp: Timestamp) -> bool: return True return False - def _alert(self, state: State, metric) -> None: + def _alert(self, message: str) -> None: message = 'Found a potential issue!' 
logging.warning(message) @@ -147,32 +116,58 @@ def _alert(self, state: State, metric) -> None: client = WebhookClient(url=self.slack_webhook_url) client.send(text=message) + @staticmethod + def _is_available() -> bool: + if not torch.cuda.is_available(): + return False + try: + pynvml.nvmlInit() # type: ignore + return True + except pynvml.NVMLError_LibraryNotFound: # type: ignore + logging.warning('NVML not found, disabling GPU health checking') + except ImportError: + logging.warning('pynvml library not found, disabling GPU health checking.') + except Exception as e: + logging.warning(f'Error initializing NVML: {e}') + + return False + class GPUUtilization: + alerted: bool = False + def __init__(self, threshold=10) -> None: self.samples = deque() self.threshold = threshold def sample(self) -> None: - sample = self._sample() - if sample is not None: - self.samples.append(sample) - - def _sample(self) -> Optional[float]: - device_count = pynvml.nvmlDeviceGetCount() # type: ignore + if dist.get_local_rank == 0: + sample = self._sample() + if sample is not None: + self.samples.append(sample) + + def _sample(self) -> Optional[List[float]]: + # TODO: catch NVMLError + samples = [] + device_count = pynvml.nvmlDeviceGetCount() for i in range(device_count): - handle = pynvml.nvmlDeviceGetHandleByIndex(i) # type: ignore - if gpu_in_use_by_this_process(handle): - return pynvml.nvmlDeviceGetUtilizationRates(handle).gpu, # type: ignore - return None - - def check(self) -> bool: - sample = [s for s in self.samples if s is not None] - average_sample = np.mean(sample) if sample else None - all_samples = dist.all_gather_object(average_sample) - - return np.nanmax(all_samples) - np.nanmin(all_samples) > self.threshold + handle = pynvml.nvmlDeviceGetHandleByIndex(i) + samples.append(pynvml.nvmlDeviceGetUtilizationRates(handle).gpu) + return samples + + def check(self, state: State) -> Tuple[str, bool]: + if dist.get_local_rank == 0: + average_sample = np.nanmean(self.samples, axis=0) + if np.nanmax(average_sample) - np.nanmin(average_sample) > self.threshold: + message = '{run_name} experiencing abnormal GPU utilizations on rank {rank}: {utils}' + return message.format( + run_name=state.run_name, + rank=dist.node_rank, + utils=average_sample, + ), True + + return None, False def clear(self) -> None: self.samples.clear() @@ -180,26 +175,37 @@ def clear(self) -> None: class ECCErrors: + alerted: bool = False + def __init__(self, threshold=100) -> None: self.samples = deque() self.threshold = threshold def sample(self) -> None: - sample = self._sample() - if sample is not None: - self.samples.append(sample) + if dist.get_local_rank == 0: + sample = self._sample() + if sample is not None: + self.samples.append(sample) def _sample(self) -> Optional[float]: - device_count = pynvml.nvmlDeviceGetCount() # type: ignore + samples = [] + device_count = pynvml.nvmlDeviceGetCount() for i in range(device_count): - handle = pynvml.nvmlDeviceGetHandleByIndex(i) # type: ignore - if gpu_in_use_by_this_process(handle): - return pynvml.nvmlDeviceGetMemoryErrorCounter(handle, 0, 0, 2) # type: ignore - return None - - def check(self) -> bool: - sample = [s for s in self.samples if s is not None] - return np.nanmax(sample) - np.nanmin(sample) > self.threshold + handle = pynvml.nvmlDeviceGetHandleByIndex(i) + samples.append(pynvml.nvmlDeviceGetMemoryErrorCounter(handle, 0, 0, 2)) + return samples + + def check(self, state: State) -> Tuple[str, bool]: + if dist.get_local_rank == 0: + min_counter = np.min(self.samples, axis=0) + 
max_counter = np.max(self.samples, axis=0) + gpus_with_error = np.where(max_counter - min_counter > self.threshold) + if len(gpus_with_error) > 0: + message = '{run_name} reporting high memory ECC error on rank {rank} for GPUs: {gpus}' + ecc_data = ['GPU: {} ({} -> {})'.format(i, min_counter[i], max_counter[i]) for i in gpus_with_error] + return message.format(run_name=state.run_name, rank=dist.node_rank, gpus=ecc_data), True + + return None, False def clear(self) -> None: self.samples.clear() diff --git a/tests/callbacks/test_health_checker.py b/tests/callbacks/test_health_checker.py new file mode 100644 index 0000000000..451e380f6e --- /dev/null +++ b/tests/callbacks/test_health_checker.py @@ -0,0 +1,44 @@ +# Copyright 2022 MosaicML Composer authors +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import patch + +import pytest +from torch.utils.data import DataLoader + +from composer.callbacks import HealthChecker +from composer.callbacks.health_checker import GPUUtilization +from composer.loggers import InMemoryLogger +from composer.trainer import Trainer +from tests.common import RandomClassificationDataset, SimpleModel, device + +# @pytest.mark.gpu +# def test_health_checker(): +# # Construct the trainer +# health_checker = HealthChecker(wait=0, sample_freq=1, check_freq=2) +# in_memory_logger = InMemoryLogger() +# trainer = Trainer( +# model=SimpleModel(), +# callbacks=health_checker, +# loggers=in_memory_logger, +# train_dataloader=DataLoader(RandomClassificationDataset()), +# max_duration='10000ba', +# device='gpu', +# ) +# trainer.fit() + + +@pytest.mark.gpu +def test_gpu_utilization(): + import pynvml + HealthChecker._is_available() + + with patch.object(pynvml, 'nvmlDeviceGetUtilizationRates') as mock_method: + mock_return = MagicMock() + mock_return.gpu = MagicMock(side_effect=[100, 100, 80]) + mock_method.return_value = mock_return + gpu_utilization = GPUUtilization() + gpu_utilization.sample() + gpu_utilization.sample() + gpu_utilization.sample() + gpu_utilization.check() From f8dd61ccd2803ad64ede40a097a2926f73f43590 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Sat, 25 Feb 2023 03:03:42 +0000 Subject: [PATCH 03/12] WIP --- composer/callbacks/health_checker.py | 128 ++++++++++++---------- tests/callbacks/test_health_checker.py | 140 ++++++++++++++++++++----- 2 files changed, 186 insertions(+), 82 deletions(-) diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index d22f528886..1b7448026d 100644 --- a/composer/callbacks/health_checker.py +++ b/composer/callbacks/health_checker.py @@ -4,15 +4,11 @@ """Log memory usage during training.""" import logging from collections import deque -from typing import List, Optional +from datetime import datetime +from typing import List, Optional, Tuple import torch -try: - import psutil -except ImportError: - psutil = None - try: import pynvml except ImportError: @@ -37,28 +33,33 @@ class HealthChecker(Callback): """Checks for GPU health. - This callback checks for GPU health by measuring GPU utilization across all - involved GPUs/nodes and alerts if the range of utilizations exceeds - a certain threshold. + This callback checks for GPU health by tracking and alerting for abnormal + GPU utilizations. - For example, if utilization across GPUs for this run are + For example, if the average utilization during the observation window is, [30, 30, 45], then the range (45-30=15) would exceed a threshold of 10%. - Only GPUs involved in this training run are checked. 
- + Args: + threshold (float, optional): Threshold of GPU utilization range to + trigger an alert. Defaults to 10. + sample_freq (int, optional): Sample frequency in seconds. Default: 5. + window_size (int, optional): Window size in seconds. HealthChecker will + check for abnormalities at this frequency. Default: 120. + wait (int, optional): Seconds to wait for starting to sample. Default: 120. + slack_webhook_url (str, optional): Slack URL to send alerts. Can also + be set with the SLACK_WEBHOOK_URL environment variable. Default: None """ def __init__( self, - threshold=0.10, - sample_freq=5, - check_freq=120, - wait=120, - slack_webhook_url=None, + threshold: float = 10, + sample_freq: int = 5, + window_size: int = 120, + wait: int = 120, + slack_webhook_url: Optional[str] = None, ) -> None: - self.threshold = threshold self.sample_freq = sample_freq - self.check_freq = check_freq + self.window_size = window_size self.wait = wait self.slack_webhook_url = slack_webhook_url @@ -70,12 +71,15 @@ def __init__( self.metrics = [] if self._is_available(): - self.metrics.append(GPUUtilization()) + self.metrics.append(GPUUtilization(threshold)) def init(self, state: State, logger: Logger) -> None: pass def after_train_batch(self, state: State, logger: Logger): + if not self.metrics: + return + if self._sample(state.timestamp): for metric in self.metrics: metric.sample() @@ -84,7 +88,7 @@ def after_train_batch(self, state: State, logger: Logger): for metric in self.metrics: message, alert = metric.check() if alert and not metric.alerted: - self._alert(message) + self._alert(message, state) metric.alerted = True metric.clear() @@ -94,7 +98,7 @@ def _sample(self, timestamp: Timestamp) -> bool: if now < self.wait: return False - if now - self.last_sample > self.sample_freq: + if now - self.last_sample >= self.sample_freq: self.last_sample = now return True @@ -103,13 +107,23 @@ def _sample(self, timestamp: Timestamp) -> bool: def _check(self, timestamp: Timestamp) -> bool: now = timestamp.total_wct.seconds - if now - self.last_check > self.check_freq: + if now - self.last_check >= self.window_size: self.last_check = now return True return False - def _alert(self, message: str) -> None: - message = 'Found a potential issue!' 
+ def _alert(self, message: str, state: State) -> None: + prefix = '[{now}][{run_name}][node_rank={node_rank}]'.format( + now=datetime.now(), + run_name=state.run_name, + node_rank=dist.get_node_rank(), + ) + + node_name = os.environ.get('NODENAME', None) + if node_name: + prefix += f'[node={node_name}]' + + message = prefix + ' : ' + message logging.warning(message) if self.slack_webhook_url: @@ -142,30 +156,28 @@ def __init__(self, threshold=10) -> None: self.threshold = threshold def sample(self) -> None: - if dist.get_local_rank == 0: + if dist.get_local_rank() == 0: sample = self._sample() if sample is not None: self.samples.append(sample) - def _sample(self) -> Optional[List[float]]: - # TODO: catch NVMLError - samples = [] - device_count = pynvml.nvmlDeviceGetCount() - for i in range(device_count): - handle = pynvml.nvmlDeviceGetHandleByIndex(i) - samples.append(pynvml.nvmlDeviceGetUtilizationRates(handle).gpu) + def _sample(self) -> Optional[List]: + try: + samples = [] + device_count = pynvml.nvmlDeviceGetCount() # type: ignore + for i in range(device_count): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) # type: ignore + samples.append(pynvml.nvmlDeviceGetUtilizationRates(handle).gpu) # type: ignore + except pynvml.NVMLError: # type: ignore + return None return samples - def check(self, state: State) -> Tuple[str, bool]: - if dist.get_local_rank == 0: - average_sample = np.nanmean(self.samples, axis=0) + def check(self) -> Tuple[Optional[str], bool]: + if dist.get_local_rank() == 0: + average_sample = np.nanmean(list(self.samples), axis=0) if np.nanmax(average_sample) - np.nanmin(average_sample) > self.threshold: - message = '{run_name} experiencing abnormal GPU utilizations on rank {rank}: {utils}' - return message.format( - run_name=state.run_name, - rank=dist.node_rank, - utils=average_sample, - ), True + message = 'Abnormal GPU utilizations: {utils}' + return message.format(utils=average_sample,), True return None, False @@ -182,28 +194,34 @@ def __init__(self, threshold=100) -> None: self.threshold = threshold def sample(self) -> None: - if dist.get_local_rank == 0: + if dist.get_local_rank() == 0: sample = self._sample() if sample is not None: self.samples.append(sample) - def _sample(self) -> Optional[float]: - samples = [] - device_count = pynvml.nvmlDeviceGetCount() - for i in range(device_count): - handle = pynvml.nvmlDeviceGetHandleByIndex(i) - samples.append(pynvml.nvmlDeviceGetMemoryErrorCounter(handle, 0, 0, 2)) + def _sample(self) -> Optional[List]: + try: + samples = [] + device_count = pynvml.nvmlDeviceGetCount() # type: ignore + for i in range(device_count): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) # type: ignore + samples.append(pynvml.nvmlDeviceGetMemoryErrorCounter(handle, 0, 0, 2)) # type: ignore + except pynvml.NVMLError: # type: ignore + return None return samples - def check(self, state: State) -> Tuple[str, bool]: - if dist.get_local_rank == 0: - min_counter = np.min(self.samples, axis=0) - max_counter = np.max(self.samples, axis=0) + def check(self) -> Tuple[Optional[str], bool]: + if dist.get_local_rank() == 0: + min_counter = np.min(list(self.samples), axis=0) + max_counter = np.max(list(self.samples), axis=0) gpus_with_error = np.where(max_counter - min_counter > self.threshold) if len(gpus_with_error) > 0: - message = '{run_name} reporting high memory ECC error on rank {rank} for GPUs: {gpus}' + message = 'High memory ECC error for GPUs : {gpus}' ecc_data = ['GPU: {} ({} -> {})'.format(i, min_counter[i], max_counter[i]) for i in 
gpus_with_error] - return message.format(run_name=state.run_name, rank=dist.node_rank, gpus=ecc_data), True + return message.format( + rank=dist.get_node_rank(), + gpus=ecc_data, + ), True return None, False diff --git a/tests/callbacks/test_health_checker.py b/tests/callbacks/test_health_checker.py index 451e380f6e..5068e5edf6 100644 --- a/tests/callbacks/test_health_checker.py +++ b/tests/callbacks/test_health_checker.py @@ -1,44 +1,130 @@ # Copyright 2022 MosaicML Composer authors # SPDX-License-Identifier: Apache-2.0 -from unittest.mock import patch +import datetime +from unittest.mock import MagicMock, patch import pytest -from torch.utils.data import DataLoader +from composer import Timestamp from composer.callbacks import HealthChecker -from composer.callbacks.health_checker import GPUUtilization -from composer.loggers import InMemoryLogger -from composer.trainer import Trainer -from tests.common import RandomClassificationDataset, SimpleModel, device - -# @pytest.mark.gpu -# def test_health_checker(): -# # Construct the trainer -# health_checker = HealthChecker(wait=0, sample_freq=1, check_freq=2) -# in_memory_logger = InMemoryLogger() -# trainer = Trainer( -# model=SimpleModel(), -# callbacks=health_checker, -# loggers=in_memory_logger, -# train_dataloader=DataLoader(RandomClassificationDataset()), -# max_duration='10000ba', -# device='gpu', -# ) -# trainer.fit() +from composer.callbacks.health_checker import ECCErrors, GPUUtilization +from composer.utils import dist +from tests.common import world_size + + +class MockUtil: + + def __init__(self, util): + self.gpu = util @pytest.mark.gpu -def test_gpu_utilization(): +@world_size(1, 2) +def test_gpu_utilization(world_size): import pynvml HealthChecker._is_available() - with patch.object(pynvml, 'nvmlDeviceGetUtilizationRates') as mock_method: - mock_return = MagicMock() - mock_return.gpu = MagicMock(side_effect=[100, 100, 80]) - mock_method.return_value = mock_return + gpu_utilization_values = [ + MockUtil(100), + MockUtil(10), + MockUtil(100), + MockUtil(100), + MockUtil(100), + MockUtil(100), + ] + + with patch.multiple(pynvml, + nvmlDeviceGetUtilizationRates=MagicMock(side_effect=gpu_utilization_values), + nvmlDeviceGetCount=MagicMock(return_value=world_size)): + gpu_utilization = GPUUtilization() gpu_utilization.sample() gpu_utilization.sample() gpu_utilization.sample() - gpu_utilization.check() + _, alert = gpu_utilization.check() + + should_alert = dist.get_local_rank() == 0 and world_size > 1 + assert alert == should_alert + + +@pytest.mark.gpu +@world_size(1, 2) +def test_ecc_counters(world_size): + import pynvml + HealthChecker._is_available() + + ecc_counters = [0, 0, 150, 0, 300, 0] + + with patch.multiple(pynvml, + nvmlDeviceGetMemoryErrorCounter=MagicMock(side_effect=ecc_counters), + nvmlDeviceGetCount=MagicMock(return_value=world_size)): + + ecc_counter = ECCErrors() + ecc_counter.sample() + ecc_counter.sample() + ecc_counter.sample() + _, alert = ecc_counter.check() + + # only the local rank 0 alerts + assert alert == (dist.get_local_rank() == 0) + + +@pytest.mark.gpu +@world_size(1, 2) +def test_health_checker(world_size): + import pynvml + + state = MagicMock() + state.run_name = 'pytest-mock-run-kwei73' + logger = MagicMock() + + health_checker = HealthChecker( + sample_freq=1, + window_size=3, + wait=0, + ) + + gpu_utilization_values = [ + MockUtil(100), + MockUtil(10), + MockUtil(100), + MockUtil(100), + MockUtil(100), + MockUtil(100), + ] + + with patch.multiple(pynvml, + 
nvmlDeviceGetUtilizationRates=MagicMock(side_effect=gpu_utilization_values), + nvmlDeviceGetCount=MagicMock(return_value=world_size)): + + # collect data and checker + for seconds in [1, 2, 3]: + state.timestamp = Timestamp(total_wct=datetime.timedelta(seconds=seconds)) + health_checker.after_train_batch(state, logger) + + should_alert = dist.get_local_rank() == 0 and world_size > 1 + assert health_checker.metrics[0].alerted == should_alert + + +def test_health_checker_sampling(): + timestamp = Timestamp(total_wct=datetime.timedelta(seconds=0)) + + health_checker = HealthChecker( + sample_freq=1, + window_size=5, + wait=10, + ) + + config = [ + (5, False), # before wait + (11, True), + (11.5, False), # below sample frequency + (12, True), + (20, True), + (11, False), # no time travel + ] + + for seconds, is_sample in config: + timestamp = Timestamp(total_wct=datetime.timedelta(seconds=seconds)) + assert health_checker._sample(timestamp) == is_sample From f10874bb5e4f7cd4e264a9b45a3f2aa7352aa8e4 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Sat, 25 Feb 2023 03:10:51 +0000 Subject: [PATCH 04/12] fix docs --- composer/callbacks/health_checker.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index 1b7448026d..c8f1ecdff6 100644 --- a/composer/callbacks/health_checker.py +++ b/composer/callbacks/health_checker.py @@ -1,7 +1,7 @@ # Copyright 2022 MosaicML Composer authors # SPDX-License-Identifier: Apache-2.0 -"""Log memory usage during training.""" +"""Check GPU Health during training.""" import logging from collections import deque from datetime import datetime @@ -148,7 +148,8 @@ def _is_available() -> bool: class GPUUtilization: - + """GPU Utilization Metric + """ alerted: bool = False def __init__(self, threshold=10) -> None: @@ -186,7 +187,8 @@ def clear(self) -> None: class ECCErrors: - + """Metric for ECC counters + """ alerted: bool = False def __init__(self, threshold=100) -> None: From b385389cd7a221c600f8fca62dd398cbdad97944 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Sat, 25 Feb 2023 03:19:28 +0000 Subject: [PATCH 05/12] add pynvml to setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 31a7574c4a..65ebc60bc3 100644 --- a/setup.py +++ b/setup.py @@ -87,6 +87,7 @@ def package_files(prefix: str, directory: str, extension: str): 'py-cpuinfo>=8.0.0,<10', 'packaging>=21.3.0,<23', 'importlib-metadata>=5.0.0,<7', + 'pynvml>=11.5.0,<12', ] extra_deps = {} From 838bbdad2c7d9dd09b9ec3e98c01579ec660ff11 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Sat, 25 Feb 2023 03:32:33 +0000 Subject: [PATCH 06/12] add slack_sdk to setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 65ebc60bc3..06b5b4d421 100644 --- a/setup.py +++ b/setup.py @@ -88,6 +88,7 @@ def package_files(prefix: str, directory: str, extension: str): 'packaging>=21.3.0,<23', 'importlib-metadata>=5.0.0,<7', 'pynvml>=11.5.0,<12', + 'slack_sdk>=3.19.5,<4', ] extra_deps = {} From 09e232cef96f211a670b12932ebc972fc3c340f0 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Sat, 25 Feb 2023 17:12:47 +0000 Subject: [PATCH 07/12] docs linting --- composer/callbacks/health_checker.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index c8f1ecdff6..fb32928e53 100644 --- a/composer/callbacks/health_checker.py +++ 
b/composer/callbacks/health_checker.py @@ -148,8 +148,7 @@ def _is_available() -> bool: class GPUUtilization: - """GPU Utilization Metric - """ + """GPU Utilization Metric.""" alerted: bool = False def __init__(self, threshold=10) -> None: @@ -187,8 +186,7 @@ def clear(self) -> None: class ECCErrors: - """Metric for ECC counters - """ + """Metric for ECC counters.""" alerted: bool = False def __init__(self, threshold=100) -> None: From 9b824fbdb6ec79660a649b28b59a824550c0fe80 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Sat, 25 Feb 2023 11:31:33 -0800 Subject: [PATCH 08/12] add test mode --- composer/callbacks/health_checker.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index fb32928e53..132045edd8 100644 --- a/composer/callbacks/health_checker.py +++ b/composer/callbacks/health_checker.py @@ -48,6 +48,8 @@ class HealthChecker(Callback): wait (int, optional): Seconds to wait for starting to sample. Default: 120. slack_webhook_url (str, optional): Slack URL to send alerts. Can also be set with the SLACK_WEBHOOK_URL environment variable. Default: None + test_mode (bool, optional): If True, will send a test alert at the first check. + Default: False """ def __init__( @@ -57,11 +59,13 @@ def __init__( window_size: int = 120, wait: int = 120, slack_webhook_url: Optional[str] = None, + test_mode: bool = False, ) -> None: self.sample_freq = sample_freq self.window_size = window_size self.wait = wait self.slack_webhook_url = slack_webhook_url + self.test_mode = test_mode if not self.slack_webhook_url: self.slack_webhook_url = os.environ.get('SLACK_WEBHOOK_URL', None) @@ -87,6 +91,9 @@ def after_train_batch(self, state: State, logger: Logger): if self._check(state.timestamp): for metric in self.metrics: message, alert = metric.check() + if self.test_mode: + alert = True + message = '[**THIS IS A TEST**]' + message if alert and not metric.alerted: self._alert(message, state) metric.alerted = True From ab00146c823c94f696e782f16d6afbeb75e1b253 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Sat, 25 Feb 2023 11:43:21 -0800 Subject: [PATCH 09/12] test --- composer/callbacks/health_checker.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index 132045edd8..e7bd3a3f1b 100644 --- a/composer/callbacks/health_checker.py +++ b/composer/callbacks/health_checker.py @@ -91,7 +91,7 @@ def after_train_batch(self, state: State, logger: Logger): if self._check(state.timestamp): for metric in self.metrics: message, alert = metric.check() - if self.test_mode: + if self.test_mode and message: alert = True message = '[**THIS IS A TEST**]' + message if alert and not metric.alerted: @@ -183,9 +183,11 @@ def check(self) -> Tuple[Optional[str], bool]: if dist.get_local_rank() == 0: average_sample = np.nanmean(list(self.samples), axis=0) if np.nanmax(average_sample) - np.nanmin(average_sample) > self.threshold: - message = 'Abnormal GPU utilizations: {utils}' - return message.format(utils=average_sample,), True - + message = f'Abnormal GPU utilizations: {average_sample}' + return message, True + else: + message = f':+1: Normal GPU utilizations: {average_sample}' + return message, False return None, False def clear(self) -> None: From 25565f6407b1bc6c5c67377b330e2ee7226133a5 Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Mon, 27 Feb 2023 15:53:26 -0800 Subject: [PATCH 10/12] address reviews --- 
composer/callbacks/health_checker.py | 12 ++++-------- setup.py | 4 ++-- tests/callbacks/test_health_checker.py | 10 +++++----- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index e7bd3a3f1b..789c81c9ee 100644 --- a/composer/callbacks/health_checker.py +++ b/composer/callbacks/health_checker.py @@ -17,7 +17,6 @@ import os import numpy as np -import pynvml from slack_sdk.webhook import WebhookClient from composer.core import Callback, State @@ -127,7 +126,7 @@ def _alert(self, message: str, state: State) -> None: ) node_name = os.environ.get('NODENAME', None) - if node_name: + if node_name is not None: prefix += f'[node={node_name}]' message = prefix + ' : ' + message @@ -156,11 +155,11 @@ def _is_available() -> bool: class GPUUtilization: """GPU Utilization Metric.""" - alerted: bool = False def __init__(self, threshold=10) -> None: self.samples = deque() self.threshold = threshold + self.alerted = False def sample(self) -> None: if dist.get_local_rank() == 0: @@ -196,11 +195,11 @@ def clear(self) -> None: class ECCErrors: """Metric for ECC counters.""" - alerted: bool = False def __init__(self, threshold=100) -> None: self.samples = deque() self.threshold = threshold + self.alerted = False def sample(self) -> None: if dist.get_local_rank() == 0: @@ -227,10 +226,7 @@ def check(self) -> Tuple[Optional[str], bool]: if len(gpus_with_error) > 0: message = 'High memory ECC error for GPUs : {gpus}' ecc_data = ['GPU: {} ({} -> {})'.format(i, min_counter[i], max_counter[i]) for i in gpus_with_error] - return message.format( - rank=dist.get_node_rank(), - gpus=ecc_data, - ), True + return message.format(gpus=ecc_data,), True return None, False diff --git a/setup.py b/setup.py index 06b5b4d421..4b7027d960 100644 --- a/setup.py +++ b/setup.py @@ -87,8 +87,6 @@ def package_files(prefix: str, directory: str, extension: str): 'py-cpuinfo>=8.0.0,<10', 'packaging>=21.3.0,<23', 'importlib-metadata>=5.0.0,<7', - 'pynvml>=11.5.0,<12', - 'slack_sdk>=3.19.5,<4', ] extra_deps = {} @@ -136,6 +134,8 @@ def package_files(prefix: str, directory: str, extension: str): 'cryptography==38.0.4', 'pytest-httpserver>=1.0.4,<1.1', 'setuptools<=59.5.0', + 'pynvml>=11.5.0,<12', + 'slack_sdk>=3.19.5,<4', ] extra_deps['deepspeed'] = [ diff --git a/tests/callbacks/test_health_checker.py b/tests/callbacks/test_health_checker.py index 5068e5edf6..547edefe6e 100644 --- a/tests/callbacks/test_health_checker.py +++ b/tests/callbacks/test_health_checker.py @@ -12,6 +12,9 @@ from composer.utils import dist from tests.common import world_size +pynvml = pytest.importorskip('pynvml') +pytest.importorskip('slack_sdk') + class MockUtil: @@ -22,8 +25,7 @@ def __init__(self, util): @pytest.mark.gpu @world_size(1, 2) def test_gpu_utilization(world_size): - import pynvml - HealthChecker._is_available() + assert HealthChecker._is_available() gpu_utilization_values = [ MockUtil(100), @@ -51,8 +53,7 @@ def test_gpu_utilization(world_size): @pytest.mark.gpu @world_size(1, 2) def test_ecc_counters(world_size): - import pynvml - HealthChecker._is_available() + assert HealthChecker._is_available() ecc_counters = [0, 0, 150, 0, 300, 0] @@ -73,7 +74,6 @@ def test_ecc_counters(world_size): @pytest.mark.gpu @world_size(1, 2) def test_health_checker(world_size): - import pynvml state = MagicMock() state.run_name = 'pytest-mock-run-kwei73' From 415d6a69dc40376d341d9beb10442a8c3352472b Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Mon, 27 Feb 2023 16:15:54 -0800 
Subject: [PATCH 11/12] remove ECC counters --- composer/callbacks/health_checker.py | 41 -------------------------- tests/callbacks/test_health_checker.py | 23 +-------------- 2 files changed, 1 insertion(+), 63 deletions(-) diff --git a/composer/callbacks/health_checker.py b/composer/callbacks/health_checker.py index 789c81c9ee..b052bb47cb 100644 --- a/composer/callbacks/health_checker.py +++ b/composer/callbacks/health_checker.py @@ -191,44 +191,3 @@ def check(self) -> Tuple[Optional[str], bool]: def clear(self) -> None: self.samples.clear() - - -class ECCErrors: - """Metric for ECC counters.""" - - def __init__(self, threshold=100) -> None: - self.samples = deque() - self.threshold = threshold - self.alerted = False - - def sample(self) -> None: - if dist.get_local_rank() == 0: - sample = self._sample() - if sample is not None: - self.samples.append(sample) - - def _sample(self) -> Optional[List]: - try: - samples = [] - device_count = pynvml.nvmlDeviceGetCount() # type: ignore - for i in range(device_count): - handle = pynvml.nvmlDeviceGetHandleByIndex(i) # type: ignore - samples.append(pynvml.nvmlDeviceGetMemoryErrorCounter(handle, 0, 0, 2)) # type: ignore - except pynvml.NVMLError: # type: ignore - return None - return samples - - def check(self) -> Tuple[Optional[str], bool]: - if dist.get_local_rank() == 0: - min_counter = np.min(list(self.samples), axis=0) - max_counter = np.max(list(self.samples), axis=0) - gpus_with_error = np.where(max_counter - min_counter > self.threshold) - if len(gpus_with_error) > 0: - message = 'High memory ECC error for GPUs : {gpus}' - ecc_data = ['GPU: {} ({} -> {})'.format(i, min_counter[i], max_counter[i]) for i in gpus_with_error] - return message.format(gpus=ecc_data,), True - - return None, False - - def clear(self) -> None: - self.samples.clear() diff --git a/tests/callbacks/test_health_checker.py b/tests/callbacks/test_health_checker.py index 547edefe6e..deff058a45 100644 --- a/tests/callbacks/test_health_checker.py +++ b/tests/callbacks/test_health_checker.py @@ -8,7 +8,7 @@ from composer import Timestamp from composer.callbacks import HealthChecker -from composer.callbacks.health_checker import ECCErrors, GPUUtilization +from composer.callbacks.health_checker import GPUUtilization from composer.utils import dist from tests.common import world_size @@ -50,27 +50,6 @@ def test_gpu_utilization(world_size): assert alert == should_alert -@pytest.mark.gpu -@world_size(1, 2) -def test_ecc_counters(world_size): - assert HealthChecker._is_available() - - ecc_counters = [0, 0, 150, 0, 300, 0] - - with patch.multiple(pynvml, - nvmlDeviceGetMemoryErrorCounter=MagicMock(side_effect=ecc_counters), - nvmlDeviceGetCount=MagicMock(return_value=world_size)): - - ecc_counter = ECCErrors() - ecc_counter.sample() - ecc_counter.sample() - ecc_counter.sample() - _, alert = ecc_counter.check() - - # only the local rank 0 alerts - assert alert == (dist.get_local_rank() == 0) - - @pytest.mark.gpu @world_size(1, 2) def test_health_checker(world_size): From 3e39dbb4d505f4534919776ac5f87fd6fa80355f Mon Sep 17 00:00:00 2001 From: Hanlin Tang Date: Mon, 27 Feb 2023 20:07:53 -0800 Subject: [PATCH 12/12] Update setup.py Co-authored-by: Mihir Patel --- setup.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0f075f8bfa..b62c4829b8 100644 --- a/setup.py +++ b/setup.py @@ -134,9 +134,12 @@ def package_files(prefix: str, directory: str, extension: str): 'cryptography==38.0.4', 'pytest-httpserver>=1.0.4,<1.1', 'setuptools<=59.5.0', 
+] + +extra_deps['health_checker'] = { 'pynvml>=11.5.0,<12', 'slack_sdk>=3.19.5,<4', -] +} extra_deps['deepspeed'] = [ 'deepspeed==0.7.7',
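

---

Note on the alert logic: the range check driving these alerts is simple enough to verify by hand. Below is a standalone sketch that mirrors the final form of GPUUtilization.check() in this series: per-GPU samples are collected on local rank 0, averaged over the observation window, and the max-minus-min spread is compared against the threshold. The utilization values are invented for illustration.

# Sketch of the GPUUtilization range check, assuming the final form in this
# series; the window below is illustrative (three samples of three GPUs).
import numpy as np

threshold = 10  # percentage points, as in GPUUtilization(threshold=10)
window = [
    [30, 31, 45],
    [29, 30, 46],
    [31, 29, 44],
]

# Average each GPU's utilization over the window...
average_sample = np.nanmean(window, axis=0)  # -> [30., 30., 45.]

# ...then alert when the spread across GPUs exceeds the threshold.
alert = np.nanmax(average_sample) - np.nanmin(average_sample) > threshold
print(average_sample, alert)  # [30. 30. 45.] True, since 45 - 30 = 15 > 10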
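
For end-to-end wiring, the commented-out trainer test in patch 02 suggests the intended usage. The sketch below updates it for the final API in this series (patch 03 renamed check_freq to window_size). SimpleModel and RandomClassificationDataset are Composer test helpers, used here only as stand-ins for a real model and dataloader.

# Minimal usage sketch, adapted from the commented-out test in patch 02
# and the final HealthChecker signature in this series.
from torch.utils.data import DataLoader

from composer.callbacks import HealthChecker
from composer.trainer import Trainer
from tests.common import RandomClassificationDataset, SimpleModel

health_checker = HealthChecker(
    threshold=10,     # alert when average per-GPU utilizations spread by >10
    sample_freq=5,    # sample every 5 seconds...
    window_size=120,  # ...and check each 120-second window
    wait=120,         # ignore the first 120 seconds of training
)                     # slack_webhook_url falls back to SLACK_WEBHOOK_URL

trainer = Trainer(
    model=SimpleModel(),
    train_dataloader=DataLoader(RandomClassificationDataset()),
    max_duration='10000ba',
    callbacks=health_checker,
    device='gpu',
)
trainer.fit()

After patch 12, the NVML and Slack dependencies live behind an optional extra, so an install along the lines of pip install 'mosaicml[health_checker]' would be needed first (assuming the health_checker extra is exposed through the package's extras_require).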