[Dygraph] Finish fixing mem bugs of no sync in DataParallel #47444

Merged · 3 commits · Oct 28, 2022
43 changes: 28 additions & 15 deletions paddle/fluid/distributed/collective/reducer.cc
@@ -588,10 +588,9 @@ void EagerReducer::TraverseBackwardGraph(const std::vector<Tensor> &outputs) {
   }
 }
 
-void EagerReducer::PrepareForBackward(const std::vector<Tensor> &outputs,
-                                      const bool is_sync) {
+void EagerReducer::PrepareForBackward(const std::vector<Tensor> &outputs) {
   VLOG(3) << "after forward, then reset count for backward.";
-  grad_need_hooks_ = is_sync;
+  grad_need_hooks_ = true;
 
   next_group_ = 0;
   std::for_each(groups_.begin(), groups_.end(), [](EagerGroup &group) {
@@ -660,9 +659,25 @@ void EagerReducer::AddDistHook(size_t var_index) {
                         var_index));
 
   // gradient synchronization is not required when grad_need_hooks_ is false.
-  // if (!grad_need_hooks_) {
-  //   return;
-  // }
+  if (!grad_need_hooks_) {
+    const auto &var_locator = variable_locators_[var_index];
+    const auto group_index = var_locator.group_index;
+    const auto inside_group_index = var_locator.inside_group_index;
+    auto &group = groups_[group_index];
+    auto &group_tensor = group.dense_tensors_[inside_group_index];
+
+    auto *autograd_meta = tensors_[var_index].get_autograd_meta();
+    auto &grad_tensor = static_cast<egr::AutogradMeta *>(autograd_meta)->Grad();
+
+    if (!HasGrad(var_index)) {
+      group_tensor.ShareDataWith(phi::DenseTensor());
+    } else {
+      auto grad_dense_tensor =
+          *(std::dynamic_pointer_cast<phi::DenseTensor>(grad_tensor.impl()));
+      group_tensor.ShareDataWith(grad_dense_tensor);
+    }
+    return;
+  }
 
   VLOG(3) << "Tensor[" << var_index << "] [" << tensors_[var_index].name()
           << "@Grad] arrived and triggered disthook";
@@ -828,12 +843,10 @@ void EagerReducer::MarkGroupReady(size_t group_index) {
   for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0;
        ++next_group_) {
     UNUSED auto &group = groups_[next_group_];
-    if (grad_need_hooks_) {
-      if (group.is_sparse_) {
-        AllReduceSparse(&group, next_group_);
-      } else {
-        FusedAllReduceSchedule(&group, next_group_);
-      }
+    if (group.is_sparse_) {
+      AllReduceSparse(&group, next_group_);
+    } else {
+      FusedAllReduceSchedule(&group, next_group_);
     }
   }
 }
@@ -921,14 +934,15 @@ void EagerReducer::ProcessUnusedDenseVars() {
 
 void EagerReducer::FinalizeBackward() {
   groups_need_finalize_ = false;
+  grad_need_hooks_ = false;
   for (auto &group : groups_) {
-    if (!group.is_sparse_ && grad_need_hooks_) {
+    if (!group.is_sparse_) {
       group.task->Synchronize();
     }
   }
 
   for (auto &group : groups_) {
-    if (!group.is_sparse_ && grad_need_hooks_) {
+    if (!group.is_sparse_) {
       group.dense_contents_.reset();
     }
   }
@@ -940,7 +954,6 @@ void EagerReducer::FinalizeBackward() {
     VLOG(3) << "ProcessUnusedDenseVars is finished.";
   }
 
-  grad_need_hooks_ = false;
   VLOG(3) << "In the batch, Reducer is finished.";
 }
 
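Note on the reducer.cc hunks above: when hooks fire while synchronization is disabled, AddDistHook no longer bails out early; it points the group's dense slot at the parameter's local gradient (or an empty phi::DenseTensor when no gradient was produced) and skips the allreduce path, while FinalizeBackward now clears grad_need_hooks_ and releases the fused dense_contents_ buffers unconditionally. The user-visible result is that gradient accumulation under no_sync() works without keeping stale fused buffers alive. A minimal usage sketch of that behavior, assuming a multi-process start via paddle.distributed.launch and illustrative model/optimizer names:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
model = paddle.DataParallel(paddle.nn.Linear(8, 8))
opt = paddle.optimizer.SGD(parameters=model.parameters())

for step in range(4):
    x = paddle.randn([2, 8])
    if step % 2 == 0:
        # No-sync step: the dist hooks only share the local grad into the
        # group slot; no allreduce is scheduled.
        with model.no_sync():
            model(x).sum().backward()
    else:
        # Synced step: the accumulated gradients are allreduced as usual.
        model(x).sum().backward()
        opt.step()
        opt.clear_grad()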
3 changes: 1 addition & 2 deletions paddle/fluid/distributed/collective/reducer.h
@@ -103,8 +103,7 @@ class EagerReducer {
   void InitializeGroups(const std::vector<std::vector<size_t>> &group_indices);
   void InitializeDenseGroups(const std::vector<size_t> &tensor_indices_,
                              EagerGroup *p_group);
-  void PrepareForBackward(const std::vector<Tensor> &outputs,
-                          const bool is_sync);
+  void PrepareForBackward(const std::vector<Tensor> &outputs);
   void AddDistHook(size_t var_index);
   void MarkVarReady(const size_t var_index, const bool is_used_var);
   void MarkGroupReady(const size_t group_index);
9 changes: 3 additions & 6 deletions paddle/fluid/imperative/reducer.cc
@@ -675,10 +675,9 @@ void Reducer::TraverseBackwardGraph(
 // After each batch is calculated, the counter of each group(group.pending_)
 // and allreudce sequence counter(next_group_) will be cleaned up again.
 void Reducer::PrepareForBackward(
-    const std::vector<std::shared_ptr<imperative::VarBase>> &outputs,
-    const bool is_sync) {
+    const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
   VLOG(3) << "after forward, then reset count for backward.";
-  grad_need_hooks_ = is_sync;
+  grad_need_hooks_ = true;
   next_group_ = 0;
   std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
     group.pending_ = group.variable_indices_.size();
@@ -711,9 +710,7 @@ void Reducer::PrepareForBackward(
 
   if (find_unused_vars_once_ || find_unused_vars_each_step_) {
     unused_vars_.clear();
-    if (grad_need_hooks_) {
-      TraverseBackwardGraph(outputs);
-    }
+    TraverseBackwardGraph(outputs);
     // only check once in first step
     find_unused_vars_once_ = false;
   }
3 changes: 1 addition & 2 deletions paddle/fluid/imperative/reducer.h
@@ -146,8 +146,7 @@ class Reducer {
   void PrepareDeps(const std::unordered_set<GradOpNode*>& init_nodes);
 
   void PrepareForBackward(
-      const std::vector<std::shared_ptr<imperative::VarBase>>& outputs,
-      const bool is_sync);
+      const std::vector<std::shared_ptr<imperative::VarBase>>& outputs);
 
   void AddDistHook(size_t var_index);
 
7 changes: 2 additions & 5 deletions paddle/fluid/pybind/distributed_py.cc
@@ -1407,14 +1407,11 @@ void BindDistributed(py::module *m) {
       .def(py::init(&CreateEagerReducer))
       .def(
           "prepare_for_backward",
-          [](distributed::EagerReducer &self,
-             py::handle py_tensors,
-             bool is_sync) {
+          [](distributed::EagerReducer &self, py::handle py_tensors) {
             auto params = CastPyArg2VectorOfTensor(py_tensors.ptr(), 0);
-            self.PrepareForBackward(params, is_sync);
+            self.PrepareForBackward(params);
           },
           py::arg("tensors"),
-          py::arg("is_sync"),
           py::call_guard<py::gil_scoped_release>());
 }
 
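The pybind lambda above now takes only the tensor list, so the Python side passes just the tensors when arming the eager reducer. A hedged sketch of the new call shape; prepare_for_backward is an internal API normally invoked by DataParallel.forward (see the parallel.py hunk below), and the world-size guard only keeps the snippet harmless on a single card:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
model = paddle.DataParallel(paddle.nn.Linear(4, 4))
out = model(paddle.randn([2, 4]))

# _reducer and _find_varbase are the internal names used in parallel.py below.
if dist.get_world_size() > 1:
    model._reducer.prepare_for_backward(list(model._find_varbase(out)))  # no is_sync argument anymore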
1 change: 0 additions & 1 deletion paddle/fluid/pybind/imperative.cc
@@ -2569,7 +2569,6 @@ void BindImperative(py::module *m_ptr) {
       .def("prepare_for_backward",
            &imperative::Reducer::PrepareForBackward,
            py::arg("vars"),
-           py::arg("is_sync"),
            py::call_guard<py::gil_scoped_release>());
 
   m.def("assign_group_by_size",
8 changes: 6 additions & 2 deletions python/paddle/fluid/dygraph/parallel.py
@@ -818,9 +818,13 @@ def forward(self, x):
 
     def forward(self, *inputs, **kwargs):
         outputs = self._layers(*inputs, **kwargs)
-        if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad:
+        if (
+            self._strategy.nranks > 1
+            and framework._dygraph_tracer()._has_grad
+            and self.grad_need_sync
+        ):
             self._reducer.prepare_for_backward(
-                list(self._find_varbase(outputs)), self.grad_need_sync
+                list(self._find_varbase(outputs))
             )
         return outputs
 
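Taken together with the C++ changes, the forward() gate above means the reducer is armed only when a synchronized backward is coming: no_sync() clears grad_need_sync, so a forward pass inside the context never calls prepare_for_backward and the backward stays purely local. A small sketch of that gate, assuming grad_need_sync defaults to True as in this version and a paddle.distributed.launch start-up:

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
model = paddle.DataParallel(paddle.nn.Linear(4, 4))

print(model.grad_need_sync)        # True: forward() will arm the reducer when nranks > 1
with model.no_sync():
    print(model.grad_need_sync)    # False: forward() skips prepare_for_backward
    model(paddle.randn([2, 4])).sum().backward()  # local-only backward
print(model.grad_need_sync)        # restored on context exit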