[Done] API for dist train #6297
Conversation
… simple_dist_train_api
Maybe the trainer cannot connect to the pserver: there are too many connections in the TIME_WAIT TCP state, no logs at all, and no CONNECTED state:
λ 3fab79b2d17f / netstat -anp |grep 6174
grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
tcp 0 0 127.0.0.1:6174 0.0.0.0:* LISTEN 94/python
tcp 0 0 127.0.0.1:39128 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39654 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39538 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39504 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39548 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39160 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39238 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:38952 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39592 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39332 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39464 127.0.0.1:6174 TIME_WAIT -
tcp 0 0 127.0.0.1:39090 127.0.0.1:6174 TIME_WAIT -
...
> PSERVER=127.0.0.1:6174 python notest_recognize_digits_conv_dist.py
> python notest_recognize_digits_conv_dist.py
void RPCClient::Wait() {
  ClientContext context;
  VoidMessage call_msg, ret_msg;
  stub_->Wait(&context, call_msg, &ret_msg);
It seems that if the remote Wait completed but the RPC failed due to a network issue, then when this call retries, the remote will wait forever until the next notify_all is called.
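A minimal sketch (hypothetical handler, not the actual server code) of the hazard being described: if the reply to Wait is lost and the client retries, the retried call blocks until the next notify_all.

#include <condition_variable>
#include <mutex>

std::mutex mu;
std::condition_variable cv;
bool batch_done = false;

// Hypothetical server-side handler for the Wait RPC.
void HandleWait() {
  std::unique_lock<std::mutex> lock(mu);
  cv.wait(lock, [] { return batch_done; });
  batch_done = false;  // consume the notification
  // If the reply is lost on the network and the client retries HandleWait(),
  // batch_done is already false again, so the retry blocks until the next
  // notify_all(), potentially forever.
}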
Created another PR to this branch: typhoonzero#2
I1220 12:13:03.567848 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567870 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567893 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567896 24670 scope.cc:28] Destroy variable Out
I1220 12:13:03.567917 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567924 24670 scope.cc:28] Destroy variable x1
I1220 12:13:03.567939 24670 scope.cc:28] Destroy variable x0
I1220 12:13:03.567942 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
terminate called without an active exception
I1220 12:13:03.567975 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
Aborted (core dumped)
Maybe the trainer/pserver needs a graceful shutdown.
Thanks for the working code! There are some points we need to discuss and reach agreement on; afterwards we can create issues and TODO comments on the points that we agree need improvement. Then I think we can merge this in and iterate.
paddle/operators/recv_op.cc
Outdated
auto grad_list = Attr<std::vector<std::string>>("GradList");
auto trainer_count = Attr<int>("Trainers");
size_t param_count = param_list.size();
rpc_service_->Start();
Maybe starting a new service (opening a new port, etc.) for each run is a lot of overhead, and there could be multiple places in the ProgramDesc that have a recv OP. Maybe we need a lazily initialized service singleton.
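A minimal sketch of a lazily initialized service singleton (hypothetical class and names, not the actual Paddle API): the port is opened once per process, on first use, and every recv OP reuses it.

#include <string>

// Hypothetical wrapper around the RPC service used by recv ops.
class RPCService {
 public:
  // Returns the process-wide instance; the service is started (port opened,
  // event loop launched) only on the first call. Thread-safe since C++11.
  // Note: later calls ignore `endpoint`; a real implementation would check it.
  static RPCService &Get(const std::string &endpoint) {
    static RPCService instance(endpoint);
    return instance;
  }

 private:
  explicit RPCService(const std::string &endpoint) {
    // Open `endpoint` and start serving exactly once here.
  }
};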
recv_op's Run will only be called once; there's an inner event loop inside it. This will be changed to use the while op in later versions: #6508
I think the while loop is optional for the user to use. It's still very common for the user to use a Python for loop and call run multiple times, so we probably need recv's Run to have low overhead.
In my mind the recv op should not call the while op; it should block, and the executor should run the next op when it's done.
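A minimal sketch (hypothetical receiver state, not the actual recv_op) of the blocking behavior described above: Run blocks until every expected tensor has arrived, then returns, and the executor simply moves on to the next op.

#include <condition_variable>
#include <mutex>

// Hypothetical receiver state shared between the RPC handler and the op.
struct BlockingReceiver {
  std::mutex mu;
  std::condition_variable cv;
  int received = 0;
  int expected = 0;  // e.g. fan_in, the number of senders

  // Called by the RPC handler when one tensor arrives.
  void OnTensorArrived() {
    std::lock_guard<std::mutex> lock(mu);
    if (++received == expected) cv.notify_all();
  }

  // Called from the op's Run(): block until all tensors are in.
  void WaitAll() {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [this] { return received >= expected; });
    received = 0;  // reset so Run() can be called again from a Python loop
  }
};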
while (true) {
  // Get from multiple trainers, we don't care about order in which
  // the gradient arrives, just add suffix 0~n then average the gradient.
  for (size_t i = 0; i < param_count * trainer_count; ++i) {
I think if we want the recv OP to be general, it probably should not care about trainer_count. I thought it was a node-from-node data transfer, not a node-from-nodes data transfer.
Because we will likely have the case that one node needs to recv different variables from different nodes; in that case the most straightforward way is maybe one send-recv pair per node-node pair.
Can we call this fan_in in general?
Sure, eventually we can move to one recv op receiving one variable.
auto *merged_grad = recv_scope.FindVar(grad_var_name);
if (merged_grad == nullptr) {
  // create output of merged var.
  auto merged_var = recv_scope.Var(grad_var_name);
I think the send-recv OP should just send and recv; merging should be left to a merging OP.
The merging OP is currently added by the transpiler; recv_op needs to rename the variables sent by the workers. Will refine this in following PRs.
paddle/operators/recv_op.cc
Outdated
std::string param_var_name;
if (it != grad_list.end()) {
  param_var_name = param_list[it - grad_list.begin()];
}
Maybe log when the variable could not be found in grad_list.
Done.
rpc_service_->Push(out_var->Get<framework::LoDTensor>());
rpc_service_->SetScope(&recv_scope);
auto param_list = Attr<std::vector<std::string>>("ParamList");
auto grad_list = Attr<std::vector<std::string>>("GradList");
In my opinion send-recv should be general: we can send-recv any tensor. The send-recv OP should not know what kind of tensor it handles (e.g., parameter vs. gradient), so grad_list is probably not the best name.
I think the grad_list and param_list are used to ensure the pserver receives a parameter/gradient pair. Maybe we could do this in an op named SyncWait (or something else): check the parameters vs. gradients in the blocking queue until all tensors are received, and don't care about trainer_count in the recv op.
For the Recv Op:
- Input: the var name, var = "dW" in this example.
- Kernel: fetch the var from the blocking queue and init the Tensor: {dW: {dw0, dw1}} (see the sketch below).
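A minimal sketch of such a global blocking queue keyed by variable name, assuming a hypothetical TensorPayload type for whatever the RPC layer delivers; this is not the actual Paddle implementation.

#include <condition_variable>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical payload standing in for a serialized LoDTensor.
struct TensorPayload {};

class VarBlockingQueue {
 public:
  // RPC handler side: enqueue one received copy of `name` (e.g. "dW").
  void Push(const std::string &name, TensorPayload t) {
    std::lock_guard<std::mutex> lock(mu_);
    vars_[name].push_back(std::move(t));
    cv_.notify_all();
  }

  // Recv/SyncWait side: block until `fan_in` copies of `name` have arrived.
  std::vector<TensorPayload> Pop(const std::string &name, size_t fan_in) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [&] { return vars_[name].size() >= fan_in; });
    std::vector<TensorPayload> out = std::move(vars_[name]);
    vars_[name].clear();
    return out;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::map<std::string, std::vector<TensorPayload>> vars_;
};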
I think if we use one send-recv op for one var, then we don't need to maintain two lists. Another benefit is here: #6297 (comment)
Sure, but I think we also need a global blocking queue to store the variables so that the recv op can fetch them from the queue. I also updated the diagram.
@helinwang Indeed, param_list is in fact not used. The reason we need to know the variable names is that we need to merge variables from multiple workers. The variable names from multiple workers are the same, so we need to rename them by adding numeric suffixes, xxx@GRAD.trainer_0 to xxx@GRAD.trainer_n, and then the sum_op will use them as input.
The rename operation must happen before the sub-program runs, and only recv_op can know when a variable arrives. I can't find a better way right now. But we can still make recv_op more general by changing the names.
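A minimal sketch of the renaming scheme described above (illustrative only; the current code builds the names with snprintf inside recv_op.cc):

#include <string>
#include <vector>

// Build the per-trainer names that become the inputs of sum_op,
// e.g. "x@GRAD.trainer_0", "x@GRAD.trainer_1", ...
std::vector<std::string> PerTrainerNames(const std::string &varname,
                                         int trainers) {
  std::vector<std::string> names;
  for (int i = 0; i < trainers; ++i) {
    names.push_back(varname + ".trainer_" + std::to_string(i));
  }
  return names;
}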
@Yancey1989 param_list is in fact not used for now; will remove it.
The idea of using a queue before sending variables is great: we can send each variable in parallel right after it is ready, to gain some performance.
@typhoonzero I think we can merge this PR soon, this is great work! We can merge it into the develop branch and start iterating.
At the same time, in my mind the send-recv op is lower level than merging variables; it should be used by the merging-variable logic rather than controlling it.
You mentioned we need more "temp" variables; maybe they should be created by the transpiler, and the send-recv op does not need to know about them. More specifically, I saw snprintf(ret, sizeof(ret), "%s.trainer_%d", varname.c_str() in recv_op.cc; maybe it should be in the transpiler.
Agree!
paddle/operators/recv_op.cc
Outdated
}
rpc_service_->Start();

std::string program_str = Attr<std::string>("OptimizeProgram");
I think send-recv should only focus on send-recv and should not know anything about optimization. Not all send-recv is followed by optimization.
paddle/operators/recv_op.cc
Outdated
// FIXME(typhoonzero): do not copy
framework::CopyFrom(v.second, dev_ctx.GetPlace(), dev_ctx, tensor);
}
rpc_service_->Start();
rpc_service_ is already started; do we still need this line?
It's just a state reset; renamed it to Reset.
LOG(ERROR) << "send variable error";
auto ins = Inputs("X");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
// TODO(typhoonzero): use async calls to send multiple variable asyncly.
To make the implementation simpler (and easier to optimize), maybe we can create one send-recv pair for each variable that needs it. In this way we can use parallel.Do on top of all the send-recv pairs.
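A minimal sketch of launching one send per variable in parallel on the C++ side, assuming a hypothetical SendVariable helper (parallel.Do over per-variable send ops would be the fluid-level equivalent):

#include <future>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: one blocking send of `varname` to `endpoint`,
// standing in for a single per-variable send op.
bool SendVariable(const std::string &endpoint, const std::string &varname);

// Launch one asynchronous send per (endpoint, variable) pair, wait for all.
bool SendAll(const std::vector<std::pair<std::string, std::string>> &work) {
  std::vector<std::future<bool>> futures;
  for (const auto &w : work) {
    futures.push_back(
        std::async(std::launch::async, SendVariable, w.first, w.second));
  }
  bool all_ok = true;
  for (auto &f : futures) all_ok = f.get() && all_ok;
  return all_ok;
}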
Sorry, I'm not sure about the meaning of "one send-recv pair for each variable"; does it need an independent port for each variable?
I was thinking there is a singleton service that listens on one port, and all send-recv ops go through that port.
I know that currently each send-recv op is "heavy" (opens a port) and does "heavy duty" (sends/receives many vars). But it's still possible that we need multiple send-recv ops in a program, so I think having each send/recv op open a new port is still too heavy.
Got it, thanks.
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)

pserver_endpoint = os.getenv("PSERVER")
The goal is that the user doesn't have to change much Python code, so in the distributed release we probably need a remote executor that sends the ProgramDesc to the remote C++ workers started by Kubernetes. The user only needs to configure the cluster from Python/CLI and change the executor to the remote executor.
Sure, added this to the project.
Fix unit test failed
LGTM, I think we can merge this PR, and iterate according to the TODO list.
Fix #6286
Multiple trainers still have some problems; will refine in the next PR.