Skip to content

Commit

Permalink
Merge pull request #758 from cvictory/notify_all
Browse files Browse the repository at this point in the history
fix #750.  registry center notify with complete address list
  • Loading branch information
zouyx authored Dec 18, 2020
2 parents e2954fe + 0cb6305 commit cfef02a
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 82 deletions.
3 changes: 0 additions & 3 deletions cluster/router/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Inv
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}

var (
mutex sync.Mutex
Expand Down
13 changes: 13 additions & 0 deletions metadata/report/nacos/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package nacos

import (
"encoding/json"
"net/http"
"strconv"
"testing"
"time"
)

import (
Expand All @@ -36,6 +38,9 @@ import (
)

func TestNacosMetadataReport_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
rpt := newTestReport()
assert.NotNil(t, rpt)

Expand Down Expand Up @@ -114,3 +119,11 @@ func newTestReport() report.MetadataReport {
res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(regurl)
return res
}

func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
173 changes: 121 additions & 52 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ type RegistryDirectory struct {
referenceConfigurationListener *referenceConfigurationListener
serviceKey string
forbidden atomic.Bool
registerLock sync.Mutex // this lock if for register
}

// NewRegistryDirectory will create a new RegistryDirectory
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
logger.Debugf("new RegistryDirectory for service :%s.", url.Key())
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
Expand All @@ -97,69 +99,128 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.

// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}

// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)
}

// NotifyAll notify the events that are complete Service Event List.
// After notify the address, the callback func will be invoked.
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent, callback func()) {
go dir.refreshAllInvokers(events, callback)
}

// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
if event != nil {
logger.Debugf("refresh invokers with %+v", event)
} else {
logger.Debug("refresh invokers with nil")
}

var oldInvoker protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
if oldInvoker != nil {
oldInvoker.Destroy()
}
}

// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker
// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker
// not in the incoming list can be removed. The Action of serviceEvent should be EventTypeUpdate.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent, callback func()) {
var (
oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent
)
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL

// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
// loop the events to check the Action should be EventTypeUpdate.
for _, event := range events {
if event.Action != remoting.EventTypeUpdate {
panic("Your implements of register center is wrong, " +
"please check the Action of ServiceEvent should be EventTypeUpdate")
return
}
// Originally it will Merge URL many times, now we just execute once.
// MergeUrl is executed once and put the result into Event. After this, the key will get from Event.Key().
newUrl := dir.convertUrl(event)
newUrl = common.MergeUrl(newUrl, referenceUrl)
dir.overrideUrl(newUrl)
event.Update(newUrl)
}
// After notify all addresses, do some callback.
defer callback()
func() {
// this lock is work at batch update of InvokeCache
dir.registerLock.Lock()
defer dir.registerLock.Unlock()
// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
}

for _, event := range events {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
// get need add invokers from events
for _, event := range events {
// Get the key from Event.Key()
if _, ok := dir.cacheInvokersMap.Load(event.Key()); !ok {
addEvents = append(addEvents, event)
}
}
}

if len(events) > 0 {
dir.setNewInvokers()
}

// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
// loop the updateEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
dir.configRouters()
if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
}()
dir.setNewInvokers()
// destroy unused invokers
for _, invoker := range oldInvokers {
invoker.Destroy()
go invoker.Destroy()
}
}

// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
for _, event := range events {
if dir.invokerCacheKey(event.Service) == key {
if dir.invokerCacheKey(event) == key {
return true
}
}
return false
}

// invokerCacheKey generates the key in the cache for a given URL.
func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
// invokerCacheKey generates the key in the cache for a given ServiceEvent.
func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) string {
// If the url is merged, then return Event.Key() directly.
if event.Updated() {
return event.Key()
}
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeUrl(url, referenceUrl)
newUrl := common.MergeUrl(event.Service, referenceUrl)
event.Update(newUrl)
return newUrl.Key()
}

Expand Down Expand Up @@ -294,30 +355,38 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil
}

logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
return cacheInvoker.(protocol.Invoker)
}
if v, ok := dir.doCacheInvoker(newUrl); ok {
return v
}
}
return nil
}

func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
key := newUrl.Key()
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil, true
}

logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
return cacheInvoker.(protocol.Invoker), true
}
}
return nil, false
}

// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
invokers := dir.cacheInvokers
Expand Down Expand Up @@ -406,7 +475,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}

type consumerConfigurationListener struct {
Expand All @@ -433,5 +502,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
28 changes: 28 additions & 0 deletions registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,34 @@ func Test_toGroupInvokers(t *testing.T) {
assert.Len(t, groupInvokers, 2)
}

func Test_RefreshUrl(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 4)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl},
&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl2}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
// clear all address
mockRegistry.MockEvents([]*registry.ServiceEvent{})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 0)
}

func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)

Expand Down
29 changes: 27 additions & 2 deletions registry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,36 @@ func init() {
type ServiceEvent struct {
Action remoting.EventType
Service *common.URL
// store the key for Service.Key()
key string
// If the url is updated, such as Merged.
updated bool
}

// String return the description of event
func (e ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service)
func (e *ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}, Key{%s}}", e.Action, e.Service, e.key)
}

// Update() update the url with the merged URL. Work with Updated() can reduce the process of some merging URL.
func (e *ServiceEvent) Update(url *common.URL) {
e.Service = url
e.updated = true
}

// Updated() check if the url is updated.
// If the serviceEvent is updated, then it don't need merge url again.
func (e *ServiceEvent) Updated() bool {
return e.updated
}

// Key() generate the key for service.Key(). It is cached once.
func (e *ServiceEvent) Key() string {
if len(e.key) > 0 {
return e.key
}
e.key = e.Service.Key()
return e.key
}

// ServiceInstancesChangedEvent represents service instances make some changing
Expand Down
Loading

0 comments on commit cfef02a

Please sign in to comment.