Skip to content

Commit

Permalink
Port over gossip heartbeat eviction and graceful left asynkron#639
Browse files Browse the repository at this point in the history
  • Loading branch information
Kunduin committed Jan 21, 2023
1 parent acf4016 commit a42802d
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 21 deletions.
2 changes: 2 additions & 0 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
GossipRequestTimeout time.Duration
GossipFanOut int
GossipMaxSend int
HeartbeatExpiration time.Duration // Gossip heartbeat timeout. If the member does not update its heartbeat within this period, it will be added to the BlockList
PubSubConfig *PubSubConfig
}

Expand All @@ -46,6 +47,7 @@ func Configure(clusterName string, clusterProvider ClusterProvider, identityLook
GossipRequestTimeout: time.Millisecond * 500,
GossipFanOut: 3,
GossipMaxSend: 50,
HeartbeatExpiration: time.Second * 20,
PubSubConfig: newPubSubConfig(),
}

Expand Down
7 changes: 7 additions & 0 deletions cluster/config_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption {
c.PubSubConfig.SubscriberTimeout = timeout
}
}

// WithHeartbeatExpiration sets the gossip heartbeat expiration.
func WithHeartbeatExpiration(t time.Duration) ConfigOption {
return func(c *Config) {
c.HeartbeatExpiration = t
}
}
3 changes: 1 addition & 2 deletions cluster/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package cluster

import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

// customary type that defines a states sender callback.
Expand All @@ -13,7 +12,7 @@ type LocalStateSender func(memberStateDelta *MemberStateDelta, member *Member)
// This interface must be implemented by any value that.
// wants to be used as a gossip state storage
type GossipStateStorer interface {
GetState(key string) map[string]*anypb.Any
GetState(key string) map[string]*GossipKeyValue
SetState(key string, value proto.Message)
}

Expand Down
5 changes: 5 additions & 0 deletions cluster/gossip_state_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cluster
import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"time"
)

