Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Multi-Stream, Single-Thread in New Executor #35024

Merged
merged 10 commits into from
Aug 26, 2021
290 changes: 265 additions & 25 deletions paddle/fluid/framework/new_executor/interpretercore.cc

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions paddle/fluid/framework/new_executor/interpretercore.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/event.h"

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -63,9 +64,22 @@ class InterpreterCore {
void BuildVariableScope(const framework::ProgramDesc& pdesc,
VariableScope* var_scope);

platform::DeviceContext* ParseDeviceContextForInstruction(
const OpFuncNode& op_func_node, const OperatorBase& op_base);

void RecordEventInstruction(const Instruction& instruction,
const OpFuncNode& op_func_node);

void WaitOrSync(const std::vector<EventInter>& events,
const platform::DeviceContext* dev_ctx);

void StreamWaitEventOrSync(const Instruction& instruction);

const platform::Place& place_;
ProgramDesc main_program_;
VariableScope* global_scope_;
platform::DeviceContextPool d2h_ctx_pool_;
platform::DeviceContextPool h2d_ctx_pool_;
std::vector<VariableMetaInfo> vec_meta_info_;

std::vector<paddle::framework::OpFuncNode> vec_func_list_;
Expand All @@ -80,6 +94,7 @@ class InterpreterCore {
bool is_build_;

std::vector<std::string> feed_names_;
std::map<size_t, std::shared_ptr<platform::CudaEvent>> var_id2event_;

platform::DeviceContextPool fetch_context_pool_;
};
Expand Down
36 changes: 28 additions & 8 deletions paddle/fluid/framework/new_executor/new_executor_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/event.h"

namespace paddle {
namespace framework {
Expand All @@ -41,22 +42,30 @@ struct VariableScope {
std::map<std::string, int> name2id;
};

struct EventRun {
explicit EventRun(size_t op_id) : op_id_(op_id) {}
size_t op_id_;
};
struct NextInstruction {
std::vector<size_t> direct_run_;
std::vector<EventRun> event_wait_run_;
std::vector<EventRun> synchronize_run_;
std::vector<size_t> all_next_ops_;
};

struct EventInter {};
struct EventInter {
explicit EventInter(size_t var_id, std::shared_ptr<platform::CudaEvent> event,
bool is_sync)
: var_id_(var_id), event_(event), is_sync_(is_sync) {}
size_t var_id_;
std::shared_ptr<platform::CudaEvent> event_;
bool is_sync_;
};

struct InstructionInfo {
std::vector<size_t> dependecy_count_;
};

struct EventRun {
EventInter event_inter;
std::vector<size_t> same_device_run_;
std::vector<size_t> synchronized_run;
};

struct Instruction {
OpKernelFunc kernel_func_;
std::shared_ptr<RuntimeContext> runtime_ctx_;
Expand All @@ -67,7 +76,16 @@ struct Instruction {

std::vector<size_t> gc_check_var_list;
NextInstruction next_instruction_;
std::vector<EventInter> vec_event_list_;

std::vector<EventInter> intput_events_;
std::vector<EventInter> output_events_;

platform::DeviceContext* dev_ctx_; // not owned
};

enum class OpFuncType {
kQueueAsync, // GPU Kernel or d2h, h2d, send, recv, broadcast
kQueueSync, // CPU kernel, block host
};

struct OpFuncNode {
Expand All @@ -76,6 +94,8 @@ struct OpFuncNode {
std::map<std::string, std::vector<int>> output_index;

OpKernelComputeFunc kernel_func_;
platform::DeviceContext* dev_ctx_; // not owned
OpFuncType type_;
};

} // namespace framework
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/new_executor/standalone_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ std::shared_ptr<InterpreterCore> StandaloneExecutor::GetInterpreterCore(
auto iter = interpretercores_.find(oss.str());

if (iter == interpretercores_.end()) {
VLOG(3) << "create interpreter_core for " << oss.str();
auto core = std::make_shared<InterpreterCore>(
place_, main_prog_, &global_scope_, feed_names, fetch_names);
interpretercores_.emplace(oss.str(), core);
Expand Down
148 changes: 148 additions & 0 deletions paddle/fluid/operators/memcpy_d2h_op.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/memcpy_d2h_op.h"

#include <string>

namespace paddle {
namespace framework {
class OpDesc;
class InferShapeContext;
template <typename T>
class EmptyGradOpMaker;
} // namespace framework
namespace imperative {
class OpBase;
} // namespace imperative
namespace platform {
struct CPUPlace;
struct CUDAPlace;
struct float16;
} // namespace platform
} // namespace paddle

namespace paddle {
namespace operators {

class MemcpyD2HOp : public framework::OperatorWithKernel {
 public:
  using framework::OperatorWithKernel::OperatorWithKernel;

  // Propagates shape from X to Out for SELECTED_ROWS and LOD_TENSOR inputs;
  // LoD is additionally shared for LOD_TENSOR. Other var types are left alone.
  void InferShape(framework::InferShapeContext *ctx) const override {
    const auto var_type = ctx->GetInputsVarType("X")[0];
    const bool is_lod_tensor =
        var_type == framework::proto::VarType::LOD_TENSOR;
    const bool is_selected_rows =
        var_type == framework::proto::VarType::SELECTED_ROWS;
    if (!is_lod_tensor && !is_selected_rows) {
      return;
    }
    ctx->SetOutputDim("Out", ctx->GetInputDim("X"));
    if (is_lod_tensor) {
      ctx->ShareLoD("X", /*->*/ "Out");
    }
  }

 protected:
  // Keep the expected place/data type; only take the layout from the tensor,
  // so no implicit transform is forced on a per-variable basis.
  framework::OpKernelType GetKernelTypeForVar(
      const std::string &var_name, const framework::Tensor &tensor,
      const framework::OpKernelType &expected_kernel_type) const override {
    return framework::OpKernelType(expected_kernel_type.data_type_,
                                   expected_kernel_type.place_,
                                   tensor.layout());
  }

  // Kernel choice is driven by X's data type on the current device context.
  framework::OpKernelType GetExpectedKernelType(
      const framework::ExecutionContext &ctx) const override {
    return framework::OpKernelType(
        OperatorWithKernel::IndicateVarDataType(ctx, "X"),
        ctx.device_context());
  }
};

class MemcpyD2HInferVarType : public framework::VarTypeInference {
 public:
  // Keeps Out's variable type and data type in sync with X's.
  void operator()(framework::InferVarTypeContext *ctx) const override {
    ctx->SyncTypeAndDataType("X", "Out");
  }
};

class MemcpyD2HKernel {
 public:
  // Copies Input(X) to Output(Out), dispatching on the variable's type via
  // MemcpyD2HFunctor. A missing input is silently a no-op; a missing output
  // is a hard error. (Removed stray review-page text that had been pasted
  // into the body and made the file ill-formed.)
  void operator()(const framework::ExecutionContext &ctx) const {
    auto *x = ctx.InputVar("X");
    if (x == nullptr) {
      return;
    }
    PADDLE_ENFORCE_EQ(ctx.HasOutput("Out"), true,
                      platform::errors::NotFound(
                          "Output(Out) of memcpy_d2h_op is not found."));
    auto *out = ctx.OutputVar("Out");
    // Get dev_ctx from ExecutionContext, it's D2H stream
    auto &dev_ctx = ctx.device_context();
    auto dst_place_type = ctx.Attr<int>("dst_place_type");
    framework::VisitVarType(*x, MemcpyD2HFunctor(out, dev_ctx, dst_place_type));
  }
};

class MemcpyD2HOpProtoMaker : public framework::OpProtoAndCheckerMaker {
 public:
  // Declares the op's inputs, outputs, and attributes for the op registry.
  void Make() override {
    AddInput("X", "(LoDTensor) The input variable ");
    AddOutput("Out",
              "(LoDTensor) The type of output "
              "is the same as input X.");
    // 0 -> CPUPlace, 1 -> CUDAPinnedPlace; any other value raises
    // Unimplemented at run time (see MemcpyD2HFunctor).
    AddAttr<int>(
        "dst_place_type",
        "Determine the dst place of tensor copy. "
        "By Now it ONLY support NPUPlace/CUDAPlace <-> CUDAPinnedPlace/CPU"
        "Other place type is Unimplemented and will cause ERROR."
        "0: dst is on CPUPlace. "
        "1: dst is on CUDAPinnedPlace. ");
    AddComment(R"DOC(
MemcpyD2H Operator.
By now, it ONLY supports the memcopy between NPUPlace/CUDAPlace <-> CUDAPinnedPlace/CPU.
You would have to update it if you want other more capacities.
Out = X, when type in [LoDTensor]
raise error if the type is not listed above.
)DOC");
  }
};

} // namespace operators
} // namespace paddle

namespace ops = paddle::operators;
namespace plat = paddle::platform;
// memcpy_d2h has no gradient: empty grad-op makers are registered for both
// the static-graph (OpDesc) and dygraph (OpBase) paths.
REGISTER_OPERATOR(
    memcpy_d2h, ops::MemcpyD2HOp, ops::MemcpyD2HOpProtoMaker,
    ops::MemcpyD2HInferVarType,
    paddle::framework::EmptyGradOpMaker<paddle::framework::OpDesc>,
    paddle::framework::EmptyGradOpMaker<paddle::imperative::OpBase>);

// The same place-agnostic functor is registered for every backend and every
// supported element type; destination dispatch happens at run time via the
// dst_place_type attribute.
REGISTER_OP_CPU_KERNEL_FUNCTOR(memcpy_d2h, float, ops::MemcpyD2HKernel, double,
                               ops::MemcpyD2HKernel, int, ops::MemcpyD2HKernel,
                               int64_t, ops::MemcpyD2HKernel, bool,
                               ops::MemcpyD2HKernel, plat::float16,
                               ops::MemcpyD2HKernel);

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_ROCM)
REGISTER_OP_CUDA_KERNEL_FUNCTOR(memcpy_d2h, float, ops::MemcpyD2HKernel, double,
                                ops::MemcpyD2HKernel, int, ops::MemcpyD2HKernel,
                                int64_t, ops::MemcpyD2HKernel, bool,
                                ops::MemcpyD2HKernel, plat::float16,
                                ops::MemcpyD2HKernel);
#endif

#ifdef PADDLE_WITH_ASCEND_CL
REGISTER_OP_NPU_KERNEL_FUNCTOR(memcpy_d2h, float, ops::MemcpyD2HKernel, double,
                               ops::MemcpyD2HKernel, int, ops::MemcpyD2HKernel,
                               int64_t, ops::MemcpyD2HKernel, bool,
                               ops::MemcpyD2HKernel, plat::float16,
                               ops::MemcpyD2HKernel);
#endif
78 changes: 78 additions & 0 deletions paddle/fluid/operators/memcpy_d2h_op.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/platform/device_context.h"

namespace paddle {
namespace platform {
class DeviceContext;
} // namespace platform
} // namespace paddle

namespace paddle {
namespace framework {
class LoDTensor;
class Variable;
class SelectedRows;
} // namespace framework
} // namespace paddle

namespace paddle {
namespace operators {
class MemcpyD2HFunctor {
public:
MemcpyD2HFunctor(framework::Variable *out,
const platform::DeviceContext &dev_ctx,
const int dst_place_type)
: out_(out), dev_ctx_(dev_ctx), dst_place_type_(dst_place_type) {}

void operator()(const framework::LoDTensor &lod_tensor) const {
auto &out_tensor = *out_->GetMutable<framework::LoDTensor>();

if (dst_place_type_ == 1) {
framework::TensorCopy(lod_tensor, platform::CUDAPinnedPlace(), dev_ctx_,
&out_tensor);
} else if (dst_place_type_ == 0) {
framework::TensorCopySync(lod_tensor, platform::CPUPlace(), &out_tensor);
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"memcpy dst_place_type: %d is not supported yet.", dst_place_type_));
}
out_tensor.set_lod(lod_tensor.lod());
}

void operator()(const framework::SelectedRows &rows) const {
// (JZ-LIANG) to support SelectedRows
PADDLE_THROW(platform::errors::Unimplemented(
"Memcpy for SelectedRows is NOT support yet."));
}

template <typename T>
void operator()(const T &v) const {
PADDLE_ENFORCE_EQ(
true, false,
platform::errors::PermissionDenied(
"Not support type for Memcpy op with type %s", typeid(T).name()));
}

private:
framework::Variable *out_;
const platform::DeviceContext &dev_ctx_;
const int dst_place_type_;
};

} // namespace operators
} // namespace paddle
Loading