Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #750. registry center notify with complete address list #758

Merged
merged 24 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3053a22
fix #750. 1.fix empty ServiceEvent, we should clear all invokers 2.w…
cvictory Sep 16, 2020
fa94763
add test for clear all address
cvictory Sep 16, 2020
c3aac14
fix nacos test
cvictory Sep 16, 2020
2a6c0a1
fix nacos test
cvictory Sep 16, 2020
94496c5
fix review issue: add constraint of Action value of ServiceEvent in N…
cvictory Sep 23, 2020
eb1b730
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Sep 25, 2020
6053327
fix #750. 1.fix unit test
cvictory Sep 25, 2020
b90bb44
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Sep 27, 2020
3389b38
fix: if the registry center notify address list, it cannot remove inv…
cvictory Sep 27, 2020
634e8d2
enhance the refresh func
cvictory Oct 13, 2020
45c926b
remove doc
cvictory Oct 13, 2020
5f31e3a
enhance the refresh func
cvictory Oct 13, 2020
0c94049
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Oct 14, 2020
d5ffb62
add some debug log
cvictory Oct 18, 2020
2757a1a
use goroutine to destroy invoker
cvictory Nov 13, 2020
51ee140
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Nov 13, 2020
60f53ab
fix merge issue and optimize some invalid usage
cvictory Dec 9, 2020
f4d5df2
fix log error
cvictory Dec 9, 2020
068505d
refactor some code
cvictory Dec 9, 2020
af608d8
trigger init router for first call
zouyx Dec 10, 2020
a5bd416
Revert "trigger init router for first call"
zouyx Dec 11, 2020
5ffcdfe
fix review issue
cvictory Dec 14, 2020
a136af5
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Dec 14, 2020
0cb6305
Merge branch 'notify_all' of github.com:cvictory/dubbo-go into notify…
cvictory Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions metadata/report/nacos/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package nacos

import (
"encoding/json"
"net/http"
"strconv"
"testing"
"time"
)

import (
Expand All @@ -36,6 +38,9 @@ import (
)

func TestNacosMetadataReport_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
rpt := newTestReport()
assert.NotNil(t, rpt)

Expand Down Expand Up @@ -114,3 +119,11 @@ func newTestReport() report.MetadataReport {
res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(&regurl)
return res
}

func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
83 changes: 52 additions & 31 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,44 +104,65 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
}

// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)
}

// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker

// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent) {
go dir.refreshAllInvokers(events)
}

// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
logger.Debugf("refresh invokers with %+v", event)
var oldInvoker protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
if oldInvoker != nil {
oldInvoker.Destroy()
}
}

// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker
// not in the incoming list can be removed. It will ignore Action of serviceEvent.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent) {
var (
oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent
)

// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
// get need add invokers from events
for _, event := range events {
// Is the key (url.Key()) of cacheInvokersMap the best way?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is necessary, we should assume (safely) that all events are UPDATE events, otherwise how would you process DELETE events? plus, per the comment on the interface:

// The argument of events []*ServiceEvent is equal to urls []*URL, because Action of ServiceEvent will be ignored.

if _, ok := dir.cacheInvokersMap.Load(event.Service.Key()); !ok {
event.Action = remoting.EventTypeAdd
addEvents = append(addEvents, event)
}
}
// loop the addEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}

if len(events) > 0 {
dir.setNewInvokers()
}

// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
dir.setNewInvokers()
// destroy unused invokers
for _, invoker := range oldInvokers {
invoker.Destroy()
}
Expand Down Expand Up @@ -296,7 +317,7 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
logger.Debugf("service will be added in cache invokers: invokers url is %+v!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
Expand Down Expand Up @@ -407,7 +428,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}

type consumerConfigurationListener struct {
Expand All @@ -434,5 +455,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
28 changes: 28 additions & 0 deletions registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,34 @@ func Test_toGroupInvokers(t *testing.T) {
assert.Len(t, groupInvokers, 2)
}

func Test_RefreshUrl(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 4)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl},
&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl2}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
// clear all address
mockRegistry.MockEvents([]*registry.ServiceEvent{})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 0)
}

func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)

Expand Down
68 changes: 51 additions & 17 deletions registry/mock_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ import (

// MockRegistry is used as mock registry
type MockRegistry struct {
listener *listener
destroyed *atomic.Bool
listener *listener
destroyed *atomic.Bool
allAddress chan []*ServiceEvent
}

// NewMockRegistry creates a mock registry
func NewMockRegistry(url *common.URL) (Registry, error) {
registry := &MockRegistry{
destroyed: atomic.NewBool(false),
destroyed: atomic.NewBool(false),
allAddress: make(chan []*ServiceEvent),
}
listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)}
registry.listener = listener
Expand Down Expand Up @@ -80,22 +82,12 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
go func() {
for {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return
}

listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
time.Sleep(time.Duration(3) * time.Second)
t, listener := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}

for {
serviceEvent, err := listener.Next()
if err != nil {
Expand All @@ -109,6 +101,24 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
}()
go func() {
for {
t, _ := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}

for {
select {
case e := <-r.allAddress:
notifyListener.NotifyAll(e)
break
}
}
}
}()
return nil
}

Expand Down Expand Up @@ -138,3 +148,27 @@ func (*listener) Close() {
func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event
}

// nolint
func (r *MockRegistry) MockEvents(events []*ServiceEvent) {
r.allAddress <- events
}

func (r *MockRegistry) checkLoopSubscribe(url *common.URL) (int, Listener) {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return -1, nil
}

listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return -1, nil
}
time.Sleep(time.Duration(3) * time.Second)
return 0, nil
}
return 1, listener
}
19 changes: 19 additions & 0 deletions registry/nacos/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package nacos

import (
"encoding/json"
"net/http"
"net/url"
"strconv"
"testing"
"time"
)

import (
Expand All @@ -35,6 +37,9 @@ import (
)

func TestNacosRegistry_Register(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
Expand Down Expand Up @@ -64,6 +69,9 @@ func TestNacosRegistry_Register(t *testing.T) {
}

func TestNacosRegistry_Subscribe(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
Expand Down Expand Up @@ -102,6 +110,9 @@ func TestNacosRegistry_Subscribe(t *testing.T) {
}

func TestNacosRegistry_Subscribe_del(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
Expand Down Expand Up @@ -188,3 +199,11 @@ func TestNacosListener_Close(t *testing.T) {
_, err = listener.Next()
assert.NotNil(t, err)
}

func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
3 changes: 2 additions & 1 deletion registry/nacos/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv
GroupName: n.group,
})
if err != nil {
logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group)
logger.Errorf("Could not query the instances for service: %+v, group: %+v . It happened err %+v",
serviceName, n.group, err)
return make([]registry.ServiceInstance, 0, 0)
}
res := make([]registry.ServiceInstance, 0, len(instances))
Expand Down
3 changes: 3 additions & 0 deletions registry/nacos/service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) {
}

func TestNacosServiceDiscovery_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
prepareData()
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
Expand Down
15 changes: 10 additions & 5 deletions registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,23 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
}

// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
if len(events) == 0 {
return
}
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {

event := events[0]
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
nl.doOverrideIfNecessary()
}
}

func (nl *overrideSubscribeListener) NotifyAll(events []*registry.ServiceEvent) {
if len(events) == 0 {
return
}

event := events[0]
nl.Notify(event)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why notify all just get first element?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
providerUrl := getProviderUrl(nl.originInvoker)
key := getCacheKey(providerUrl)
Expand Down
Loading