diff --git a/common/extension/event_dispatcher.go b/common/extension/event_dispatcher.go index d7353d84dc..f0503e0542 100644 --- a/common/extension/event_dispatcher.go +++ b/common/extension/event_dispatcher.go @@ -55,7 +55,7 @@ func SetAndInitGlobalDispatcher(name string) { if dp, ok := dispatchers[name]; !ok || dp == nil { panic("EventDispatcher for " + name + " is not found, make sure you have import the package, " + - "like github.com/apache/dubbo-go/common/observer/dispatcher ") + "like import _ github.com/apache/dubbo-go/common/observer/dispatcher ") } globalEventDispatcher = dispatchers[name]() } diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go index 456b14c83d..0227920dc6 100644 --- a/common/extension/service_discovery.go +++ b/common/extension/service_discovery.go @@ -42,7 +42,7 @@ func SetServiceDiscovery(protocol string, creator func(name string) (registry.Se func GetServiceDiscovery(protocol string, name string) (registry.ServiceDiscovery, error) { creator, ok := discoveryCreatorMap[protocol] if !ok { - return nil, perrors.New("Could not find the service discovery with name: " + name) + return nil, perrors.New("Could not find the service discovery with discovery protocol: " + protocol) } return creator(name) } diff --git a/common/rpc_service.go b/common/rpc_service.go index f9fb145c80..6ca0e827cb 100644 --- a/common/rpc_service.go +++ b/common/rpc_service.go @@ -345,6 +345,9 @@ func suiteMethod(method reflect.Method) *MethodType { // this method is in RPCService // we force users must implement RPCService interface in their provider + // and RPCService has only one method "Reference" + // In general, this method should not be exported to client + // so we ignore this method // see RPCService if mname == "Reference" { return nil diff --git a/config/service_config.go b/config/service_config.go index a500a44419..57fce028fa 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -84,7 +84,7 @@ type ServiceConfig struct { exporters []protocol.Exporter } -// Prefix return dubbo.service.${interface}. +// Prefix returns dubbo.service.${interface}. func (c *ServiceConfig) Prefix() string { return constant.ServiceConfigPrefix + c.InterfaceName + "." } @@ -141,7 +141,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List { return ports } -// Export export the service +// Export exports the service func (c *ServiceConfig) Export() error { // TODO: config center start here diff --git a/config_center/configuration_listener.go b/config_center/configuration_listener.go index 541cc09286..97fd9c7092 100644 --- a/config_center/configuration_listener.go +++ b/config_center/configuration_listener.go @@ -27,6 +27,7 @@ import ( // ConfigurationListener for changing listener's event type ConfigurationListener interface { + // Process the notification event once there's any change happens on the config Process(*ConfigChangeEvent) } diff --git a/metadata/mapping/memory/service_name_mapping.go b/metadata/mapping/memory/service_name_mapping.go new file mode 100644 index 0000000000..0965d52d91 --- /dev/null +++ b/metadata/mapping/memory/service_name_mapping.go @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + "sync" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" +) +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/metadata/mapping" +) + +func init() { + extension.SetGlobalServiceNameMapping(GetNameMappingInstance) +} + +type InMemoryServiceNameMapping struct{} + +func (i *InMemoryServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error { + return nil +} + +func (i *InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) { + return gxset.NewSet(config.GetApplicationConfig().Name), nil +} + +var serviceNameMappingInstance *InMemoryServiceNameMapping +var serviceNameMappingOnce sync.Once + +func GetNameMappingInstance() mapping.ServiceNameMapping { + serviceNameMappingOnce.Do(func() { + serviceNameMappingInstance = &InMemoryServiceNameMapping{} + }) + return serviceNameMappingInstance +} diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go index 0d49ff1783..9db0b577bf 100644 --- a/metadata/report/etcd/report.go +++ b/metadata/report/etcd/report.go @@ -41,7 +41,7 @@ import ( const DEFAULT_ROOT = "dubbo" func init() { - extension.SetMetadataReportFactory("etcd", func() factory.MetadataReportFactory { + extension.SetMetadataReportFactory(constant.ETCDV3_KEY, func() factory.MetadataReportFactory { return &etcdMetadataReportFactory{} }) } diff --git a/metadata/service/inmemory/service_proxy.go b/metadata/service/inmemory/service_proxy.go index 840bd9a9d5..7e01439f04 100644 --- a/metadata/service/inmemory/service_proxy.go +++ b/metadata/service/inmemory/service_proxy.go @@ -20,7 +20,6 @@ package inmemory import ( "context" "reflect" - "time" ) import ( @@ -59,10 +58,7 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}), invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV})) - start := time.Now() res := m.invkr.Invoke(context.Background(), inv) - end := time.Now() - logger.Infof("duration: %s, result: %v", (end.Sub(start)).String(), res.Result()) if res.Error() != nil { logger.Errorf("could not get the metadata service from remote provider: %v", res.Error()) return []interface{}{}, nil diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 7f2cede929..b8b5b50970 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -155,6 +155,11 @@ func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } +// nolint +func (r *RPCInvocation) SetInvoker(invoker protocol.Invoker) { + r.invoker = invoker +} + // CallBack sets RPC callback method. func (r *RPCInvocation) CallBack() interface{} { return r.callBack diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index 55418318df..7b28d7ee1b 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -29,19 +29,19 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// BaseConfigurationListener ... +// nolint type BaseConfigurationListener struct { configurators []config_center.Configurator dynamicConfiguration config_center.DynamicConfiguration defaultConfiguratorFunc func(url *common.URL) config_center.Configurator } -// Configurators ... +// Configurators gets Configurator from config center func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator { return bcl.configurators } -// InitWith ... +// InitWith will init BaseConfigurationListener by @key+@Listener+@f func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) { bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration() if bcl.dynamicConfiguration == nil { @@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente } } -// Process ... +// Process the notification event once there's any change happens on the config. func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) if event.ConfigType == remoting.EventTypeDel { @@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin return nil } -// OverrideUrl ... +// OverrideUrl gets existing configuration rule and overrides provider url before exporting. func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) { for _, v := range bcl.configurators { v.Configure(url) } } -// ToConfigurators ... +// ToConfigurators converts @urls by @f to config_center.Configurators func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator { if len(urls) == 0 { return nil diff --git a/registry/consul/listener.go b/registry/consul/listener.go index 5fac9ec0f9..cf3888dd16 100644 --- a/registry/consul/listener.go +++ b/registry/consul/listener.go @@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) { } } +// Next returns the service event from consul. func (l *consulListener) Next() (*registry.ServiceEvent, error) { select { case event := <-l.eventCh: @@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) { } } +// Close closes this listener func (l *consulListener) Close() { close(l.done) l.plan.Stop() diff --git a/registry/consul/registry.go b/registry/consul/registry.go index bd394be443..c425c5ec20 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -36,8 +36,7 @@ import ( ) const ( - // RegistryConnDelay ... - RegistryConnDelay = 3 + registryConnDelay = 3 ) func init() { @@ -148,7 +147,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti return } logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + time.Sleep(time.Duration(registryConnDelay) * time.Second) continue } @@ -171,10 +170,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error) return listener, err } +// GetUrl get registry URL of consul registry center func (r *consulRegistry) GetUrl() common.URL { return *r.URL } +// IsAvailable checks consul registry center whether is available func (r *consulRegistry) IsAvailable() bool { select { case <-r.done: @@ -184,6 +185,7 @@ func (r *consulRegistry) IsAvailable() bool { } } +// Destroy consul registry center func (r *consulRegistry) Destroy() { close(r.done) } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 253dc597f9..2fbf9410f7 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -46,6 +46,8 @@ func init() { extension.SetDefaultRegistryDirectory(NewRegistryDirectory) } +// RegistryDirectory implementation of Directory: +// Invoker list returned from this Directory's list method have been filtered by Routers type RegistryDirectory struct { directory.BaseDirectory cacheInvokers []protocol.Invoker diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 51fdf21f5d..436b6eca5b 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -38,15 +38,17 @@ type dataListener struct { listener config_center.ConfigurationListener } -// NewRegistryDataListener +// NewRegistryDataListener creates a data listener for etcd func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { return &dataListener{listener: listener} } +// AddInterestedURL adds a registration @url to listen func (l *dataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } +// DataChange processes the data change event from registry center of etcd func (l *dataListener) DataChange(eventType remoting.Event) bool { index := strings.Index(eventType.Path, "/providers/") @@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } +// Process data change event from config center of etcd func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } +// Next returns next service event once received func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } +// Close etcd registry center func (l *configurationListener) Close() { l.registry.WaitGroup().Done() } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 9e590da3e7..2fec8eaad2 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -104,31 +104,37 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { return r, nil } +// InitListeners init listeners of etcd registry center func (r *etcdV3Registry) InitListeners() { r.listener = etcdv3.NewEventListener(r.client) r.configListener = NewConfigurationListener(r) r.dataListener = NewRegistryDataListener(r.configListener) } +// DoRegister actually do the register job in the registry center of etcd func (r *etcdV3Registry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } +// nolint func (r *etcdV3Registry) DoUnregister(root string, node string) error { return perrors.New("DoUnregister is not support in etcdV3Registry") } +// CloseAndNilClient closes listeners and clear client func (r *etcdV3Registry) CloseAndNilClient() { r.client.Close() r.client = nil } +// CloseListener closes listeners func (r *etcdV3Registry) CloseListener() { if r.configListener != nil { r.configListener.Close() } } +// CreatePath create the path in the registry center of etcd func (r *etcdV3Registry) CreatePath(k string) error { var tmpPath string for _, str := range strings.Split(k, "/")[1:] { @@ -141,6 +147,7 @@ func (r *etcdV3Registry) CreatePath(k string) error { return nil } +// DoSubscribe actually subscribe the provider URL func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go index aab317389b..e435a556e5 100644 --- a/registry/etcdv3/service_discovery.go +++ b/registry/etcdv3/service_discovery.go @@ -19,6 +19,18 @@ package etcdv3 import ( "fmt" + "sync" + "time" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + gxpage "github.com/dubbogo/gost/page" + "github.com/hashicorp/vault/helper/jsonutil" + perrors "github.com/pkg/errors" +) + +import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" @@ -26,12 +38,6 @@ import ( "github.com/apache/dubbo-go/registry" "github.com/apache/dubbo-go/remoting" "github.com/apache/dubbo-go/remoting/etcdv3" - gxset "github.com/dubbogo/gost/container/set" - gxpage "github.com/dubbogo/gost/page" - "github.com/hashicorp/vault/helper/jsonutil" - perrors "github.com/pkg/errors" - "sync" - "time" ) const ( @@ -83,8 +89,13 @@ func (e *etcdV3ServiceDiscovery) Register(instance registry.ServiceInstance) err if nil != e.client { ins, err := jsonutil.EncodeJSON(instance) if err == nil { - e.client.Create(path, string(ins)) - e.services.Add(instance.GetServiceName()) + err = e.client.Update(path, string(ins)) + if err != nil { + logger.Errorf("cannot register the instance: %s", string(ins), err) + } else { + e.services.Add(instance.GetServiceName()) + } + } } diff --git a/registry/etcdv3/service_discovery_test.go b/registry/etcdv3/service_discovery_test.go index ff3708e6f3..d8e3f1a286 100644 --- a/registry/etcdv3/service_discovery_test.go +++ b/registry/etcdv3/service_discovery_test.go @@ -1,14 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package etcdv3 +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + import ( "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/common/observer" - "github.com/apache/dubbo-go/common/observer/dispatcher" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/registry" - "github.com/stretchr/testify/assert" - "testing" ) var testName = "test" @@ -20,8 +40,8 @@ func setUp() { } config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ - Address: "localhost:2380", - TimeoutStr: "1000s", + Address: "localhost:2379", + TimeoutStr: "10s", } } @@ -44,7 +64,7 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) { assert.NotNil(t, err) config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ - Address: "localhost:2380", + Address: "localhost:2379", TimeoutStr: "10s", } @@ -53,96 +73,8 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) { assert.NotNil(t, res) } -func TestEtcdV3ServiceDiscovery_Destroy(t *testing.T) { - setUp() - serviceDiscovery, err := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) - - assert.Nil(t, err) - assert.NotNil(t, serviceDiscovery) - - err = serviceDiscovery.Destroy() - assert.Nil(t, err) - assert.NotNil(t, serviceDiscovery.(*etcdV3ServiceDiscovery).client) -} - -func TestEtcdV3ServiceDiscovery_CRUD(t *testing.T) { - setUp() - extension.SetEventDispatcher("mock", func() observer.EventDispatcher { - return &dispatcher.MockEventDispatcher{} - }) - - extension.SetAndInitGlobalDispatcher("mock") - - serviceName := "service-name" - id := "id" - host := "host" - port := 123 - instance := ®istry.DefaultServiceInstance{ - Id: id, - ServiceName: serviceName, - Host: host, - Port: port, - Enable: true, - Healthy: true, - Metadata: nil, - } - - // clean data - - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) - - // clean data for local test - serviceDiscovry.Unregister(®istry.DefaultServiceInstance{ - Id: id, - ServiceName: serviceName, - Host: host, - Port: port, - }) - - err := serviceDiscovry.Register(instance) - assert.Nil(t, err) - - page := serviceDiscovry.GetHealthyInstancesByPage(serviceName, 0, 10, true) - assert.NotNil(t, page) - - assert.Equal(t, 0, page.GetOffset()) - assert.Equal(t, 10, page.GetPageSize()) - assert.Equal(t, 1, page.GetDataSize()) - - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - assert.NotNil(t, instance) - assert.Equal(t, id, instance.GetId()) - assert.Equal(t, host, instance.GetHost()) - assert.Equal(t, port, instance.GetPort()) - assert.Equal(t, serviceName, instance.GetServiceName()) - assert.Equal(t, 0, len(instance.GetMetadata())) - - instance.Metadata["a"] = "b" - - err = serviceDiscovry.Update(instance) - assert.Nil(t, err) - - pageMap := serviceDiscovry.GetRequestInstances([]string{serviceName}, 0, 1) - assert.Equal(t, 1, len(pageMap)) - page = pageMap[serviceName] - assert.NotNil(t, page) - assert.Equal(t, 1, len(page.GetData())) - - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - v, _ := instance.Metadata["a"] - assert.Equal(t, "b", v) - - // test dispatcher event - err = serviceDiscovry.DispatchEventByServiceName(serviceName) - assert.Nil(t, err) - - // test AddListener - err = serviceDiscovry.AddListener(®istry.ServiceInstancesChangedListener{ServiceName: serviceName}) - assert.Nil(t, err) -} - func TestEtcdV3ServiceDiscovery_GetDefaultPageSize(t *testing.T) { setUp() - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) + serviceDiscovry := &etcdV3ServiceDiscovery{} assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) } diff --git a/registry/event.go b/registry/event.go index 6f647185cc..39fb00c740 100644 --- a/registry/event.go +++ b/registry/event.go @@ -37,7 +37,7 @@ func init() { // service event // //////////////////////////////////////// -// ServiceEvent ... +// ServiceEvent includes create, update, delete event type ServiceEvent struct { Action remoting.EventType Service common.URL diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go index ac6f8af8a2..24c8d81614 100644 --- a/registry/kubernetes/listener.go +++ b/registry/kubernetes/listener.go @@ -38,12 +38,12 @@ type dataListener struct { listener config_center.ConfigurationListener } -// NewRegistryDataListener +// NewRegistryDataListener creates a data listener for kubernetes func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { return &dataListener{listener: listener} } -// AddInterestedURL +// AddInterestedURL adds the @url of registry center to the listener func (l *dataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } @@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener { return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } +// Process processes the data change event from config center of kubernetes func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } +// Next returns next service event once received func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } } + +// Close kubernetes registry center func (l *configurationListener) Close() { l.registry.WaitGroup().Done() } diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 6fa1193465..7c5162670d 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -68,23 +68,28 @@ type kubernetesRegistry struct { configListener *configurationListener } +// Client gets the etcdv3 kubernetes func (r *kubernetesRegistry) Client() *kubernetes.Client { r.cltLock.RLock() client := r.client r.cltLock.RUnlock() return client } + +// SetClient sets the kubernetes client func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { r.cltLock.Lock() r.client = client r.cltLock.Unlock() } +// CloseAndNilClient closes listeners and clear client func (r *kubernetesRegistry) CloseAndNilClient() { r.client.Close() r.client = nil } +// CloseListener closes listeners func (r *kubernetesRegistry) CloseListener() { r.cltLock.Lock() @@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() { r.configListener = nil } +// CreatePath create the path in the registry center of kubernetes func (r *kubernetesRegistry) CreatePath(k string) error { if err := r.client.Create(k, ""); err != nil { return perrors.WithMessagef(err, "create path %s in kubernetes", k) @@ -103,6 +109,7 @@ func (r *kubernetesRegistry) CreatePath(k string) error { return nil } +// DoRegister actually do the register job in the registry center of kubernetes func (r *kubernetesRegistry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } @@ -111,6 +118,7 @@ func (r *kubernetesRegistry) DoUnregister(root string, node string) error { return perrors.New("DoUnregister is not support in kubernetesRegistry") } +// DoSubscribe actually subscribe the provider URL func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( @@ -143,10 +151,12 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er return configListener, nil } +// nolint func (r *kubernetesRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { return nil, perrors.New("DoUnsubscribe is not support in kubernetesRegistry") } +// InitListeners init listeners of kubernetes registry center func (r *kubernetesRegistry) InitListeners() { r.listener = kubernetes.NewEventListener(r.client) r.configListener = NewConfigurationListener(r) @@ -191,6 +201,7 @@ func newMockKubernetesRegistry( return r, nil } +// HandleClientRestart will reconnect to kubernetes registry center func (r *kubernetesRegistry) HandleClientRestart() { var ( diff --git a/registry/mock_registry.go b/registry/mock_registry.go index f39490a267..10561d0f49 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -30,13 +30,13 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -// MockRegistry ... +// MockRegistry is used as mock registry type MockRegistry struct { listener *listener destroyed *atomic.Bool } -// NewMockRegistry ... +// NewMockRegistry creates a mock registry func NewMockRegistry(url *common.URL) (Registry, error) { registry := &MockRegistry{ destroyed: atomic.NewBool(false), @@ -46,28 +46,28 @@ func NewMockRegistry(url *common.URL) (Registry, error) { return registry, nil } -// Register ... +// Register is used as a mock registry func (*MockRegistry) Register(url common.URL) error { return nil } -// UnRegister +// nolint func (r *MockRegistry) UnRegister(conf common.URL) error { return nil } -// Destroy ... +// nolint func (r *MockRegistry) Destroy() { if r.destroyed.CAS(false, true) { } } -// IsAvailable ... +// IsAvailable is use for determine a mock registry available func (r *MockRegistry) IsAvailable() bool { return !r.destroyed.Load() } -// GetUrl ... +// nolint func (r *MockRegistry) GetUrl() common.URL { return common.URL{} } @@ -76,7 +76,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { return r.listener, nil } -// Subscribe ... +// nolint func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { go func() { for { @@ -134,7 +134,7 @@ func (*listener) Close() { } -// MockEvent ... +// nolint func (r *MockRegistry) MockEvent(event *ServiceEvent) { r.listener.listenChan <- event } diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index a2237dca26..36f733df5a 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -51,7 +51,7 @@ type nacosListener struct { subscribeParam *vo.SubscribeParam } -// NewNacosListener ... +// NewRegistryDataListener creates a data listener for nacos func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { listener := &nacosListener{ namingClient: namingClient, @@ -109,6 +109,7 @@ func generateUrl(instance model.Instance) *common.URL { ) } +// Callback will be invoked when got subscribed events. func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { if err != nil { logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) @@ -198,6 +199,7 @@ func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) { nl.events <- configType } +// Next returns the service event from nacos. func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -212,6 +214,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { } } +// nolint func (nl *nacosListener) Close() { nl.stopListen() close(nl.done) diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 6bfd476344..51d3e2f56a 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -117,6 +117,7 @@ func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstance return instance } +// Register will register the service @url to its nacos registry center func (nr *nacosRegistry) Register(url common.URL) error { serviceName := getServiceName(url) param := createRegisterParam(url, serviceName) @@ -179,15 +180,18 @@ func (nr *nacosRegistry) UnSubscribe(url *common.URL, notifyListener registry.No return perrors.New("UnSubscribe not support in nacosRegistry") } +// GetUrl gets its registration URL func (nr *nacosRegistry) GetUrl() common.URL { return *nr.URL } +// IsAvailable determines nacos registry center whether it is available func (nr *nacosRegistry) IsAvailable() bool { // TODO return true } +// nolint func (nr *nacosRegistry) Destroy() { return } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index a936db80bf..4c669b2cee 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -117,6 +117,7 @@ func (proto *registryProtocol) initConfigurationListeners() { proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) } +// Refer provider service from registry center func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url var serviceUrl = registryUrl.SubURL @@ -156,6 +157,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { return invoker } +// Export provider service to registry center func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { proto.once.Do(func() { proto.initConfigurationListeners() @@ -229,6 +231,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} } +// Notify will be triggered when a service change notification is received. func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd { nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) @@ -325,6 +328,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { return newUrl } +// Destroy registry protocol func (proto *registryProtocol) Destroy() { for _, ivk := range proto.invokers { ivk.Destroy() @@ -389,6 +393,7 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke } } +// Invoke remote service base on URL of wrappedInvoker func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { // get right url ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl()) @@ -411,6 +416,7 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf return listener } +// Process notified once there's any change happens on the provider config func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { listener.BaseConfigurationListener.Process(event) listener.overrideListeners.Range(func(key, value interface{}) bool { @@ -435,6 +441,7 @@ func newServiceConfigurationListener(overrideListener *overrideSubscribeListener return listener } +// Process notified once there's any change happens on the service config func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { listener.BaseConfigurationListener.Process(event) listener.overrideListener.doOverrideIfNecessary() diff --git a/registry/registry.go b/registry/registry.go index ce214c4971..bb09ead7ef 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -58,13 +58,16 @@ type Registry interface { UnSubscribe(*common.URL, NotifyListener) error } -// NotifyListener ... +// nolint type NotifyListener interface { + // Notify supports notifications on the service interface and the dimension of the data type. Notify(*ServiceEvent) } // Listener Deprecated! type Listener interface { + // Next returns next service event once received Next() (*ServiceEvent, error) + // Close closes this listener Close() } diff --git a/registry/service_discovery.go b/registry/service_discovery.go index a8228a4abe..cb7a3c0182 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -28,6 +28,7 @@ import ( const DefaultPageSize = 100 +// ServiceDiscovery is the common operations of Service Discovery type ServiceDiscovery interface { fmt.Stringer diff --git a/registry/service_instance.go b/registry/service_instance.go index 08ca79ecbf..dbb458284d 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -21,6 +21,7 @@ import ( gxsort "github.com/dubbogo/gost/sort" ) +// ServiceInstance is the model class of an instance of a service, which is used for service registration and discovery. type ServiceInstance interface { // GetId will return this instance's id. It should be unique. diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 5d5f9e0526..8f2ac1023b 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -87,12 +87,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { return r, nil } -// Options ... +// nolint type Options struct { client *zookeeper.ZookeeperClient } -// Option ... +// nolint type Option func(*Options) func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) { @@ -117,6 +117,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust return c, r, nil } +// InitListeners initializes listeners of zookeeper registry center func (r *zkRegistry) InitListeners() { r.listener = zookeeper.NewZkEventListener(r.client) newDataListener := NewRegistryDataListener() @@ -147,10 +148,12 @@ func (r *zkRegistry) InitListeners() { r.dataListener = newDataListener } +// CreatePath creates the path in the registry center of zookeeper func (r *zkRegistry) CreatePath(path string) error { return r.ZkClient().Create(path) } +// DoRegister actually do the register job in the registry center of zookeeper func (r *zkRegistry) DoRegister(root string, node string) error { return r.registerTempZookeeperNode(root, node) } @@ -164,6 +167,7 @@ func (r *zkRegistry) DoUnregister(root string, node string) error { return r.ZkClient().Delete(path.Join(root, node)) } +// DoSubscribe actually subscribes the provider URL func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } @@ -172,23 +176,28 @@ func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) return r.getCloseListener(conf) } +// CloseAndNilClient closes listeners and clear client func (r *zkRegistry) CloseAndNilClient() { r.client.Close() r.client = nil } +// nolint func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { return r.client } +// nolint func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) { r.client = client } +// nolint func (r *zkRegistry) ZkClientLock() *sync.Mutex { return &r.cltLock } +// CloseListener closes listeners func (r *zkRegistry) CloseListener() { if r.dataListener != nil { r.dataListener.Close() diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 93da6402c7..a24a69232e 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -144,7 +144,7 @@ func NewServiceDiscoveryClient(opts ...Option) *Client { newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) if err != nil { - logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", + logger.Errorf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) return nil }