raft: transport package #1748

Merged: 1 commit, Jan 12, 2017
15 changes: 15 additions & 0 deletions log/context.go
@@ -29,6 +29,21 @@ func WithLogger(ctx context.Context, logger *logrus.Entry) context.Context {
return context.WithValue(ctx, loggerKey{}, logger)
}

// WithFields returns a new context with the given fields added to its logger.
func WithFields(ctx context.Context, fields logrus.Fields) context.Context {
logger := ctx.Value(loggerKey{})

if logger == nil {
logger = L
}
return WithLogger(ctx, logger.(*logrus.Entry).WithFields(fields))
}

// WithField is a convenience wrapper around WithFields.
func WithField(ctx context.Context, key, value string) context.Context {
return WithFields(ctx, logrus.Fields{key: value})
}

// GetLogger retrieves the current logger from the context. If no logger is
// available, the default logger is returned.
func GetLogger(ctx context.Context) *logrus.Entry {
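Together with GetLogger, the helpers added above let callers thread structured logging fields through a context. A minimal usage sketch, assuming the package is imported as github.com/docker/swarmkit/log (inferred from the file location) and the Sirupsen/logrus import path of this era; the field names are illustrative only:

package main

import (
	"golang.org/x/net/context"

	"github.com/Sirupsen/logrus"
	"github.com/docker/swarmkit/log"
)

func main() {
	ctx := context.Background()

	// Attach one field, then several at once; each call derives a new
	// *logrus.Entry and stores it in the returned context.
	ctx = log.WithField(ctx, "module", "raft")
	ctx = log.WithFields(ctx, logrus.Fields{"method": "ProcessRaftMessage"})

	// GetLogger returns the stored entry (or the default logger if none was
	// set), so all accumulated fields appear on this log line.
	log.GetLogger(ctx).Info("message received")
}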
13 changes: 10 additions & 3 deletions manager/controlapi/node_test.go
@@ -373,9 +373,6 @@ func TestListManagerNodes(t *testing.T) {
return nil
}))

// Switch the raft node used by the server
ts.Server.raft = nodes[2].Node

// Stop node 1 (leader)
nodes[1].Server.Stop()
nodes[1].ShutdownRaft()
@@ -390,6 +387,16 @@ func TestListManagerNodes(t *testing.T) {
// Wait for the re-election to occur
raftutils.WaitForCluster(t, clockSource, newCluster)

var leaderNode *raftutils.TestNode
for _, node := range newCluster {
if node.IsLeader() {
leaderNode = node
}
}

// Switch the raft node used by the server
ts.Server.raft = leaderNode.Node

// Node 1 should not be the leader anymore
assert.NoError(t, raftutils.PollFunc(clockSource, func() error {
r, err = ts.Client.ListNodes(context.Background(), &api.ListNodesRequest{})
219 changes: 32 additions & 187 deletions manager/state/raft/membership/cluster.go
@@ -2,16 +2,12 @@ package membership

import (
"errors"
"fmt"
"sync"

"google.golang.org/grpc"

"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/watch"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)

var (
@@ -25,101 +21,39 @@ var (
ErrConfigChangeInvalid = errors.New("membership: ConfChange type should be either AddNode, RemoveNode or UpdateNode")
// ErrCannotUnmarshalConfig is thrown when a node cannot unmarshal a configuration change
ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change")
// ErrMemberRemoved is thrown when a node was removed from the cluster
ErrMemberRemoved = errors.New("raft: member was removed from the cluster")
)

// deferredConn is used to store a removed member's connection for some time.
// We need this in case the removed node is the redirector or the endpoint of a ControlAPI call.
type deferredConn struct {
tick int
conn *grpc.ClientConn
}

// Cluster represents a set of active
// raft Members
type Cluster struct {
mu sync.RWMutex
members map[uint64]*Member
deferedConns map[*deferredConn]struct{}
mu sync.RWMutex
members map[uint64]*Member

// removed contains the list of removed Members,
// those ids cannot be reused
removed map[uint64]bool
heartbeatTicks int
removed map[uint64]bool

PeersBroadcast *watch.Queue
}

// Member represents a raft Cluster Member
type Member struct {
Review comment (Contributor): Non-blocking: it doesn't hurt anything to leave it, but it seems like at this point we can just use api.RaftMember directly instead of Member in Cluster?

Reply (Contributor, author): Yeah, we should do that. Let's leave it to next PR. This one is too big already.

A rough sketch of the suggested shape appears after the Member struct below.

*api.RaftMember

Conn *grpc.ClientConn
tick int
active bool
lastSeenHost string
}
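The suggestion in the review thread above is deferred to a follow-up PR. Purely as an illustration of that idea (not part of this change, with names assumed from the surrounding diff), Cluster could drop the wrapper type and store *api.RaftMember directly once no transport state is attached to members:

package membership

import (
	"sync"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/watch"
)

// Illustrative only: a Cluster keyed by raft ID that holds api.RaftMember
// values directly, with no Member wrapper and no connection state.
type Cluster struct {
	mu      sync.RWMutex
	members map[uint64]*api.RaftMember

	// removed contains the IDs of removed members; those IDs cannot be reused.
	removed map[uint64]bool

	PeersBroadcast *watch.Queue
}

// Members returns a copy of the current membership map.
func (c *Cluster) Members() map[uint64]*api.RaftMember {
	c.mu.RLock()
	defer c.mu.RUnlock()
	members := make(map[uint64]*api.RaftMember, len(c.members))
	for id, m := range c.members {
		members[id] = m
	}
	return members
}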

// HealthCheck sends a health check RPC to the member and returns the response.
func (member *Member) HealthCheck(ctx context.Context) error {
healthClient := api.NewHealthClient(member.Conn)
resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"})
if err != nil {
return err
}
if resp.Status != api.HealthCheckResponse_SERVING {
return fmt.Errorf("health check returned status %s", resp.Status.String())
}
return nil
}

// NewCluster creates a new Cluster neighbors list for a raft Member.
// A Member is marked inactive if ReportActive has not been called for heartbeatInterval.
func NewCluster(heartbeatTicks int) *Cluster {
func NewCluster() *Cluster {
// TODO(abronan): generate Cluster ID for federation

return &Cluster{
members: make(map[uint64]*Member),
removed: make(map[uint64]bool),
deferedConns: make(map[*deferredConn]struct{}),
heartbeatTicks: heartbeatTicks,
PeersBroadcast: watch.NewQueue(),
}
}

func (c *Cluster) handleInactive() {
for _, m := range c.members {
if !m.active {
continue
}
m.tick++
if m.tick > c.heartbeatTicks {
m.active = false
if m.Conn != nil {
m.Conn.Close()
}
}
}
}

func (c *Cluster) handleDeferredConns() {
for dc := range c.deferedConns {
dc.tick++
if dc.tick > c.heartbeatTicks {
dc.conn.Close()
delete(c.deferedConns, dc)
}
}
}

// Tick increases the tick count for all members. After heartbeatTicks ticks
// without activity, a member is marked inactive.
func (c *Cluster) Tick() {
c.mu.Lock()
defer c.mu.Unlock()
c.handleInactive()
c.handleDeferredConns()
}

// Members returns the list of raft Members in the Cluster.
func (c *Cluster) Members() map[uint64]*Member {
members := make(map[uint64]*Member)
@@ -168,8 +102,6 @@ func (c *Cluster) AddMember(member *Member) error {
if c.removed[member.RaftID] {
return ErrIDRemoved
}
member.active = true
member.tick = 0

c.members[member.RaftID] = member

@@ -187,55 +119,47 @@ func (c *Cluster) RemoveMember(id uint64) error {
return c.clearMember(id)
}

// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
// to the removed list.
func (c *Cluster) ClearMember(id uint64) error {
// UpdateMember updates member address.
func (c *Cluster) UpdateMember(id uint64, m *api.RaftMember) error {
c.mu.Lock()
defer c.mu.Unlock()

return c.clearMember(id)
}

func (c *Cluster) clearMember(id uint64) error {
m, ok := c.members[id]
if ok {
if m.Conn != nil {
// defer connection close to after heartbeatTicks
dConn := &deferredConn{conn: m.Conn}
c.deferedConns[dConn] = struct{}{}
}
delete(c.members, id)
if c.removed[id] {
return ErrIDRemoved
}
c.broadcastUpdate()
return nil
}

// ReplaceMemberConnection replaces the member's GRPC connection.
func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error {
c.mu.Lock()
defer c.mu.Unlock()

oldMember, ok := c.members[id]
if !ok {
return ErrIDNotFound
}

if !force && oldConn.Conn != oldMember.Conn {
// The connection was already replaced. Don't do it again.
newConn.Conn.Close()
return nil
if oldMember.NodeID != m.NodeID {
// Should never happen; this is a sanity check
return errors.New("node ID mismatch match on node update")
}

if oldMember.Conn != nil {
oldMember.Conn.Close()
if oldMember.Addr == m.Addr {
// nothing to do
return nil
}
oldMember.RaftMember = m
return nil
}

// ClearMember removes a node from the Cluster Memberlist, but does NOT add it
// to the removed list.
func (c *Cluster) ClearMember(id uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

newMember := *oldMember
newMember.RaftMember = oldMember.RaftMember.Copy()
newMember.RaftMember.Addr = newAddr
newMember.Conn = newConn.Conn
c.members[id] = &newMember
return c.clearMember(id)
}

func (c *Cluster) clearMember(id uint64) error {
if _, ok := c.members[id]; ok {
delete(c.members, id)
c.broadcastUpdate()
}
return nil
}
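A short usage sketch of the distinction documented above, with behavior inferred from RemoveMember, ClearMember, and AddMember's ErrIDRemoved check; the raft IDs are arbitrary and construction mirrors cluster_test.go below:

package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/state/raft/membership"
)

func main() {
	c := membership.NewCluster()
	c.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 1}})
	c.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 2}})

	// RemoveMember drops the member and records its ID as removed, so the
	// same raft ID can never be re-added.
	c.RemoveMember(1)
	err := c.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 1}})
	fmt.Println(err == membership.ErrIDRemoved) // true

	// ClearMember only drops the member from the list; the ID stays usable.
	c.ClearMember(2)
	err = c.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 2}})
	fmt.Println(err) // <nil>
}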

@@ -249,60 +173,12 @@ func (c *Cluster) IsIDRemoved(id uint64) bool {
// Clear resets the list of active Members and removed Members.
func (c *Cluster) Clear() {
c.mu.Lock()
for _, member := range c.members {
if member.Conn != nil {
member.Conn.Close()
}
}

for dc := range c.deferedConns {
dc.conn.Close()
}

c.members = make(map[uint64]*Member)
c.removed = make(map[uint64]bool)
c.deferedConns = make(map[*deferredConn]struct{})
c.mu.Unlock()
}

// ReportActive reports that a member is active (i.e. it called ProcessRaftMessage).
func (c *Cluster) ReportActive(id uint64, sourceHost string) {
c.mu.Lock()
defer c.mu.Unlock()
m, ok := c.members[id]
if !ok {
return
}
m.tick = 0
m.active = true
if sourceHost != "" {
m.lastSeenHost = sourceHost
}
}

// Active returns true if node is active.
func (c *Cluster) Active(id uint64) bool {
c.mu.RLock()
defer c.mu.RUnlock()
m, ok := c.members[id]
if !ok {
return false
}
return m.active
}

// LastSeenHost returns the last observed source address that the specified
// member connected from.
func (c *Cluster) LastSeenHost(id uint64) string {
c.mu.RLock()
defer c.mu.RUnlock()
m, ok := c.members[id]
if ok {
return m.lastSeenHost
}
return ""
}

// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is valid.
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
@@ -334,34 +210,3 @@ func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
return nil
}

// CanRemoveMember reports whether a Member can be removed without causing a
// loss of quorum. This check is needed before submitting a configuration change
// that might block or harm the Cluster on Member recovery.
func (c *Cluster) CanRemoveMember(from uint64, id uint64) bool {
members := c.Members()
nreachable := 0 // reachable managers after removal

for _, m := range members {
if m.RaftID == id {
continue
}

// Local node from where the remove is issued
if m.RaftID == from {
nreachable++
continue
}

if c.Active(m.RaftID) {
nreachable++
}
}

nquorum := (len(members)-1)/2 + 1
if nreachable < nquorum {
return false
}

return true
}
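The quorum check dropped from this file reduces to simple majority arithmetic over the member count before removal; a standalone illustration (not part of the PR):

package main

import "fmt"

func main() {
	// With 5 members, nquorum = (5-1)/2 + 1 = 3, i.e. a majority of the 4
	// members that would remain. CanRemoveMember allows the removal only if
	// at least 3 of those remaining members (counting the node issuing the
	// removal) are reachable.
	members := 5
	nquorum := (members-1)/2 + 1
	fmt.Println(nquorum) // 3
}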
4 changes: 2 additions & 2 deletions manager/state/raft/membership/cluster_test.go
@@ -42,7 +42,7 @@ func newTestMember(id uint64) *membership.Member {
}

func newTestCluster(members []*membership.Member, removed []*membership.Member) *membership.Cluster {
c := membership.NewCluster(3)
c := membership.NewCluster()
for _, m := range members {
c.AddMember(m)
}
@@ -79,7 +79,7 @@ func TestClusterMember(t *testing.T) {
}

func TestMembers(t *testing.T) {
cls := membership.NewCluster(1)
cls := membership.NewCluster()
defer cls.Clear()
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 1}})
cls.AddMember(&membership.Member{RaftMember: &api.RaftMember{RaftID: 5}})