-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pserver etcd registration #2544
Conversation
go/cmd/pserver/pserver.go
Outdated
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379", | ||
"comma separated endpoint string for pserver to connect to etcd") | ||
etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls") | ||
logLevel := flag.String("log-level", "info", "log level, one of debug") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we set a global one log level both in trainer and pserver? Instead of setting them separately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can not be done, pserver and trainers will be compiled to different binaries and run separately.
go/pserver/service.go
Outdated
|
||
// registerPserverEtcd registers pserver node on etcd using transaction. | ||
func (s *Service) registerPserverEtcd() (*clientv3.TxnResponse, error) { | ||
return concurrency.NewSTMRepeatable(context.TODO(), s.etcdClient, func(c concurrency.STM) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://godoc.org/github.com/coreos/etcd/clientv3/concurrency#NewSTMRepeatable
this interface is deprecated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe can just replace it with:
NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(RepeatableReads))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
go/cmd/pserver/pserver.go
Outdated
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379", | ||
"comma separated endpoint string for pserver to connect to etcd") | ||
etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls") | ||
logLevel := flag.String("log-level", "info", "log level, one of debug") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's because I don't have many experience with leveled log, I don't understand what does "one of debug" means? Maybe we can do something like:
"log level, possible values: debug, info, warning, error, fatal, panic"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, typo error. Fixed.
go/pserver/service.go
Outdated
} | ||
s.registerPserverEtcd() | ||
} // if endpoints != "" | ||
// Bypass etcd registration if no endpoints specified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you put this into the function documentation as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep. Done.
go/pserver/service.go
Outdated
// wait and set s.desired init value | ||
for { | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Second) | ||
resp, err := s.etcdClient.Get(ctx, "/ps_desired") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe put "/ps_desired" into a constant, we have 4 places in code (including 2 comments) that reference this string. Making it a constant makes it not possible to have this 4 place out of sync, and it easy if later we want to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
go/pserver/service.go
Outdated
} | ||
for _, ev := range resp.Kvs { | ||
log.Debugf("key: %s, value: %s", ev.Key, ev.Value) | ||
if string(ev.Key) == "/ps_desired" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not very familiar with etcd myself, but why here needs to check for the value of key equal to "/ps_desired", we only asked for the key "/ps_desired".
I think it's safe to use resp.Kvs[0] here, since s.etcdClient.Get
did not fail. Etcd implementation is directly using it without checking: https://github.com/coreos/etcd/blob/master/clientv3/concurrency/mutex.go#L63
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. Done.
go/pserver/service.go
Outdated
|
||
// registerPserverEtcd registers pserver node on etcd using transaction. | ||
func (s *Service) registerPserverEtcd() (*clientv3.TxnResponse, error) { | ||
return concurrency.NewSTMRepeatable(context.TODO(), s.etcdClient, func(c concurrency.STM) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe can just replace it with:
NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(RepeatableReads))
go/pserver/service.go
Outdated
// find the first id and write info | ||
c.Put(psKey, s.externalIP, clientv3.WithLease(resp.ID)) | ||
log.Debugf("set pserver node %s with value %s", psKey, s.externalIP) | ||
ch, kaerr := s.etcdClient.KeepAlive(context.TODO(), resp.ID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the doc for KeepAlive:
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
We are telling etcd to keep the lease alive forever, but we do not persist the lease ID to somewhere, how can we get the lease again (or release the lease) after crash recovery?
Maybe we don't want the lease to be alive forever, so after the current Pserver process dead, all of its keys on etcd will disappear after TTL, so some new Pserver can become this Pserver by setting those keys when they are not set.
Edit: Sorry @typhoonzero I confused myself about this. We do need to call KeepAlive
for every lease.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't want the lease to be alive forever, so after the current Pserver process dead, all of its keys on etcd will disappear after TTL, so some new Pserver can become this Pserver by setting those keys when they are not set.
This is exactly what we wanted.
go/pserver/service.go
Outdated
return kaerr | ||
} | ||
// FIXME: does this really needed? | ||
go func(ch <-chan *clientv3.LeaseKeepAliveResponse) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need this. Keep alive is managed by the lease automatically, I think doing this is for if we want to know if the lease is still alive (have other way to do it as well: https://github.com/coreos/etcd/blob/24e85b2454c9cc11114367c5c849b26f1b86b0d4/clientv3/concurrency/session.go#L83).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove.
go/pserver/service.go
Outdated
ps := c.Get(psKey) | ||
log.Debugf("got value (%s) for key: %s", ps, psKey) | ||
|
||
resp, err := s.etcdClient.Grant(context.TODO(), 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can get a new lease every time setting a key. Another way is to create a session, and every key uses the lease in that session would be easier?
Plus, this function could be retried, if we create a new lease every time, we could end up creating many leases but only using the last one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lease granting should be put under if ps == "" {
since each pserver will only register one node under /ps
. Will change.
go/pserver/service.go
Outdated
|
||
// registerPserverEtcd registers pserver node on etcd using transaction. | ||
func (s *Service) registerPserverEtcd() (*clientv3.TxnResponse, error) { | ||
return concurrency.NewSTMRepeatable(context.TODO(), s.etcdClient, func(c concurrency.STM) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could happen that this function return nil
without setting any key when some owner of the key is dead but TTL have not reached. I think we need a flag to track if the function actually sets any key successfully, otherwise wait some time and retry.
go/utils/helper.go
Outdated
@@ -0,0 +1,45 @@ | |||
package utils |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another way is to create a utils folder, but put different utils into packages under the folder: https://github.com/coreos/etcd/tree/master/pkg . Otherwise everything will be all under package utils
, different utils could conflict with each other, we don't get the benefit of encapsulation between packages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, utils in go encounters less namespace conflicts than other languages, like we can import like
import (
utils "github.com/PaddlePaddle/Paddle/go/utils"
masterutils "github.com/PaddlePaddle/Paddle/go/master/utils"
)
But indeed the name utils
is like data
or blob
means nothing. Will put to a more meaningful package like in https://github.com/kubernetes/kubernetes/tree/master/pkg/util
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I am less worries about package name conflict, more worried about if we have a single utils package, everything gets into that package, and the different code for different purpose may disturb each other. If we put utils of different purpose under different packages inside the util folder, it will not happen. I believe k8s does it as well: https://github.com/kubernetes/kubernetes/tree/master/pkg/util different utils are under different packages under the util folder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! One comment, not merge blocker.
time.Sleep(s.etcdTimeout) | ||
continue | ||
} | ||
if len(resp.Kvs) != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
len(resp.Kvs)
will always be 1 if err == nil.
Btw, In the previous example https://github.com/coreos/etcd/blob/master/clientv3/concurrency/mutex.go#L64 I asked the coreos develop who wrote this code over IRC, he said checking len(ownerKey) == 0
is not necessary.
You can see from this line ownerKey := resp.Responses[1].GetResponseRange().Kvs
: https://github.com/coreos/etcd/blob/master/clientv3/concurrency/mutex.go#L63 , they don't check slice len as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, will fix this in later PRs.
Fix #2511