From 41901cc3dc7a8f98fc918a5a5eeabc78d19652af Mon Sep 17 00:00:00 2001 From: Mulavar <978007503@qq.com> Date: Fri, 7 Apr 2023 18:22:16 +0800 Subject: [PATCH] Feat/nacos registry (#659) * feat(nacos): add nacos service registry&discovery * feat(nacos): add nacos service client * fix * fix --- cmd/admin/admin.go | 2 +- conf/bootstrap.yaml | 4 +- example/service_discovery/etcd/main.go | 5 +- example/service_discovery/nacos/main.go | 41 +++++++++ pkg/admin/config.go | 2 +- pkg/admin/service_discovery.go | 11 +-- pkg/config/nacos/nacos.go | 39 +++------ pkg/config/nacos/nacos_test.go | 29 ------- pkg/registry/base/base.go | 6 +- pkg/registry/discovery.go | 42 +++++++-- pkg/registry/etcd/discovery.go | 2 +- pkg/registry/etcd/registery.go | 3 +- pkg/registry/nacos/client.go | 109 +++++++++++++++++++++++ pkg/registry/nacos/discovery.go | 73 ++++++++++++++++ pkg/registry/nacos/registry.go | 111 ++++++++++++++++++++++++ pkg/registry/registry.go | 19 ++-- pkg/util/config/nacos.go | 99 +++++++++++++++++++++ pkg/util/config/nacos_test.go | 55 ++++++++++++ 18 files changed, 565 insertions(+), 87 deletions(-) create mode 100644 example/service_discovery/nacos/main.go create mode 100644 pkg/registry/nacos/client.go create mode 100644 pkg/registry/nacos/discovery.go create mode 100644 pkg/registry/nacos/registry.go create mode 100644 pkg/util/config/nacos.go create mode 100644 pkg/util/config/nacos_test.go diff --git a/cmd/admin/admin.go b/cmd/admin/admin.go index 34d7d40c..6fa205f4 100644 --- a/cmd/admin/admin.go +++ b/cmd/admin/admin.go @@ -85,7 +85,7 @@ func Run(bootstrapPath string, addr string) error { } registryConf := discovery.GetServiceRegistry(context.Background()) - serviceDiscovery, err := registry.InitDiscovery(registryConf.Name, registryConf.RootPath, "service", registryConf.Options) + serviceDiscovery, err := registry.InitDiscovery(registryConf.Name, registryConf.Options) if err != nil { log.Fatal("init service discovert failed: %v", err) return err diff --git a/conf/bootstrap.yaml b/conf/bootstrap.yaml index 2e04a44d..f72b4837 100644 --- a/conf/bootstrap.yaml +++ b/conf/bootstrap.yaml @@ -34,9 +34,9 @@ registry: # name: nacos # options: # endpoints: "127.0.0.1:8848" -# namespace: arana +# namespace_id: arana # group: arana -# contextPath: /nacos +# context_path: /nacos # scheme: http # username: nacos # password: nacos diff --git a/example/service_discovery/etcd/main.go b/example/service_discovery/etcd/main.go index 4c6864e7..56004c1b 100644 --- a/example/service_discovery/etcd/main.go +++ b/example/service_discovery/etcd/main.go @@ -30,11 +30,12 @@ import ( func main() { storeType := base.ETCD - basePath := "arana" options := make(map[string]interface{}) options["endpoints"] = "http://127.0.0.1:2379" + options["root_path"] = "arana" + options["service_path"] = "service" - etcdDiscovery, err := registry.InitDiscovery(storeType, basePath, "service", options) + etcdDiscovery, err := registry.InitDiscovery(storeType, options) if err != nil { log.Fatalf("Init %s discovery err:%v", storeType, err) return diff --git a/example/service_discovery/nacos/main.go b/example/service_discovery/nacos/main.go new file mode 100644 index 00000000..d244ac48 --- /dev/null +++ b/example/service_discovery/nacos/main.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "github.com/arana-db/arana/pkg/registry" + "github.com/arana-db/arana/pkg/registry/base" + "github.com/arana-db/arana/pkg/util/log" +) + +func main() { + storeType := base.NACOS + options := make(map[string]interface{}) + options["endpoints"] = "127.0.0.1:8848" + options["scheme"] = "http" + options["username"] = "nacos" + options["password"] = "nacos" + + nacosDiscovery, err := registry.InitDiscovery(base.NACOS, options) + if err != nil { + log.Fatalf("Init %s discovery err:%v", storeType, err) + return + } + + nacosDiscovery.GetServices() +} diff --git a/pkg/admin/config.go b/pkg/admin/config.go index 8ef2c7f0..11a7b744 100644 --- a/pkg/admin/config.go +++ b/pkg/admin/config.go @@ -76,7 +76,7 @@ type ServiceInstanceDTO struct { // Version is the version of the compiled. Version string `json:"version"` // Endpoint addresses of the service instance. - Endpoints []*config.Listener + Endpoint *config.Listener } // XConfigWriter represents the mutations of configurations. diff --git a/pkg/admin/service_discovery.go b/pkg/admin/service_discovery.go index a2fdf9a2..bac35cfa 100644 --- a/pkg/admin/service_discovery.go +++ b/pkg/admin/service_discovery.go @@ -18,7 +18,6 @@ package admin import ( - "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/registry/base" ) @@ -36,13 +35,11 @@ func (mysds *myServiceDiscovery) ListServices() []*ServiceInstanceDTO { srvDTOs = make([]*ServiceInstanceDTO, 0, len(services)) ) for _, srv := range services { - endpoints := make([]*config.Listener, len(srv.Endpoints)) - copy(endpoints, srv.Endpoints) srvDTOs = append(srvDTOs, &ServiceInstanceDTO{ - ID: srv.ID, - Name: srv.Name, - Version: srv.Version, - Endpoints: endpoints, + ID: srv.ID, + Name: srv.Name, + Version: srv.Version, + Endpoint: srv.Endpoint, }) } return srvDTOs diff --git a/pkg/config/nacos/nacos.go b/pkg/config/nacos/nacos.go index baaa89af..ecaf8f70 100644 --- a/pkg/config/nacos/nacos.go +++ b/pkg/config/nacos/nacos.go @@ -37,20 +37,7 @@ import ( import ( "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/util/bytesconv" -) - -const ( - _defaultGroupName string = "arana" - - _namespaceKey string = "namespace" - _groupKey string = "group" - _username string = "username" - _password string = "password" - _server string = "endpoints" - _contextPath string = "contextPath" - _scheme string = "scheme" - - _pathSplit string = "::" + u_conf "github.com/arana-db/arana/pkg/util/config" ) var ( @@ -84,13 +71,13 @@ func (s *storeOperate) Init(options map[string]interface{}) error { } func (s *storeOperate) initNacosClient(options map[string]interface{}) error { - s.groupName = _defaultGroupName - if val, ok := options[_groupKey]; ok { + s.groupName = u_conf.DefaultGroupName + if val, ok := options[u_conf.GroupKey]; ok { s.groupName = val.(string) } - clientConfig := parseClientConfig(options) - serverConfigs := parseServerConfig(options) + clientConfig := u_conf.ParseNacosClientConfig(options) + serverConfigs := u_conf.ParseNacosServerConfig(options) // a more graceful way to create config client client, err := clients.NewConfigClient( @@ -110,15 +97,15 @@ func parseServerConfig(options map[string]interface{}) []constant.ServerConfig { cfgs := make([]constant.ServerConfig, 0) scheme := "http" - if val, ok := options[_scheme]; ok { + if val, ok := options[u_conf.Scheme]; ok { scheme = val.(string) } contextPath := "/nacos" - if val, ok := options[_contextPath]; ok { + if val, ok := options[u_conf.ContextPath]; ok { contextPath = val.(string) } - if servers, ok := options[_server]; ok { + if servers, ok := options[u_conf.Server]; ok { addresses := strings.Split(servers.(string), ",") for i := range addresses { addr := strings.Split(strings.TrimSpace(addresses[i]), ":") @@ -141,13 +128,13 @@ func parseServerConfig(options map[string]interface{}) []constant.ServerConfig { func parseClientConfig(options map[string]interface{}) constant.ClientConfig { cc := constant.ClientConfig{} - if val, ok := options[_namespaceKey]; ok { + if val, ok := options[u_conf.NamespaceIdKey]; ok { cc.NamespaceId = val.(string) } - if val, ok := options[_username]; ok { + if val, ok := options[u_conf.Username]; ok { cc.Username = val.(string) } - if val, ok := options[_password]; ok { + if val, ok := options[u_conf.Password]; ok { cc.Password = val.(string) } return cc @@ -272,9 +259,9 @@ func (w *nacosWatcher) run(ctx context.Context) { } func buildNacosDataId(v string) string { - return strings.ReplaceAll(v, "/", _pathSplit) + return strings.ReplaceAll(v, "/", u_conf.PathSplit) } func revertNacosDataId(v string) string { - return strings.ReplaceAll(v, _pathSplit, "/") + return strings.ReplaceAll(v, u_conf.PathSplit, "/") } diff --git a/pkg/config/nacos/nacos_test.go b/pkg/config/nacos/nacos_test.go index d21a8034..243197fd 100644 --- a/pkg/config/nacos/nacos_test.go +++ b/pkg/config/nacos/nacos_test.go @@ -239,32 +239,3 @@ func Test_storeOpertae(t *testing.T) { assert.NoError(t, err, "blank string should be success") } - -func Test_parseServerConfig(t *testing.T) { - // _namespaceKey string = "namespace" - // _groupKey string = "group" - // _username string = "username" - // _password string = "password" - // _server string = "endpoints" - // _contextPath string = "contextPath" - // _scheme string = "scheme" - - options := map[string]interface{}{ - _namespaceKey: "arana_test", - _groupKey: "arana_test", - _username: "nacos_test", - _password: "nacos_test", - _server: "127.0.0.1:8848,127.0.0.2:8848", - } - - clientConfig := parseClientConfig(options) - assert.Equal(t, options[_namespaceKey], clientConfig.NamespaceId) - assert.Equal(t, options[_username], clientConfig.Username) - assert.Equal(t, options[_password], clientConfig.Password) - - serverConfigs := parseServerConfig(options) - assert.Equal(t, 2, len(serverConfigs)) - - assert.Equal(t, "127.0.0.1", serverConfigs[0].IpAddr) - assert.Equal(t, "127.0.0.2", serverConfigs[1].IpAddr) -} diff --git a/pkg/registry/base/base.go b/pkg/registry/base/base.go index c1249390..ddafd67e 100644 --- a/pkg/registry/base/base.go +++ b/pkg/registry/base/base.go @@ -41,15 +41,15 @@ type ServiceInstance struct { // Version is the version of the compiled. Version string `json:"version"` // Endpoint addresses of the service instance. - Endpoints []*config.Listener + Endpoint *config.Listener } func (p ServiceInstance) String() string { - return fmt.Sprintf("Service instance: id:%s, name:%s, version:%s, endpoints:%s", p.ID, p.Name, p.Version, p.Endpoints) + return fmt.Sprintf("Service instance: id:%s, name:%s, version:%s, endpoints:%s", p.ID, p.Name, p.Version, p.Endpoint) } type Registry interface { - Register(ctx context.Context, name string, serviceInstance *ServiceInstance) error + Register(ctx context.Context, serviceInstance *ServiceInstance) error Unregister(ctx context.Context, name string) error UnregisterAllService(ctx context.Context) error } diff --git a/pkg/registry/discovery.go b/pkg/registry/discovery.go index f05bdfdc..845f87d0 100644 --- a/pkg/registry/discovery.go +++ b/pkg/registry/discovery.go @@ -19,6 +19,7 @@ package registry import ( "fmt" + "strings" ) import ( @@ -28,16 +29,27 @@ import ( import ( "github.com/arana-db/arana/pkg/registry/base" "github.com/arana-db/arana/pkg/registry/etcd" + "github.com/arana-db/arana/pkg/registry/nacos" "github.com/arana-db/arana/pkg/util/log" ) -func InitDiscovery(storeType string, basePath string, servicePath string, options map[string]interface{}) (base.Discovery, error) { +const ( + _rootPath = "root_path" + _servicePath = "service_path" + _endpoints = "endpoints" + + _defaultServicePath = "service" + _defaultRootPath = "arana" +) + +func InitDiscovery(storeType string, options map[string]interface{}) (base.Discovery, error) { var serviceDiscovery base.Discovery var err error switch storeType { case base.ETCD: - serviceDiscovery, err = initEtcdDiscovery(basePath, servicePath, []string{options["endpoints"].(string)}) + serviceDiscovery, err = initEtcdDiscovery(options) case base.NACOS: + initNacosV2Discovery(options) default: err = errors.Errorf("Service registry not support store:%s", storeType) } @@ -50,12 +62,30 @@ func InitDiscovery(storeType string, basePath string, servicePath string, option return serviceDiscovery, nil } -func initEtcdDiscovery(basePath string, servicePath string, storeAddrs []string) (base.Discovery, error) { +func initEtcdDiscovery(options map[string]interface{}) (base.Discovery, error) { + var ( + rootPath = _defaultRootPath + servicePath = _defaultServicePath + storeAddrs = make([]string, 0) + ) + + if r, ok := options[_rootPath]; ok { + rootPath = r.(string) + } + + if s, ok := options[_servicePath]; ok { + servicePath = s.(string) + } + + if e, ok := options[_endpoints]; ok { + storeAddrs = append(storeAddrs, strings.Split(e.(string), ",")...) + } + if len(storeAddrs) == 0 { return nil, fmt.Errorf("service discovery init etcd error because get endpoints nil :%v", storeAddrs) } - serviceDiscovery, err := etcd.NewEtcdV3Discovery(basePath, servicePath, storeAddrs, nil) + serviceDiscovery, err := etcd.NewEtcdV3Discovery(rootPath, servicePath, storeAddrs, nil) if err != nil { return nil, fmt.Errorf("service discovery init etcd error because err: :%v", err) } @@ -63,6 +93,6 @@ func initEtcdDiscovery(basePath string, servicePath string, storeAddrs []string) return serviceDiscovery, nil } -func initNacosDiscovery(basePath string, servicePath string, storeAddrs []string) (base.Discovery, error) { - return nil, nil +func initNacosV2Discovery(options map[string]interface{}) (base.Discovery, error) { + return nacos.NewNacosV2Discovery(options) } diff --git a/pkg/registry/etcd/discovery.go b/pkg/registry/etcd/discovery.go index 0179523e..ef360fd0 100644 --- a/pkg/registry/etcd/discovery.go +++ b/pkg/registry/etcd/discovery.go @@ -36,7 +36,7 @@ import ( "github.com/arana-db/arana/pkg/util/log" ) -// EtcdV3Discovery is a etcd service discovery. +// EtcdV3Discovery is an etcd service discovery. // It always returns the registered servers in etcd. type EtcdV3Discovery struct { BasePath string diff --git a/pkg/registry/etcd/registery.go b/pkg/registry/etcd/registery.go index e6861c8a..87e34d8e 100644 --- a/pkg/registry/etcd/registery.go +++ b/pkg/registry/etcd/registery.go @@ -109,7 +109,8 @@ func NewEtcdV3Registry(serviceAddr, path string, etcdAddrs []string, options *st return etcdRegistry, nil } -func (r *EtcdV3Registry) Register(ctx context.Context, name string, serviceInstance *base.ServiceInstance) error { +func (r *EtcdV3Registry) Register(ctx context.Context, serviceInstance *base.ServiceInstance) error { + name := serviceInstance.Name if strings.TrimSpace(name) == "" { return errors.New("Register service `name` can't be empty") } diff --git a/pkg/registry/nacos/client.go b/pkg/registry/nacos/client.go new file mode 100644 index 00000000..1de65d15 --- /dev/null +++ b/pkg/registry/nacos/client.go @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "strings" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/v2/common/logger" + "github.com/nacos-group/nacos-sdk-go/v2/model" + "github.com/nacos-group/nacos-sdk-go/v2/vo" +) + +import ( + u_conf "github.com/arana-db/arana/pkg/util/config" +) + +const ( + _defaultPageSize = uint32(20) +) + +type NacosServiceClient struct { + naming_client.INamingClient + + pageSize uint32 + NamespaceId string + Servers []string // ip+port, like 127.0.0.1:8848 +} + +func NewNacosServiceClient(options map[string]interface{}) (*NacosServiceClient, error) { + var nsc = &NacosServiceClient{} + + if val, ok := options[u_conf.NamespaceIdKey]; ok { + nsc.NamespaceId = val.(string) + } + + if val, ok := options[u_conf.Server]; ok { + nsc.Servers = strings.Split(val.(string), u_conf.ServerSplit) + } + + if val, ok := options[u_conf.PageSizeKey]; ok { + nsc.pageSize = val.(uint32) + } else { + nsc.pageSize = _defaultPageSize + } + + client, err := u_conf.NewNacosV2NamingClient(options) + if err != nil { + return nil, err + } + nsc.INamingClient = client + + return nsc, nil +} + +func (ngsc *NacosServiceClient) SelectAllServiceInstances() []model.Instance { + var ( + result = make([]model.Instance, 0) + count int + currentCount = 0 + pageNo = uint32(1) + pageSize = ngsc.pageSize + ) + + srvs, err := ngsc.GetAllServicesInfo(vo.GetAllServiceInfoParam{PageNo: pageNo, PageSize: 1}) + if err != nil { + logger.Warnf("Failed to service count from nacos", err) + return nil + } + + count = int(srvs.Count) + for currentCount < count { + srvs, err = ngsc.GetAllServicesInfo(vo.GetAllServiceInfoParam{PageNo: pageNo, PageSize: pageSize}) + if err != nil { + logger.Warnf("Failed to get all services info from nacos: %s with pageNo:%d and pageSize:%d", err, pageNo, pageSize) + return nil + } + for _, srv := range srvs.Doms { + instances, err := ngsc.SelectAllInstances(vo.SelectAllInstancesParam{ServiceName: srv}) + if err != nil { + logger.Warnf("Failed to get all instances of service %s info from nacos: %s", srv, err) + return nil + } + logger.Infof("Successfully get all instances of service %s\n: ", srv, instances) + result = append(result, instances...) + } + pageNo++ + currentCount += len(srvs.Doms) + } + + return result +} diff --git a/pkg/registry/nacos/discovery.go b/pkg/registry/nacos/discovery.go new file mode 100644 index 00000000..26aaca11 --- /dev/null +++ b/pkg/registry/nacos/discovery.go @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "github.com/arana-db/arana/pkg/config" + "github.com/arana-db/arana/pkg/registry/base" +) + +// NacosV2Discovery is a nacos service discovery. +// It always returns the registered servers in etcd. +type NacosV2Discovery struct { + client *NacosServiceClient +} + +// NewNacosV2Discovery returns a new NacosV2Discovery. +func NewNacosV2Discovery(options map[string]interface{}) (base.Discovery, error) { + client, err := NewNacosServiceClient(options) + if err != nil { + return nil, err + } + return &NacosV2Discovery{client: client}, nil +} + +// GetServices returns the servers +func (nd *NacosV2Discovery) GetServices() []*base.ServiceInstance { + instances := nd.client.SelectAllServiceInstances() + result := make([]*base.ServiceInstance, 0, len(instances)) + + for _, instance := range instances { + result = append(result, &base.ServiceInstance{ + ID: instance.InstanceId, + Name: instance.ServiceName, + Version: "", + Endpoint: &config.Listener{ + ProtocolType: instance.Metadata[_protocolType], + SocketAddress: &config.SocketAddress{ + Address: instance.Ip, + Port: int(instance.Port), + }, + ServerVersion: instance.Metadata[_serverVersion], + }, + }) + } + + return result +} + +// WatchService returns a nil chan. +func (nd *NacosV2Discovery) WatchService() <-chan []*base.ServiceInstance { + // TODO will support later + return nil +} + +func (nd *NacosV2Discovery) Close() { + // TODO will support later + nd.client.CloseClient() +} diff --git a/pkg/registry/nacos/registry.go b/pkg/registry/nacos/registry.go new file mode 100644 index 00000000..bad3f9c0 --- /dev/null +++ b/pkg/registry/nacos/registry.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "context" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/v2/common/logger" + "github.com/nacos-group/nacos-sdk-go/v2/vo" +) + +import ( + "github.com/arana-db/arana/pkg/registry/base" +) + +const ( + _protocolType = "protocol_type" + _serverVersion = "server_version" +) + +type NacosV2Registry struct { + client *NacosServiceClient +} + +func NewNacosV2Registry(options map[string]interface{}) (base.Registry, error) { + client, err := NewNacosServiceClient(options) + if err != nil { + return nil, err + } + return &NacosV2Registry{client: client}, nil +} + +func (ng *NacosV2Registry) Register(ctx context.Context, serviceInstance *base.ServiceInstance) error { + metadata := make(map[string]string) + metadata[_protocolType] = serviceInstance.Endpoint.ProtocolType + metadata[_serverVersion] = serviceInstance.Endpoint.ServerVersion + instance := vo.RegisterInstanceParam{ + Ip: serviceInstance.Endpoint.SocketAddress.Address, + Port: uint64(serviceInstance.Endpoint.SocketAddress.Port), + Weight: 10, + Enable: true, + Healthy: true, + ServiceName: serviceInstance.Name, + // TODO make GroupName and ClusterName configurable + GroupName: "DEFAULT_GROUP", + Metadata: metadata, + Ephemeral: true, + } + ok, err := ng.client.RegisterInstance(instance) + if err != nil { + return err + } + if !ok { + logger.Warnf("Register service %s failed.", serviceInstance) + } + return nil +} + +func (ng *NacosV2Registry) Unregister(ctx context.Context, name string) error { + instance, err := ng.client.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{ServiceName: name}) + if err != nil { + return err + } + _, err = ng.client.DeregisterInstance(vo.DeregisterInstanceParam{ + Ip: instance.Ip, + Port: instance.Port, + ServiceName: instance.ServiceName, + Ephemeral: true, + }) + if err != nil { + return err + } + return nil +} + +func (ng *NacosV2Registry) UnregisterAllService(ctx context.Context) error { + instances := ng.client.SelectAllServiceInstances() + for _, inst := range instances { + flag, err := ng.client.DeregisterInstance(vo.DeregisterInstanceParam{ + Ip: inst.Ip, + Port: inst.Port, + Cluster: inst.ClusterName, + ServiceName: inst.ServiceName, + Ephemeral: true, + }) + if err != nil { + return err + } + if !flag { + logger.Infof("Deregister instance %s failed", inst) + } + } + return nil +} diff --git a/pkg/registry/registry.go b/pkg/registry/registry.go index 2d79122e..5a9b50e5 100644 --- a/pkg/registry/registry.go +++ b/pkg/registry/registry.go @@ -32,6 +32,7 @@ import ( "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/registry/base" "github.com/arana-db/arana/pkg/registry/etcd" + "github.com/arana-db/arana/pkg/registry/nacos" "github.com/arana-db/arana/pkg/util/log" ) @@ -42,15 +43,17 @@ func DoRegistry(ctx context.Context, registryInstance base.Registry, name string if err != nil { return fmt.Errorf("service registry register error because get local host err:%v", err) } - for _, listener := range listeners { - tmpLister := *listener - if tmpLister.SocketAddress.Address == "0.0.0.0" || tmpLister.SocketAddress.Address == "127.0.0.1" { - tmpLister.SocketAddress.Address = serverAddr - } - serviceInstance.Endpoints = append(serviceInstance.Endpoints, &tmpLister) + // TODO this will be removed in the future + if len(listeners) == 0 { + return fmt.Errorf("listeners is not exist") } + tmpLister := listeners[0] + if tmpLister.SocketAddress.Address == "0.0.0.0" || tmpLister.SocketAddress.Address == "127.0.0.1" { + tmpLister.SocketAddress.Address = serverAddr + } + serviceInstance.Endpoint = tmpLister - return registryInstance.Register(ctx, name, serviceInstance) + return registryInstance.Register(ctx, serviceInstance) } func InitRegistry(registryConf *config.Registry) (base.Registry, error) { @@ -96,5 +99,5 @@ func initEtcdRegistry(registryConf *config.Registry) (base.Registry, error) { } func initNacosRegistry(registryConf *config.Registry) (base.Registry, error) { - return nil, nil + return nacos.NewNacosV2Registry(registryConf.Options) } diff --git a/pkg/util/config/nacos.go b/pkg/util/config/nacos.go new file mode 100644 index 00000000..e8efd4bd --- /dev/null +++ b/pkg/util/config/nacos.go @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "strconv" + "strings" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/v2/clients" + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/v2/common/constant" +) + +const ( + DefaultGroupName string = "arana" + + NamespaceIdKey string = "namespace_id" + GroupKey string = "group" + Username string = "username" + Password string = "password" + Server string = "endpoints" + ContextPath string = "context_path" + Scheme string = "scheme" + PageSizeKey string = "page-size" + + PathSplit string = "::" + ServerSplit string = "," +) + +func NewNacosV2NamingClient(options map[string]interface{}) (naming_client.INamingClient, error) { + properties := make(map[string]interface{}) + properties[constant.KEY_CLIENT_CONFIG] = ParseNacosClientConfig(options) + properties[constant.KEY_SERVER_CONFIGS] = ParseNacosServerConfig(options) + return clients.CreateNamingClient(properties) +} + +func ParseNacosServerConfig(options map[string]interface{}) []constant.ServerConfig { + cfgs := make([]constant.ServerConfig, 0) + + scheme := "http" + if val, ok := options[Scheme]; ok { + scheme = val.(string) + } + contextPath := "/nacos" + if val, ok := options[ContextPath]; ok { + contextPath = val.(string) + } + + if servers, ok := options[Server]; ok { + addresses := strings.Split(servers.(string), ServerSplit) + for i := range addresses { + addr := strings.Split(strings.TrimSpace(addresses[i]), ":") + + ip := addr[0] + port, _ := strconv.ParseInt(addr[1], 10, 64) + + cfgs = append(cfgs, constant.ServerConfig{ + Scheme: scheme, + ContextPath: contextPath, + IpAddr: ip, + Port: uint64(port), + }) + } + } + + return cfgs +} + +func ParseNacosClientConfig(options map[string]interface{}) constant.ClientConfig { + cc := constant.ClientConfig{} + + if val, ok := options[NamespaceIdKey]; ok { + cc.NamespaceId = val.(string) + } + if val, ok := options[Username]; ok { + cc.Username = val.(string) + } + if val, ok := options[Password]; ok { + cc.Password = val.(string) + } + return cc +} diff --git a/pkg/util/config/nacos_test.go b/pkg/util/config/nacos_test.go new file mode 100644 index 00000000..9f7a5708 --- /dev/null +++ b/pkg/util/config/nacos_test.go @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func Test_parseServerConfig(t *testing.T) { + // NamespaceIdKey string = "namespace_id" + // GroupKey string = "group" + // Username string = "username" + // Password string = "password" + // Server string = "endpoints" + // ContextPath string = "context_path" + // Scheme string = "scheme" + + options := map[string]interface{}{ + NamespaceIdKey: "arana_test", + GroupKey: "arana_test", + Username: "nacos_test", + Password: "nacos_test", + Server: "127.0.0.1:8848,127.0.0.2:8848", + } + + clientConfig := ParseNacosClientConfig(options) + assert.Equal(t, options[NamespaceIdKey], clientConfig.NamespaceId) + assert.Equal(t, options[Username], clientConfig.Username) + assert.Equal(t, options[Password], clientConfig.Password) + + serverConfigs := ParseNacosServerConfig(options) + assert.Equal(t, 2, len(serverConfigs)) + + assert.Equal(t, "127.0.0.1", serverConfigs[0].IpAddr) + assert.Equal(t, "127.0.0.2", serverConfigs[1].IpAddr) +}