
listen_and_serv_op support async update #10042

Merged

Conversation

@jacquesqiao (Member) commented Apr 19, 2018:

fix: #9997

@jacquesqiao changed the title from "[WIP] listen_and_serv_op support async update" to "listen_and_serv_op support async update" on Apr 24, 2018
std::unordered_map<std::string, int32_t> grad_to_id;
std::unordered_map<int32_t, std::string> id_to_grad;

auto grad_to_id_str = Attr<std::vector<std::string>>("grad_to_id");
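For context, a minimal sketch of how such attribute strings could be turned into the two maps above, assuming each entry is formatted as "<grad_name>:<block_id>" (the entry format and the ParseGradToID helper are assumptions here, not the PR's code):

// Hypothetical helper: parse entries assumed to look like
// "<grad_name>:<block_id>" into the two lookup maps.
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

void ParseGradToID(const std::vector<std::string>& grad_to_id_str,
                   std::unordered_map<std::string, int32_t>* grad_to_id,
                   std::unordered_map<int32_t, std::string>* id_to_grad) {
  for (const auto& item : grad_to_id_str) {
    // Split on the last ':' so gradient names containing ':' still parse.
    auto pos = item.rfind(':');
    std::string name = item.substr(0, pos);
    int32_t block_id = std::stoi(item.substr(pos + 1));
    (*grad_to_id)[name] = block_id;
    (*id_to_grad)[block_id] = name;
  }
}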
Contributor:

grad_to_id_str could be generated by listen_and_serv_op itself when initializing: read the ProgramDesc blocks and create the mapping there, so we could avoid this attribute.

Member Author:

I think grad_to_id_str should be created in Python by the transpiler, because the transpile logic knows how to split the operators and blocks; listen_and_serv_op can simply use the result. Otherwise it would have to understand the detailed logic of the transpiler.

Contributor:

First, we want to make listen_and_serv_op general. That means it should not know whether its attributes and inputs are gradients or parameters; it should simply receive the data and run a block.

In that case, for async execution, listen_and_serv is responsible for determining which block needs to run when the data arrives. Just opening this up for discussion.

Member Author:

After discussing with @typhoonzero, I get the point. I totally agree that listen_and_serv_op should be a general operator! We will find a better way to implement async update in future PRs.

@@ -30,9 +30,13 @@ enum CallStatus { PROCESS = 0, FINISH };
 class RequestBase {
  public:
   explicit RequestBase(GrpcService::AsyncService* service,
-                       ::grpc::ServerCompletionQueue* cq,
+                       ::grpc::ServerCompletionQueue* cq, bool sync_mode,
Contributor:

Maybe is_sync or just sync would convey the meaning better?

Member Author:

I think sync_mode means the operator works in a certain mode, while is_sync would imply the object itself is synchronous. So I think sync_mode is better.

Contributor:

I see.

auto optimize_prepared = executor->Prepare(*program, block_list);
std::unordered_map<std::string,
                   std::shared_ptr<framework::ExecutorPrepareContext>>
    grad_to_prepared;
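A hedged sketch of how these prepared contexts might be keyed by gradient name, assuming the i-th prepared context belongs to the i-th block in block_list and that an id_to_grad map (as earlier) is available; BuildGradToPrepared is a hypothetical helper, not the PR's code:

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper: map each gradient name to the prepared context of
// the optimize block that its arrival should trigger.
template <typename PreparedCtx>
std::unordered_map<std::string, std::shared_ptr<PreparedCtx>>
BuildGradToPrepared(
    const std::vector<int>& block_list,
    const std::unordered_map<int32_t, std::string>& id_to_grad,
    const std::vector<std::shared_ptr<PreparedCtx>>& prepared) {
  std::unordered_map<std::string, std::shared_ptr<PreparedCtx>> result;
  for (size_t i = 0; i < block_list.size(); ++i) {
    // at() throws if a block id has no registered gradient name,
    // surfacing mismatched attributes early.
    result[id_to_grad.at(block_list[i])] = prepared[i];
  }
  return result;
}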
Contributor:

grad_to_prepared_block

Member Author:

done

LOG(ERROR) << "run sub program error " << e.what();
}
});
// TODO(qiao) maybe we can remove this
Contributor:

Removing this would make the mode even more "async": the trainer wouldn't even know whether the gradient it sent has been applied to the server-side weights before it fetches the latest weights. Or do you mean letting updates to different weights run in parallel?

Member Author:

The current implementation will apply gradients in sequence if we keep this wait, which may affect the training result. I will run some tests on it.

@jacquesqiao (Member Author) commented Apr 25, 2018:

After discussing with @typhoonzero, we think each gradient should be put into an independent blocking queue to ensure the updates happen without conflict.

@Yancey1989 (Contributor) commented Apr 25, 2018:

> we think each gradient should be put into an independent blocking queue

Do you mean one queue per parameter's gradients? For example, with grad_w1(trainer0), grad_w1(trainer1), and grad_w2(trainer0), we would put grad_w1(trainer0) and grad_w1(trainer1) into one queue, and grad_w2(trainer0) into another?

According to the design doc, maybe we need multiple BlockingQueues so that each parameter owns one of them, effectively giving each parameter its own lock for updates.

Member Author:

Yes, we need multiple blocking queues; each will store the gradients for one parameter. But we do not need to add a lock, because the queue blocks until the optimize block has finished.
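To make this concrete, a minimal sketch of the per-parameter queue idea (illustrative only, not the PR's code; BlockingQueue is hand-rolled here). Each parameter's worker pops one gradient at a time and runs the optimize block before popping the next, so updates to one parameter are serialized without an explicit lock, while different parameters update in parallel:

#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <unordered_map>

template <typename T>
class BlockingQueue {
 public:
  void Push(T v) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      q_.push(std::move(v));
    }
    cv_.notify_one();
  }
  // Blocks until an item is available; the consumer therefore applies
  // gradients strictly one after another.
  T Pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    T v = std::move(q_.front());
    q_.pop();
    return v;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<T> q_;
};

// One queue per parameter name; the worker thread for "w1" only ever
// applies w1's gradients, in arrival order.
std::unordered_map<std::string, BlockingQueue<std::string>> grad_queues;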

      queue_(queue),
      responder_(&ctx_) {
  if (sync_mode_) {
    request_.reset(new VariableResponse(scope, dev_ctx_, false));
Contributor:

request_.reset(new VariableResponse(
    scope,
    dev_ctx_,
    !sync_mode_  // create_scope
));

Member Author:

I thought about this for a while, and I think the current code makes the intent easier for readers to understand.

@@ -61,7 +63,7 @@ class VariableResponse {
   // other: number of error field.
   int Parse(const ::grpc::ByteBuffer& byte_buffer);

-  const framework::Scope& GetLocalScope() const { return *local_scope_; }
+  framework::Scope& GetLocalScope() const { return *local_scope_; }
Contributor:

Consider a GetMutableLocalScope that returns a pointer, to avoid removing the const?
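As a sketch, the pattern being suggested (framework::Scope stubbed as an opaque type here):

namespace framework { class Scope; }

class VariableResponse {
 public:
  // Read-only access stays const for callers that only inspect the scope.
  const framework::Scope& GetLocalScope() const { return *local_scope_; }
  // Mutation is opted into explicitly; the pointer return type signals it.
  framework::Scope* GetMutableLocalScope() { return local_scope_; }

 private:
  framework::Scope* local_scope_ = nullptr;
};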

Member Author:

done

@@ -221,6 +327,12 @@ from send_op and send back variables to recv_op.
"IP address to listen on.")
.SetDefault("127.0.0.1:6164")
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
AddAttr<std::vector<std::string>>(
"grad_to_id",
Contributor:

grad_to_block_id?

Member Author:

done

@@ -143,7 +143,8 @@ def transpile(self,
               program=None,
               pservers="127.0.0.1:6174",
               trainers=1,
-              split_method=splitter.round_robin):
+              split_method=splitter.round_robin,
+              sync_mode=True):
Contributor:

This new parameter needs a comment.

Member Author:

done

AsyncExecuteBlock(executor, grad_to_prepared_block[recv_var_name].get(),
                  v.second->GetMutableLocalScope());
// TODO(qiao): explain why
if (var->IsType<framework::SelectedRows>()) {
Contributor:

Maybe we don't need to clear the rows, because each gradient var is in a new scope.

@jacquesqiao (Member Author) commented Apr 26, 2018:

Great suggestion! Removed.

@typhoonzero (Contributor) left a comment:

It LGTM now. Does the Python wrapping in io.py need to be updated in this PR, or later?

@jacquesqiao (Member Author):

@typhoonzero The CI is too slow; I will open another PR to fix io.py.

@Yancey1989 (Contributor) left a comment:

LGTM++

@jacquesqiao merged commit 6d93456 into PaddlePaddle:develop on Apr 26, 2018