diff --git a/server/api/member_test.go b/server/api/member_test.go index ba05f09b129..820d03febc1 100644 --- a/server/api/member_test.go +++ b/server/api/member_test.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "io" "io/ioutil" "math/rand" "net/http" @@ -211,14 +210,14 @@ func (s *testMemberAPISuite) TestLeaderResign(c *C) { leader1, err := svrs[0].GetLeader() c.Assert(err, IsNil) - s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", nil) + s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", "") leader2 := s.waitLeaderChange(c, svrs[0], leader1) - s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), nil) + s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), "") leader3 := s.waitLeaderChange(c, svrs[0], leader2) c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId()) } -func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) { +func (s *testMemberAPISuite) TestLeaderPriority(c *C) { cfgs, svrs, clean := mustNewCluster(c, 3) defer clean() @@ -229,17 +228,24 @@ func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) { leader1, err := s.getEtcdLeader(svrs[0]) c.Assert(err, IsNil) - s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": -1}`)) + s.waitLeaderSync(c, svrs[0], leader1) + s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": -1}`) leader2 := s.waitEtcdLeaderChange(c, svrs[0], leader1) - s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": 100}`)) + s.waitLeaderSync(c, svrs[0], leader2) + s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": 100}`) leader3 := s.waitEtcdLeaderChange(c, svrs[0], leader2) + s.waitLeaderSync(c, svrs[0], leader3) c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId()) } -func (s *testMemberAPISuite) post(c *C, url string, body io.Reader) { +func (s *testMemberAPISuite) post(c *C, url string, body string) { testutil.WaitUntil(c, func(c *C) bool { - res, err := http.Post(url, "", body) + res, err := http.Post(url, "", bytes.NewBufferString(body)) c.Assert(err, IsNil) + b, err := ioutil.ReadAll(res.Body) + res.Body.Close() + c.Assert(err, IsNil) + c.Logf("post %s, status: %v res: %s", url, res.StatusCode, string(b)) return res.StatusCode == http.StatusOK }) } @@ -288,3 +294,15 @@ func (s *testMemberAPISuite) waitEtcdLeaderChange(c *C, svr *server.Server, old }) return leader } + +func (s *testMemberAPISuite) waitLeaderSync(c *C, svr *server.Server, etcdLeader *pdpb.Member) { + testutil.WaitUntil(c, func(c *C) bool { + leader, err := svr.GetLeader() + if err != nil { + c.Logf("GetLeader err: %v", err) + return false + } + c.Logf("leader is %v", leader.GetMemberId()) + return leader.GetMemberId() == etcdLeader.GetMemberId() + }) +} diff --git a/server/leader.go b/server/leader.go index 39d24882c00..6a103536edb 100644 --- a/server/leader.go +++ b/server/leader.go @@ -31,9 +31,8 @@ import ( ) var ( - errNoLeader = errors.New("no leader") - resignLeaderTimeout = time.Second * 5 - nextLeaderTTL = 10 // in seconds + errNoLeader = errors.New("no leader") + nextLeaderTTL = 10 // in seconds ) // IsLeader returns whether server is leader or not. @@ -101,6 +100,7 @@ func (s *Server) leaderLoop() { etcdLeader := s.etcd.Server.Lead() if etcdLeader != s.ID() { + log.Infof("%v is not etcd leader, skip campaign leader and check later", s.Name()) time.Sleep(200 * time.Millisecond) continue } @@ -282,9 +282,11 @@ func (s *Server) campaignLeader() error { if err = s.updateTimestamp(); err != nil { return errors.Trace(err) } - case <-s.resignCh: - log.Infof("%s resigns leadership", s.Name()) - return nil + etcdLeader := s.etcd.Server.Lead() + if etcdLeader != s.ID() { + log.Infof("etcd leader changed, %s resigns leadership", s.Name()) + return nil + } case <-ctx.Done(): return errors.New("server closed") } @@ -343,16 +345,7 @@ func (s *Server) ResignLeader(nextLeader string) error { nextLeaderID := leaderIDs[rand.Intn(len(leaderIDs))] log.Infof("%s ready to resign leader, next leader: %v", s.Name(), nextLeaderID) err = s.etcd.Server.MoveLeader(s.leaderLoopCtx, s.ID(), nextLeaderID) - if err != nil { - return errors.Trace(err) - } - // Resign leader. - select { - case s.resignCh <- struct{}{}: - return nil - case <-time.After(resignLeaderTimeout): - return errors.Errorf("failed to send resign signal, maybe not leader") - } + return errors.Trace(err) } func (s *Server) deleteLeaderKey() error { diff --git a/server/server.go b/server/server.go index 4daae1c2088..bf10eb42551 100644 --- a/server/server.go +++ b/server/server.go @@ -87,8 +87,6 @@ type Server struct { // For tso, set after pd becomes leader. ts atomic.Value lastSavedTime time.Time - // For resign notify. - resignCh chan struct{} // For async region heartbeat. hbStreams *heartbeatStreams } @@ -101,7 +99,6 @@ func CreateServer(cfg *Config, apiRegister func(*Server) http.Handler) (*Server, s := &Server{ cfg: cfg, scheduleOpt: newScheduleOption(cfg), - resignCh: make(chan struct{}), } s.handler = newHandler(s)