diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml
index a4b2baa59550..a8b0fa661676 100644
--- a/.github/workflows/cicd-main.yml
+++ b/.github/workflows/cicd-main.yml
@@ -78,7 +78,7 @@ jobs:
         run: |
           docker run --rm --device=/dev/nvidia0 --gpus all --shm-size=8g --env TRANSFORMERS_OFFLINE=0 --env HYDRA_FULL_ERROR=1 --env PYTHONUNBUFFERED=1 nemoci.azurecr.io/nemo_container:${{ github.run_id }} bash -c '\
             # PyTorch Lightning version
-            python -c "import pytorch_lightning; print(pytorch_lightning.__version__)"
+            python -c "import lightning.pytorch; print(lightning.pytorch.__version__)"
 
             # PyTorch Lightning DDP Checks
             CUDA_VISIBLE_DEVICES="0,1" python "tests/core_ptl/check_for_ranks.py"
@@ -3921,6 +3921,46 @@ jobs:
         --index-mapping-dir=/tmp/llm_tests/llama_index_mappings \
         --cp 1 --tp 2 --sp 1
 
+  L2_NeMo_2_llama3_fault_tolerance_plugin:
+    needs: [cicd-test-container-setup]
+    uses: ./.github/workflows/_test_template.yml
+    if: contains(fromJSON(needs.cicd-test-container-setup.outputs.test_to_run), 'L2_NeMo_2_llama3_fault_tolerance_plugin') || needs.cicd-test-container-setup.outputs.all == 'true'
+    with:
+      RUNNER: self-hosted-azure
+      SCRIPT: |
+
+        mkdir -p /tmp/llm_tests/llama_pretrain_results \
+        export FAULT_TOL_CFG_PATH="/tmp/llm_tests/llama_pretrain_results/sample_job_ft_cfg.yml"; \
+        export FAULT_TOL_FINISHED_FLAG_FILE="/tmp/llm_tests/llama_pretrain_results/sample_job_finished_flag"; \
+        python tests/collections/llm/test_fault_nvrx.py \
+        --devices=2 \
+        --crash-step=16 \
+        --experiment-dir=/tmp/llm_tests/llama_pretrain_results \
+        --data-path=/home/TestData/nlp/megatron_llama/data/rp2_sample_sentencepiece_preproc_text_document \
+        --tokenizer-path=/home/TestData/nlp/megatron_llama/tokenizer.model \
+        --index-mapping-dir=/tmp/llm_tests/llama_index_mappings \
+        2>&1 | tee /tmp/llm_tests/llama_pretrain_results/run.log \
+
+  L2_NeMo_2_llama3_straggler_detection:
+    needs: [cicd-test-container-setup]
+    uses: ./.github/workflows/_test_template.yml
+    if: contains(fromJSON(needs.cicd-test-container-setup.outputs.test_to_run), 'L2_NeMo_2_llama3_straggler_detection') || needs.cicd-test-container-setup.outputs.all == 'true'
+    with:
+      RUNNER: self-hosted-azure
+      SCRIPT: |
+
+        mkdir -p /tmp/llm_tests/llama_pretrain_results \
+        export FAULT_TOL_CFG_PATH="/tmp/llm_tests/llama_pretrain_results/sample_job_ft_cfg.yml"; \
+        export FAULT_TOL_FINISHED_FLAG_FILE="/tmp/llm_tests/llama_pretrain_results/sample_job_finished_flag"; \
+        python tests/collections/llm/test_fault_nvrx.py \
+        --devices=2 \
+        --check-report=True \
+        --experiment-dir=/tmp/llm_tests/llama_pretrain_results \
+        --data-path=/home/TestData/nlp/megatron_llama/data/rp2_sample_sentencepiece_preproc_text_document \
+        --tokenizer-path=/home/TestData/nlp/megatron_llama/tokenizer.model \
+        --index-mapping-dir=/tmp/llm_tests/llama_index_mappings \
+        2>&1 | tee /tmp/llm_tests/llama_pretrain_results/run.log \
+
   L2_NeMo_2_GPT_DDP_Param_Parity_check:
     needs: [cicd-test-container-setup]
     uses: ./.github/workflows/_test_template.yml
@@ -4587,6 +4627,8 @@ jobs:
       - L2_NeMo_2_GPT_DDP_Param_Parity_check
       - L2_NeMo_2_HF_MODEL_IMPORT
       - L2_NeMo_2_llama3_pretraining_recipe
+      - L2_NeMo_2_llama3_fault_tolerance_plugin
+      - L2_NeMo_2_llama3_straggler_detection
       - L2_HF_Transformer_SFT_TE_Acceleration
       - L2_NeMo_2_SSM_Pretraining
       - L2_NeMo_2_SSM_Finetuning
@@ -4760,4 +4802,4 @@ jobs:
 
       - name: "Pipeline not successful, set exit code to 1"
         if: ${{ always() && steps.pipeline-conclusion.outputs.SUCCESS == 'false' }}
-        run: exit 1
\ No newline at end of file
+        run: exit 1
diff --git a/.github/workflows/secrets-detector.yml b/.github/workflows/secrets-detector.yml
index 825ae7a653fc..f8d82ec49a8c 100644
--- a/.github/workflows/secrets-detector.yml
+++ b/.github/workflows/secrets-detector.yml
@@ -38,7 +38,7 @@ jobs:
 
       - name: Run on change-set
         run: |
-          git diff --name-only --diff-filter=d --merge-base origin/main -z | xargs -0 detect-secrets-hook --baseline .secrets.baseline
+          git diff --name-only --diff-filter=d --merge-base origin/main -z | xargs -0 detect-secrets-hook --disable-plugin HexHighEntropyString --baseline .secrets.baseline
 
       - uses: EndBug/add-and-commit@v9
        # Commit changes. Nothing is committed if no changes.
diff --git a/Dockerfile.ci b/Dockerfile.ci
index e1b78547325a..149e701a376f 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -84,6 +84,9 @@ git checkout ${MCORE_TAG} && \
 popd
 export PYTHONPATH="${PYTHONPATH}:/workspace/Megatron-LM"
 
+# Install nvidia-resiliency-ext
+pip install --no-cache-dir "git+https://github.com/NVIDIA/nvidia-resiliency-ext.git@97aad77609d2e25ed38ac5c99f0c13f93c48464e"
+
 EOF
 
 # Copy over NeMo code
diff --git a/nemo/lightning/run/plugins.py b/nemo/lightning/run/plugins.py
index 645665723706..7118d6de0831 100644
--- a/nemo/lightning/run/plugins.py
+++ b/nemo/lightning/run/plugins.py
@@ -91,13 +91,13 @@ class FaultTolerancePlugin(run.Plugin):
     This plugin enables workload hang detection, automatic calculation of timeouts used for hang detection, detection of rank(s) terminated due to an error and workload respawning in case of a failure.
     Note: FaultTolerancePlugin does not work with the NsysPlugin.
     Args:
-        num_in_process_restarts (int): Max number of restarts on failure, within the same job. Default is 3.
+        num_in_job_restarts (int): Max number of restarts on failure, within the same job. Default is 3.
         num_job_retries_on_failure (int): Max number of new job restarts on failure. Default is 2.
         initial_rank_heartbeat_timeout (int): Timeouts are time intervals used by a rank monitor to detect that a rank is not alive. This is the max timeout for the initial heartbeat. Default is 1800.
         rank_heartbeat_timeout (int): This is the timeout for subsequent hearbeats after the initial heartbeat. Default is 300.
     """
 
-    num_in_process_restarts: int = 3
+    num_in_job_restarts: int = 3
     num_job_retries_on_failure: int = 2
     initial_rank_heartbeat_timeout: int = 1800
     rank_heartbeat_timeout: int = 300
@@ -107,7 +107,7 @@ def setup(self, task: run.Partial | run.Script, executor: run.Executor):
         assert HAVE_RES, "nvidia-resiliency-ext.ptl_resiliency is required to use the FaultTolerancePlugin."
 
         executor.launcher = run.FaultTolerance(
-            max_restarts=self.num_in_process_restarts,
+            max_restarts=self.num_in_job_restarts,
             initial_rank_heartbeat_timeout=self.initial_rank_heartbeat_timeout,
             rank_heartbeat_timeout=self.rank_heartbeat_timeout,
         )
diff --git a/tests/collections/llm/common.py b/tests/collections/llm/common.py
index c17243936bd1..8e93c9c84c9e 100644
--- a/tests/collections/llm/common.py
+++ b/tests/collections/llm/common.py
@@ -15,6 +15,7 @@
 import os
 
 import lightning.pytorch as pl
+import nemo_run as run
 import torch
 
 from nemo import lightning as nl
@@ -27,8 +28,9 @@ def train_data(
     data_path: str, tokenizer_path: str, index_mapping_dir: str, seq_length: int
 ) -> llm.PreTrainingDataModule:
     """Single shard dataset tokenized by SentencePiece"""
-    tokenizer = SentencePieceTokenizer(model_path=tokenizer_path)
-    return llm.PreTrainingDataModule(
+    tokenizer = run.Config(SentencePieceTokenizer, model_path=tokenizer_path)
+    return run.Config(
+        llm.PreTrainingDataModule,
         paths=data_path,
         tokenizer=tokenizer,
         seq_length=seq_length,
@@ -41,7 +43,8 @@ def train_data(
 
 def small_llama_cfg(seq_length: int) -> llm.GPTConfig:
     """Small 145m model"""
-    return llm.Llama3Config8B(
+    return run.Config(
+        llm.Llama3Config8B,
         rotary_base=500_000,
         seq_length=seq_length,
         num_layers=12,
diff --git a/tests/collections/llm/llama3_pretraining.py b/tests/collections/llm/llama3_pretraining.py
index 24eeca8f01c8..aefb937fbf1d 100644
--- a/tests/collections/llm/llama3_pretraining.py
+++ b/tests/collections/llm/llama3_pretraining.py
@@ -80,7 +80,7 @@ def main():
         dir=args.experiment_dir, name=exp_name, num_gpus_per_node=args.devices
     )
 
-    pretrain_recipe.model = llm.LlamaModel(small_llama_cfg(args.seq_length))
+    pretrain_recipe.model = run.Config(llm.LlamaModel, small_llama_cfg(args.seq_length))
 
     if args.data_path and args.tokenizer_path:
         pretrain_recipe.data = train_data(
diff --git a/tests/collections/llm/test_fault_nvrx.py b/tests/collections/llm/test_fault_nvrx.py
new file mode 100644
index 000000000000..5ad7a6f4a230
--- /dev/null
+++ b/tests/collections/llm/test_fault_nvrx.py
@@ -0,0 +1,134 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test fault tolerance with LLaMA3 recipe and a smaller model.
+"""
+
+import argparse
+import os
+
+import nemo_run as run
+
+from lightning.pytorch.callbacks import Callback
+
+from nemo.collections import llm
+from nemo.collections.llm.recipes.callbacks.common import straggler_det_callback
+from nemo.lightning.run.plugins import FaultTolerancePlugin
+from nemo.utils.exp_manager import TimingCallback
+from tests.collections.llm.common import small_llama_cfg, train_data
+
+
+class CrashCallback(Callback):
+    def __init__(self, crash_step=16):
+        self.crash_step = crash_step
+        self.current_step = 0
+        print(f"Setup to simulate a crash if step == {self.crash_step}")
+
+    def on_train_batch_end(self, trainer, pl_module, outputs, batch, batch_idx):
+        self.current_step = self.current_step + 1
+        if self.crash_step and self.current_step == self.crash_step:
+            raise Exception(f"Simulating a crash at step {self.crash_step}!")
+
+
+def get_args():
+    parser = argparse.ArgumentParser(prog="", description="")
+    parser.add_argument('--devices', type=int, required=True, help="Number of devices to use for training")
+    parser.add_argument(
+        '--crash-step',
+        type=int,
+        help="Step when a crash should be simulated",
+    )
+    parser.add_argument(
+        '--check-report', type=bool, default=False, help="Check if StragglerDetection reports performance scores"
+    )
+    parser.add_argument(
+        '--experiment-dir', type=str, required=True, help="directory to write results and checkpoints to"
+    )
+    parser.add_argument(
+        '--data-path', type=str, default=None, help="Path to data file. If not specified, uses mock data."
+    )
+    parser.add_argument(
+        '--tokenizer-path',
+        type=str,
+        default=None,
+        help="Path to a sentencepiece tokenizer model file. If not specified, uses mock data.",
+    )
+    parser.add_argument('--index-mapping-dir', type=str, help="directory to write index mappings to")
+
+    return parser.parse_args()
+
+
+def main():
+    args = get_args()
+
+    exp_name = "L2_llama3_small_pretrain_fault_tolerance_test"
+    pretrain_recipe = llm.llama3_8b.pretrain_recipe(
+        dir=args.experiment_dir, name=exp_name, num_gpus_per_node=args.devices
+    )
+
+    pretrain_recipe.model = run.Config(llm.LlamaModel, small_llama_cfg(1024))
+
+    if args.data_path and args.tokenizer_path:
+        pretrain_recipe.data = train_data(
+            data_path=args.data_path,
+            tokenizer_path=args.tokenizer_path,
+            index_mapping_dir=args.index_mapping_dir,
+            seq_length=1024,
+        )
+
+    # Recipe Overrides
+    pretrain_recipe.trainer.max_steps = 20
+    pretrain_recipe.trainer.log_every_n_steps = 1
+    # Enable ckpt save so that after the simulated crash, training can resume from ckpt
+    pretrain_recipe.log.ckpt.every_n_train_steps = 10
+    pretrain_recipe.log.ckpt.train_time_interval = None
+    # Disable async ckpt because the simulated crash happens during ckpt save
+    # So only an unfinished ckpt would be available for resume which can cause errors
+    pretrain_recipe.trainer.strategy.ckpt_async_save = False
+    pretrain_recipe.trainer.val_check_interval = 30
+    pretrain_recipe.trainer.limit_val_batches = 2
+
+    executor: run.SlurmExecutor = run.LocalExecutor(ntasks_per_node=args.devices, launcher="ft")
+    # Add the fault tolerance plugin which enables restart after a crash
+    run_plugins: list[run.Plugin] = [FaultTolerancePlugin(num_in_job_restarts=1, num_job_retries_on_failure=0)]
+    pretrain_recipe.trainer.callbacks = [
+        run.Config(TimingCallback),
+        straggler_det_callback(straggler_report_time_interval=0.5),
+    ]
+
+    if args.crash_step:
+        pretrain_recipe.trainer.callbacks.append(run.Config(CrashCallback, crash_step=args.crash_step))
+
+    run.run(pretrain_recipe, plugins=run_plugins, executor=executor)
+
+    # Assumes that NeMo logs are written into "run.log"
+    # When a crash is simulated, the error shows up on the terminal but it is not written to a file
+    # So the test appends run output to run.log in the experiment-dir
+    log_content = None
+    with open(os.path.join(args.experiment_dir, "run.log")) as f:
+        log_content = f.read()
+
+    if args.check_report:
+        assert "GPU relative performance" in log_content
+        assert "GPU individual performance" in log_content
+        assert "Straggler report processing time" in log_content
+    if args.crash_step:
+        assert f"Exception: Simulating a crash at step {args.crash_step}!" in log_content
+        assert "Restored all states from the checkpoint" in log_content
+    assert "`Trainer.fit` stopped: `max_steps=20` reached" in log_content
+
+
+if __name__ == '__main__':
+    main()
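
Reviewer sketch (not part of the diff): a minimal example of how the renamed FaultTolerancePlugin and the straggler-detection callback exercised by tests/collections/llm/test_fault_nvrx.py might be wired together outside CI. It mirrors the test above; the directory, device count, and interval values are illustrative placeholders rather than documented defaults.

# Illustrative usage sketch, assuming the same APIs the test file imports.
import nemo_run as run

from nemo.collections import llm
from nemo.collections.llm.recipes.callbacks.common import straggler_det_callback
from nemo.lightning.run.plugins import FaultTolerancePlugin

# Build a small pretraining recipe (placeholder directory and GPU count).
recipe = llm.llama3_8b.pretrain_recipe(dir="/tmp/ft_demo", name="ft_demo", num_gpus_per_node=2)
recipe.trainer.max_steps = 20

# Straggler detection callback from nvidia-resiliency-ext, same interval as the CI test above.
recipe.trainer.callbacks = [straggler_det_callback(straggler_report_time_interval=0.5)]

# The "ft" launcher plus the plugin enable in-job restarts after a failure;
# num_in_job_restarts is the argument renamed in this change.
executor = run.LocalExecutor(ntasks_per_node=2, launcher="ft")
plugins = [FaultTolerancePlugin(num_in_job_restarts=1, num_job_retries_on_failure=0)]

run.run(recipe, plugins=plugins, executor=executor)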