diff --git a/applications/ColossalMoE/train.py b/applications/ColossalMoE/train.py
index c567038ec252..99603282baf3 100644
--- a/applications/ColossalMoE/train.py
+++ b/applications/ColossalMoE/train.py
@@ -238,7 +238,6 @@ def main():
                     lambda x, y: x.loss,
                     optimizer,
                     return_loss=True,
-                    return_outputs=True,
                 )
                 # Backward and optimize
                 if is_pp_last_stage:
diff --git a/colossalai/booster/plugin/hybrid_parallel_plugin.py b/colossalai/booster/plugin/hybrid_parallel_plugin.py
index c37a6b4df72d..f51cb060c356 100644
--- a/colossalai/booster/plugin/hybrid_parallel_plugin.py
+++ b/colossalai/booster/plugin/hybrid_parallel_plugin.py
@@ -1183,6 +1183,9 @@ def execute_pipeline(
     ) -> dict:
         assert self.enable_pipeline_parallelism, "pipeline parallelism is not enabled"
 
+        if return_outputs:
+            warnings.warn("return_outputs may lead to significant extra memory consumption.")
+
         # Create a context for gradient synchronization based on the optimizer type.
         # If it's a HybridParallelZeroOptimizer, use optimizer.no_sync(); otherwise, use model.no_sync().
         # This is to avoid redundant gradient reduction in pipeline parallelism (multiple microbatch values should be reduced once),
diff --git a/colossalai/pipeline/schedule/one_f_one_b.py b/colossalai/pipeline/schedule/one_f_one_b.py
index bf2f01b10e9b..58008b98f24e 100644
--- a/colossalai/pipeline/schedule/one_f_one_b.py
+++ b/colossalai/pipeline/schedule/one_f_one_b.py
@@ -7,7 +7,7 @@
 from torch.utils._pytree import tree_map
 
 from colossalai.accelerator import get_accelerator
-from colossalai.interface import ModelWrapper, OptimizerWrapper
+from colossalai.interface import OptimizerWrapper
 from colossalai.pipeline.p2p import PipelineP2PCommunication, create_send_metadata
 from colossalai.pipeline.stage_manager import PipelineStageManager
 from colossalai.utils import get_current_device
