Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Done] API for dist train #6297

Merged
merged 25 commits into from
Dec 22, 2017

Conversation

typhoonzero
Copy link
Contributor

@typhoonzero typhoonzero commented Dec 5, 2017

Fix #6286

Multi trainers still have some problem, will refine in next PR.

@typhoonzero typhoonzero changed the title [WIP] API for dist train [Done] API for dist train Dec 14, 2017
@Yancey1989
Copy link
Contributor

Yancey1989 commented Dec 18, 2017

Maybe the trainer can not connect to the pserver, too much TIME_WAIT TCP state, and no any logs, no CONNECTED state:

λ 3fab79b2d17f / netstat -anp |grep 6174
grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
tcp        0      0 127.0.0.1:6174          0.0.0.0:*               LISTEN      94/python
tcp        0      0 127.0.0.1:39128         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39654         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39538         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39504         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39548         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39160         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39238         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:38952         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39592         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39332         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39464         127.0.0.1:6174          TIME_WAIT   -
tcp        0      0 127.0.0.1:39090         127.0.0.1:6174          TIME_WAIT   -
...
  • start pserver
> PSERVER=127.0.0.1:6174 python notest_recognize_digits_conv_dist.py
  • start trainer
> python notest_recognize_digits_conv_dist.py

void RPCClient::Wait() {
ClientContext context;
VoidMessage call_msg, ret_msg;
stub_->Wait(&context, call_msg, &ret_msg);
Copy link
Contributor

@helinwang helinwang Dec 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here seems if the remote wait completed, but failed due to network issue. The next time this call retries, the remote will wait forever until next notify_all is called.

@Yancey1989
Copy link
Contributor

Create another PR to this branch : typhoonzero#2

  1. Fixed unit test failed, the sum op should have Input(X).
  2. Remaining problem:
I1220 12:13:03.567848 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567870 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567893 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567896 24670 scope.cc:28] Destroy variable Out
I1220 12:13:03.567917 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
I1220 12:13:03.567924 24670 scope.cc:28] Destroy variable x1
I1220 12:13:03.567939 24670 scope.cc:28] Destroy variable x0
I1220 12:13:03.567942 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
terminate called without an active exception
I1220 12:13:03.567975 24671 executor.cc:129] Op(sum), inputs:{X[x0, x1]}, outputs:{Out[Out]}.
Aborted (core dumped)

Maybe trainer/pserver need graceful shutdown.

Copy link
Contributor

@helinwang helinwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the working code! There are some points we need to discuss and reach agreement, afterwards we can create the issue and TODO comment on points that we agree needs improvement. Then I think we can merge this in and iterate.

auto grad_list = Attr<std::vector<std::string>>("GradList");
auto trainer_count = Attr<int>("Trainers");
size_t param_count = param_list.size();
rpc_service_->Start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe starting a new service (open new port, etc) for each run is a lot of overhead, and there could be multiple places in the ProgramDesc that have an recv OP. Maybe we need a lazy initialized service singleton.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recv_op's Run will only be called once, there's an inner event loop inside it. This will be changed to use while op in later versions: #6508

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think while loop is optional for the user to use. It's still very common for the user to used Python for loop and call run multiple times. So probably we need to have recv's run low overhead.
In my mind the recv op should no call while op, it should be blocked and the executor run the next op when it's done.

