diff --git a/manager/logbroker/broker.go b/manager/logbroker/broker.go
index 3521ca237e..fa07a04455 100644
--- a/manager/logbroker/broker.go
+++ b/manager/logbroker/broker.go
@@ -173,7 +173,7 @@ func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan ev
 	}))
 
 	// Grab current subscriptions.
-	subscriptions := make([]*subscription, 0)
+	var subscriptions []*subscription
 	for _, s := range lb.registeredSubscriptions {
 		if s.Contains(nodeID) {
 			subscriptions = append(subscriptions, s)
diff --git a/manager/manager.go b/manager/manager.go
index 69821eeb3c..e446873393 100644
--- a/manager/manager.go
+++ b/manager/manager.go
@@ -271,6 +271,12 @@ func New(config *Config) (*Manager, error) {
 	return m, nil
 }
 
+// RemovedFromRaft returns a channel that's closed if the manager is removed
+// from the raft cluster. This should be used to trigger a manager shutdown.
+func (m *Manager) RemovedFromRaft() <-chan struct{} {
+	return m.raftNode.RemovedFromRaft
+}
+
 // Addr returns tcp address on which remote api listens.
 func (m *Manager) Addr() string {
 	return m.config.RemoteAPI.ListenAddr
@@ -395,33 +401,22 @@ func (m *Manager) Run(parent context.Context) error {
 		if err != nil {
 			errCh <- err
 			log.G(ctx).WithError(err).Error("raft node stopped")
-			m.Stop(ctx)
+			m.Stop(ctx, false)
 		}
 	}()
 
-	returnErr := func(err error) error {
-		select {
-		case runErr := <-errCh:
-			if runErr == raft.ErrMemberRemoved {
-				return runErr
-			}
-		default:
-		}
-		return err
-	}
-
 	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
-		return returnErr(err)
+		return err
 	}
 
 	c, err := raft.WaitForCluster(ctx, m.raftNode)
 	if err != nil {
-		return returnErr(err)
+		return err
 	}
 	raftConfig := c.Spec.Raft
 
 	if err := m.watchForKEKChanges(ctx); err != nil {
-		return returnErr(err)
+		return err
 	}
 
 	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
@@ -439,16 +434,17 @@ func (m *Manager) Run(parent context.Context) error {
 		return nil
 	}
 	m.mu.Unlock()
-	m.Stop(ctx)
+	m.Stop(ctx, false)
 
-	return returnErr(err)
+	return err
 }
 
 const stopTimeout = 8 * time.Second
 
 // Stop stops the manager. It immediately closes all open connections and
-// active RPCs as well as stopping the scheduler.
-func (m *Manager) Stop(ctx context.Context) {
+// active RPCs as well as stopping the scheduler. If clearData is set, the
+// raft logs, snapshots, and keys will be erased.
+func (m *Manager) Stop(ctx context.Context, clearData bool) {
 	log.G(ctx).Info("Stopping manager")
 	// It's not safe to start shutting down while the manager is still
 	// starting up.
@@ -502,6 +498,9 @@ func (m *Manager) Stop(ctx context.Context) {
 		m.keyManager.Stop()
 	}
 
+	if clearData {
+		m.raftNode.ClearData()
+	}
 	m.cancelFunc()
 	<-m.raftNode.Done()
 
diff --git a/manager/manager_test.go b/manager/manager_test.go
index e34cf716d6..e7211eb154 100644
--- a/manager/manager_test.go
+++ b/manager/manager_test.go
@@ -185,7 +185,7 @@ func TestManager(t *testing.T) {
 	_, err = client.Heartbeat(context.Background(), &api.HeartbeatRequest{})
 	assert.Contains(t, grpc.ErrorDesc(err), "removed from swarm")
 
-	m.Stop(ctx)
+	m.Stop(ctx, false)
 
 	// After stopping we should MAY receive an error from ListenAndServe if
 	// all this happened before WaitForLeader completed, so don't check the
@@ -393,7 +393,7 @@ func TestManagerLockUnlock(t *testing.T) {
 	require.NotNil(t, unencryptedDEK)
 	require.Equal(t, currentDEK, unencryptedDEK)
 
-	m.Stop(ctx)
+	m.Stop(ctx, false)
 
 	// After stopping we should MAY receive an error from ListenAndServe if
 	// all this happened before WaitForLeader completed, so don't check the
diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go
index c825db9616..8b304f61cb 100644
--- a/manager/state/raft/raft.go
+++ b/manager/state/raft/raft.go
@@ -112,8 +112,8 @@ type Node struct {
 	ticker clock.Ticker
 	doneCh chan struct{}
 
-	// removeRaftCh notifies about node deletion from raft cluster
-	removeRaftCh chan struct{}
+	// RemovedFromRaft notifies about node deletion from raft cluster
+	RemovedFromRaft chan struct{}
 	removeRaftFunc func()
 
 	leadershipBroadcast *watch.Queue
@@ -134,6 +134,7 @@ type Node struct {
 	raftLogger *storage.EncryptedRaftLogger
 	keyRotator EncryptionKeyRotator
 	rotationQueued bool
+	clearData bool
 	waitForAppliedIndex uint64
 }
 
@@ -199,7 +200,7 @@ func NewNode(opts NodeOptions) *Node {
 			Logger: cfg.Logger,
 		},
 		doneCh: make(chan struct{}),
-		removeRaftCh: make(chan struct{}),
+		RemovedFromRaft: make(chan struct{}),
 		stopped: make(chan struct{}),
 		leadershipBroadcast: watch.NewQueue(),
 		lastSendToMember: make(map[uint64]chan struct{}),
@@ -220,7 +221,8 @@ func NewNode(opts NodeOptions) *Node {
 		var removeRaftOnce sync.Once
 		return func() {
 			removeRaftOnce.Do(func() {
-				close(n.removeRaftCh)
+				atomic.StoreUint32(&n.isMember, 0)
+				close(n.RemovedFromRaft)
 			})
 		}
 	}(n)
@@ -364,6 +366,12 @@ func (n *Node) done() {
 	close(n.doneCh)
 }
 
+// ClearData tells the raft node to delete its WALs, snapshots, and keys on
+// shutdown.
+func (n *Node) ClearData() {
+	n.clearData = true
+}
+
 // Run is the main loop for a Raft node, it goes along the state machine,
 // acting on the messages received from other Raft nodes in the cluster.
 //
@@ -373,13 +381,10 @@ func (n *Node) Run(ctx context.Context) error {
 	ctx = log.WithLogger(ctx, logrus.WithField("raft_id", fmt.Sprintf("%x", n.Config.ID)))
 	ctx, cancel := context.WithCancel(ctx)
 
-	// nodeRemoved indicates that node was stopped due its removal.
-	nodeRemoved := false
-
 	defer func() {
 		cancel()
 		n.stop(ctx)
-		if nodeRemoved {
+		if n.clearData {
 			// Delete WAL and snapshots, since they are no longer
 			// usable.
 			if err := n.raftLogger.Clear(ctx); err != nil {
@@ -536,12 +541,6 @@ func (n *Node) Run(ctx context.Context) error {
 			case n.needsSnapshot(ctx):
 				n.doSnapshot(ctx, n.getCurrentRaftConfig())
 			}
-		case <-n.removeRaftCh:
-			nodeRemoved = true
-			// If the node was removed from other members,
-			// send back an error to the caller to start
-			// the shutdown process.
-			return ErrMemberRemoved
 		case <-ctx.Done():
 			return nil
 		}
@@ -1240,17 +1239,6 @@ func (n *Node) IsMember() bool {
 	return atomic.LoadUint32(&n.isMember) == 1
 }
 
-// canSubmitProposal defines if any more proposals
-// could be submitted and processed.
-func (n *Node) canSubmitProposal() bool {
-	select {
-	case <-n.stopped:
-		return false
-	default:
-		return true
-	}
-}
-
 // Saves a log entry to our Store
 func (n *Node) saveToStorage(
 	ctx context.Context,
@@ -1479,7 +1467,7 @@ type applyResult struct {
 // shutdown.
 func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRaftRequest, cb func()) (proto.Message, error) {
 	n.stopMu.RLock()
-	if !n.canSubmitProposal() {
+	if !n.IsMember() {
 		n.stopMu.RUnlock()
 		return nil, ErrStopped
 	}
diff --git a/manager/state/raft/raft_test.go b/manager/state/raft/raft_test.go
index bdd4a8b93f..634034021a 100644
--- a/manager/state/raft/raft_test.go
+++ b/manager/state/raft/raft_test.go
@@ -7,7 +7,6 @@ import (
 	"log"
 	"math/rand"
 	"os"
-	"path/filepath"
 	"reflect"
 	"strconv"
 	"testing"
@@ -20,7 +19,6 @@ import (
 	"golang.org/x/net/context"
 
 	"github.com/Sirupsen/logrus"
-	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/wal"
 	"github.com/docker/swarmkit/api"
 	cautils "github.com/docker/swarmkit/ca/testutils"
@@ -341,11 +339,6 @@ func TestRaftLeaderLeave(t *testing.T) {
 	// Wait for election tick
 	raftutils.WaitForCluster(t, clockSource, newCluster)
 
-	// Node1's state should be cleared
-	require.False(t, fileutil.Exist(filepath.Join(nodes[1].StateDir, "snap-v3-encrypted")))
-	require.False(t, fileutil.Exist(filepath.Join(nodes[1].StateDir, "wal-v3-encrypted")))
-	require.Equal(t, raft.EncryptionKeys{}, nodes[1].KeyRotator.GetKeys())
-
 	// Leader should not be 1
 	assert.NotEqual(t, nodes[2].Leader(), nodes[1].Config.ID)
 	assert.Equal(t, nodes[2].Leader(), nodes[3].Leader())
diff --git a/node/node.go b/node/node.go
index e14a4ab1e1..7177299e7d 100644
--- a/node/node.go
+++ b/node/node.go
@@ -22,7 +22,6 @@ import (
 	"github.com/docker/swarmkit/log"
 	"github.com/docker/swarmkit/manager"
 	"github.com/docker/swarmkit/manager/encryption"
-	"github.com/docker/swarmkit/manager/state/raft"
 	"github.com/docker/swarmkit/remotes"
 	"github.com/docker/swarmkit/xnet"
 	"github.com/pkg/errors"
@@ -306,7 +305,18 @@ func (n *Node) run(ctx context.Context) (err error) {
 	go func() {
 		<-agentReady
 		if role == ca.ManagerRole {
-			<-managerReady
+			workerRole := make(chan struct{})
+			waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
+			go func() {
+				if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
+					close(workerRole)
+				}
+			}()
+			select {
+			case <-managerReady:
+			case <-workerRole:
+			}
+			waitRoleCancel()
 		}
 		close(n.ready)
 	}()
@@ -630,7 +640,7 @@ func (n *Node) waitRole(ctx context.Context, role string) error {
 	return nil
 }
 
-func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
+func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}, workerRole <-chan struct{}) error {
 	remoteAddr, _ := n.remotes.Select(n.NodeID())
 	m, err := manager.New(&manager.Config{
 		ForceNewCluster: n.config.ForceNewCluster,
@@ -655,25 +665,18 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
 	done := make(chan struct{})
 	var runErr error
 	go func() {
-		if err := m.Run(context.Background()); err != nil && err != raft.ErrMemberRemoved {
+		if err := m.Run(context.Background()); err != nil {
 			runErr = err
 		}
 		close(done)
 	}()
 
-	workerRole := make(chan struct{})
-	waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
-	defer waitRoleCancel()
-	go func() {
-		n.waitRole(waitRoleCtx, ca.WorkerRole)
-		close(workerRole)
-	}()
-
+	var clearData bool
	defer func() {
 		n.Lock()
 		n.manager = nil
 		n.Unlock()
-		m.Stop(ctx)
+		m.Stop(ctx, clearData)
 		<-done
 		n.setControlSocket(nil)
 	}()
@@ -704,33 +707,19 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
 	}
 
 	// wait for manager stop or for role change
-	// if manager stopped before role change, wait for new role for 16 seconds,
-	// then just restart manager, we might just miss that event.
-	// we need to wait for role to prevent manager to start again with wrong
-	// certificate
 	select {
 	case <-done:
-		timer := time.NewTimer(16 * time.Second)
-		defer timer.Stop()
-		select {
-		case <-timer.C:
-			log.G(ctx).Warn("failed to get worker role after manager stop, restart manager")
-		case <-workerRole:
-		case <-ctx.Done():
-			return ctx.Err()
-		}
 		return runErr
 	case <-workerRole:
-		log.G(ctx).Info("role changed to worker, wait for manager to stop")
-		select {
-		case <-done:
-			return runErr
-		case <-ctx.Done():
-			return ctx.Err()
-		}
+		log.G(ctx).Info("role changed to worker, stopping manager")
+		clearData = true
+	case <-m.RemovedFromRaft():
+		log.G(ctx).Info("manager removed from raft cluster, stopping manager")
+		clearData = true
 	case <-ctx.Done():
 		return ctx.Err()
 	}
+	return nil
 }
 
 func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
@@ -738,9 +727,37 @@ func (n *Node) superviseManager(ctx context.Context, securityConfig *ca.Security
 		if err := n.waitRole(ctx, ca.ManagerRole); err != nil {
 			return err
 		}
-		if err := n.runManager(ctx, securityConfig, ready); err != nil {
+
+		workerRole := make(chan struct{})
+		waitRoleCtx, waitRoleCancel := context.WithCancel(ctx)
+		go func() {
+			if n.waitRole(waitRoleCtx, ca.WorkerRole) == nil {
+				close(workerRole)
+			}
+		}()
+
+		if err := n.runManager(ctx, securityConfig, ready, workerRole); err != nil {
+			waitRoleCancel()
 			return errors.Wrap(err, "manager stopped")
 		}
+
+		// If the manager stopped running and our role is still
+		// "manager", it's possible that the manager was demoted and
+		// the agent hasn't realized this yet. We should wait for the
+		// role to change instead of restarting the manager immediately.
+		timer := time.NewTimer(16 * time.Second)
+		select {
+		case <-timer.C:
+			log.G(ctx).Warn("failed to get worker role after manager stop, restarting manager")
+		case <-workerRole:
+		case <-ctx.Done():
+			timer.Stop()
+			waitRoleCancel()
+			return ctx.Err()
+		}
+		timer.Stop()
+		waitRoleCancel()
+		ready = nil
 	}
 }
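A rough usage sketch (not part of the patch): the names below (toyManager, superviseOnce, managerDone, demotedToWorker) are hypothetical stand-ins that illustrate the calling pattern the diff introduces in node.go, where a supervisor selects on manager exit, demotion to worker, and the new RemovedFromRaft() channel, and passes clearData to the two-argument Stop so raft state is erased only when it is no longer usable.

package main

import (
	"context"
	"fmt"
)

// toyManager is a hypothetical stand-in for manager.Manager; only the two
// behaviors touched by the patch are modeled.
type toyManager struct {
	removedFromRaft chan struct{}
}

// RemovedFromRaft mirrors the accessor added in manager/manager.go: the
// returned channel is closed when this member is removed from the raft
// cluster.
func (m *toyManager) RemovedFromRaft() <-chan struct{} { return m.removedFromRaft }

// Stop mirrors the new two-argument signature: clearData asks the raft node
// to erase its WALs, snapshots, and keys on shutdown.
func (m *toyManager) Stop(ctx context.Context, clearData bool) {
	fmt.Printf("stopping manager, clearData=%v\n", clearData)
}

// superviseOnce (hypothetical) shows the decision pattern from node.go after
// the patch: wait for the manager to exit, for a demotion signal, or for raft
// removal, and request data clearing only in the last two cases. The deferred
// closure matters: it reads clearData when superviseOnce returns, not when
// the defer statement executes.
func superviseOnce(ctx context.Context, m *toyManager, managerDone, demotedToWorker <-chan struct{}) error {
	var clearData bool
	defer func() { m.Stop(ctx, clearData) }()

	select {
	case <-managerDone:
		return nil // manager exited on its own; keep raft state
	case <-demotedToWorker:
		clearData = true // role changed to worker; old raft state is unusable
	case <-m.RemovedFromRaft():
		clearData = true // removed from the raft cluster; same cleanup
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}

func main() {
	m := &toyManager{removedFromRaft: make(chan struct{})}
	managerDone := make(chan struct{})
	demotedToWorker := make(chan struct{})

	// Simulate removal from the raft cluster and observe that Stop is asked
	// to clear the raft state.
	close(m.removedFromRaft)
	if err := superviseOnce(context.Background(), m, managerDone, demotedToWorker); err != nil {
		fmt.Println("supervise error:", err)
	}
}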