Skip to content

Commit

Permalink
nacos support UnRegister and UnSubscribe (#1616)
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 committed Dec 1, 2021
1 parent a7bbf32 commit 1bd06c1
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 16 deletions.
20 changes: 13 additions & 7 deletions registry/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)

var (
listenerCache sync.Map
)

type callback func(services []model.SubscribeService, err error)

type nacosListener struct {
namingClient *nacosClient.NacosNamingClient
listenURL *common.URL
Expand Down Expand Up @@ -191,15 +197,15 @@ func (nl *nacosListener) startListen() error {
if nl.namingClient == nil {
return perrors.New("nacos naming namingClient stopped")
}
serviceName := getSubscribeName(nl.listenURL)
groupName := nl.regURL.GetParam(constant.RegistryGroupKey, defaultGroup)
nl.subscribeParam = &vo.SubscribeParam{
ServiceName: serviceName,
SubscribeCallback: nl.Callback,
GroupName: groupName,
nl.subscribeParam = createSubscribeParam(nl.listenURL, nl.regURL, nl.Callback)
if nl.subscribeParam == nil {
return perrors.New("create nacos subscribeParam failed")
}
go func() {
_ = nl.namingClient.Client().Subscribe(nl.subscribeParam)
err := nl.namingClient.Client().Subscribe(nl.subscribeParam)
if err == nil {
listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl)
}
}()
return nil
}
Expand Down
43 changes: 34 additions & 9 deletions registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func createDeregisterParam(url *common.URL, serviceName string, groupName string
}
}

func (nr *nacosRegistry) DeRegister(url *common.URL) error {
// UnRegister
func (nr *nacosRegistry) UnRegister(url *common.URL) error {
serviceName := getServiceName(url)
groupName := nr.URL.GetParam(constant.NacosGroupKey, defaultGroup)
param := createDeregisterParam(url, serviceName, groupName)
Expand All @@ -165,11 +166,6 @@ func (nr *nacosRegistry) DeRegister(url *common.URL) error {
return nil
}

// UnRegister
func (nr *nacosRegistry) UnRegister(conf *common.URL) error {
return perrors.New("UnRegister is not support in nacosRegistry")
}

func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
return NewNacosListener(conf, nr.URL, nr.namingClient)
}
Expand Down Expand Up @@ -213,8 +209,37 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
}

// UnSubscribe :
func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error {
return perrors.New("UnSubscribe not support in nacosRegistry")
func (nr *nacosRegistry) UnSubscribe(url *common.URL, _ registry.NotifyListener) error {
param := createSubscribeParam(url, nr.URL, nil)
if param == nil {
return nil
}
err := nr.namingClient.Client().Unsubscribe(param)
if err != nil {
return perrors.New("UnSubscribe [" + param.ServiceName + "] to nacos failed")
}
return nil
}

func createSubscribeParam(url, regUrl *common.URL, cb callback) *vo.SubscribeParam {
serviceName := getSubscribeName(url)
groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup)
if cb == nil {
v, ok := listenerCache.Load(serviceName + groupName)
if !ok {
return nil
}
listener, ok := v.(*nacosListener)
if !ok {
return nil
}
cb = listener.Callback
}
return &vo.SubscribeParam{
ServiceName: serviceName,
SubscribeCallback: cb,
GroupName: groupName,
}
}

// GetURL gets its registration URL
Expand All @@ -231,7 +256,7 @@ func (nr *nacosRegistry) IsAvailable() bool {
// nolint
func (nr *nacosRegistry) Destroy() {
for _, url := range nr.registryUrls {
err := nr.DeRegister(url)
err := nr.UnRegister(url)
logger.Infof("DeRegister Nacos URL:%+v", url)
if err != nil {
logger.Errorf("Deregister URL:%+v err:%v", url, err.Error())
Expand Down

0 comments on commit 1bd06c1

Please sign in to comment.