Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run ddp_spawn dataloader checks on windows #6930

Merged
merged 6 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 35 additions & 36 deletions pytorch_lightning/trainer/data_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
import inspect
import multiprocessing
import platform
from abc import ABC
from copy import deepcopy
from typing import Iterable, List, Tuple, Union
Expand Down Expand Up @@ -54,53 +53,53 @@ class TrainerDataLoadingMixin(ABC):
dev_debugger: InternalDebugger

def _worker_check(self, dataloader: DataLoader, name: str) -> None:
on_windows = platform.system() == 'Windows'
if not isinstance(dataloader, DataLoader):
return

# ddp_spawn + num_workers > 0 don't mix! tell the user
is_dataloader = isinstance(dataloader, DataLoader)
using_spawn = self.accelerator_connector.distributed_backend == "ddp_spawn"
if is_dataloader and not on_windows:
if dataloader.num_workers > 0 and using_spawn:
# checks for the attr persistent_workers available in pytorch >= 1.7
if hasattr(dataloader, "persistent_workers"):
if not dataloader.persistent_workers:
rank_zero_warn(
'num_workers>0, persistent_workers=False, and accelerator=ddp_spawn'
' may result in data loading bottlenecks.'
' Consider setting persistent_workers=True'
' (this is a limitation of Python .spawn() and PyTorch)'
)
else:
num_cpus = multiprocessing.cpu_count()

# ddp_spawn + num_workers > 0 don't mix! tell the user
if dataloader.num_workers > 0 and using_spawn:
# checks for the attr persistent_workers available in pytorch >= 1.7
if hasattr(dataloader, "persistent_workers"):
if not dataloader.persistent_workers:
rank_zero_warn(
'num_workers>0 and accelerator=ddp_spawn do not mix well'
' and may result in data loading bottlenecks.'
' Consider setting accelerator=ddp to use num_workers>0'
'num_workers>0, persistent_workers=False, and accelerator=ddp_spawn'
' may result in data loading bottlenecks.'
' Consider setting persistent_workers=True'
' (this is a limitation of Python .spawn() and PyTorch)'
)
else:
rank_zero_warn(
'num_workers>0 and accelerator=ddp_spawn do not mix well'
' and may result in data loading bottlenecks.'
' Consider setting accelerator=ddp to use num_workers>0'
' (this is a limitation of Python .spawn() and PyTorch)'
)

elif dataloader.num_workers == 0 and using_spawn:
# checks for the attr persistent_workers available in pytorch >= 1.7
if hasattr(dataloader, "persistent_workers"):
if not dataloader.persistent_workers:
rank_zero_warn(
'accelerator=ddp_spawn and num_workers=0 may result in data loading bottlenecks.'
' Consider setting num_workers>0 and persistent_workers=True'
)
else:
elif dataloader.num_workers == 0 and using_spawn:
# checks for the attr persistent_workers available in pytorch >= 1.7
if hasattr(dataloader, "persistent_workers"):
if not dataloader.persistent_workers:
rank_zero_warn(
'accelerator=ddp_spawn and num_workers=0 may result in data loading bottlenecks.'
' Consider setting accelerator=ddp and set num_workers>0'
' Consider setting num_workers>0 and persistent_workers=True'
)

elif dataloader.num_workers <= 2 and multiprocessing.cpu_count() > 2 and not using_spawn:
num_cpus = multiprocessing.cpu_count()
else:
rank_zero_warn(
f'The dataloader, {name}, does not have many workers which may be a bottleneck.'
' Consider increasing the value of the `num_workers` argument`'
f' (try {num_cpus} which is the number of cpus on this machine)'
f' in the `DataLoader` init to improve performance.'
'accelerator=ddp_spawn and num_workers=0 may result in data loading bottlenecks.'
' Consider setting accelerator=ddp and set num_workers>0'
)

