Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #750. registry center notify with complete address list #758

Merged
merged 24 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3053a22
fix #750. 1.fix empty ServiceEvent, we should clear all invokers 2.w…
cvictory Sep 16, 2020
fa94763
add test for clear all address
cvictory Sep 16, 2020
c3aac14
fix nacos test
cvictory Sep 16, 2020
2a6c0a1
fix nacos test
cvictory Sep 16, 2020
94496c5
fix review issue: add constraint of Action value of ServiceEvent in N…
cvictory Sep 23, 2020
eb1b730
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Sep 25, 2020
6053327
fix #750. 1.fix unit test
cvictory Sep 25, 2020
b90bb44
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Sep 27, 2020
3389b38
fix: if the registry center notify address list, it cannot remove inv…
cvictory Sep 27, 2020
634e8d2
enhance the refresh func
cvictory Oct 13, 2020
45c926b
remove doc
cvictory Oct 13, 2020
5f31e3a
enhance the refresh func
cvictory Oct 13, 2020
0c94049
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Oct 14, 2020
d5ffb62
add some debug log
cvictory Oct 18, 2020
2757a1a
use goroutine to destroy invoker
cvictory Nov 13, 2020
51ee140
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Nov 13, 2020
60f53ab
fix merge issue and optimize some invalid usage
cvictory Dec 9, 2020
f4d5df2
fix log error
cvictory Dec 9, 2020
068505d
refactor some code
cvictory Dec 9, 2020
af608d8
trigger init router for first call
zouyx Dec 10, 2020
a5bd416
Revert "trigger init router for first call"
zouyx Dec 11, 2020
5ffcdfe
fix review issue
cvictory Dec 14, 2020
a136af5
Merge branch 'develop' of github.com:apache/dubbo-go into notify_all
cvictory Dec 14, 2020
0cb6305
Merge branch 'notify_all' of github.com:cvictory/dubbo-go into notify…
cvictory Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Inv
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}
Comment on lines -198 to -200
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete this block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete empty protection. When the address is empty, the route should be refresh.


var (
mutex sync.Mutex
Expand Down
13 changes: 13 additions & 0 deletions metadata/report/nacos/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package nacos

import (
"encoding/json"
"net/http"
"strconv"
"testing"
"time"
)

import (
Expand All @@ -36,6 +38,9 @@ import (
)

func TestNacosMetadataReport_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
rpt := newTestReport()
assert.NotNil(t, rpt)

Expand Down Expand Up @@ -114,3 +119,11 @@ func newTestReport() report.MetadataReport {
res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(regurl)
return res
}

func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
173 changes: 121 additions & 52 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ type RegistryDirectory struct {
referenceConfigurationListener *referenceConfigurationListener
serviceKey string
forbidden atomic.Bool
registerLock sync.Mutex // this lock if for register
}

// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
logger.Debugf("new RegistryDirectory for service :%s.", url.Key())
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
Expand All @@ -97,69 +99,128 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.

// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}

// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)
}

// NotifyAll notify the events that are complete Service Event List.
// After notify the address, the callback func will be invoked.
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent, callback func()) {
go dir.refreshAllInvokers(events, callback)
}

// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
if event != nil {
logger.Debugf("refresh invokers with %+v", event)
} else {
logger.Debug("refresh invokers with nil")
}

var oldInvoker protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
if oldInvoker != nil {
oldInvoker.Destroy()
}
}

// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker
// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker
// not in the incoming list can be removed. The Action of serviceEvent should be EventTypeUpdate.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent, callback func()) {
var (
oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent
)
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
// loop the events to check the Action should be EventTypeUpdate.
for _, event := range events {
if event.Action != remoting.EventTypeUpdate {
panic("Your implements of register center is wrong, " +
"please check the Action of ServiceEvent should be EventTypeUpdate")
return
}
// Originally it will Merge URL many times, now we just execute once.
// MergeUrl is executed once and put the result into Event. After this, the key will get from Event.Key().
newUrl := dir.convertUrl(event)
newUrl = common.MergeUrl(newUrl, referenceUrl)
dir.overrideUrl(newUrl)
event.Update(newUrl)
}
// After notify all addresses, do some callback.
defer callback()
func() {
// this lock is work at batch update of InvokeCache
dir.registerLock.Lock()
defer dir.registerLock.Unlock()
// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
}

for _, event := range events {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
// get need add invokers from events
for _, event := range events {
// Get the key from Event.Key()
if _, ok := dir.cacheInvokersMap.Load(event.Key()); !ok {
addEvents = append(addEvents, event)
}
}
}

if len(events) > 0 {
dir.setNewInvokers()
}

// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
// loop the updateEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
dir.configRouters()
if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
}()
dir.setNewInvokers()
// destroy unused invokers
for _, invoker := range oldInvokers {
invoker.Destroy()
go invoker.Destroy()
}
}

// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
for _, event := range events {
if dir.invokerCacheKey(event.Service) == key {
if dir.invokerCacheKey(event) == key {
return true
}
}
return false
}

// invokerCacheKey generates the key in the cache for a given URL.
func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
// invokerCacheKey generates the key in the cache for a given ServiceEvent.
func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) string {
// If the url is merged, then return Event.Key() directly.
if event.Updated() {
return event.Key()
}
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeUrl(url, referenceUrl)
newUrl := common.MergeUrl(event.Service, referenceUrl)
event.Update(newUrl)
return newUrl.Key()
}

Expand Down Expand Up @@ -294,30 +355,38 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil
}

logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
return cacheInvoker.(protocol.Invoker)
}
if v, ok := dir.doCacheInvoker(newUrl); ok {
return v
}
}
return nil
}

func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
key := newUrl.Key()
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil, true
}

logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
return cacheInvoker.(protocol.Invoker), true
}
}
return nil, false
}

// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
invokers := dir.cacheInvokers
Expand Down Expand Up @@ -406,7 +475,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}

type consumerConfigurationListener struct {
Expand All @@ -433,5 +502,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
28 changes: 28 additions & 0 deletions registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,34 @@ func Test_toGroupInvokers(t *testing.T) {
assert.Len(t, groupInvokers, 2)
}

func Test_RefreshUrl(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 4)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl},
&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl2}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
// clear all address
mockRegistry.MockEvents([]*registry.ServiceEvent{})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 0)
}

func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)

Expand Down
29 changes: 27 additions & 2 deletions registry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,36 @@ func init() {
type ServiceEvent struct {
Action remoting.EventType
Service *common.URL
// store the key for Service.Key()
key string
// If the url is updated, such as Merged.
updated bool
}

// String return the description of event
func (e ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service)
func (e *ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}, Key{%s}}", e.Action, e.Service, e.key)
}

// Update() update the url with the merged URL. Work with Updated() can reduce the process of some merging URL.
func (e *ServiceEvent) Update(url *common.URL) {
e.Service = url
e.updated = true
}

// Updated() check if the url is updated.
// If the serviceEvent is updated, then it don't need merge url again.
func (e *ServiceEvent) Updated() bool {
return e.updated
}

// Key() generate the key for service.Key(). It is cached once.
func (e *ServiceEvent) Key() string {
if len(e.key) > 0 {
return e.key
}
e.key = e.Service.Key()
return e.key
}

// ServiceInstancesChangedEvent represents service instances make some changing
Expand Down
Loading