From 8ee9498dd9f1bb1904e26c43c775617ff07e40da Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 27 Apr 2018 11:19:14 +0800 Subject: [PATCH 1/5] server: resign pd leader when it is not same as etcd leader. --- server/api/member_test.go | 13 ++++++++++++- server/leader.go | 5 +++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/server/api/member_test.go b/server/api/member_test.go index ba05f09b129..ff3cbd651db 100644 --- a/server/api/member_test.go +++ b/server/api/member_test.go @@ -218,7 +218,7 @@ func (s *testMemberAPISuite) TestLeaderResign(c *C) { c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId()) } -func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) { +func (s *testMemberAPISuite) TestLeaderPriority(c *C) { cfgs, svrs, clean := mustNewCluster(c, 3) defer clean() @@ -229,10 +229,13 @@ func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) { leader1, err := s.getEtcdLeader(svrs[0]) c.Assert(err, IsNil) + s.waitLeaderSync(c, svrs[0], leader1) s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": -1}`)) leader2 := s.waitEtcdLeaderChange(c, svrs[0], leader1) + s.waitLeaderSync(c, svrs[0], leader2) s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": 100}`)) leader3 := s.waitEtcdLeaderChange(c, svrs[0], leader2) + s.waitLeaderSync(c, svrs[0], leader3) c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId()) } @@ -288,3 +291,11 @@ func (s *testMemberAPISuite) waitEtcdLeaderChange(c *C, svr *server.Server, old }) return leader } + +func (s *testMemberAPISuite) waitLeaderSync(c *C, svr *server.Server, etcdLeader *pdpb.Member) { + testutil.WaitUntil(c, func(c *C) bool { + leader, err := svr.GetLeader() + c.Assert(err, IsNil) + return leader.GetMemberId() == etcdLeader.GetMemberId() + }) +} diff --git a/server/leader.go b/server/leader.go index 39d24882c00..bc200bce650 100644 --- a/server/leader.go +++ b/server/leader.go @@ -282,6 +282,11 @@ func (s *Server) campaignLeader() error { if err = s.updateTimestamp(); err != nil { return errors.Trace(err) } + etcdLeader := s.etcd.Server.Lead() + if etcdLeader != s.ID() { + log.Info("etcd leader changed, %s resigns leadership", s.Name()) + return nil + } case <-s.resignCh: log.Infof("%s resigns leadership", s.Name()) return nil From 684467c1aaa6a1caa363b9466d7ca5cc84faab3d Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 2 May 2018 11:00:08 +0800 Subject: [PATCH 2/5] server: simplify leader resign. --- server/api/member_test.go | 1 + server/leader.go | 21 ++++----------------- server/server.go | 3 --- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/server/api/member_test.go b/server/api/member_test.go index ff3cbd651db..7bdf18309f0 100644 --- a/server/api/member_test.go +++ b/server/api/member_test.go @@ -242,6 +242,7 @@ func (s *testMemberAPISuite) TestLeaderPriority(c *C) { func (s *testMemberAPISuite) post(c *C, url string, body io.Reader) { testutil.WaitUntil(c, func(c *C) bool { res, err := http.Post(url, "", body) + c.Logf("post res: %v, err: %v", res.StatusCode, err) c.Assert(err, IsNil) return res.StatusCode == http.StatusOK }) diff --git a/server/leader.go b/server/leader.go index bc200bce650..46b6002ae45 100644 --- a/server/leader.go +++ b/server/leader.go @@ -31,9 +31,8 @@ import ( ) var ( - errNoLeader = errors.New("no leader") - resignLeaderTimeout = time.Second * 5 - nextLeaderTTL = 10 // in seconds + errNoLeader = errors.New("no leader") + nextLeaderTTL = 10 // in seconds ) // IsLeader returns whether server is leader or not. @@ -284,12 +283,9 @@ func (s *Server) campaignLeader() error { } etcdLeader := s.etcd.Server.Lead() if etcdLeader != s.ID() { - log.Info("etcd leader changed, %s resigns leadership", s.Name()) + log.Infof("etcd leader changed, %s resigns leadership", s.Name()) return nil } - case <-s.resignCh: - log.Infof("%s resigns leadership", s.Name()) - return nil case <-ctx.Done(): return errors.New("server closed") } @@ -348,16 +344,7 @@ func (s *Server) ResignLeader(nextLeader string) error { nextLeaderID := leaderIDs[rand.Intn(len(leaderIDs))] log.Infof("%s ready to resign leader, next leader: %v", s.Name(), nextLeaderID) err = s.etcd.Server.MoveLeader(s.leaderLoopCtx, s.ID(), nextLeaderID) - if err != nil { - return errors.Trace(err) - } - // Resign leader. - select { - case s.resignCh <- struct{}{}: - return nil - case <-time.After(resignLeaderTimeout): - return errors.Errorf("failed to send resign signal, maybe not leader") - } + return errors.Trace(err) } func (s *Server) deleteLeaderKey() error { diff --git a/server/server.go b/server/server.go index 4daae1c2088..bf10eb42551 100644 --- a/server/server.go +++ b/server/server.go @@ -87,8 +87,6 @@ type Server struct { // For tso, set after pd becomes leader. ts atomic.Value lastSavedTime time.Time - // For resign notify. - resignCh chan struct{} // For async region heartbeat. hbStreams *heartbeatStreams } @@ -101,7 +99,6 @@ func CreateServer(cfg *Config, apiRegister func(*Server) http.Handler) (*Server, s := &Server{ cfg: cfg, scheduleOpt: newScheduleOption(cfg), - resignCh: make(chan struct{}), } s.handler = newHandler(s) From 1aa21a78f3ac2b83f286eaa081581736d32dd4d5 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 2 May 2018 12:02:59 +0800 Subject: [PATCH 3/5] *: update log. --- server/api/member_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/api/member_test.go b/server/api/member_test.go index 7bdf18309f0..e226b5abacb 100644 --- a/server/api/member_test.go +++ b/server/api/member_test.go @@ -242,8 +242,11 @@ func (s *testMemberAPISuite) TestLeaderPriority(c *C) { func (s *testMemberAPISuite) post(c *C, url string, body io.Reader) { testutil.WaitUntil(c, func(c *C) bool { res, err := http.Post(url, "", body) - c.Logf("post res: %v, err: %v", res.StatusCode, err) c.Assert(err, IsNil) + b, err := ioutil.ReadAll(res.Body) + res.Body.Close() + c.Assert(err, IsNil) + c.Logf("post %s, status: %v res: %s", url, res.StatusCode, string(b)) return res.StatusCode == http.StatusOK }) } From fa15f04647479f60e58e9d0ca556019dd24bc589 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 2 May 2018 15:44:20 +0800 Subject: [PATCH 4/5] *: fix test. --- server/api/member_test.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/server/api/member_test.go b/server/api/member_test.go index e226b5abacb..820d03febc1 100644 --- a/server/api/member_test.go +++ b/server/api/member_test.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "io" "io/ioutil" "math/rand" "net/http" @@ -211,9 +210,9 @@ func (s *testMemberAPISuite) TestLeaderResign(c *C) { leader1, err := svrs[0].GetLeader() c.Assert(err, IsNil) - s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", nil) + s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", "") leader2 := s.waitLeaderChange(c, svrs[0], leader1) - s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), nil) + s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), "") leader3 := s.waitLeaderChange(c, svrs[0], leader2) c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId()) } @@ -230,18 +229,18 @@ func (s *testMemberAPISuite) TestLeaderPriority(c *C) { leader1, err := s.getEtcdLeader(svrs[0]) c.Assert(err, IsNil) s.waitLeaderSync(c, svrs[0], leader1) - s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": -1}`)) + s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": -1}`) leader2 := s.waitEtcdLeaderChange(c, svrs[0], leader1) s.waitLeaderSync(c, svrs[0], leader2) - s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": 100}`)) + s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": 100}`) leader3 := s.waitEtcdLeaderChange(c, svrs[0], leader2) s.waitLeaderSync(c, svrs[0], leader3) c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId()) } -func (s *testMemberAPISuite) post(c *C, url string, body io.Reader) { +func (s *testMemberAPISuite) post(c *C, url string, body string) { testutil.WaitUntil(c, func(c *C) bool { - res, err := http.Post(url, "", body) + res, err := http.Post(url, "", bytes.NewBufferString(body)) c.Assert(err, IsNil) b, err := ioutil.ReadAll(res.Body) res.Body.Close() @@ -299,7 +298,11 @@ func (s *testMemberAPISuite) waitEtcdLeaderChange(c *C, svr *server.Server, old func (s *testMemberAPISuite) waitLeaderSync(c *C, svr *server.Server, etcdLeader *pdpb.Member) { testutil.WaitUntil(c, func(c *C) bool { leader, err := svr.GetLeader() - c.Assert(err, IsNil) + if err != nil { + c.Logf("GetLeader err: %v", err) + return false + } + c.Logf("leader is %v", leader.GetMemberId()) return leader.GetMemberId() == etcdLeader.GetMemberId() }) } From 13cc73fd555978c9b69859e1a361d49fe65725c2 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 2 May 2018 19:44:27 +0800 Subject: [PATCH 5/5] *: add log. --- server/leader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/leader.go b/server/leader.go index 46b6002ae45..6a103536edb 100644 --- a/server/leader.go +++ b/server/leader.go @@ -100,6 +100,7 @@ func (s *Server) leaderLoop() { etcdLeader := s.etcd.Server.Lead() if etcdLeader != s.ID() { + log.Infof("%v is not etcd leader, skip campaign leader and check later", s.Name()) time.Sleep(200 * time.Millisecond) continue }