Skip to content

Commit

Permalink
server: resign pd leader when it is not same as etcd leader. (#1039)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored May 2, 2018
1 parent d64d252 commit 00a31f0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 27 deletions.
34 changes: 26 additions & 8 deletions server/api/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
Expand Down Expand Up @@ -211,14 +210,14 @@ func (s *testMemberAPISuite) TestLeaderResign(c *C) {
leader1, err := svrs[0].GetLeader()
c.Assert(err, IsNil)

s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", nil)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", "")
leader2 := s.waitLeaderChange(c, svrs[0], leader1)
s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), nil)
s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), "")
leader3 := s.waitLeaderChange(c, svrs[0], leader2)
c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId())
}

func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) {
func (s *testMemberAPISuite) TestLeaderPriority(c *C) {
cfgs, svrs, clean := mustNewCluster(c, 3)
defer clean()

Expand All @@ -229,17 +228,24 @@ func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) {

leader1, err := s.getEtcdLeader(svrs[0])
c.Assert(err, IsNil)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": -1}`))
s.waitLeaderSync(c, svrs[0], leader1)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": -1}`)
leader2 := s.waitEtcdLeaderChange(c, svrs[0], leader1)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": 100}`))
s.waitLeaderSync(c, svrs[0], leader2)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": 100}`)
leader3 := s.waitEtcdLeaderChange(c, svrs[0], leader2)
s.waitLeaderSync(c, svrs[0], leader3)
c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId())
}

func (s *testMemberAPISuite) post(c *C, url string, body io.Reader) {
func (s *testMemberAPISuite) post(c *C, url string, body string) {
testutil.WaitUntil(c, func(c *C) bool {
res, err := http.Post(url, "", body)
res, err := http.Post(url, "", bytes.NewBufferString(body))
c.Assert(err, IsNil)
b, err := ioutil.ReadAll(res.Body)
res.Body.Close()
c.Assert(err, IsNil)
c.Logf("post %s, status: %v res: %s", url, res.StatusCode, string(b))
return res.StatusCode == http.StatusOK
})
}
Expand Down Expand Up @@ -288,3 +294,15 @@ func (s *testMemberAPISuite) waitEtcdLeaderChange(c *C, svr *server.Server, old
})
return leader
}

func (s *testMemberAPISuite) waitLeaderSync(c *C, svr *server.Server, etcdLeader *pdpb.Member) {
testutil.WaitUntil(c, func(c *C) bool {
leader, err := svr.GetLeader()
if err != nil {
c.Logf("GetLeader err: %v", err)
return false
}
c.Logf("leader is %v", leader.GetMemberId())
return leader.GetMemberId() == etcdLeader.GetMemberId()
})
}
25 changes: 9 additions & 16 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ import (
)

var (
errNoLeader = errors.New("no leader")
resignLeaderTimeout = time.Second * 5
nextLeaderTTL = 10 // in seconds
errNoLeader = errors.New("no leader")
nextLeaderTTL = 10 // in seconds
)

// IsLeader returns whether server is leader or not.
Expand Down Expand Up @@ -101,6 +100,7 @@ func (s *Server) leaderLoop() {

etcdLeader := s.etcd.Server.Lead()
if etcdLeader != s.ID() {
log.Infof("%v is not etcd leader, skip campaign leader and check later", s.Name())
time.Sleep(200 * time.Millisecond)
continue
}
Expand Down Expand Up @@ -282,9 +282,11 @@ func (s *Server) campaignLeader() error {
if err = s.updateTimestamp(); err != nil {
return errors.Trace(err)
}
case <-s.resignCh:
log.Infof("%s resigns leadership", s.Name())
return nil
etcdLeader := s.etcd.Server.Lead()
if etcdLeader != s.ID() {
log.Infof("etcd leader changed, %s resigns leadership", s.Name())
return nil
}
case <-ctx.Done():
return errors.New("server closed")
}
Expand Down Expand Up @@ -343,16 +345,7 @@ func (s *Server) ResignLeader(nextLeader string) error {
nextLeaderID := leaderIDs[rand.Intn(len(leaderIDs))]
log.Infof("%s ready to resign leader, next leader: %v", s.Name(), nextLeaderID)
err = s.etcd.Server.MoveLeader(s.leaderLoopCtx, s.ID(), nextLeaderID)
if err != nil {
return errors.Trace(err)
}
// Resign leader.
select {
case s.resignCh <- struct{}{}:
return nil
case <-time.After(resignLeaderTimeout):
return errors.Errorf("failed to send resign signal, maybe not leader")
}
return errors.Trace(err)
}

func (s *Server) deleteLeaderKey() error {
Expand Down
3 changes: 0 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type Server struct {
// For tso, set after pd becomes leader.
ts atomic.Value
lastSavedTime time.Time
// For resign notify.
resignCh chan struct{}
// For async region heartbeat.
hbStreams *heartbeatStreams
}
Expand All @@ -101,7 +99,6 @@ func CreateServer(cfg *Config, apiRegister func(*Server) http.Handler) (*Server,
s := &Server{
cfg: cfg,
scheduleOpt: newScheduleOption(cfg),
resignCh: make(chan struct{}),
}
s.handler = newHandler(s)

Expand Down

0 comments on commit 00a31f0

Please sign in to comment.