Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix] set return_outputs=False in examples and polish code #5404

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion applications/ColossalMoE/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def main():
lambda x, y: x.loss,
optimizer,
return_loss=True,
return_outputs=True,
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
3 changes: 3 additions & 0 deletions colossalai/booster/plugin/hybrid_parallel_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,9 @@ def execute_pipeline(
) -> dict:
assert self.enable_pipeline_parallelism, "pipeline parallelism is not enabled"

if return_outputs:
warnings.warn("return_outputs may lead to significant extra memory consumption.")

# Create a context for gradient synchronization based on the optimizer type.
# If it's a HybridParallelZeroOptimizer, use optimizer.no_sync(); otherwise, use model.no_sync().
# This is to avoid redundant gradient reduction in pipeline parallelism (multiple microbatch values should be reduced once),
Expand Down
10 changes: 3 additions & 7 deletions colossalai/pipeline/schedule/one_f_one_b.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from torch.utils._pytree import tree_map

from colossalai.accelerator import get_accelerator
from colossalai.interface import ModelWrapper, OptimizerWrapper
from colossalai.interface import OptimizerWrapper
from colossalai.pipeline.p2p import PipelineP2PCommunication, create_send_metadata
from colossalai.pipeline.stage_manager import PipelineStageManager
from colossalai.utils import get_current_device
Expand Down Expand Up @@ -327,9 +327,7 @@ def run_forward_only(
self.send_forward(output_obj)

if outputs is not None:
if isinstance(model, ModelWrapper):
model = model.unwrap()
outputs = merge_batch(outputs, getattr(model, "batch_size_dim", 0))
outputs = merge_batch(outputs)
return {"loss": accum_loss, "outputs": outputs}

def run_forward_backward(
Expand Down Expand Up @@ -412,9 +410,7 @@ def run_forward_backward(
assert all(len(v) == 0 for v in input_objs) and all(len(v) == 0 for v in output_objs)

if outputs is not None:
if isinstance(model, ModelWrapper):
model = model.unwrap()
outputs = merge_batch(outputs, getattr(model, "batch_size_dim", 0))
outputs = merge_batch(outputs)
return {"loss": accum_loss, "outputs": outputs}

def forward_backward_step(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def train_epoch(
for _ in pbar:
if use_pipeline:
outputs = booster.execute_pipeline(
train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
train_dataloader_iter, model, _criterion, optimizer, return_loss=True
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def run_forward_backward(
if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1:
# run pipeline forward backward when enabling pp in hybrid parallel plugin
output_dict = booster.execute_pipeline(
data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True
data_iter, model, criterion, optimizer, return_loss=True
)
loss, outputs = output_dict["loss"], output_dict["outputs"]
else:
Expand Down
3 changes: 1 addition & 2 deletions docs/source/en/features/pipeline_parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion:
model,
_criterion,
optimizer,
return_loss=True,
return_outputs=True)
return_loss=True)
# Backward and optimize
if is_pp_last_stage:
loss = outputs['loss']
Expand Down
2 changes: 1 addition & 1 deletion docs/source/en/features/shardformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ However, if pipeline parallel is enabled, there are several usages different fro
3. Do forward and backward passing through calling `Booster.execute_pipeline` method:
```python
outputs = booster.execute_pipeline(
train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
train_dataloader_iter, model, _criterion, optimizer, return_loss=True
)
```
Backward passing has been completed by this method, so there is no need to call `loss.backward()` after executing this method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def train_epoch(
for _ in pbar:
if use_pipeline:
outputs = booster.execute_pipeline(
train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
train_dataloader_iter, model, _criterion, optimizer, return_loss=True
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def run_forward_backward(
if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1:
# run pipeline forward backward when enabling pp in hybrid parallel plugin
output_dict = booster.execute_pipeline(
data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True
data_iter, model, criterion, optimizer, return_loss=True
)
loss, outputs = output_dict["loss"], output_dict["outputs"]
else:
Expand Down
3 changes: 1 addition & 2 deletions docs/source/zh-Hans/features/pipeline_parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion:
model,
_criterion,
optimizer,
return_loss=True,
return_outputs=True)
return_loss=True)
# Backward and optimize
if is_pp_last_stage:
loss = outputs['loss']
Expand Down
2 changes: 1 addition & 1 deletion docs/source/zh-Hans/features/shardformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ elif args.plugin == "hybrid_parallel":
3. 通过调用`Booster.execute_pipeline` 方法来执行前向和后向传递:
```python
outputs = booster.execute_pipeline(
train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
train_dataloader_iter, model, _criterion, optimizer, return_loss=True
)
```
该方法会自动执行后向传递,所以在执行该方法后不需要再调用 `loss.backward()`方法。
Expand Down
2 changes: 1 addition & 1 deletion examples/images/vit/vit_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def criterion(outputs, inputs):
# run pipeline forward backward
batch = iter([batch])
outputs = booster.execute_pipeline(
batch, model, criterion, optimizer, return_loss=True, return_outputs=True
batch, model, criterion, optimizer, return_loss=True
)
else:
outputs = model(**batch)
Expand Down
2 changes: 1 addition & 1 deletion examples/language/bert/finetune.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def train_epoch(
for _ in pbar:
if use_pipeline:
outputs = booster.execute_pipeline(
train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
train_dataloader_iter, model, _criterion, optimizer, return_loss=True
)
# Backward and optimize
if is_pp_last_device:
Expand Down
2 changes: 1 addition & 1 deletion examples/language/gpt/hybridparallelism/finetune.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def train_epoch(
for _ in pbar:
if use_pipeline:
outputs = booster.execute_pipeline(
train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
train_dataloader_iter, model, _criterion, optimizer, return_loss=True
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
2 changes: 1 addition & 1 deletion examples/language/llama2/finetune.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def main():
for step in pbar:
if use_pipeline:
outputs = booster.execute_pipeline(
dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
dataloader_iter, model, _criterion, optimizer, return_loss=True
)
loss = outputs["loss"]
else:
Expand Down
4 changes: 2 additions & 2 deletions examples/language/llama2/pretrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def main():
microbatch_size=1,
enable_jit_fused=False,
zero_stage=0,
precision="fp32",
precision=args.mixed_precision,
initial_scale=1,
)
else:
Expand Down Expand Up @@ -286,7 +286,7 @@ def main():
for step in pbar:
if use_pipeline:
outputs = booster.execute_pipeline(
dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
dataloader_iter, model, _criterion, optimizer, return_loss=True
)
loss = outputs["loss"]
else:
Expand Down
1 change: 0 additions & 1 deletion examples/language/openmoe/benchmark/benchmark_cai.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ def main():
lambda x, y: x.loss,
optimizer,
return_loss=True,
return_outputs=True,
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
1 change: 0 additions & 1 deletion examples/language/openmoe/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ def main():
lambda x, y: x.loss,
optimizer,
return_loss=True,
return_outputs=True,
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
2 changes: 1 addition & 1 deletion examples/language/opt/opt_train_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, dataloader, b
for _ in pbar:
if use_pipeline:
outputs = booster.execute_pipeline(
dataloader, model, _criterion, optimizer, return_loss=True, return_outputs=True
dataloader, model, _criterion, optimizer, return_loss=True
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_booster/test_plugin/test_3d_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _criterion(outputs, inputs):
loss = criterion(outputs[output_key])
return loss

booster.execute_pipeline(data_iter, model, _criterion, optimizer, return_loss=True, return_outputs=False)
booster.execute_pipeline(data_iter, model, _criterion, optimizer, return_loss=True)
optimizer.step()

except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _preprocess_data(data):
model.train()
if booster.plugin.stage_manager is not None:
booster.execute_pipeline(
_preprocess_data(data), model, _criterion, optimizer, return_loss=True, return_outputs=False
_preprocess_data(data), model, _criterion, optimizer, return_loss=True
)
else:
output = model(**_preprocess_data(data))
Expand Down Expand Up @@ -109,15 +109,14 @@ def _preprocess_data(data):
data_for_origin = data_gen_fn()
if booster.plugin.stage_manager is not None:
booster.execute_pipeline(
_preprocess_data(data_for_shard), model, _criterion, optimizer, return_loss=True, return_outputs=False
_preprocess_data(data_for_shard), model, _criterion, optimizer, return_loss=True
)
booster.execute_pipeline(
_preprocess_data(data_for_origin),
new_model,
_criterion,
new_optimizer,
return_loss=True,
return_outputs=False,
)
else:
old_model_loss = criterion(model(**_preprocess_data(data_for_shard)))
Expand Down
1 change: 0 additions & 1 deletion tests/test_moe/test_moe_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def run_fwd_bwd(
lambda x, y: x.loss,
optimizer,
return_loss=True,
return_outputs=True,
)
# Backward and optimize
if is_pp_last_stage:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_pipeline/test_schedule/test_interleaved.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def criterion(x, *args, **kwargs):
torch_loss.backward()

pp_ret = schedule.forward_backward_step(
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
)

# check loss
Expand Down Expand Up @@ -134,7 +134,7 @@ def criterion(x, *args, **kwargs):
torch_loss = criterion(torch_output)

pp_ret = schedule.forward_backward_step(
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
)
if stage_manager.is_last_stage(ignore_chunk=True):
assert torch.allclose(torch_loss, pp_ret["loss"])
Expand Down
4 changes: 2 additions & 2 deletions tests/test_pipeline/test_schedule/test_oneF_oneB.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def custom_fwd(self, x):
torch_loss = criterion(torch_output)
torch_loss.backward()
pp_ret = schedule.forward_backward_step(
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
)

# check loss
Expand Down Expand Up @@ -130,7 +130,7 @@ def custom_fwd(self, x):
torch_loss = criterion(torch_output)

pp_ret = schedule.forward_backward_step(
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
)
if stage_manager.is_last_stage():
assert torch.allclose(torch_loss, pp_ret["loss"])
Expand Down
Loading