Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pserver etcd registration #2544

Merged
merged 3 commits into from
Jun 23, 2017
Merged

Conversation

typhoonzero
Copy link
Contributor

Fix #2511

etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd")
etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls")
logLevel := flag.String("log-level", "info", "log level, one of debug")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we set a global one log level both in trainer and pserver? Instead of setting them separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can not be done, pserver and trainers will be compiled to different binaries and run separately.


// registerPserverEtcd registers pserver node on etcd using transaction.
func (s *Service) registerPserverEtcd() (*clientv3.TxnResponse, error) {
return concurrency.NewSTMRepeatable(context.TODO(), s.etcdClient, func(c concurrency.STM) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can just replace it with:
NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(RepeatableReads))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd")
etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls")
logLevel := flag.String("log-level", "info", "log level, one of debug")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's because I don't have many experience with leveled log, I don't understand what does "one of debug" means? Maybe we can do something like:
"log level, possible values: debug, info, warning, error, fatal, panic"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, typo error. Fixed.

}
s.registerPserverEtcd()
} // if endpoints != ""
// Bypass etcd registration if no endpoints specified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you put this into the function documentation as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Done.

// wait and set s.desired init value
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := s.etcdClient.Get(ctx, "/ps_desired")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put "/ps_desired" into a constant, we have 4 places in code (including 2 comments) that reference this string. Making it a constant makes it not possible to have this 4 place out of sync, and it easy if later we want to change it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
for _, ev := range resp.Kvs {
log.Debugf("key: %s, value: %s", ev.Key, ev.Value)
if string(ev.Key) == "/ps_desired" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very familiar with etcd myself, but why here needs to check for the value of key equal to "/ps_desired", we only asked for the key "/ps_desired".
I think it's safe to use resp.Kvs[0] here, since s.etcdClient.Get did not fail. Etcd implementation is directly using it without checking: https://github.com/coreos/etcd/blob/master/clientv3/concurrency/mutex.go#L63

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. Done.


// registerPserverEtcd registers pserver node on etcd using transaction.
func (s *Service) registerPserverEtcd() (*clientv3.TxnResponse, error) {
return concurrency.NewSTMRepeatable(context.TODO(), s.etcdClient, func(c concurrency.STM) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can just replace it with:
NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(RepeatableReads))

// find the first id and write info
c.Put(psKey, s.externalIP, clientv3.WithLease(resp.ID))
log.Debugf("set pserver node %s with value %s", psKey, s.externalIP)
ch, kaerr := s.etcdClient.KeepAlive(context.TODO(), resp.ID)
Copy link
Contributor

@helinwang helinwang Jun 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the doc for KeepAlive:

// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

We are telling etcd to keep the lease alive forever, but we do not persist the lease ID to somewhere, how can we get the lease again (or release the lease) after crash recovery?

Maybe we don't want the lease to be alive forever, so after the current Pserver process dead, all of its keys on etcd will disappear after TTL, so some new Pserver can become this Pserver by setting those keys when they are not set.

Edit: Sorry @typhoonzero I confused myself about this. We do need to call KeepAlive for every lease.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't want the lease to be alive forever, so after the current Pserver process dead, all of its keys on etcd will disappear after TTL, so some new Pserver can become this Pserver by setting those keys when they are not set.

This is exactly what we wanted.

return kaerr
}
// FIXME: does this really needed?
go func(ch <-chan *clientv3.LeaseKeepAliveResponse) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need this. Keep alive is managed by the lease automatically, I think doing this is for if we want to know if the lease is still alive (have other way to do it as well: https://github.com/coreos/etcd/blob/24e85b2454c9cc11114367c5c849b26f1b86b0d4/clientv3/concurrency/session.go#L83).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove.

ps := c.Get(psKey)
log.Debugf("got value (%s) for key: %s", ps, psKey)

resp, err := s.etcdClient.Grant(context.TODO(), 5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get a new lease every time setting a key. Another way is to create a session, and every key uses the lease in that session would be easier?
Plus, this function could be retried, if we create a new lease every time, we could end up creating many leases but only using the last one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lease granting should be put under if ps == "" { since each pserver will only register one node under /ps. Will change.


// registerPserverEtcd registers pserver node on etcd using transaction.
func (s *Service) registerPserverEtcd() (*clientv3.TxnResponse, error) {
return concurrency.NewSTMRepeatable(context.TODO(), s.etcdClient, func(c concurrency.STM) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could happen that this function return nil without setting any key when some owner of the key is dead but TTL have not reached. I think we need a flag to track if the function actually sets any key successfully, otherwise wait some time and retry.

@@ -0,0 +1,45 @@
package utils
Copy link
Contributor

@helinwang helinwang Jun 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way is to create a utils folder, but put different utils into packages under the folder: https://github.com/coreos/etcd/tree/master/pkg . Otherwise everything will be all under package utils, different utils could conflict with each other, we don't get the benefit of encapsulation between packages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, utils in go encounters less namespace conflicts than other languages, like we can import like

import (

utils "github.com/PaddlePaddle/Paddle/go/utils"
masterutils "github.com/PaddlePaddle/Paddle/go/master/utils"

)

But indeed the name utils is like data or blob means nothing. Will put to a more meaningful package like in https://github.com/kubernetes/kubernetes/tree/master/pkg/util

Copy link
Contributor

@helinwang helinwang Jun 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I am less worries about package name conflict, more worried about if we have a single utils package, everything gets into that package, and the different code for different purpose may disturb each other. If we put utils of different purpose under different packages inside the util folder, it will not happen. I believe k8s does it as well: https://github.com/kubernetes/kubernetes/tree/master/pkg/util different utils are under different packages under the util folder.

Copy link
Contributor

@helinwang helinwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! One comment, not merge blocker.

time.Sleep(s.etcdTimeout)
continue
}
if len(resp.Kvs) != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(resp.Kvs) will always be 1 if err == nil.

Btw, In the previous example https://github.com/coreos/etcd/blob/master/clientv3/concurrency/mutex.go#L64 I asked the coreos develop who wrote this code over IRC, he said checking len(ownerKey) == 0 is not necessary.
You can see from this line ownerKey := resp.Responses[1].GetResponseRange().Kvs: https://github.com/coreos/etcd/blob/master/clientv3/concurrency/mutex.go#L63 , they don't check slice len as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will fix this in later PRs.

@typhoonzero typhoonzero merged commit 94bfe2b into PaddlePaddle:develop Jun 23, 2017
@typhoonzero typhoonzero deleted the pserver_etcd branch August 11, 2017 06:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants