listen_and_serv_op support async update #10042
Conversation
std::unordered_map<std::string, int32_t> grad_to_id;
std::unordered_map<int32_t, std::string> id_to_grad;

auto grad_to_id_str = Attr<std::vector<std::string>>("grad_to_id");
grad_to_id_str can be generated by listen_and_serv_op itself: when initializing, it can read the ProgramDesc blocks and build the mapping, so we could avoid this attribute.
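For reference, a minimal sketch of how such attribute strings could be parsed into the two maps, assuming each entry has the form "grad_name:block_id" (the format and helper name are assumptions, not necessarily this PR's final code):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper: split entries like "w1@GRAD:1" into the two lookup
// tables shown in the hunk above.
void BuildGradToIdMaps(const std::vector<std::string>& grad_to_id_str,
                       std::unordered_map<std::string, int32_t>* grad_to_id,
                       std::unordered_map<int32_t, std::string>* id_to_grad) {
  for (const auto& entry : grad_to_id_str) {
    size_t pos = entry.find_last_of(':');
    if (pos == std::string::npos) continue;  // skip malformed entries
    std::string grad_name = entry.substr(0, pos);
    int32_t block_id = std::stoi(entry.substr(pos + 1));
    (*grad_to_id)[grad_name] = block_id;
    (*id_to_grad)[block_id] = grad_name;
  }
}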
I think grad_to_id_str should be created in Python by the transpiler, because the transpile logic knows how to split the operators and blocks. listen_and_serv_op should just use the result; otherwise it would have to understand the detailed logic of the transpiler.
Firstly, we want to make listen_and_serv_op general: it should not need to know whether its attributes and inputs are gradients or parameters; it should simply receive the data and run a block. In that case, for async execution, listen_and_serv_op is responsible for determining which block to run when the data arrives. Just open for discussion.
After discussing with @typhoonzero, I get the point, and I totally agree that listen_and_serv_op should be a general operator! We will find a better way to implement async update in future PRs.
@@ -30,9 +30,13 @@ enum CallStatus { PROCESS = 0, FINISH };
class RequestBase {
 public:
  explicit RequestBase(GrpcService::AsyncService* service,
                       ::grpc::ServerCompletionQueue* cq,
                       ::grpc::ServerCompletionQueue* cq, bool sync_mode,
Maybe is_sync or just sync would convey the meaning better?
I think sync_mode means the op works in a synchronous mode, while is_sync would suggest the op itself is synchronous. So I think sync_mode is better.
I see.
auto optimize_prepared = executor->Prepare(*program, block_list);
std::unordered_map<std::string,
                   std::shared_ptr<framework::ExecutorPrepareContext>>
    grad_to_prepared;
grad_to_prepared_block
done
LOG(ERROR) << "run sub program error " << e.what();
}
});
// TODO(qiao) maybe we can remove this
Removing this would make the mode even more "async": the trainer wouldn't even know whether the gradient it sent has been applied to the server-side weights before it fetches the latest weights. Or do you mean letting updates to different weights run in parallel?
The current implementation will update gradients in sequence if we keep this wait, which may hurt the training effect; I will run some tests on it.
After discussing with @typhoonzero, we think that each gradient should be put into an independent blocking queue to ensure the updates do not conflict.
each gradient should be put into an independent blocking queue
Do you mean the gradients of one parameter? For example, given grad_w1(trainer0), grad_w1(trainer1), and grad_w2(trainer0), we would put grad_w1(trainer0) and grad_w1(trainer1) into one queue, and grad_w2(trainer0) into another?
According to the design doc, maybe we need multiple BlockingQueues so that each parameter owns one of them, which effectively serves as a per-parameter lock on updates.
Yes, we need multiple blocking queues, each storing the gradients for one parameter, but we do not need to add a lock, because the queue blocks until the optimize block is finished.
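To illustrate the idea, here is a minimal sketch of a per-parameter blocking queue (Paddle has its own BlockingQueue; the names and queue type below are only illustrative): gradients for the same parameter are consumed in order by a single worker, so no extra lock is needed, while different parameters can be updated in parallel.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Minimal blocking queue used only to illustrate the per-parameter
// serialization idea; Paddle's real implementation differs.
template <typename T>
class BlockingQueue {
 public:
  void Push(T item) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      q_.push_back(std::move(item));
    }
    cv_.notify_one();
  }
  T Pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    T item = std::move(q_.front());
    q_.pop_front();
    return item;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<T> q_;
};

// One queue per parameter: a single consumer per queue applies that
// parameter's gradients in arrival order, so updates to one parameter never
// conflict, while different parameters are handled concurrently.
// std::unordered_map<std::string, BlockingQueue<std::string>> grad_queues;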
queue_(queue),
responder_(&ctx_) {
if (sync_mode_) {
request_.reset(new VariableResponse(scope, dev_ctx_, false));
request_.reset(new VariableResponse(
scope,
dev_ctx_,
!sync_mode_ // create_scope
));
I thought about this for a while, and I think the current code makes the intent easier for users to understand.
@@ -61,7 +63,7 @@ class VariableResponse {
  // other: number of error field.
  int Parse(const ::grpc::ByteBuffer& byte_buffer);

  const framework::Scope& GetLocalScope() const { return *local_scope_; }
  framework::Scope& GetLocalScope() const { return *local_scope_; }
Consider adding GetMutableLocalScope, which returns a pointer, so we avoid removing the const?
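A small sketch of the suggested shape, assuming the local_scope_ member from the surrounding diff: keep the const reference accessor as-is and add a mutable accessor that returns a pointer, so mutation is explicit at call sites.

// Read-only accessor keeps returning a const reference.
const framework::Scope& GetLocalScope() const { return *local_scope_; }

// Mutable accessor returns a pointer instead of dropping const above; the
// exact name and constness are only a suggestion here.
framework::Scope* GetMutableLocalScope() const { return local_scope_; }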
done
@@ -221,6 +327,12 @@ from send_op and send back variables to recv_op.
               "IP address to listen on.")
      .SetDefault("127.0.0.1:6164")
      .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
  AddAttr<std::vector<std::string>>(
      "grad_to_id",
grad_to_block_id?
done
@@ -143,7 +143,8 @@ def transpile(self,
              program=None,
              pservers="127.0.0.1:6174",
              trainers=1,
              split_method=splitter.round_robin):
              split_method=splitter.round_robin,
              sync_mode=True):
This new argument needs a comment.
done
AsyncExecuteBlock(executor, grad_to_prepared_block[recv_var_name].get(),
                  v.second->GetMutableLocalScope());
// TODO(qiao): explain why
if (var->IsType<framework::SelectedRows>()) {
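To make the dispatch pattern in this hunk concrete outside of Paddle's types, here is a self-contained sketch (all names are illustrative stand-ins; the real code uses framework::ExecutorPrepareContext, a per-request scope, and AsyncExecuteBlock): the received variable name selects a pre-prepared optimize block, which is then run without blocking the request handler.

#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <unordered_map>

// Illustrative stand-in for a prepared optimize block bound to its scope.
struct PreparedBlock {
  std::function<void()> run;
};

// Look up the block registered for the received gradient and run it
// asynchronously, mirroring the AsyncExecuteBlock call above.
void DispatchReceivedVar(
    const std::string& recv_var_name,
    const std::unordered_map<std::string, PreparedBlock>& grad_to_block) {
  auto it = grad_to_block.find(recv_var_name);
  if (it == grad_to_block.end()) {
    std::cerr << "no optimize block registered for " << recv_var_name << "\n";
    return;
  }
  std::thread(it->second.run).detach();  // fire-and-forget execution
}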
Maybe we don't need to clear the rows, because each gradient var is in a new scope.
Great suggestion! Removed.
LGTM now. Does the Python wrapping in io.py need to be updated in this PR, or later?
@typhoonzero The CI is too slow; I will open another PR to fix io.py.
LGTM++
fix: #9997