fix distributed error info #27206

Merged: 11 commits, Sep 30, 2020
7 changes: 4 additions & 3 deletions paddle/fluid/operators/collective/c_allgather_op.cc
@@ -23,10 +23,11 @@ class CAllGatherOp : public framework::OperatorWithKernel {
  public:
   using framework::OperatorWithKernel::OperatorWithKernel;
   void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) should not be null");
-    PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null.");
+    OP_INOUT_CHECK(ctx->HasInput("X"), "Input", "X", "AllGather");
+    OP_INOUT_CHECK(ctx->HasOutput("Out"), "Output", "Out", "AllGather");
     int nranks = ctx->Attrs().Get<int>("nranks");
-    PADDLE_ENFORCE_GE(nranks, 2, "nranks should be >=2");
+    PADDLE_ENFORCE_GE(nranks, 2, platform::errors::InvalidArgument(
+                                     "The value of nranks should be >=2."));
     framework::DDim dim = ctx->GetInputDim("X");
     dim[0] = dim[0] * nranks;
     if (dim[0] < 0) dim[0] = -1;
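The hunk above is the template for the whole PR: a bare PADDLE_ENFORCE with a loose message becomes a check that carries a typed error object (platform::errors::InvalidArgument here) and a formatted, self-describing message. A minimal standalone sketch of that shape, where EnforceError, InvalidArgument, and ENFORCE_GE are hypothetical stand-ins rather than Paddle's real macros:

#include <cstdio>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for platform::errors::*: each thrown error
// carries a category tag in addition to the formatted message.
enum class ErrorType { kInvalidArgument, kPreconditionNotMet };

struct EnforceError : std::runtime_error {
  ErrorType type;
  EnforceError(ErrorType t, const std::string& msg)
      : std::runtime_error(msg), type(t) {}
};

template <typename... Args>
EnforceError InvalidArgument(const char* fmt, Args... args) {
  char buf[512];
  std::snprintf(buf, sizeof(buf), fmt, args...);
  return EnforceError(ErrorType::kInvalidArgument, buf);
}

// ENFORCE_GE-style check: evaluate the comparison, and throw the typed
// error built at the call site only when the check fails.
#define ENFORCE_GE(a, b, err)        \
  do {                               \
    if (!((a) >= (b))) throw (err);  \
  } while (0)

int main() {
  int nranks = 1;
  try {
    ENFORCE_GE(nranks, 2,
               InvalidArgument("The value of nranks should be >=2, but got %d.",
                               nranks));
  } catch (const EnforceError& e) {
    std::printf("InvalidArgument: %s\n", e.what());  // check failed
  }
  return 0;
}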
8 changes: 6 additions & 2 deletions paddle/fluid/operators/collective/c_allgather_op.cu.cc
@@ -37,7 +37,10 @@ class CAllGatherOpCUDAKernel : public framework::OpKernel<T> {
     int rid = ctx.Attr<int>("ring_id");
     auto place = ctx.GetPlace();
     auto comm = platform::NCCLCommContext::Instance().Get(rid, place);
-    PADDLE_ENFORCE_EQ(nranks, comm->nranks());
+    PADDLE_ENFORCE_EQ(
+        nranks, comm->nranks(),
+        platform::errors::InvalidArgument("nranks: %s should be equal to %s",
+                                          nranks, comm->nranks()));

     framework::DDim out_dims = in->dims();
     out_dims[0] *= nranks;
@@ -59,7 +62,8 @@ class CAllGatherOpCUDAKernel : public framework::OpKernel<T> {
         send_buff, recv_buff, send_numel, static_cast<ncclDataType_t>(dtype),
         comm->comm(), stream));
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
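Every CUDA kernel touched by this PR gets the same #else branch: when the binary was built without GPU support, reaching a GPU-only op now raises a typed PreconditionNotMet instead of a bare string throw. A self-contained sketch of the gating pattern, where WITH_GPU and the PreconditionNotMet struct are illustrative stand-ins for PADDLE_WITH_NCCL and platform::errors::PreconditionNotMet:

#include <cstdio>
#include <stdexcept>

// Illustrative stand-in for platform::errors::PreconditionNotMet.
struct PreconditionNotMet : std::runtime_error {
  using std::runtime_error::runtime_error;
};

void AllGatherKernelCompute() {
#if defined(WITH_GPU)  // stand-in for PADDLE_WITH_NCCL
  // ... the real NCCL all-gather path would run here ...
#else
  // CPU-only build: surface a typed, actionable error at run time.
  throw PreconditionNotMet("PaddlePaddle should compile with GPU.");
#endif
}

int main() {
  try {
    AllGatherKernelCompute();
  } catch (const PreconditionNotMet& e) {
    std::printf("PreconditionNotMet: %s\n", e.what());
  }
  return 0;
}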
6 changes: 4 additions & 2 deletions paddle/fluid/operators/collective/c_allreduce_op.h
@@ -150,13 +150,15 @@ class CAllReduceOpCUDAKernel : public framework::OpKernel<T> {
         break;

       default:
-        PADDLE_THROW("Invalid reduce type: %d", red_type);
+        PADDLE_THROW(platform::errors::InvalidArgument(
+            "Invalid reduce type: %d", red_type));
     }

     PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllReduce(
         sendbuff, recvbuff, numel, dtype, nccl_red_type, comm->comm(), stream));
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
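The default branch above is the other recurring shape: a dispatch switch whose fall-through case names the unexpected value in the error it raises. A sketch of that switch-with-typed-throw pattern, with hypothetical enums standing in for Paddle's reduce tags and NCCL's ncclRedOp_t:

#include <stdexcept>
#include <string>

// Hypothetical mirrors of Paddle's reduce-type tags and NCCL's op enum.
enum ReduceType { kRedSum, kRedMax, kRedMin, kRedProd };
enum MockNcclRedOp { kNcclSum, kNcclMax, kNcclMin, kNcclProd };

MockNcclRedOp ToNcclRedOp(int red_type) {
  switch (red_type) {
    case kRedSum:  return kNcclSum;
    case kRedMax:  return kNcclMax;
    case kRedMin:  return kNcclMin;
    case kRedProd: return kNcclProd;
    default:
      // Unknown tag: fail with a message that names the bad value,
      // as the patched kernel now does via InvalidArgument.
      throw std::invalid_argument("Invalid reduce type: " +
                                  std::to_string(red_type));
  }
}

int main() {
  MockNcclRedOp op = ToNcclRedOp(kRedMax);  // -> kNcclMax
  (void)op;
  return 0;
}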
3 changes: 2 additions & 1 deletion paddle/fluid/operators/collective/c_broadcast_op.cu.cc
@@ -69,7 +69,8 @@ class CBroadcastOpCUDAKernel : public framework::OpKernel<T> {
     out->Resize(x->dims());
     out->set_lod(x->lod());
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
6 changes: 4 additions & 2 deletions paddle/fluid/operators/collective/c_comm_init_all_op.cc
@@ -50,7 +50,8 @@ class CCommInitAllOp : public framework::OperatorBase {
   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
     PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
-                      "CCommInitAllOp can run on gpu place only.");
+                      platform::errors::PreconditionNotMet(
+                          "CCommInitAllOp can run on gpu place only."));

 #if defined(PADDLE_WITH_NCCL)
     std::vector<int> devices = Attr<std::vector<int>>("devices");
@@ -62,7 +63,8 @@

     platform::NCCLCommContext::Instance().CreateAllNCCLComms(devices, rid);
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
11 changes: 7 additions & 4 deletions paddle/fluid/operators/collective/c_comm_init_op.cc
@@ -41,11 +41,13 @@ class CCommInitOp : public framework::OperatorBase {

   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
-    PADDLE_ENFORCE(is_gpu_place(place),
-                   "CCommInitOp can run on gpu place only.");
+    PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
+                      platform::errors::PreconditionNotMet(
+                          "CCommInitOp can run on gpu place only."));

     auto var = scope.FindVar(Input("X"));
-    PADDLE_ENFORCE_NOT_NULL(var);
+    PADDLE_ENFORCE_NOT_NULL(
+        var, platform::errors::InvalidArgument("Input can not be empty."));
 #if defined(PADDLE_WITH_NCCL)
     ncclUniqueId* nccl_id = var->GetMutable<ncclUniqueId>();

@@ -59,7 +61,8 @@
     platform::NCCLCommContext::Instance().CreateNCCLComm(
         nccl_id, nranks, rank_id, device_id, rid);
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
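PADDLE_ENFORCE_NOT_NULL previously took no message, so a missing variable failed anonymously; the patched calls say which input or output was absent. A minimal null-check helper in the same spirit, with hypothetical names that are not Paddle's API:

#include <cstdio>
#include <stdexcept>
#include <string>

struct InvalidArgument : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// EnforceNotNull: reject a null pointer with a descriptive, typed error
// instead of an anonymous assertion failure.
template <typename T>
T* EnforceNotNull(T* ptr, const std::string& what) {
  if (ptr == nullptr) throw InvalidArgument(what);
  return ptr;
}

int main() {
  int value = 7;
  EnforceNotNull(&value, "Input can not be empty.");  // passes
  try {
    EnforceNotNull<int>(nullptr, "Input can not be empty.");
  } catch (const InvalidArgument& e) {
    std::printf("InvalidArgument: %s\n", e.what());
  }
  return 0;
}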
7 changes: 5 additions & 2 deletions paddle/fluid/operators/collective/c_gen_nccl_id_op.cc
@@ -63,9 +63,12 @@ class CGenNCCLIdOp : public framework::OperatorBase {
                       const platform::DeviceContext& dev_ctx) const {
     std::string var_name = Output("Out");
     auto var = scope->FindVar(var_name);
-    PADDLE_ENFORCE_NOT_NULL(var);
+    PADDLE_ENFORCE_NOT_NULL(
+        var, platform::errors::InvalidArgument("Output can not be null."));
     auto id = var->GetMutable<ncclUniqueId>();
-    PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id));
+    PADDLE_ENFORCE_EQ(platform::dynload::ncclGetUniqueId(id), 0,
+                      platform::errors::InvalidArgument(
+                          "ncclGetUniqueId failed with id %s", id));

     std::vector<std::string> endpoint_list =
         Attr<std::vector<std::string>>("other_endpoints");
11 changes: 6 additions & 5 deletions paddle/fluid/operators/collective/c_reducescatter_op.cc
@@ -24,14 +24,15 @@ class CReduceScatterOp : public framework::OperatorWithKernel {
   using framework::OperatorWithKernel::OperatorWithKernel;

   void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) should not be null");
-    PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null.");
+    OP_INOUT_CHECK(ctx->HasInput("X"), "Input", "X", "ReduceScatter");
+    OP_INOUT_CHECK(ctx->HasOutput("Out"), "Output", "Out", "ReduceScatter");
     int nranks = ctx->Attrs().Get<int>("nranks");
     framework::DDim dim = ctx->GetInputDim("X");
     if (dim[0] > 0 || dim[0] < -1) {
-      PADDLE_ENFORCE(dim[0] % nranks == 0,
-                     "dim[0] (%d) is not divisible by nranks(%d)", dim[0],
-                     nranks);
+      PADDLE_ENFORCE_EQ(
+          dim[0] % nranks, 0,
+          platform::errors::InvalidArgument(
+              "dim[0] (%d) is not divisible by nranks(%d)", dim[0], nranks));
       dim[0] /= nranks;
     }
     ctx->SetOutputDim("Out", dim);
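The rule being enforced above: reduce_scatter splits the leading dimension evenly across ranks, so a known dim[0] must be divisible by nranks and the output keeps dim[0] / nranks rows, while dim[0] == -1 marks a size unknown until run time. A free-standing sketch of that InferShape arithmetic, assuming a plain vector of dims rather than Paddle's DDim:

#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical free-standing version of the ReduceScatter shape rule,
// not Paddle's actual InferShape machinery.
std::vector<int64_t> ReduceScatterOutDims(std::vector<int64_t> dims,
                                          int nranks) {
  int64_t d0 = dims[0];
  if (d0 > 0 || d0 < -1) {  // -1 marks a dimension unknown at infer time
    if (d0 % nranks != 0) {
      throw std::invalid_argument("dim[0] (" + std::to_string(d0) +
                                  ") is not divisible by nranks(" +
                                  std::to_string(nranks) + ")");
    }
    dims[0] = d0 / nranks;
  }
  return dims;
}

int main() {
  auto out = ReduceScatterOutDims({8, 16}, 4);  // -> {2, 16}
  (void)out;
  return 0;
}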
3 changes: 2 additions & 1 deletion paddle/fluid/operators/collective/c_reducescatter_op.cu.cc
@@ -61,7 +61,8 @@ class CReduceScatterOpCUDAKernel : public framework::OpKernel<T> {
         send_buff, recv_buff, recv_numel, static_cast<ncclDataType_t>(dtype),
         ncclSum, comm->comm(), stream));
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
3 changes: 2 additions & 1 deletion paddle/fluid/operators/collective/c_reducescatter_op.h
@@ -30,7 +30,8 @@ template <typename T>
 class CReduceScatterOpCPUKernel : public framework::OpKernel<T> {
  public:
   void Compute(const framework::ExecutionContext& ctx) const override {
-    PADDLE_THROW("Unimplemented cpu kernel for CReduceScatterOp.");
+    PADDLE_THROW(platform::errors::Unimplemented(
+        "Unimplemented cpu kernel for CReduceScatterOp."));
   }
 };

8 changes: 5 additions & 3 deletions paddle/fluid/operators/collective/c_sync_calc_stream_op.cc
@@ -38,14 +38,16 @@ class CSyncCalcStreamOp : public framework::OperatorBase {

   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
-    PADDLE_ENFORCE(is_gpu_place(place),
-                   "Sync stream op can run on gpu place only for now.");
+    PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
+                      platform::errors::PreconditionNotMet(
+                          "Sync stream op can run on gpu place only for now."));
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
     auto dev_ctx = static_cast<platform::CUDADeviceContext*>(
         platform::DeviceContextPool::Instance().Get(place));
     PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(dev_ctx->stream()));
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
6 changes: 4 additions & 2 deletions paddle/fluid/operators/collective/c_sync_comm_stream_op.cc
@@ -39,15 +39,17 @@ class CSyncCommStreamOp : public framework::OperatorBase {
   void RunImpl(const framework::Scope& scope,
                const platform::Place& place) const override {
     PADDLE_ENFORCE_EQ(is_gpu_place(place), true,
-                      "Sync stream op can run on gpu place only for now.");
+                      platform::errors::PreconditionNotMet(
+                          "Sync stream op can run on gpu place only for now."));

 #if defined(PADDLE_WITH_NCCL)
     int ring_id = Attr<int>("ring_id");
     auto stream =
         platform::NCCLCommContext::Instance().Get(ring_id, place)->stream();
     PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream));
 #else
-    PADDLE_THROW("PaddlePaddle should compile with GPU.");
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with GPU."));
 #endif
   }
 };
@@ -131,7 +131,11 @@ class AsyncSparseParamUpdateRecorder {
                   std::vector<int64_t>* result) {
     VLOG(3) << "GetAndClear param: " << param_name
             << " for trainer: " << trainer_id;
-    PADDLE_ENFORCE_LT(trainer_id, trainer_num_);
+    PADDLE_ENFORCE_LT(
+        trainer_id, trainer_num_,
+        platform::errors::InvalidArgument(
+            "The value of trainer_id: %s should be less than trainer_num: %s.",
+            trainer_id, trainer_num_));
     param_to_updated_rows_.at(param_name)[trainer_id]
         ->GetAndClear(result)
         .wait();
10 changes: 5 additions & 5 deletions paddle/fluid/operators/distributed/brpc/brpc_rdma_pool.cc
@@ -39,8 +39,8 @@ void* RdmaMemPool::Find(const std::string& varname, int64_t size) {
   auto info = it->second;
   if (info.data_size != size) {
     pthread_rwlock_unlock(&access_);
-    PADDLE_ENFORCE(false, "var:%s size:%ld != %ld", varname, size,
-                   info.data_size);
+    PADDLE_THROW(platform::errors::InvalidArgument(
+        "var:%s size:%ld != %ld", varname, size, info.data_size));
     return nullptr;
   }

@@ -52,9 +52,9 @@ void RdmaMemPool::Register(const std::string& varname, void* data,
                            int64_t data_size) {
   void* old = Find(varname, data_size);
   if (old != nullptr) {
-    if (data != old) {
-      PADDLE_ENFORCE(false, "var:%s data:%ld != %ld", varname, data, old);
-    }
+    PADDLE_ENFORCE_EQ(
+        data, old, platform::errors::InvalidArgument("var:%s data:%ld != %ld",
+                                                     varname, data, old));
     VLOG(7) << "Find on rdma:" << varname << " data:" << data
             << " data_size:" << data_size;
     return;
19 changes: 14 additions & 5 deletions paddle/fluid/operators/distributed/brpc/brpc_sendrecvop_utils.cc
@@ -155,11 +155,15 @@ void SerializeToIOBuf(const std::string& name, framework::Variable* var,
     return;
 #endif
   } else {
-    PADDLE_THROW("Serialize does not support type: %s",
-                 typeid(var->Type()).name());
+    PADDLE_THROW(platform::errors::InvalidArgument(
+        "Serialize does not support type: %s", typeid(var->Type()).name()));
   }

-  PADDLE_ENFORCE_NOT_NULL(payload);
+  PADDLE_ENFORCE_NOT_NULL(
+      payload,
+      platform::errors::InvalidArgument(
+          "Unsupported type: %s, need to be LOD_TENSOR or SELECTED_ROWS.",
+          var->Type()));

   // FIXME(gongwb): it seems that can use zero copy.
   if (var_is_not_stable) {
@@ -186,7 +190,10 @@

   if (var->IsType<framework::SelectedRows>()) {
     auto* slr = var->GetMutable<framework::SelectedRows>();
-    PADDLE_ENFORCE(VectorElemName(slr->rows()) == typeid(int64_t).name());
+    PADDLE_ENFORCE_EQ(VectorElemName(slr->rows()), typeid(int64_t).name(),
+                      platform::errors::InvalidArgument(
+                          "Got wrong type: %s, expect type: int64_t",
+                          VectorElemName(slr->rows())));
     size_t rows_memory_size = slr->rows().size() * sizeof(int64_t);

     IOBufWriter::Append(name, iobuf,
@@ -202,7 +209,9 @@ void DeserializeFromIOBuf(const ::sendrecv::VariableMessage& meta,
                           const framework::Scope* scope,
                           framework::Variable** var, int* trainer_id) {
   operators::distributed::BRPCVariableResponse resp(scope, &ctx);
-  PADDLE_ENFORCE(resp.Parse(iobuf, meta) == 0, "parse iobuf to tensor error!");
+  PADDLE_ENFORCE_EQ(
+      resp.Parse(iobuf, meta), 0,
+      platform::errors::InvalidArgument("parse iobuf to tensor error!"));
   *var = resp.GetVar();
   *trainer_id = resp.GetTrainerId();
 }