Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable gossip #635

Merged
merged 7 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _examples/cluster-broadcast/node1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
console "github.com/asynkron/goconsole"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged"
"github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"
"github.com/asynkron/protoactor-go/remote"
)
Expand Down
2 changes: 1 addition & 1 deletion _examples/cluster-broadcast/node2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"cluster-broadcast/shared"

automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged"
"github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"

console "github.com/asynkron/goconsole"
Expand Down
2 changes: 1 addition & 1 deletion _examples/cluster-eventstream-broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"time"

automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged"
"github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"

console "github.com/asynkron/goconsole"
Expand Down
18 changes: 9 additions & 9 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/asynkron/protoactor-go/remote"
)

var extensionId = extensions.NextExtensionID()
var extensionID = extensions.NextExtensionID()

type Cluster struct {
ActorSystem *actor.ActorSystem
Expand Down Expand Up @@ -63,12 +63,12 @@ func (c *Cluster) subscribeToTopologyEvents() {
}

func (c *Cluster) ExtensionID() extensions.ExtensionID {
return extensionId
return extensionID
}

//goland:noinspection GoUnusedExportedFunction
func GetCluster(actorSystem *actor.ActorSystem) *Cluster {
c := actorSystem.Extensions.Get(extensionId)
c := actorSystem.Extensions.Get(extensionID)

return c.(*Cluster)
}
Expand All @@ -87,17 +87,17 @@ func (c *Cluster) StartMember() {
c.Remote.Start()

address := c.ActorSystem.Address()
plog.Info("Starting Proto.Actor cluster member", log.String("address", address))
plog.Info("Starting Proto.Actor cluster member", log.String("id", c.ActorSystem.ID), log.String("address", address))

c.IdentityLookup = cfg.IdentityLookup
c.IdentityLookup.Setup(c, c.GetClusterKinds(), false)

// TODO: Disable Gossip for now until API changes are done
// the gossiper must be started whenever any topology events start flowing
// if err := c.Gossip.StartGossiping(); err != nil {
// panic(err)
// }
// c.MemberList.InitializeTopologyConsensus()
// the gossiper must be started whenever any topology events start flowing
if err := c.Gossip.StartGossiping(); err != nil {
panic(err)
}
c.MemberList.InitializeTopologyConsensus()

if err := cfg.ClusterProvider.StartMember(c); err != nil {
panic(err)
Expand Down
3 changes: 2 additions & 1 deletion cluster/clusterproviders/automanaged/automanaged.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,5 +380,6 @@ func (p *AutoManagedProvider) isActiveProviderRunning() bool {
}

func (p *AutoManagedProvider) getCurrentNode() *NodeModel {
return NewNode(p.clusterName, p.address, p.memberPort, p.autoManagePort, p.knownKinds)

return NewNode(p.clusterName, p.cluster.ActorSystem.ID, p.address, p.memberPort, p.autoManagePort, p.knownKinds)
}
10 changes: 2 additions & 8 deletions cluster/clusterproviders/automanaged/node_model.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package automanaged

import "fmt"

// NodeModel represents a node in the cluster
type NodeModel struct {
ID string `json:"id"`
Expand All @@ -13,17 +11,13 @@ type NodeModel struct {
}

// NewNode returns a new node for the cluster
func NewNode(clusterName string, address string, port int, autoManPort int, kind []string) *NodeModel {
func NewNode(clusterName string, id string, address string, port int, autoManPort int, kind []string) *NodeModel {
return &NodeModel{
ID: createNodeID(clusterName, address, port),
ID: id,
ClusterName: clusterName,
Address: address,
Port: port,
AutoManagePort: autoManPort,
Kinds: kind,
}
}

func createNodeID(clusterName string, address string, port int) string {
return fmt.Sprintf("%v@%v:%v", clusterName, address, port)
}
98 changes: 53 additions & 45 deletions cluster/gossip_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers
// Receive method.
func (ga *GossipActor) Receive(ctx actor.Context) {
switch r := ctx.Message().(type) {
case *actor.Started:
// nothing to do on start
case *SetGossipStateKey:
ga.onSetGossipStateKey(r, ctx)
case *GetGossipStateRequest:
Expand All @@ -49,7 +51,7 @@ func (ga *GossipActor) Receive(ctx actor.Context) {
case *ClusterTopology:
ga.onClusterTopology(r)
case *GossipResponse:
// noop: review after roger's work is done
plog.Error("GossipResponse should not be received by GossipActor") //it should be a response to a request
default:
plog.Warn("Gossip received unknown message request", log.Message(r))
}
Expand All @@ -74,7 +76,7 @@ func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.C
}

func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
plog.Debug("Gossip request", log.PID("sender", ctx.Sender()))
plog.Debug("OnGossipRequest", log.PID("sender", ctx.Sender()))
ga.ReceiveState(r.State, ctx)

if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.MemberId) {
Expand All @@ -97,26 +99,36 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
return
}

msg := GossipResponse{
State: memberState.State,
}
future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout)

// wait until we get a response or an error from the future
resp, err := future.Result()
if err != nil {
plog.Error("onSendGossipState failed", log.Error(err))

return
}

if _, ok := resp.(*GossipResponseAck); ok {
memberState.CommitOffsets()

return
}

plog.Error("onSendGossipState received unknown response message", log.Message(r))
ctx.Respond(&GossipResponse{})
return

// acking is turned off for now

//msg := GossipResponse{
// State: memberState.State,
//}
//future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout)
//
//ctx.ReenterAfter(future, func(res interface{}, err error) {
// if err != nil {
// plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
// return
// }
//
// if _, ok := res.(*GossipResponseAck); ok {
// memberState.CommitOffsets()
// return
// }
//
// m, ok := res.(proto.Message)
// if !ok {
// plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
// return
// }
// n := string(proto.MessageName(m).Name())
//
// plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r))
//})
}

func (ga *GossipActor) onSetGossipStateKey(r *SetGossipStateKey, ctx actor.Context) {
Expand Down Expand Up @@ -145,43 +157,39 @@ func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)

func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *MemberStateDelta, ctx actor.Context) {
pid := actor.NewPID(member.Address(), DefaultGossipActorName)
plog.Info("Sending GossipRequest", log.String("MemberId", member.Id))
plog.Debug("Sending GossipRequest", log.String("MemberId", member.Id))

// a short timeout is critically important: we cannot afford to hang around
// waiting for a timeout, blocking other gossips from getting through

msg := GossipRequest{
// TODO: Uncomment this line when we replace the current "address:port" as ID
// with the proper ActorSystem.ID after new API refactor changes
// Oscar Campos: 2022-04-09
// MemberId: ctx.ActorSystem().ID,
MemberId: member.Address(),
MemberId: member.Id,
State: memberStateDelta.State,
}
future := ctx.RequestFuture(pid, &msg, ga.gossipRequestTimeout)

// wait until we get a response or an error from the future
r, err := future.Result()
if err != nil {
plog.Error("onSendGossipState failed", log.Error(err))
ctx.ReenterAfter(future, func(res interface{}, err error) {
if ctx.Sender() != nil {
ctx.Send(ctx.Sender(), &GossipResponseAck{})
}

return
}
if err != nil {
plog.Warn("sendGossipForMember failed", log.String("MemberId", member.Id), log.Error(err))
return
}

resp, ok := r.(*GossipResponse)
if !ok {
plog.Error("onSendGossipState received unknown response message", log.Message(r))
resp, ok := res.(*GossipResponse)
if !ok {
plog.Error("sendGossipForMember received unknown response message", log.Message(resp))

return
}
return
}

memberStateDelta.CommitOffsets()
memberStateDelta.CommitOffsets()

if resp.State != nil {
ga.ReceiveState(resp.State, ctx)
if resp.State != nil {
ga.ReceiveState(resp.State, ctx)

if ctx.Sender() != nil {
ctx.Send(ctx.Sender(), &GossipResponseAck{})
}
}
})
}