// convenience type alias
Expand Down Expand Up @@ -39,6 +40,7 @@ func setKey(state *GossipState, key string, value proto.Message, memberID string
// if entry does not exists, add it
memberState := ensureMemberStateExists(state, memberID)
entry := ensureEntryExists(memberState, key)
entry.LocalTimestampUnixMilliseconds = time.Now().UnixMilli()

sequenceNo++
entry.SequenceNumber = sequenceNo
Expand Down Expand Up @@ -71,6 +73,7 @@ func mergeState(localState *GossipState, remoteState *GossipState) ([]*GossipUpd
SeqNumber: entry.SequenceNumber,
}
updates = append(updates, &update)
entry.LocalTimestampUnixMilliseconds = time.Now().UnixMilli()
updatedKeys[key] = empty{}
}
continue
Expand All @@ -89,6 +92,7 @@ func mergeState(localState *GossipState, remoteState *GossipState) ([]*GossipUpd
SeqNumber: remoteValue.SequenceNumber,
}
updates = append(updates, &update)
remoteValue.LocalTimestampUnixMilliseconds = time.Now().UnixMilli()
updatedKeys[key] = empty{}
continue
}
Expand All @@ -109,6 +113,7 @@ func mergeState(localState *GossipState, remoteState *GossipState) ([]*GossipUpd
SeqNumber: remoteValue.SequenceNumber,
}
updates = append(updates, &update)
remoteValue.LocalTimestampUnixMilliseconds = time.Now().UnixMilli()
updatedKeys[key] = empty{}
}
}
Expand Down
63 changes: 57 additions & 6 deletions cluster/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package cluster
import (
"errors"
"fmt"
"github.com/asynkron/protoactor-go/remote"
"strings"
"time"

"github.com/asynkron/gofun/set"
Expand Down Expand Up @@ -65,7 +67,7 @@ func newGossiper(cl *Cluster, opts ...Option) (*Gossiper, error) {
return gossiper, nil
}

func (g *Gossiper) GetState(key string) (map[string]*anypb.Any, error) {
func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
plog.Debug(fmt.Sprintf("Gossiper getting state from %s", g.pid))

msg := NewGetGossipStateRequest(key)
Expand Down Expand Up @@ -223,25 +225,74 @@ func (g *Gossiper) gossipLoop() {
// P, and we do not want our Gs to be scheduled out from the running Ms
ticker := time.NewTicker(g.cluster.Config.GossipInterval)
breakLoop:
for {
for !g.cluster.ActorSystem.IsStopped() {
select {
case <-g.close:
plog.Info("Stopping Gossip Loop")
break breakLoop
case <-ticker.C:

g.blockExpiredHeartbeats()
g.blockGracefullyLeft()

g.SetState(HearthbeatKey, &MemberHeartbeat{
// todo collect the actor statistics
ActorStatistics: &ActorStatistics{},
})
g.SendState()
}
}
}

// blockExpiredHeartbeats blocks members that have not sent a heartbeat for a long time
func (g *Gossiper) blockExpiredHeartbeats() {
if g.cluster.Config.GossipInterval == 0 {
return
}
t, err := g.GetState(HearthbeatKey)
if err != nil {
plog.Error("Could not get heartbeat state", log.Error(err))
return
}

blockList := remote.GetRemote(g.cluster.ActorSystem).BlockList()

/*
await BlockExpiredHeartbeats();
blocked := make([]string, 0)

await BlockGracefullyLeft();
*/
for k, v := range t {
if k != g.cluster.ActorSystem.ID &&
!blockList.IsBlocked(k) &&
time.Now().Sub(time.UnixMilli(v.LocalTimestampUnixMilliseconds)) > g.cluster.Config.HeartbeatExpiration {
blocked = append(blocked, k)
}
}

if len(blocked) > 0 {
plog.Info("Blocking members due to expired heartbeat", log.String("members", strings.Join(blocked, ",")))
blockList.Block(blocked...)
}
}

// blockGracefullyLeft blocking members due to gracefully leaving
func (g *Gossiper) blockGracefullyLeft() {
t, err := g.GetState(GracefullyLeftKey)
if err != nil {
plog.Error("Could not get gracefully left members", log.Error(err))
return
}

blockList := remote.GetRemote(g.cluster.ActorSystem).BlockList()

gracefullyLeft := make([]string, 0)
for k := range t {
if !blockList.IsBlocked(k) && k != g.cluster.ActorSystem.ID {
gracefullyLeft = append(gracefullyLeft, k)
}
}
if len(gracefullyLeft) > 0 {
plog.Info("Blocking members due to gracefully leaving", log.String("members", strings.Join(gracefullyLeft, ",")))
blockList.Block(gracefullyLeft...)
}
}

func (g *Gossiper) throttledLog(counter int32) {
Expand Down
13 changes: 6 additions & 7 deletions cluster/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@ import (
"time"

"github.com/asynkron/gofun/set"
"google.golang.org/protobuf/types/known/anypb"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/log"
"google.golang.org/protobuf/proto"
)

const (
TopologyKey string = "topology"
HearthbeatKey string = "heathbeat"
TopologyKey string = "topology"
HearthbeatKey string = "heathbeat"
GracefullyLeftKey string = "left"
)

// create and seed a pseudo random numbers generator
Expand Down Expand Up @@ -237,12 +236,12 @@ func (inf *Informer) RemoveConsensusCheck(id string) {

// retrieves this informer current state for the given key
// returns map containing each known member id and their value
func (inf *Informer) GetState(key string) map[string]*anypb.Any {
entries := make(map[string]*anypb.Any)
func (inf *Informer) GetState(key string) map[string]*GossipKeyValue {
entries := make(map[string]*GossipKeyValue)

for memberID, memberState := range inf.state.Members {
if value, ok := memberState.Values[key]; ok {
entries[memberID] = value.Value
entries[memberID] = value
}
}

Expand Down
6 changes: 3 additions & 3 deletions cluster/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestInformer_GetState(t *testing.T) {
}

var s2 MemberHeartbeat
err := x.UnmarshalTo(&s2)
err := x.Value.UnmarshalTo(&s2)
if err != nil {
t.Error("unmarshal state error")
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestInformer_ReceiveState(t *testing.T) {

var s1 MemberHeartbeat

err := m1.UnmarshalTo(&s1)
err := m1.Value.UnmarshalTo(&s1)

if err != nil {
t.Error("unmarshal member1 state error")
Expand All @@ -110,7 +110,7 @@ func TestInformer_ReceiveState(t *testing.T) {

var s2 MemberHeartbeat

err = m2.UnmarshalTo(&s2)
err = m2.Value.UnmarshalTo(&s2)

if err != nil {
t.Error("unmarshal member2 state error")
Expand Down
5 changes: 2 additions & 3 deletions cluster/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cluster

import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

// Used to query the GossipActor about a given key status
Expand All @@ -18,10 +17,10 @@ func NewGetGossipStateRequest(key string) GetGossipStateRequest {

// Used by the GossipActor to send back the status value of a given key
type GetGossipStateResponse struct {
State map[string]*anypb.Any
State map[string]*GossipKeyValue
}

func NewGetGossipStateResponse(state map[string]*anypb.Any) GetGossipStateResponse {
func NewGetGossipStateResponse(state map[string]*GossipKeyValue) GetGossipStateResponse {
value := GetGossipStateResponse{
State: state,
}
Expand Down

0 comments on commit a42802d

Please sign in to comment.