Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add DefaultClusterContext #530

Merged
merged 1 commit into from
Dec 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package cluster

import (
"time"

"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/extensions"
"github.com/AsynkronIT/protoactor-go/log"
"github.com/AsynkronIT/protoactor-go/remote"
"time"
)

var extensionId = extensions.NextExtensionID()
Expand All @@ -22,19 +23,20 @@ type Cluster struct {
}

func New(actorSystem *actor.ActorSystem, config *Config) *Cluster {
c := &Cluster{

c := Cluster{
ActorSystem: actorSystem,
Config: config,
kinds: map[string]*actor.Props{},
}
actorSystem.Extensions.Register(c)
actorSystem.Extensions.Register(&c)

c.context = NewDefaultClusterContext(c)
c.context = config.ClusterContextProducer(&c)
c.PidCache = NewPidCache()
c.MemberList = NewMemberList(c)
c.MemberList = NewMemberList(&c)
c.subscribeToTopologyEvents()

return c
return &c
}

func (c *Cluster) subscribeToTopologyEvents() {
Expand Down
45 changes: 45 additions & 0 deletions cluster/cluster_config_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (C) 2017 - 2022 Asynkron.se <http://www.asynkron.se>

package cluster

import (
"fmt"
"time"

"github.com/AsynkronIT/protoactor-go/actor"
)

const (
defaultActorRequestTimeout time.Duration = 5 * time.Second
defaultRequestsLogThrottlePeriod time.Duration = 2 * time.Second
defaultMaxNumberOfEvetsInRequestLogThrottledPeriod int = 3
)

// Data structure used to configure cluster context parameters
type ClusterContextConfig struct {
ActorRequestTimeout time.Duration
RequestsLogThrottlePeriod time.Duration
MaxNumberOfEventsInRequestLogThrottledPeriod int
RetryAction func(int) int
requestLogThrottle actor.ShouldThrottle
}

// Creates a mew ClusterContextConfig with default
// values and returns a pointer to its memory address
func NewDefaultClusterContextConfig() *ClusterContextConfig {

config := ClusterContextConfig{
ActorRequestTimeout: defaultActorRequestTimeout,
RequestsLogThrottlePeriod: defaultRequestsLogThrottlePeriod,
MaxNumberOfEventsInRequestLogThrottledPeriod: defaultMaxNumberOfEvetsInRequestLogThrottledPeriod,
RetryAction: defaultRetryAction,
requestLogThrottle: actor.NewThrottle(
int32(defaultMaxNumberOfEvetsInRequestLogThrottledPeriod),
defaultRequestsLogThrottlePeriod,
func(i int32) {
plog.Info(fmt.Sprintf("Throttled %d Request logs", i))
},
),
}
return &config
}
19 changes: 3 additions & 16 deletions cluster/cluster_context.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,10 @@
package cluster

import "fmt"
import "time"

// Interface any cluster context needs to implement
type ClusterContext interface {
Request(identity string, kind string, message interface{}) (interface{}, error)
}

func NewDefaultClusterContext(cluster *Cluster) ClusterContext {
return &DefaultClusterContext{
cluster: cluster,
}
}

type DefaultClusterContext struct {
cluster *Cluster
}

func (d DefaultClusterContext) Request(identity string, kind string, message interface{}) (interface{}, error) {
return nil, fmt.Errorf("foo")
Request(identity string, kind string, message interface{}, timeout ...time.Duration) (interface{}, error)
}

/*
Expand Down
71 changes: 56 additions & 15 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,31 @@ import (
)

type Config struct {
Name string
Address string
ClusterProvider ClusterProvider
Identitylookup IdentityLookup
RemoteConfig remote.Config
RequestTimeoutTime time.Duration
MemberStrategyBuilder func(kind string) MemberStrategy
Kinds map[string]*actor.Props
Name string
Address string
ClusterProvider ClusterProvider
Identitylookup IdentityLookup
RemoteConfig remote.Config
RequestTimeoutTime time.Duration
RequestsLogThrottlePeriod time.Duration
MaxNumberOfEventsInRequestLogThrottledPeriod int
ClusterContextProducer ClusterContextProducer
MemberStrategyBuilder func(kind string) MemberStrategy
Kinds map[string]*actor.Props
}

func Configure(clusterName string, clusterProvider ClusterProvider, identityLookup IdentityLookup, remoteConfig remote.Config, kinds ...*Kind) *Config {
config := &Config{
Name: clusterName,
ClusterProvider: clusterProvider,
Identitylookup: identityLookup,
RequestTimeoutTime: time.Second * 5,
MemberStrategyBuilder: newDefaultMemberStrategy,
RemoteConfig: remoteConfig,
Kinds: make(map[string]*actor.Props),
Name: clusterName,
ClusterProvider: clusterProvider,
Identitylookup: identityLookup,
RequestTimeoutTime: defaultActorRequestTimeout,
RequestsLogThrottlePeriod: defaultRequestsLogThrottlePeriod,
MemberStrategyBuilder: newDefaultMemberStrategy,
RemoteConfig: remoteConfig,
Kinds: make(map[string]*actor.Props),
ClusterContextProducer: newDefaultClusterContext,
MaxNumberOfEventsInRequestLogThrottledPeriod: defaultMaxNumberOfEvetsInRequestLogThrottledPeriod,
}

for _, kind := range kinds {
Expand All @@ -42,6 +48,41 @@ func (c *Config) WithRequestTimeout(t time.Duration) *Config {
return c
}

// Sets the given request log throttle period duration
// and returns itself back
func (c *Config) WithRequestsLogThrottlePeriod(period time.Duration) *Config {

c.RequestsLogThrottlePeriod = period
return c
}

// Sets the given context producer and returns itself back
func (c *Config) WithClusterContextProducer(producer ClusterContextProducer) *Config {

c.ClusterContextProducer = producer
return c
}

// Sets the given max number of events in requests log throttle period
// and returns itself back
func (c *Config) WithMaxNumberOfEventsInRequestLogThrottlePeriod(maxNumber int) *Config {

c.MaxNumberOfEventsInRequestLogThrottledPeriod = maxNumber
return c
}

// Converts this Cluster config ClusterContext parameters
// into a valid ClusterContextConfig value and returns a pointer to its memory
func (c *Config) ToClusterContextConfig() *ClusterContextConfig {

clusterContextConfig := ClusterContextConfig{
ActorRequestTimeout: c.RequestTimeoutTime,
RequestsLogThrottlePeriod: c.RequestsLogThrottlePeriod,
MaxNumberOfEventsInRequestLogThrottledPeriod: c.MaxNumberOfEventsInRequestLogThrottledPeriod,
}
return &clusterContextConfig
}

type Kind struct {
Kind string
Props *actor.Props
Expand Down
114 changes: 114 additions & 0 deletions cluster/default_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (C) 2017 - 2022 Asynkron.se <http://www.asynkron.se>

package cluster

import (
"context"
"fmt"
"time"

"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/log"
"github.com/AsynkronIT/protoactor-go/remote"
)

// Defines a type to provide DefaultClusterContext configurations / implementations
type ClusterContextProducer func(*Cluster) ClusterContext

// Defines a default cluster context data structure
type DefaultClusterContext struct {
cluster *Cluster
}

var _ ClusterContext = (*DefaultClusterContext)(nil)

// Creates a new DefaultClusterContext value and returns
// a pointer to its memory address as a ClusterContext
func newDefaultClusterContext(cluster *Cluster) ClusterContext {

clusterContext := DefaultClusterContext{
cluster: cluster,
}
return &clusterContext
}

func (dcc *DefaultClusterContext) Request(identity, kind string, message interface{}, timeout ...time.Duration) (interface{}, error) {

var err error
var resp interface{}
var counter int

// get the configuration from the composed Cluster value
cfg := dcc.cluster.Config.ToClusterContextConfig()

start := time.Now()
plog.Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message))

// crate a new Timeout Context
ttl := cfg.ActorRequestTimeout
if len(timeout) > 0 {
ttl = timeout[0]
}
ctx, cancel := context.WithTimeout(context.Background(), ttl)
defer cancel()

_context := dcc.cluster.ActorSystem.Root
selectloop:
for {
select {
case <-ctx.Done():
// TODO: handler throttling and messaging here
err = fmt.Errorf("Request failed: %w", ctx.Err())
break selectloop
default:
pid := dcc.getCachedPid(identity, kind)
if pid == nil {
plog.Debug(fmt.Sprintf("Requesting %s:%s did not get PID from IdentityLookup", identity, kind))
counter = cfg.RetryAction(counter)
continue
}

resp, err = _context.RequestFuture(pid, message, ttl).Result()
if err != nil {
plog.Error("cluster.RequestFuture failed", log.Error(err), log.PID("pid", pid))
switch err {
case actor.ErrTimeout, remote.ErrTimeout, actor.ErrDeadLetter, remote.ErrDeadLetter:
counter = cfg.RetryAction(counter)
dcc.cluster.PidCache.Remove(identity, kind)
err = nil // reset our error variable as we can succeed still
continue
default:
break selectloop
}
}

// TODO: add metics to increment retries
}
}

totalTime := time.Now().Sub(start)
// TODO: add metrics ot set histogram for total request time

if contextError := ctx.Err(); contextError != nil && cfg.requestLogThrottle() == actor.Open {
// context timeout exceeded, report and return
plog.Warn(fmt.Sprintf("Request retried but failed for %s:%s, elapsed %v", identity, kind, totalTime))
}

return resp, err
}

// gets the cached PID for the given identity
// it can return nil if none is found
func (dcc *DefaultClusterContext) getCachedPid(identity, kind string) *actor.PID {

pid, _ := dcc.cluster.PidCache.Get(identity, kind)
return pid
}

// default retry action, it just sleeps incrementally
func defaultRetryAction(i int) int {

i++
time.Sleep(time.Duration(i * i * 50))
return i
}