Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #750. registry center notify with complete address list #758

Merged
merged 24 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3053a22
fix #750. 1.fix empty ServiceEvent, we should clear all invokers 2.w…
cvictory Sep 16, 2020
fa94763
add test for clear all address
cvictory Sep 16, 2020
c3aac14
fix nacos test
cvictory Sep 16, 2020
2a6c0a1
fix nacos test
cvictory Sep 16, 2020
94496c5
fix review issue: add constraint of Action value of ServiceEvent in N…
cvictory Sep 23, 2020
eb1b730
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Sep 25, 2020
6053327
fix #750. 1.fix unit test
cvictory Sep 25, 2020
b90bb44
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Sep 27, 2020
3389b38
fix: if the registry center notify address list, it cannot remove inv…
cvictory Sep 27, 2020
634e8d2
enhance the refresh func
cvictory Oct 13, 2020
45c926b
remove doc
cvictory Oct 13, 2020
5f31e3a
enhance the refresh func
cvictory Oct 13, 2020
0c94049
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Oct 14, 2020
d5ffb62
add some debug log
cvictory Oct 18, 2020
2757a1a
use goroutine to destroy invoker
cvictory Nov 13, 2020
51ee140
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Nov 13, 2020
60f53ab
fix merge issue and optimize some invalid usage
cvictory Dec 9, 2020
f4d5df2
fix log error
cvictory Dec 9, 2020
068505d
refactor some code
cvictory Dec 9, 2020
af608d8
trigger init router for first call
zouyx Dec 10, 2020
a5bd416
Revert "trigger init router for first call"
zouyx Dec 11, 2020
5ffcdfe
fix review issue
cvictory Dec 14, 2020
a136af5
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Dec 14, 2020
0cb6305
Merge branch 'notify_all' of github.com:cvictory/dubbo-go into notify…
cvictory Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Inv
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}
Comment on lines -198 to -200
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete this block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete empty protection. When the address is empty, the route should be refresh.


var (
mutex sync.Mutex
Expand Down
13 changes: 13 additions & 0 deletions metadata/report/nacos/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package nacos

import (
"encoding/json"
"net/http"
"strconv"
"testing"
"time"
)

import (
Expand All @@ -36,6 +38,9 @@ import (
)

func TestNacosMetadataReport_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
rpt := newTestReport()
assert.NotNil(t, rpt)

Expand Down Expand Up @@ -114,3 +119,11 @@ func newTestReport() report.MetadataReport {
res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(&regurl)
return res
}

func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
94 changes: 64 additions & 30 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type RegistryDirectory struct {
referenceConfigurationListener *referenceConfigurationListener
serviceKey string
forbidden atomic.Bool
registerLock sync.Mutex // this lock if for register
}

// NewRegistryDirectory will create a new RegistryDirectory
Expand Down Expand Up @@ -104,44 +105,77 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
}

// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)
}

// NotifyAll notify the events that are complete Service Event List.
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent) {
go dir.refreshAllInvokers(events)
}

// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
logger.Debugf("refresh invokers with %+v", event)
var oldInvoker protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
if oldInvoker != nil {
oldInvoker.Destroy()
}
}

// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker

// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker
// not in the incoming list can be removed. The Action of serviceEvent should be EventTypeUpdate.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent) {
var (
oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent
)
// loop the events to check the Action should be EventTypeUpdate.
for _, event := range events {
if event.Action != remoting.EventTypeUpdate {
panic("Your implements of register center is wrong, " +
"please check the Action of ServiceEvent should be EventTypeUpdate")
return
}
}
func() {
// this lock is work at batch update of InvokeCache
dir.registerLock.Lock()
defer dir.registerLock.Unlock()
// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
}

for _, event := range events {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
// get need add invokers from events
for _, event := range events {
// Is the key (url.Key()) of cacheInvokersMap the best way?
if _, ok := dir.cacheInvokersMap.Load(event.Service.Key()); !ok {
addEvents = append(addEvents, event)
}
}
}

