diff --git a/docs/source/_toctree.yml b/docs/source/_toctree.yml index 08cf5cbc2de..74a0d95f189 100644 --- a/docs/source/_toctree.yml +++ b/docs/source/_toctree.yml @@ -25,6 +25,8 @@ title: How perform inference on large models with small resources - local: usage_guides/gradient_accumulation title: Performing gradient accumulation + - local: usage_guides/local_sgd + title: Accelerating training with local SGD - local: usage_guides/checkpoint title: Saving and loading training states - local: usage_guides/tracking @@ -79,4 +81,4 @@ title: Utility functions and classes - local: package_reference/megatron_lm title: Megatron-LM Utilities - title: "Reference" \ No newline at end of file + title: "Reference" diff --git a/docs/source/usage_guides/local_sgd.mdx b/docs/source/usage_guides/local_sgd.mdx new file mode 100644 index 00000000000..e5e737b3d90 --- /dev/null +++ b/docs/source/usage_guides/local_sgd.mdx @@ -0,0 +1,105 @@ + + +# Using Local SGD with 🤗 Accelerate + +Local SGD is a technique for distributed training where gradients are not synchronized every step. Thus, each process updates its own version of the model weights and after a given number of steps these weights are synchronized by averaging across all processes. This improves communication efficiency and can lead to substantial training speed up especially when a computer lacks a faster interconnect such as NVLink. +Unlike gradient accumulation (where improving communication efficiency requires increasing the effective batch size), Local SGD does not require changing a batch size or a learning rate / schedule. However, if necessary, Local SGD can be combined with gradient accumulation as well. + +In this tutorial you will see how to quickly setup Local SGD 🤗 Accelerate. Compared to a standard Accelerate setup, this requires only two extra lines of code. + +This example will use a very simplistic PyTorch training loop that performs gradient accumulation every two batches: + +```python +device = "cuda" +model.to(device) + +gradient_accumulation_steps = 2 + +for index, batch in enumerate(training_dataloader): + inputs, targets = batch + inputs = inputs.to(device) + targets = targets.to(device) + outputs = model(inputs) + loss = loss_function(outputs, targets) + loss = loss / gradient_accumulation_steps + loss.backward() + if (index + 1) % gradient_accumulation_steps == 0: + optimizer.step() + scheduler.step() + optimizer.zero_grad() +``` + +## Converting it to 🤗 Accelerate + +First the code shown earlier will be converted to use 🤗 Accelerate with neither a LocalSGD or a gradient accumulation helper: + +```diff ++ from accelerate import Accelerator ++ accelerator = Accelerator() + ++ model, optimizer, training_dataloader, scheduler = accelerator.prepare( ++ model, optimizer, training_dataloader, scheduler ++ ) + + for index, batch in enumerate(training_dataloader): + inputs, targets = batch +- inputs = inputs.to(device) +- targets = targets.to(device) + outputs = model(inputs) + loss = loss_function(outputs, targets) + loss = loss / gradient_accumulation_steps ++ accelerator.backward(loss) + if (index+1) % gradient_accumulation_steps == 0: + optimizer.step() + scheduler.step() +``` + +## Letting 🤗 Accelerate handle model synchronization + +All that is left now is to let 🤗 Accelerate handle model parameter synchronization **and** the gradient accumulation for us. For simplicity let us assume we need to synchronize every 8 steps. This is +achieved by adding one `with LocalSGD` statement and one call `local_sgd.step()` after every optimizer step: + +```diff ++local_sgd_steps=8 + ++with LocalSGD(accelerator=accelerator, model=model, local_sgd_steps=8, enabled=True) as local_sgd: + for batch in training_dataloader: + with accelerator.accumulate(model): + inputs, targets = batch + outputs = model(inputs) + loss = loss_function(outputs, targets) + accelerator.backward(loss) + optimizer.step() + scheduler.step() + optimizer.zero_grad() ++ local_sgd.step() +``` + +Under the hood, the Local SGD code **disables** automatic gradient synchornization (but accumulation still works as expected!). Instead it averages model parameters every `local_sgd_steps` steps (as well as in the end of the training loop). + +## Limitations + +The current implementation works only with basic multi-GPU (or multi-CPU) training without, e.g., [DeepSpeed.](https://github.com/microsoft/DeepSpeed). + +## References + + Although we are not aware of the true origins of this simple approach, the idea of local SGD is quite old and goes + back to at least: + + Zhang, J., De Sa, C., Mitliagkas, I., & Ré, C. (2016). [Parallel SGD: When does averaging help?. arXiv preprint + arXiv:1606.07365.](https://arxiv.org/abs/1606.07365) + + We credit the term Local SGD to the following paper (but there might be earlier references we are not aware of). + + Stich, Sebastian Urban. ["Local SGD Converges Fast and Communicates Little." ICLR 2019-International Conference on + Learning Representations. No. CONF. 2019.](https://arxiv.org/abs/1805.09767) diff --git a/examples/by_feature/README.md b/examples/by_feature/README.md index 4f69c1cbf42..c25090173d1 100644 --- a/examples/by_feature/README.md +++ b/examples/by_feature/README.md @@ -77,4 +77,15 @@ These arguments should be added at the end of any method for starting the python ```bash accelerate launch ./gradient_accumulation.py --gradient_accumulation_steps 5 -``` \ No newline at end of file +``` + +### LocalSGD (`local_sgd.py`) +- Shows how to use `Accelerator.no_sync` to prevent gradient averaging in a distributed setup. However, unlike gradient accumulation, this method does not change the effective batch size. Local SGD can be combined with gradient accumulation. + +These arguments should be added at the end of any method for starting the python script (such as `python`, `accelerate launch`, `python -m torchrun`), such as: + +```bash +accelerate launch ./local_sgd.py --local_sgd_steps 4 +``` + + diff --git a/examples/by_feature/local_sgd.py b/examples/by_feature/local_sgd.py new file mode 100644 index 00000000000..9dc94ce2f44 --- /dev/null +++ b/examples/by_feature/local_sgd.py @@ -0,0 +1,238 @@ +# coding=utf-8 +# Copyright 2023 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import argparse +import os + +import evaluate +import torch +from datasets import load_dataset +from torch.optim import AdamW +from torch.utils.data import DataLoader +from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, set_seed + +from accelerate import Accelerator, DistributedType +from accelerate.local_sgd import LocalSGD + + +######################################################################## +# This is a fully working simple example to use Accelerate +# with LocalSGD, which is a method to synchronize model +# parameters every K batches. It is different, but complementary +# to gradient accumulation. +# +# This example trains a Bert base model on GLUE MRPC +# in any of the following settings (with the same script): +# - single CPU or single GPU +# - multi GPUS (using PyTorch distributed mode) +# - (multi) TPUs +# - fp16 (mixed-precision) or fp32 (normal precision) +# +# To run it in each of these various modes, follow the instructions +# in the readme for examples: +# https://github.com/huggingface/accelerate/tree/main/examples +# +######################################################################## + + +MAX_GPU_BATCH_SIZE = 16 +EVAL_BATCH_SIZE = 32 + + +def get_dataloaders(accelerator: Accelerator, batch_size: int = 16): + """ + Creates a set of `DataLoader`s for the `glue` dataset, + using "bert-base-cased" as the tokenizer. + + Args: + accelerator (`Accelerator`): + An `Accelerator` object + batch_size (`int`, *optional*): + The batch size for the train and validation DataLoaders. + """ + tokenizer = AutoTokenizer.from_pretrained("bert-base-cased") + datasets = load_dataset("glue", "mrpc") + + def tokenize_function(examples): + # max_length=None => use the model max length (it's actually the default) + outputs = tokenizer(examples["sentence1"], examples["sentence2"], truncation=True, max_length=None) + return outputs + + # Apply the method we just defined to all the examples in all the splits of the dataset + # starting with the main process first: + with accelerator.main_process_first(): + tokenized_datasets = datasets.map( + tokenize_function, + batched=True, + remove_columns=["idx", "sentence1", "sentence2"], + ) + + # We also rename the 'label' column to 'labels' which is the expected name for labels by the models of the + # transformers library + tokenized_datasets = tokenized_datasets.rename_column("label", "labels") + + def collate_fn(examples): + # On TPU it's best to pad everything to the same length or training will be very slow. + max_length = 128 if accelerator.distributed_type == DistributedType.TPU else None + # When using mixed precision we want round multiples of 8/16 + if accelerator.mixed_precision == "fp8": + pad_to_multiple_of = 16 + elif accelerator.mixed_precision != "no": + pad_to_multiple_of = 8 + else: + pad_to_multiple_of = None + + return tokenizer.pad( + examples, + padding="longest", + max_length=max_length, + pad_to_multiple_of=pad_to_multiple_of, + return_tensors="pt", + ) + + # Instantiate dataloaders. + train_dataloader = DataLoader( + tokenized_datasets["train"], shuffle=True, collate_fn=collate_fn, batch_size=batch_size + ) + eval_dataloader = DataLoader( + tokenized_datasets["validation"], shuffle=False, collate_fn=collate_fn, batch_size=EVAL_BATCH_SIZE + ) + + return train_dataloader, eval_dataloader + + +# For testing only +if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1": + from accelerate.test_utils.training import mocked_dataloaders + + get_dataloaders = mocked_dataloaders # noqa: F811 + + +def training_function(config, args): + # For testing only + if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1": + config["num_epochs"] = 2 + # New Code # + gradient_accumulation_steps = int(args.gradient_accumulation_steps) + local_sgd_steps = int(args.local_sgd_steps) + # Initialize accelerator + accelerator = Accelerator( + cpu=args.cpu, mixed_precision=args.mixed_precision, gradient_accumulation_steps=gradient_accumulation_steps + ) + if accelerator.distributed_type not in [DistributedType.NO, DistributedType.MULTI_CPU, DistributedType.MULTI_GPU]: + raise NotImplementedError("LocalSGD is supported only for CPUs and GPUs (no DeepSpeed or MegatronLM)") + # Sample hyper-parameters for learning rate, batch size, seed and a few other HPs + lr = config["lr"] + num_epochs = int(config["num_epochs"]) + seed = int(config["seed"]) + batch_size = int(config["batch_size"]) + + metric = evaluate.load("glue", "mrpc") + + set_seed(seed) + train_dataloader, eval_dataloader = get_dataloaders(accelerator, batch_size) + # Instantiate the model (we build the model here so that the seed also control new weights initialization) + model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True) + + # We could avoid this line since the accelerator is set with `device_placement=True` (default value). + # Note that if you are placing tensors on devices manually, this line absolutely needs to be before the optimizer + # creation otherwise training will not work on TPU (`accelerate` will kindly throw an error to make us aware of that). + model = model.to(accelerator.device) + + # Instantiate optimizer + optimizer = AdamW(params=model.parameters(), lr=lr) + + # Instantiate scheduler + lr_scheduler = get_linear_schedule_with_warmup( + optimizer=optimizer, + num_warmup_steps=100, + num_training_steps=(len(train_dataloader) * num_epochs), + ) + + # Prepare everything + # There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the + # prepare method. + model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare( + model, optimizer, train_dataloader, eval_dataloader, lr_scheduler + ) + + # Now we train the model + for epoch in range(num_epochs): + model.train() + with LocalSGD( + accelerator=accelerator, model=model, local_sgd_steps=local_sgd_steps, enabled=local_sgd_steps is not None + ) as local_sgd: + for step, batch in enumerate(train_dataloader): + # We could avoid this line since we set the accelerator with `device_placement=True`. + batch.to(accelerator.device) + # New code # + # We use the new `accumulate` context manager to perform gradient accumulation + # We also currently do not support TPUs nor advise it as bugs were found on the XLA side when running our tests. + with accelerator.accumulate(model): + output = model(**batch) + loss = output.loss + accelerator.backward(loss) + optimizer.step() + lr_scheduler.step() + optimizer.zero_grad() + # LocalSGD-specific line + local_sgd.step() + + model.eval() + for step, batch in enumerate(eval_dataloader): + # We could avoid this line since we set the accelerator with `device_placement=True`. + batch.to(accelerator.device) + with torch.no_grad(): + outputs = model(**batch) + predictions = outputs.logits.argmax(dim=-1) + predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"])) + metric.add_batch( + predictions=predictions, + references=references, + ) + + eval_metric = metric.compute() + # Use accelerator.print to print only on the main process. + accelerator.print(f"epoch {epoch}:", eval_metric) + + +def main(): + parser = argparse.ArgumentParser(description="Simple example of training script.") + parser.add_argument( + "--mixed_precision", + type=str, + default=None, + choices=["no", "fp16", "bf16", "fp8"], + help="Whether to use mixed precision. Choose" + "between fp16 and bf16 (bfloat16). Bf16 requires PyTorch >= 1.10." + "and an Nvidia Ampere GPU.", + ) + # New Code # + parser.add_argument( + "--gradient_accumulation_steps", + type=int, + default=1, + help="The number of minibatches to be ran before gradients are accumulated.", + ) + parser.add_argument( + "--local_sgd_steps", type=int, default=8, help="Number of local SGD steps or None to disable local SGD" + ) + parser.add_argument("--cpu", action="store_true", help="If passed, will train on the CPU.") + args = parser.parse_args() + config = {"lr": 2e-5, "num_epochs": 3, "seed": 42, "batch_size": 16} + training_function(config, args) + + +if __name__ == "__main__": + main() diff --git a/src/accelerate/local_sgd.py b/src/accelerate/local_sgd.py new file mode 100644 index 00000000000..5a51d1655ff --- /dev/null +++ b/src/accelerate/local_sgd.py @@ -0,0 +1,100 @@ +# Copyright 2023 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import torch + +from accelerate import Accelerator, DistributedType + + +class LocalSGD: + """ + A helper class to support local SGD on top of Accelerator. It simply runs a given number of updates independently + on each device, and averages model weights every K synchronization step. + + It should be used only in the multi-GPU (or multi-CPU) setup without extensions such as DeepSpeed. In particular, + this is a simple implementation that cannot support scenarios such as model parallelism. + + + Although we are not aware of the true origins of this simple approach, the idea of local SGD is quite old and goes + back to at least: + + Zhang, J., De Sa, C., Mitliagkas, I., & Ré, C. (2016). [Parallel SGD: When does averaging help?. arXiv preprint + arXiv:1606.07365.](https://arxiv.org/abs/1606.07365) + + We credit the term Local SGD to the following paper (but there might be earlier references we are not aware of). + + Stich, Sebastian Urban. ["Local SGD Converges Fast and Communicates Little." ICLR 2019-International Conference on + Learning Representations. No. CONF. 2019.](https://arxiv.org/abs/1805.09767) + + """ + + def __enter__(self): + if self.enabled: + self.model_sync_obj = self.model.no_sync() + self.model_sync_obj.__enter__() + + return self + + def __exit__(self, type, value, tb): + if self.enabled: + # Average all models on exit + self._sync_and_avg_model_params() + self.model_sync_obj.__exit__(type, value, tb) + + def __init__(self, accelerator: Accelerator, model: torch.nn.Module, local_sgd_steps: int, enabled: bool = True): + """ + Constructor. + + Args: + model (`torch.nn.Module): + The model whose parameters we need to average. + accelerator (`Accelerator`): + Accelerator object. + local_sgd_steps (`int`): + A number of local SGD steps (before model parameters are synchronized). + enabled (`bool): + Local SGD is disabled if this parameter set to `False`. + """ + if accelerator.distributed_type not in [ + DistributedType.NO, + DistributedType.MULTI_CPU, + DistributedType.MULTI_GPU, + ]: + raise NotImplementedError("LocalSGD is supported only for CPUs and GPUs (no DeepSpeed or MegatronLM)") + self.enabled = enabled and accelerator.distributed_type != DistributedType.NO + self.num_steps = 0 + if self.enabled: + self.accelerator = accelerator + self.model = model + self.local_sgd_steps = local_sgd_steps + + def step(self): + """ + This function makes a "step" and synchronizes model parameters if necessary. + """ + self.num_steps += 1 + if not self.enabled: + return + + if self.num_steps % self.local_sgd_steps == 0: + self._sync_and_avg_model_params() + + def _sync_and_avg_model_params(self): + """ + Synchronize + Average model parameters across all GPUs + """ + + self.accelerator.wait_for_everyone() + with self.accelerator.autocast(): + for param in self.model.parameters(): + param.data = self.accelerator.reduce(param.data, reduction="mean") diff --git a/tests/test_examples.py b/tests/test_examples.py index 05688f2b5e9..ae11287e1dd 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -34,6 +34,7 @@ EXCLUDE_EXAMPLES = [ "cross_validation.py", "gradient_accumulation.py", + "local_sgd.py", "multi_process_metrics.py", "memory.py", "automatic_gradient_accumulation.py", @@ -214,3 +215,7 @@ def test_tracking(self): def test_gradient_accumulation(self): testargs = ["examples/by_feature/gradient_accumulation.py"] run_command(self._launch_args + testargs) + + def test_local_sgd(self): + testargs = ["examples/by_feature/local_sgd.py"] + run_command(self._launch_args + testargs)