elif dataloader.num_workers <= 2 < num_cpus and not using_spawn:
rank_zero_warn(
f'The dataloader, {name}, does not have many workers which may be a bottleneck.'
' Consider increasing the value of the `num_workers` argument`'
f' (try {num_cpus} which is the number of cpus on this machine)'
f' in the `DataLoader` init to improve performance.'
)

def auto_add_sampler(self, dataloader: DataLoader, shuffle: bool) -> DataLoader:

# don't do anything if it's not a dataloader
Expand Down
15 changes: 10 additions & 5 deletions tests/models/test_horovod.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def _run_horovod(trainer_options, on_gpu=False):
# for Horovod, we interpret `gpus` to be set per worker
trainer_options.update(gpus=1 if on_gpu else None)
tutils.reset_seed()
# todo: Find why coverage breaks CI.
# TODO: Find out why coverage breaks CI.
# append = '-a' if '.coverage' in os.listdir(_PROJECT_ROOT) else ''
# str(num_processes), sys.executable, '-m', 'coverage', 'run', '--source', 'pytorch_lightning', append, # noqa E265
# str(num_processes), sys.executable, '-m', 'coverage', 'run', '--source', 'pytorch_lightning', append,
cmdline = [
'horovodrun', '-np',
str(num_processes), sys.executable, TEST_SCRIPT, '--trainer-options',
Expand Down Expand Up @@ -151,9 +151,10 @@ def test_horovod_multi_gpu_grad_by_value(tmpdir):
_run_horovod(trainer_options, on_gpu=True)


# todo: need to be fixed :]
# https://discuss.pytorch.org/t/torch-cuda-amp-vs-nvidia-apex/74994
Borda marked this conversation as resolved.
Show resolved Hide resolved
# Check with (tgaddair) on Horovod issues if this feature is needed
@pytest.mark.skip(reason="Horovod currently doesn't work with Apex") # todo
@pytest.mark.skip(reason="TODO: Horovod currently doesn't work with Apex")
@RunIf(min_gpus=2, skip_windows=True, amp_apex=True, horovod_nccl=True)
def test_horovod_apex(tmpdir):
"""Test Horovod with multi-GPU support using apex amp."""
Expand Down Expand Up @@ -240,6 +241,8 @@ def validation_step(self, batch, *args, **kwargs):
tpipes.run_model_test_without_loggers(trainer_options, model)


# todo: need to be fixed :]
@pytest.mark.skip('TODO: flaky test - Fatal Python error: Aborted')
Borda marked this conversation as resolved.
Show resolved Hide resolved
@RunIf(skip_windows=True, horovod=True)
def test_horovod_multi_optimizer(tmpdir):
model = BasicGAN()
Expand Down Expand Up @@ -272,7 +275,8 @@ def get_optimizer_params(optimizer):
assert get_model_params(model.discriminator) == get_optimizer_params(trainer.optimizers[1])


@pytest.mark.skipif(reason="CI agent.jobstatus=Succeeded: Permission denied")
# todo: need to be fixed :]
@pytest.mark.skip(reason="TODO: CI agent.jobstatus=Succeeded: Permission denied")
Borda marked this conversation as resolved.
Show resolved Hide resolved
@RunIf(skip_windows=True, horovod=True)
def test_result_reduce_horovod(tmpdir):
"""Make sure result logging works with Horovod.
Expand Down Expand Up @@ -322,7 +326,8 @@ def training_epoch_end(self, outputs) -> None:
horovod.run(hvd_test_fn, np=2)


@pytest.mark.skipif(reason="CI agent.jobstatus=Succeeded: Permission denied")
# todo: need to be fixed :]
@pytest.mark.skip(reason="TODO: CI agent.jobstatus=Succeeded: Permission denied")
Borda marked this conversation as resolved.
Show resolved Hide resolved
@RunIf(skip_windows=True, horovod=True, num_gpus=2)
def test_accuracy_metric_horovod():
num_batches = 10
Expand Down