No automatic manager shutdown on demotion/removal
This is an attempt to fix the demotion process. Right now, the agent
finds out about a role change, and independently, the raft node finds
out it's no longer in the cluster, and shuts itself down. This causes
the manager to also shut itself down. This is very error-prone and has
led to a lot of problems. I believe there are corner cases that are not
properly addressed.

This changes things so that raft only signals to the higher level that
the node has been removed. The manager supervision code will shut down
the manager, and wait a certain amount of time for a role change (which
should come through the agent reconnecting to a different manager now
that the local manager is shut down).
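
A minimal sketch of the supervision flow described above, using only the two entry points this commit adds (`Manager.RemovedFromRaft()` and the new `clearData` argument to `Manager.Stop`). The `superviseManager` function, the `roleChanged` channel, and the `roleChangeTimeout` constant are illustrative assumptions rather than code from this commit, and passing `clearData=true` after removal is likewise an assumption based on the raft logs no longer being usable.

```go
package node

import (
	"errors"
	"time"

	"golang.org/x/net/context"

	"github.com/docker/swarmkit/manager"
)

// roleChangeTimeout bounds how long to wait for the agent to report a role
// change once the local manager has been shut down (illustrative value).
const roleChangeTimeout = 16 * time.Second

// superviseManager reacts to removal from the raft cluster instead of
// letting the raft node shut the manager down on its own (sketch only).
func superviseManager(ctx context.Context, m *manager.Manager, roleChanged <-chan struct{}) error {
	select {
	case <-m.RemovedFromRaft():
		// Raft now only signals removal; the supervisor decides what to do.
		// Stop the manager and clear its raft state (assumption: the WALs,
		// snapshots, and keys are no longer usable once removed).
		m.Stop(ctx, true)

		// Wait a bounded time for the role change that should arrive once
		// the agent reconnects to a different manager.
		select {
		case <-roleChanged:
			return nil
		case <-time.After(roleChangeTimeout):
			return errors.New("manager removed from raft but no role change received")
		}
	case <-ctx.Done():
		m.Stop(ctx, false)
		return ctx.Err()
	}
}
```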

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
aaronlehmann committed Jan 4, 2017
1 parent 6381c76 commit a2c96b9
Showing 6 changed files with 88 additions and 91 deletions.
2 changes: 1 addition & 1 deletion manager/logbroker/broker.go
@@ -173,7 +173,7 @@ func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan ev
}))

// Grab current subscriptions.
subscriptions := make([]*subscription, 0)
var subscriptions []*subscription
for _, s := range lb.registeredSubscriptions {
if s.Contains(nodeID) {
subscriptions = append(subscriptions, s)
37 changes: 18 additions & 19 deletions manager/manager.go
@@ -271,6 +271,12 @@ func New(config *Config) (*Manager, error) {
return m, nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
return m.raftNode.RemovedFromRaft
}

// Addr returns tcp address on which remote api listens.
func (m *Manager) Addr() string {
return m.config.RemoteAPI.ListenAddr
@@ -395,33 +401,22 @@ func (m *Manager) Run(parent context.Context) error {
if err != nil {
errCh <- err
log.G(ctx).WithError(err).Error("raft node stopped")
m.Stop(ctx)
m.Stop(ctx, false)
}
}()

returnErr := func(err error) error {
select {
case runErr := <-errCh:
if runErr == raft.ErrMemberRemoved {
return runErr
}
default:
}
return err
}

if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
return returnErr(err)
return err
}

c, err := raft.WaitForCluster(ctx, m.raftNode)
if err != nil {
return returnErr(err)
return err
}
raftConfig := c.Spec.Raft

if err := m.watchForKEKChanges(ctx); err != nil {
return returnErr(err)
return err
}

if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
@@ -439,16 +434,17 @@ func (m *Manager) Run(parent context.Context) error {
return nil
}
m.mu.Unlock()
m.Stop(ctx)
m.Stop(ctx, false)

return returnErr(err)
return err
}

const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the scheduler.
func (m *Manager) Stop(ctx context.Context) {
// active RPCs as well as stopping the scheduler. If clearData is set, the
// raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
log.G(ctx).Info("Stopping manager")
// It's not safe to start shutting down while the manager is still
// starting up.
@@ -502,6 +498,9 @@ func (m *Manager) Stop(ctx context.Context) {
m.keyManager.Stop()
}

if clearData {
m.raftNode.ClearData()
}
m.cancelFunc()
<-m.raftNode.Done()

4 changes: 2 additions & 2 deletions manager/manager_test.go
@@ -185,7 +185,7 @@ func TestManager(t *testing.T) {
_, err = client.Heartbeat(context.Background(), &api.HeartbeatRequest{})
assert.Contains(t, grpc.ErrorDesc(err), "removed from swarm")

m.Stop(ctx)
m.Stop(ctx, false)

// After stopping we MAY receive an error from ListenAndServe if
// all this happened before WaitForLeader completed, so don't check the
@@ -393,7 +393,7 @@ func TestManagerLockUnlock(t *testing.T) {
require.NotNil(t, unencryptedDEK)
require.Equal(t, currentDEK, unencryptedDEK)

m.Stop(ctx)
m.Stop(ctx, false)

// After stopping we MAY receive an error from ListenAndServe if
// all this happened before WaitForLeader completed, so don't check the
44 changes: 16 additions & 28 deletions manager/state/raft/raft.go
@@ -112,8 +112,8 @@ type Node struct {

ticker clock.Ticker
doneCh chan struct{}
// removeRaftCh notifies about node deletion from raft cluster
removeRaftCh chan struct{}
// RemovedFromRaft notifies about node deletion from raft cluster
RemovedFromRaft chan struct{}
removeRaftFunc func()
leadershipBroadcast *watch.Queue

@@ -134,6 +134,7 @@ type Node struct {
raftLogger *storage.EncryptedRaftLogger
keyRotator EncryptionKeyRotator
rotationQueued bool
clearData bool
waitForAppliedIndex uint64
}

@@ -199,7 +200,7 @@ func NewNode(opts NodeOptions) *Node {
Logger: cfg.Logger,
},
doneCh: make(chan struct{}),
removeRaftCh: make(chan struct{}),
RemovedFromRaft: make(chan struct{}),
stopped: make(chan struct{}),
leadershipBroadcast: watch.NewQueue(),
lastSendToMember: make(map[uint64]chan struct{}),
@@ -220,7 +221,8 @@ func NewNode(opts NodeOptions) *Node {
var removeRaftOnce sync.Once
return func() {
removeRaftOnce.Do(func() {
close(n.removeRaftCh)
atomic.StoreUint32(&n.isMember, 0)
close(n.RemovedFromRaft)
})
}
}(n)
@@ -364,6 +366,12 @@ func (n *Node) done() {
close(n.doneCh)
}

// ClearData tells the raft node to delete its WALs, snapshots, and keys on
// shutdown.
func (n *Node) ClearData() {
n.clearData = true
}

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
@@ -373,13 +381,10 @@ func (n *Node) Run(ctx context.Context) error {
ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
ctx, cancel := context.WithCancel(ctx)

// nodeRemoved indicates that node was stopped due its removal.
nodeRemoved := false

defer func() {
cancel()
n.stop(ctx)
if nodeRemoved {
if n.clearData {
// Delete WAL and snapshots, since they are no longer
// usable.
if err := n.raftLogger.Clear(ctx); err != nil {
@@ -536,12 +541,6 @@ func (n *Node) Run(ctx context.Context) error {
case n.needsSnapshot(ctx):
n.doSnapshot(ctx, n.getCurrentRaftConfig())
}
case <-n.removeRaftCh:
nodeRemoved = true
// If the node was removed from other members,
// send back an error to the caller to start
// the shutdown process.
return ErrMemberRemoved
case <-ctx.Done():
return nil
}
@@ -1240,17 +1239,6 @@ func (n *Node) IsMember() bool {
return atomic.LoadUint32(&n.isMember) == 1
}

// canSubmitProposal defines if any more proposals
// could be submitted and processed.
func (n *Node) canSubmitProposal() bool {
select {
case <-n.stopped:
return false
default:
return true
}
}

// Saves a log entry to our Store
func (n *Node) saveToStorage(
ctx context.Context,
@@ -1479,7 +1467,7 @@ type applyResult struct {
// shutdown.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
n.stopMu.RLock()
if !n.canSubmitProposal() {
if !n.IsMember() {
n.stopMu.RUnlock()
return nil, ErrStopped
}
@@ -1713,11 +1701,11 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
}

if cc.NodeID == n.Config.ID {
n.removeRaftFunc()

// wait the commit ack to be sent before closing connection
n.asyncTasks.Wait()

n.removeRaftFunc()

// if there are only 2 nodes in the cluster, and leader is leaving
// before closing the connection, leader has to ensure that follower gets
// noticed about this raft conf change commit. Otherwise, follower would
7 changes: 0 additions & 7 deletions manager/state/raft/raft_test.go
@@ -7,7 +7,6 @@ import (
"log"
"math/rand"
"os"
"path/filepath"
"reflect"
"strconv"
"testing"
@@ -20,7 +19,6 @@ import (
"golang.org/x/net/context"

"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/wal"
"github.com/docker/swarmkit/api"
cautils "github.com/docker/swarmkit/ca/testutils"
@@ -341,11 +339,6 @@ func TestRaftLeaderLeave(t *testing.T) {
// Wait for election tick
raftutils.WaitForCluster(t, clockSource, newCluster)

// Node1's state should be cleared
require.False(t, fileutil.Exist(filepath.Join(nodes[1].StateDir, "snap-v3-encrypted")))
require.False(t, fileutil.Exist(filepath.Join(nodes[1].StateDir, "wal-v3-encrypted")))
require.Equal(t, raft.EncryptionKeys{}, nodes[1].KeyRotator.GetKeys())

// Leader should not be 1
assert.NotEqual(t, nodes[2].Leader(), nodes[1].Config.ID)
assert.Equal(t, nodes[2].Leader(), nodes[3].Leader())