From 2242423b753b51dcda0139e35d3fdb7d1b6f1974 Mon Sep 17 00:00:00 2001
From: shuyingsunshine21 <80445420+shuyingsunshine21@users.noreply.github.com>
Date: Sat, 22 May 2021 13:19:24 -0700
Subject: [PATCH] refactor accelerator teardown -> training type plugin teardown (#7579)

---
 CHANGELOG.md                                       |  3 +
 pytorch_lightning/accelerators/accelerator.py      |  8 +--
 pytorch_lightning/accelerators/gpu.py              |  7 ---
 pytorch_lightning/accelerators/tpu.py              |  5 --
 .../plugins/training_type/ddp.py                   |  4 +-
 .../plugins/training_type/parallel.py              | 17 +++++-
 .../plugins/training_type/single_device.py         | 13 ++++-
 .../plugins/training_type/single_tpu.py            |  8 +--
 .../plugins/training_type/tpu_spawn.py             | 14 +++--
 .../training_type/training_type_plugin.py          | 16 ++++++
 tests/plugins/test_ddp_plugin.py                   | 48 ++++++++++++++++
 tests/plugins/test_ddp_spawn_plugin.py             | 42 ++++++++++++++
 tests/plugins/test_deepspeed_plugin.py             |  1 +
 tests/plugins/test_single_device_plugin.py         | 56 +++++++++++++++++++
 tests/plugins/test_tpu_spawn.py                    | 27 +++++++++
 15 files changed, 237 insertions(+), 32 deletions(-)
 create mode 100644 tests/plugins/test_ddp_plugin.py
 create mode 100644 tests/plugins/test_ddp_spawn_plugin.py
 create mode 100644 tests/plugins/test_single_device_plugin.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d81adb23cf86..44914061b8275 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -82,6 +82,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 - MLFlowLogger now accepts `run_name` as an constructor argument ([#7622](https://github.com/PyTorchLightning/pytorch-lightning/issues/7622))
 
 
+- Changed `teardown()` in `Accelerator` to allow `training_type_plugin` to customize `teardown` logic ([#7579](https://github.com/PyTorchLightning/pytorch-lightning/pull/7579))
+
+
 ### Deprecated
 
 
diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py
index 657f7c5703dad..4ea017ae0c208 100644
--- a/pytorch_lightning/accelerators/accelerator.py
+++ b/pytorch_lightning/accelerators/accelerator.py
@@ -151,17 +151,15 @@ def lightning_module(self) -> 'pl.LightningModule':
 
     @property
     def root_device(self) -> torch.device:
+        """Returns the root device"""
         return self.training_type_plugin.root_device
 
     def teardown(self) -> None:
         """
         This method is called to teardown the training process.
-        It is the right place to release memory and free other ressources.
-
-        By default we add a barrier here to synchronize processes before returning
-        control back to the caller.
+        It is the right place to release memory and free other resources.
""" - self.barrier("teardown") + self.training_type_plugin.teardown() def batch_to_device( self, batch: Any, device: Optional[torch.device] = None, dataloader_idx: Optional[int] = None diff --git a/pytorch_lightning/accelerators/gpu.py b/pytorch_lightning/accelerators/gpu.py index 16f9c2bdbef85..7543a2b794b5d 100644 --- a/pytorch_lightning/accelerators/gpu.py +++ b/pytorch_lightning/accelerators/gpu.py @@ -47,13 +47,6 @@ def on_train_start(self) -> None: with torch.cuda.device(self.root_device): torch.cuda.empty_cache() - def teardown(self) -> None: - self.lightning_module.cpu() - - # clean up memory - with torch.cuda.device(self.root_device): - torch.cuda.empty_cache() - @staticmethod def set_nvidia_flags(local_rank: int) -> None: # set the correct cuda visible devices (using pci order) diff --git a/pytorch_lightning/accelerators/tpu.py b/pytorch_lightning/accelerators/tpu.py index 973c0b21c8f13..60a48118ce18c 100644 --- a/pytorch_lightning/accelerators/tpu.py +++ b/pytorch_lightning/accelerators/tpu.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os from typing import Any, Callable from torch.optim import Optimizer @@ -51,10 +50,6 @@ def setup(self, trainer: 'pl.Trainer', model: 'pl.LightningModule') -> None: raise MisconfigurationException("TPUs only support a single tpu core or tpu spawn training.") return super().setup(trainer, model) - def teardown(self) -> None: - if "PT_XLA_DEBUG" in os.environ: - del os.environ["PT_XLA_DEBUG"] - def run_optimizer_step( self, optimizer: Optimizer, optimizer_idx: int, lambda_closure: Callable, **kwargs: Any ) -> None: diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 89e714d57f870..e65a6512d3846 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -96,7 +96,7 @@ def __init__( self.set_world_ranks() @property - def root_device(self): + def root_device(self) -> torch.device: return self.parallel_devices[self.local_rank] @property @@ -126,7 +126,7 @@ def distributed_sampler_kwargs(self): def _is_single_process_single_device(self) -> bool: return True - def setup_environment(self): + def setup_environment(self) -> None: # start the other scripts if not self.cluster_environment.creates_children() and os.environ.get("PL_IN_DDP_SUBPROCESS", "0") != "1": self._call_children_scripts() diff --git a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py index 696e9695f2200..a8028e5be1a69 100644 --- a/pytorch_lightning/plugins/training_type/parallel.py +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -23,6 +23,7 @@ from pytorch_lightning.overrides.base import unwrap_lightning_module from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin +from pytorch_lightning.utilities import _XLA_AVAILABLE from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, ReduceOp @@ -40,13 +41,17 @@ def __init__( @property @abstractmethod - def root_device(self): + def root_device(self) -> torch.device: raise NotImplementedError @property - def on_gpu(self): + def on_gpu(self) -> bool: return self.root_device.type == "cuda" and torch.cuda.is_available() + @property + def on_tpu(self) -> bool: 
+        return self.root_device.type == "xla" and _XLA_AVAILABLE
+
     @property
     def lightning_module(self):
         return unwrap_lightning_module(self._model)
@@ -122,3 +127,11 @@ def block_backward_sync(self):
             yield None
         else:
             yield None
+
+    def teardown(self) -> None:
+        if self.on_gpu:
+            # GPU teardown
+            self.lightning_module.cpu()
+            # clean up memory
+            with torch.cuda.device(self.root_device):
+                torch.cuda.empty_cache()
diff --git a/pytorch_lightning/plugins/training_type/single_device.py b/pytorch_lightning/plugins/training_type/single_device.py
index df91b8f6647a9..1816f5838c948 100644
--- a/pytorch_lightning/plugins/training_type/single_device.py
+++ b/pytorch_lightning/plugins/training_type/single_device.py
@@ -16,6 +16,7 @@
 import torch
 
 from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin
+from pytorch_lightning.utilities import _XLA_AVAILABLE
 
 
 class SingleDevicePlugin(TrainingTypePlugin):
@@ -30,11 +31,11 @@ def __init__(self, device: torch.device):
 
     @property
     def on_tpu(self) -> bool:
-        return False
+        return self.root_device.type == "xla" and _XLA_AVAILABLE
 
     @property
     def on_gpu(self) -> bool:
-        return self.device.type == "cuda" and torch.cuda.is_available()
+        return self.root_device.type == "cuda" and torch.cuda.is_available()
 
     def reduce(self, tensor: Union[Any, torch.Tensor], *args: Any, **kwargs: Any) -> Union[Any, torch.Tensor]:
         """
@@ -78,3 +79,11 @@ def barrier(self, *args, **kwargs) -> None:
 
     def broadcast(self, obj: object, src: int = 0) -> object:
         return obj
+
+    def teardown(self) -> None:
+        if self.on_gpu:
+            # GPU teardown
+            self.lightning_module.cpu()
+            # clean up memory
+            with torch.cuda.device(self.root_device):
+                torch.cuda.empty_cache()
diff --git a/pytorch_lightning/plugins/training_type/single_tpu.py b/pytorch_lightning/plugins/training_type/single_tpu.py
index fce325f322cc3..a61dd1cbc5dbd 100644
--- a/pytorch_lightning/plugins/training_type/single_tpu.py
+++ b/pytorch_lightning/plugins/training_type/single_tpu.py
@@ -35,10 +35,6 @@ def __init__(self, device: int, debug: bool = False):
         self.tpu_local_core_rank = 0
         self.tpu_global_core_rank = 0
 
-    @property
-    def on_tpu(self) -> bool:
-        return True
-
     @property
     def is_distributed(self) -> bool:
         return False
@@ -63,3 +59,7 @@ def on_save(self, checkpoint: dict) -> dict:
         https://github.com/pytorch/xla/blob/master/API_GUIDE.md#saving-and-loading-xla-tensors
         """
         return move_data_to_device(checkpoint, torch.device("cpu"))
+
+    def teardown(self) -> None:
+        # TPU teardown
+        os.environ.pop("PT_XLA_DEBUG", None)
diff --git a/pytorch_lightning/plugins/training_type/tpu_spawn.py b/pytorch_lightning/plugins/training_type/tpu_spawn.py
index 76756ca6904ed..8a93faa0281cd 100644
--- a/pytorch_lightning/plugins/training_type/tpu_spawn.py
+++ b/pytorch_lightning/plugins/training_type/tpu_spawn.py
@@ -71,7 +71,7 @@ def world_size(self) -> int:
 
     @property
     def root_device(self) -> torch.device:
-        return self.device
+        return xm.xla_device()
 
     @staticmethod
     def _validate_dataloader(dataloaders: Union[List[DataLoader], DataLoader]) -> None:
@@ -129,7 +129,7 @@ def is_distributed(self) -> bool:
 
     def process_dataloader(self, dataloader: DataLoader) -> MpDeviceLoader:
         TPUSpawnPlugin._validate_dataloader(dataloader)
-        return MpDeviceLoader(dataloader, self.device)
+        return MpDeviceLoader(dataloader, self.root_device)
 
     def configure_ddp(self) -> None:
         pass
@@ -172,8 +172,7 @@ def new_process(self, process_idx: int, trainer, mp_queue) -> None:
             time.sleep(2)
 
     def model_to_device(self) -> None:
-        self.device = xm.xla_device()
-        self.model = self.wrapped_model.to(self.device)
+        self.model = self.wrapped_model.to(self.root_device)
 
     def barrier(self, name: Optional[str] = None) -> None:
         # HOST_WORLD_SIZE is None outside the xmp.spawn process
@@ -209,7 +208,7 @@ def broadcast(self, obj: object, src: int = 0) -> object:
         buffer = io.BytesIO()
         torch.save(obj, buffer)
         data = bytearray(buffer.getbuffer())
-        data_tensor = torch.tensor(data, device=self.device, dtype=torch.float)
+        data_tensor = torch.tensor(data, device=self.root_device, dtype=torch.float)
         data = xm.all_gather(data_tensor)
         buffer = io.BytesIO(data.cpu().byte().numpy())
         obj = torch.load(buffer)
@@ -302,3 +301,8 @@ def all_gather(self, tensor: torch.Tensor, group: Optional[Any] = None, sync_gra
         if isinstance(tensor, torch.Tensor) and tensor.dim() == 0:
             tensor = tensor.unsqueeze(0)
         return xm.all_gather(tensor)
+
+    def teardown(self) -> None:
+        # TPU teardown
+        os.environ.pop("PT_XLA_DEBUG", None)
+        self.barrier("teardown")
diff --git a/pytorch_lightning/plugins/training_type/training_type_plugin.py b/pytorch_lightning/plugins/training_type/training_type_plugin.py
index 4abbb4dbb7c05..0a148a01dbb69 100644
--- a/pytorch_lightning/plugins/training_type/training_type_plugin.py
+++ b/pytorch_lightning/plugins/training_type/training_type_plugin.py
@@ -60,11 +60,19 @@ def setup(self, model: Module) -> None:
     @abstractmethod
     def on_gpu(self) -> bool:
         """Returns whether the current process is done on GPU"""
+        raise NotImplementedError
+
+    @property
+    @abstractmethod
+    def on_tpu(self) -> bool:
+        """Returns whether the current process is done on TPU"""
+        raise NotImplementedError
 
     @property
     @abstractmethod
     def root_device(self) -> torch.device:
         """Returns the root device"""
+        raise NotImplementedError
 
     @abstractmethod
     def model_to_device(self) -> None:
@@ -290,6 +298,14 @@ def call_configure_sharded_model_hook(self) -> bool:
     def call_configure_sharded_model_hook(self, mode: bool) -> None:
         self._call_configure_sharded_model_hook = mode
 
+    @abstractmethod
+    def teardown(self) -> None:
+        """
+        This method is called to teardown the training process.
+        It is the right place to release memory and free other resources.
+        """
+        raise NotImplementedError
+
     @classmethod
     def register_plugins(cls, plugin_registry):
         pass
diff --git a/tests/plugins/test_ddp_plugin.py b/tests/plugins/test_ddp_plugin.py
new file mode 100644
index 0000000000000..d236dc145d96c
--- /dev/null
+++ b/tests/plugins/test_ddp_plugin.py
@@ -0,0 +1,48 @@
+# Copyright The PyTorch Lightning team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import torch
+
+from pytorch_lightning import Trainer
+from pytorch_lightning.plugins import DDPPlugin
+from tests.helpers.boring_model import BoringModel
+from tests.helpers.runif import RunIf
+
+
+class BoringModelGPU(BoringModel):
+
+    def on_train_start(self) -> None:
+        # make sure that the model is on GPU when training
+        assert self.device == torch.device(f"cuda:{self.trainer.training_type_plugin.local_rank}")
+        self.start_cuda_memory = torch.cuda.memory_allocated()
+
+
+@RunIf(skip_windows=True, min_gpus=2, special=True)
+def test_ddp_with_2_gpus():
+    """Tests if device is set correctly when training and after teardown for DDPPlugin."""
+    trainer = Trainer(gpus=2, accelerator="ddp", fast_dev_run=True)
+    # assert training type plugin attributes for device setting
+    assert isinstance(trainer.training_type_plugin, DDPPlugin)
+    assert trainer.training_type_plugin.on_gpu
+    assert not trainer.training_type_plugin.on_tpu
+    local_rank = trainer.training_type_plugin.local_rank
+    assert trainer.training_type_plugin.root_device == torch.device(f"cuda:{local_rank}")
+
+    model = BoringModelGPU()
+
+    trainer.fit(model)
+
+    # assert after training, model is moved to CPU and memory is deallocated
+    assert model.device == torch.device("cpu")
+    cuda_memory = torch.cuda.memory_allocated()
+    assert cuda_memory < model.start_cuda_memory
diff --git a/tests/plugins/test_ddp_spawn_plugin.py b/tests/plugins/test_ddp_spawn_plugin.py
new file mode 100644
index 0000000000000..8afc30c4692ec
--- /dev/null
+++ b/tests/plugins/test_ddp_spawn_plugin.py
@@ -0,0 +1,42 @@
+# Copyright The PyTorch Lightning team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import torch
+
+from pytorch_lightning import Trainer
+from pytorch_lightning.plugins import DDPSpawnPlugin
+from tests.helpers.boring_model import BoringModel
+from tests.helpers.runif import RunIf
+
+
+class BoringModelDDPCPU(BoringModel):
+
+    def on_train_start(self) -> None:
+        # make sure that the model is on CPU when training
+        assert self.device == torch.device("cpu")
+
+
+@RunIf(skip_windows=True)
+def test_ddp_cpu():
+    """Tests if device is set correctly when training for DDPSpawnPlugin."""
+    trainer = Trainer(num_processes=2, fast_dev_run=True)
+    # assert training type plugin attributes for device setting
+
+    assert isinstance(trainer.training_type_plugin, DDPSpawnPlugin)
+    assert not trainer.training_type_plugin.on_gpu
+    assert not trainer.training_type_plugin.on_tpu
+    assert trainer.training_type_plugin.root_device == torch.device("cpu")
+
+    model = BoringModelDDPCPU()
+
+    trainer.fit(model)
diff --git a/tests/plugins/test_deepspeed_plugin.py b/tests/plugins/test_deepspeed_plugin.py
index 905ec3d0fa542..85d069b90288d 100644
--- a/tests/plugins/test_deepspeed_plugin.py
+++ b/tests/plugins/test_deepspeed_plugin.py
@@ -568,6 +568,7 @@ def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, cpu_offload
     """
     Test to ensure with Stage 2 and multiple GPUs, accumulated grad batches works.
""" + os.environ['MASTER_PORT'] = "29500" seed_everything(42) class VerificationCallback(Callback): diff --git a/tests/plugins/test_single_device_plugin.py b/tests/plugins/test_single_device_plugin.py new file mode 100644 index 0000000000000..a398d960daf91 --- /dev/null +++ b/tests/plugins/test_single_device_plugin.py @@ -0,0 +1,56 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import torch + +from pytorch_lightning import Trainer +from pytorch_lightning.plugins import SingleDevicePlugin +from tests.helpers.boring_model import BoringModel +from tests.helpers.runif import RunIf + + +def test_single_cpu(): + """Tests if on_gpu and on_tpu is set correctly for single cpu plugin.""" + trainer = Trainer() + assert isinstance(trainer.training_type_plugin, SingleDevicePlugin) + assert not trainer.training_type_plugin.on_gpu + assert not trainer.training_type_plugin.on_tpu + assert trainer.training_type_plugin.root_device == torch.device("cpu") + + +class BoringModelGPU(BoringModel): + + def on_train_start(self) -> None: + # make sure that the model is on GPU when training + assert self.device == torch.device("cuda:0") + self.start_cuda_memory = torch.cuda.memory_allocated() + + +@RunIf(skip_windows=True, min_gpus=1) +def test_single_gpu(): + """Tests if device is set correctely when training and after teardown for single GPU plugin.""" + trainer = Trainer(gpus=1, fast_dev_run=True) + # assert training type plugin attributes for device setting + assert isinstance(trainer.training_type_plugin, SingleDevicePlugin) + assert trainer.training_type_plugin.on_gpu + assert not trainer.training_type_plugin.on_tpu + assert trainer.training_type_plugin.root_device == torch.device("cuda:0") + + model = BoringModelGPU() + + trainer.fit(model) + + # assert after training, model is moved to CPU and memory is deallocated + assert model.device == torch.device("cpu") + cuda_memory = torch.cuda.memory_allocated() + assert cuda_memory < model.start_cuda_memory diff --git a/tests/plugins/test_tpu_spawn.py b/tests/plugins/test_tpu_spawn.py index 8aa56c636cf47..85e1ecb781946 100644 --- a/tests/plugins/test_tpu_spawn.py +++ b/tests/plugins/test_tpu_spawn.py @@ -11,17 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import os
 from unittest import mock
 from unittest.mock import MagicMock
 
 import pytest
+import torch
 from torch.utils.data import DataLoader
 
+from pytorch_lightning import Trainer
 from pytorch_lightning.plugins.training_type import TPUSpawnPlugin
 from pytorch_lightning.trainer.connectors.data_connector import DataConnector
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
 from tests.helpers.boring_model import BoringModel, RandomDataset
 from tests.helpers.dataloaders import CustomNotImplementedErrorDataloader
+from tests.helpers.runif import RunIf
+from tests.helpers.utils import pl_multi_process_test
 
 
 class BoringModelNoDataloaders(BoringModel):
@@ -76,3 +81,25 @@ def test_error_patched_iterable_dataloaders(
 def test_error_process_iterable_dataloader(_):
     with pytest.raises(MisconfigurationException, match="TPUs do not currently support"):
         TPUSpawnPlugin(MagicMock()).process_dataloader(_loader_no_len)
+
+
+class BoringModelTPU(BoringModel):
+
+    def on_train_start(self) -> None:
+        assert self.device == torch.device("xla")
+        assert os.environ.get("PT_XLA_DEBUG") == "1"
+
+
+@RunIf(tpu=True)
+@pl_multi_process_test
+def test_model_tpu_one_core():
+    """Tests if device/debug flag is set correctly when training and after teardown for TPUSpawnPlugin."""
+    trainer = Trainer(tpu_cores=1, fast_dev_run=True, plugin=TPUSpawnPlugin(debug=True))
+    # assert training type plugin attributes for device setting
+    assert isinstance(trainer.training_type_plugin, TPUSpawnPlugin)
+    assert not trainer.training_type_plugin.on_gpu
+    assert trainer.training_type_plugin.on_tpu
+    assert trainer.training_type_plugin.root_device == torch.device("xla")
+    model = BoringModelTPU()
+    trainer.fit(model)
+    assert "PT_XLA_DEBUG" not in os.environ
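For downstream plugin authors: after this refactor, device-specific cleanup lives on the training type plugin rather than on the Accelerator, so custom cleanup belongs in an override of the plugin's teardown() hook instead of Accelerator.teardown(). The sketch below is a minimal illustration under that assumption; the LoggingSingleDevicePlugin name and the logging call are illustrative, not part of this patch.

import logging

import torch

from pytorch_lightning import Trainer
from pytorch_lightning.plugins import SingleDevicePlugin

log = logging.getLogger(__name__)


class LoggingSingleDevicePlugin(SingleDevicePlugin):
    """Illustrative subclass that extends the plugin-level teardown hook."""

    def teardown(self) -> None:
        # SingleDevicePlugin.teardown() (added in this patch) moves the model back
        # to CPU and empties the CUDA cache when the plugin is running on a GPU.
        super().teardown()
        # Any extra cleanup goes here, e.g. flushing buffers or closing handles.
        log.info("teardown finished on %s", self.root_device)


# Hypothetical usage: the Trainer accepts the plugin like any other training type plugin.
trainer = Trainer(plugins=[LoggingSingleDevicePlugin(torch.device("cpu"))], fast_dev_run=True)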