From 5a111c319561cc0895b1950d19a5f1a4079c330f Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Sat, 6 Jun 2020 23:26:00 +0800 Subject: [PATCH 1/9] add comment in all files of registry directory --- config_center/configuration_listener.go | 2 ++ registry/base_configuration_listener.go | 12 ++++++------ registry/consul/listener.go | 2 ++ registry/consul/registry.go | 11 ++++++++--- registry/directory/directory.go | 2 ++ registry/etcdv3/listener.go | 7 ++++++- registry/etcdv3/registry.go | 12 +++++++++--- registry/event.go | 2 +- registry/event_listener.go | 1 + registry/kubernetes/listener.go | 8 ++++++-- registry/kubernetes/registry.go | 10 ++++++++++ registry/mock_registry.go | 16 ++++++++-------- registry/nacos/listener.go | 5 ++++- registry/nacos/registry.go | 4 ++++ registry/nacos/service_discovery.go | 7 ++++--- registry/nacos/service_discovery_test.go | 2 +- registry/protocol/protocol.go | 9 +++++++++ registry/registry.go | 10 +++++++++- registry/service_discovery.go | 3 +-- registry/service_instance.go | 1 + registry/zookeeper/registry.go | 13 +++++++++++-- 21 files changed, 105 insertions(+), 34 deletions(-) diff --git a/config_center/configuration_listener.go b/config_center/configuration_listener.go index 541cc09286..8783baf1e8 100644 --- a/config_center/configuration_listener.go +++ b/config_center/configuration_listener.go @@ -27,6 +27,8 @@ import ( // ConfigurationListener for changing listener's event type ConfigurationListener interface { + // Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config + // the listener listens on. Process(*ConfigChangeEvent) } diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index 55418318df..4dd484deb2 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -29,19 +29,19 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// BaseConfigurationListener ... +// BaseConfigurationListener will get notified when the config it listens on changes. type BaseConfigurationListener struct { configurators []config_center.Configurator dynamicConfiguration config_center.DynamicConfiguration defaultConfiguratorFunc func(url *common.URL) config_center.Configurator } -// Configurators ... +// Configurators gets Configurator from config center func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator { return bcl.configurators } -// InitWith ... +// InitWith will init BaseConfigurationListener with @key 、@listener and @f func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) { bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration() if bcl.dynamicConfiguration == nil { @@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente } } -// Process ... +// Process can reference ConfigurationListener.Process func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) if event.ConfigType == remoting.EventTypeDel { @@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin return nil } -// OverrideUrl ... +// OverrideUrl gets existing configuration rule and override provider url before exporting. func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) { for _, v := range bcl.configurators { v.Configure(url) } } -// ToConfigurators ... +// ToConfigurators converts override urls to map for use when re-refer. Send all rules every time, the urls will be reassembled and calculated func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator { if len(urls) == 0 { return nil diff --git a/registry/consul/listener.go b/registry/consul/listener.go index 5fac9ec0f9..841ecbacc5 100644 --- a/registry/consul/listener.go +++ b/registry/consul/listener.go @@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) { } } +// Next returns next service event once received func (l *consulListener) Next() (*registry.ServiceEvent, error) { select { case event := <-l.eventCh: @@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) { } } +// Close closes this listener func (l *consulListener) Close() { close(l.done) l.plan.Stop() diff --git a/registry/consul/registry.go b/registry/consul/registry.go index c5b8510a6c..c03f2c06ff 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -36,8 +36,7 @@ import ( ) const ( - // RegistryConnDelay ... - RegistryConnDelay = 3 + registryConnDelay = 3 ) func init() { @@ -74,6 +73,7 @@ func newConsulRegistry(url *common.URL) (registry.Registry, error) { return r, nil } +// Register service to consul registry center func (r *consulRegistry) Register(url common.URL) error { var err error @@ -95,6 +95,7 @@ func (r *consulRegistry) register(url common.URL) error { return r.client.Agent().ServiceRegister(service) } +// Unregister service from consul registry center func (r *consulRegistry) Unregister(url common.URL) error { var err error @@ -112,6 +113,7 @@ func (r *consulRegistry) unregister(url common.URL) error { return r.client.Agent().ServiceDeregister(buildId(url)) } +// Subscribe service from consul registry center func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, "")) if role == common.CONSUMER { @@ -133,7 +135,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti return } logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) - time.Sleep(time.Duration(RegistryConnDelay) * time.Second) + time.Sleep(time.Duration(registryConnDelay) * time.Second) continue } @@ -156,10 +158,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error) return listener, err } +// GetUrl get registry URL of consul registry center func (r *consulRegistry) GetUrl() common.URL { return *r.URL } +// IsAvailable determines consul registry center whether is available func (r *consulRegistry) IsAvailable() bool { select { case <-r.done: @@ -169,6 +173,7 @@ func (r *consulRegistry) IsAvailable() bool { } } +// Destroy consul registry center func (r *consulRegistry) Destroy() { close(r.done) } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 49b0027f43..e845db01f1 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -46,6 +46,8 @@ func init() { extension.SetDefaultRegistryDirectory(NewRegistryDirectory) } +// RegistryDirectory implementation of Directory: +// Invoker list returned from this Directory's list method have been filtered by Routers type RegistryDirectory struct { directory.BaseDirectory cacheInvokers []protocol.Invoker diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 51fdf21f5d..0ecdbf7af8 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -38,15 +38,17 @@ type dataListener struct { listener config_center.ConfigurationListener } -// NewRegistryDataListener +// NewRegistryDataListener creates a data listener for etcd func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { return &dataListener{listener: listener} } +// AddInterestedURL add more URL of registry center to listen func (l *dataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } +// Process data change event from registry center of etcd func (l *dataListener) DataChange(eventType remoting.Event) bool { index := strings.Index(eventType.Path, "/providers/") @@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } +// Process data change event from config center of etcd func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } +// Next returns next service event once received func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } +// Close etcd registry center func (l *configurationListener) Close() { l.registry.WaitGroup().Done() } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 5d389c3637..0fbf98ae9e 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -57,17 +57,17 @@ type etcdV3Registry struct { configListener *configurationListener } -// Client get the etcdv3 client +// Client gets the etcdv3 client func (r *etcdV3Registry) Client() *etcdv3.Client { return r.client } -//SetClient set the etcdv3 client +// SetClient sets the etcdv3 client func (r *etcdV3Registry) SetClient(client *etcdv3.Client) { r.client = client } -// +// ClientLock gets registry control lock func (r *etcdV3Registry) ClientLock() *sync.Mutex { return &r.cltLock } @@ -104,27 +104,32 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { return r, nil } +// InitListeners init listeners of etcd registry center func (r *etcdV3Registry) InitListeners() { r.listener = etcdv3.NewEventListener(r.client) r.configListener = NewConfigurationListener(r) r.dataListener = NewRegistryDataListener(r.configListener) } +// DoRegister actually do the register job in the registry center of etcd func (r *etcdV3Registry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } +// CloseAndNilClient closes listeners and clear client func (r *etcdV3Registry) CloseAndNilClient() { r.client.Close() r.client = nil } +// CloseListener closes listeners func (r *etcdV3Registry) CloseListener() { if r.configListener != nil { r.configListener.Close() } } +// CreatePath create the path in the registry center of etcd func (r *etcdV3Registry) CreatePath(k string) error { var tmpPath string for _, str := range strings.Split(k, "/")[1:] { @@ -137,6 +142,7 @@ func (r *etcdV3Registry) CreatePath(k string) error { return nil } +// DoSubscribe actually subscribe the provider URL func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( diff --git a/registry/event.go b/registry/event.go index be9f11d00b..00fe183a98 100644 --- a/registry/event.go +++ b/registry/event.go @@ -36,7 +36,7 @@ func init() { // service event // //////////////////////////////////////// -// ServiceEvent ... +// ServiceEvent is create、update or delete event of service type ServiceEvent struct { Action remoting.EventType Service common.URL diff --git a/registry/event_listener.go b/registry/event_listener.go index b8d6148442..7ab011a549 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -36,6 +36,7 @@ type ConditionalEventListener interface { Accept(e Event) bool } +// ServiceInstancesChangedListener is use for the Service Discovery Changed // TODO (implement ConditionalEventListener) type ServiceInstancesChangedListener struct { ServiceName string diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go index ac6f8af8a2..3da0116600 100644 --- a/registry/kubernetes/listener.go +++ b/registry/kubernetes/listener.go @@ -38,12 +38,12 @@ type dataListener struct { listener config_center.ConfigurationListener } -// NewRegistryDataListener +// NewRegistryDataListener creates a data listener for kubernetes func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { return &dataListener{listener: listener} } -// AddInterestedURL +// AddInterestedURL add more URL of registry center to listen func (l *dataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } @@ -91,10 +91,12 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener { return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } +// Process data change event from config center of kubernetes func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } +// Next returns next service event once received func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -116,6 +118,8 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } } + +// Close kubernetes registry center func (l *configurationListener) Close() { l.registry.WaitGroup().Done() } diff --git a/registry/kubernetes/registry.go b/registry/kubernetes/registry.go index 9fa6b83941..f06d80124b 100644 --- a/registry/kubernetes/registry.go +++ b/registry/kubernetes/registry.go @@ -68,23 +68,28 @@ type kubernetesRegistry struct { configListener *configurationListener } +// Client gets the etcdv3 kubernetes func (r *kubernetesRegistry) Client() *kubernetes.Client { r.cltLock.RLock() client := r.client r.cltLock.RUnlock() return client } + +// SetClient sets the kubernetes client func (r *kubernetesRegistry) SetClient(client *kubernetes.Client) { r.cltLock.Lock() r.client = client r.cltLock.Unlock() } +// CloseAndNilClient closes listeners and clear client func (r *kubernetesRegistry) CloseAndNilClient() { r.client.Close() r.client = nil } +// CloseListener closes listeners func (r *kubernetesRegistry) CloseListener() { r.cltLock.Lock() @@ -96,6 +101,7 @@ func (r *kubernetesRegistry) CloseListener() { r.configListener = nil } +// CreatePath create the path in the registry center of kubernetes func (r *kubernetesRegistry) CreatePath(k string) error { if err := r.client.Create(k, ""); err != nil { return perrors.WithMessagef(err, "create path %s in kubernetes", k) @@ -103,10 +109,12 @@ func (r *kubernetesRegistry) CreatePath(k string) error { return nil } +// DoRegister actually do the register job in the registry center of kubernetes func (r *kubernetesRegistry) DoRegister(root string, node string) error { return r.client.Create(path.Join(root, node), "") } +// DoSubscribe actually subscribe the provider URL func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, error) { var ( @@ -139,6 +147,7 @@ func (r *kubernetesRegistry) DoSubscribe(svc *common.URL) (registry.Listener, er return configListener, nil } +// InitListeners init listeners of kubernetes registry center func (r *kubernetesRegistry) InitListeners() { r.listener = kubernetes.NewEventListener(r.client) r.configListener = NewConfigurationListener(r) @@ -183,6 +192,7 @@ func newMockKubernetesRegistry( return r, nil } +// HandleClientRestart will reconnect to kubernetes registry center func (r *kubernetesRegistry) HandleClientRestart() { var ( diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 9591928eeb..7dbcbbd4e2 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -30,13 +30,13 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -// MockRegistry ... +// MockRegistry is use for mock registry type MockRegistry struct { listener *listener destroyed *atomic.Bool } -// NewMockRegistry ... +// NewMockRegistry is use for create a mock registry func NewMockRegistry(url *common.URL) (Registry, error) { registry := &MockRegistry{ destroyed: atomic.NewBool(false), @@ -46,23 +46,23 @@ func NewMockRegistry(url *common.URL) (Registry, error) { return registry, nil } -// Register ... +// Register is use for register a mock registry func (*MockRegistry) Register(url common.URL) error { return nil } -// Destroy ... +// Destroy is use for destory a mock registry func (r *MockRegistry) Destroy() { if r.destroyed.CAS(false, true) { } } -// IsAvailable ... +// IsAvailable is use for determine a mock registry available func (r *MockRegistry) IsAvailable() bool { return !r.destroyed.Load() } -// GetUrl ... +// GetUrl is use for register a mock registry URL func (r *MockRegistry) GetUrl() common.URL { return common.URL{} } @@ -71,7 +71,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { return r.listener, nil } -// Subscribe ... +// Subscribe is use for subscribe a mock registry func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { go func() { for { @@ -123,7 +123,7 @@ func (*listener) Close() { } -// MockEvent ... +// MockEvent is use for register a mock event func (r *MockRegistry) MockEvent(event *ServiceEvent) { r.listener.listenChan <- event } diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index a2237dca26..c53f2ccd8a 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -51,7 +51,7 @@ type nacosListener struct { subscribeParam *vo.SubscribeParam } -// NewNacosListener ... +// NewRegistryDataListener creates a data listener for nacos func NewNacosListener(url common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) { listener := &nacosListener{ namingClient: namingClient, @@ -109,6 +109,7 @@ func generateUrl(instance model.Instance) *common.URL { ) } +// Callback will callback when subscribed func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { if err != nil { logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) @@ -198,6 +199,7 @@ func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) { nl.events <- configType } +// Next returns next service event once received func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { for { select { @@ -212,6 +214,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { } } +// Close nacos registry center func (nl *nacosListener) Close() { nl.stopListen() close(nl.done) diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index a436b85064..345e5145e0 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -123,6 +123,7 @@ func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstance return instance } +// Register service to nacos registry center func (nr *nacosRegistry) Register(url common.URL) error { serviceName := getServiceName(url) param := createRegisterParam(url, serviceName) @@ -174,14 +175,17 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } } +// GetUrl get registry URL of nacos registry center func (nr *nacosRegistry) GetUrl() common.URL { return *nr.URL } +// IsAvailable determines nacos registry center whether is available func (nr *nacosRegistry) IsAvailable() bool { return true } +// Destroy nacos registry center func (nr *nacosRegistry) Destroy() { return } diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index 7d3406cac2..b73a52a797 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -34,8 +34,9 @@ import ( ) const ( - defaultGroup = "DEFAULT_GROUP" - idKey = "id" + defaultGroup = "DEFAULT_GROUP" + idKey = "id" + defaultPageSize = 100 ) // init will put the service discovery into extension @@ -92,7 +93,7 @@ func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) er // GetDefaultPageSize will return the constant registry.DefaultPageSize func (n *nacosServiceDiscovery) GetDefaultPageSize() int { - return registry.DefaultPageSize + return registry.defaultPageSize } // GetServices will return the all services diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go index a756e86693..04431a614b 100644 --- a/registry/nacos/service_discovery_test.go +++ b/registry/nacos/service_discovery_test.go @@ -113,7 +113,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) { func TestNacosServiceDiscovery_GetDefaultPageSize(t *testing.T) { serviceDiscovry, _ := extension.GetServiceDiscovery(constant.NACOS_KEY, mockUrl()) - assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) + assert.Equal(t, defaultPageSize, serviceDiscovry.GetDefaultPageSize()) } func mockUrl() *common.URL { diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index a936db80bf..399b15ed27 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -117,6 +117,7 @@ func (proto *registryProtocol) initConfigurationListeners() { proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) } +// Refer provider service from registry center func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { var registryUrl = url var serviceUrl = registryUrl.SubURL @@ -156,6 +157,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { return invoker } +// Export provider service to registry center func (proto *registryProtocol) Export(invoker protocol.Invoker) protocol.Exporter { proto.once.Do(func() { proto.initConfigurationListeners() @@ -229,6 +231,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} } +// Notify will triggered when a service change notification is received. func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd { nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) @@ -325,6 +328,7 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL { return newUrl } +// Destroy registry protocol func (proto *registryProtocol) Destroy() { for _, ivk := range proto.invokers { ivk.Destroy() @@ -389,6 +393,7 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke } } +// Invoke remote service base on URL of wrappedInvoker func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { // get right url ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl()) @@ -411,6 +416,8 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf return listener } +// Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config +// the listener listens on. func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { listener.BaseConfigurationListener.Process(event) listener.overrideListeners.Range(func(key, value interface{}) bool { @@ -435,6 +442,8 @@ func newServiceConfigurationListener(overrideListener *overrideSubscribeListener return listener } +// Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config +// the listener listens on. func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { listener.BaseConfigurationListener.Process(event) listener.overrideListener.doOverrideIfNecessary() diff --git a/registry/registry.go b/registry/registry.go index d673864700..1e110ada12 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -45,13 +45,21 @@ type Registry interface { Subscribe(*common.URL, NotifyListener) } -// NotifyListener ... +// NotifyListener is used for triggered when a service change notification is received. type NotifyListener interface { + // Notify needs to support the contract:
+ // 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.
+ // 2. The first notification at a subscription must be a full notification of all types of data of a service.
+ // 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.
+ // 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.
+ // 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.
Notify(*ServiceEvent) } // Listener Deprecated! type Listener interface { + // Next returns next service event once received Next() (*ServiceEvent, error) + // Close closes this listener Close() } diff --git a/registry/service_discovery.go b/registry/service_discovery.go index a8228a4abe..1d5a3593e3 100644 --- a/registry/service_discovery.go +++ b/registry/service_discovery.go @@ -26,8 +26,7 @@ import ( gxpage "github.com/dubbogo/gost/page" ) -const DefaultPageSize = 100 - +// ServiceDiscovery is the common operations of Service Discovery type ServiceDiscovery interface { fmt.Stringer diff --git a/registry/service_instance.go b/registry/service_instance.go index 2cc229ee3b..247c856765 100644 --- a/registry/service_instance.go +++ b/registry/service_instance.go @@ -17,6 +17,7 @@ package registry +// ServiceInstance is the model class of an instance of a service, which is used for service registration and discovery. type ServiceInstance interface { // GetId will return this instance's id. It should be unique. diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 88d5d6221b..fe40d29e76 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -86,12 +86,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) { return r, nil } -// Options ... +// nolint type Options struct { client *zookeeper.ZookeeperClient } -// Option ... +// nolint type Option func(*Options) func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) { @@ -116,6 +116,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust return c, r, nil } +// InitListeners init listeners of zookeeper registry center func (r *zkRegistry) InitListeners() { r.listener = zookeeper.NewZkEventListener(r.client) newDataListener := NewRegistryDataListener() @@ -141,35 +142,43 @@ func (r *zkRegistry) InitListeners() { r.dataListener = newDataListener } +// CreatePath create the path in the registry center of zookeeper func (r *zkRegistry) CreatePath(path string) error { return r.ZkClient().Create(path) } +// DoRegister actually do the register job in the registry center of zookeeper func (r *zkRegistry) DoRegister(root string, node string) error { return r.registerTempZookeeperNode(root, node) } +// DoSubscribe actually subscribe the provider URL func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } +// CloseAndNilClient closes listeners and clear client func (r *zkRegistry) CloseAndNilClient() { r.client.Close() r.client = nil } +// ZkClient gets zookeeper client func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { return r.client } +// SetZkClient sets zookeeper client func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) { r.client = client } +// ZkClientLock gets zookeeper registry control lock func (r *zkRegistry) ZkClientLock() *sync.Mutex { return &r.cltLock } +// CloseListener closes listeners func (r *zkRegistry) CloseListener() { if r.dataListener != nil { r.dataListener.Close() From cebc94adb3d542922aa245339a930e1d5af12ae8 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Sun, 7 Jun 2020 12:09:58 +0800 Subject: [PATCH 2/9] fix compile error --- registry/nacos/service_discovery.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go index b73a52a797..2611a8dc58 100644 --- a/registry/nacos/service_discovery.go +++ b/registry/nacos/service_discovery.go @@ -93,7 +93,7 @@ func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) er // GetDefaultPageSize will return the constant registry.DefaultPageSize func (n *nacosServiceDiscovery) GetDefaultPageSize() int { - return registry.defaultPageSize + return defaultPageSize } // GetServices will return the all services From 6f2801c6a69b5e470ecc920b4b0347fd08801d2e Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Wed, 10 Jun 2020 22:50:19 +0800 Subject: [PATCH 3/9] fix comment --- config_center/configuration_listener.go | 3 +-- registry/protocol/protocol.go | 6 ++---- registry/registry.go | 7 +------ 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/config_center/configuration_listener.go b/config_center/configuration_listener.go index 8783baf1e8..096df1a586 100644 --- a/config_center/configuration_listener.go +++ b/config_center/configuration_listener.go @@ -27,8 +27,7 @@ import ( // ConfigurationListener for changing listener's event type ConfigurationListener interface { - // Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config - // the listener listens on. + // Process notified once there's any change happens on the config Process(*ConfigChangeEvent) } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 399b15ed27..68d8a9efb4 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -416,8 +416,7 @@ func newProviderConfigurationListener(overrideListeners *sync.Map) *providerConf return listener } -// Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config -// the listener listens on. +// Process notified once there's any change happens on the provider config func (listener *providerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { listener.BaseConfigurationListener.Process(event) listener.overrideListeners.Range(func(key, value interface{}) bool { @@ -442,8 +441,7 @@ func newServiceConfigurationListener(overrideListener *overrideSubscribeListener return listener } -// Process is Listener callback method. Listener gets notified by this method once there's any change happens on the config -// the listener listens on. +// Process notified once there's any change happens on the service config func (listener *serviceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { listener.BaseConfigurationListener.Process(event) listener.overrideListener.doOverrideIfNecessary() diff --git a/registry/registry.go b/registry/registry.go index 1e110ada12..02f968a28f 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -47,12 +47,7 @@ type Registry interface { // NotifyListener is used for triggered when a service change notification is received. type NotifyListener interface { - // Notify needs to support the contract:
- // 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.
- // 2. The first notification at a subscription must be a full notification of all types of data of a service.
- // 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.
- // 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.
- // 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.
+ // Notify supports notifications on the service interface and the dimension of the data type. Notify(*ServiceEvent) } From 32df9e88eee58be071d7aff665237e6af6161597 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Sat, 13 Jun 2020 22:38:38 +0800 Subject: [PATCH 4/9] fix review comment --- registry/base_configuration_listener.go | 2 +- registry/nacos/listener.go | 2 +- registry/registry.go | 2 +- registry/zookeeper/registry.go | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index 4dd484deb2..855af648c2 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -29,7 +29,7 @@ import ( "github.com/apache/dubbo-go/remoting" ) -// BaseConfigurationListener will get notified when the config it listens on changes. +// nolint type BaseConfigurationListener struct { configurators []config_center.Configurator dynamicConfiguration config_center.DynamicConfiguration diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index c53f2ccd8a..9af731801c 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -214,7 +214,7 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { } } -// Close nacos registry center +// nolint func (nl *nacosListener) Close() { nl.stopListen() close(nl.done) diff --git a/registry/registry.go b/registry/registry.go index 02f968a28f..5b37aa684c 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -45,7 +45,7 @@ type Registry interface { Subscribe(*common.URL, NotifyListener) } -// NotifyListener is used for triggered when a service change notification is received. +// nolint type NotifyListener interface { // Notify supports notifications on the service interface and the dimension of the data type. Notify(*ServiceEvent) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index fe40d29e76..e7ae2279f8 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -163,17 +163,17 @@ func (r *zkRegistry) CloseAndNilClient() { r.client = nil } -// ZkClient gets zookeeper client +// nolint func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient { return r.client } -// SetZkClient sets zookeeper client +// nolint func (r *zkRegistry) SetZkClient(client *zookeeper.ZookeeperClient) { r.client = client } -// ZkClientLock gets zookeeper registry control lock +// nolint func (r *zkRegistry) ZkClientLock() *sync.Mutex { return &r.cltLock } From e17d8e1cdd2169d10294759924972575eaac91d5 Mon Sep 17 00:00:00 2001 From: cvictory Date: Tue, 16 Jun 2020 14:09:37 +0800 Subject: [PATCH 5/9] add setInvoker method for invocation --- protocol/invocation/rpcinvocation.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 7f2cede929..1e820b0e9c 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -155,6 +155,11 @@ func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } +// SetInvoker ... +func (r *RPCInvocation) SetInvoker(invoker protocol.Invoker) { + r.invoker = invoker +} + // CallBack sets RPC callback method. func (r *RPCInvocation) CallBack() interface{} { return r.callBack From 2e62486b85a60a52d152c91c1862e722bb22044e Mon Sep 17 00:00:00 2001 From: cvictory Date: Tue, 16 Jun 2020 14:11:41 +0800 Subject: [PATCH 6/9] add setInvoker method for invocation --- protocol/invocation/rpcinvocation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index 1e820b0e9c..b8b5b50970 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -155,7 +155,7 @@ func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker } -// SetInvoker ... +// nolint func (r *RPCInvocation) SetInvoker(invoker protocol.Invoker) { r.invoker = invoker } From d109e557fe549251fab05d4b214d3868dbca8608 Mon Sep 17 00:00:00 2001 From: Joe Zou Date: Thu, 18 Jun 2020 22:46:53 +0800 Subject: [PATCH 7/9] fix review comment --- config_center/configuration_listener.go | 2 +- registry/base_configuration_listener.go | 8 ++++---- registry/consul/listener.go | 2 +- registry/consul/registry.go | 2 +- registry/etcdv3/listener.go | 4 ++-- registry/event.go | 2 +- registry/event_listener.go | 2 +- registry/kubernetes/listener.go | 4 ++-- registry/mock_registry.go | 14 +++++++------- registry/nacos/listener.go | 4 ++-- registry/nacos/registry.go | 10 +++++----- registry/protocol/protocol.go | 2 +- registry/zookeeper/registry.go | 6 +++--- 13 files changed, 31 insertions(+), 31 deletions(-) diff --git a/config_center/configuration_listener.go b/config_center/configuration_listener.go index 096df1a586..97fd9c7092 100644 --- a/config_center/configuration_listener.go +++ b/config_center/configuration_listener.go @@ -27,7 +27,7 @@ import ( // ConfigurationListener for changing listener's event type ConfigurationListener interface { - // Process notified once there's any change happens on the config + // Process the notification event once there's any change happens on the config Process(*ConfigChangeEvent) } diff --git a/registry/base_configuration_listener.go b/registry/base_configuration_listener.go index 855af648c2..7b28d7ee1b 100644 --- a/registry/base_configuration_listener.go +++ b/registry/base_configuration_listener.go @@ -41,7 +41,7 @@ func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurat return bcl.configurators } -// InitWith will init BaseConfigurationListener with @key 、@listener and @f +// InitWith will init BaseConfigurationListener by @key+@Listener+@f func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) { bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration() if bcl.dynamicConfiguration == nil { @@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente } } -// Process can reference ConfigurationListener.Process +// Process the notification event once there's any change happens on the config. func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) { logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value) if event.ConfigType == remoting.EventTypeDel { @@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin return nil } -// OverrideUrl gets existing configuration rule and override provider url before exporting. +// OverrideUrl gets existing configuration rule and overrides provider url before exporting. func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) { for _, v := range bcl.configurators { v.Configure(url) } } -// ToConfigurators converts override urls to map for use when re-refer. Send all rules every time, the urls will be reassembled and calculated +// ToConfigurators converts @urls by @f to config_center.Configurators func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator { if len(urls) == 0 { return nil diff --git a/registry/consul/listener.go b/registry/consul/listener.go index 841ecbacc5..cf3888dd16 100644 --- a/registry/consul/listener.go +++ b/registry/consul/listener.go @@ -187,7 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) { } } -// Next returns next service event once received +// Next returns the service event from consul. func (l *consulListener) Next() (*registry.ServiceEvent, error) { select { case event := <-l.eventCh: diff --git a/registry/consul/registry.go b/registry/consul/registry.go index c03f2c06ff..4ef8739468 100644 --- a/registry/consul/registry.go +++ b/registry/consul/registry.go @@ -163,7 +163,7 @@ func (r *consulRegistry) GetUrl() common.URL { return *r.URL } -// IsAvailable determines consul registry center whether is available +// IsAvailable checks consul registry center whether is available func (r *consulRegistry) IsAvailable() bool { select { case <-r.done: diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 0ecdbf7af8..436b6eca5b 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -43,12 +43,12 @@ func NewRegistryDataListener(listener config_center.ConfigurationListener) *data return &dataListener{listener: listener} } -// AddInterestedURL add more URL of registry center to listen +// AddInterestedURL adds a registration @url to listen func (l *dataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } -// Process data change event from registry center of etcd +// DataChange processes the data change event from registry center of etcd func (l *dataListener) DataChange(eventType remoting.Event) bool { index := strings.Index(eventType.Path, "/providers/") diff --git a/registry/event.go b/registry/event.go index 00fe183a98..5fe6df6a37 100644 --- a/registry/event.go +++ b/registry/event.go @@ -36,7 +36,7 @@ func init() { // service event // //////////////////////////////////////// -// ServiceEvent is create、update or delete event of service +// ServiceEvent includes create, update, delete event type ServiceEvent struct { Action remoting.EventType Service common.URL diff --git a/registry/event_listener.go b/registry/event_listener.go index 7ab011a549..1805f2833c 100644 --- a/registry/event_listener.go +++ b/registry/event_listener.go @@ -36,7 +36,7 @@ type ConditionalEventListener interface { Accept(e Event) bool } -// ServiceInstancesChangedListener is use for the Service Discovery Changed +// ServiceInstancesChangedListener is used when the Service Discovery Changed // TODO (implement ConditionalEventListener) type ServiceInstancesChangedListener struct { ServiceName string diff --git a/registry/kubernetes/listener.go b/registry/kubernetes/listener.go index 3da0116600..24c8d81614 100644 --- a/registry/kubernetes/listener.go +++ b/registry/kubernetes/listener.go @@ -43,7 +43,7 @@ func NewRegistryDataListener(listener config_center.ConfigurationListener) *data return &dataListener{listener: listener} } -// AddInterestedURL add more URL of registry center to listen +// AddInterestedURL adds the @url of registry center to the listener func (l *dataListener) AddInterestedURL(url *common.URL) { l.interestedURL = append(l.interestedURL, url) } @@ -91,7 +91,7 @@ func NewConfigurationListener(reg *kubernetesRegistry) *configurationListener { return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} } -// Process data change event from config center of kubernetes +// Process processes the data change event from config center of kubernetes func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { l.events <- configType } diff --git a/registry/mock_registry.go b/registry/mock_registry.go index 7dbcbbd4e2..2b83d5ab88 100644 --- a/registry/mock_registry.go +++ b/registry/mock_registry.go @@ -30,13 +30,13 @@ import ( "github.com/apache/dubbo-go/common/logger" ) -// MockRegistry is use for mock registry +// MockRegistry is used as mock registry type MockRegistry struct { listener *listener destroyed *atomic.Bool } -// NewMockRegistry is use for create a mock registry +// NewMockRegistry creates a mock registry func NewMockRegistry(url *common.URL) (Registry, error) { registry := &MockRegistry{ destroyed: atomic.NewBool(false), @@ -46,12 +46,12 @@ func NewMockRegistry(url *common.URL) (Registry, error) { return registry, nil } -// Register is use for register a mock registry +// Register is used as a mock registry func (*MockRegistry) Register(url common.URL) error { return nil } -// Destroy is use for destory a mock registry +// nolint func (r *MockRegistry) Destroy() { if r.destroyed.CAS(false, true) { } @@ -62,7 +62,7 @@ func (r *MockRegistry) IsAvailable() bool { return !r.destroyed.Load() } -// GetUrl is use for register a mock registry URL +// nolint func (r *MockRegistry) GetUrl() common.URL { return common.URL{} } @@ -71,7 +71,7 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) { return r.listener, nil } -// Subscribe is use for subscribe a mock registry +// nolint func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) { go func() { for { @@ -123,7 +123,7 @@ func (*listener) Close() { } -// MockEvent is use for register a mock event +// nolint func (r *MockRegistry) MockEvent(event *ServiceEvent) { r.listener.listenChan <- event } diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go index 9af731801c..36f733df5a 100644 --- a/registry/nacos/listener.go +++ b/registry/nacos/listener.go @@ -109,7 +109,7 @@ func generateUrl(instance model.Instance) *common.URL { ) } -// Callback will callback when subscribed +// Callback will be invoked when got subscribed events. func (nl *nacosListener) Callback(services []model.SubscribeService, err error) { if err != nil { logger.Errorf("nacos subscribe callback error:%s , subscribe:%+v ", err.Error(), nl.subscribeParam) @@ -199,7 +199,7 @@ func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) { nl.events <- configType } -// Next returns next service event once received +// Next returns the service event from nacos. func (nl *nacosListener) Next() (*registry.ServiceEvent, error) { for { select { diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go index 345e5145e0..3eeb7680ab 100644 --- a/registry/nacos/registry.go +++ b/registry/nacos/registry.go @@ -123,7 +123,7 @@ func createRegisterParam(url common.URL, serviceName string) vo.RegisterInstance return instance } -// Register service to nacos registry center +// Register will register the service @url to its nacos registry center func (nr *nacosRegistry) Register(url common.URL) error { serviceName := getServiceName(url) param := createRegisterParam(url, serviceName) @@ -141,7 +141,7 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error) return NewNacosListener(*conf, nr.namingClient) } -//subscribe from registry +// subscribe from registry func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) { for { if !nr.IsAvailable() { @@ -175,17 +175,17 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti } } -// GetUrl get registry URL of nacos registry center +// GetUrl gets its registration URL func (nr *nacosRegistry) GetUrl() common.URL { return *nr.URL } -// IsAvailable determines nacos registry center whether is available +// IsAvailable determines nacos registry center whether it is available func (nr *nacosRegistry) IsAvailable() bool { return true } -// Destroy nacos registry center +// nolint func (nr *nacosRegistry) Destroy() { return } diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 68d8a9efb4..4c669b2cee 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -231,7 +231,7 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv return &overrideSubscribeListener{url: overriderUrl, originInvoker: invoker, protocol: proto} } -// Notify will triggered when a service change notification is received. +// Notify will be triggered when a service change notification is received. func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd { nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index e7ae2279f8..1e7bd08ade 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -116,7 +116,7 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust return c, r, nil } -// InitListeners init listeners of zookeeper registry center +// InitListeners initializes listeners of zookeeper registry center func (r *zkRegistry) InitListeners() { r.listener = zookeeper.NewZkEventListener(r.client) newDataListener := NewRegistryDataListener() @@ -142,7 +142,7 @@ func (r *zkRegistry) InitListeners() { r.dataListener = newDataListener } -// CreatePath create the path in the registry center of zookeeper +// CreatePath creates the path in the registry center of zookeeper func (r *zkRegistry) CreatePath(path string) error { return r.ZkClient().Create(path) } @@ -152,7 +152,7 @@ func (r *zkRegistry) DoRegister(root string, node string) error { return r.registerTempZookeeperNode(root, node) } -// DoSubscribe actually subscribe the provider URL +// DoSubscribe actually subscribes the provider URL func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) { return r.getListener(conf) } From e3d77705971a058f06fdd48d41055fa2ce4d09c1 Mon Sep 17 00:00:00 2001 From: flycash Date: Sun, 21 Jun 2020 15:38:31 +0800 Subject: [PATCH 8/9] Fix UT --- metadata/service/inmemory/service_proxy.go | 4 - registry/etcdv3/service_discovery_test.go | 105 ++------------------- 2 files changed, 8 insertions(+), 101 deletions(-) diff --git a/metadata/service/inmemory/service_proxy.go b/metadata/service/inmemory/service_proxy.go index 840bd9a9d5..7e01439f04 100644 --- a/metadata/service/inmemory/service_proxy.go +++ b/metadata/service/inmemory/service_proxy.go @@ -20,7 +20,6 @@ package inmemory import ( "context" "reflect" - "time" ) import ( @@ -59,10 +58,7 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}), invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV})) - start := time.Now() res := m.invkr.Invoke(context.Background(), inv) - end := time.Now() - logger.Infof("duration: %s, result: %v", (end.Sub(start)).String(), res.Result()) if res.Error() != nil { logger.Errorf("could not get the metadata service from remote provider: %v", res.Error()) return []interface{}{}, nil diff --git a/registry/etcdv3/service_discovery_test.go b/registry/etcdv3/service_discovery_test.go index ff3708e6f3..c03973cb14 100644 --- a/registry/etcdv3/service_discovery_test.go +++ b/registry/etcdv3/service_discovery_test.go @@ -1,14 +1,13 @@ package etcdv3 import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/common/observer" - "github.com/apache/dubbo-go/common/observer/dispatcher" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/registry" - "github.com/stretchr/testify/assert" - "testing" ) var testName = "test" @@ -20,8 +19,8 @@ func setUp() { } config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{ - Address: "localhost:2380", - TimeoutStr: "1000s", + Address: "localhost:2379", + TimeoutStr: "10s", } } @@ -44,7 +43,7 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) { assert.NotNil(t, err) config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{ - Address: "localhost:2380", + Address: "localhost:2379", TimeoutStr: "10s", } @@ -53,96 +52,8 @@ func Test_newEtcdV3ServiceDiscovery(t *testing.T) { assert.NotNil(t, res) } -func TestEtcdV3ServiceDiscovery_Destroy(t *testing.T) { - setUp() - serviceDiscovery, err := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) - - assert.Nil(t, err) - assert.NotNil(t, serviceDiscovery) - - err = serviceDiscovery.Destroy() - assert.Nil(t, err) - assert.NotNil(t, serviceDiscovery.(*etcdV3ServiceDiscovery).client) -} - -func TestEtcdV3ServiceDiscovery_CRUD(t *testing.T) { - setUp() - extension.SetEventDispatcher("mock", func() observer.EventDispatcher { - return &dispatcher.MockEventDispatcher{} - }) - - extension.SetAndInitGlobalDispatcher("mock") - - serviceName := "service-name" - id := "id" - host := "host" - port := 123 - instance := ®istry.DefaultServiceInstance{ - Id: id, - ServiceName: serviceName, - Host: host, - Port: port, - Enable: true, - Healthy: true, - Metadata: nil, - } - - // clean data - - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) - - // clean data for local test - serviceDiscovry.Unregister(®istry.DefaultServiceInstance{ - Id: id, - ServiceName: serviceName, - Host: host, - Port: port, - }) - - err := serviceDiscovry.Register(instance) - assert.Nil(t, err) - - page := serviceDiscovry.GetHealthyInstancesByPage(serviceName, 0, 10, true) - assert.NotNil(t, page) - - assert.Equal(t, 0, page.GetOffset()) - assert.Equal(t, 10, page.GetPageSize()) - assert.Equal(t, 1, page.GetDataSize()) - - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - assert.NotNil(t, instance) - assert.Equal(t, id, instance.GetId()) - assert.Equal(t, host, instance.GetHost()) - assert.Equal(t, port, instance.GetPort()) - assert.Equal(t, serviceName, instance.GetServiceName()) - assert.Equal(t, 0, len(instance.GetMetadata())) - - instance.Metadata["a"] = "b" - - err = serviceDiscovry.Update(instance) - assert.Nil(t, err) - - pageMap := serviceDiscovry.GetRequestInstances([]string{serviceName}, 0, 1) - assert.Equal(t, 1, len(pageMap)) - page = pageMap[serviceName] - assert.NotNil(t, page) - assert.Equal(t, 1, len(page.GetData())) - - instance = page.GetData()[0].(*registry.DefaultServiceInstance) - v, _ := instance.Metadata["a"] - assert.Equal(t, "b", v) - - // test dispatcher event - err = serviceDiscovry.DispatchEventByServiceName(serviceName) - assert.Nil(t, err) - - // test AddListener - err = serviceDiscovry.AddListener(®istry.ServiceInstancesChangedListener{ServiceName: serviceName}) - assert.Nil(t, err) -} - func TestEtcdV3ServiceDiscovery_GetDefaultPageSize(t *testing.T) { setUp() - serviceDiscovry, _ := extension.GetServiceDiscovery(constant.ETCDV3_KEY, testName) + serviceDiscovry := &etcdV3ServiceDiscovery{} assert.Equal(t, registry.DefaultPageSize, serviceDiscovry.GetDefaultPageSize()) } From facab2ba9600005bc3ed977c6a66f554ded8f339 Mon Sep 17 00:00:00 2001 From: flycash Date: Mon, 22 Jun 2020 21:57:41 +0800 Subject: [PATCH 9/9] Fix Etcd BUG --- common/extension/service_discovery.go | 2 +- metadata/report/etcd/report.go | 2 +- registry/etcdv3/service_discovery.go | 9 +++++++-- remoting/etcdv3/client.go | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/common/extension/service_discovery.go b/common/extension/service_discovery.go index 456b14c83d..0227920dc6 100644 --- a/common/extension/service_discovery.go +++ b/common/extension/service_discovery.go @@ -42,7 +42,7 @@ func SetServiceDiscovery(protocol string, creator func(name string) (registry.Se func GetServiceDiscovery(protocol string, name string) (registry.ServiceDiscovery, error) { creator, ok := discoveryCreatorMap[protocol] if !ok { - return nil, perrors.New("Could not find the service discovery with name: " + name) + return nil, perrors.New("Could not find the service discovery with discovery protocol: " + protocol) } return creator(name) } diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go index 0d49ff1783..9db0b577bf 100644 --- a/metadata/report/etcd/report.go +++ b/metadata/report/etcd/report.go @@ -41,7 +41,7 @@ import ( const DEFAULT_ROOT = "dubbo" func init() { - extension.SetMetadataReportFactory("etcd", func() factory.MetadataReportFactory { + extension.SetMetadataReportFactory(constant.ETCDV3_KEY, func() factory.MetadataReportFactory { return &etcdMetadataReportFactory{} }) } diff --git a/registry/etcdv3/service_discovery.go b/registry/etcdv3/service_discovery.go index f47d5324c4..e435a556e5 100644 --- a/registry/etcdv3/service_discovery.go +++ b/registry/etcdv3/service_discovery.go @@ -89,8 +89,13 @@ func (e *etcdV3ServiceDiscovery) Register(instance registry.ServiceInstance) err if nil != e.client { ins, err := jsonutil.EncodeJSON(instance) if err == nil { - e.client.Create(path, string(ins)) - e.services.Add(instance.GetServiceName()) + err = e.client.Update(path, string(ins)) + if err != nil { + logger.Errorf("cannot register the instance: %s", string(ins), err) + } else { + e.services.Add(instance.GetServiceName()) + } + } } diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 93da6402c7..a24a69232e 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -144,7 +144,7 @@ func NewServiceDiscoveryClient(opts ...Option) *Client { newClient, err := NewClient(options.name, options.endpoints, options.timeout, options.heartbeat) if err != nil { - logger.Warnf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", + logger.Errorf("new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}", options.name, options.endpoints, options.timeout, err) return nil }