From e403bcf5800509ae18bf996f3cc5ca89ae3c250c Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Mon, 19 Jun 2023 10:51:27 +0800
Subject: [PATCH 1/9] auto parallel support pipeline scheduler with standalone executor

---
 .../framework/new_executor/interpretercore.cc |   4 +
 .../operators/controlflow/fetch_v2_op.cc      |   2 +
 .../auto_parallel/static/engine.py            |   6 +-
 .../auto_parallel/static/parallelizer_v2.py   |  38 ++++--
 .../passes/auto_parallel_gradient_merge.py    |   9 +-
 .../passes/pipeline_scheduler_pass.py         |  23 +++-
 python/paddle/fluid/executor.py               |  92 ++++++++++++--
 python/paddle/fluid/framework.py              |   2 +
 .../pipeline_scheduler_FThenB.py              | 114 ++++++++++++++++++
 .../test_pipeline_scheduler_FThenB.py         |  57 +++++++++
 10 files changed, 322 insertions(+), 25 deletions(-)
 create mode 100644 test/auto_parallel/pipeline_scheduler_FThenB.py
 create mode 100644 test/auto_parallel/test_pipeline_scheduler_FThenB.py

diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 0bb1113e0d8a5..ddfb9d3aa3dfd 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -52,6 +52,10 @@ PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope,
                             true,
                             "Use local_scope in new executor(especially used "
                             "in UT), can turn off for better performance");
+PADDLE_DEFINE_EXPORTED_bool(
+    new_executor_micro_batching,
+    false,
+    "Enable micro_batching schedule for standalone executor, used for debug.");
 
 PHI_DECLARE_bool(check_nan_inf);
 DECLARE_bool(benchmark);
diff --git a/paddle/fluid/operators/controlflow/fetch_v2_op.cc b/paddle/fluid/operators/controlflow/fetch_v2_op.cc
index 9105fe8886296..bd0df3ba04e64 100644
--- a/paddle/fluid/operators/controlflow/fetch_v2_op.cc
+++ b/paddle/fluid/operators/controlflow/fetch_v2_op.cc
@@ -140,6 +140,8 @@ class FetchV2Kernel {
                           "operator 'Fetch') of current fetching variable to be "
                           "no less than 0. But received column index = %d.",
                           col));
+    VLOG(3) << "Fetch variable " << fetch_var_name << "'s " << col
+            << " column.";
 
     auto *fetch_list = out_var->GetMutable<framework::FetchList>();
diff --git a/python/paddle/distributed/auto_parallel/static/engine.py b/python/paddle/distributed/auto_parallel/static/engine.py
index 70df712d3ba22..dc8ac1a90e999 100644
--- a/python/paddle/distributed/auto_parallel/static/engine.py
+++ b/python/paddle/distributed/auto_parallel/static/engine.py
@@ -401,7 +401,9 @@ def _prepare_reader(self, feed_list=[]):
         self._has_prepared_reader[self._mode] = True
 
         # Insert read op to forward TaskNode if 1F1B pass is set
-        if self.main_program._pipeline_opt:
+        if self.main_program._pipeline_opt and not os.environ.get(
+            'FLAGS_new_executor_micro_batching', None
+        ):
             assert "tasks" in self.main_program._pipeline_opt["fleet_opt"]
             fleet_opt = self.main_program._pipeline_opt["fleet_opt"]
             fwd_task = None
@@ -471,8 +473,6 @@ def _process_fetch_group(group_name, var_list):
                     if var_name not in fetch_names:
                         fetch_names.append(var_name)
                     group_indices.append(fetch_names.index(var_name))
-            if not group_indices:
-                fetch_names.append([])
             fetch_indices.append(group_indices)
 
         dist_context = self._dist_contexts[mode]
diff --git a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
index fd5da8193f3ab..5d3de4396e297 100644
--- a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
+++ b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
@@ -14,6 +14,7 @@
 
 import copy
 import logging
+import os
 import time
 
 from paddle.distributed.passes import PassManager, new_pass
@@ -38,6 +39,14 @@ def __init__(self, mode, completer, dist_context):
         self._strategy = self._dist_context.strategy
         self._logger = get_logger(logging.INFO)
 
+    @property
+    def is_train(self):
+        return self._mode == "train"
+
+    @property
+    def is_test(self):
+        return self._mode in ["eval", "predict"]
+
     def parallel_all(self):
         world_process_group = get_world_process_group()
         all_ranks = world_process_group.ranks
@@ -50,7 +59,7 @@ def parallel(self, rank):
         serial_main_program = self._dist_context.serial_main_program
         serial_startup_program = self._dist_context.serial_startup_program
         serial_optimizer = self._dist_context.serial_optimizer
-        if self._mode == "train" and serial_optimizer:
+        if self.is_train and serial_optimizer:
             # Generate backward
             serial_loss = self._dist_context.serial_loss
             params_grads = self._generate_backward(
@@ -191,8 +200,9 @@ def parallel(self, rank):
                 time.time() - time0, self._mode
             )
         )
+
         # Clone program for test
-        if self._mode != 'train':
+        if self.is_test:
             pipeline_opt = dist_main_prog._pipeline_opt
             dist_main_prog = dist_main_prog.clone(for_test=True)
             dist_startup_prog = dist_startup_prog.clone(for_test=True)
@@ -263,7 +273,7 @@ def _apply_pre_optimization(
 
         # apply quantization pass
         # The pass can be applied when mode must be 'train'
-        if self._mode == 'train' and self._strategy.qat.enable:
+        if self.is_train and self._strategy.qat.enable:
             config = copy.deepcopy(self._strategy.qat.to_dict())
             config["dist_context"] = self._dist_context
             config["params_grads"] = params_grads
@@ -282,7 +292,7 @@ def _apply_pre_optimization(
 
         # apply recompute pass
         # recompute is then train-only optimization
-        if self._mode == "train" and self._strategy.recompute.enable:
+        if self.is_train and self._strategy.recompute.enable:
             config = copy.deepcopy(self._strategy.recompute.to_dict())
             config["dist_context"] = self._dist_context
             config["no_grad_set"] = None
@@ -326,7 +336,7 @@ def _apply_post_optimization(
             )
             params_grads = self._pass_context.get_attr("params_grads")
 
-        if self._mode == "train":
+        if self.is_train:
             # GradClip is train-only optimization
             config = copy.deepcopy(self._strategy.sharding.to_dict())
             config["dist_context"] = self._dist_context
@@ -349,7 +359,7 @@ def _apply_post_optimization(
                 [main_program], [startup_program], self._pass_context
             )
 
-        if self._strategy.pipeline.enable:
+        if self.is_train and self._strategy.pipeline.enable:
             self._strategy.gradient_merge.enable = True
             self._strategy.gradient_merge.k_steps = (
                 self._strategy.pipeline.accumulate_steps
@@ -357,7 +367,7 @@ def _apply_post_optimization(
             self._strategy.gradient_merge.avg = True
 
         # gradient_merge is then train-only optimization
-        if self._mode == "train" and self._strategy.gradient_merge.enable:
+        if self.is_train and self._strategy.gradient_merge.enable:
             config = copy.deepcopy(self._strategy.gradient_merge.to_dict())
             config["dist_context"] = self._dist_context
             config["params_grads"] = params_grads
@@ -368,7 +378,10 @@ def _apply_post_optimization(
             [main_program], [startup_program], self._pass_context
         )
 
-        if self._strategy.pipeline.enable:
+        use_new_executor = os.environ.get(
+            'FLAGS_new_executor_micro_batching', None
+        )
+        if self._strategy.pipeline.enable and not use_new_executor:
             config = copy.deepcopy(self._strategy.pipeline.to_dict())
             config["dist_context"] = self._dist_context
             auto_parallel_pipeline_pass = new_pass(
@@ -378,10 +391,17 @@ def _apply_post_optimization(
             [main_program], [startup_program], self._pass_context
         )
 
-        if self._mode == "train" and self._strategy.fused_passes.enable:
+        if self.is_train and self._strategy.fused_passes.enable:
             if len(self._strategy.fused_passes.fused_passes_list) > 0:
                 new_pass_list = []
                 for op in self._strategy.fused_passes.fused_passes_list:
                     new_pass_list.append(new_pass(op))
                 pass_manager = PassManager(new_pass_list)
                 pass_manager.apply([main_program], [startup_program])
+
+        if self._strategy.pipeline.enable and use_new_executor:
+            main_program._pipeline_opt = {}
+            main_program._pipeline_opt["standalone_exe"] = {
+                "schedule_mode": self._strategy.pipeline.schedule_mode,
+                "num_micro_batches": self._strategy.pipeline.accumulate_steps,
+            }
diff --git a/python/paddle/distributed/passes/auto_parallel_gradient_merge.py b/python/paddle/distributed/passes/auto_parallel_gradient_merge.py
index 8a87ac7f599d2..21816c34ee445 100644
--- a/python/paddle/distributed/passes/auto_parallel_gradient_merge.py
+++ b/python/paddle/distributed/passes/auto_parallel_gradient_merge.py
@@ -94,8 +94,13 @@ def _get_gm_cond_var(main_program, k_steps, dist_context):
     )
     set_var_dist_attr(dist_context, step_var, [-1], world_process_group.ranks)
 
-    cond_var = main_block.create_var(
-        name="gradient_merge_cond", shape=[1], dtype='bool'
+    cond_var = paddle.static.create_global_var(
+        name="gradient_merge_cond",
+        shape=[1],
+        value=bool(0),
+        dtype='bool',
+        persistable=True,
+        force_cpu=True,
     )
     set_var_dist_attr(dist_context, cond_var, [-1], world_process_group.ranks)
diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass.py b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
index 8ff5f2b35e7b5..39946d6649e48 100644
--- a/python/paddle/distributed/passes/pipeline_scheduler_pass.py
+++ b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
@@ -257,7 +257,7 @@ def _program_for_fthenb_and_1f1b(program):
     }
 
 
-@register_pass("pipeline_fthenb_scheduler")
+@register_pass("pipeline_scheduler_FThenB")
 class PipelineFThenBPass(PassBase):
     def __init__(self):
         super().__init__()
 
@@ -272,12 +272,12 @@ def _create_job_list(self):
         job_list = []
         lr_job = core.Job("lr")
         job_list.append(lr_job)
-        for i in range(self._micro_batch_size):
+        for i in range(self._num_micro_batches):
             forward_job = core.Job("forward")
             forward_job.set_micro_batch_id(i)
             job_list.append(forward_job)
 
-        for i in range(self._micro_batch_size):
+        for i in range(self._num_micro_batches):
             backward_job = core.Job("backward")
             backward_job.set_micro_batch_id(i)
             job_list.append(backward_job)
@@ -287,7 +287,7 @@ def _create_job_list(self):
         return job_list
 
     def _apply_single_impl(self, main_program, startup_program, context):
-        self._micro_batch_size = self.get_attr("micro_batch_size")
+        self._num_micro_batches = self.get_attr("num_micro_batches")
         self._program = main_program
 
         _insert_sync_for_fthenb_1f1b(self._program)
@@ -296,3 +296,18 @@ def _apply_single_impl(self, main_program, startup_program, context):
 
         plan = core.Plan(job_list, type_to_program)
         context.set_attr("plan", plan)
+
+
+def apply_pass(main_program, startup_program, pass_name, pass_attr={}):
+    from paddle.distributed.passes import PassContext, new_pass
+
+    assert pass_name in [
+        "FThenB"
+    ], "pipeline scheduler only supports FThenB, but received {}".format(
+        pass_name
+    )
+    pipeline_pass = new_pass("pipeline_scheduler_" + pass_name, pass_attr)
+    pass_context = PassContext()
+    pipeline_pass.apply([main_program], [startup_program], pass_context)
+    plan = pass_context.get_attr("plan")
+    return plan
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index d1445e227226d..4eb6b3a5c8259 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -414,6 +414,34 @@ def _add_feed_fetch_ops(
     return tmp_program
 
 
+def _set_micro_batch_fetch(plan):
+    if plan.micro_batch_num() <= 1:
+        return
+
+    valid_fetch_types = ["fetch", "fetch_v2"]
+    for job in plan.job_list():
+        idx_to_col_attr = {}
+        prog = plan.program(job.type())
+        for i in range(prog.block(0).op_size()):
+            op = prog.block(0).op(i)
+            if op.type() in valid_fetch_types:
+                idx_to_col_attr[i] = op.attr('col')
+
+        for idx, col in idx_to_col_attr.items():
+            job.set_col_attr_for_fetch_op(
+                idx, col * plan.micro_batch_num() + job.micro_batch_id()
+            )
+
+
+def _merge_tensors(tensor, micro_batch_num):
+    assert len(tensor) % micro_batch_num == 0
+    chunk_tensor = [
+        tensor[i : i + micro_batch_num]
+        for i in range(0, len(tensor), micro_batch_num)
+    ]
+    return [np.array(chunk) for chunk in chunk_tensor]
+
+
 def _apply_inplace_addto_pass(
     program, enable_inplace, enable_addto, skip_var_names
 ):
@@ -653,8 +681,15 @@ def run(self, feed_names, return_numpy=True):
         """
         tensors = self._new_exe.run(feed_names)._move_to_list()
         if return_numpy:
-            return as_numpy(tensors, copy=True)
+            tensors = as_numpy(tensors, copy=True)
+            if self._plan.micro_batch_num() <= 1:
+                return tensors
+            return _merge_tensors(tensors, self._plan.micro_batch_num())
         else:
+            if self._plan.micro_batch_num() > 1:
+                logging.warning(
+                    "`merge_tensor` dose not support when return_numpy is False."
+                )
             return tensors
 
     def _create_new_executor(self):
@@ -662,6 +697,24 @@ def _create_new_executor(self):
 
         return new_exe
 
+    def _check_fetch(self, fetch_list):
+        if fetch_list is None:
+            fetch_list = []
+
+        res = []
+        for fetch_var in fetch_list:
+            if isinstance(fetch_var, Variable):
+                fetch_var = fetch_var.name
+            elif not isinstance(fetch_var, str):
+                raise TypeError(
+                    "Required fetch_var shall be str|Variable, but received {}".format(
+                        type(fetch_var).__name__
+                    )
+                )
+
+            res.append(fetch_var)
+        return res
+
 
 class _ExecutorCache:
     class _CachedData:
@@ -751,6 +804,7 @@ def get_program_and_executor(
 
     def _get_program_and_executor(self, cached_data):
         program = cached_data.program
+        pipeline_opt = program._pipeline_opt
         inner_program = (
             program._program
             if isinstance(program, compiler.CompiledProgram)
@@ -831,12 +885,29 @@ def _get_program_and_executor(self, cached_data):
         _apply_inplace_addto_pass(
             program, enable_inplace, enable_addto, skip_var_names
         )
+
         new_program = program.clone()
-        new_exe = _StandaloneExecutor(
-            place,
-            core.Plan([core.Job("default")], {"default": new_program.desc}),
-            scope,
-        )
+        if pipeline_opt:
+            from paddle.distributed.passes.pipeline_scheduler_pass import (
+                apply_pass,
+            )
+
+            pass_attr = {
+                "num_micro_batches": pipeline_opt["standalone_exe"][
+                    "num_micro_batches"
+                ]
+            }
+            pass_name = pipeline_opt["standalone_exe"]["schedule_mode"]
+            plan = apply_pass(new_program, new_program, pass_name, pass_attr)
+        else:
+            default_job = core.Job("default")
+            default_job.set_micro_batch_id(0)
+            type_to_program = {"default": new_program.desc}
+            plan = core.Plan([default_job], type_to_program)
+
+        _set_micro_batch_fetch(plan)
+
+        new_exe = _StandaloneExecutor(place, plan, scope)
         return new_program, new_exe
@@ -1408,7 +1479,14 @@ def _run_impl(
 
         fetch_list = self._check_fetch_list(fetch_list)
 
-        if isinstance(program, Program) and program._pipeline_opt:
+        use_new_executor = os.environ.get(
+            'FLAGS_new_executor_micro_batching', None
+        )
+        if (
+            isinstance(program, Program)
+            and program._pipeline_opt
+            and not use_new_executor
+        ):
             if "fleet_opt" in program._pipeline_opt:
                 # Move prepare here for port conflict with nccl in startup program
                 if self._fleet_executor is None:
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index b774afd8d5666..a9ba6f91a1e25 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -5921,6 +5921,8 @@ def network():
             p._appending_grad_times = self._appending_grad_times
             if hasattr(self, 'lr_scheduler'):
                 p.lr_scheduler = self.lr_scheduler
+            if hasattr(self, '_pipeline_opt'):
+                p._pipeline_opt = self._pipeline_opt
 
         # NOTE(zhiqiu): we sync the cloned program, to update its program by
         # its desc.
diff --git a/test/auto_parallel/pipeline_scheduler_FThenB.py b/test/auto_parallel/pipeline_scheduler_FThenB.py
new file mode 100644
index 0000000000000..963113abdaa80
--- /dev/null
+++ b/test/auto_parallel/pipeline_scheduler_FThenB.py
@@ -0,0 +1,114 @@
+# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import random
+import unittest
+
+import numpy as np
+from get_gpt_model import FakeDataset, generate_model
+
+import paddle
+from paddle.distributed import ParallelEnv
+from paddle.distributed.fleet import auto
+from paddle.fluid.framework import set_flags
+
+paddle.enable_static()
+
+
+def apply_pass(mode="1F1B"):
+    strategy = auto.Strategy()
+    strategy.auto_mode = "semi"
+    strategy.reinit = True
+
+    pipeline = strategy.pipeline
+    pipeline.enable = True
+    pipeline.schedule_mode = mode
+    pipeline.accumulate_steps = 2
+
+    return strategy
+
+
+def reset_prog():
+    paddle.fluid.framework.switch_main_program(paddle.static.Program())
+    paddle.fluid.framework.switch_startup_program(paddle.static.Program())
+
+
+class Test1F1BPass(unittest.TestCase):
+    def setUp(self):
+        self.rtol = 1e-5
+        self.atol = 1e-8
+        self.batch_size = 2
+        self.batch_num = 10
+        self.clip_norm = 0.2
+        self.dataset = FakeDataset(self.batch_size * self.batch_num)
+
+    def init(self, engine):
+        paddle.seed(2021)
+        np.random.seed(2021)
+        random.seed(2021)
+        paddle.distributed.fleet.init(is_collective=True)
+        place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
+        engine._executor = paddle.static.Executor(place)
+
+    def get_engine(self, mode="1F1B"):
+        reset_prog()
+
+        strategy = apply_pass(mode)
+        clip = paddle.nn.ClipGradByGlobalNorm(self.clip_norm)
+        opt = paddle.optimizer.AdamW(learning_rate=0.00001, grad_clip=clip)
+        model, loss = generate_model("pp")
+
+        engine = auto.Engine(model, loss, opt, strategy=strategy)
+        self.init(engine)
+        return engine
+
+    def check_results(self, ref_losses, check_losses):
+        np.testing.assert_allclose(
+            ref_losses,
+            check_losses,
+            rtol=self.rtol,
+            atol=self.atol,
+            err_msg='pass {} has wrong results!, \nu={}\nv={}\ndiff={}'.format(
+                __class__, ref_losses, check_losses, ref_losses - check_losses
+            ),
+        )
+
+    def test_1f1b_pass(self):
+        # pp2 1f1b training with fleet executor
+        engine_1f1b = self.get_engine("1F1B")
+        history_1f1b = engine_1f1b.fit(
+            self.dataset, 3, batch_size=self.batch_size, log_freq=1
+        )
+        assert engine_1f1b._strategy.pipeline.schedule_mode == "1F1B"
+        assert os.environ.get('FLAGS_new_executor_micro_batching') is False
+
+        # pp2 fthenb training with standalone executor
+        set_flags({'FLAGS_new_executor_micro_batching': True})
+        engine_fthenb = self.get_engine("FThenB")
+        history_fthenb = engine_fthenb.fit(
+            self.dataset, 3, batch_size=self.batch_size, log_freq=1
+        )
+        assert engine_fthenb._strategy.pipeline.schedule_mode == "FThenB"
+        assert os.environ.get('FLAGS_new_executor_micro_batching') is True
+
+        # # NOTE: every sample data from dataset is all the same
+        # if paddle.distributed.get_rank() == 1:
+        #     losses_pp = np.array(history_pp.history["loss"])
+        #     losses_1f1b = np.array(history_1f1b.history["loss"])
+        #     self.check_results(losses_pp, losses_1f1b)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/auto_parallel/test_pipeline_scheduler_FThenB.py b/test/auto_parallel/test_pipeline_scheduler_FThenB.py
new file mode 100644
index 0000000000000..fab5ed0863d93
--- /dev/null
+++ b/test/auto_parallel/test_pipeline_scheduler_FThenB.py
@@ -0,0 +1,57 @@
+# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import sys
+import tempfile
+import unittest
+
+
+class TestFThenBPass(unittest.TestCase):
+    def test_pp2(self):
+        file_dir = os.path.dirname(os.path.abspath(__file__))
+        launch_model_path = os.path.join(
+            file_dir, "pipeline_scheduler_FThenB.py"
+        )
+
+        if os.environ.get("WITH_COVERAGE", "OFF") == "ON":
+            coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
+        else:
+            coverage_args = []
+
+        tmp_dir = tempfile.TemporaryDirectory()
+        cmd = (
+            [sys.executable, "-u"]
+            + coverage_args
+            + [
+                "-m",
+                "paddle.distributed.launch",
+                "--devices",
+                "0,1",
+                "--log_dir",
+                tmp_dir.name,
+                launch_model_path,
+            ]
+        )
+
+        process = subprocess.Popen(cmd)
+        process.wait()
+        self.assertEqual(process.returncode, 0)
+
+        tmp_dir.cleanup()
+
+
+if __name__ == "__main__":
+    unittest.main()

From 9aad3d62a600b0d611963c332e1b79bf081a8d1d Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Mon, 19 Jun 2023 11:14:01 +0800
Subject: [PATCH 2/9] rm check_fetch

---
 python/paddle/fluid/executor.py | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 4eb6b3a5c8259..a8da27d128e20 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -697,24 +697,6 @@ def _create_new_executor(self):
 
         return new_exe
 
-    def _check_fetch(self, fetch_list):
-        if fetch_list is None:
-            fetch_list = []
-
-        res = []
-        for fetch_var in fetch_list:
-            if isinstance(fetch_var, Variable):
-                fetch_var = fetch_var.name
-            elif not isinstance(fetch_var, str):
-                raise TypeError(
-                    "Required fetch_var shall be str|Variable, but received {}".format(
-                        type(fetch_var).__name__
-                    )
-                )
-
-            res.append(fetch_var)
-        return res
-
 
 class _ExecutorCache:
     class _CachedData:

From 51feb465a9391d00b82c1e1f17ecabbc658e1e2f Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Mon, 19 Jun 2023 14:40:04 +0800
Subject: [PATCH 3/9] update cmakelist and flags env

---
 .../auto_parallel/static/engine.py            | 14 +++++--
 .../auto_parallel/static/parallelizer_v2.py   |  9 ++++-
 python/paddle/fluid/executor.py               | 11 +++++-
 test/auto_parallel/CMakeLists.txt             |  4 ++
 .../pipeline_scheduler_FThenB.py              | 38 ++++++++++---------
 5 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/python/paddle/distributed/auto_parallel/static/engine.py b/python/paddle/distributed/auto_parallel/static/engine.py
index dc8ac1a90e999..85502083c9ba8 100644
--- a/python/paddle/distributed/auto_parallel/static/engine.py
+++ b/python/paddle/distributed/auto_parallel/static/engine.py
@@ -400,10 +400,18 @@ def _prepare_reader(self, feed_list=[]):
         dist_main_block._sync_with_cpp()
         self._has_prepared_reader[self._mode] = True
 
-        # Insert read op to forward TaskNode if 1F1B pass is set
-        if self.main_program._pipeline_opt and not os.environ.get(
-            'FLAGS_new_executor_micro_batching', None
-        ):
+        new_executor_micro_batching = os.environ.get(
+            'FLAGS_new_executor_micro_batching', None
+        )
+        use_new_executor = new_executor_micro_batching in [
+            1,
+            '1',
+            True,
+            'True',
+            'true',
+        ]
+        # Insert read op to forward TaskNode for fleet executor if 1F1B pass is set
+        if self.main_program._pipeline_opt and not use_new_executor:
             assert "tasks" in self.main_program._pipeline_opt["fleet_opt"]
             fleet_opt = self.main_program._pipeline_opt["fleet_opt"]
             fwd_task = None
diff --git a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
index 5d3de4396e297..d6bf38c37a289 100644
--- a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
+++ b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
@@ -378,9 +378,16 @@ def _apply_post_optimization(
             [main_program], [startup_program], self._pass_context
         )
 
-        use_new_executor = os.environ.get(
+        new_executor_micro_batching = os.environ.get(
             'FLAGS_new_executor_micro_batching', None
         )
+        use_new_executor = new_executor_micro_batching in [
+            1,
+            '1',
+            True,
+            'True',
+            'true',
+        ]
         if self._strategy.pipeline.enable and not use_new_executor:
             config = copy.deepcopy(self._strategy.pipeline.to_dict())
             config["dist_context"] = self._dist_context
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index a8da27d128e20..743a9aeba6cb6 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -26,7 +26,7 @@
 from . import core
 from . import unique_name
 from . import compiler
-from . import set_flags
+from . import set_flags, get_flags
 from .trainer_factory import TrainerFactory
 from .trainer_factory import FetchHandlerMonitor
 import copy
@@ -1461,9 +1461,16 @@ def _run_impl(
 
         fetch_list = self._check_fetch_list(fetch_list)
 
-        use_new_executor = os.environ.get(
+        new_executor_micro_batching = os.environ.get(
             'FLAGS_new_executor_micro_batching', None
         )
+        use_new_executor = new_executor_micro_batching in [
+            1,
+            '1',
+            True,
+            'True',
+            'true',
+        ]
         if (
             isinstance(program, Program)
             and program._pipeline_opt
diff --git a/test/auto_parallel/CMakeLists.txt b/test/auto_parallel/CMakeLists.txt
index d7c86c4a01af7..7f4dbc9baa997 100644
--- a/test/auto_parallel/CMakeLists.txt
+++ b/test/auto_parallel/CMakeLists.txt
@@ -68,6 +68,10 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
   py_test_modules(test_auto_tuner MODULES test_auto_tuner)
   set_tests_properties(test_auto_tuner PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE"
                                                   TIMEOUT 100)
+  py_test_modules(test_pipeline_scheduler_FThenB MODULES
+                  test_pipeline_scheduler_FThenB)
+  set_tests_properties(test_pipeline_scheduler_FThenB
+                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50)
   # End of unittests WITH multi cards and timeout
 
   # NOTE(zyl): unittests WITH multi cards and WITHOUT timeout
diff --git a/test/auto_parallel/pipeline_scheduler_FThenB.py b/test/auto_parallel/pipeline_scheduler_FThenB.py
index 963113abdaa80..03b09e6c14e3c 100644
--- a/test/auto_parallel/pipeline_scheduler_FThenB.py
+++ b/test/auto_parallel/pipeline_scheduler_FThenB.py
@@ -22,19 +22,18 @@
 import paddle
 from paddle.distributed import ParallelEnv
 from paddle.distributed.fleet import auto
-from paddle.fluid.framework import set_flags
 
 paddle.enable_static()
 
 
-def apply_pass(mode="1F1B"):
+def apply_pass(use_standalone_exe=False):
     strategy = auto.Strategy()
     strategy.auto_mode = "semi"
     strategy.reinit = True
 
     pipeline = strategy.pipeline
     pipeline.enable = True
-    pipeline.schedule_mode = mode
+    pipeline.schedule_mode = "1F1B" if not use_standalone_exe else "FThenB"
     pipeline.accumulate_steps = 2
 
     return strategy
@@ -62,10 +61,10 @@ def init(self, engine):
         place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
         engine._executor = paddle.static.Executor(place)
 
-    def get_engine(self, mode="1F1B"):
+    def get_engine(self, use_standalone_exe=False):
         reset_prog()
 
-        strategy = apply_pass(mode)
+        strategy = apply_pass(use_standalone_exe)
         clip = paddle.nn.ClipGradByGlobalNorm(self.clip_norm)
         opt = paddle.optimizer.AdamW(learning_rate=0.00001, grad_clip=clip)
         model, loss = generate_model("pp")
@@ -85,29 +84,34 @@ def check_results(self, ref_losses, check_losses):
             ),
         )
 
-    def test_1f1b_pass(self):
+    def test_pp_pass(self):
         # pp2 1f1b training with fleet executor
-        engine_1f1b = self.get_engine("1F1B")
+        os.environ['FLAGS_new_executor_micro_batching'] = 'False'
+        engine_1f1b = self.get_engine(use_standalone_exe=False)
         history_1f1b = engine_1f1b.fit(
             self.dataset, 3, batch_size=self.batch_size, log_freq=1
         )
         assert engine_1f1b._strategy.pipeline.schedule_mode == "1F1B"
-        assert os.environ.get('FLAGS_new_executor_micro_batching') is False
+        assert os.environ.get('FLAGS_new_executor_micro_batching') == "False"
 
         # pp2 fthenb training with standalone executor
-        set_flags({'FLAGS_new_executor_micro_batching': True})
-        engine_fthenb = self.get_engine("FThenB")
+        os.environ['FLAGS_new_executor_micro_batching'] = 'True'
+        engine_fthenb = self.get_engine(use_standalone_exe=True)
         history_fthenb = engine_fthenb.fit(
             self.dataset, 3, batch_size=self.batch_size, log_freq=1
         )
         assert engine_fthenb._strategy.pipeline.schedule_mode == "FThenB"
-        assert os.environ.get('FLAGS_new_executor_micro_batching') is True
-
-        # # NOTE: every sample data from dataset is all the same
-        # if paddle.distributed.get_rank() == 1:
-        #     losses_pp = np.array(history_pp.history["loss"])
-        #     losses_1f1b = np.array(history_1f1b.history["loss"])
-        #     self.check_results(losses_pp, losses_1f1b)
+        assert os.environ.get('FLAGS_new_executor_micro_batching') == "True"
+
+        # NOTE: every sample data from dataset is all the same
+        if paddle.distributed.get_rank() == 1:
+            losses_1f1b = np.array(history_1f1b.history["loss"])
+            losses_fthenb = np.array(history_fthenb.history["loss"])
+            # accumulate_steps is 2
+            assert len(losses_fthenb[0] == 2)
+            # losses_1f1b is the last loss of accumulate_steps
+            # losses_fthenb is all the losses of accumulate_steps
+            self.check_results(losses_1f1b[0], losses_fthenb[0][-1])
 
 
 if __name__ == "__main__":

From 69774d9e084a08646cc1cda4910842e2a3ddc539 Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Mon, 19 Jun 2023 14:51:23 +0800
Subject: [PATCH 4/9] rm set micro batch id

---
 python/paddle/fluid/executor.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 743a9aeba6cb6..7803a026b69b2 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -883,7 +883,6 @@ def _get_program_and_executor(self, cached_data):
             plan = apply_pass(new_program, new_program, pass_name, pass_attr)
         else:
             default_job = core.Job("default")
-            default_job.set_micro_batch_id(0)
             type_to_program = {"default": new_program.desc}
             plan = core.Plan([default_job], type_to_program)

From 1db5ae6ab3aaa4a1dbb1b01fad55f65c254dc3a0 Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Mon, 19 Jun 2023 14:52:22 +0800
Subject: [PATCH 5/9] rm import

---
 python/paddle/fluid/executor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 7803a026b69b2..ce1cb110a00b0 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -26,7 +26,7 @@
 from . import core
 from . import unique_name
 from . import compiler
-from . import set_flags, get_flags
+from . import set_flags
 from .trainer_factory import TrainerFactory
 from .trainer_factory import FetchHandlerMonitor
 import copy

From 4420680458ac799736a48ae3aaf534498078b7d3 Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Mon, 19 Jun 2023 20:57:48 +0800
Subject: [PATCH 6/9] update utils func

---
 .../framework/new_executor/interpretercore.cc |  4 ----
 .../auto_parallel/static/engine.py            | 15 ++++---------
 .../auto_parallel/static/parallelizer_v2.py   | 19 ++++-------------
 .../distributed/auto_parallel/static/utils.py | 13 ++++++++++++
 .../passes/pipeline_scheduler_pass.py         |  4 +---
 python/paddle/fluid/executor.py               | 21 +++++++------------
 .../pipeline_scheduler_FThenB.py              |  2 +-
 7 files changed, 30 insertions(+), 48 deletions(-)

diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index ddfb9d3aa3dfd..0bb1113e0d8a5 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -52,10 +52,6 @@ PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope,
                             true,
                             "Use local_scope in new executor(especially used "
                             "in UT), can turn off for better performance");
-PADDLE_DEFINE_EXPORTED_bool(
-    new_executor_micro_batching,
-    false,
-    "Enable micro_batching schedule for standalone executor, used for debug.");
 
 PHI_DECLARE_bool(check_nan_inf);
 DECLARE_bool(benchmark);
diff --git a/python/paddle/distributed/auto_parallel/static/engine.py b/python/paddle/distributed/auto_parallel/static/engine.py
index 85502083c9ba8..96c289235906d 100644
--- a/python/paddle/distributed/auto_parallel/static/engine.py
+++ b/python/paddle/distributed/auto_parallel/static/engine.py
@@ -400,18 +400,11 @@ def _prepare_reader(self, feed_list=[]):
         dist_main_block._sync_with_cpp()
         self._has_prepared_reader[self._mode] = True
 
-        new_executor_micro_batching = os.environ.get(
-            'FLAGS_new_executor_micro_batching', None
-        )
-        use_new_executor = new_executor_micro_batching in [
-            1,
-            '1',
-            True,
-            'True',
-            'true',
-        ]
         # Insert read op to forward TaskNode for fleet executor if 1F1B pass is set
-        if self.main_program._pipeline_opt and not use_new_executor:
+        if (
+            self.main_program._pipeline_opt
+            and not auto_utils.use_new_executor()
+        ):
             assert "tasks" in self.main_program._pipeline_opt["fleet_opt"]
             fleet_opt = self.main_program._pipeline_opt["fleet_opt"]
             fwd_task = None
diff --git a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
index d6bf38c37a289..2cbed1ee39819 100644
--- a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
+++ b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
@@ -14,7 +14,6 @@
 
 import copy
 import logging
-import os
 import time
 
 from paddle.distributed.passes import PassManager, new_pass
@@ -26,7 +25,7 @@
 from .partitioner import Partitioner
 from .process_group import get_world_process_group
 from .reshard import Resharder
-from .utils import set_grad_var_shape
+from .utils import set_grad_var_shape, use_new_executor
 
 
 class Parallelizer:
@@ -378,17 +377,7 @@ def _apply_post_optimization(
             [main_program], [startup_program], self._pass_context
         )
 
-        new_executor_micro_batching = os.environ.get(
-            'FLAGS_new_executor_micro_batching', None
-        )
-        use_new_executor = new_executor_micro_batching in [
-            1,
-            '1',
-            True,
-            'True',
-            'true',
-        ]
-        if self._strategy.pipeline.enable and not use_new_executor:
+        if self._strategy.pipeline.enable and not use_new_executor():
             config = copy.deepcopy(self._strategy.pipeline.to_dict())
             config["dist_context"] = self._dist_context
             auto_parallel_pipeline_pass = new_pass(
@@ -406,9 +395,9 @@ def _apply_post_optimization(
             pass_manager = PassManager(new_pass_list)
             pass_manager.apply([main_program], [startup_program])
 
-        if self._strategy.pipeline.enable and use_new_executor:
+        if self._strategy.pipeline.enable and use_new_executor():
             main_program._pipeline_opt = {}
-            main_program._pipeline_opt["standalone_exe"] = {
+            main_program._pipeline_opt["standalone_opt"] = {
                 "schedule_mode": self._strategy.pipeline.schedule_mode,
                 "num_micro_batches": self._strategy.pipeline.accumulate_steps,
             }
diff --git a/python/paddle/distributed/auto_parallel/static/utils.py b/python/paddle/distributed/auto_parallel/static/utils.py
index 937fc89ee441c..cfd5e9b844c16 100644
--- a/python/paddle/distributed/auto_parallel/static/utils.py
+++ b/python/paddle/distributed/auto_parallel/static/utils.py
@@ -2367,3 +2367,16 @@ def __impl__(*args, **kwargs):
 
 
 dygraph_guard = wrap_decorator(_dygraph_guard_)
+
+
+def use_new_executor():
+    new_executor_micro_batching = os.environ.get(
+        'FLAGS_new_executor_micro_batching', None
+    )
+    return new_executor_micro_batching in [
+        1,
+        '1',
+        True,
+        'True',
+        'true',
+    ]
diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass.py b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
index 39946d6649e48..3d63c14dde65c 100644
--- a/python/paddle/distributed/passes/pipeline_scheduler_pass.py
+++ b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
@@ -22,7 +22,7 @@
 from paddle.fluid import core
 from paddle.fluid.framework import Parameter, Program
 
-from .pass_base import PassBase, register_pass
+from .pass_base import PassBase, PassContext, new_pass, register_pass
 
 __not_shape_var_type__ = [
     core.VarDesc.VarType.READER,
@@ -299,8 +299,6 @@ def _apply_single_impl(self, main_program, startup_program, context):
 
 
 def apply_pass(main_program, startup_program, pass_name, pass_attr={}):
-    from paddle.distributed.passes import PassContext, new_pass
-
     assert pass_name in [
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index ce1cb110a00b0..3e8b0bb381466 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -874,12 +874,11 @@ def _get_program_and_executor(self, cached_data):
                 apply_pass,
             )
 
+            standalone_opt = pipeline_opt["standalone_opt"]
+            pass_name = standalone_opt["schedule_mode"]
             pass_attr = {
-                "num_micro_batches": pipeline_opt["standalone_exe"][
-                    "num_micro_batches"
-                ]
+                "num_micro_batches": standalone_opt["num_micro_batches"]
             }
-            pass_name = pipeline_opt["standalone_exe"]["schedule_mode"]
             plan = apply_pass(new_program, new_program, pass_name, pass_attr)
         else:
             default_job = core.Job("default")
@@ -1460,20 +1459,14 @@ def _run_impl(
 
         fetch_list = self._check_fetch_list(fetch_list)
 
-        new_executor_micro_batching = os.environ.get(
-            'FLAGS_new_executor_micro_batching', None
+        from paddle.distributed.auto_parallel.static.utils import (
+            use_new_executor,
         )
-        use_new_executor = new_executor_micro_batching in [
-            1,
-            '1',
-            True,
-            'True',
-            'true',
-        ]
+
         if (
             isinstance(program, Program)
             and program._pipeline_opt
-            and not use_new_executor
+            and not use_new_executor()
         ):
             if "fleet_opt" in program._pipeline_opt:
                 # Move prepare here for port conflict with nccl in startup program
diff --git a/test/auto_parallel/pipeline_scheduler_FThenB.py b/test/auto_parallel/pipeline_scheduler_FThenB.py
index 03b09e6c14e3c..b02ed3a4739d0 100644
--- a/test/auto_parallel/pipeline_scheduler_FThenB.py
+++ b/test/auto_parallel/pipeline_scheduler_FThenB.py
@@ -108,7 +108,7 @@ def test_pp_pass(self):
             losses_1f1b = np.array(history_1f1b.history["loss"])
             losses_fthenb = np.array(history_fthenb.history["loss"])
             # accumulate_steps is 2
-            assert len(losses_fthenb[0] == 2)
+            assert losses_fthenb[0].shape[0] == 2
             # losses_1f1b is the last loss of accumulate_steps
             # losses_fthenb is all the losses of accumulate_steps
             self.check_results(losses_1f1b[0], losses_fthenb[0][-1])

From 9f3fe899b0195321aea7d006f65e3b904528ad95 Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Mon, 19 Jun 2023 21:03:08 +0800
Subject: [PATCH 7/9] raise error when merging tensors if return_numpy is False

---
 python/paddle/fluid/executor.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 3e8b0bb381466..6fc29bccbc119 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -682,13 +682,11 @@ def run(self, feed_names, return_numpy=True):
         tensors = self._new_exe.run(feed_names)._move_to_list()
         if return_numpy:
             tensors = as_numpy(tensors, copy=True)
-            if self._plan.micro_batch_num() <= 1:
-                return tensors
             return _merge_tensors(tensors, self._plan.micro_batch_num())
         else:
             if self._plan.micro_batch_num() > 1:
-                logging.warning(
-                    "`merge_tensor` dose not support when return_numpy is False."
+                raise RuntimeError(
+                    "`merge_tensor` is not supported when return_numpy is False."
                 )
             return tensors

From c23f655a819181a79d823f67eac232e3392cd929 Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Tue, 20 Jun 2023 15:06:22 +0800
Subject: [PATCH 8/9] fix _pipeline_opt

---
 python/paddle/fluid/executor.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 6fc29bccbc119..5123c9b4ed102 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -434,6 +434,8 @@ def _set_micro_batch_fetch(plan):
 
 
 def _merge_tensors(tensor, micro_batch_num):
+    if micro_batch_num <= 1:
+        return tensor
     assert len(tensor) % micro_batch_num == 0
     chunk_tensor = [
         tensor[i : i + micro_batch_num]
@@ -784,7 +786,6 @@ def get_program_and_executor(
 
     def _get_program_and_executor(self, cached_data):
         program = cached_data.program
-        pipeline_opt = program._pipeline_opt
         inner_program = (
             program._program
             if isinstance(program, compiler.CompiledProgram)
@@ -867,12 +868,15 @@ def _get_program_and_executor(self, cached_data):
         )
 
         new_program = program.clone()
-        if pipeline_opt:
+        if (
+            new_program._pipeline_opt
+            and "standalone_opt" in new_program._pipeline_opt
+        ):
             from paddle.distributed.passes.pipeline_scheduler_pass import (
                 apply_pass,
             )
 
-            standalone_opt = pipeline_opt["standalone_opt"]
+            standalone_opt = new_program._pipeline_opt["standalone_opt"]
             pass_name = standalone_opt["schedule_mode"]
             pass_attr = {
                 "num_micro_batches": standalone_opt["num_micro_batches"]

From da00855ee63724ce742427bdae125b8e4175f5fc Mon Sep 17 00:00:00 2001
From: zhaoyingli
Date: Tue, 20 Jun 2023 19:33:41 +0800
Subject: [PATCH 9/9] fix unittest

---
 .../test_standalone_executor_fthenb_plan.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/standalone_executor/test_standalone_executor_fthenb_plan.py b/test/standalone_executor/test_standalone_executor_fthenb_plan.py
index 6912dc0e609f8..76557231b83e4 100644
--- a/test/standalone_executor/test_standalone_executor_fthenb_plan.py
+++ b/test/standalone_executor/test_standalone_executor_fthenb_plan.py
@@ -21,13 +21,13 @@ class TestStandaloneExecutorFThenBPlan(unittest.TestCase):
     def test_standalone_executor_fthenb_plan(self):
         config = {}
-        config["micro_batch_size"] = 4
+        config["num_micro_batches"] = 4
         pass_context = PassContext()
 
         startup_program = static.Program()
         main_program = static.Program()
 
-        pipeline_fthenb_pass = new_pass("pipeline_fthenb_scheduler", config)
+        pipeline_fthenb_pass = new_pass("pipeline_scheduler_FThenB", config)
         pipeline_fthenb_pass.apply(
             [main_program], [startup_program], pass_context
         )
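
Usage sketch (not part of the patch series above): the following distills the new test test/auto_parallel/pipeline_scheduler_FThenB.py into a minimal example of driving the FThenB schedule on the standalone executor. `generate_model` and `FakeDataset` are the helpers that test imports from get_gpt_model.py and are assumed available here; any pipeline-parallel model and dataset should work the same way. The script is expected to be launched on two devices, e.g. `python -m paddle.distributed.launch --devices 0,1 fthenb_demo.py`.

    import os

    import paddle
    from paddle.distributed.fleet import auto
    from get_gpt_model import FakeDataset, generate_model  # assumed test helpers

    paddle.enable_static()

    # Route pipeline execution to the standalone executor's micro-batching path.
    os.environ['FLAGS_new_executor_micro_batching'] = 'True'

    strategy = auto.Strategy()
    strategy.auto_mode = "semi"
    pipeline = strategy.pipeline
    pipeline.enable = True
    pipeline.schedule_mode = "FThenB"  # run all forward jobs, then all backward jobs
    pipeline.accumulate_steps = 2      # number of micro-batches per step

    model, loss = generate_model("pp")
    opt = paddle.optimizer.AdamW(learning_rate=0.00001)
    engine = auto.Engine(model, loss, opt, strategy=strategy)
    # With micro-batching enabled, each fetched loss holds one value per micro-batch.
    engine.fit(FakeDataset(20), 3, batch_size=2, log_freq=1)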