if len(events) > 0 {
dir.setNewInvokers()
}

// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
// loop the updateEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
}()
dir.setNewInvokers()
// destroy unused invokers
for _, invoker := range oldInvokers {
invoker.Destroy()
}
Expand Down Expand Up @@ -296,7 +330,7 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
logger.Debugf("service will be added in cache invokers: invokers url is %+v!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
Expand Down Expand Up @@ -407,7 +441,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}

type consumerConfigurationListener struct {
Expand All @@ -434,5 +468,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
28 changes: 28 additions & 0 deletions registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,34 @@ func Test_toGroupInvokers(t *testing.T) {
assert.Len(t, groupInvokers, 2)
}

func Test_RefreshUrl(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 4)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl},
&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl2}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
// clear all address
mockRegistry.MockEvents([]*registry.ServiceEvent{})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 0)
}

func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)

Expand Down
68 changes: 51 additions & 17 deletions registry/mock_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ import (

// MockRegistry is used as mock registry
type MockRegistry struct {
listener *listener
destroyed *atomic.Bool
listener *listener
destroyed *atomic.Bool
allAddress chan []*ServiceEvent
}

// NewMockRegistry creates a mock registry
func NewMockRegistry(url *common.URL) (Registry, error) {
registry := &MockRegistry{
destroyed: atomic.NewBool(false),
destroyed: atomic.NewBool(false),
allAddress: make(chan []*ServiceEvent),
}
listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)}
registry.listener = listener
Expand Down Expand Up @@ -80,22 +82,12 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
go func() {
for {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return
}

listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
time.Sleep(time.Duration(3) * time.Second)
t, listener := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}

for {
serviceEvent, err := listener.Next()
if err != nil {
Expand All @@ -109,6 +101,24 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
}()
go func() {
for {
t, _ := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}

for {
select {
case e := <-r.allAddress:
notifyListener.NotifyAll(e)
break
}
}
}
}()
return nil
}

Expand Down Expand Up @@ -138,3 +148,27 @@ func (*listener) Close() {
func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event
}

// nolint
func (r *MockRegistry) MockEvents(events []*ServiceEvent) {
r.allAddress <- events
}

func (r *MockRegistry) checkLoopSubscribe(url *common.URL) (int, Listener) {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return -1, nil
}

listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return -1, nil
}
time.Sleep(time.Duration(3) * time.Second)
return 0, nil
}
return 1, listener
}
19 changes: 19 additions & 0 deletions registry/nacos/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package nacos

import (
"encoding/json"
"net/http"
"net/url"
"strconv"
"testing"
"time"
)

import (
Expand All @@ -35,6 +37,9 @@ import (
)

func TestNacosRegistry_Register(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
Expand Down Expand Up @@ -64,6 +69,9 @@ func TestNacosRegistry_Register(t *testing.T) {
}

func TestNacosRegistry_Subscribe(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
Expand Down Expand Up @@ -102,6 +110,9 @@ func TestNacosRegistry_Subscribe(t *testing.T) {
}

func TestNacosRegistry_Subscribe_del(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
Expand Down Expand Up @@ -188,3 +199,11 @@ func TestNacosListener_Close(t *testing.T) {
_, err = listener.Next()
assert.NotNil(t, err)
}

func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
3 changes: 2 additions & 1 deletion registry/nacos/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv
GroupName: n.group,
})
if err != nil {
logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group)
logger.Errorf("Could not query the instances for service: %+v, group: %+v . It happened err %+v",
serviceName, n.group, err)
return make([]registry.ServiceInstance, 0, 0)
}
res := make([]registry.ServiceInstance, 0, len(instances))
Expand Down
3 changes: 3 additions & 0 deletions registry/nacos/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) {
}

func TestNacosServiceDiscovery_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
prepareData()
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
Expand Down
Loading