Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.7.5 bk #8

Merged
2 changes: 1 addition & 1 deletion common/extension/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func SetAndInitGlobalDispatcher(name string) {

if dp, ok := dispatchers[name]; !ok || dp == nil {
panic("EventDispatcher for " + name + " is not found, make sure you have import the package, " +
"like github.com/apache/dubbo-go/common/observer/dispatcher ")
"like import _ github.com/apache/dubbo-go/common/observer/dispatcher ")
}
globalEventDispatcher = dispatchers[name]()
}
Expand Down
2 changes: 1 addition & 1 deletion common/extension/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func SetServiceDiscovery(protocol string, creator func(name string) (registry.Se
func GetServiceDiscovery(protocol string, name string) (registry.ServiceDiscovery, error) {
creator, ok := discoveryCreatorMap[protocol]
if !ok {
return nil, perrors.New("Could not find the service discovery with name: " + name)
return nil, perrors.New("Could not find the service discovery with discovery protocol: " + protocol)
}
return creator(name)
}
3 changes: 3 additions & 0 deletions common/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ func suiteMethod(method reflect.Method) *MethodType {

// this method is in RPCService
// we force users must implement RPCService interface in their provider
// and RPCService has only one method "Reference"
// In general, this method should not be exported to client
// so we ignore this method
// see RPCService
if mname == "Reference" {
return nil
Expand Down
4 changes: 2 additions & 2 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type ServiceConfig struct {
exporters []protocol.Exporter
}

// Prefix return dubbo.service.${interface}.
// Prefix returns dubbo.service.${interface}.
func (c *ServiceConfig) Prefix() string {
return constant.ServiceConfigPrefix + c.InterfaceName + "."
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
return ports
}

// Export export the service
// Export exports the service
func (c *ServiceConfig) Export() error {
// TODO: config center start here

Expand Down
1 change: 1 addition & 0 deletions config_center/configuration_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

// ConfigurationListener for changing listener's event
type ConfigurationListener interface {
// Process the notification event once there's any change happens on the config
Process(*ConfigChangeEvent)
}

Expand Down
55 changes: 55 additions & 0 deletions metadata/mapping/memory/service_name_mapping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package memory

import (
"sync"
)

import (
gxset "github.com/dubbogo/gost/container/set"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/mapping"
)

func init() {
extension.SetGlobalServiceNameMapping(GetNameMappingInstance)
}

type InMemoryServiceNameMapping struct{}

func (i *InMemoryServiceNameMapping) Map(serviceInterface string, group string, version string, protocol string) error {
return nil
}

func (i *InMemoryServiceNameMapping) Get(serviceInterface string, group string, version string, protocol string) (*gxset.HashSet, error) {
return gxset.NewSet(config.GetApplicationConfig().Name), nil
}

var serviceNameMappingInstance *InMemoryServiceNameMapping
var serviceNameMappingOnce sync.Once

func GetNameMappingInstance() mapping.ServiceNameMapping {
serviceNameMappingOnce.Do(func() {
serviceNameMappingInstance = &InMemoryServiceNameMapping{}
})
return serviceNameMappingInstance
}
2 changes: 1 addition & 1 deletion metadata/report/etcd/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
const DEFAULT_ROOT = "dubbo"

func init() {
extension.SetMetadataReportFactory("etcd", func() factory.MetadataReportFactory {
extension.SetMetadataReportFactory(constant.ETCDV3_KEY, func() factory.MetadataReportFactory {
return &etcdMetadataReportFactory{}
})
}
Expand Down
4 changes: 0 additions & 4 deletions metadata/service/inmemory/service_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package inmemory
import (
"context"
"reflect"
"time"
)

import (
Expand Down Expand Up @@ -59,10 +58,7 @@ func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group st
invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}),
invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV}))

start := time.Now()
res := m.invkr.Invoke(context.Background(), inv)
end := time.Now()
logger.Infof("duration: %s, result: %v", (end.Sub(start)).String(), res.Result())
if res.Error() != nil {
logger.Errorf("could not get the metadata service from remote provider: %v", res.Error())
return []interface{}{}, nil
Expand Down
5 changes: 5 additions & 0 deletions protocol/invocation/rpcinvocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (r *RPCInvocation) Invoker() protocol.Invoker {
return r.invoker
}

// nolint
func (r *RPCInvocation) SetInvoker(invoker protocol.Invoker) {
r.invoker = invoker
}

// CallBack sets RPC callback method.
func (r *RPCInvocation) CallBack() interface{} {
return r.callBack
Expand Down
12 changes: 6 additions & 6 deletions registry/base_configuration_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ import (
"github.com/apache/dubbo-go/remoting"
)

// BaseConfigurationListener ...
// nolint
type BaseConfigurationListener struct {
configurators []config_center.Configurator
dynamicConfiguration config_center.DynamicConfiguration
defaultConfiguratorFunc func(url *common.URL) config_center.Configurator
}

// Configurators ...
// Configurators gets Configurator from config center
func (bcl *BaseConfigurationListener) Configurators() []config_center.Configurator {
return bcl.configurators
}

// InitWith ...
// InitWith will init BaseConfigurationListener by @key+@Listener+@f
func (bcl *BaseConfigurationListener) InitWith(key string, listener config_center.ConfigurationListener, f func(url *common.URL) config_center.Configurator) {
bcl.dynamicConfiguration = config.GetEnvInstance().GetDynamicConfiguration()
if bcl.dynamicConfiguration == nil {
Expand All @@ -60,7 +60,7 @@ func (bcl *BaseConfigurationListener) InitWith(key string, listener config_cente
}
}

// Process ...
// Process the notification event once there's any change happens on the config.
func (bcl *BaseConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("Notification of overriding rule, change type is: %v , raw config content is:%v", event.ConfigType, event.Value)
if event.ConfigType == remoting.EventTypeDel {
Expand All @@ -82,14 +82,14 @@ func (bcl *BaseConfigurationListener) genConfiguratorFromRawRule(rawConfig strin
return nil
}

// OverrideUrl ...
// OverrideUrl gets existing configuration rule and overrides provider url before exporting.
func (bcl *BaseConfigurationListener) OverrideUrl(url *common.URL) {
for _, v := range bcl.configurators {
v.Configure(url)
}
}

// ToConfigurators ...
// ToConfigurators converts @urls by @f to config_center.Configurators
func ToConfigurators(urls []*common.URL, f func(url *common.URL) config_center.Configurator) []config_center.Configurator {
if len(urls) == 0 {
return nil
Expand Down
2 changes: 2 additions & 0 deletions registry/consul/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (l *consulListener) handler(idx uint64, raw interface{}) {
}
}

// Next returns the service event from consul.
func (l *consulListener) Next() (*registry.ServiceEvent, error) {
select {
case event := <-l.eventCh:
Expand All @@ -196,6 +197,7 @@ func (l *consulListener) Next() (*registry.ServiceEvent, error) {
}
}

// Close closes this listener
func (l *consulListener) Close() {
close(l.done)
l.plan.Stop()
Expand Down
8 changes: 5 additions & 3 deletions registry/consul/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import (
)

const (
// RegistryConnDelay ...
RegistryConnDelay = 3
registryConnDelay = 3
)

func init() {
Expand Down Expand Up @@ -148,7 +147,7 @@ func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.Noti
return
}
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
time.Sleep(time.Duration(registryConnDelay) * time.Second)
continue
}

Expand All @@ -171,10 +170,12 @@ func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error)
return listener, err
}

// GetUrl get registry URL of consul registry center
func (r *consulRegistry) GetUrl() common.URL {
return *r.URL
}

// IsAvailable checks consul registry center whether is available
func (r *consulRegistry) IsAvailable() bool {
select {
case <-r.done:
Expand All @@ -184,6 +185,7 @@ func (r *consulRegistry) IsAvailable() bool {
}
}

// Destroy consul registry center
func (r *consulRegistry) Destroy() {
close(r.done)
}
2 changes: 2 additions & 0 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func init() {
extension.SetDefaultRegistryDirectory(NewRegistryDirectory)
}

// RegistryDirectory implementation of Directory:
// Invoker list returned from this Directory's list method have been filtered by Routers
type RegistryDirectory struct {
directory.BaseDirectory
cacheInvokers []protocol.Invoker
Expand Down
7 changes: 6 additions & 1 deletion registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ type dataListener struct {
listener config_center.ConfigurationListener
}

// NewRegistryDataListener
// NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
}

// AddInterestedURL adds a registration @url to listen
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}

// DataChange processes the data change event from registry center of etcd
func (l *dataListener) DataChange(eventType remoting.Event) bool {

index := strings.Index(eventType.Path, "/providers/")
Expand Down Expand Up @@ -88,10 +90,12 @@ func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}

// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}

// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
Expand All @@ -114,6 +118,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}

// Close etcd registry center
func (l *configurationListener) Close() {
l.registry.WaitGroup().Done()
}
7 changes: 7 additions & 0 deletions registry/etcdv3/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,31 +104,37 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
return r, nil
}

// InitListeners init listeners of etcd registry center
func (r *etcdV3Registry) InitListeners() {
r.listener = etcdv3.NewEventListener(r.client)
r.configListener = NewConfigurationListener(r)
r.dataListener = NewRegistryDataListener(r.configListener)
}

// DoRegister actually do the register job in the registry center of etcd
func (r *etcdV3Registry) DoRegister(root string, node string) error {
return r.client.Create(path.Join(root, node), "")
}

// nolint
func (r *etcdV3Registry) DoUnregister(root string, node string) error {
return perrors.New("DoUnregister is not support in etcdV3Registry")
}

// CloseAndNilClient closes listeners and clear client
func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close()
r.client = nil
}

// CloseListener closes listeners
func (r *etcdV3Registry) CloseListener() {
if r.configListener != nil {
r.configListener.Close()
}
}

// CreatePath create the path in the registry center of etcd
func (r *etcdV3Registry) CreatePath(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
Expand All @@ -141,6 +147,7 @@ func (r *etcdV3Registry) CreatePath(k string) error {
return nil
}

// DoSubscribe actually subscribe the provider URL
func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {

var (
Expand Down
27 changes: 19 additions & 8 deletions registry/etcdv3/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@ package etcdv3

import (
"fmt"
"sync"
"time"
)

import (
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
"github.com/hashicorp/vault/helper/jsonutil"
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting"
"github.com/apache/dubbo-go/remoting/etcdv3"
gxset "github.com/dubbogo/gost/container/set"
gxpage "github.com/dubbogo/gost/page"
"github.com/hashicorp/vault/helper/jsonutil"
perrors "github.com/pkg/errors"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -83,8 +89,13 @@ func (e *etcdV3ServiceDiscovery) Register(instance registry.ServiceInstance) err
if nil != e.client {
ins, err := jsonutil.EncodeJSON(instance)
if err == nil {
e.client.Create(path, string(ins))
e.services.Add(instance.GetServiceName())
err = e.client.Update(path, string(ins))
if err != nil {
logger.Errorf("cannot register the instance: %s", string(ins), err)
} else {
e.services.Add(instance.GetServiceName())
}

}
}

Expand Down
Loading