Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: change register event chan to unbundent chan #1330

Merged
merged 1 commit into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/apache/dubbo-go-hessian2 v1.9.2
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/gost v1.11.13
github.com/dubbogo/gost v1.11.14
github.com/dubbogo/triple v1.0.1
github.com/emicklei/go-restful/v3 v3.4.0
github.com/frankban/quicktest v1.4.1 // indirect
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDH
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apache/dubbo-getty v1.4.3 h1:PCKpryDasKOxwT5MBC6MIMO+0NLOaHF6Xco9YXQw7HI=
github.com/apache/dubbo-getty v1.4.3/go.mod h1:ansXgKxxyhCOiQL29nO5ce1MDcEKmCyZuNR9oMs3hek=
github.com/apache/dubbo-getty v1.4.4 h1:pthYQaCXyjHJ6/SjVwKkX5NhdAqSpUrRL1Z9GowrLdE=
github.com/apache/dubbo-getty v1.4.4/go.mod h1:mcDyiu7M/TVrYDyL8TxDemQkOdvEqqHSQ4jOuYejY1w=
github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
github.com/apache/dubbo-go-hessian2 v1.9.2 h1:XuI8KvENSfKiAhiCBS4RNihmQDoPNmGWKT3gTui0p9A=
github.com/apache/dubbo-go-hessian2 v1.9.2/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE=
Expand Down Expand Up @@ -130,9 +128,8 @@ github.com/dubbogo/go-zookeeper v1.0.3 h1:UkuY+rBsxdT7Bs63QAzp9z7XqQ53W1j8E5rwl8
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.9.0/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.10.1/go.mod h1:+mQGS51XQEUWZP2JeGZTxJwipjRKtJO7Tr+FOg+72rI=
github.com/dubbogo/gost v1.11.12/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.13 h1:sWvK1QbHpPBMmRQJV9qIH3syLegQBQa4xAPof3/Kv5c=
github.com/dubbogo/gost v1.11.13/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/gost v1.11.14 h1:9lfcdILOmqTOVAW1fPHa5uf1NrD6jlIOBe4vf8576yQ=
github.com/dubbogo/gost v1.11.14/go.mod h1:vIcP9rqz2KsXHPjsAwIUtfJIJjppQLQDcYaZTy/61jI=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/net v0.0.3 h1:2k53mh+1U8h1gFjJ8ykzyP4wNdAdgjc5moD+xVHI/AE=
github.com/dubbogo/net v0.0.3/go.mod h1:B6/ka3g8VzcyrmdCH4VkHP1K0aHeI37FmclS+TCwIBU=
Expand Down Expand Up @@ -430,7 +427,6 @@ github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lN
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
Expand Down
10 changes: 6 additions & 4 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -80,20 +81,20 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {

type configurationListener struct {
registry *etcdV3Registry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
closeOnce sync.Once
}

// NewConfigurationListener for listening the event of etcdv3.
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
}

// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next returns next service event once received
Expand All @@ -104,7 +105,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
Expand Down
10 changes: 6 additions & 4 deletions registry/kubernetes/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
perrors "github.com/pkg/errors"
)

Expand Down Expand Up @@ -80,19 +81,19 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {

type configurationListener struct {
registry *kubernetesRegistry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
}

// NewConfigurationListener for listening the event of kubernetes.
func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
}

// Process processes the data change event from config center of kubernetes
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next returns next service event once received
Expand All @@ -103,7 +104,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("listener's kubernetes client connection is broken, so kubernetes event listener exits now.")
return nil, perrors.New("listener stopped")

case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got kubernetes event %#v", e)
if e.ConfigType == remoting.EventTypeDel && !l.registry.client.Valid() {
select {
Expand Down
15 changes: 9 additions & 6 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
Expand All @@ -44,7 +45,7 @@ import (
type nacosListener struct {
namingClient *nacosClient.NacosNamingClient
listenUrl *common.URL
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
instanceMap map[string]model.Instance
cacheLock sync.Mutex
done chan struct{}
Expand All @@ -55,9 +56,10 @@ type nacosListener struct {
func NewNacosListener(url *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenUrl: url, events: make(chan *config_center.ConfigChangeEvent, 32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
listenUrl: url,
events: gxchan.NewUnboundedChan(32),
instanceMap: map[string]model.Instance{},
done: make(chan struct{}),
}
err := listener.startListen()
return listener, err
Expand Down Expand Up @@ -198,7 +200,7 @@ func (nl *nacosListener) stopListen() error {
}

func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
nl.events <- configType
nl.events.In() <- configType
}

// Next returns the service event from nacos.
Expand All @@ -209,7 +211,8 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("nacos listener is close!listenUrl:%+v", nl.listenUrl)
return nil, perrors.New("listener stopped")

case e := <-nl.events:
case val := <-nl.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got nacos event %s", e)
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions registry/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

import (
gxchan "github.com/dubbogo/gost/container/chan"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"
)
Expand Down Expand Up @@ -116,7 +117,7 @@ func (l *RegistryDataListener) Close() {
type RegistryConfigurationListener struct {
client *gxzookeeper.ZookeeperClient
registry *zkRegistry
events chan *config_center.ConfigChangeEvent
events *gxchan.UnboundedChan
isClosed bool
close chan struct{}
closeOnce sync.Once
Expand All @@ -129,7 +130,7 @@ func NewRegistryConfigurationListener(client *gxzookeeper.ZookeeperClient, reg *
return &RegistryConfigurationListener{
client: client,
registry: reg,
events: make(chan *config_center.ConfigChangeEvent, 32),
events: gxchan.NewUnboundedChan(32),
isClosed: false,
close: make(chan struct{}, 1),
subscribeURL: conf,
Expand All @@ -138,7 +139,7 @@ func NewRegistryConfigurationListener(client *gxzookeeper.ZookeeperClient, reg *

// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
l.events.In() <- configType
}

// Next will observe the registry state and events chan
Expand All @@ -150,7 +151,8 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
case <-l.registry.Done():
logger.Warnf("zk consumer register has quit, so zk event listener exit now. (registry url {%v}", l.registry.BaseRegistry.URL)
return nil, perrors.New("zookeeper registry, (registry url{%v}) stopped")
case e := <-l.events:
case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Debugf("got zk event %s", e)
if e.ConfigType == remoting.EventTypeDel && !l.valid() {
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
Expand Down