diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index b9a3c1051..71bc8048e 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -80,6 +80,8 @@ func TestCluster_Call(t *testing.T) { c.MemberList = NewMemberList(c) c.Config.RequestTimeoutTime = 1 * time.Second + c.Config.ToClusterContextConfig() + c.Remote = remote.NewRemote(system, c.Config.RemoteConfig) members := Members{ { diff --git a/cluster/clusterproviders/consul/consul_provider_test.go b/cluster/clusterproviders/consul/consul_provider_test.go index d6df4d8f4..bf6d22b1d 100644 --- a/cluster/clusterproviders/consul/consul_provider_test.go +++ b/cluster/clusterproviders/consul/consul_provider_test.go @@ -2,12 +2,13 @@ package consul import ( "fmt" - "github.com/asynkron/protoactor-go/cluster/identitylookup/disthash" "net" "strconv" "testing" "time" + "github.com/asynkron/protoactor-go/cluster/identitylookup/disthash" + "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/cluster" "github.com/asynkron/protoactor-go/remote" @@ -31,6 +32,7 @@ func newClusterForTest(name string, addr string, cp cluster.ClusterProvider) *cl // use for test without start remote c.ActorSystem.ProcessRegistry.Address = addr c.MemberList = cluster.NewMemberList(c) + c.Remote = remote.NewRemote(c.ActorSystem, c.Config.RemoteConfig) return c } @@ -64,7 +66,8 @@ func TestStartMember(t *testing.T) { // member joined members := []*cluster.Member{ { - Id: "mycluster@127.0.0.1:8000", + // Id: "mycluster@127.0.0.1:8000", + Id: fmt.Sprintf("%s", c.ActorSystem.Id), Host: "127.0.0.1", Port: 8000, Kinds: []string{}, @@ -74,6 +77,7 @@ func TestStartMember(t *testing.T) { expected := &cluster.ClusterTopology{ Members: members, Joined: members, + Left: []*cluster.Member{}, TopologyHash: msg.TopologyHash, } assert.Equal(expected, msg) diff --git a/cluster/clusterproviders/etcd/etcd_provider_test.go b/cluster/clusterproviders/etcd/etcd_provider_test.go index 4c943f1cd..e17ee610d 100644 --- a/cluster/clusterproviders/etcd/etcd_provider_test.go +++ b/cluster/clusterproviders/etcd/etcd_provider_test.go @@ -27,6 +27,7 @@ func newClusterForTest(name string, addr string, cp cluster.ClusterProvider) *cl // use for test without start remote c.ActorSystem.ProcessRegistry.Address = addr c.MemberList = cluster.NewMemberList(c) + c.Remote = remote.NewRemote(c.ActorSystem, c.Config.RemoteConfig) return c } @@ -36,7 +37,8 @@ func TestStartMember(t *testing.T) { } assert := assert.New(t) - p, _ := New() + p, err := New() + assert.NoError(err) defer p.Shutdown(true) c := newClusterForTest("test_etcd_provider", "127.0.0.1:8000", p) @@ -48,7 +50,7 @@ func TestStartMember(t *testing.T) { } }) - err := p.StartMember(c) + err = p.StartMember(c) assert.NoError(err) select { @@ -60,7 +62,8 @@ func TestStartMember(t *testing.T) { msg := m.(*cluster.ClusterTopology) members := []*cluster.Member{ { - Id: "test_etcd_provider@127.0.0.1:8000", + // Id: "test_etcd_provider@127.0.0.1:8000", + Id: fmt.Sprintf("test_etcd_provider@%s", c.ActorSystem.Id), Host: "127.0.0.1", Port: 8000, Kinds: []string{}, @@ -70,6 +73,7 @@ func TestStartMember(t *testing.T) { expected := &cluster.ClusterTopology{ Members: members, Joined: members, + Left: []*cluster.Member{}, TopologyHash: msg.TopologyHash, } assert.Equal(expected, msg) diff --git a/cluster/clusterproviders/zk/zk_provider.go b/cluster/clusterproviders/zk/zk_provider.go index 99e19d14a..6d4b74fba 100644 --- a/cluster/clusterproviders/zk/zk_provider.go +++ b/cluster/clusterproviders/zk/zk_provider.go @@ -469,8 +469,8 @@ func (p *Provider) updateNodesWithSelf(members []*Node, version int32) { p.members[p.self.ID] = p.self } -func (p *Provider) createClusterTopologyEvent() cluster.TopologyEvent { - res := make(cluster.TopologyEvent, len(p.members)) +func (p *Provider) createClusterTopologyEvent() []*cluster.Member { + res := make([]*cluster.Member, len(p.members)) i := 0 for _, m := range p.members { res[i] = m.MemberStatus() diff --git a/cluster/config.go b/cluster/config.go index 3aff33455..c8adc45df 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -1,6 +1,7 @@ package cluster import ( + "fmt" "time" "github.com/asynkron/protoactor-go/actor" @@ -61,6 +62,14 @@ func (c *Config) ToClusterContextConfig() *ClusterContextConfig { ActorRequestTimeout: c.RequestTimeoutTime, RequestsLogThrottlePeriod: c.RequestsLogThrottlePeriod, MaxNumberOfEventsInRequestLogThrottledPeriod: c.MaxNumberOfEventsInRequestLogThrottledPeriod, + RetryAction: defaultRetryAction, + requestLogThrottle: actor.NewThrottle( + int32(defaultMaxNumberOfEvetsInRequestLogThrottledPeriod), + defaultRequestsLogThrottlePeriod, + func(i int32) { + plog.Info(fmt.Sprintf("Throttled %d Request logs", i)) + }, + ), } return &clusterContextConfig } diff --git a/cluster/default_context.go b/cluster/default_context.go index 23e083f66..b4a26548d 100644 --- a/cluster/default_context.go +++ b/cluster/default_context.go @@ -86,7 +86,7 @@ selectloop: } } - totalTime := time.Now().Sub(start) + totalTime := time.Since(start) // TODO: add metrics ot set histogram for total request time if contextError := ctx.Err(); contextError != nil && cfg.requestLogThrottle() == actor.Open {