Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add github action script for testcases #607

Merged
merged 4 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: checks
on: [
pull_request,
workflow_dispatch,
]

jobs:
test:
name: run-testcases
runs-on: ubuntu-latest
strategy:
matrix:
go_version: ['1.18']
# services:
# redis:
# image: redis:5.0-alpine
# ports:
# - 6379:6379
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go_version }}
- uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: start services
run: docker compose up -d
- name: run testcases
run: make test

52 changes: 44 additions & 8 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -71,15 +72,50 @@ func (p *inmemoryProvider) Shutdown(graceful bool) error {
return nil
}

func TestCluster_Call(t *testing.T) {
assert := assert.New(t)
type fakeIdentityLookup struct {
m sync.Map
}

system := actor.NewActorSystem()
func (l fakeIdentityLookup) Get(identity *ClusterIdentity) *actor.PID {
if val, ok := l.m.Load(identity.Identity); ok {
return val.(*actor.PID)
} else {
// pid := actor.NewPID("127.0.0.1", fmt.Sprintf("%s/%s", identity.Kind, identity.Identity))
// l.m.Store(identity.Identity, pid)
// return pid
}
return nil
}

func (l fakeIdentityLookup) RemovePid(identity *ClusterIdentity, pid *actor.PID) {
if existPid := l.Get(identity); existPid.Equal(pid) {
l.m.Delete(identity.Identity)
}
}

func (lu fakeIdentityLookup) Setup(cluster *Cluster, kinds []string, isClient bool) {

c := New(system, Configure("mycluster", nil, nil, remote.Configure("nonhost", 0)))
}

func (lu fakeIdentityLookup) Shutdown() {

}

func newClusterForTest(name string, cp ClusterProvider, opts ...ConfigOption) *Cluster {
system := actor.NewActorSystem()
lookup := fakeIdentityLookup{}
cfg := Configure(name, cp, &lookup, remote.Configure("127.0.0.1", 0), opts...)
c := New(system, cfg)

c.MemberList = NewMemberList(c)
c.Config.RequestTimeoutTime = 1 * time.Second
c.Remote = remote.NewRemote(system, c.Config.RemoteConfig)
return c
}

func TestCluster_Call(t *testing.T) {
t.Skipf("Maintaining")
assert := assert.New(t)

members := Members{
{
Expand All @@ -89,8 +125,8 @@ func TestCluster_Call(t *testing.T) {
Kinds: []string{"kind"},
},
}
c := newClusterForTest("mycluster", nil)
c.MemberList.UpdateClusterTopology(members)
// address := memberList.GetPartitionMember("name", "kind")
t.Run("invalid kind", func(t *testing.T) {
msg := struct{}{}
resp, err := c.Request("name", "nonkind", &msg)
Expand All @@ -115,7 +151,7 @@ func TestCluster_Call(t *testing.T) {
context.Respond(msg)
}
})
pid := system.Root.Spawn(testProps)
pid := c.ActorSystem.Root.Spawn(testProps)
assert.NotNil(pid)
c.PidCache.Set("name", "kind", pid)
t.Run("normal", func(t *testing.T) {
Expand All @@ -128,15 +164,15 @@ func TestCluster_Call(t *testing.T) {
}

func TestCluster_Get(t *testing.T) {
t.Skipf("Maintaining")
cp := newInmemoryProvider()
system := actor.NewActorSystem()
kind := NewKind("kind", actor.PropsFromFunc(func(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started:
_ = msg
}
}))
c := New(system, Configure("mycluster", cp, nil, remote.Configure("127.0.0.1", 0), WithKinds(kind)))
c := newClusterForTest("mycluster", cp, WithKinds(kind))
c.StartMember()
cp.publishClusterTopologyEvent()
t.Run("invalid kind", func(t *testing.T) {
Expand Down
8 changes: 6 additions & 2 deletions cluster/clusterproviders/consul/consul_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package consul

import (
"fmt"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"
"net"
"strconv"
"testing"
"time"

"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
"github.com/asynkron/protoactor-go/remote"
Expand All @@ -31,6 +32,7 @@ func newClusterForTest(name string, addr string, cp cluster.ClusterProvider) *cl
// use for test without start remote
c.ActorSystem.ProcessRegistry.Address = addr
c.MemberList = cluster.NewMemberList(c)
c.Remote = remote.NewRemote(c.ActorSystem, c.Config.RemoteConfig)
return c
}

Expand Down Expand Up @@ -64,7 +66,8 @@ func TestStartMember(t *testing.T) {
// member joined
members := []*cluster.Member{
{
Id: "mycluster@127.0.0.1:8000",
// Id: "mycluster@127.0.0.1:8000",
Id: fmt.Sprintf("%s", c.ActorSystem.Id),
Host: "127.0.0.1",
Port: 8000,
Kinds: []string{},
Expand All @@ -74,6 +77,7 @@ func TestStartMember(t *testing.T) {
expected := &cluster.ClusterTopology{
Members: members,
Joined: members,
Left: []*cluster.Member{},
TopologyHash: msg.TopologyHash,
}
assert.Equal(expected, msg)
Expand Down
10 changes: 7 additions & 3 deletions cluster/clusterproviders/etcd/etcd_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func newClusterForTest(name string, addr string, cp cluster.ClusterProvider) *cl
// use for test without start remote
c.ActorSystem.ProcessRegistry.Address = addr
c.MemberList = cluster.NewMemberList(c)
c.Remote = remote.NewRemote(c.ActorSystem, c.Config.RemoteConfig)
return c
}

Expand All @@ -36,7 +37,8 @@ func TestStartMember(t *testing.T) {
}
assert := assert.New(t)

p, _ := New()
p, err := New()
assert.NoError(err)
defer p.Shutdown(true)

c := newClusterForTest("test_etcd_provider", "127.0.0.1:8000", p)
Expand All @@ -48,7 +50,7 @@ func TestStartMember(t *testing.T) {
}
})

err := p.StartMember(c)
err = p.StartMember(c)
assert.NoError(err)

select {
Expand All @@ -60,7 +62,8 @@ func TestStartMember(t *testing.T) {
msg := m.(*cluster.ClusterTopology)
members := []*cluster.Member{
{
Id: "test_etcd_provider@127.0.0.1:8000",
// Id: "test_etcd_provider@127.0.0.1:8000",
Id: fmt.Sprintf("test_etcd_provider@%s", c.ActorSystem.Id),
Host: "127.0.0.1",
Port: 8000,
Kinds: []string{},
Expand All @@ -70,6 +73,7 @@ func TestStartMember(t *testing.T) {
expected := &cluster.ClusterTopology{
Members: members,
Joined: members,
Left: []*cluster.Member{},
TopologyHash: msg.TopologyHash,
}
assert.Equal(expected, msg)
Expand Down
19 changes: 16 additions & 3 deletions cluster/clusterproviders/k8s/k8s_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package k8s
import (
"context"
"fmt"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"
"net"
"os"
"strconv"
"testing"
"time"

"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
"github.com/asynkron/protoactor-go/remote"
Expand All @@ -17,7 +19,6 @@ import (
)

func newClusterForTest(name string, addr string, cp cluster.ClusterProvider, id cluster.IdentityLookup) *cluster.Cluster {

host, _port, err := net.SplitHostPort(addr)
if err != nil {
panic(err)
Expand All @@ -32,14 +33,17 @@ func newClusterForTest(name string, addr string, cp cluster.ClusterProvider, id
// use for test without start remote
c.ActorSystem.ProcessRegistry.Address = addr
c.MemberList = cluster.NewMemberList(c)
c.Remote = remote.NewRemote(system, config.RemoteConfig)
return c
}

func TestStartMember(t *testing.T) {

if testing.Short() {
return
}
if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
t.Skipf("Skipped k8s testcases")
}
assert := assert.New(t)

p, _ := New()
Expand Down Expand Up @@ -90,6 +94,9 @@ func TestRegisterMultipleMembers(t *testing.T) {
if testing.Short() {
return
}
if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
t.Skipf("Skipped k8s testcases")
}
assert := assert.New(t)

members := []struct {
Expand Down Expand Up @@ -128,6 +135,9 @@ func TestUpdateMemberState(t *testing.T) {
if testing.Short() {
return
}
if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
t.Skipf("Skipped k8s testcases")
}
assert := assert.New(t)

p, _ := New()
Expand All @@ -143,6 +153,9 @@ func TestUpdateMemberState_DoesNotReregisterAfterShutdown(t *testing.T) {
if testing.Short() {
return
}
if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
t.Skipf("Skipped k8s testcases")
}
assert := assert.New(t)

p, _ := New()
Expand Down
4 changes: 2 additions & 2 deletions cluster/clusterproviders/zk/zk_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ func (p *Provider) updateNodesWithSelf(members []*Node, version int32) {
p.members[p.self.ID] = p.self
}

func (p *Provider) createClusterTopologyEvent() cluster.TopologyEvent {
res := make(cluster.TopologyEvent, len(p.members))
func (p *Provider) createClusterTopologyEvent() []*cluster.Member {
res := make([]*cluster.Member, len(p.members))
i := 0
for _, m := range p.members {
res[i] = m.MemberStatus()
Expand Down
9 changes: 9 additions & 0 deletions cluster/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"fmt"
"time"

"github.com/asynkron/protoactor-go/actor"
Expand Down Expand Up @@ -61,6 +62,14 @@ func (c *Config) ToClusterContextConfig() *ClusterContextConfig {
ActorRequestTimeout: c.RequestTimeoutTime,
RequestsLogThrottlePeriod: c.RequestsLogThrottlePeriod,
MaxNumberOfEventsInRequestLogThrottledPeriod: c.MaxNumberOfEventsInRequestLogThrottledPeriod,
RetryAction: defaultRetryAction,
requestLogThrottle: actor.NewThrottle(
int32(defaultMaxNumberOfEvetsInRequestLogThrottledPeriod),
defaultRequestsLogThrottlePeriod,
func(i int32) {
plog.Info(fmt.Sprintf("Throttled %d Request logs", i))
},
),
}
return &clusterContextConfig
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/default_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ selectloop:
}
}

totalTime := time.Now().Sub(start)
totalTime := time.Since(start)
// TODO: add metrics ot set histogram for total request time

if contextError := ctx.Err(); contextError != nil && cfg.requestLogThrottle() == actor.Open {
Expand Down
Loading