[WIP] No automatic manager shutdown on demotion/removal

This is an attempt to fix the demotion process. Right now, the agent
finds out about a role change and, independently, the raft node finds
out it is no longer in the cluster and shuts itself down, which in turn
causes the manager to shut itself down. This is very error-prone and
has led to a lot of problems; I believe there are corner cases that are
still not properly addressed.

This changes things so that raft only signals to the higher level that
the node has been removed. The manager supervision code then shuts down
the manager and waits a certain amount of time for a role change (which
should come from the agent reconnecting to a different manager, now
that the local manager is shut down).

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
aaronlehmann committed Dec 22, 2016
1 parent 3c82476 commit 3220c44
Showing 6 changed files with 76 additions and 90 deletions.
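
The flow described in the commit message can be condensed into the sketch below. This is a hypothetical illustration only, not the literal code from the diff that follows: the managerLike interface and runUntilShutdown function are invented for the example, and the restart/retry logic lives elsewhere (see node.go below).

package supervise

import "context"

// managerLike is a hypothetical stand-in for the new *manager.Manager API
// introduced by this commit.
type managerLike interface {
	RemovedFromRaft() <-chan struct{}
	Stop(ctx context.Context, clearData bool)
}

// runUntilShutdown waits for one of the shutdown triggers and then stops the
// manager. Raft state is cleared only when the node was demoted or removed,
// never when the manager merely exited or the surrounding context was
// cancelled.
func runUntilShutdown(ctx context.Context, m managerLike, managerDone, workerRole <-chan struct{}) error {
	clearData := false
	// Stop the manager on the way out; the closure reads clearData's final
	// value rather than the value at the time of the defer statement.
	defer func() { m.Stop(ctx, clearData) }()

	select {
	case <-managerDone:
		// The manager stopped on its own; keep the raft state on disk.
	case <-workerRole:
		// The agent observed a demotion to worker: the local raft logs,
		// snapshots, and keys are no longer usable.
		clearData = true
	case <-m.RemovedFromRaft():
		// Raft signalled that this node was removed from the cluster.
		clearData = true
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}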
2 changes: 1 addition & 1 deletion manager/logbroker/broker.go
@@ -173,7 +173,7 @@ func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan ev
}))

// Grab current subscriptions.
subscriptions := make([]*subscription, 0)
var subscriptions []*subscription
for _, s := range lb.registeredSubscriptions {
if s.Contains(nodeID) {
subscriptions = append(subscriptions, s)
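The change above is purely stylistic: a pre-allocated empty slice becomes a nil slice declaration. append treats both the same way, so behaviour is unchanged. A standalone illustration (not from the repository):

package main

import "fmt"

func main() {
	var a []int         // nil slice: no backing array is allocated
	b := make([]int, 0) // empty but non-nil slice

	fmt.Println(a == nil, b == nil) // true false
	fmt.Println(len(a), len(b))     // 0 0

	// append works identically on both.
	a = append(a, 1)
	b = append(b, 1)
	fmt.Println(a, b) // [1] [1]
}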
35 changes: 16 additions & 19 deletions manager/manager.go
@@ -270,6 +270,12 @@ func New(config *Config) (*Manager, error) {
return m, nil
}

// RemovedFromRaft returns a channel that's closed if the manager is removed
// from the raft cluster. This should be used to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
return m.raftNode.RemovedFromRaft
}

// Addr returns tcp address on which remote api listens.
func (m *Manager) Addr() string {
return m.config.RemoteAPI.ListenAddr
@@ -394,33 +400,22 @@ func (m *Manager) Run(parent context.Context) error {
if err != nil {
errCh <- err
log.G(ctx).WithError(err).Error("raft node stopped")
m.Stop(ctx)
m.Stop(ctx, false)
}
}()

returnErr := func(err error) error {
select {
case runErr := <-errCh:
if runErr == raft.ErrMemberRemoved {
return runErr
}
default:
}
return err
}

if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
return returnErr(err)
return err
}

c, err := raft.WaitForCluster(ctx, m.raftNode)
if err != nil {
return returnErr(err)
return err
}
raftConfig := c.Spec.Raft

if err := m.watchForKEKChanges(ctx); err != nil {
return returnErr(err)
return err
}

if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
@@ -438,16 +433,17 @@ func (m *Manager) Run(parent context.Context) error {
return nil
}
m.mu.Unlock()
m.Stop(ctx)
m.Stop(ctx, false)

return returnErr(err)
return err
}

const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the scheduler.
func (m *Manager) Stop(ctx context.Context) {
// active RPCs as well as stopping the scheduler. If clearData is set, the
// raft logs, snapshots, and keys will be erased.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
log.G(ctx).Info("Stopping manager")
// It's not safe to start shutting down while the manager is still
// starting up.
@@ -498,6 +494,7 @@ func (m *Manager) Stop(ctx context.Context) {
m.keyManager.Stop()
}

m.raftNode.ClearData()
m.cancelFunc()
<-m.raftNode.Done()

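For code that embeds the manager package, the visible API change in this file is the extra clearData argument to Stop and the new RemovedFromRaft accessor. A hypothetical caller-side sketch (the shutdown and waitForRemoval helpers are invented for illustration; only the Manager methods come from the diff above):

package caller

import (
	"context"

	"github.com/docker/swarmkit/manager"
)

// shutdown stops a running manager. The caller now decides whether the
// on-disk raft state should be wiped (previously this was just m.Stop(ctx)).
func shutdown(ctx context.Context, m *manager.Manager, demotedOrRemoved bool) {
	m.Stop(ctx, demotedOrRemoved)
}

// waitForRemoval blocks until the manager reports that it has been removed
// from the raft cluster or the context is cancelled, and reports which of
// the two happened.
func waitForRemoval(ctx context.Context, m *manager.Manager) bool {
	select {
	case <-m.RemovedFromRaft():
		return true
	case <-ctx.Done():
		return false
	}
}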
4 changes: 2 additions & 2 deletions manager/manager_test.go
@@ -185,7 +185,7 @@ func TestManager(t *testing.T) {
_, err = client.Heartbeat(context.Background(), &api.HeartbeatRequest{})
assert.Contains(t, grpc.ErrorDesc(err), "removed from swarm")

m.Stop(ctx)
m.Stop(ctx, false)

// After stopping we MAY receive an error from ListenAndServe if
// all this happened before WaitForLeader completed, so don't check the
@@ -393,7 +393,7 @@ func TestManagerLockUnlock(t *testing.T) {
require.NotNil(t, unencryptedDEK)
require.Equal(t, currentDEK, unencryptedDEK)

m.Stop(ctx)
m.Stop(ctx, false)

// After stopping we MAY receive an error from ListenAndServe if
// all this happened before WaitForLeader completed, so don't check the
47 changes: 19 additions & 28 deletions manager/state/raft/raft.go
@@ -112,8 +112,8 @@ type Node struct {

ticker clock.Ticker
doneCh chan struct{}
// removeRaftCh notifies about node deletion from raft cluster
removeRaftCh chan struct{}
// RemovedFromRaft notifies about node deletion from raft cluster
RemovedFromRaft chan struct{}
removeRaftFunc func()
leadershipBroadcast *watch.Queue

@@ -134,6 +134,7 @@ type Node struct {
raftLogger *storage.EncryptedRaftLogger
keyRotator EncryptionKeyRotator
rotationQueued bool
clearData bool
waitForAppliedIndex uint64
}

@@ -199,7 +200,7 @@ func NewNode(opts NodeOptions) *Node {
Logger: cfg.Logger,
},
doneCh: make(chan struct{}),
removeRaftCh: make(chan struct{}),
RemovedFromRaft: make(chan struct{}),
stopped: make(chan struct{}),
leadershipBroadcast: watch.NewQueue(),
lastSendToMember: make(map[uint64]chan struct{}),
@@ -220,7 +221,11 @@ func NewNode(opts NodeOptions) *Node {
var removeRaftOnce sync.Once
return func() {
removeRaftOnce.Do(func() {
close(n.removeRaftCh)
atomic.StoreUint32(&n.isMember, 0)
n.stopMu.Lock()
n.wait.cancelAll()
n.stopMu.Unlock()
close(n.RemovedFromRaft)
})
}
}(n)
@@ -364,6 +369,12 @@ func (n *Node) done() {
close(n.doneCh)
}

// ClearData tells the raft node to delete its WALs, snapshots, and keys on
// shutdown.
func (n *Node) ClearData() {
n.clearData = true
}

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
@@ -373,13 +384,10 @@ func (n *Node) Run(ctx context.Context) error {
ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
ctx, cancel := context.WithCancel(ctx)

// nodeRemoved indicates that node was stopped due its removal.
nodeRemoved := false

defer func() {
cancel()
n.stop(ctx)
if nodeRemoved {
if n.clearData {
// Delete WAL and snapshots, since they are no longer
// usable.
if err := n.raftLogger.Clear(ctx); err != nil {
@@ -536,12 +544,6 @@ func (n *Node) Run(ctx context.Context) error {
case n.needsSnapshot(ctx):
n.doSnapshot(ctx, n.getCurrentRaftConfig())
}
case <-n.removeRaftCh:
nodeRemoved = true
// If the node was removed from other members,
// send back an error to the caller to start
// the shutdown process.
return ErrMemberRemoved
case <-ctx.Done():
return nil
}
@@ -1240,17 +1242,6 @@ func (n *Node) IsMember() bool {
return atomic.LoadUint32(&n.isMember) == 1
}

// canSubmitProposal defines if any more proposals
// could be submitted and processed.
func (n *Node) canSubmitProposal() bool {
select {
case <-n.stopped:
return false
default:
return true
}
}

// Saves a log entry to our Store
func (n *Node) saveToStorage(
ctx context.Context,
@@ -1479,7 +1470,7 @@ type applyResult struct {
// shutdown.
func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
n.stopMu.RLock()
if !n.canSubmitProposal() {
if !n.IsMember() {
n.stopMu.RUnlock()
return nil, ErrStopped
}
@@ -1713,11 +1704,11 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
}

if cc.NodeID == n.Config.ID {
n.removeRaftFunc()

// wait the commit ack to be sent before closing connection
n.asyncTasks.Wait()

n.removeRaftFunc()

// if there are only 2 nodes in the cluster, and leader is leaving
// before closing the connection, leader has to ensure that follower gets
// noticed about this raft conf change commit. Otherwise, follower would
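At the raft layer, removal no longer makes Run return ErrMemberRemoved. Instead the node closes its exported RemovedFromRaft channel, keeps running until its context is cancelled, and erases WALs, snapshots, and keys during shutdown only if ClearData was called first. A minimal sketch of that contract (the wrapper function and its wiring are hypothetical; the Node fields and methods come from the diff above):

package raftwrap

import (
	"context"

	"github.com/docker/swarmkit/manager/state/raft"
)

// shutdownOnRemoval waits for either removal from the cluster or an external
// shutdown request, then stops the raft node's Run loop and waits for its
// cleanup to finish.
func shutdownOnRemoval(ctx context.Context, cancelRun context.CancelFunc, n *raft.Node) {
	select {
	case <-n.RemovedFromRaft:
		// The node was removed: its WALs, snapshots, and keys are no
		// longer usable, so ask Run's deferred cleanup to erase them.
		n.ClearData()
	case <-ctx.Done():
		// Normal shutdown: keep the raft state on disk.
	}
	cancelRun() // stop the Run loop
	<-n.Done()  // wait for Run's cleanup (including any ClearData) to finish
}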
7 changes: 0 additions & 7 deletions manager/state/raft/raft_test.go
@@ -7,7 +7,6 @@ import (
"log"
"math/rand"
"os"
"path/filepath"
"reflect"
"strconv"
"testing"
@@ -20,7 +19,6 @@ import (
"golang.org/x/net/context"

"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/wal"
"github.com/docker/swarmkit/api"
cautils "github.com/docker/swarmkit/ca/testutils"
@@ -341,11 +339,6 @@ func TestRaftLeaderLeave(t *testing.T) {
// Wait for election tick
raftutils.WaitForCluster(t, clockSource, newCluster)

// Node1's state should be cleared
require.False(t, fileutil.Exist(filepath.Join(nodes[1].StateDir, "snap-v3-encrypted")))
require.False(t, fileutil.Exist(filepath.Join(nodes[1].StateDir, "wal-v3-encrypted")))
require.Equal(t, raft.EncryptionKeys{}, nodes[1].KeyRotator.GetKeys())

// Leader should not be 1
assert.NotEqual(t, nodes[2].Leader(), nodes[1].Config.ID)
assert.Equal(t, nodes[2].Leader(), nodes[3].Leader())
71 changes: 38 additions & 33 deletions node/node.go
@@ -22,7 +22,6 @@ import (
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager"
"github.com/docker/swarmkit/manager/encryption"
"github.com/docker/swarmkit/manager/state/raft"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/xnet"
"github.com/pkg/errors"
@@ -630,7 +629,7 @@ func (n *Node) waitRole(ctx context.Context, role string) error {
return nil
}

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}, workerRole <-chan struct{}) error {
remoteAddr, _ := n.remotes.Select(n.NodeID())
m, err := manager.New(&manager.Config{
ForceNewCluster: n.config.ForceNewCluster,
@@ -655,25 +654,18 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
done := make(chan struct{})
var runErr error
go func() {
if err := m.Run(context.Background()); err != nil && err != raft.ErrMemberRemoved {
if err := m.Run(context.Background()); err != nil {
runErr = err
}
close(done)
}()

workerRole := make(chan struct{})
waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
defer waitRoleCancel()
go func() {
n.waitRole(waitRoleCtx, ca.WorkerRole)
close(workerRole)
}()

var clearData bool
defer func() {
n.Lock()
n.manager = nil
n.Unlock()
m.Stop(ctx)
m.Stop(ctx, clearData)
<-done
n.setControlSocket(nil)
}()
@@ -704,43 +696,56 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
}

// wait for manager stop or for role change
// if manager stopped before role change, wait for new role for 16 seconds,
// then just restart manager, we might just miss that event.
// we need to wait for role to prevent manager to start again with wrong
// certificate
select {
case <-done:
timer := time.NewTimer(16 * time.Second)
defer timer.Stop()
select {
case <-timer.C:
log.G(ctx).Warn("failed to get worker role after manager stop, restart manager")
case <-workerRole:
case <-ctx.Done():
return ctx.Err()
}
return runErr
case <-workerRole:
log.G(ctx).Info("role changed to worker, wait for manager to stop")
select {
case <-done:
return runErr
case <-ctx.Done():
return ctx.Err()
}
log.G(ctx).Info("role changed to worker, stopping manager")
clearData = true
case <-m.RemovedFromRaft():
log.G(ctx).Info("manager removed from raft cluster, stopping manager")
clearData = true
case <-ctx.Done():
return ctx.Err()
}
return nil
}

func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
for {
if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
return err
}
if err := n.runManager(ctx, securityConfig, ready); err != nil {

workerRole := make(chan struct{})
waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
go func() {
n.waitRole(waitRoleCtx, ca.WorkerRole)
close(workerRole)
}()

if err := n.runManager(ctx, securityConfig, ready, workerRole); err != nil {
waitRoleCancel()
return errors.Wrap(err, "manager stopped")
}

// If the manager stopped running and our role is still
// "manager", it's possible that the manager was demoted and
// the agent hasn't realized this yet. We should wait for the
// role to change instead of restarting the manager immediately.
timer := time.NewTimer(16 * time.Second)
select {
case <-timer.C:
log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
case <-workerRole:
case <-ctx.Done():
timer.Stop()
waitRoleCancel()
return ctx.Err()
}
timer.Stop()
waitRoleCancel()

ready = nil
}
}
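The supervision loop above restarts the manager unless the role change to worker is observed within 16 seconds. A condensed sketch of just that retry decision (the helper is hypothetical; the real loop also recreates the workerRole watch and handles the ready channel):

package supervise

import (
	"context"
	"time"
)

// waitForDemotionOrTimeout is called after the manager has stopped while the
// node still holds a manager certificate. It waits up to 16 seconds for the
// demotion to be observed; if it is not, the caller restarts the manager.
func waitForDemotionOrTimeout(ctx context.Context, workerRole <-chan struct{}) (demoted bool, err error) {
	timer := time.NewTimer(16 * time.Second)
	defer timer.Stop()

	select {
	case <-workerRole:
		return true, nil
	case <-timer.C:
		// We may simply have missed the role-change event; restart the
		// manager rather than waiting forever.
		return false, nil
	case <-ctx.Done():
		return false, ctx.Err()
	}
}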
