Skip to content

Commit

Permalink
Adding support for local SGD. (#1378)
Browse files Browse the repository at this point in the history
* Adding support for local SGD.

* Update src/accelerate/local_sgd.py

Co-authored-by: Zachary Mueller <muellerzr@gmail.com>

* Update src/accelerate/local_sgd.py

Co-authored-by: Zachary Mueller <muellerzr@gmail.com>

* Update src/accelerate/local_sgd.py

Co-authored-by: Zachary Mueller <muellerzr@gmail.com>

* fixing reduction + adding a test.

* style fix.

* Update docs/source/usage_guides/local_sgd.mdx

Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>

* Update src/accelerate/local_sgd.py

Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>

* Update examples/by_feature/local_sgd.py

Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>

---------

Co-authored-by: Zachary Mueller <muellerzr@gmail.com>
Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>
  • Loading branch information
3 people authored May 9, 2023
1 parent d95d68e commit da39665
Show file tree
Hide file tree
Showing 6 changed files with 463 additions and 2 deletions.
4 changes: 3 additions & 1 deletion docs/source/_toctree.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
title: How perform inference on large models with small resources
- local: usage_guides/gradient_accumulation
title: Performing gradient accumulation
- local: usage_guides/local_sgd
title: Accelerating training with local SGD
- local: usage_guides/checkpoint
title: Saving and loading training states
- local: usage_guides/tracking
Expand Down Expand Up @@ -79,4 +81,4 @@
title: Utility functions and classes
- local: package_reference/megatron_lm
title: Megatron-LM Utilities
title: "Reference"
title: "Reference"
105 changes: 105 additions & 0 deletions docs/source/usage_guides/local_sgd.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<!--Copyright 2023 The HuggingFace Team. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
-->

# Using Local SGD with 🤗 Accelerate

Local SGD is a technique for distributed training where gradients are not synchronized every step. Thus, each process updates its own version of the model weights and after a given number of steps these weights are synchronized by averaging across all processes. This improves communication efficiency and can lead to substantial training speed up especially when a computer lacks a faster interconnect such as NVLink.
Unlike gradient accumulation (where improving communication efficiency requires increasing the effective batch size), Local SGD does not require changing a batch size or a learning rate / schedule. However, if necessary, Local SGD can be combined with gradient accumulation as well.

In this tutorial you will see how to quickly setup Local SGD 🤗 Accelerate. Compared to a standard Accelerate setup, this requires only two extra lines of code.

This example will use a very simplistic PyTorch training loop that performs gradient accumulation every two batches:

```python
device = "cuda"
model.to(device)

gradient_accumulation_steps = 2

for index, batch in enumerate(training_dataloader):
inputs, targets = batch
inputs = inputs.to(device)
targets = targets.to(device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
loss = loss / gradient_accumulation_steps
loss.backward()
if (index + 1) % gradient_accumulation_steps == 0:
optimizer.step()
scheduler.step()
optimizer.zero_grad()
```

## Converting it to 🤗 Accelerate

First the code shown earlier will be converted to use 🤗 Accelerate with neither a LocalSGD or a gradient accumulation helper:

```diff
+ from accelerate import Accelerator
+ accelerator = Accelerator()

+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+ model, optimizer, training_dataloader, scheduler
+ )

for index, batch in enumerate(training_dataloader):
inputs, targets = batch
- inputs = inputs.to(device)
- targets = targets.to(device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
loss = loss / gradient_accumulation_steps
+ accelerator.backward(loss)
if (index+1) % gradient_accumulation_steps == 0:
optimizer.step()
scheduler.step()
```

## Letting 🤗 Accelerate handle model synchronization

All that is left now is to let 🤗 Accelerate handle model parameter synchronization **and** the gradient accumulation for us. For simplicity let us assume we need to synchronize every 8 steps. This is
achieved by adding one `with LocalSGD` statement and one call `local_sgd.step()` after every optimizer step:

```diff
+local_sgd_steps=8

+with LocalSGD(accelerator=accelerator, model=model, local_sgd_steps=8, enabled=True) as local_sgd:
for batch in training_dataloader:
with accelerator.accumulate(model):
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
+ local_sgd.step()
```

Under the hood, the Local SGD code **disables** automatic gradient synchornization (but accumulation still works as expected!). Instead it averages model parameters every `local_sgd_steps` steps (as well as in the end of the training loop).

## Limitations

The current implementation works only with basic multi-GPU (or multi-CPU) training without, e.g., [DeepSpeed.](https://github.com/microsoft/DeepSpeed).

## References

Although we are not aware of the true origins of this simple approach, the idea of local SGD is quite old and goes
back to at least:

Zhang, J., De Sa, C., Mitliagkas, I., & Ré, C. (2016). [Parallel SGD: When does averaging help?. arXiv preprint
arXiv:1606.07365.](https://arxiv.org/abs/1606.07365)

We credit the term Local SGD to the following paper (but there might be earlier references we are not aware of).

Stich, Sebastian Urban. ["Local SGD Converges Fast and Communicates Little." ICLR 2019-International Conference on
Learning Representations. No. CONF. 2019.](https://arxiv.org/abs/1805.09767)
13 changes: 12 additions & 1 deletion examples/by_feature/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,15 @@ These arguments should be added at the end of any method for starting the python

```bash
accelerate launch ./gradient_accumulation.py --gradient_accumulation_steps 5
```
```

### LocalSGD (`local_sgd.py`)
- Shows how to use `Accelerator.no_sync` to prevent gradient averaging in a distributed setup. However, unlike gradient accumulation, this method does not change the effective batch size. Local SGD can be combined with gradient accumulation.

These arguments should be added at the end of any method for starting the python script (such as `python`, `accelerate launch`, `python -m torchrun`), such as:

```bash
accelerate launch ./local_sgd.py --local_sgd_steps 4
```


238 changes: 238 additions & 0 deletions examples/by_feature/local_sgd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
# coding=utf-8
# Copyright 2023 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os

import evaluate
import torch
from datasets import load_dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, set_seed

from accelerate import Accelerator, DistributedType
from accelerate.local_sgd import LocalSGD


########################################################################
# This is a fully working simple example to use Accelerate
# with LocalSGD, which is a method to synchronize model
# parameters every K batches. It is different, but complementary
# to gradient accumulation.
#
# This example trains a Bert base model on GLUE MRPC
# in any of the following settings (with the same script):
# - single CPU or single GPU
# - multi GPUS (using PyTorch distributed mode)
# - (multi) TPUs
# - fp16 (mixed-precision) or fp32 (normal precision)
#
# To run it in each of these various modes, follow the instructions
# in the readme for examples:
# https://github.com/huggingface/accelerate/tree/main/examples
#
########################################################################


MAX_GPU_BATCH_SIZE = 16
EVAL_BATCH_SIZE = 32


def get_dataloaders(accelerator: Accelerator, batch_size: int = 16):
"""
Creates a set of `DataLoader`s for the `glue` dataset,
using "bert-base-cased" as the tokenizer.
Args:
accelerator (`Accelerator`):
An `Accelerator` object
batch_size (`int`, *optional*):
The batch size for the train and validation DataLoaders.
"""
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
datasets = load_dataset("glue", "mrpc")

def tokenize_function(examples):
# max_length=None => use the model max length (it's actually the default)
outputs = tokenizer(examples["sentence1"], examples["sentence2"], truncation=True, max_length=None)
return outputs

# Apply the method we just defined to all the examples in all the splits of the dataset
# starting with the main process first:
with accelerator.main_process_first():
tokenized_datasets = datasets.map(
tokenize_function,
batched=True,
remove_columns=["idx", "sentence1", "sentence2"],
)

# We also rename the 'label' column to 'labels' which is the expected name for labels by the models of the
# transformers library
tokenized_datasets = tokenized_datasets.rename_column("label", "labels")

def collate_fn(examples):
# On TPU it's best to pad everything to the same length or training will be very slow.
max_length = 128 if accelerator.distributed_type == DistributedType.TPU else None
# When using mixed precision we want round multiples of 8/16
if accelerator.mixed_precision == "fp8":
pad_to_multiple_of = 16
elif accelerator.mixed_precision != "no":
pad_to_multiple_of = 8
else:
pad_to_multiple_of = None

return tokenizer.pad(
examples,
padding="longest",
max_length=max_length,
pad_to_multiple_of=pad_to_multiple_of,
return_tensors="pt",
)

# Instantiate dataloaders.
train_dataloader = DataLoader(
tokenized_datasets["train"], shuffle=True, collate_fn=collate_fn, batch_size=batch_size
)
eval_dataloader = DataLoader(
tokenized_datasets["validation"], shuffle=False, collate_fn=collate_fn, batch_size=EVAL_BATCH_SIZE
)

return train_dataloader, eval_dataloader


# For testing only
if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1":
from accelerate.test_utils.training import mocked_dataloaders

get_dataloaders = mocked_dataloaders # noqa: F811


def training_function(config, args):
# For testing only
if os.environ.get("TESTING_MOCKED_DATALOADERS", None) == "1":
config["num_epochs"] = 2
# New Code #
gradient_accumulation_steps = int(args.gradient_accumulation_steps)
local_sgd_steps = int(args.local_sgd_steps)
# Initialize accelerator
accelerator = Accelerator(
cpu=args.cpu, mixed_precision=args.mixed_precision, gradient_accumulation_steps=gradient_accumulation_steps
)
if accelerator.distributed_type not in [DistributedType.NO, DistributedType.MULTI_CPU, DistributedType.MULTI_GPU]:
raise NotImplementedError("LocalSGD is supported only for CPUs and GPUs (no DeepSpeed or MegatronLM)")
# Sample hyper-parameters for learning rate, batch size, seed and a few other HPs
lr = config["lr"]
num_epochs = int(config["num_epochs"])
seed = int(config["seed"])
batch_size = int(config["batch_size"])

metric = evaluate.load("glue", "mrpc")

set_seed(seed)
train_dataloader, eval_dataloader = get_dataloaders(accelerator, batch_size)
# Instantiate the model (we build the model here so that the seed also control new weights initialization)
model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True)

# We could avoid this line since the accelerator is set with `device_placement=True` (default value).
# Note that if you are placing tensors on devices manually, this line absolutely needs to be before the optimizer
# creation otherwise training will not work on TPU (`accelerate` will kindly throw an error to make us aware of that).
model = model.to(accelerator.device)

# Instantiate optimizer
optimizer = AdamW(params=model.parameters(), lr=lr)

# Instantiate scheduler
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=100,
num_training_steps=(len(train_dataloader) * num_epochs),
)

# Prepare everything
# There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the
# prepare method.
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)

# Now we train the model
for epoch in range(num_epochs):
model.train()
with LocalSGD(
accelerator=accelerator, model=model, local_sgd_steps=local_sgd_steps, enabled=local_sgd_steps is not None
) as local_sgd:
for step, batch in enumerate(train_dataloader):
# We could avoid this line since we set the accelerator with `device_placement=True`.
batch.to(accelerator.device)
# New code #
# We use the new `accumulate` context manager to perform gradient accumulation
# We also currently do not support TPUs nor advise it as bugs were found on the XLA side when running our tests.
with accelerator.accumulate(model):
output = model(**batch)
loss = output.loss
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
# LocalSGD-specific line
local_sgd.step()

model.eval()
for step, batch in enumerate(eval_dataloader):
# We could avoid this line since we set the accelerator with `device_placement=True`.
batch.to(accelerator.device)
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
predictions, references = accelerator.gather_for_metrics((predictions, batch["labels"]))
metric.add_batch(
predictions=predictions,
references=references,
)

eval_metric = metric.compute()
# Use accelerator.print to print only on the main process.
accelerator.print(f"epoch {epoch}:", eval_metric)


def main():
parser = argparse.ArgumentParser(description="Simple example of training script.")
parser.add_argument(
"--mixed_precision",
type=str,
default=None,
choices=["no", "fp16", "bf16", "fp8"],
help="Whether to use mixed precision. Choose"
"between fp16 and bf16 (bfloat16). Bf16 requires PyTorch >= 1.10."
"and an Nvidia Ampere GPU.",
)
# New Code #
parser.add_argument(
"--gradient_accumulation_steps",
type=int,
default=1,
help="The number of minibatches to be ran before gradients are accumulated.",
)
parser.add_argument(
"--local_sgd_steps", type=int, default=8, help="Number of local SGD steps or None to disable local SGD"
)
parser.add_argument("--cpu", action="store_true", help="If passed, will train on the CPU.")
args = parser.parse_args()
config = {"lr": 2e-5, "num_epochs": 3, "seed": 42, "batch_size": 16}
training_function(config, args)


if __name__ == "__main__":
main()
Loading

0 comments on commit da39665

Please sign in to comment.