Skip to content

Commit

Permalink
Merge branch '3.0' into issue1152
Browse files Browse the repository at this point in the history
  • Loading branch information
xg.gao committed May 8, 2021
2 parents 8a61049 + f421172 commit 850e67d
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{Err: result.Error()}
return &protocol.RPCResult{}
}
return result
}
Expand Down
2 changes: 1 addition & 1 deletion config/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func setDefaultValue(target interface{}) {
if len(p.Protocols) == 0 {
p.Protocols[constant.DEFAULT_PROTOCOL] = &ProtocolConfig{
Name: constant.DEFAULT_PROTOCOL,
Port: string(constant.DEFAULT_PORT),
Port: strconv.Itoa(constant.DEFAULT_PORT),
}
}
if p.ApplicationConfig == nil {
Expand Down
2 changes: 1 addition & 1 deletion config_center/zookeeper/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

func initZkData(group string, t *testing.T) (*zk.TestCluster, *zookeeperDynamicConfiguration) {
ts, err := zk.StartTestCluster(1, nil, nil)
ts, err := zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
assert.NoError(t, err)
assert.NotNil(t, ts.Servers[0])
urlString := "registry://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)
Expand Down
2 changes: 1 addition & 1 deletion config_center/zookeeper/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

func initZkDynamicConfiguration(t *testing.T) (*zk.TestCluster, *zookeeperDynamicConfiguration) {
ts, err := zk.StartTestCluster(1, nil, nil)
ts, err := zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
assert.NoError(t, err)
assert.NotNil(t, ts.Servers[0])
urlString := "registry://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/creasty/defaults v1.5.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.7
github.com/dubbogo/gost v1.11.8
github.com/dubbogo/triple v0.1.3
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.7 h1:mS2nuUOhOQmHSPnbs/94GakMigcKhzRr0TFLreiqNyo=
github.com/dubbogo/gost v1.11.7/go.mod h1:2nB8jSrxVPwW5DBsRu3FZQH1+Ual3wnRHwFqjG9+4PY=
github.com/dubbogo/gost v1.11.8 h1:OPTG4qIyNQ949GbdgHvpvYiVNno/X/YBozOVBLuNkS4=
github.com/dubbogo/gost v1.11.8/go.mod h1:2nB8jSrxVPwW5DBsRu3FZQH1+Ual3wnRHwFqjG9+4PY=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/net v0.0.2-0.20210326124702-e6a866993192 h1:CBEicrrVwR6u8ty+kL68ItxXVk1jaVYThrsx5ARhxUc=
github.com/dubbogo/net v0.0.2-0.20210326124702-e6a866993192/go.mod h1:B6/ka3g8VzcyrmdCH4VkHP1K0aHeI37FmclS+TCwIBU=
Expand Down
2 changes: 1 addition & 1 deletion metadata/report/zookeeper/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (suite *zookeeperMetadataReportTestSuite) testGetServiceDefinition() {
}

func test1(t *testing.T) {
testCluster, err := zk.StartTestCluster(1, nil, nil)
testCluster, err := zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
assert.NoError(t, err)
defer func() {
err := testCluster.Stop()
Expand Down
35 changes: 25 additions & 10 deletions registry/nacos/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/mapping"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/registry/event"
)
Expand Down Expand Up @@ -69,16 +70,6 @@ func Test_newNacosServiceDiscovery(t *testing.T) {
assert.NotNil(t, res)
}

func TestNacosServiceDiscovery_Destroy(t *testing.T) {
prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName)
assert.Nil(t, err)
assert.NotNil(t, serviceDiscovery)
err = serviceDiscovery.Destroy()
assert.Nil(t, err)
assert.Nil(t, serviceDiscovery.(*nacosServiceDiscovery).namingClient)
}

func TestNacosServiceDiscovery_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
Expand All @@ -88,6 +79,10 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
return &dispatcher.MockEventDispatcher{}
})

extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping {
return &mockServiceNameMapping{}
})

extension.SetAndInitGlobalDispatcher("mock")
rand.Seed(time.Now().Unix())
serviceName := "service-name" + strconv.Itoa(rand.Intn(10000))
Expand Down Expand Up @@ -171,6 +166,16 @@ func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) {
assert.Equal(t, registry.DefaultPageSize, serviceDiscovery.GetDefaultPageSize())
}

func TestNacosServiceDiscovery_Destroy(t *testing.T) {
prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.NACOS_KEY, testName)
assert.Nil(t, err)
assert.NotNil(t, serviceDiscovery)
err = serviceDiscovery.Destroy()
assert.Nil(t, err)
assert.Nil(t, serviceDiscovery.(*nacosServiceDiscovery).namingClient)
}

func prepareData() {
config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "nacos",
Expand All @@ -182,3 +187,13 @@ func prepareData() {
TimeoutStr: "10s",
}
}

type mockServiceNameMapping struct{}

func (m *mockServiceNameMapping) Map(string, string, string, string) error {
return nil
}

func (m *mockServiceNameMapping) Get(string, string, string, string) (*gxset.HashSet, error) {
panic("implement me")
}
118 changes: 98 additions & 20 deletions registry/zookeeper/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,29 @@
package zookeeper

import (
"github.com/apache/dubbo-go/registry/event"
gxset "github.com/dubbogo/gost/container/set"
"context"
"strconv"
"sync"
"testing"
)

import (
"github.com/dubbogo/go-zookeeper/zk"
gxset "github.com/dubbogo/gost/container/set"
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/common/observer/dispatcher"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/mapping"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/registry/event"
)

var testName = "test"
Expand All @@ -43,7 +49,7 @@ var tc *zk.TestCluster

func prepareData(t *testing.T) *zk.TestCluster {
var err error
tc, err = zk.StartTestCluster(1, nil, nil)
tc, err = zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(20))
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
Expand Down Expand Up @@ -80,22 +86,34 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {

func TestCURDZookeeperServiceDiscovery(t *testing.T) {
prepareData(t)
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
})
extension.SetGlobalServiceNameMapping(func() mapping.ServiceNameMapping {
return &mockServiceNameMapping{}
})

extension.SetProtocol("mock", func() protocol.Protocol {
return &mockProtocol{}
})

sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
_ = sd.Destroy()
}()
md := make(map[string]string)
md["t1"] = "test1"
err = sd.Register(&registry.DefaultServiceInstance{
ins := &registry.DefaultServiceInstance{
ID: "testID",
ServiceName: testName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: md,
})
Metadata: nil,
}
ins.Metadata = map[string]string{"t1": "test1", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
err = sd.Register(ins)

assert.Nil(t, err)

testsPager := sd.GetHealthyInstancesByPage(testName, 0, 1, true)
Expand All @@ -105,16 +123,18 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
assert.Equal(t, "127.0.0.1:2233", test.GetID())
assert.Equal(t, "test1", test.GetMetadata()["t1"])

md["t1"] = "test12"
err = sd.Update(&registry.DefaultServiceInstance{
ins = &registry.DefaultServiceInstance{
ID: "testID",
ServiceName: testName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: md,
})
}
ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}

err = sd.Update(ins)

assert.Nil(t, err)

testsPager = sd.GetInstancesByPage(testName, 0, 1)
Expand Down Expand Up @@ -154,15 +174,18 @@ func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
_ = sd.Destroy()
}()

err = sd.Register(&registry.DefaultServiceInstance{
ins := &registry.DefaultServiceInstance{
ID: "testID",
ServiceName: testName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: nil,
})
}
ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
err = sd.Register(ins)

assert.Nil(t, err)
wg := &sync.WaitGroup{}
wg.Add(1)
Expand All @@ -172,21 +195,25 @@ func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
}
hs := gxset.NewSet()
hs.Add(testName)

sicl := event.NewServiceInstancesChangedListener(hs)
sicl.AddListenerAndNotify(testName, tn)
extension.SetAndInitGlobalDispatcher("direct")
extension.GetGlobalDispatcher().AddEventListener(sicl)
err = sd.AddListener(sicl)
assert.NoError(t, err)

err = sd.Update(&registry.DefaultServiceInstance{
ins = &registry.DefaultServiceInstance{
ID: "testID",
ServiceName: testName,
Host: "127.0.0.1",
Port: 2233,
Enable: true,
Healthy: true,
Metadata: nil,
})
}
ins.Metadata = map[string]string{"t1": "test12", constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME: `{"protocol":"mock","timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"2233"}`}
err = sd.Update(ins)
assert.NoError(t, err)
tn.wg.Wait()
}
Expand All @@ -196,9 +223,60 @@ type testNotify struct {
t *testing.T
}

func (tn *testNotify) Notify(e observer.Event) {
ice := e.(*registry.ServiceInstancesChangedEvent)
assert.Equal(tn.t, 1, len(ice.Instances))
assert.Equal(tn.t, "127.0.0.1:2233", ice.Instances[0].GetID())
func (tn *testNotify) Notify(e *registry.ServiceEvent) {
assert.Equal(tn.t, "2233", e.Service.Port)
tn.wg.Done()
}
func (tn *testNotify) NotifyAll([]*registry.ServiceEvent, func()) {

}

type mockServiceNameMapping struct{}

func (m *mockServiceNameMapping) Map(string, string, string, string) error {
return nil
}

func (m *mockServiceNameMapping) Get(string, string, string, string) (*gxset.HashSet, error) {
return gxset.NewSet(config.GetApplicationConfig().Name), nil
}

type mockProtocol struct{}

func (m mockProtocol) Export(protocol.Invoker) protocol.Exporter {
panic("implement me")
}

func (m mockProtocol) Refer(*common.URL) protocol.Invoker {
return &mockInvoker{}
}

func (m mockProtocol) Destroy() {
panic("implement me")
}

type mockInvoker struct{}

func (m *mockInvoker) GetURL() *common.URL {
panic("implement me")
}

func (m *mockInvoker) IsAvailable() bool {
panic("implement me")
}

func (m *mockInvoker) Destroy() {
panic("implement me")
}

func (m *mockInvoker) Invoke(context.Context, protocol.Invocation) protocol.Result {
// for getMetadataInfo and ServiceInstancesChangedListenerImpl onEvent
serviceInfo := &common.ServiceInfo{ServiceKey: "test", MatchKey: "test"}
services := make(map[string]*common.ServiceInfo)
services["test"] = serviceInfo
return &protocol.RPCResult{
Rest: &common.MetadataInfo{
Services: services,
},
}
}
6 changes: 3 additions & 3 deletions remoting/etcdv3/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,23 @@ func TestListener(t *testing.T) {
{input: struct {
k string
v string
}{k: "/dubbo", v: changedData}},
}{k: "/dubbo/", v: changedData}},
}
SetUpEtcdServer(t)
c, err := gxetcd.NewClient("test", []string{"localhost:2381"}, time.Second, 1)
assert.NoError(t, err)

listener := NewEventListener(c)
dataListener := &mockDataListener{client: c, changedData: changedData, rc: make(chan remoting.Event)}
listener.ListenServiceEvent("/dubbo", dataListener)
listener.ListenServiceEvent("/dubbo/", dataListener)

// NOTICE: direct listen will lose create msg
time.Sleep(time.Second)
for _, tc := range tests {

k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil {
if err := c.Update(k, v); err != nil {
t.Fatal(err)
}

Expand Down

0 comments on commit 850e67d

Please sign in to comment.