Skip to content

Commit

Permalink
raft: use transport package
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Morozov <lk4d4math@gmail.com>
  • Loading branch information
LK4D4 committed Dec 27, 2016
1 parent bff7007 commit 8bc8c79
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 575 deletions.
3 changes: 2 additions & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/docker/swarmkit/manager/scheduler"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/raft"
"github.com/docker/swarmkit/manager/state/raft/membership"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/protobuf/ptypes"
"github.com/docker/swarmkit/remotes"
Expand Down Expand Up @@ -401,7 +402,7 @@ func (m *Manager) Run(parent context.Context) error {
returnErr := func(err error) error {
select {
case runErr := <-errCh:
if runErr == raft.ErrMemberRemoved {
if runErr == membership.ErrMemberRemoved {
return runErr
}
default:
Expand Down
196 changes: 6 additions & 190 deletions manager/state/raft/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@ package membership

import (
"errors"
"fmt"
"sync"

"google.golang.org/grpc"

"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/watch"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)

var (
Expand All @@ -29,99 +25,35 @@ var (
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
)

// deferredConn holds the connection of a removed member so that it can be
// kept open for some time instead of being closed immediately.
// This is needed in case the removed node is a redirector or the endpoint of
// a ControlAPI call.
type deferredConn struct {
// tick is incremented by handleDeferredConns; once it exceeds the cluster's
// heartbeatTicks the connection is closed.
tick int
// conn is the removed member's connection awaiting closure.
conn *grpc.ClientConn
}

// Cluster represents the set of active raft Members, together with the IDs
// of Members that were removed.
type Cluster struct {
// mu is held by the Cluster methods that read or modify the maps below.
mu sync.RWMutex
members map[uint64]*Member
// deferedConns holds connections of cleared members; they are closed once
// they have been deferred for more than heartbeatTicks ticks.
deferedConns map[*deferredConn]struct{}
mu sync.RWMutex
members map[uint64]*Member

// removed records the IDs of removed Members;
// those IDs cannot be reused.
removed map[uint64]bool
// heartbeatTicks is the number of ticks after which a member that has not
// been reported active is marked inactive.
heartbeatTicks int
removed map[uint64]bool

PeersBroadcast *watch.Queue
}

// Member represents a raft Cluster Member: the embedded api.RaftMember plus
// the connection and liveness bookkeeping that Cluster keeps for it.
type Member struct {
*api.RaftMember

// Conn is the gRPC client connection to this member; it may be nil.
Conn *grpc.ClientConn
// tick counts Cluster ticks since the member was added or last reported
// active; it is reset to 0 by AddMember and ReportActive.
tick int
// active is cleared by the cluster once tick exceeds heartbeatTicks and is
// set again by ReportActive.
active bool
// lastSeenHost is the most recent non-empty source host passed to
// ReportActive for this member.
lastSeenHost string
}

// HealthCheck issues a health check RPC for the "Raft" service over the
// member's connection. It returns the RPC error if the call fails, or an
// error naming the reported status if that status is anything other than
// SERVING; otherwise it returns nil.
func (member *Member) HealthCheck(ctx context.Context) error {
	request := &api.HealthCheckRequest{Service: "Raft"}
	resp, err := api.NewHealthClient(member.Conn).Check(ctx, request)
	switch {
	case err != nil:
		return err
	case resp.Status != api.HealthCheckResponse_SERVING:
		return fmt.Errorf("health check returned status %s", resp.Status.String())
	}
	return nil
}

// NewCluster creates a new, empty Cluster: the list of raft neighbors of a
// Member. A member is marked as inactive if ReportActive has not been called
// for it within heartbeatTicks ticks.
func NewCluster(heartbeatTicks int) *Cluster {
func NewCluster() *Cluster {
// TODO(abronan): generate Cluster ID for federation

return &Cluster{
members: make(map[uint64]*Member),
removed: make(map[uint64]bool),
deferedConns: make(map[*deferredConn]struct{}),
heartbeatTicks: heartbeatTicks,
PeersBroadcast: watch.NewQueue(),
}
}

// handleInactive advances the tick counter of every member that is currently
// active. Once a member's counter exceeds c.heartbeatTicks the member is
// marked inactive and its connection, if any, is closed. Members that are
// already inactive are left untouched. Tick calls this with c.mu held.
func (c *Cluster) handleInactive() {
	for _, member := range c.members {
		if member.active {
			member.tick++
			if member.tick <= c.heartbeatTicks {
				continue
			}
			// No activity was reported within heartbeatTicks ticks.
			member.active = false
			if member.Conn != nil {
				member.Conn.Close()
			}
		}
	}
}

// handleDeferredConns advances the tick counter of every deferred connection.
// A connection whose counter exceeds c.heartbeatTicks is closed and dropped
// from c.deferedConns. Tick calls this with c.mu held.
func (c *Cluster) handleDeferredConns() {
	for deferred := range c.deferedConns {
		deferred.tick++
		if deferred.tick <= c.heartbeatTicks {
			continue
		}
		// The grace period is over: release the connection.
		deferred.conn.Close()
		delete(c.deferedConns, deferred)
	}
}

// Tick advances the cluster's tick-based bookkeeping by one step while
// holding the write lock. Active members that have gone more than
// heartbeatTicks ticks without being reported active are marked inactive,
// and connections deferred for more than heartbeatTicks ticks are closed.
func (c *Cluster) Tick() {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Age the members first, then the connections of already-removed members.
	c.handleInactive()
	c.handleDeferredConns()
}

// Members returns the list of raft Members in the Cluster.
func (c *Cluster) Members() map[uint64]*Member {
members := make(map[uint64]*Member)
Expand Down Expand Up @@ -170,8 +102,6 @@ func (c *Cluster) AddMember(member *Member) error {
if c.removed[member.RaftID] {
return ErrIDRemoved
}
member.active = true
member.tick = 0

c.members[member.RaftID] = member

Expand Down Expand Up @@ -199,45 +129,10 @@ func (c *Cluster) ClearMember(id uint64) error {
}

// clearMember deletes the member with the given id from c.members, if it is
// present, and broadcasts the update. It always returns nil.
func (c *Cluster) clearMember(id uint64) error {
m, ok := c.members[id]
if ok {
if m.Conn != nil {
// Do not close the connection right away: park it in deferedConns so
// that it is closed only after heartbeatTicks ticks.
dConn := &deferredConn{conn: m.Conn}
c.deferedConns[dConn] = struct{}{}
}
if _, ok := c.members[id]; ok {
delete(c.members, id)
c.broadcastUpdate()
}
c.broadcastUpdate()
return nil
}

// ReplaceMemberConnection swaps the gRPC connection stored for the member
// with the given id and records newAddr as that member's address.
//
// It returns ErrIDNotFound if id is not a current member. Unless force is
// set, the swap only happens when oldConn.Conn is still the connection stored
// for the member; otherwise the connection was already replaced, so
// newConn.Conn is closed and nil is returned. When the swap does happen, the
// previously stored connection (if any) is closed and the map entry is
// replaced with a modified copy of the member rather than mutated in place.
func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	current, found := c.members[id]
	if !found {
		return ErrIDNotFound
	}

	// force is tested first so that oldConn is not dereferenced when the
	// caller forces the replacement.
	if !(force || oldConn.Conn == current.Conn) {
		// The connection was already replaced. Discard the new one.
		newConn.Conn.Close()
		return nil
	}

	if stale := current.Conn; stale != nil {
		stale.Close()
	}

	replacement := *current
	replacement.RaftMember = current.RaftMember.Copy()
	replacement.RaftMember.Addr = newAddr
	replacement.Conn = newConn.Conn
	c.members[id] = &replacement

	return nil
}

Expand All @@ -251,60 +146,12 @@ func (c *Cluster) IsIDRemoved(id uint64) bool {
// Clear closes every connection held by the cluster, both those of current
// members and those deferred for later closing, and then resets the member,
// removed-ID and deferred-connection sets to empty.
func (c *Cluster) Clear() {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, m := range c.members {
		if m.Conn == nil {
			continue
		}
		m.Conn.Close()
	}
	for deferred := range c.deferedConns {
		deferred.conn.Close()
	}

	c.members = make(map[uint64]*Member)
	c.removed = make(map[uint64]bool)
	c.deferedConns = make(map[*deferredConn]struct{})
}

// ReportActive marks the member with the given id as active and resets its
// tick counter. If sourceHost is non-empty it is also stored as the member's
// last seen host. Unknown ids are ignored.
// (The original note says this is called from ProcessRaftMessage.)
func (c *Cluster) ReportActive(id uint64, sourceHost string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if member, known := c.members[id]; known {
		member.active = true
		member.tick = 0
		if sourceHost != "" {
			member.lastSeenHost = sourceHost
		}
	}
}

// Active reports whether the member with the given id is currently marked
// active. It returns false for ids that are not members of the cluster.
func (c *Cluster) Active(id uint64) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	member, known := c.members[id]
	return known && member.active
}

// LastSeenHost returns the last source host recorded for the member with the
// given id, or the empty string if the id is not a member of the cluster.
func (c *Cluster) LastSeenHost(id uint64) string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	member, known := c.members[id]
	if !known {
		return ""
	}
	return member.lastSeenHost
}

// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
Expand Down Expand Up @@ -336,34 +183,3 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
return nil
}

// CanRemoveMember reports whether the member with the given id can be removed
// without losing quorum. This check is needed before submitting a
// configuration change that might block or harm the Cluster on Member
// recovery.
//
// from is the ID of the local node issuing the removal; it is always counted
// as reachable. Every other remaining member counts only if it is currently
// active.
func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool {
	members := c.Members()

	// Count the managers that would still be reachable after the removal.
	reachable := 0
	for _, m := range members {
		switch {
		case m.RaftID == id:
			// The member being removed does not count.
		case m.RaftID == from, c.Active(m.RaftID):
			// Either the local node, or a peer that is currently active.
			reachable++
		}
	}

	quorum := (len(members)-1)/2 + 1
	return reachable >= quorum
}
4 changes: 2 additions & 2 deletions manager/state/raft/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newTestMember(id uint64) *membership.Member {
}

func newTestCluster(members []*membership.Member, removed []*membership.Member) *membership.Cluster {
c := membership.NewCluster(3)
c := membership.NewCluster()
for _, m := range members {
c.AddMember(m)
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestClusterMember(t *testing.T) {
}

func TestMembers(t *testing.T) {
cls := membership.NewCluster(1)
cls := membership.NewCluster()
defer cls.Clear()
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 1}})
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 5}})
Expand Down
Loading

0 comments on commit 8bc8c79

Please sign in to comment.