Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: Use TransferLeadership to make leader demotion safer #1939

Merged
merged 1 commit into from
Feb 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions manager/controlapi/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func TestUpdateNode(t *testing.T) {
assert.Error(t, err)
}

func testUpdateNodeDemote(leader bool, t *testing.T) {
func testUpdateNodeDemote(t *testing.T) {
tc := cautils.NewTestCA(nil)
defer tc.Stop()
ts := newTestServer(t)
Expand Down Expand Up @@ -654,14 +654,8 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {
return nil
}))

var demoteNode, lastNode *raftutils.TestNode
if leader {
demoteNode = nodes[1]
lastNode = nodes[2]
} else {
demoteNode = nodes[2]
lastNode = nodes[1]
}
demoteNode := nodes[2]
lastNode := nodes[1]

raftMember = ts.Server.raft.GetMemberByNodeID(demoteNode.SecurityConfig.ClientTLSCreds.NodeID())
assert.NotNil(t, raftMember)
Expand Down Expand Up @@ -734,10 +728,5 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {

func TestUpdateNodeDemote(t *testing.T) {
t.Parallel()
testUpdateNodeDemote(false, t)
}

func TestUpdateNodeDemoteLeader(t *testing.T) {
t.Parallel()
testUpdateNodeDemote(true, t)
testUpdateNodeDemote(t)
}
12 changes: 11 additions & 1 deletion manager/role_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,21 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second)
defer rmCancel()

if member.RaftID == rm.raft.Config.ID {
// Don't use rmCtx, because we expect to lose
// leadership, which will cancel this context.
log.L.Info("demoted; transferring leadership")
err := rm.raft.TransferLeadership(context.Background())
if err == nil {
return
}
log.L.WithError(err).Info("failed to transfer leadership")
}
if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
// TODO(aaronl): Retry later
log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
return
}
return
}

err := rm.store.Update(func(tx store.Tx) error {
Expand Down
87 changes: 62 additions & 25 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
defer conn.Close()
client := api.NewRaftMembershipClient(conn)

joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
defer joinCancel()
resp, err := client.Join(joinCtx, &api.JoinRequest{
Addr: n.opts.Addr,
Expand Down Expand Up @@ -1030,6 +1030,10 @@ func (n *Node) UpdateNode(id uint64, addr string) {
// from a member who is willing to leave its raft
// membership to an active member of the raft
func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
if req.Node == nil {
return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
}

nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1100,18 +1104,58 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {

n.membershipLock.Lock()
defer n.membershipLock.Unlock()
if n.CanRemoveMember(id) {
cc := raftpb.ConfChange{
ID: id,
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
Context: []byte(""),
}
err := n.configure(ctx, cc)
return err
if !n.CanRemoveMember(id) {
return ErrCannotRemoveMember
}

return ErrCannotRemoveMember
cc := raftpb.ConfChange{
ID: id,
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
Context: []byte(""),
}
return n.configure(ctx, cc)
}

// TransferLeadership asks raft to hand leadership to another member and then
// blocks until this node observes a leader other than itself.
//
// The transfer target is the peer returned by the transport's LongestActive.
// The whole call is bounded by n.reqTimeout(), applied on top of any deadline
// ctx already carries, and n.stopMu is read-locked for the duration.
//
// It returns ErrNoRaftMember when this node is not a raft member,
// ErrLostLeadership when it is not currently the leader, a wrapped error when
// no transfer target can be determined, and ctx.Err() when the context ends
// before a different leader is seen.
func (n *Node) TransferLeadership(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, n.reqTimeout())
	defer cancel()

	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	switch {
	case !n.IsMember():
		return ErrNoRaftMember
	case !n.isLeader():
		return ErrLostLeadership
	}

	target, err := n.transport.LongestActive()
	if err != nil {
		return errors.Wrap(err, "failed to get longest-active member")
	}

	began := time.Now()
	n.raftNode.TransferLeadership(ctx, n.Config.ID, target)

	// The call above does not report completion here, so poll the known
	// leader ten times per tick interval until it is someone else.
	poll := time.NewTicker(n.opts.TickInterval / 10)
	defer poll.Stop()

	var newLeader uint64
	for newLeader = n.leader(); newLeader == raft.None || newLeader == n.Config.ID; newLeader = n.leader() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-poll.C:
		}
	}

	log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, newLeader, time.Since(began))
	return nil
}

// RemoveMember submits a configuration change to remove a member from the raft cluster
Expand Down Expand Up @@ -1726,23 +1770,12 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
}

if cc.NodeID == n.Config.ID {
// wait the commit ack to be sent before closing connection
// wait for the commit ack to be sent before closing connection
n.asyncTasks.Wait()

n.NodeRemoved()
// if there are only 2 nodes in the cluster and the leader is leaving,
// then before closing the connection the leader has to ensure that the
// follower is notified of this raft conf change commit. Otherwise, the
// follower would assume there are still 2 nodes in the cluster and could
// not be elected leader, because it cannot acquire the majority (2 nodes)

// while n.asyncTasks.Wait() could be helpful in this case,
// it's a best-effort strategy, because this send could fail due to some errors (such as exceeding the time limit)
// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
} else {
if err := n.transport.RemovePeer(cc.NodeID); err != nil {
return err
}
} else if err := n.transport.RemovePeer(cc.NodeID); err != nil {
return err
}

return n.cluster.RemoveMember(cc.NodeID)
Expand Down Expand Up @@ -1852,3 +1885,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
}
return sids
}

// reqTimeout returns the time budget for a raft request: a fixed five seconds
// plus twice the span of ElectionTick tick intervals.
func (n *Node) reqTimeout() time.Duration {
	electionSpan := time.Duration(n.Config.ElectionTick) * n.opts.TickInterval
	return 5*time.Second + 2*electionSpan
}
13 changes: 13 additions & 0 deletions manager/state/raft/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool {
return active
}

// LongestActive reports the ID of the peer selected by longestActive, i.e.
// the one that has been active for the longest time. If no such peer can be
// determined, it returns 0 together with the error from longestActive.
func (t *Transport) LongestActive() (uint64, error) {
	longest, err := t.longestActive()
	if err == nil {
		return longest.id, nil
	}
	return 0, err
}

// longestActive returns the peer that has been active for the longest length of
// time.
func (t *Transport) longestActive() (*peer, error) {
var longest *peer
var longestTime time.Time
Expand Down