From 1bd06c1e99956482f38f67595fdd24550342fb4b Mon Sep 17 00:00:00 2001 From: "binbin.zhang" Date: Wed, 1 Dec 2021 14:49:53 +0800 Subject: [PATCH] nacos support UnRegister and UnSubscribe (#1616) --- registry/nacos/listener.go | 20 +++++++++++------- registry/nacos/registry.go | 43 ++++++++++++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index 67b9d1f162..e0cbe2eaa7 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -44,6 +44,12 @@ import ( "dubbo.apache.org/dubbo-go/v3/remoting" ) +var ( + listenerCache sync.Map +) + +type callback func(services []model.SubscribeService, err error) + type nacosListener struct { namingClient *nacosClient.NacosNamingClient listenURL *common.URL @@ -191,15 +197,15 @@ func (nl *nacosListener) startListen() error { if nl.namingClient == nil { return perrors.New("nacos naming namingClient stopped") } - serviceName := getSubscribeName(nl.listenURL) - groupName := nl.regURL.GetParam(constant.RegistryGroupKey, defaultGroup) - nl.subscribeParam = &vo.SubscribeParam{ - ServiceName: serviceName, - SubscribeCallback: nl.Callback, - GroupName: groupName, + nl.subscribeParam = createSubscribeParam(nl.listenURL, nl.regURL, nl.Callback) + if nl.subscribeParam == nil { + return perrors.New("create nacos subscribeParam failed") } go func() { - _ = nl.namingClient.Client().Subscribe(nl.subscribeParam) + err := nl.namingClient.Client().Subscribe(nl.subscribeParam) + if err == nil { + listenerCache.Store(nl.subscribeParam.ServiceName+nl.subscribeParam.GroupName, nl) + } }() return nil } diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 879e8fa43b..07520fcbe4 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -151,7 +151,8 @@ func createDeregisterParam(url *common.URL, serviceName string, groupName string } } -func (nr *nacosRegistry) DeRegister(url *common.URL) error { +// UnRegister +func (nr *nacosRegistry) UnRegister(url *common.URL) error { serviceName := getServiceName(url) groupName := nr.URL.GetParam(constant.NacosGroupKey, defaultGroup) param := createDeregisterParam(url, serviceName, groupName) @@ -165,11 +166,6 @@ func (nr *nacosRegistry) DeRegister(url *common.URL) error { return nil } -// UnRegister -func (nr *nacosRegistry) UnRegister(conf *common.URL) error { - return perrors.New("UnRegister is not support in nacosRegistry") -} - func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) { return NewNacosListener(conf, nr.URL, nr.namingClient) } @@ -213,8 +209,37 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } // UnSubscribe : -func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error { - return perrors.New("UnSubscribe not support in nacosRegistry") +func (nr *nacosRegistry) UnSubscribe(url *common.URL, _ registry.NotifyListener) error { + param := createSubscribeParam(url, nr.URL, nil) + if param == nil { + return nil + } + err := nr.namingClient.Client().Unsubscribe(param) + if err != nil { + return perrors.New("UnSubscribe [" + param.ServiceName + "] to nacos failed") + } + return nil +} + +func createSubscribeParam(url, regUrl *common.URL, cb callback) *vo.SubscribeParam { + serviceName := getSubscribeName(url) + groupName := regUrl.GetParam(constant.RegistryGroupKey, defaultGroup) + if cb == nil { + v, ok := listenerCache.Load(serviceName + groupName) + if !ok { + return nil + } + listener, ok := v.(*nacosListener) + if !ok { + return nil + } + cb = listener.Callback + } + return &vo.SubscribeParam{ + ServiceName: serviceName, + SubscribeCallback: cb, + GroupName: groupName, + } } // GetURL gets its registration URL @@ -231,7 +256,7 @@ func (nr *nacosRegistry) IsAvailable() bool { // nolint func (nr *nacosRegistry) Destroy() { for _, url := range nr.registryUrls { - err := nr.DeRegister(url) + err := nr.UnRegister(url) logger.Infof("DeRegister Nacos URL:%+v", url) if err != nil { logger.Errorf("Deregister URL:%+v err:%v", url, err.Error())