while (true) {
// Get from multiple trainers, we don't care about order in which
// the gradient arrives, just add suffix 0~n then average the gradient.
for (size_t i = 0; i < param_count * trainer_count; ++i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we want the recv OP to be general, it probably should not care about trainer_count. I thought it's a node-from-node data transfer, not node-from-nodes data transfer.

Because we are likely have the case that one node need to recv different variables from different nodes, in that case the most straightforward way maybe is one send-recv pair per node-node pair.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this fan_in in general?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, eventually we can move to one recv op receiving one variable.

auto *merged_grad = recv_scope.FindVar(grad_var_name);
if (merged_grad == nullptr) {
// create output of merged var.
auto merged_var = recv_scope.Var(grad_var_name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think send-recv OP should just send-recv, merging should be left for merging OP.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merging OP is currently added by transpiler, recv_op need to rename the variables sent by workers. Will refine this in following PRs.

std::string param_var_name;
if (it != grad_list.end()) {
param_var_name = param_list[it - grad_list.begin()];
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log when could not find in grad_list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

rpc_service_->Push(out_var->Get<framework::LoDTensor>());
rpc_service_->SetScope(&recv_scope);
auto param_list = Attr<std::vector<std::string>>("ParamList");
auto grad_list = Attr<std::vector<std::string>>("GradList");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion send-recv should be general, we can send-recv any tensor. The send-recv OP should not know what kind of tensor (e.g., parameter v.s. gradient), so probably grad_list is not the best name.

Copy link
Contributor

@Yancey1989 Yancey1989 Dec 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the grad_list and param_list are used to ensure pserver received a pair of parameter vs. gradient, and maybe we could do this in an Op which named SyncWait(or others): checking the parameters vs. gradients in the blocking queue until received all tensors, and don't care about trainer_count in the recv op:

improve_multinodes

For the Recv Op:

  • Input: var name, var = "dW" in this example.
  • Kernel: Fetch var in the blocking queue and init Tensor: {dW: {dw0, dw1}}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we use one send-recv op for one var, then we don't need to maintain two lists. Another benefit is here: #6297 (comment)

Copy link
Contributor

@Yancey1989 Yancey1989 Dec 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but I think we also need a global Blocking Queue to store the variant so that recv op will fetch variant from the queue. And I updated the diagram.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@helinwang Indeed, param_list is not used in fact. The reason of we need to know the variable names is that we need to merge variables from multiple workers. The variable names from multiple workers are the same, so we need to rename them by adding numberic suffixes: xxx@GRAD.trainer_0 to xxx@GRAD.trainer_n then the sum_op will use them as input.

The rename operation must happen before the sub program runs, and only recv_op can know when the variable arrives. I can't find a better way right now. But still, can make the recv_op more general by change the names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yancey1989 param_list is in fact not used for now, will remove.

The idea of using a queue before sending variables is great that we can send the variable parallelly right after the variable is ready to gain some performance.

Copy link
Contributor

@helinwang helinwang Dec 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@typhoonzero I think we can merge this PR soon, this is great work! We can merge in to the develop branch to start iterate.
In the same time, in my mind send-recv op is lower level than merging variables, it should be used by the merging variable logic, rather than controlling the merging variable logic.
You mentioned we need to have more "temp" variables, maybe they should be created by the transpiler, and send-recv op does not need to know it. More specifically, I saw snprintf(ret, sizeof(ret), "%s.trainer_%d", varname.c_str() in recv_op.cc, maybe it should be in the transpiler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree!

}
rpc_service_->Start();

std::string program_str = Attr<std::string>("OptimizeProgram");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think send-recv should only focus on send-recv, and does know anything about optimization. Not all send-recv is followed by optimization.

// FIXME(typhoonzero): do not copy
framework::CopyFrom(v.second, dev_ctx.GetPlace(), dev_ctx, tensor);
}
rpc_service_->Start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rpc_service_ is already started, do way still need this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a state reset, renamed to Reset.

LOG(ERROR) << "send variable error";
auto ins = Inputs("X");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
// TODO(typhoonzero): use async calls to send multiple variable asyncly.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the implementation simpler (and easier to optimize), maybe we can create one send-recv pair for each variable that needs it. In this way we can use parallel.Do on top of all the send-recv pairs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm not sure the meaning of one send-recv pair for each variable, does it need a independent port for each variable?

Copy link
Contributor

@helinwang helinwang Dec 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking there is a singleton service that listens on one port, and all send-recv ops go through that port.
I know currently each send-recv is "heavy" (open a port) and does "heavy duty" (send recv many vars). But it's still possible that we need multiple send-recv ops in a program. Thus I think each send/recv op opens a new port is still too heavy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks.

t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)

pserver_endpoint = os.getenv("PSERVER")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is the user don't have to change many Python code, so in the distributed release we probably need a remote executor that sends the ProgramDesc to the remote cpp workers that are started by Kubernetes. The user only need to configure the cluster from Python/CLI and change executor to remote executor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, added this to project.

Copy link
Contributor

@Yancey1989 Yancey1989 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I think we can merge this PR, and iterate according to the TODO list.

@typhoonzero typhoonzero merged commit 8d6db25 into PaddlePaddle:develop Dec 22, 2017
@typhoonzero typhoonzero deleted the simple_dist_train_api branch December 22, 2017 05:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants