Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: kubernetes registry and remote package unit test #400

Merged
merged 66 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
456f58f
Release 1.2.0
hxmhlt Oct 30, 2019
daa1fc3
Release 1.2.0
hxmhlt Oct 30, 2019
fe43eec
Merge branch 'master' into 1.2.0-release
hxmhlt Oct 30, 2019
11878c4
Release 1.2.0
hxmhlt Oct 30, 2019
55ab09e
Fix kubernetes import block and make map param
invalid-email-address Dec 20, 2019
32e5758
Fix the kubernetes && etcd registry race-condition
sxllwx Jan 13, 2020
723aa68
Fix bool value return
sxllwx Jan 13, 2020
2ec84e5
Delete the unused check block
sxllwx Jan 13, 2020
daac5e1
Add apache license
sxllwx Jan 13, 2020
347124d
Fix test embed etcd-server workdir
sxllwx Jan 13, 2020
460fed6
Delete the etcd test-server workdir after ut
sxllwx Jan 13, 2020
156bf40
Fix etcd work-dir conflict
sxllwx Jan 13, 2020
04ee311
fix latest issue
invalid-email-address Jan 18, 2020
5975ca0
Fix registry concurrent close panic
sxllwx Jan 19, 2020
00b1fb1
Merge pull request #289 from sxllwx/k8s
flycash Jan 24, 2020
7a71b3a
Merge develop branch
sxllwx Mar 13, 2020
4e5debc
Add ut for remote/kubernetes
sxllwx Mar 13, 2020
60eaf55
Delete unused method
sxllwx Mar 13, 2020
34ebc72
adapte for new registry
sxllwx Mar 13, 2020
29c8888
Add ut for registry/kubernetes
sxllwx Mar 13, 2020
f29c788
Fix ci client close race condition
sxllwx Mar 14, 2020
f11f6fb
Fix remote/kubernetes unit-test race condition
sxllwx Mar 14, 2020
e5c3ff5
Fix nil init
sxllwx Mar 14, 2020
9af6c53
Fix latest comment
sxllwx Mar 14, 2020
ae667e9
Fix latest alex comment
sxllwx Mar 14, 2020
059f9b8
Add double check for RWMutex
sxllwx Mar 14, 2020
d7ae998
Fix registry package unit test cover
sxllwx Mar 15, 2020
3349096
Add test cover for remote/kubernetes
sxllwx Mar 15, 2020
dfa8267
sync watch unit goroutine
sxllwx Mar 15, 2020
c7474fa
Fix method name bug,and handle the del event in config-listener.
sxllwx Mar 15, 2020
d776f8e
Fix remote/kubernetes sendMsg locker
sxllwx Mar 15, 2020
0886257
delete unused http pprof suite
sxllwx Mar 15, 2020
bf8bf85
Rename watcher and store name
sxllwx Mar 15, 2020
bc536bb
Fix named err
sxllwx Mar 15, 2020
7b13b44
Fix named err
sxllwx Mar 15, 2020
b9bf7d9
Fix time gap
sxllwx Mar 15, 2020
aee4f90
delete unused select case
sxllwx Mar 15, 2020
06b0da8
Add more rich log
sxllwx Mar 15, 2020
83d3975
move handle-client-restart from remote to registry
sxllwx Mar 15, 2020
603c1b9
Fix wg bug, add(1) out of goroutine
sxllwx Mar 15, 2020
8773a7d
Fix interest url slice-> map
sxllwx Mar 15, 2020
59fe063
Fix lock scop
sxllwx Mar 15, 2020
168a978
Fix kubernetes registry configListener nil condition
sxllwx Mar 15, 2020
92f9ea2
Fix zookeeper wg bug
sxllwx Mar 15, 2020
6b04b96
Fix create-path and push to test
sxllwx Mar 15, 2020
25f366c
Fix listener slice -> map
sxllwx Mar 15, 2020
5b87f8e
Fix missing protocol scheme bug
sxllwx Mar 15, 2020
b7af875
Fix nil point, the make slice will import a nil object in slice
sxllwx Mar 15, 2020
8941796
Merge pull request #1 from apache/develop
sxllwx Mar 15, 2020
1ed3e91
Fix go.sum conflict
sxllwx Mar 15, 2020
a47cf73
Fix go.sum conflict
sxllwx Mar 15, 2020
46f5c9a
Fix CHANGE.md
sxllwx Mar 15, 2020
d7a37ba
Fix CHANGE.md
sxllwx Mar 15, 2020
b88cf66
Mod: split long line codes
AlexStocks Mar 16, 2020
35624ed
Imp: set the init len for a map
AlexStocks Mar 16, 2020
eeaa817
Fix latest comment
sxllwx Mar 16, 2020
5a60988
Fix ci flow block issue
sxllwx Mar 16, 2020
14ba4ff
etcdv3 unit test, adapte for windows
sxllwx Mar 16, 2020
ab7db8a
simple the client validate method
sxllwx Mar 16, 2020
cef0d5d
short test time-cost for watch set
sxllwx Mar 16, 2020
938d5ed
Add more rich unit-test
sxllwx Mar 16, 2020
52a9e28
etcdv3 unit test, adapte for windows
sxllwx Mar 16, 2020
0769966
Fix ut nil pointer issue
sxllwx Mar 16, 2020
2eeb732
Fix kubernetes registry ut block issue
sxllwx Mar 16, 2020
164bf89
Fix kubernetes registry ut block issue
sxllwx Mar 16, 2020
e48b698
Add new registry block time, wait the watch groutine start
sxllwx Mar 16, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ require (
go.uber.org/zap v1.10.0
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
k8s.io/api v0.0.0-20190325185214-7544f9db76f6
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
k8s.io/apimachinery v0.0.0-20190223001710-c182ff3b9841
k8s.io/client-go v8.0.0+incompatible
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
)

go 1.13
48 changes: 39 additions & 9 deletions go.sum

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ type dataListener struct {
listener config_center.ConfigurationListener
}

// NewRegistryDataListener ...
// NewRegistryDataListener
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
return &dataListener{listener: listener}
}

func (l *dataListener) AddInterestedURL(url *common.URL) {
Expand All @@ -49,7 +49,12 @@ func (l *dataListener) AddInterestedURL(url *common.URL) {

func (l *dataListener) DataChange(eventType remoting.Event) bool {

url := eventType.Path[strings.Index(eventType.Path, "/providers/")+len("/providers/"):]
index := strings.Index(eventType.Path, "/providers/")
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
Expand All @@ -68,7 +73,6 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
return true
}
}

return false
}

Expand Down Expand Up @@ -97,7 +101,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {

case e := <-l.events:
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
Expand Down
13 changes: 10 additions & 3 deletions registry/etcdv3/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package etcdv3

import (
"os"
"testing"
"time"

"github.com/apache/dubbo-go/config_center"
)

import (
Expand All @@ -32,6 +31,7 @@ import (

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)

Expand All @@ -40,13 +40,16 @@ type RegistryTestSuite struct {
etcd *embed.Etcd
}

const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-registry.etcd"

// start etcd server
func (suite *RegistryTestSuite) SetupSuite() {

t := suite.T()

cfg := embed.NewConfig()
cfg.Dir = "/tmp/default.etcd"
// avoid conflict with default etcd work-dir
cfg.Dir = defaultEtcdV3WorkDir
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
Expand All @@ -66,6 +69,10 @@ func (suite *RegistryTestSuite) SetupSuite() {
// stop etcd server
func (suite *RegistryTestSuite) TearDownSuite() {
suite.etcd.Close()
// clean the etcd workdir
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
suite.FailNow(err.Error())
}
}

func (suite *RegistryTestSuite) TestDataChange() {
Expand Down
4 changes: 2 additions & 2 deletions registry/etcdv3/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (suite *RegistryTestSuite) TestSubscribe() {
assert.Regexp(t, ".*ServiceEvent{Action{add}.*", serviceEvent.String())
}

func (suite *RegistryTestSuite) TestConsumerDestory() {
func (suite *RegistryTestSuite) TestConsumerDestroy() {

t := suite.T()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
Expand All @@ -117,7 +117,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() {

}

func (suite *RegistryTestSuite) TestProviderDestory() {
func (suite *RegistryTestSuite) TestProviderDestroy() {

t := suite.T()
reg := initRegistry(t)
Expand Down
121 changes: 121 additions & 0 deletions registry/kubernetes/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kubernetes

import (
"strings"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
)

type dataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
}

// NewRegistryDataListener
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}

// AddInterestedURL
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}

// DataChange
// notify listen, when interest event
func (l *dataListener) DataChange(eventType remoting.Event) bool {

index := strings.Index(eventType.Path, "/providers/")
if index == -1 {
logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
return false
}
url := eventType.Path[index+len("/providers/"):]
serviceURL, err := common.NewURL(url)
if err != nil {
logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err)
return false
}

for _, v := range l.interestedURL {
if serviceURL.URLEqual(*v) {
l.listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
ConfigType: eventType.Action,
},
)
return true
}
}
return false
}

type configurationListener struct {
registry *kubernetesRegistry
events chan *config_center.ConfigChangeEvent
}

// NewConfigurationListener for listening the event of kubernetes.
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}

func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
sxllwx marked this conversation as resolved.
Show resolved Hide resolved
}

func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.registry.Done():
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
logger.Infof("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(common.URL)}, nil
}
}
}
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
Loading