@@ -327,9 +327,7 @@ def run_forward_only(
             self.send_forward(output_obj)
 
         if outputs is not None:
-            if isinstance(model, ModelWrapper):
-                model = model.unwrap()
-            outputs = merge_batch(outputs, getattr(model, "batch_size_dim", 0))
+            outputs = merge_batch(outputs)
         return {"loss": accum_loss, "outputs": outputs}
 
     def run_forward_backward(
@@ -412,9 +410,7 @@ def run_forward_backward(
         assert all(len(v) == 0 for v in input_objs) and all(len(v) == 0 for v in output_objs)
 
         if outputs is not None:
-            if isinstance(model, ModelWrapper):
-                model = model.unwrap()
-            outputs = merge_batch(outputs, getattr(model, "batch_size_dim", 0))
+            outputs = merge_batch(outputs)
         return {"loss": accum_loss, "outputs": outputs}
 
     def forward_backward_step(
diff --git a/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md b/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md
index e87eafb6eec7..0133dfd86ddf 100644
--- a/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md
+++ b/docs/source/en/advanced_tutorials/train_gpt_using_hybrid_parallelism.md
@@ -178,7 +178,7 @@ def train_epoch(
         for _ in pbar:
             if use_pipeline:
                 outputs = booster.execute_pipeline(
-                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True
                 )
                 # Backward and optimize
                 if is_pp_last_stage:
diff --git a/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md b/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md
index 93fed61c34da..dfc2cd596d79 100644
--- a/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md
+++ b/docs/source/en/advanced_tutorials/train_vit_with_hybrid_parallelism.md
@@ -231,7 +231,7 @@ def run_forward_backward(
     if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1:
         # run pipeline forward backward when enabling pp in hybrid parallel plugin
         output_dict = booster.execute_pipeline(
-            data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True
+            data_iter, model, criterion, optimizer, return_loss=True
         )
         loss, outputs = output_dict["loss"], output_dict["outputs"]
     else:
diff --git a/docs/source/en/features/pipeline_parallel.md b/docs/source/en/features/pipeline_parallel.md
index 31b20335e529..d6f3cdfafe16 100644
--- a/docs/source/en/features/pipeline_parallel.md
+++ b/docs/source/en/features/pipeline_parallel.md
@@ -198,8 +198,7 @@ def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion:
                                                    model,
                                                    _criterion,
                                                    optimizer,
-                                                   return_loss=True,
-                                                   return_outputs=True)
+                                                   return_loss=True)
                 # Backward and optimize
                 if is_pp_last_stage:
                     loss = outputs['loss']
diff --git a/docs/source/en/features/shardformer.md b/docs/source/en/features/shardformer.md
index 1e633ebc0f7c..672945ea2336 100644
--- a/docs/source/en/features/shardformer.md
+++ b/docs/source/en/features/shardformer.md
@@ -271,7 +271,7 @@ However, if pipeline parallel is enabled, there are several usages different fro
 3. Do forward and backward passing through calling `Booster.execute_pipeline` method:
    ```python
    outputs = booster.execute_pipeline(
-       train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+       train_dataloader_iter, model, _criterion, optimizer, return_loss=True
    )
    ```
    Backward passing has been completed by this method, so there is no need to call `loss.backward()` after executing this method.
diff --git a/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md b/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md
index ae941b489b90..cf7d191723e1 100644
--- a/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md
+++ b/docs/source/zh-Hans/advanced_tutorials/train_gpt_using_hybrid_parallelism.md
@@ -175,7 +175,7 @@ def train_epoch(
         for _ in pbar:
             if use_pipeline:
                 outputs = booster.execute_pipeline(
-                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True
                 )
                 # Backward and optimize
                 if is_pp_last_stage:
diff --git a/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md b/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md
index 3de41601a231..92775bafb22b 100644
--- a/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md
+++ b/docs/source/zh-Hans/advanced_tutorials/train_vit_with_hybrid_parallelism.md
@@ -234,7 +234,7 @@ def run_forward_backward(
     if isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1:
         # run pipeline forward backward when enabling pp in hybrid parallel plugin
         output_dict = booster.execute_pipeline(
-            data_iter, model, criterion, optimizer, return_loss=True, return_outputs=True
+            data_iter, model, criterion, optimizer, return_loss=True
         )
         loss, outputs = output_dict["loss"], output_dict["outputs"]
     else:
diff --git a/docs/source/zh-Hans/features/pipeline_parallel.md b/docs/source/zh-Hans/features/pipeline_parallel.md
index e688020556d8..38e1fbfc56b4 100644
--- a/docs/source/zh-Hans/features/pipeline_parallel.md
+++ b/docs/source/zh-Hans/features/pipeline_parallel.md
@@ -193,8 +193,7 @@ def train_epoch(epoch: int, model: nn.Module, optimizer: Optimizer, _criterion:
                                                    model,
                                                    _criterion,
                                                    optimizer,
-                                                   return_loss=True,
-                                                   return_outputs=True)
+                                                   return_loss=True)
                 # Backward and optimize
                 if is_pp_last_stage:
                     loss = outputs['loss']
diff --git a/docs/source/zh-Hans/features/shardformer.md b/docs/source/zh-Hans/features/shardformer.md
index 972c48b0c7a2..a7bcbd9f2816 100644
--- a/docs/source/zh-Hans/features/shardformer.md
+++ b/docs/source/zh-Hans/features/shardformer.md
@@ -264,7 +264,7 @@ elif args.plugin == "hybrid_parallel":
 3. 通过调用`Booster.execute_pipeline` 方法来执行前向和后向传递:
    ```python
    outputs = booster.execute_pipeline(
-       train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+       train_dataloader_iter, model, _criterion, optimizer, return_loss=True
    )
    ```
    该方法会自动执行后向传递，所以在执行该方法后不需要再调用 `loss.backward()`方法。
diff --git a/examples/images/vit/vit_benchmark.py b/examples/images/vit/vit_benchmark.py
index 0780173241aa..32b1ec803aec 100644
--- a/examples/images/vit/vit_benchmark.py
+++ b/examples/images/vit/vit_benchmark.py
@@ -120,7 +120,7 @@ def criterion(outputs, inputs):
             # run pipeline forward backward
             batch = iter([batch])
             outputs = booster.execute_pipeline(
-                batch, model, criterion, optimizer, return_loss=True, return_outputs=True
+                batch, model, criterion, optimizer, return_loss=True
             )
         else:
             outputs = model(**batch)
diff --git a/examples/language/bert/finetune.py b/examples/language/bert/finetune.py
index 0b1e77ffff06..bd6c393a7ddc 100644
--- a/examples/language/bert/finetune.py
+++ b/examples/language/bert/finetune.py
@@ -148,7 +148,7 @@ def train_epoch(
         for _ in pbar:
             if use_pipeline:
                 outputs = booster.execute_pipeline(
-                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True
                 )
                 # Backward and optimize
                 if is_pp_last_device:
diff --git a/examples/language/gpt/hybridparallelism/finetune.py b/examples/language/gpt/hybridparallelism/finetune.py
index eb56ee530a0a..888f47aaaab0 100644
--- a/examples/language/gpt/hybridparallelism/finetune.py
+++ b/examples/language/gpt/hybridparallelism/finetune.py
@@ -145,7 +145,7 @@ def train_epoch(
         for _ in pbar:
             if use_pipeline:
                 outputs = booster.execute_pipeline(
-                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True
                 )
                 # Backward and optimize
                 if is_pp_last_stage:
diff --git a/examples/language/llama2/finetune.py b/examples/language/llama2/finetune.py
index 3dbd0cf357b4..122186c30a58 100644
--- a/examples/language/llama2/finetune.py
+++ b/examples/language/llama2/finetune.py
@@ -271,7 +271,7 @@ def main():
         for step in pbar:
             if use_pipeline:
                 outputs = booster.execute_pipeline(
-                    dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+                    dataloader_iter, model, _criterion, optimizer, return_loss=True
                 )
                 loss = outputs["loss"]
             else:
diff --git a/examples/language/llama2/pretrain.py b/examples/language/llama2/pretrain.py
index fe7d958307e9..7b5805b801a8 100644
--- a/examples/language/llama2/pretrain.py
+++ b/examples/language/llama2/pretrain.py
@@ -185,7 +185,7 @@ def main():
             microbatch_size=1,
             enable_jit_fused=False,
             zero_stage=0,
-            precision="fp32",
+            precision=args.mixed_precision,
             initial_scale=1,
         )
     else:
@@ -286,7 +286,7 @@ def main():
         for step in pbar:
             if use_pipeline:
                 outputs = booster.execute_pipeline(
-                    dataloader_iter, model, _criterion, optimizer, return_loss=True, return_outputs=True
+                    dataloader_iter, model, _criterion, optimizer, return_loss=True
                 )
                 loss = outputs["loss"]
             else:
diff --git a/examples/language/openmoe/benchmark/benchmark_cai.py b/examples/language/openmoe/benchmark/benchmark_cai.py
index 770c500d86bf..a6d5f8bf2c0e 100644
--- a/examples/language/openmoe/benchmark/benchmark_cai.py
+++ b/examples/language/openmoe/benchmark/benchmark_cai.py
@@ -270,7 +270,6 @@ def main():
                     lambda x, y: x.loss,
                     optimizer,
                     return_loss=True,
-                    return_outputs=True,
                 )
                 # Backward and optimize
                 if is_pp_last_stage:
diff --git a/examples/language/openmoe/train.py b/examples/language/openmoe/train.py
index 89c4d5420994..f3267b7c6a68 100644
--- a/examples/language/openmoe/train.py
+++ b/examples/language/openmoe/train.py
@@ -340,7 +340,6 @@ def main():
                     lambda x, y: x.loss,
                     optimizer,
                     return_loss=True,
-                    return_outputs=True,
                 )
                 # Backward and optimize
                 if is_pp_last_stage:
diff --git a/examples/language/opt/opt_train_demo.py b/examples/language/opt/opt_train_demo.py
index fddbc1b408e7..82dff1920fde 100644
--- a/examples/language/opt/opt_train_demo.py
+++ b/examples/language/opt/opt_train_demo.py
@@ -42,7 +42,7 @@ def train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, dataloader, b
         for _ in pbar:
             if use_pipeline:
                 outputs = booster.execute_pipeline(
-                    dataloader, model, _criterion, optimizer, return_loss=True, return_outputs=True
+                    dataloader, model, _criterion, optimizer, return_loss=True
                 )
                 # Backward and optimize
                 if is_pp_last_stage:
diff --git a/tests/test_booster/test_plugin/test_3d_plugin.py b/tests/test_booster/test_plugin/test_3d_plugin.py
index 38361d803c49..61558c003c91 100644
--- a/tests/test_booster/test_plugin/test_3d_plugin.py
+++ b/tests/test_booster/test_plugin/test_3d_plugin.py
@@ -74,7 +74,7 @@ def _criterion(outputs, inputs):
             loss = criterion(outputs[output_key])
             return loss
 
-        booster.execute_pipeline(data_iter, model, _criterion, optimizer, return_loss=True, return_outputs=False)
+        booster.execute_pipeline(data_iter, model, _criterion, optimizer, return_loss=True)
         optimizer.step()
 
     except Exception as e:
diff --git a/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py b/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py
index b5cb31715aed..557666a804e3 100644
--- a/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py
+++ b/tests/test_checkpoint_io/test_hybrid_parallel_plugin_checkpoint_io.py
@@ -75,7 +75,7 @@ def _preprocess_data(data):
     model.train()
     if booster.plugin.stage_manager is not None:
         booster.execute_pipeline(
-            _preprocess_data(data), model, _criterion, optimizer, return_loss=True, return_outputs=False
+            _preprocess_data(data), model, _criterion, optimizer, return_loss=True
         )
     else:
         output = model(**_preprocess_data(data))
@@ -109,7 +109,7 @@ def _preprocess_data(data):
     data_for_origin = data_gen_fn()
     if booster.plugin.stage_manager is not None:
         booster.execute_pipeline(
-            _preprocess_data(data_for_shard), model, _criterion, optimizer, return_loss=True, return_outputs=False
+            _preprocess_data(data_for_shard), model, _criterion, optimizer, return_loss=True
         )
         booster.execute_pipeline(
             _preprocess_data(data_for_origin),
@@ -117,7 +117,6 @@ def _preprocess_data(data):
             _criterion,
             new_optimizer,
             return_loss=True,
-            return_outputs=False,
         )
     else:
         old_model_loss = criterion(model(**_preprocess_data(data_for_shard)))
diff --git a/tests/test_moe/test_moe_checkpoint.py b/tests/test_moe/test_moe_checkpoint.py
index d6dad2d7fb41..10e63592ac07 100644
--- a/tests/test_moe/test_moe_checkpoint.py
+++ b/tests/test_moe/test_moe_checkpoint.py
@@ -49,7 +49,6 @@ def run_fwd_bwd(
             lambda x, y: x.loss,
             optimizer,
             return_loss=True,
-            return_outputs=True,
         )
         # Backward and optimize
         if is_pp_last_stage:
diff --git a/tests/test_pipeline/test_schedule/test_interleaved.py b/tests/test_pipeline/test_schedule/test_interleaved.py
index 0e81818eb239..7aa4640553ca 100644
--- a/tests/test_pipeline/test_schedule/test_interleaved.py
+++ b/tests/test_pipeline/test_schedule/test_interleaved.py
@@ -104,7 +104,7 @@ def criterion(x, *args, **kwargs):
     torch_loss.backward()
 
     pp_ret = schedule.forward_backward_step(
-        sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
+        sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
     )
 
     # check loss
@@ -134,7 +134,7 @@ def criterion(x, *args, **kwargs):
         torch_loss = criterion(torch_output)
 
         pp_ret = schedule.forward_backward_step(
-            sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
+            sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
         )
         if stage_manager.is_last_stage(ignore_chunk=True):
             assert torch.allclose(torch_loss, pp_ret["loss"])
diff --git a/tests/test_pipeline/test_schedule/test_oneF_oneB.py b/tests/test_pipeline/test_schedule/test_oneF_oneB.py
index a08dc6d277d0..e1a679890c8d 100644
--- a/tests/test_pipeline/test_schedule/test_oneF_oneB.py
+++ b/tests/test_pipeline/test_schedule/test_oneF_oneB.py
@@ -100,7 +100,7 @@ def custom_fwd(self, x):
     torch_loss = criterion(torch_output)
     torch_loss.backward()
     pp_ret = schedule.forward_backward_step(
-        sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
+        sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
    )
 
     # check loss
@@ -130,7 +130,7 @@ def custom_fwd(self, x):
         torch_loss = criterion(torch_output)
 
         pp_ret = schedule.forward_backward_step(
-            sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True, return_outputs=True
+            sharded_model, iter(input_list), criterion, pp_optimizer, return_loss=True
         )
         if stage_manager.is_last_stage():
             assert torch.allclose(torch_loss, pp_ret["loss"])