[fleet_executor] run time graph on python side #38164

Merged (22 commits) on Dec 17, 2021
17 changes: 17 additions & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -17,6 +17,9 @@
#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
#include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
@@ -38,16 +41,30 @@ void FleetExecutor::Init(
const platform::Place& place, const std::vector<TaskNode*>& task_nodes,
const std::unordered_map<int64_t, int64_t>& task_id_to_rank) {
if (task_nodes.size() == 0) {
LOG(INFO) << "fleet executor will use c++ side scheduler construction.";
runtime_graph_ = std::make_shared<RuntimeGraph>(program_desc, exe_desc_);
} else {
LOG(INFO) << "fleet executor has been set dependency on python side.";
// TODO(fleet_exe devs): the unused_vars should be got from run time graph
std::vector<std::unique_ptr<framework::OperatorBase>> ops;
for (auto task_node : task_nodes) {
for (auto op : task_node->ops()) {
ops.emplace_back(std::unique_ptr<framework::OperatorBase>(op));
}
}
auto unused_vars = framework::GetUnusedVars(program_desc.Block(0), ops, {});
runtime_graph_ = std::make_shared<RuntimeGraph>();
std::unordered_map<int64_t, TaskNode*> interceptor_id_to_task;
for (auto task_node : task_nodes) {
task_node->SetUnusedVars(unused_vars);
int64_t interceptor_id = task_node->task_id();
interceptor_id_to_task.emplace(interceptor_id, task_node);
}
runtime_graph_->SetInterceptorIdToRank(task_id_to_rank);
runtime_graph_->SetInterceptorIdToNode(interceptor_id_to_task);
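// The ops are owned by the task nodes; the unique_ptrs above were created only to
// reuse GetUnusedVars, so release them here to avoid a double free.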
for (auto& unique_op : ops) {
unique_op.release();
Contributor comment: 🌚
}
}
root_scope_ = scope;
place_ = place;
24 changes: 23 additions & 1 deletion paddle/fluid/distributed/fleet_executor/task_node.cc
@@ -13,6 +13,7 @@
// limitations under the License.

#include "paddle/fluid/distributed/fleet_executor/task_node.h"
#include "paddle/fluid/framework/op_desc.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"

@@ -39,7 +40,28 @@ TaskNode::TaskNode(const framework::ProgramDesc& program, int64_t rank,
}
}

TaskNode::TaskNode(int32_t role, const std::vector<OperatorBase*>& ops,
TaskNode::TaskNode(int32_t role,
const std::vector<framework::OpDesc*>& op_descs,
int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums)
: role_(role),
rank_(rank),
task_id_(task_id),
max_run_times_(max_run_times),
max_slot_nums_(max_slot_nums) {
if (op_descs.empty()) {
return;
}
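// Instantiate operators from their descriptors: ops_vec_ owns the created
// operators, while ops_ stores raw pointers so callers can access them without owning them.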
for (const auto& desc : op_descs) {
ops_vec_.emplace_back(framework::OpRegistry::CreateOp(*desc));
}
for (const auto& op : ops_vec_) {
ops_.emplace_back(op.get());
}
}

TaskNode::TaskNode(int32_t role,
const std::vector<framework::OperatorBase*>& ops,
int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums)
: ops_(ops),
9 changes: 7 additions & 2 deletions paddle/fluid/distributed/fleet_executor/task_node.h
@@ -25,6 +25,7 @@
namespace paddle {
namespace framework {
class OperatorBase;
class OpDesc;
}
namespace distributed {

@@ -33,8 +34,12 @@ class TaskNode final {
using OperatorBase = paddle::framework::OperatorBase;
TaskNode(int32_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums);
TaskNode(int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
int64_t task_id, int64_t max_run_times, int64_t max_slot_nums);
TaskNode(int32_t role, const std::vector<framework::OpDesc*>& op_descs,
int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums);
TaskNode(int32_t role, const std::vector<framework::OperatorBase*>& ops,
int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums);
TaskNode(const paddle::framework::ProgramDesc& program, int64_t rank,
int64_t max_run_times, int64_t max_slot_nums);
~TaskNode() = default;
9 changes: 8 additions & 1 deletion paddle/fluid/pybind/bind_fleet_executor.cc
@@ -28,6 +28,7 @@ namespace pybind {

using paddle::distributed::FleetExecutor;
using paddle::distributed::TaskNode;
using paddle::framework::OpDesc;

void BindFleetExecutor(py::module* m) {
py::class_<FleetExecutor>(*m, "FleetExecutor")
@@ -38,9 +39,15 @@ void BindFleetExecutor(py::module* m) {

py::class_<TaskNode>(*m, "TaskNode")
.def(py::init<const framework::ProgramDesc&, int64_t, int64_t, int64_t>())
.def(py::init<int32_t, const std::vector<framework::OpDesc*>&, int64_t,
int64_t, int64_t, int64_t>())
.def("task_id", &TaskNode::task_id)
.def("add_upstream_task", &TaskNode::AddUpstreamTask)
.def("add_downstream_task", &TaskNode::AddDownstreamTask);
.def("add_downstream_task", &TaskNode::AddDownstreamTask)
.def("set_run_pre_steps", &TaskNode::SetRunPerSteps)
.def("set_run_at_offset", &TaskNode::SetRunAtOffset)
.def("set_type", &TaskNode::SetType)
.def("role", &TaskNode::role);
}
} // namespace pybind
} // namespace paddle
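
For orientation, here is a minimal sketch of how the new bindings might be driven from Python; the program construction and the chosen rank/task ids are assumptions, and the argument order follows the constructor bound above (role, op descs, rank, task_id, max_run_times, max_slot_nums):

from paddle.fluid import core
from paddle.distributed.fleet.meta_optimizers.common import OpRole

# Assume `program` is an already-built fluid Program for this rank.
op_descs = [op.desc for op in program.block(0).ops]
node = core.TaskNode(int(OpRole.Forward), op_descs, 0, 1, 1, 1)
node.set_type("Compute")        # handled by a compute interceptor
node.add_downstream_task(2, 2)  # downstream task id 2, buffer size 2
print(node.task_id(), node.role())
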
203 changes: 203 additions & 0 deletions python/paddle/distributed/fleet/fleet_executor_utils.py
@@ -0,0 +1,203 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from paddle.distributed.fleet.meta_optimizers.common import OpRole, OP_ROLE_KEY
from paddle.fluid import core


class CoordSys:
"""
This class is used to map a rank to its (mp rank, sharding rank, pp rank, dp rank) coordinate.
"""

def __init__(self, dist_opt):
self.dp_degree = dist_opt.get('dp_degree', 1)
self.pp_degree = dist_opt.get('pp_degree', 1)
self.sharding_degree = dist_opt.get('sharding_degree', 1)
self.mp_degree = dist_opt.get('mp_degree', 1)

def _invalide_coord(self, coord):
"""
Test whether the input coord is valid or not.
:param coord: The coord to be tested.
:return: False if valid, True if invalid.
"""
return coord['mp_idx'] < 0 or coord['mp_idx'] >= self.mp_degree or \
coord['sharding_idx'] < 0 or coord['sharding_idx'] >= self.sharding_degree or \
coord['pp_idx'] < 0 or coord['pp_idx'] >= self.pp_degree or \
coord['dp_idx'] < 0 or coord['dp_idx'] >= self.dp_degree

def coord_to_rank(self, coord):
"""
Map the input coord to its corresponding rank.
:param coord: The coord to be converted.
:return: The rank corresponding to the coord.
"""
if self._invalide_coord(coord):
return -1
return int(coord['dp_idx'] * self.pp_degree * self.sharding_degree * self.mp_degree + \
coord['pp_idx'] * self.sharding_degree * self.mp_degree + \
coord['sharding_idx'] * self.mp_degree + coord['mp_idx'])

def rank_to_coord(self, rank):
"""
Map the input rank to its corresponding coord.
:param rank: The rank to be converted.
:return: The coord corresponding to the rank.
"""
mp_idx = rank % self.mp_degree
rank //= self.mp_degree
sharding_idx = rank % self.sharding_degree
rank //= self.sharding_degree
pp_idx = rank % self.pp_degree
rank //= self.pp_degree
dp_idx = rank % self.dp_degree
return {
'mp_idx': int(mp_idx),
'sharding_idx': int(sharding_idx),
'pp_idx': int(pp_idx),
'dp_idx': int(dp_idx)
}
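# Worked example (illustrative): with dp_degree=2, pp_degree=2, sharding_degree=1 and
# mp_degree=1, rank_to_coord(3) yields {'mp_idx': 0, 'sharding_idx': 0, 'pp_idx': 1,
# 'dp_idx': 1}, and coord_to_rank maps that coord back to rank 3.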


def is_optimizer_op(op_role):
Contributor comment: These helpers probably already exist in other files; consider consolidating them.
return op_role == int(OpRole.Optimize)


def is_lr_sched_op(op_role):
return op_role == int(OpRole.Optimize.LRSched)


def is_forward_op(op_role):
return (op_role == int(OpRole.Forward)) or \
(op_role == (int(OpRole.Forward) ^ int(OpRole.Loss)))


def is_backward_op(op_role):
return (op_role == int(OpRole.Backward)) or \
(op_role == (int(OpRole.Backward) ^ int(OpRole.Loss)))


def one_f_one_b(program, cur_rank, max_run_times, dist_opt, nrank):
"""
Split the program to support 1f1b pipeline scheduler.
This function splits the program into four parts based on the op_role:
lr_sched, fwd, bwd and opt, and creates one task node for each part.
:param program: The origin program.
:param cur_rank: Current rank (can be obtained from fleet.worker_index()).
:param max_run_times: Max run times for a micro batch, i.e. the number of micro steps.
:param dist_opt: The fleet_opt configured by the user.
:param nrank: Number of workers (can be obtained from fleet.worker_num()).
:return:
task_nodes (list): the four task nodes for the current rank
task_id_to_rank (dict): mapping from task node ids to their corresponding ranks
"""
print("fleet executor will use python side 1f1b scheduler.")
coord_sys = CoordSys(dist_opt)
coord = coord_sys.rank_to_coord(cur_rank)
max_slot_times = int(max_run_times - coord['pp_idx'])
num_of_functionality = 4

def create_task_node(role, ops, offset, node_type):
task_id = int(cur_rank * num_of_functionality + offset)
print("Creating task node with role:", role, "and with id:", task_id)
node = core.TaskNode(role, ops, cur_rank, task_id, max_run_times,
max_slot_times)
node.set_type(node_type)
return node

lr_ops, fwd_ops, bwd_ops, opt_ops = [], [], [], []
for op in program.block(0).ops:
# split the program based on the op_role
op_role = int(op.all_attrs()[OP_ROLE_KEY])
if is_lr_sched_op(op_role):
lr_ops.append(op.desc)
elif is_optimizer_op(op_role):
opt_ops.append(op.desc)
elif is_forward_op(op_role):
fwd_ops.append(op.desc)
elif is_backward_op(op_role):
bwd_ops.append(op.desc)
else:
raise "The op role: " + str(
op_role
) + " isn't one of LRSched, Forward, Backward or Optimizer."

# Create task nodes.
# The lr_sched and opt should be 'Amplifier' interceptors.
# The fwd and bwd should be 'Compute' interceptors.
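# (With the settings below, the Amplifier nodes fire once per full batch through
# set_run_pre_steps(max_run_times), while the Compute nodes run once per micro batch;
# opt additionally waits for the last micro step via set_run_at_offset.)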
lr_task_node = create_task_node(
int(OpRole.Optimize.LRSched), lr_ops, 0, "Amplifier")
lr_task_node.set_run_pre_steps(max_run_times)
fwd_task_node = create_task_node(int(OpRole.Forward), fwd_ops, 1, "Compute")
bwd_task_node = create_task_node(
int(OpRole.Backward), bwd_ops, 2, "Compute")
opt_task_node = create_task_node(
int(OpRole.Optimize), opt_ops, 3, "Amplifier")
opt_task_node.set_run_pre_steps(max_run_times)
opt_task_node.set_run_at_offset(max_run_times - 1)
task_nodes = [lr_task_node, fwd_task_node, bwd_task_node, opt_task_node]

# Generate the dependencies based on this graph:
# lr(1:m) -> forward -> backward -> (m:1)optimize
# ↑ ↓
# lr(1:m) -> forward -> backward -> (m:1)optimize
# ↑ ↓
# lr(1:m) -> forward -> backward -> (m:1)optimize
upstream_coord, downstream_coord = coord.copy(), coord.copy()
upstream_coord['pp_idx'] = upstream_coord['pp_idx'] - 1
downstream_coord['pp_idx'] = downstream_coord['pp_idx'] + 1
pp_upstream = coord_sys.coord_to_rank(upstream_coord)
pp_downstream = coord_sys.coord_to_rank(downstream_coord)
first_stage = (pp_upstream == -1)
last_stage = (pp_downstream == -1)
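# Illustrative case: with pp_degree=2 (dp, sharding and mp all 1) and cur_rank=0, task
# ids 0-3 live on this rank and 4-7 on pipeline stage 1. The loop below then wires the
# fwd task (id 1) to upstream (0, 2) and downstreams (2, 2) and (5, 2), and the bwd
# task (id 2) to upstreams (1, 2) and (6, 2) and downstream (3, 2).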
for i in range(num_of_functionality):
task_node = task_nodes[i]
task_role = task_node.role()
cur_id = int(cur_rank * num_of_functionality + i)
prev_id = cur_id - 1
next_id = cur_id + 1
upstream_id = int(pp_upstream * num_of_functionality + i)
downstream_id = int(pp_downstream * num_of_functionality + i)
pp_buff_size = int(dist_opt['pp_degree'] - coord['pp_idx'])
ups = []
downs = []
if not is_lr_sched_op(task_role):
buf_size = pp_buff_size if is_backward_op(task_role) else 2
ups.append((prev_id, buf_size))
if not is_optimizer_op(task_role):
buf_size = pp_buff_size if is_forward_op(task_role) else 2
downs.append((next_id, buf_size))
if is_forward_op(task_role):
if not first_stage:
ups.append((upstream_id, 2))
if not last_stage:
downs.append((downstream_id, 2))
elif is_backward_op(task_role):
if not last_stage:
ups.append((downstream_id, 2))
if not first_stage:
downs.append((upstream_id, 2))
for up in ups:
print("Task:", cur_id, "'s upstream includes:", up[0])
task_node.add_upstream_task(up[0], up[1])
for down in downs:
print("Task:", cur_id, "'s downstream includes:", down[0])
task_node.add_downstream_task(down[0], down[1])
task_id_to_rank = {}
for i in range(nrank):
for j in range(num_of_functionality):
task_id_to_rank[int(i * num_of_functionality + j)] = i
return task_nodes, task_id_to_rank
22 changes: 19 additions & 3 deletions python/paddle/fluid/executor.py
@@ -1964,7 +1964,8 @@ def _prepare_fleet_executor(self, program=None, scope=None, fleet_opt=None):
trainer_endpoints_str = os.getenv("PADDLE_TRAINER_ENDPOINTS", "")
trainer_endpoints = trainer_endpoints_str.split(',')
fleet_exe_desc = fleet_executor_desc_pb2.FleetExecutorDesc()
fleet_exe_desc.cur_rank = int(os.getenv("PADDLE_TRAINER_ID", 0))
cur_rank = int(os.getenv("PADDLE_TRAINER_ID", 0))
fleet_exe_desc.cur_rank = cur_rank
nrank = len(trainer_endpoints)
for rank, endpoint in enumerate(trainer_endpoints):
rank_info = fleet_executor_desc_pb2.RankInfo()
@@ -1979,8 +1980,23 @@
fleet_exe_desc.num_micro_batches = fleet_opt["num_micro_batches"]
num_of_gpu = fleet_exe_desc.dp_degree * fleet_exe_desc.mp_degree * fleet_exe_desc.pp_degree
assert nrank == num_of_gpu, "The number of ranks is not equal to the number of gpus."
task_id_to_rank = fleet_opt.get("task_id_to_rank", {})
tasks = fleet_opt.get("tasks", [])
if 'python_side' in fleet_opt:
strategy = fleet_opt['python_side']
if strategy == '1F1B':
from paddle.distributed.fleet.fleet_executor_utils import one_f_one_b
tasks, task_id_to_rank = one_f_one_b(
program, cur_rank,
fleet_opt.get('num_micro_batches', 1),
fleet_opt.get('dist_strategy', {}), nrank)
# NOTE: have to hold these vars, otherwise will be destructed
fleet_opt['tasks'] = tasks
fleet_opt['task_id_to_rank'] = task_id_to_rank
else:
raise "Fleet_executor only supports 1F1B scheduler if you choose python side split, " \
"but received " + str(strategy) + "."
else:
task_id_to_rank = fleet_opt.get("task_id_to_rank", {})
tasks = fleet_opt.get("tasks", [])
fleet_exe = core.FleetExecutor(fleet_exe_desc.SerializeToString())
place = core.Place()
place.set_place(self.place)
3 changes: 2 additions & 1 deletion python/paddle/fluid/tests/unittests/test_fleet_executor.py
@@ -33,7 +33,8 @@ def fake_fleet_opt(self):
strategy.pipeline_configs = {"accumulate_steps": 1}
fleet_opt = {
"dist_strategy": strategy.sharding_configs,
"num_micro_batches": strategy.pipeline_configs["accumulate_steps"]
"num_micro_batches": strategy.pipeline_configs["accumulate_steps"],
"python_side": "1F1B"
}
return fleet_opt
