Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eagerly drop local scope in iteration #9838

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion paddle/fluid/framework/details/computation_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "paddle/fluid/framework/details/computation_op_handle.h"

#include <string>

namespace paddle {
namespace framework {
namespace details {
Expand All @@ -33,7 +35,7 @@ void ComputationOpHandle::RunImpl() {
}
}

op_->Run(*scope_->FindVar("@TMP_SCOPE@")->Get<Scope *>(), place_);
op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
}

std::string ComputationOpHandle::Name() const { return op_->Type(); }
Expand Down
8 changes: 7 additions & 1 deletion paddle/fluid/framework/details/fetch_op_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#include "paddle/fluid/framework/details/fetch_op_handle.h"

#include <string>
#include <vector>

namespace paddle {
namespace framework {
namespace details {
Expand Down Expand Up @@ -57,7 +60,10 @@ void FetchOpHandle::RunImpl() {

for (size_t i = 0; i < scopes.size(); ++i) {
auto &scope = scopes[i];
auto &t = scope->FindVar(var_name)->Get<framework::LoDTensor>();
auto &t = scope->FindVar(kLocalExecScopeName)
->Get<Scope *>()
->FindVar(var_name)
->Get<framework::LoDTensor>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used to fetch the variable in the local scope?

if (platform::is_gpu_place(var->place_)) {
#ifdef PADDLE_WITH_CUDA
TensorCopy(t, cpu, *dev_ctxes_[t.place()], &tensors_[i]);
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/details/op_handle_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace paddle {
namespace framework {
namespace details {

// Name of the variable, stored in each executor scope, that holds a `Scope *`
// pointing at the scope in which ops are actually run.
// NOTE(review): the literal reads "LCOAL" (presumably meant "LOCAL"). It is a
// runtime lookup key, so it is intentionally left unchanged here.
constexpr char kLocalExecScopeName[] = "@LCOAL_SCOPE@";

class OpHandleBase {
private:
DISABLE_COPY_AND_ASSIGN(OpHandleBase);
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/framework/details/ssa_graph_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
#pragma once

#include <memory>
#include <string>
#include <vector>

#include "paddle/fluid/framework/details/ssa_graph.h"
#include "paddle/fluid/framework/feed_fetch_type.h"

namespace paddle {
namespace framework {
namespace details {

class SSAGraphExecutor {
DISABLE_COPY_AND_ASSIGN(SSAGraphExecutor);

Expand Down
30 changes: 0 additions & 30 deletions paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,6 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
ready_ops.clear();
};

// Create local scopes.
for (auto &scope : local_scopes_) {
auto &local_scope = scope->NewScope();
*scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>() = &local_scope;
}

// Step 3. Execution
while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
// 1. Run All Ready ops
Expand Down Expand Up @@ -189,34 +183,10 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
PADDLE_ENFORCE(ready_ops.empty());
PADDLE_ENFORCE(delayed_ops.empty());
PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
++computation_count_;

auto sync_computation = [&] {
computation_count_ = 0;
// Wait All computational streams
for (auto p : this->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
for (auto &scope : local_scopes_) {
scope->DropKids();
}
};

// Wait FetchOps.
if (!fetch_ops.empty()) {
fetch_ops.clear();
sync_computation();
}

if (computation_count_ == max_async_computation) {
sync_computation();
}

// NOTE: the temp scope can be dropped lazily if needed.
// Drop tmp scopes;
for (auto &scope : local_scopes_) {
auto &kid = *scope->Var("@TMP_SCOPE@")->GetMutable<Scope *>();
kid = nullptr;
}

return fetch_data;
Expand Down
3 changes: 0 additions & 3 deletions paddle/fluid/framework/details/threaded_ssa_graph_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_;
bool allow_op_delay_;

size_t computation_count_{0};
size_t max_async_computation{100};
};

} // namespace details
Expand Down
47 changes: 39 additions & 8 deletions paddle/fluid/framework/parallel_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/framework/parallel_executor.h"

#include <string>
#include <tuple>
#include <vector>

#ifdef PADDLE_WITH_CUDA
Expand All @@ -41,6 +42,8 @@ class ParallelExecutorPrivate {
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif

std::vector<std::tuple<std::string, proto::VarType::Type, bool>> var_types_;
};

std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
Expand Down Expand Up @@ -97,14 +100,9 @@ ParallelExecutor::ParallelExecutor(
allow_op_delay));

// Step 3. Create vars in each scope;
for (auto *scope : member_->local_scopes_) {
for (auto *var : main_program.Block(0).AllVars()) {
if (scope->FindVar(var->Name()) != nullptr) {
continue;
}

InitializeVariable(scope->Var(var->Name()), var->GetType());
}
for (auto *var : main_program.Block(0).AllVars()) {
member_->var_types_.emplace_back(var->Name(), var->GetType(),
var->Persistable());
}
}

Expand Down Expand Up @@ -163,9 +161,42 @@ void ParallelExecutor::Run(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);

// Create local scopes.
for (auto &scope : member_->local_scopes_) {
Scope &local_scope = scope->NewScope();
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>() =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of scope->Var(details::kLocalExecScopeName)? Please add comments.

Copy link
Collaborator

@JiayiFeng JiayiFeng Apr 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, all non-persistable variables will be in the 'sub-scope of local scope'. The sub-scope is created before batch training and deleted after batch training.

&local_scope;

// Ensure every (name, type, persistable) entry recorded in var_types_ exists
// before execution. Names that scope->FindVar already resolves are skipped.
for (auto &name_type_pair : member_->var_types_) {
if (scope->FindVar(std::get<0>(name_type_pair)) != nullptr) {
continue;
}

// NOTE(review): both branches below are identical -- persistable and
// non-persistable variables are both created in `scope`, never in
// `local_scope`. Confirm whether the else branch was meant to call
// `local_scope.Var(...)` instead.
if (std::get<2>(name_type_pair)) { // Persistable
InitializeVariable(scope->Var(std::get<0>(name_type_pair)),
std::get<1>(name_type_pair));
} else {
InitializeVariable(scope->Var(std::get<0>(name_type_pair)),
std::get<1>(name_type_pair));
}
}
}

auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data;

// Wait All computational streams
for (auto p : member_->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
for (auto &scope : member_->local_scopes_) {
auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope);
local_scope = nullptr;
}
}

void ParallelExecutor::SplitTensorToPlaces(
Expand Down