
auto parallel support pipeline scheduler with standalone executor #54727

Merged
merged 11 commits into PaddlePaddle:develop on Jun 25, 2023

Conversation

@zhaoyinglia zhaoyinglia (Contributor) commented Jun 19, 2023

PR types

Others

PR changes

Others

Description

Pcard-70448

  • Add a new_executor_micro_batching flag that controls whether the standalone executor or the fleet executor is used for pipeline parallelism. It exists only for debugging during the transition period and will be removed once the migration is complete (see the sketch after this list).
  • After a pipeline schedule is applied, merge the results returned by each micro_batch. Only return_numpy=True is supported for now; return_numpy=False will be supported later.
  • Align the results of the fleet executor and the standalone executor in the pp2 + gpt scenario.
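
A rough usage sketch of the first item above. The flag name comes from this description; reading it as the environment variable FLAGS_new_executor_micro_batching follows Paddle's usual FLAGS_<name> convention and is an assumption here, as is the surrounding code.

import os

# Transition-period toggle (sketch): choose the executor used for the
# pipeline schedule. The exact environment variable name is assumed.
os.environ.setdefault("FLAGS_new_executor_micro_batching", "True")

use_standalone_executor = os.environ.get(
    "FLAGS_new_executor_micro_batching", "True"
).lower() in ("1", "true")

# True  -> run the pipeline schedule on the standalone executor (new path)
# False -> keep running it on the fleet executor (old path)
print("use standalone executor:", use_standalone_executor)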

@paddle-bot paddle-bot bot commented Jun 19, 2023

Your PR has been submitted. Thanks for your contribution!
Please wait for the CI results first. See the Paddle CI Manual for details.

@@ -52,6 +52,10 @@ PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope,
true,
"Use local_scope in new executor(especially used "
"in UT), can turn off for better performance");
PADDLE_DEFINE_EXPORTED_bool(
Contributor

This environment variable controls Python-side code; it does not need to be declared on the C++ side.

Contributor Author

done.

@@ -368,7 +378,17 @@ def _apply_post_optimization(
[main_program], [startup_program], self._pass_context
)

if self._strategy.pipeline.enable:
new_executor_micro_batching = os.environ.get(
Contributor

The code here that decides whether to use the new executor has a similar counterpart in engine.py. Can the two pieces of code be consolidated into one place?

Contributor Author

done.


if self._strategy.pipeline.enable and use_new_executor:
main_program._pipeline_opt = {}
main_program._pipeline_opt["standalone_exe"] = {
Contributor

Using standalone_exe as a key of _pipeline_opt is easy to misread; consider renaming it to standalone_opt or another more suitable name.

Contributor Author

done. Renamed to standalone_opt.



def apply_pass(main_program, startup_program, pass_name, pass_attr={}):
from paddle.distributed.passes import PassContext, new_pass
Contributor

According to the Python coding conventions, import statements must go at the top of the file, after module comments and docstrings and before global variables and constants. Imports inside functions are not recommended.

Contributor Author

done.

@@ -653,8 +681,15 @@ def run(self, feed_names, return_numpy=True):
"""
tensors = self._new_exe.run(feed_names)._move_to_list()
if return_numpy:
return as_numpy(tensors, copy=True)
tensors = as_numpy(tensors, copy=True)
if self._plan.micro_batch_num() <= 1:
Contributor

Can _merge_tensors handle the micro_batch_num=1 case?

Contributor Author

Yes, it can. The two branches have been merged.

else:
if self._plan.micro_batch_num() > 1:
logging.warning(
Contributor

It would be better to raise an error directly here.

Contributor Author

done.
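
For intuition, a minimal sketch of what merging per-micro-batch fetch results can look like once every micro batch has been converted to numpy, as described in the PR summary. The helper name echoes _merge_tensors from the thread above, but its real signature and the concatenation axis are assumptions.

import numpy as np

def merge_micro_batch_results(per_micro_batch, micro_batch_num):
    # per_micro_batch: list of length micro_batch_num; each element holds
    # the numpy arrays fetched for that micro batch (one per fetch target).
    if micro_batch_num <= 1:
        # A single micro batch needs no merging; the same path can handle
        # it, which is why the <=1 branch could be folded in above.
        return per_micro_batch[0] if per_micro_batch else []
    num_fetches = len(per_micro_batch[0])
    # Assumption: results are concatenated along the batch axis (axis 0).
    return [
        np.concatenate(
            [per_micro_batch[mb][i] for mb in range(micro_batch_num)], axis=0
        )
        for i in range(num_fetches)
    ]

# Toy usage: two micro batches, one fetch target of shape (2, 3) each.
merged = merge_micro_batch_results([[np.zeros((2, 3))], [np.ones((2, 3))]], 2)
print(merged[0].shape)  # (4, 3)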

scope,
)
if pipeline_opt:
from paddle.distributed.passes.pipeline_scheduler_pass import (
Contributor

Imports inside functions are not recommended.

Contributor Author

If the import is not done here, a circular import problem occurs.
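
For readers unfamiliar with the pattern being defended in the reply above: a deferred (function-local) import breaks an import cycle because the imported module is only resolved when the function runs, after both modules have finished loading. A generic two-file sketch with made-up module names, not Paddle's actual layout:

# file: executor_stub.py
import scheduler_pass_stub

MICRO_BATCH_NUM = 2

def run(program):
    return scheduler_pass_stub.apply_schedule(program)

# file: scheduler_pass_stub.py
def apply_schedule(program):
    # A top-level "from executor_stub import MICRO_BATCH_NUM" here would
    # complete the cycle executor_stub -> scheduler_pass_stub -> executor_stub
    # and fail while executor_stub is still half-initialized. Importing inside
    # the function defers resolution until call time, when both modules are
    # fully loaded.
    from executor_stub import MICRO_BATCH_NUM
    return f"{program}: scheduled with {MICRO_BATCH_NUM} micro batches"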

@@ -1408,7 +1460,21 @@ def _run_impl(

fetch_list = self._check_fetch_list(fetch_list)

if isinstance(program, Program) and program._pipeline_opt:
new_executor_micro_batching = os.environ.get(
Contributor

Suggest unifying the FLAGS toggle check into a single place.

Contributor Author

done.
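
Since the same environment check originally appeared in both engine.py and executor.py (see the two review comments above asking to unify it), the fix amounts to one shared helper that both call sites consult. A minimal sketch of that idea; the helper name and the exact environment variable string are assumptions, not the merged code.

import os

def use_new_executor_for_pipeline():
    # Hypothetical shared helper: interpret the micro-batching toggle in one
    # place so engine.py and executor.py cannot drift apart.
    value = os.environ.get("FLAGS_new_executor_micro_batching", "True")
    return value.lower() in ("1", "true")

# Both call sites then reduce to something like:
# if strategy.pipeline.enable and use_new_executor_for_pipeline():
#     ...run the pipeline schedule on the standalone executor...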

@XieYunshen XieYunshen (Contributor) left a comment

LGTM for set_tests_properties(test_pipeline_scheduler_FThenB PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50)

@zhiqiu zhiqiu (Contributor) left a comment

LGTM

@From00 From00 (Contributor) left a comment

LGTM

@From00 From00 merged commit a702e17 into PaddlePaddle:develop Jun 25, 2023