diff --git a/.github/workflows/amd-mi100.yml b/.github/workflows/amd-mi100.yml index a6c7cb5d567e..7ad0f4178db4 100644 --- a/.github/workflows/amd-mi100.yml +++ b/.github/workflows/amd-mi100.yml @@ -23,7 +23,7 @@ jobs: - name: Install pytorch run: | - pip install torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.1.1 + pip install --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.1.1 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -50,7 +50,7 @@ jobs: # Runs a set of commands using the runners shell - name: Unit tests run: | - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi + unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest -n 4 --verbose unit/ - TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'sequential' unit/ + pytest $PYTEST_OPTS -n 4 --verbose unit/ + pytest $PYTEST_OPTS -m 'sequential' unit/ diff --git a/.github/workflows/amd-mi200.yml b/.github/workflows/amd-mi200.yml index c8e1192d1d56..b99fa927e9dc 100644 --- a/.github/workflows/amd-mi200.yml +++ b/.github/workflows/amd-mi200.yml @@ -23,7 +23,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.4.2 + pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/rocm5.4.2 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -58,7 +58,7 @@ jobs: # Runs a set of commands using the runners shell - name: Unit tests run: | - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi + unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest -n 4 --verbose unit/ - TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'sequential' unit/ + pytest $PYTEST_OPTS -n 4 --verbose unit/ + pytest $PYTEST_OPTS -m 'sequential' unit/ diff --git a/.github/workflows/cpu-inference.yml b/.github/workflows/cpu-inference.yml index 0c32bd5ecc62..62915c90e935 100644 --- a/.github/workflows/cpu-inference.yml +++ b/.github/workflows/cpu-inference.yml @@ -75,7 +75,6 @@ jobs: run: | source oneCCL/build/_install/env/setvars.sh unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TRANSFORMERS_CACHE=~/tmp/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'inference' unit/inference/test_inference_config.py - TRANSFORMERS_CACHE=~/tmp/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -k TestDistAllReduce unit/comm/test_dist.py + TRANSFORMERS_CACHE=~/tmp/transformers_cache/ pytest $PYTEST_OPTS -m 'inference' unit/inference/test_inference_config.py + TRANSFORMERS_CACHE=~/tmp/transformers_cache/ pytest $PYTEST_OPTS -k TestDistAllReduce unit/comm/test_dist.py diff --git a/.github/workflows/nv-accelerate-v100.yml b/.github/workflows/nv-accelerate-v100.yml index 24aa9c9347dd..5bd0c22c8b98 100644 --- a/.github/workflows/nv-accelerate-v100.yml +++ b/.github/workflows/nv-accelerate-v100.yml @@ -26,7 +26,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url 
https://download.pytorch.org/whl/cu111 + pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/cu111 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -41,7 +41,7 @@ jobs: - name: HF Accelerate tests run: | - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi + unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch git clone https://github.com/huggingface/accelerate cd accelerate git rev-parse --short HEAD @@ -52,4 +52,4 @@ jobs: # tmp fix: force newer datasets version #pip install "datasets>=2.0.0" pip list - HF_DATASETS_CACHE=/blob/datasets_cache/ TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --verbose tests/deepspeed + pytest $PYTEST_OPTS --color=yes --durations=0 --verbose tests/deepspeed diff --git a/.github/workflows/nv-h100.yml b/.github/workflows/nv-h100.yml index 9f5d8ec7b110..33f248c4299f 100644 --- a/.github/workflows/nv-h100.yml +++ b/.github/workflows/nv-h100.yml @@ -46,7 +46,6 @@ jobs: - name: Unit tests run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions python -m pytest -n 4 unit/ --torch_ver="2.0" --cuda_ver="12" - TORCH_EXTENSIONS_DIR=./torch-extensions python -m pytest -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="12" + python -m pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="2.0" --cuda_ver="12" + python -m pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="12" diff --git a/.github/workflows/nv-inference.yml b/.github/workflows/nv-inference.yml index 2241087f0d9d..61f85d8d0598 100644 --- a/.github/workflows/nv-inference.yml +++ b/.github/workflows/nv-inference.yml @@ -26,7 +26,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116 + pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -49,8 +49,13 @@ jobs: - name: Unit tests run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6" - TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'inference_ops' unit/ --torch_ver="1.13" --cuda_ver="11.6" - TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 -m 'inference' unit/ --torch_ver="1.13" --cuda_ver="11.6" + coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6" + coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS -m 'inference_ops' unit/ --torch_ver="1.13" --cuda_ver="11.6" + coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS --forked -n 4 -m 'inference' unit/ --torch_ver="1.13" --cuda_ver="11.6" + + - name: Coverage report + run: | + cd tests + coverage combine + coverage report -m diff --git 
a/.github/workflows/nv-lightning-v100.yml b/.github/workflows/nv-lightning-v100.yml index 4a3909bfaab1..341dea57b663 100644 --- a/.github/workflows/nv-lightning-v100.yml +++ b/.github/workflows/nv-lightning-v100.yml @@ -26,7 +26,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116 + pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -41,8 +41,8 @@ jobs: - name: PyTorch Lightning Tests run: | - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi + unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch pip install pytorch-lightning pip install "protobuf<4.21.0" cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --verbose lightning/ + pytest $PYTEST_OPTS lightning/ diff --git a/.github/workflows/nv-megatron.yml b/.github/workflows/nv-megatron.yml index c602cc1c6f44..2fb9e37e5e9c 100644 --- a/.github/workflows/nv-megatron.yml +++ b/.github/workflows/nv-megatron.yml @@ -26,7 +26,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116 + pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -57,6 +57,5 @@ jobs: cd Megatron-DeepSpeed pip install . unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - MEGATRON_CKPT_DIR=/blob/megatron_ckpt/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --verbose ./ + pytest $PYTEST_OPTS ./ diff --git a/.github/workflows/nv-mii.yml b/.github/workflows/nv-mii.yml index e426b850ca93..a8c5a9afefcc 100644 --- a/.github/workflows/nv-mii.yml +++ b/.github/workflows/nv-mii.yml @@ -26,7 +26,7 @@ jobs: - name: Install pytorch run: | - pip3 install -U --cache-dir /blob/torch_cache torch + pip3 install -U --cache-dir $TORCH_CACHE torch python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -54,6 +54,5 @@ jobs: cd DeepSpeed-MII pip install .[dev] unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --color=yes --durations=0 --forked --verbose -m "deepspeed" ./ + pytest $PYTEST_OPTS --forked -m "deepspeed" ./ diff --git a/.github/workflows/nv-nightly.yml b/.github/workflows/nv-nightly.yml index 309514e7eb4c..e2128d9dd2bb 100644 --- a/.github/workflows/nv-nightly.yml +++ b/.github/workflows/nv-nightly.yml @@ -20,7 +20,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116 + pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import 
torch; print('CUDA available:', torch.cuda.is_available())" @@ -45,6 +45,5 @@ jobs: - name: Unit tests run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TRANSFORMERS_CACHE=/blob/transformers_cache/ TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'nightly' unit/ --torch_ver="1.13" --cuda_ver="11.6" + pytest $PYTEST_OPTS --forked -m 'nightly' unit/ --torch_ver="1.13" --cuda_ver="11.6" diff --git a/.github/workflows/nv-torch-latest-cpu.yml b/.github/workflows/nv-torch-latest-cpu.yml index 7dc297803531..3c2b7301acf0 100644 --- a/.github/workflows/nv-torch-latest-cpu.yml +++ b/.github/workflows/nv-torch-latest-cpu.yml @@ -42,7 +42,6 @@ jobs: - name: Unit tests run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest -n 4 unit/ --torch_ver="1.12" - TORCH_EXTENSIONS_DIR=./torch-extensions pytest -m 'sequential' unit/ --torch_ver="1.12" + TRANSFORMERS_CACHE=/tmp/transformers_cache/ pytest $PYTEST_OPTS -n 4 unit/ --torch_ver="1.12" + TRANSFORMERS_CACHE=/tmp/transformers_cache/ pytest $PYTEST_OPTS -m 'sequential' unit/ --torch_ver="1.12" diff --git a/.github/workflows/nv-torch-latest-v100.yml b/.github/workflows/nv-torch-latest-v100.yml index c58ae0841100..544fb50acec3 100644 --- a/.github/workflows/nv-torch-latest-v100.yml +++ b/.github/workflows/nv-torch-latest-v100.yml @@ -26,7 +26,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116 + pip install -U --cache-dir $TORCH_CACHE torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -51,7 +51,12 @@ jobs: - name: Unit tests run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/ --torch_ver="2.0" --cuda_ver="11.7" - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="11.7" + coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="2.0" --cuda_ver="11.7" + coverage run --concurrency=multiprocessing -m pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="2.0" --cuda_ver="11.7" + + - name: Coverage report + run: | + cd tests + coverage combine + coverage report -m diff --git a/.github/workflows/nv-torch-nightly-v100.yml b/.github/workflows/nv-torch-nightly-v100.yml index 36af529523a0..0ce900cde4f9 100644 --- a/.github/workflows/nv-torch-nightly-v100.yml +++ b/.github/workflows/nv-torch-nightly-v100.yml @@ -45,7 +45,6 @@ jobs: - name: Unit tests run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/ - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'sequential' unit/ + pytest $PYTEST_OPTS --forked -n 4 unit/ + pytest $PYTEST_OPTS --forked -m 'sequential' unit/ diff --git a/.github/workflows/nv-torch19-p40.yml b/.github/workflows/nv-torch19-p40.yml index 6ae41651e9d2..b9ff936c7857 100644 --- 
a/.github/workflows/nv-torch19-p40.yml +++ b/.github/workflows/nv-torch19-p40.yml @@ -20,7 +20,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html + pip install -U --cache-dir $TORCH_CACHE torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -44,6 +44,6 @@ jobs: - name: Unit tests run: | - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi + unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11.1" + pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11.1" diff --git a/.github/workflows/nv-torch19-v100.yml b/.github/workflows/nv-torch19-v100.yml index fa4d58d13f7e..61abe0f601c0 100644 --- a/.github/workflows/nv-torch19-v100.yml +++ b/.github/workflows/nv-torch19-v100.yml @@ -20,7 +20,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir /blob/torch_cache torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html + pip install -U --cache-dir $TORCH_CACHE torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -45,7 +45,6 @@ jobs: - name: Unit tests run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi cd tests - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11" - TORCH_EXTENSIONS_DIR=./torch-extensions pytest --forked -m 'sequential' unit/ --torch_ver="1.9" --cuda_ver="11" + pytest $PYTEST_OPTS --forked -n 4 unit/ --torch_ver="1.9" --cuda_ver="11" + pytest $PYTEST_OPTS --forked -m 'sequential' unit/ --torch_ver="1.9" --cuda_ver="11" diff --git a/.github/workflows/nv-transformers-v100.yml b/.github/workflows/nv-transformers-v100.yml index 63d66efbc040..22a341e8b687 100644 --- a/.github/workflows/nv-transformers-v100.yml +++ b/.github/workflows/nv-transformers-v100.yml @@ -27,7 +27,7 @@ jobs: - name: Install pytorch run: | # use the same pytorch version as transformers CI - pip install -U --cache-dir /blob/torch_cache torch torchvision torchaudio -f https://download.pytorch.org/whl/torch_stable.html + pip install -U --cache-dir $TORCH_CACHE torch torchvision torchaudio -f https://download.pytorch.org/whl/torch_stable.html python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -42,7 +42,7 @@ jobs: - name: HF transformers tests run: | - if [[ -d ./torch-extensions ]]; then rm -rf ./torch-extensions; fi + unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch git clone https://github.com/huggingface/transformers cd transformers # if needed switch to the last known good SHA until transformers@master is fixed @@ -57,4 +57,4 @@ jobs: # force protobuf version due to issues pip install "protobuf<4.21.0" pip list - HF_DATASETS_CACHE=/blob/datasets_cache/ TRANSFORMERS_CACHE=/blob/transformers_cache/ WANDB_DISABLED=true 
TORCH_EXTENSIONS_DIR=./torch-extensions RUN_SLOW=1 pytest --color=yes --durations=0 --verbose tests/deepspeed + WANDB_DISABLED=true RUN_SLOW=1 pytest $PYTEST_OPTS tests/deepspeed diff --git a/.github/workflows/setup-venv/action.yml b/.github/workflows/setup-venv/action.yml index 1828b2ed9eef..cfffd0abeafe 100644 --- a/.github/workflows/setup-venv/action.yml +++ b/.github/workflows/setup-venv/action.yml @@ -18,6 +18,16 @@ runs: pip install wheel # required after pip>=23.1 echo PATH=$PATH >> $GITHUB_ENV # Make it so venv is inherited for other steps shell: bash + - id: set-env-vars + run: | + echo TEST_DATA_DIR=/blob/ >> $GITHUB_ENV + echo TRANSFORMERS_CACHE=/blob/transformers_cache/ >> $GITHUB_ENV + echo TORCH_EXTENSIONS_DIR=./torch-extensions/ >> $GITHUB_ENV + echo TORCH_CACHE=/blob/torch_cache/ >> $GITHUB_ENV + echo HF_DATASETS_CACHE=/blob/datasets_cache/ >> $GITHUB_ENV + echo MEGATRON_CKPT_DIR=/blob/megatron_ckpt/ >> $GITHUB_ENV + echo PYTEST_OPTS="--color=yes --durations=0 --verbose -rF" >> $GITHUB_ENV + shell: bash - id: print-env run: | which python diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 731b67a26814..0106e3785459 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -1,4 +1,5 @@ clang-format==16.0.2 +coverage docutils<0.18 future importlib-metadata>=4 diff --git a/tests/.coveragerc b/tests/.coveragerc new file mode 100644 index 000000000000..dccaba6b57a3 --- /dev/null +++ b/tests/.coveragerc @@ -0,0 +1,5 @@ +# .coveragerc to control coverage.py +[run] +parallel = True +sigterm = True +source = deepspeed diff --git a/tests/conftest.py b/tests/conftest.py index e5a8cce45fd9..45e8434a021b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -70,10 +70,18 @@ def pytest_runtest_call(item): item.runtest = lambda: True # Dummy function so test is not run twice +# We allow DistributedTest to reuse distributed environments. When the last +# test for a class is run, we want to make sure those distributed environments +# are destroyed. 
+def pytest_runtest_teardown(item, nextitem): + if getattr(item.cls, "reuse_dist_env", False) and not nextitem: + dist_test_class = item.cls() + for num_procs, pool in dist_test_class._pool_cache.items(): + dist_test_class._close_pool(pool, num_procs, force=True) + + @pytest.hookimpl(tryfirst=True) def pytest_fixture_setup(fixturedef, request): if getattr(fixturedef.func, "is_dist_fixture", False): - #for val in dir(request): - # print(val.upper(), getattr(request, val), "\n") dist_fixture_class = fixturedef.func() dist_fixture_class(request) diff --git a/tests/unit/alexnet_model.py b/tests/unit/alexnet_model.py index 7f9e37f289f0..8ec349804eb1 100644 --- a/tests/unit/alexnet_model.py +++ b/tests/unit/alexnet_model.py @@ -4,6 +4,7 @@ # DeepSpeed Team import pytest +import os import torch import torch.nn as nn import torch.nn.functional as F @@ -98,7 +99,12 @@ def cifar_trainset(fp16=False): dist.barrier() if local_rank != 0: dist.barrier() - trainset = torchvision.datasets.CIFAR10(root='/blob/cifar10-data', train=True, download=True, transform=transform) + + data_root = os.getenv("TEST_DATA_DIR", "/tmp/") + trainset = torchvision.datasets.CIFAR10(root=os.path.join(data_root, "cifar10-data"), + train=True, + download=True, + transform=transform) if local_rank == 0: dist.barrier() return trainset @@ -114,6 +120,18 @@ def train_cifar(model, config, num_steps=400, average_dp_losses=True, fp16=True, trainset = cifar_trainset(fp16=fp16) config['local_rank'] = dist.get_rank() + # deepspeed_io defaults to creating a dataloader that uses a + # multiprocessing pool. Our tests use pools and we cannot nest pools in + # python. Therefore we're injecting this kwarg to ensure that no pools + # are used in the dataloader. + old_method = deepspeed.runtime.engine.DeepSpeedEngine.deepspeed_io + + def new_method(*args, **kwargs): + kwargs["num_local_io_workers"] = 0 + return old_method(*args, **kwargs) + + deepspeed.runtime.engine.DeepSpeedEngine.deepspeed_io = new_method + engine, _, _, _ = deepspeed.initialize(config=config, model=model, model_parameters=[p for p in model.parameters()], diff --git a/tests/unit/common.py b/tests/unit/common.py index a4c64de18682..4cfbd66b88ca 100644 --- a/tests/unit/common.py +++ b/tests/unit/common.py @@ -4,8 +4,11 @@ # DeepSpeed Team import os +import re import time import inspect +import socket +import subprocess from abc import ABC, abstractmethod from pathlib import Path @@ -14,7 +17,6 @@ import deepspeed from deepspeed.accelerator import get_accelerator import deepspeed.comm as dist -from torch.multiprocessing import Process import pytest from _pytest.outcomes import Skipped @@ -40,11 +42,10 @@ def get_xdist_worker_id(): def get_master_port(): - master_port = os.environ.get('DS_TEST_PORT', '29503') - xdist_worker_id = get_xdist_worker_id() - if xdist_worker_id is not None: - master_port = str(int(master_port) + xdist_worker_id) - return master_port + # Select a random open port + with socket.socket() as s: + s.bind(('', 0)) + return str(s.getsockname()[1]) def set_accelerator_visible(): @@ -54,7 +55,6 @@ def set_accelerator_visible(): xdist_worker_id = 0 if cuda_visible is None: # CUDA_VISIBLE_DEVICES is not set, discover it using accelerator specific command instead - import subprocess if get_accelerator().device_name() == 'cuda': if is_rocm_pytorch(): rocm_smi = subprocess.check_output(['rocm-smi', '--showid']) @@ -64,7 +64,6 @@ def set_accelerator_visible(): nvidia_smi = subprocess.check_output(['nvidia-smi', '--list-gpus']) num_accelerators = 
len(nvidia_smi.decode('utf-8').strip().split('\n')) elif get_accelerator().device_name() == 'xpu': - import re clinfo = subprocess.check_output(['clinfo']) lines = clinfo.decode('utf-8').strip().split('\n') num_accelerators = 0 @@ -100,6 +99,8 @@ class DistributedExec(ABC): init_distributed = True set_dist_env = True requires_cuda_env = True + reuse_dist_env = False + _pool_cache = {} @abstractmethod def run(self): @@ -115,7 +116,6 @@ def __call__(self, request=None): world_size = [world_size] for procs in world_size: self._launch_procs(procs) - time.sleep(0.5) def _get_fixture_kwargs(self, request, func): if not request: @@ -132,92 +132,92 @@ def _get_fixture_kwargs(self, request, func): return fixture_kwargs def _launch_procs(self, num_procs): + # Verify we have enough accelerator devices to run this test if get_accelerator().is_available() and get_accelerator().device_count() < num_procs: pytest.skip( f"Skipping test because not enough GPUs are available: {num_procs} required, {get_accelerator().device_count()} available" ) + + # Set start method to `forkserver` (or `fork`) mp.set_start_method('forkserver', force=True) - skip_msg = mp.Queue() # Allows forked processes to share pytest.skip reason - processes = [] - for local_rank in range(num_procs): - p = Process(target=self._dist_init, args=(local_rank, num_procs, skip_msg)) - p.start() - processes.append(p) - - # Now loop and wait for a test to complete. The spin-wait here isn't a big - # deal because the number of processes will be O(#GPUs) << O(#CPUs). - any_done = False - start = time.time() - while (not any_done) and ((time.time() - start) < DEEPSPEED_TEST_TIMEOUT): - for p in processes: - if not p.is_alive(): - any_done = True - break - time.sleep(.1) # So we don't hog CPU - - # If we hit the timeout, then presume a test is hanged - if not any_done: - for p in processes: - p.terminate() + + # Create process pool or use cached one + master_port = None + if self.reuse_dist_env: + if num_procs not in self._pool_cache: + self._pool_cache[num_procs] = mp.Pool(processes=num_procs) + master_port = get_master_port() + pool = self._pool_cache[num_procs] + else: + pool = mp.Pool(processes=num_procs) + master_port = get_master_port() + + # Run the test + args = [(local_rank, num_procs, master_port) for local_rank in range(num_procs)] + skip_msgs_async = pool.starmap_async(self._dist_run, args) + + try: + skip_msgs = skip_msgs_async.get(DEEPSPEED_TEST_TIMEOUT) + except mp.TimeoutError: + # Shortcut to exit pytest in the case of a hanged test. This + # usually means an environment error and the rest of tests will + # hang (causing super long unit test runtimes) pytest.exit("Test hanged, exiting", returncode=0) - # Wait for all other processes to complete - for p in processes: - p.join(DEEPSPEED_UNIT_WORKER_TIMEOUT) - - failed = [(rank, p) for rank, p in enumerate(processes) if p.exitcode != 0] - for rank, p in failed: - # If it still hasn't terminated, kill it because it hung. 
- if p.exitcode is None: - p.terminate() - pytest.fail(f'Worker {rank} hung.', pytrace=False) - if p.exitcode < 0: - pytest.fail(f'Worker {rank} killed by signal {-p.exitcode}', pytrace=False) - if p.exitcode > 0: - pytest.fail(f'Worker {rank} exited with code {p.exitcode}', pytrace=False) - - if not skip_msg.empty(): - # This assumed all skip messages are the same, it may be useful to - # add a check here to assert all exit messages are equal - pytest.skip(skip_msg.get()) - - def _dist_init(self, local_rank, num_procs, skip_msg): - """Initialize deepspeed.comm and execute the user function. """ - if self.set_dist_env: - os.environ['MASTER_ADDR'] = '127.0.0.1' - os.environ['MASTER_PORT'] = get_master_port() - os.environ['LOCAL_RANK'] = str(local_rank) - # NOTE: unit tests don't support multi-node so local_rank == global rank - os.environ['RANK'] = str(local_rank) - os.environ['WORLD_SIZE'] = str(num_procs) - - # turn off NCCL logging if set - os.environ.pop('NCCL_DEBUG', None) - - if get_accelerator().is_available(): - set_accelerator_visible() - - if self.init_distributed: - deepspeed.init_distributed(dist_backend=self.backend) - dist.barrier() + # Tear down distributed environment and close process pools + self._close_pool(pool, num_procs) + + # If we skipped a test, propagate that to this process + if any(skip_msgs): + assert len(set(skip_msgs)) == 1, "Multiple different skip messages received" + pytest.skip(skip_msgs[0]) + + def _dist_run(self, local_rank, num_procs, master_port): + skip_msg = '' + if not dist.is_initialized(): + """ Initialize deepspeed.comm and execute the user function. """ + if self.set_dist_env: + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = str(master_port) + os.environ['LOCAL_RANK'] = str(local_rank) + # NOTE: unit tests don't support multi-node so local_rank == global rank + os.environ['RANK'] = str(local_rank) + os.environ['WORLD_SIZE'] = str(num_procs) + + # turn off NCCL logging if set + os.environ.pop('NCCL_DEBUG', None) - if get_accelerator().is_available(): - get_accelerator().set_device(local_rank) + if get_accelerator().is_available(): + set_accelerator_visible() + + if self.init_distributed: + deepspeed.init_distributed(dist_backend=self.backend) + dist.barrier() + + if get_accelerator().is_available(): + get_accelerator().set_device(local_rank) try: self.run(**self._fixture_kwargs) except BaseException as e: if isinstance(e, Skipped): - skip_msg.put(e.msg) + skip_msg = e.msg else: raise e - if self.init_distributed or dist.is_initialized(): - # make sure all ranks finish at the same time + return skip_msg + + def _dist_destroy(self): + if (dist is not None) and dist.is_initialized(): dist.barrier() - # tear down after test completes dist.destroy_process_group() + def _close_pool(self, pool, num_procs, force=False): + if force or not self.reuse_dist_env: + msg = pool.starmap(self._dist_destroy, [() for _ in range(num_procs)]) + pool.close() + pool.join() + class DistributedFixture(DistributedExec): """ diff --git a/tests/unit/ops/accelerators/test_accelerator_backward.py b/tests/unit/ops/accelerators/test_accelerator_backward.py index 093c8390f5e6..7dadd104de4d 100644 --- a/tests/unit/ops/accelerators/test_accelerator_backward.py +++ b/tests/unit/ops/accelerators/test_accelerator_backward.py @@ -244,9 +244,7 @@ def run_backward(ds_config, seq_len, atol=1e-2, verbose=False): check_equal(base_grads, ds_grads, atol=atol, verbose=verbose) -#test_backward[3-1024-120-16-24-True-True-0.05] 
-#test_backward[3-1024-52-16-24-False-True-0.2] -# 3-128-54-2-24-False-True-0.2 +# NOTE: Keep these different params as they have helped find divergence in behavior between AMD and NVIDIA. @pytest.mark.parametrize('batch_size, hidden_size, seq_len, heads, num_layers, is_preln, use_fp16, atol', [ (64,160,128,2,24,False,True, 0.2), @@ -254,12 +252,6 @@ def run_backward(ds_config, seq_len, atol=1e-2, verbose=False): (8,1600,128,25,3,True,True, 0.05), (8,160,128,2,3,True,True, 0.1), (8,1600,128,2,3,True,True, 0.05), - #(3,1024,119,16,24,True,False, 0.05), - #(3,1024,115,16,24,True,True, 0.05), - #(1024,128,10,2,2,False,False, 0.1), - #(3,1024,52,16,24,False,True, 0.2), - #(3,128,51,2,24,False,False, 0.1), - #(3,128,54,2,24,False,True, 0.2), ]) # yapf: disable class TestCUDABackward(DistributedTest): world_size = 1 @@ -267,7 +259,7 @@ class TestCUDABackward(DistributedTest): #This is to flush denorms in forward pass. Please refer to https://github.com/pytorch/pytorch/blob/main/docs/source/notes/numerical_accuracy.rst#reduced-precision-fp16-and-bf16-gemms-and-convolutions-on-amd-instinct-mi200-devices os.environ['ROCBLAS_INTERNAL_FP16_ALT_IMPL'] = '1' - def test_backward(self, batch_size, hidden_size, seq_len, heads, num_layers, is_preln, use_fp16, atol): + def test_backward(self, is_preln, use_fp16, batch_size, hidden_size, seq_len, heads, num_layers, atol): # Only run fp16 test cases on devices with FP16 capability. if not get_accelerator().is_fp16_supported() and (use_fp16 is True or is_preln is False): return @@ -286,38 +278,3 @@ def test_backward(self, batch_size, hidden_size, seq_len, heads, num_layers, is_ ds_config.fp16 = use_fp16 run_backward(ds_config, seq_len, atol=atol, verbose=True) - - # [ - # (3,1024,128,16,24,True,False, 0.07), - # (3,1024,128,16,24,True,True, 0.05), - # (3,1024,128,16,24,False,False, 0.1), - # (3,1024,128,16,24,False,True, 0.2), - # ]) # yapf: disable - #def test_backward_stochastic(batch_size, - # hidden_size, - # seq_len, - # heads, - # num_layers, - # is_preln, - # use_fp16, - # atol): - # # Only run fp16 test cases on devices with FP16 capability. - # if not get_accelerator().is_fp16_supported() and use_fp16 is True: - # return - # - # ds_config = DeepSpeedTransformerConfig() - # ds_config.layer_id = None - # ds_config.batch_size = batch_size - # ds_config.hidden_size = hidden_size - # ds_config.intermediate_size = 4 * hidden_size - # ds_config.max_seq_length = seq_len - # ds_config.heads = heads - # ds_config.attn_dropout_ratio = 0.0 - # ds_config.hidden_dropout_ratio = 0.0 - # ds_config.num_hidden_layers = num_layers - # ds_config.pre_layer_norm = is_preln - # ds_config.initializer_range = 0.02 - # ds_config.fp16 = use_fp16 - # ds_config.stochastic_mode = True - # - # run_backward(ds_config, atol=atol) diff --git a/tests/unit/ops/accelerators/test_accelerator_forward.py b/tests/unit/ops/accelerators/test_accelerator_forward.py index 7c5580e4676a..990a4767fc76 100644 --- a/tests/unit/ops/accelerators/test_accelerator_forward.py +++ b/tests/unit/ops/accelerators/test_accelerator_forward.py @@ -224,6 +224,7 @@ def run_forward(ds_config, seq_len, atol=1e-2, verbose=False, test_bsz=None): ]) # yapf: disable class TestCUDAForward(DistributedTest): world_size = 1 + reuse_dist_env = True def test_forward(self, batch_size, hidden_size, seq_len, heads, num_layers, is_preln, use_fp16): # Only run fp16 test cases on devices with FP16 capability. 
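
Several test classes in this diff (TestCUDAForward just above, and TestAdamConfigs, TestCPUAdam, TestRead, TestWrite further down) opt into the new `reuse_dist_env` flag: `DistributedExec._launch_procs` keeps a worker pool per world size in `_pool_cache` instead of spawning and joining raw `Process` objects for every test, `get_master_port` binds port 0 to pick a free rendezvous port, and the new `pytest_runtest_teardown` hook closes the cached pools once the last test of a class has run. The following is a minimal, self-contained sketch of that pool-reuse pattern, not the code added to `tests/unit/common.py`; `run_body` and the timeout value are stand-ins.

```python
import socket
import multiprocessing as mp


def _free_port() -> str:
    # Ask the OS for an unused TCP port (same trick as the diff's get_master_port).
    with socket.socket() as s:
        s.bind(("", 0))
        return str(s.getsockname()[1])


def run_body(local_rank: int, world_size: int, master_port: str) -> str:
    # Stand-in for the per-rank test body; returns '' or a skip message.
    return ""


class PooledLauncher:
    reuse_dist_env = False   # subclasses opt in, as the test classes do in this diff
    _pool_cache = {}         # maps world size -> cached mp.Pool

    def launch(self, num_procs: int, timeout: float = 600.0):
        mp.set_start_method("forkserver", force=True)
        if self.reuse_dist_env:
            # Reuse (or create and cache) a pool for this world size.
            if num_procs not in self._pool_cache:
                self._pool_cache[num_procs] = mp.Pool(processes=num_procs)
            pool = self._pool_cache[num_procs]
        else:
            pool = mp.Pool(processes=num_procs)

        port = _free_port()
        args = [(rank, num_procs, port) for rank in range(num_procs)]
        try:
            # A hung worker surfaces as a TimeoutError instead of blocking forever.
            results = pool.starmap_async(run_body, args).get(timeout)
        except mp.TimeoutError:
            pool.terminate()
            raise
        if not self.reuse_dist_env:
            pool.close()
            pool.join()
        return results
```

Compared with the spin-wait over `Process` objects being removed, the pool gives a built-in result channel for propagating skip messages back to the parent and a single `get(timeout)` call as the hang guard.
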
diff --git a/tests/unit/ops/adagrad/test_cpu_adagrad.py b/tests/unit/ops/adagrad/test_cpu_adagrad.py index d38d42217872..99e934e2efda 100644 --- a/tests/unit/ops/adagrad/test_cpu_adagrad.py +++ b/tests/unit/ops/adagrad/test_cpu_adagrad.py @@ -34,17 +34,7 @@ class TestCPUAdagrad(DistributedTest): init_distributed = False set_dist_env = False - @pytest.mark.parametrize('model_size', - [ - (64), - (22), - (55), - (127), - (1024), - (1048576), - (30000000), - ]) # yapf: disable - def test_cpu_adagrad_opt(self, model_size): + def test_cpu_adagrad_opt(self, model_size=64): device = 'cpu' rng_state = torch.get_rng_state() param = torch.nn.Parameter(torch.randn(model_size, device=device)) @@ -65,14 +55,7 @@ def test_cpu_adagrad_opt(self, model_size): check_equal(param, param1, atol=1e-2, verbose=True) - - @pytest.mark.parametrize('model_size,vocabulary_size,dim', - [ - (16 * 2, 16 * 4, 16), - (16 * 32, 16 * 256, 16), - (16 * 256, 16 * 16384, 16), - ]) # yapf: disable - def test_cpu_adagrad_opt_sparse_embedding(self, model_size, vocabulary_size, dim): + def test_cpu_adagrad_opt_sparse_embedding(self, model_size=32, vocabulary_size=64, dim=16): device = 'cpu' rng_state = torch.get_rng_state() diff --git a/tests/unit/ops/adam/test_adamw.py b/tests/unit/ops/adam/test_adamw.py index 8b6f8101cb77..6a1b0afc2882 100644 --- a/tests/unit/ops/adam/test_adamw.py +++ b/tests/unit/ops/adam/test_adamw.py @@ -36,6 +36,7 @@ adam_configs) class TestAdamConfigs(DistributedTest): world_size = 1 + reuse_dist_env = True def test(self, optimizer, diff --git a/tests/unit/ops/adam/test_cpu_adam.py b/tests/unit/ops/adam/test_cpu_adam.py index a48b7c7f2839..9a6ff6689446 100644 --- a/tests/unit/ops/adam/test_cpu_adam.py +++ b/tests/unit/ops/adam/test_cpu_adam.py @@ -55,6 +55,7 @@ def _compare_optimizers(model_size, param1, optimizer1, param2, optimizer2): ]) # yapf: disable class TestCPUAdam(DistributedTest): world_size = 1 + reuse_dist_env = True requires_cuda_env = False if not get_accelerator().is_available(): init_distributed = False diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py index a37bcd9c869b..f6d175ce67bc 100644 --- a/tests/unit/ops/aio/test_aio.py +++ b/tests/unit/ops/aio/test_aio.py @@ -83,6 +83,7 @@ def _validate_handle_state(handle, single_submit, overlap_events): @pytest.mark.parametrize("overlap_events", [True, False]) class TestRead(DistributedTest): world_size = 1 + reuse_dist_env = True requires_cuda_env = False if not get_accelerator().is_available(): init_distributed = False @@ -148,6 +149,7 @@ def test_async_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, overlap @pytest.mark.parametrize("overlap_events", [True, False]) class TestWrite(DistributedTest): world_size = 1 + reuse_dist_env = True requires_cuda_env = False if not get_accelerator().is_available(): init_distributed = False diff --git a/tests/unit/runtime/half_precision/onebit/test_onebit.py b/tests/unit/runtime/half_precision/onebit/test_onebit.py index d3b0a90e2fa5..6f30ebae5bfb 100644 --- a/tests/unit/runtime/half_precision/onebit/test_onebit.py +++ b/tests/unit/runtime/half_precision/onebit/test_onebit.py @@ -8,7 +8,6 @@ import deepspeed.comm as dist import deepspeed import pytest -import copy import os import numpy as np @@ -334,18 +333,10 @@ def test_overflow(self, tmpdir): @pytest.mark.parametrize( "topo_config", [ - { - "num_pp": 1, - "num_dp": 4 - }, { "num_pp": 2, "num_dp": 2 }, - { - "num_pp": 4, - "num_dp": 1 - }, ], ) class TestOneBitAdamFP16Pipeline(DistributedTest): @@ -353,8 +344,8 @@ 
class TestOneBitAdamFP16Pipeline(DistributedTest): def test(self, topo_config): config_dict = { - "train_batch_size": 16, - "train_micro_batch_size_per_gpu": 4, + "train_batch_size": 4, + "gradient_accumulation_steps": 1, "steps_per_print": 20, "optimizer": { "type": "OneBitAdam", @@ -384,20 +375,12 @@ def test(self, topo_config): } topo = PipeTopo(**topo_config) - steps = 500 # Must be >=100 - - # Allocate model for consistent initial weights. - init_net = AlexNetPipe() + steps = 100 - test_net = copy.deepcopy(init_net) + # TODO: Add correctness tests/asserts comparing with baseline? + test_net = AlexNetPipe() test_model = PipelineModule(layers=test_net.to_layers(), topology=topo, loss_fn=nn.CrossEntropyLoss()) - - test_losses = train_cifar( - test_model, - config=config_dict, - num_steps=steps, - fp16=config_dict["fp16"]["enabled"], - ) + test_losses = train_cifar(test_model, config=config_dict, num_steps=steps, fp16=config_dict['fp16']['enabled']) @pytest.mark.parametrize("dtype", [torch.float32, torch.float16], ids=["fp32", "fp16"]) @@ -707,18 +690,10 @@ def test_overflow(self, tmpdir): @pytest.mark.parametrize( "topo_config", [ - { - "num_pp": 1, - "num_dp": 4 - }, { "num_pp": 2, "num_dp": 2 }, - { - "num_pp": 4, - "num_dp": 1 - }, ], ) class TestZeroOneAdamFP16Pipeline(DistributedTest): @@ -726,8 +701,8 @@ class TestZeroOneAdamFP16Pipeline(DistributedTest): def test(self, topo_config): config_dict = { - "train_batch_size": 16, - "train_micro_batch_size_per_gpu": 4, + "train_batch_size": 4, + "gradient_accumulation_steps": 1, "steps_per_print": 20, "optimizer": { "type": "ZeroOneAdam", @@ -760,20 +735,12 @@ def test(self, topo_config): } topo = PipeTopo(**topo_config) - steps = 500 # Must be >=100 + steps = 100 - # Allocate model for consistent initial weights. - init_net = AlexNetPipe() - - test_net = copy.deepcopy(init_net) + # TODO: Add correctness tests/asserts comparing with baseline? + test_net = AlexNetPipe() test_model = PipelineModule(layers=test_net.to_layers(), topology=topo, loss_fn=nn.CrossEntropyLoss()) - - test_losses = train_cifar( - test_model, - config=config_dict, - num_steps=steps, - fp16=config_dict["fp16"]["enabled"], - ) + test_losses = train_cifar(test_model, config=config_dict, num_steps=steps, fp16=config_dict['fp16']['enabled']) @pytest.mark.parametrize("dtype", [torch.float32, torch.float16], ids=["fp32", "fp16"]) @@ -1109,18 +1076,10 @@ def test_overflow(self, tmpdir): @pytest.mark.parametrize( "topo_config", [ - { - "num_pp": 1, - "num_dp": 4 - }, { "num_pp": 2, "num_dp": 2 }, - { - "num_pp": 4, - "num_dp": 1 - }, ], ) class TestOneBitLambFP16Pipeline(DistributedTest): @@ -1128,8 +1087,8 @@ class TestOneBitLambFP16Pipeline(DistributedTest): def test(self, topo_config): config_dict = { - "train_batch_size": 16, - "train_micro_batch_size_per_gpu": 4, + "train_batch_size": 4, + "gradient_accumulation_steps": 1, "steps_per_print": 20, "optimizer": { "type": "OneBitLamb", @@ -1159,20 +1118,12 @@ def test(self, topo_config): } topo = PipeTopo(**topo_config) - steps = 500 # Must be >=100 - - # Allocate model for consistent initial weights. - init_net = AlexNetPipe() + steps = 100 - test_net = copy.deepcopy(init_net) + # TODO: Add correctness tests/asserts comparing with baseline?
+ test_net = AlexNetPipe() test_model = PipelineModule(layers=test_net.to_layers(), topology=topo, loss_fn=nn.CrossEntropyLoss()) - - test_losses = train_cifar( - test_model, - config=config_dict, - num_steps=steps, - fp16=config_dict["fp16"]["enabled"], - ) + test_losses = train_cifar(test_model, config=config_dict, num_steps=steps, fp16=config_dict['fp16']['enabled']) @pytest.mark.sequential diff --git a/tests/unit/runtime/half_precision/test_fp16.py b/tests/unit/runtime/half_precision/test_fp16.py index 6d88af00078a..3cc277fa8dc1 100644 --- a/tests/unit/runtime/half_precision/test_fp16.py +++ b/tests/unit/runtime/half_precision/test_fp16.py @@ -319,7 +319,7 @@ def test(self, zero_stage, use_cpu_offload): model = SimpleModel(hidden_dim) model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) - data_loader = random_dataloader(model=model, total_samples=50, hidden_dim=hidden_dim, device=model.device) + data_loader = random_dataloader(model=model, total_samples=10, hidden_dim=hidden_dim, device=model.device) for n, batch in enumerate(data_loader): loss = model(batch[0], batch[1]) model.backward(loss) @@ -328,11 +328,10 @@ def test(self, zero_stage, use_cpu_offload): @pytest.mark.parametrize("zero_stage", [1, 2, 3]) @pytest.mark.parametrize("use_cpu_offload", [True, False]) -@pytest.mark.parametrize("hidden_dim", [9, 10]) class TestZeroStaticScale(DistributedTest): world_size = 1 - def test(self, zero_stage, use_cpu_offload, hidden_dim): + def test(self, zero_stage, use_cpu_offload, hidden_dim=4): if use_cpu_offload and not deepspeed.ops.__compatible_ops__[CPUAdamBuilder.NAME]: pytest.skip("cpu-adam is not compatible") diff --git a/tests/unit/runtime/pipe/test_pipe.py b/tests/unit/runtime/pipe/test_pipe.py index c4958b721f2c..dae791c8f860 100644 --- a/tests/unit/runtime/pipe/test_pipe.py +++ b/tests/unit/runtime/pipe/test_pipe.py @@ -42,8 +42,8 @@ def test(self, topo_config): skip_on_arch(min_arch=7) config_dict = { - "train_batch_size": 16, - "train_micro_batch_size_per_gpu": 4, + "train_batch_size": 4, + "gradient_accumulation_steps": 1, "steps_per_print": 20, "optimizer": { "type": "Adam", @@ -67,7 +67,7 @@ } topo = PipeTopo(**topo_config) - steps = 500 # must be >=100 + steps = 100 # must be >=100 # Allocate model for consistent initial weights.
init_net = AlexNetPipe() diff --git a/tests/unit/runtime/test_data.py b/tests/unit/runtime/test_data.py index 8f71ca979b4d..7ae0814c823a 100644 --- a/tests/unit/runtime/test_data.py +++ b/tests/unit/runtime/test_data.py @@ -42,6 +42,7 @@ def test(self, train_batch_size, drop_last): model=model, training_data=train_dataset, optimizer=optimizer) + training_dataloader.num_local_io_workers = 0 # We can't do nested mp.pool for n, batch in enumerate(training_dataloader): x = batch[0].to(get_accelerator().current_device_name()) y = batch[1].to(get_accelerator().current_device_name()) diff --git a/tests/unit/runtime/test_ds_initialize.py b/tests/unit/runtime/test_ds_initialize.py index 4ff64dea96ef..636b1c02dbc1 100644 --- a/tests/unit/runtime/test_ds_initialize.py +++ b/tests/unit/runtime/test_ds_initialize.py @@ -117,6 +117,7 @@ def test(self, client_parameters): @pytest.mark.parametrize('grad_accum_dtype', [None, 'fp16', 'bf16', 'fp32']) class TestOptimizerImplementation(DistributedTest): world_size = 1 + reuse_dist_env = True def test(self, optimizer_extension, model_dtype, grad_accum_dtype): if optimizer_extension == 'zero1': @@ -125,9 +126,9 @@ def test(self, optimizer_extension, model_dtype, grad_accum_dtype): zero_stage = 2 else: zero_stage = 0 - amp = True if optimizer_extension == 'amp' else False - fp16 = True if model_dtype == 'fp16' else False - bf16 = True if model_dtype == 'bf16' else False + amp = (optimizer_extension == 'amp') + fp16 = (model_dtype == 'fp16') + bf16 = (model_dtype == 'bf16') # Skip checks if bf16 and not bf16_required_version_check(): pytest.skip( diff --git a/tests/unit/runtime/zero/test_zero.py b/tests/unit/runtime/zero/test_zero.py index 85ed0cffa7c2..2ed5f1f9286d 100644 --- a/tests/unit/runtime/zero/test_zero.py +++ b/tests/unit/runtime/zero/test_zero.py @@ -52,7 +52,7 @@ def dump_state_dict(model): print(f"{name} {param.data}") -@pytest.mark.parametrize('zero_stage', [1, 2, 3]) +@pytest.mark.parametrize("zero_stage", [1, 2, 3]) class TestZeroUnbalancedGradients(DistributedTest): world_size = 1 @@ -73,7 +73,7 @@ def test(self, zero_stage): "fp16": { "enabled": True, "initial_scale_power": 8 - } + }, } hidden_dim = 4 @@ -96,7 +96,7 @@ def test(self, zero_stage=3): "steps_per_print": 1, "zero_optimization": { "stage": zero_stage, - "stage3_param_persistence_threshold": 0 + "stage3_param_persistence_threshold": 0, }, "optimizer": { "type": "Adam", @@ -107,7 +107,7 @@ def test(self, zero_stage=3): "fp16": { "enabled": True, "initial_scale_power": 8 - } + }, } hidden_dim = 4 @@ -137,8 +137,8 @@ def forward(self, x, y): # testing the fix https://github.com/microsoft/DeepSpeed/pull/1227 # also reproduces the https://github.com/microsoft/DeepSpeed/pull/1372 -@pytest.mark.parametrize('zero_stage', [2, 3]) -@pytest.mark.parametrize('freeze_params', [True, False]) +@pytest.mark.parametrize("zero_stage", [2, 3]) +@pytest.mark.parametrize("freeze_params", [True, False]) class TestZeroToFP32(DistributedTest): world_size = 2 @@ -151,7 +151,7 @@ def test_1_param_group(self, tmpdir, zero_stage, freeze_params): "steps_per_print": 1, "zero_optimization": { "stage": zero_stage, - "stage3_param_persistence_threshold": 0 + "stage3_param_persistence_threshold": 0, }, "optimizer": { "type": "Adam", @@ -162,7 +162,7 @@ def test_1_param_group(self, tmpdir, zero_stage, freeze_params): "fp16": { "enabled": True, "initial_scale_power": 8 - } + }, } class MyModel(torch.nn.Module): @@ -227,7 +227,7 @@ def forward(self, x, y): fp32_model = load_state_dict_from_zero_checkpoint(model.module, 
tmpdir) fp32_state_dict = fp32_model.state_dict() - #dump_state_dict(fp32_model) + # dump_state_dict(fp32_model) if dist.get_rank() == 0: for name in orig_state_dict.keys(): @@ -245,7 +245,7 @@ def test_2_param_groups(self, tmpdir, zero_stage, freeze_params): "zero_allow_untested_optimizer": 1, "zero_optimization": { "stage": zero_stage, - "stage3_param_persistence_threshold": 0 + "stage3_param_persistence_threshold": 0, }, "optimizer": { "type": "Adam", @@ -256,7 +256,7 @@ def test_2_param_groups(self, tmpdir, zero_stage, freeze_params): "fp16": { "enabled": True, "initial_scale_power": 8 - } + }, } class MyModel(torch.nn.Module): @@ -293,10 +293,12 @@ def forward(self, x, y): ] optim = torch.optim.SGD(optim_groups, lr=0.1) - model, _, _, _ = deepspeed.initialize(model=model, - model_parameters=model.parameters(), - optimizer=optim, - config=config_dict) + model, _, _, _ = deepspeed.initialize( + model=model, + model_parameters=model.parameters(), + optimizer=optim, + config=config_dict, + ) model.empty_partition_cache() data_loader = random_dataloader(model=model, total_samples=16, hidden_dim=hidden_dim, device=model.device) @@ -312,7 +314,7 @@ def forward(self, x, y): # make sure all sides saved it dist.barrier() - #dump_state_dict(model) + # dump_state_dict(model) orig_state_dict = {} for name, param in model.module.named_parameters(): @@ -330,7 +332,7 @@ def forward(self, x, y): fp32_model = load_state_dict_from_zero_checkpoint(model.module, tmpdir) fp32_state_dict = fp32_model.state_dict() - #dump_state_dict(fp32_model) + # dump_state_dict(fp32_model) if dist.get_rank() == 0: for name in orig_state_dict.keys(): @@ -349,7 +351,7 @@ def test(self, allgather_bucket_size, zero_stage=2): "steps_per_print": 1, "zero_optimization": { "stage": zero_stage, - "allgather_bucket_size": allgather_bucket_size + "allgather_bucket_size": allgather_bucket_size, }, "optimizer": { "type": "Adam", @@ -360,7 +362,7 @@ def test(self, allgather_bucket_size, zero_stage=2): "fp16": { "enabled": True, "initial_scale_power": 8 - } + }, } hidden_dim = 4 @@ -372,7 +374,7 @@ def test(self, allgather_bucket_size, zero_stage=2): model, _, _, _ = deepspeed.initialize(config=config_dict, model=model, model_parameters=model.parameters()) - assert "allgather_bucket_size must be a multiple of nccl_start_alignment_factor" in str(assertinfo) + assert ("allgather_bucket_size must be a multiple of nccl_start_alignment_factor" in str(assertinfo)) class TestPartitionNcclAlignment(DistributedTest): @@ -395,7 +397,7 @@ def test(self, zero_stage=2): "fp16": { "enabled": True, "initial_scale_power": 8 - } + }, } hidden_dim = 4 @@ -405,7 +407,8 @@ def test(self, zero_stage=2): # get nccl all-gather send buffers alignment factor nccl_start_alignment_factor = model.optimizer.nccl_start_alignment_factor - parallel_partitioned_bit16_groups = model.optimizer.parallel_partitioned_bit16_groups if zero_stage == 2 else model.optimizer.parallel_partitioned_fp16_groups + parallel_partitioned_bit16_groups = (model.optimizer.parallel_partitioned_bit16_groups + if zero_stage == 2 else model.optimizer.parallel_partitioned_fp16_groups) for data_parallel_partitions in parallel_partitioned_bit16_groups: for partition_id, partitioned_data in enumerate(data_parallel_partitions): # verify that data partition start locations are 4-byte aligned @@ -458,9 +461,14 @@ def __init__( self.loss = L1Loss(reduction="none") def forward(self, x: Tensor, y: Tensor, use_module_trace: bool, param_prefetching: bool) -> Dict[str, Tensor]: - 
_assert_partition_status(self, - {ZeroParamStatus.NOT_AVAILABLE, ZeroParamStatus.INFLIGHT, ZeroParamStatus.AVAILABLE} - if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE}) + _assert_partition_status( + self, + { + ZeroParamStatus.NOT_AVAILABLE, + ZeroParamStatus.INFLIGHT, + ZeroParamStatus.AVAILABLE, + } if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE}, + ) pre_layer_expected_states = { ZeroParamStatus.INFLIGHT if param_prefetching else ZeroParamStatus.NOT_AVAILABLE, @@ -485,9 +493,14 @@ def forward(self, x: Tensor, y: Tensor, use_module_trace: bool, param_prefetchin loss = self.loss(y_hat, y) - _assert_partition_status(self, - {ZeroParamStatus.NOT_AVAILABLE, ZeroParamStatus.INFLIGHT, ZeroParamStatus.AVAILABLE} - if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE}) + _assert_partition_status( + self, + { + ZeroParamStatus.NOT_AVAILABLE, + ZeroParamStatus.INFLIGHT, + ZeroParamStatus.AVAILABLE, + } if use_module_trace else {ZeroParamStatus.NOT_AVAILABLE}, + ) return { "hidden1": hidden1, @@ -512,10 +525,12 @@ class EltwiseMultiplicationTestNetwork_NamedTuple(EltwiseMultiplicationTestNetwo def forward(self, *args, **kwargs) -> EltwiseMultiplicationNamedTuple: outputs_dicts = super().forward(*args, **kwargs) - return EltwiseMultiplicationNamedTuple(hidden1=outputs_dicts['hidden1'], - hidden2=outputs_dicts['hidden2'], - y_hat=outputs_dicts['y_hat'], - loss=outputs_dicts['loss']) + return EltwiseMultiplicationNamedTuple( + hidden1=outputs_dicts["hidden1"], + hidden2=outputs_dicts["hidden2"], + y_hat=outputs_dicts["y_hat"], + loss=outputs_dicts["loss"], + ) @staticmethod def to_dict(outputs: EltwiseMultiplicationNamedTuple) -> Dict[str, Tensor]: @@ -527,18 +542,20 @@ def to_dict(outputs: EltwiseMultiplicationNamedTuple) -> Dict[str, Tensor]: } -EltwiseMultiplication_namedtuple = namedtuple('EltwiseMultiplication_namedtuple', - ['hidden1', 'hidden2', 'y_hat', 'loss']) +EltwiseMultiplication_namedtuple = namedtuple("EltwiseMultiplication_namedtuple", + ["hidden1", "hidden2", "y_hat", "loss"]) class EltwiseMultiplicationTestNetwork_namedtuple(EltwiseMultiplicationTestNetwork_Dict): def forward(self, *args, **kwargs) -> EltwiseMultiplication_namedtuple: outputs_dicts = super().forward(*args, **kwargs) - return EltwiseMultiplication_namedtuple(hidden1=outputs_dicts['hidden1'], - hidden2=outputs_dicts['hidden2'], - y_hat=outputs_dicts['y_hat'], - loss=outputs_dicts['loss']) + return EltwiseMultiplication_namedtuple( + hidden1=outputs_dicts["hidden1"], + hidden2=outputs_dicts["hidden2"], + y_hat=outputs_dicts["y_hat"], + loss=outputs_dicts["loss"], + ) @staticmethod def to_dict(outputs: EltwiseMultiplicationNamedTuple) -> Dict[str, Tensor]: @@ -554,7 +571,12 @@ class EltwiseMultiplicationTestNetwork_Tuple(EltwiseMultiplicationTestNetwork_Di def forward(self, *args, **kwargs) -> Tuple[Tensor, Tensor, Tensor, Tensor]: outputs_dicts = super().forward(*args, **kwargs) - return (outputs_dicts['hidden1'], outputs_dicts['hidden2'], outputs_dicts['y_hat'], outputs_dicts['loss']) + return ( + outputs_dicts["hidden1"], + outputs_dicts["hidden2"], + outputs_dicts["y_hat"], + outputs_dicts["loss"], + ) @staticmethod def to_dict(outputs: Tuple[Tensor, Tensor, Tensor, Tensor]) -> Dict[str, Tensor]: @@ -570,7 +592,12 @@ class EltwiseMultiplicationTestNetwork_List(EltwiseMultiplicationTestNetwork_Dic def forward(self, *args, **kwargs) -> List[Tensor]: outputs_dicts = super().forward(*args, **kwargs) - return [outputs_dicts['hidden1'], outputs_dicts['hidden2'], outputs_dicts['y_hat'], 
outputs_dicts['loss']] + return [ + outputs_dicts["hidden1"], + outputs_dicts["hidden2"], + outputs_dicts["y_hat"], + outputs_dicts["loss"], + ] @staticmethod def to_dict(outputs: List[Tensor]) -> Dict[str, Tensor]: @@ -582,31 +609,55 @@ def to_dict(outputs: List[Tensor]) -> Dict[str, Tensor]: } -@pytest.mark.parametrize("param_persistence_threshold", [0, 10]) -@pytest.mark.parametrize("fp16_enabled", [True, False]) -@pytest.mark.parametrize("contiguous_gradients", [True, False]) -@pytest.mark.parametrize("offload_optimizer", [True, False]) -@pytest.mark.parametrize("zero_grad", [True, False]) -@pytest.mark.parametrize("prefetching", [True, False]) -@pytest.mark.parametrize("reduce_scatter", [True, False]) -@pytest.mark.parametrize("model_class", [ - EltwiseMultiplicationTestNetwork_Dict, EltwiseMultiplicationTestNetwork_NamedTuple, - EltwiseMultiplicationTestNetwork_namedtuple, EltwiseMultiplicationTestNetwork_Tuple, - EltwiseMultiplicationTestNetwork_List -]) class TestZero3ParamPartitioningBase(DistributedTest): world_size = 2 - def test( + @pytest.mark.parametrize("param_persistence_threshold", [0, 10]) + def test_param_persistence_threshold(self, param_persistence_threshold): + self._test(param_persistence_threshold=param_persistence_threshold) + + @pytest.mark.parametrize("fp16_enabled", [True, False]) + def test_fp16_enabled(self, fp16_enabled): + self._test(fp16_enabled=fp16_enabled) + + @pytest.mark.parametrize("contiguous_gradients", [True, False]) + def test_contiguous_gradients(self, contiguous_gradients): + self._test(contiguous_gradients=contiguous_gradients) + + @pytest.mark.parametrize("offload_optimizer", [True, False]) + def test_offload_optimizer(self, offload_optimizer): + self._test(offload_optimizer=offload_optimizer) + + @pytest.mark.parametrize("zero_grad", [True, False]) + def test_zero_grad(self, zero_grad): + self._test(zero_grad=zero_grad) + + @pytest.mark.parametrize("prefetching", [True, False]) + def test_prefetching(self, prefetching): + self._test(prefetching=prefetching) + + @pytest.mark.parametrize("reduce_scatter", [True, False]) + def test_reduce_scatter(self, reduce_scatter): + self._test(reduce_scatter=reduce_scatter) + + @pytest.mark.parametrize("model_class", [ + EltwiseMultiplicationTestNetwork_Dict, EltwiseMultiplicationTestNetwork_NamedTuple, + EltwiseMultiplicationTestNetwork_namedtuple, EltwiseMultiplicationTestNetwork_Tuple, + EltwiseMultiplicationTestNetwork_List + ]) + def test_model_class(self, model_class): + self._test(model_class=model_class) + + def _test( self, - param_persistence_threshold: int, - fp16_enabled: bool, - contiguous_gradients: bool, - offload_optimizer: bool, - zero_grad: bool, - prefetching: bool, - reduce_scatter: bool, - model_class: EltwiseMultiplicationTestNetwork_Dict, + param_persistence_threshold: int = 0, + fp16_enabled: bool = False, + contiguous_gradients: bool = False, + offload_optimizer: bool = False, + zero_grad: bool = False, + prefetching: bool = False, + reduce_scatter: bool = False, + model_class: EltwiseMultiplicationTestNetwork_Dict = EltwiseMultiplicationTestNetwork_Dict, ) -> None: if offload_optimizer and not contiguous_gradients: return @@ -624,18 +675,18 @@ def test( "stage3_param_persistence_threshold": param_persistence_threshold, "contiguous_gradients": contiguous_gradients, "stage3_prefetch_bucket_size": prefetch_bucket_size if prefetching else 0, - "reduce_scatter": reduce_scatter + "reduce_scatter": reduce_scatter, }, "optimizer": { "type": "Adam", "params": { - "lr": 1. 
+ "lr": 1.0 } }, "fp16": { "enabled": fp16_enabled, - "loss_scale": 1., - } + "loss_scale": 1.0, + }, } if offload_optimizer: @@ -649,9 +700,11 @@ def test( weight.ds_tensor.data = torch.full_like(weight.ds_tensor.data, (i + 1) * (1 + dist.get_rank())) def create_tensor(vals, dtype: torch.dtype = None) -> Tensor: - return torch.as_tensor(vals, - dtype=dtype or (torch.float16 if fp16_enabled else torch.float32), - device=ds_engine.device) + return torch.as_tensor( + vals, + dtype=dtype or (torch.float16 if fp16_enabled else torch.float32), + device=ds_engine.device, + ) expected_hidden1 = create_tensor([ [1, 1, 1, 1, 1], @@ -672,8 +725,16 @@ def create_tensor(vals, dtype: torch.dtype = None) -> Tensor: for train_iter in range(3): activations = ds_engine( - x=torch.ones((m, n), dtype=torch.float16 if fp16_enabled else torch.float32, device=ds_engine.device), - y=torch.ones((m, n), dtype=torch.float16 if fp16_enabled else torch.float32, device=ds_engine.device), + x=torch.ones( + (m, n), + dtype=torch.float16 if fp16_enabled else torch.float32, + device=ds_engine.device, + ), + y=torch.ones( + (m, n), + dtype=torch.float16 if fp16_enabled else torch.float32, + device=ds_engine.device, + ), use_module_trace=train_iter > 0, param_prefetching=prefetching and train_iter > 0, ) @@ -708,21 +769,33 @@ def create_tensor(vals, dtype: torch.dtype = None) -> Tensor: grad_multiplier = 1 if zero_grad else (train_iter + 1) if dist.get_rank() == 0: - assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()), - grad_multiplier * create_tensor([2] * 8, torch.float)) - assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()), - grad_multiplier * create_tensor([3 * 1] * 8, torch.float)) - assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()), - grad_multiplier * create_tensor([3 * 2 * 1] * 8, torch.float)) + assert torch.allclose( + dloss_wrt_layer3.to(get_accelerator().device_name()), + grad_multiplier * create_tensor([2] * 8, torch.float), + ) + assert torch.allclose( + dloss_wrt_layer2.to(get_accelerator().device_name()), + grad_multiplier * create_tensor([3 * 1] * 8, torch.float), + ) + assert torch.allclose( + dloss_wrt_layer1.to(get_accelerator().device_name()), + grad_multiplier * create_tensor([3 * 2 * 1] * 8, torch.float), + ) elif dist.get_rank() == 1: # parameters dont split evenly across ranks so rank 1 has a zero-padded # partition - assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()), - grad_multiplier * create_tensor(([8] * 7) + [0], torch.float)) - assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()), - grad_multiplier * create_tensor(([6 * 2] * 7) + [0], torch.float)) - assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()), - grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0], torch.float)) + assert torch.allclose( + dloss_wrt_layer3.to(get_accelerator().device_name()), + grad_multiplier * create_tensor(([8] * 7) + [0], torch.float), + ) + assert torch.allclose( + dloss_wrt_layer2.to(get_accelerator().device_name()), + grad_multiplier * create_tensor(([6 * 2] * 7) + [0], torch.float), + ) + assert torch.allclose( + dloss_wrt_layer1.to(get_accelerator().device_name()), + grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0], torch.float), + ) else: raise RuntimeError("test has world size of two") @@ -776,13 +849,13 @@ def forward(self, x: Tensor) -> Tensor: "optimizer": { "type": "Adam", "params": { - "lr": 1. 
+                    "lr": 1.0
                 }
             },
             "fp16": {
                 "enabled": True,
-                "loss_scale": 1.,
-            }
+                "loss_scale": 1.0,
+            },
         }
         with deepspeed.zero.Init(mem_efficient_linear=False, enabled=init_context_manager):
             model = LargeParamModel()
@@ -794,26 +867,27 @@ def forward(self, x: Tensor) -> Tensor:
             partition_sz = math.ceil(param_sz / self.world_size)
             for rank_idx, start_idx in enumerate(range(0, param_sz, partition_sz)):
                 activation_from_partition = activation[start_idx:start_idx + partition_sz]
-                assert torch.allclose(activation_from_partition, torch.full_like(activation_from_partition, rank_idx))
+                assert torch.allclose(
+                    activation_from_partition,
+                    torch.full_like(activation_from_partition, rank_idx),
+                )
 
             ds_engine.backward(activation.sum())
             ds_engine.allreduce_gradients()
 
             avgd_gradients = ds_engine.optimizer.averaged_gradients
             assert set(avgd_gradients.keys()) == {0}, "should only have one parameter group"
-            weight_gradient, = avgd_gradients[0]
+            (weight_gradient, ) = avgd_gradients[0]
             expected_weight_gradient = (train_iter + 1) * torch.full_like(weight_gradient, 1)
 
             assert torch.allclose(weight_gradient, expected_weight_gradient)
 
 
-@pytest.mark.parametrize("param_sz", [100, 1_000, 10_000])
-@pytest.mark.parametrize("n_layers", [100, 1_000])
 @pytest.mark.parametrize("init_context_manager", [True, False])
 class TestZero3ParamPartitioningManyParams(DistributedTest):
-    world_size = 4
+    world_size = 2
 
-    def test(self, param_sz: int, n_layers: int, init_context_manager: bool) -> None:
+    def test(self, init_context_manager: bool, param_sz: int = 100, n_layers: int = 100) -> None:
 
         class ManyParamModel(Module):
 
@@ -854,13 +928,13 @@ def forward(self, x: Tensor) -> Tensor:
             "optimizer": {
                 "type": "Adam",
                 "params": {
-                    "lr": 1.
+                    "lr": 1.0
                 }
             },
             "fp16": {
                 "enabled": True,
-                "loss_scale": 1.,
-            }
+                "loss_scale": 1.0,
+            },
         }
 
         with deepspeed.zero.Init(config=ds_cfg, mem_efficient_linear=False, enabled=init_context_manager):
@@ -923,20 +997,23 @@ def __init_weights(self, module):
             "optimizer": {
                 "type": "Adam",
                 "params": {
-                    "lr": 1.
+                    "lr": 1.0
                 }
             },
             "fp16": {
                 "enabled": True,
-                "loss_scale": 1.,
-            }
+                "loss_scale": 1.0,
+            },
         }
 
         with deepspeed.zero.Init(config=ds_cfg, mem_efficient_linear=False, enabled=True):
             model = ModelWhereParentInitializesChildWeights()
 
         assert model.linear.weight.ds_tensor.numel() == math.ceil(12 / self.world_size)
-        assert torch.allclose(model.linear.weight.ds_tensor, torch.full_like(model.linear.weight.ds_tensor, 1))
+        assert torch.allclose(
+            model.linear.weight.ds_tensor,
+            torch.full_like(model.linear.weight.ds_tensor, 1),
+        )
 
 
 @pytest.mark.skip("not working")
@@ -946,17 +1023,29 @@ def __init_weights(self, module):
 @pytest.mark.parametrize("zero_grad", [True, False])
 @pytest.mark.parametrize("prefetching", [True, False])
 @pytest.mark.parametrize("reduce_scatter", [True, False])
-@pytest.mark.parametrize("model_class", [
-    EltwiseMultiplicationTestNetwork_Dict, EltwiseMultiplicationTestNetwork_NamedTuple,
-    EltwiseMultiplicationTestNetwork_namedtuple, EltwiseMultiplicationTestNetwork_Tuple,
-    EltwiseMultiplicationTestNetwork_List
-])
+@pytest.mark.parametrize(
+    "model_class",
+    [
+        EltwiseMultiplicationTestNetwork_Dict,
+        EltwiseMultiplicationTestNetwork_NamedTuple,
+        EltwiseMultiplicationTestNetwork_namedtuple,
+        EltwiseMultiplicationTestNetwork_Tuple,
+        EltwiseMultiplicationTestNetwork_List,
+    ],
+)
 class TestZero3ParamPartitioningBaseBF16(DistributedTest):
     world_size = 2
 
-    def test(self, param_persistence_threshold: int, contiguous_gradients: bool, offload_optimizer: bool,
-             zero_grad: bool, prefetching: bool, reduce_scatter: bool,
-             model_class: EltwiseMultiplicationTestNetwork_Dict) -> None:
+    def test(
+        self,
+        param_persistence_threshold: int,
+        contiguous_gradients: bool,
+        offload_optimizer: bool,
+        zero_grad: bool,
+        prefetching: bool,
+        reduce_scatter: bool,
+        model_class: EltwiseMultiplicationTestNetwork_Dict,
+    ) -> None:
         if offload_optimizer and not contiguous_gradients:
             return
 
@@ -973,18 +1062,18 @@ def test(self, param_persistence_threshold: int, contiguous_gradients: bool, off
                 "stage3_param_persistence_threshold": param_persistence_threshold,
                 "contiguous_gradients": contiguous_gradients,
                 "stage3_prefetch_bucket_size": prefetch_bucket_size if prefetching else 0,
-                "reduce_scatter": reduce_scatter
+                "reduce_scatter": reduce_scatter,
            },
            "optimizer": {
                "type": "Adam",
                "params": {
-                    "lr": 1.
+                    "lr": 1.0
                 }
             },
             "bf16": {
                 "enabled": True,
-                "loss_scale": 1.,
-            }
+                "loss_scale": 1.0,
+            },
         }
 
         if offload_optimizer:
@@ -1055,21 +1144,33 @@ def create_tensor(vals):
 
             grad_multiplier = 1 if zero_grad else (train_iter + 1)
             if dist.get_rank() == 0:
-                assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()),
-                                      grad_multiplier * create_tensor([2] * 8).to(expected_grad_dtype))
-                assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()),
-                                      grad_multiplier * create_tensor([3 * 1] * 8).to(expected_grad_dtype))
-                assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()),
-                                      grad_multiplier * create_tensor([3 * 2 * 1] * 8).to(expected_grad_dtype))
+                assert torch.allclose(
+                    dloss_wrt_layer3.to(get_accelerator().device_name()),
+                    grad_multiplier * create_tensor([2] * 8).to(expected_grad_dtype),
+                )
+                assert torch.allclose(
+                    dloss_wrt_layer2.to(get_accelerator().device_name()),
+                    grad_multiplier * create_tensor([3 * 1] * 8).to(expected_grad_dtype),
+                )
+                assert torch.allclose(
+                    dloss_wrt_layer1.to(get_accelerator().device_name()),
+                    grad_multiplier * create_tensor([3 * 2 * 1] * 8).to(expected_grad_dtype),
+                )
             elif dist.get_rank() == 1:
                 # parameters dont split evenly across ranks so rank 1 has a zero-padded
                 # partition
-                assert torch.allclose(dloss_wrt_layer3.to(get_accelerator().device_name()),
-                                      grad_multiplier * create_tensor(([8] * 7) + [0]).to(expected_grad_dtype))
-                assert torch.allclose(dloss_wrt_layer2.to(get_accelerator().device_name()),
-                                      grad_multiplier * create_tensor(([6 * 2] * 7) + [0]).to(expected_grad_dtype))
-                assert torch.allclose(dloss_wrt_layer1.to(get_accelerator().device_name()),
-                                      grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0]).to(expected_grad_dtype))
+                assert torch.allclose(
+                    dloss_wrt_layer3.to(get_accelerator().device_name()),
+                    grad_multiplier * create_tensor(([8] * 7) + [0]).to(expected_grad_dtype),
+                )
+                assert torch.allclose(
+                    dloss_wrt_layer2.to(get_accelerator().device_name()),
+                    grad_multiplier * create_tensor(([6 * 2] * 7) + [0]).to(expected_grad_dtype),
+                )
+                assert torch.allclose(
+                    dloss_wrt_layer1.to(get_accelerator().device_name()),
+                    grad_multiplier * create_tensor(([6 * 4 * 1] * 7) + [0]).to(expected_grad_dtype),
+                )
             else:
                 raise RuntimeError("test has world size of two")
 
@@ -1104,7 +1205,7 @@ def test(self):
                 "offload_optimizer": {
                     "device": "cpu"
                 }
-            }
+            },
         }
 
         hidden_dim = 10
@@ -1118,7 +1219,7 @@ def test(self):
             model.step()
 
 
-@pytest.mark.parametrize('return_type', [tuple, list, dict])
+@pytest.mark.parametrize("return_type", [tuple, list, dict])
 class TestZero3DictFwd(DistributedTest):
     world_size = 1
 
@@ -1137,7 +1238,7 @@ def test(self, return_type):
             },
             "zero_optimization": {
                 "stage": 3
-            }
+            },
         }
 
         hidden_dim = 10
@@ -1152,7 +1253,7 @@ def forward(self, x, y):
                 x = self.l1(x)
                 loss = self.cel(x, y)
                 if return_type == dict:
-                    val = {'a': x, 'loss': loss, 'b': 1, 'c': None}
+                    val = {"a": x, "loss": loss, "b": 1, "c": None}
                 elif return_type == list:
                     val = [x, loss]
                 elif return_type == tuple:
@@ -1170,14 +1271,14 @@ def forward(self, x, y):
         for n, batch in enumerate(data_loader):
             loss = model(batch[0], batch[1])
             if return_type == dict:
-                loss = loss['loss']
+                loss = loss["loss"]
             else:
                 loss = loss[1]
             model.backward(loss)
             model.step()
 
 
-@pytest.mark.parametrize('zero_stage', [1, 2, 3])
+@pytest.mark.parametrize("zero_stage", [1, 2, 3])
 class TestZeroAdamOptimizerStepCount(DistributedTest):
     world_size = 1
 
@@ -1201,7 +1302,7 @@ def test(self, zero_stage):
             "fp16": {
                 "enabled": True,
                 "initial_scale_power": 8
-            }
+            },
         }
 
         hidden_dim = 4
 
@@ -1221,13 +1322,13 @@ def test(self, zero_stage):
                 for sub_group_id, _ in enumerate(optimizer.fp16_groups):
                     fp32_param = optimizer.fp32_partitioned_groups_flat[sub_group_id]
                     state = optimizer.optimizer.state[fp32_param]
-                    step_counts.append(state['step'])
+                    step_counts.append(state["step"])
                 assert all(step == step_counts[0] for step in step_counts)
             elif zero_stage == 1 or zero_stage == 2:
                 for param_group in optimizer.optimizer.param_groups:
-                    for param in param_group['params']:
+                    for param in param_group["params"]:
                         state = optimizer.optimizer.state[param]
-                        step_counts.append(state['step'])
+                        step_counts.append(state["step"])
                 assert all(step == step_counts[0] for step in step_counts)
 
@@ -1249,7 +1350,7 @@ def test(self):
             },
             "zero_optimization": {
                 "stage": 3
-            }
+            },
         }
 
         hidden_dim = 10
@@ -1287,7 +1388,7 @@ def forward(self, x, y):
             model.step()
 
 
-@pytest.mark.parametrize('force_ds_optim', [True, False])
+@pytest.mark.parametrize("force_ds_optim", [True, False])
 class TestZeroOffloadOptim(DistributedTest):
     world_size = 1
 
@@ -1320,7 +1421,7 @@ def test(self, force_ds_optim):
             model, _, _, _ = deepspeed.initialize(model=model, optimizer=optimizer, config=config_dict)
 
 
-@pytest.mark.parametrize('training', [True, False])
+@pytest.mark.parametrize("training", [True, False])
 class TestZeroPartitionCache(DistributedTest):
     world_size = 1
 
@@ -1334,8 +1435,8 @@ def test_training_partition_cache(self, training):
             },
             "zero_optimization": {
                 "stage": 3,
-                "stage3_param_persistence_threshold": hidden_dim
-            }
+                "stage3_param_persistence_threshold": hidden_dim,
+            },
         }
         if training:
             config_dict["optimizer"] = {"type": "Adam"}
@@ -1346,11 +1447,13 @@ def test_training_partition_cache(self, training):
         model, _, _, _ = deepspeed.initialize(model=model, config=config_dict)
 
         dtype = torch.half
-        data_loader = random_dataloader(model=model,
-                                        total_samples=6,
-                                        hidden_dim=hidden_dim,
-                                        device=model.device,
-                                        dtype=dtype)
+        data_loader = random_dataloader(
+            model=model,
+            total_samples=6,
+            hidden_dim=hidden_dim,
+            device=model.device,
+            dtype=dtype,
+        )
 
         for _, batch in enumerate(data_loader):
             loss = model(batch[0], batch[1])
diff --git a/tests/unit/runtime/zero/test_zero_tensor_fragment.py b/tests/unit/runtime/zero/test_zero_tensor_fragment.py
index 459d41f98eea..475502561418 100644
--- a/tests/unit/runtime/zero/test_zero_tensor_fragment.py
+++ b/tests/unit/runtime/zero/test_zero_tensor_fragment.py
@@ -68,11 +68,15 @@ def run_fragmented_model(model, config_dict, hidden_dim, dtype):
         validate_full_tensors(model)
         model.step()
 
+    # Needed in ZeRO 3. Not doing so can give memory leak
+    model.destroy()
+
 
 @pytest.mark.parametrize('frozen_weights', [True, False])
 class TestTensorFragment(DistributedTest):
     # Need multiple gpus to test possible hanging
     world_size = 2
+    reuse_dist_env = True
 
     @pytest.mark.parametrize('zero_stage', [1, 2, 3])
     @pytest.mark.parametrize('offload_device', [OffloadDeviceEnum.none, OffloadDeviceEnum.cpu, OffloadDeviceEnum.nvme])
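
Note on the TestZero3ParamPartitioningBase change above: the patch replaces a fully crossed @pytest.mark.parametrize stack (2^7 x 5 = 640 combinations) with one thin wrapper test per option, each forwarding to a shared _test helper whose other knobs take defaults, so every option is still exercised in 19 runs instead of 640. The snippet below is a minimal, self-contained sketch of that pattern only; the names (combine, flag_a, flag_b, TestFeatureKnobs) are illustrative stand-ins and not part of DeepSpeed or this patch.

import pytest


def combine(flag_a: bool = False, flag_b: bool = False) -> int:
    # Cheap stand-in for the expensive distributed training run in the real tests.
    return int(flag_a) + 2 * int(flag_b)


class TestFeatureKnobs:
    # Shared driver with defaults. pytest does not collect names starting with an
    # underscore, so _test only runs via the thin wrappers below, and each wrapper
    # varies exactly one knob: O(N) tests instead of O(2^N) combinations.
    def _test(self, flag_a: bool = False, flag_b: bool = False) -> None:
        assert combine(flag_a, flag_b) == int(flag_a) + 2 * int(flag_b)

    @pytest.mark.parametrize("flag_a", [True, False])
    def test_flag_a(self, flag_a):
        self._test(flag_a=flag_a)

    @pytest.mark.parametrize("flag_b", [True, False])
    def test_flag_b(self, flag_b):
        self._test(flag_b=flag_b)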