Skip to content

Commit

Permalink
Port over cluster pubsub feat from .net asynkron#599
Browse files Browse the repository at this point in the history
  • Loading branch information
Kunduin committed Jan 16, 2023
1 parent 880b460 commit 3bb0c84
Show file tree
Hide file tree
Showing 20 changed files with 2,976 additions and 7 deletions.
1 change: 1 addition & 0 deletions cluster/build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. cluster.proto
protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. gossip.proto
protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. grain.proto
protoc -I=../actor --go_out=. --go_opt=paths=source_relative --proto_path=. pubsub.proto

4 changes: 4 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Cluster struct {
ActorSystem *actor.ActorSystem
Config *Config
Gossip Gossiper
PubSub *PubSub
Remote *remote.Remote
PidCache *PidCacheValue
MemberList *MemberList
Expand Down Expand Up @@ -44,6 +45,7 @@ func New(actorSystem *actor.ActorSystem, config *Config) *Cluster {

var err error
c.Gossip, err = newGossiper(c)
c.PubSub = NewPubSub(c)

if err != nil {
panic(err)
Expand Down Expand Up @@ -97,6 +99,7 @@ func (c *Cluster) StartMember() {
if err := c.Gossip.StartGossiping(); err != nil {
panic(err)
}
c.PubSub.Start()
c.MemberList.InitializeTopologyConsensus()

if err := cfg.ClusterProvider.StartMember(c); err != nil {
Expand Down Expand Up @@ -130,6 +133,7 @@ func (c *Cluster) StartClient() {
if err := cfg.ClusterProvider.StartClient(c); err != nil {
panic(err)
}
c.PubSub.Start()
}

func (c *Cluster) Shutdown(graceful bool) {
Expand Down
2 changes: 2 additions & 0 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
GossipRequestTimeout time.Duration
GossipFanOut int
GossipMaxSend int
PubSubConfig *PubSubConfig
}

func Configure(clusterName string, clusterProvider ClusterProvider, identityLookup IdentityLookup, remoteConfig *remote.Config, options ...ConfigOption) *Config {
Expand All @@ -45,6 +46,7 @@ func Configure(clusterName string, clusterProvider ClusterProvider, identityLook
GossipRequestTimeout: time.Millisecond * 500,
GossipFanOut: 3,
GossipMaxSend: 50,
PubSubConfig: newPubSubConfig(),
}

for _, option := range options {
Expand Down
8 changes: 8 additions & 0 deletions cluster/config_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,11 @@ func WithKinds(kinds ...*Kind) ConfigOption {
}
}
}

// WithPubSubSubscriberTimeout sets a timeout used when delivering a message batch to a subscriber.
// Default is 5s.
func WithPubSubSubscriberTimeout(timeout time.Duration) ConfigOption {
return func(c *Config) {
c.PubSubConfig.SubscriberTimeout = timeout
}
}
25 changes: 25 additions & 0 deletions cluster/key_value_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package cluster

import "golang.org/x/net/context"

// KeyValueStore is a distributed key value store
type KeyValueStore[T any] interface {
// Set the value for the given key.
Set(ctx context.Context, key string, value T) error
// Get the value for the given key..
Get(ctx context.Context, key string) (T, error)
// Clear the value for the given key.
Clear(ctx context.Context, key string) error
}

// EmptyKeyValueStore is a key value store that does nothing.
type EmptyKeyValueStore[T any] struct{}

func (e *EmptyKeyValueStore[T]) Set(_ context.Context, _ string, _ T) error { return nil }

func (e *EmptyKeyValueStore[T]) Get(_ context.Context, _ string) (T, error) {
var r T
return r, nil
}

func (e *EmptyKeyValueStore[T]) Clear(_ context.Context, _ string) error { return nil }
57 changes: 57 additions & 0 deletions cluster/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package cluster

import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/extensions"
"time"
)

const PubSubDeliveryName = "$pubsub-delivery"

var pubsubExtensionID = extensions.NextExtensionID()

type PubSub struct {
cluster *Cluster
}

func NewPubSub(cluster *Cluster) *PubSub {
p := &PubSub{
cluster: cluster,
}
cluster.ActorSystem.Extensions.Register(p)
return p
}

// Start the PubSubMemberDeliveryActor
func (p *PubSub) Start() {
props := actor.PropsFromProducer(func() actor.Actor {
return NewPubSubMemberDeliveryActor(p.cluster.Config.PubSubConfig.SubscriberTimeout)
})
_, err := p.cluster.ActorSystem.Root.SpawnNamed(props, PubSubDeliveryName)
if err != nil {
panic(err) // let it crash
}
}

func (p *PubSub) ExtensionID() extensions.ExtensionID {
return pubsubExtensionID
}

type PubSubConfig struct {
// SubscriberTimeout is a timeout used when delivering a message batch to a subscriber. Default is 5s.
//
// This value gets rounded to seconds for optimization of cancellation token creation. Note that internally,
// cluster request is used to deliver messages to ClusterIdentity subscribers.
SubscriberTimeout time.Duration
}

func newPubSubConfig() *PubSubConfig {
return &PubSubConfig{
SubscriberTimeout: 5 * time.Second,
}
}

// GetPubSub returns the PubSub extension from the actor system
func GetPubSub(system *actor.ActorSystem) *PubSub {
return system.Extensions.Get(pubsubExtensionID).(*PubSub)
}
Loading

0 comments on commit 3bb0c84

Please sign in to comment.