diff --git a/common/config/environment.go b/common/config/environment.go index 998f0beefd..8709d69a78 100644 --- a/common/config/environment.go +++ b/common/config/environment.go @@ -63,32 +63,40 @@ func (env *Environment) UpdateExternalConfigMap(externalMap map[string]string) { func (env *Environment) Configuration() *list.List { list := list.New() memConf := newInmemoryConfiguration() - memConf.setProperties(env.externalConfigMap) + memConf.setProperties(&(env.externalConfigMap)) list.PushBack(memConf) return list } type InmemoryConfiguration struct { - store sync.Map + store *sync.Map } func newInmemoryConfiguration() *InmemoryConfiguration { return &InmemoryConfiguration{} } -func (conf *InmemoryConfiguration) setProperties(p sync.Map) { +func (conf *InmemoryConfiguration) setProperties(p *sync.Map) { conf.store = p } func (conf *InmemoryConfiguration) GetProperty(key string) (bool, string) { + if conf.store == nil { + return false, "" + } + v, ok := conf.store.Load(key) if ok { return true, v.(string) } - return false, "" + return false, "" } func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]struct{} { + if conf.store == nil { + return nil + } + properties := make(map[string]struct{}) conf.store.Range(func(key, value interface{}) bool { if idx := strings.Index(key.(string), subKey); idx >= 0 { @@ -100,5 +108,6 @@ func (conf *InmemoryConfiguration) GetSubProperty(subKey string) map[string]stru } return true }) + return properties } diff --git a/config/base_config.go b/config/base_config.go index 19acea2fd6..54ad8aba36 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -107,7 +107,8 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC setBaseValue := func(f reflect.Value) { ok, value := config.GetProperty(getKeyPrefix(val, id) + key) if ok { - if f.Kind() == reflect.Int64 { + switch f.Kind() { + case reflect.Int64: x, err := strconv.Atoi(value) if err != nil { logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}", @@ -120,21 +121,16 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the int64 value {%v} from config center is overflow", int64(x))) } } - - } - - if f.Kind() == reflect.String { + case reflect.String: f.SetString(value) - } - if f.Kind() == reflect.Bool { + case reflect.Bool: x, err := strconv.ParseBool(value) if err != nil { logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}", val.Type().Name(), val.Type().Field(i).Name, err) } f.SetBool(x) - } - if f.Kind() == reflect.Float64 { + case reflect.Float64: x, err := strconv.ParseFloat(value, 64) if err != nil { logger.Errorf("Dynamic change the configuration in struct {%v} field {%v} error ,error message is {%v}", @@ -147,7 +143,10 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC val.Type().Name(), val.Type().Field(i).Name, perrors.Errorf("the float64 value {%v} from config center is overflow", x)) } } + default: + logger.Warnf("The kind of field {%v} is not supported ", f.Kind().String()) } + } } @@ -180,25 +179,32 @@ func setFieldValue(val reflect.Value, id reflect.Value, config *config.InmemoryC } if f.Kind() == reflect.Map { - //initiate config - s := reflect.New(f.Type().Elem().Elem()) - prefix := s.MethodByName("Prefix").Call(nil)[0].String() - m := config.GetSubProperty(prefix) - for k := range m { - f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem())) + if f.Type().Elem().Kind() == reflect.Ptr { + //initiate config + s := reflect.New(f.Type().Elem().Elem()) + prefix := s.MethodByName("Prefix").Call(nil)[0].String() + m := config.GetSubProperty(prefix) + for k := range m { + f.SetMapIndex(reflect.ValueOf(k), reflect.New(f.Type().Elem().Elem())) + } } + //iter := f.MapRange() for _, k := range f.MapKeys() { v := f.MapIndex(k) - if v.Kind() == reflect.Ptr { + switch v.Kind() { + case reflect.Ptr: if v.Elem().Kind() == reflect.Struct { setFieldValue(v.Elem(), k, config) } else { setBaseValue(v.Elem()) } + case reflect.Int64, reflect.String, reflect.Bool, reflect.Float64: + setBaseValue(v) + default: + logger.Warnf("The kind of field {%v} is not supported ", v.Kind().String()) } - } } diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 6e9689c763..cbdc397c28 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -54,6 +54,7 @@ func TestConfigLoader(t *testing.T) { assert.NotEqual(t, ConsumerConfig{}, GetConsumerConfig()) assert.NotNil(t, providerConfig) assert.NotEqual(t, ProviderConfig{}, GetProviderConfig()) + assert.Equal(t, "soa.com.ikurento.user.UserProvider", GetConsumerConfig().References["UserProvider"].Params["serviceid"]) } func TestLoad(t *testing.T) { diff --git a/config/reference_config.go b/config/reference_config.go index 2c38d8aa4a..835d17f054 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -38,19 +38,20 @@ import ( type ReferenceConfig struct { context context.Context pxy *proxy.Proxy - InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` - Check *bool `yaml:"check" json:"check,omitempty" property:"check"` - Url string `yaml:"url" json:"url,omitempty" property:"url"` - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` - Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version"` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - async bool `yaml:"async" json:"async,omitempty" property:"async"` + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Check *bool `yaml:"check" json:"check,omitempty" property:"check"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"` + Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` + Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version"` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + async bool `yaml:"async" json:"async,omitempty" property:"async"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` invoker protocol.Invoker urls []*common.URL } @@ -143,6 +144,10 @@ func (refconfig *ReferenceConfig) GetRPCService() common.RPCService { func (refconfig *ReferenceConfig) getUrlMap() url.Values { urlMap := url.Values{} + //first set user params + for k, v := range refconfig.Params { + urlMap.Set(k, v) + } urlMap.Set(constant.INTERFACE_KEY, refconfig.InterfaceName) urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10)) urlMap.Set(constant.CLUSTER_KEY, refconfig.Cluster) diff --git a/config/reference_config_test.go b/config/reference_config_test.go index c41e2a16de..1a856872a6 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -80,6 +80,9 @@ func doInit() { }, References: map[string]*ReferenceConfig{ "MockService": { + Params: map[string]string{ + "serviceid": "soa.mock", + }, Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2", InterfaceName: "MockService", Protocol: "mock", @@ -125,6 +128,7 @@ func Test_Refer(t *testing.T) { for _, reference := range consumerConfig.References { reference.Refer() + assert.Equal(t, "soa.mock", reference.Params["serviceid"]) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } diff --git a/config/registry_config.go b/config/registry_config.go index 0c6b326a80..1a926b459e 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -37,8 +37,8 @@ type RegistryConfig struct { Group string `yaml:"group" json:"group,omitempty" property:"group"` //for registry Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"address,omitempty" property:"username"` - Password string `yaml:"password" json:"address,omitempty" property:"password"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` } func (*RegistryConfig) Prefix() string { diff --git a/config/service_config.go b/config/service_config.go index 79a29aa330..1b78c2ef3d 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -43,17 +43,18 @@ import ( type ServiceConfig struct { context context.Context - Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` - Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ',' - InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` - Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` - Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"` - Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` - Group string `yaml:"group" json:"group,omitempty" property:"group"` - Version string `yaml:"version" json:"version,omitempty" property:"version" ` - Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` - Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` - Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"` + Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"` + Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ',' + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Registry string `yaml:"registry" json:"registry,omitempty" property:"registry"` + Cluster string `default:"failover" yaml:"cluster" json:"cluster,omitempty" property:"cluster"` + Loadbalance string `default:"random" yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"` + Group string `yaml:"group" json:"group,omitempty" property:"group"` + Version string `yaml:"version" json:"version,omitempty" property:"version" ` + Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` + Retries int64 `yaml:"retries" json:"retries,omitempty" property:"retries"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` unexported *atomic.Bool exported *atomic.Bool rpcService common.RPCService @@ -148,6 +149,10 @@ func (srvconfig *ServiceConfig) Implement(s common.RPCService) { func (srvconfig *ServiceConfig) getUrlMap() url.Values { urlMap := url.Values{} + //first set user params + for k, v := range srvconfig.Params { + urlMap.Set(k, v) + } urlMap.Set(constant.INTERFACE_KEY, srvconfig.InterfaceName) urlMap.Set(constant.TIMESTAMP_KEY, strconv.FormatInt(time.Now().Unix(), 10)) urlMap.Set(constant.CLUSTER_KEY, srvconfig.Cluster) diff --git a/config/testdata/consumer_config.yml b/config/testdata/consumer_config.yml index 08ff59f6fc..68398623b6 100644 --- a/config/testdata/consumer_config.yml +++ b/config/testdata/consumer_config.yml @@ -42,6 +42,9 @@ references: methods : - name: "GetUser" retries: 3 + params: + "serviceid": + "soa.com.ikurento.user.UserProvider" protocol_conf: dubbo: diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go index ba2755fa99..168246e157 100644 --- a/registry/zookeeper/registry_test.go +++ b/registry/zookeeper/registry_test.go @@ -36,13 +36,13 @@ import ( func Test_Register(t *testing.T) { regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:1111", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))) - url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"})) + url, _ := common.NewURL(context.TODO(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithParamsValue("serviceid", "soa.mock"), common.WithMethods([]string{"GetUser", "AddUser"})) ts, reg, err := newMockZkRegistry(®url) defer ts.Stop() err = reg.Register(url) children, _ := reg.client.GetChildren("/dubbo/com.ikurento.user.UserProvider/providers") - assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*provider", children) + assert.Regexp(t, ".*dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26category%3Dproviders%26cluster%3Dmock%26dubbo%3Ddubbo-provider-golang-2.6.0%26.*.serviceid%3Dsoa.mock%26.*provider", children) assert.NoError(t, err) } diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index af668a1aaf..733870052d 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -132,7 +132,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li go func(node string) { logger.Infof("delete zkNode{%s}", node) if l.ListenServiceNodeEvent(node, listener) { - logger.Infof("delete content{%s}", n) + logger.Infof("delete content{%s}", node) listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) } logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go index b1f0d43d34..8b133336e7 100644 --- a/remoting/zookeeper/listener_test.go +++ b/remoting/zookeeper/listener_test.go @@ -18,7 +18,7 @@ package zookeeper import ( - "fmt" + "sync" "testing" "time" ) @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" ) import ( + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/remoting" ) @@ -86,32 +87,36 @@ func TestListener(t *testing.T) { dubbo.service.com.ikurento.user.UserProvider.warmup=100 dubbo.service.com.ikurento.user.UserProvider.cluster=failover ` - + var wait sync.WaitGroup ts, client, event := initZkData(t) defer ts.Stop() client.Wait.Add(1) + wait.Add(1) go client.HandleZkEvent(event) listener := NewZkEventListener(client) - dataListener := &mockDataListener{client: client, changedData: changedData} + dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait} listener.ListenServiceEvent("/dubbo", dataListener) _, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1) assert.NoError(t, err) - client.Wait.Wait() + wait.Wait() assert.Equal(t, changedData, dataListener.eventList[1].Content) + client.Close() + } type mockDataListener struct { eventList []remoting.Event client *ZookeeperClient changedData string + wait *sync.WaitGroup } func (m *mockDataListener) DataChange(eventType remoting.Event) bool { - fmt.Println(eventType) + logger.Info(eventType) m.eventList = append(m.eventList, eventType) if eventType.Content == m.changedData { - m.client.Wait.Done() + m.wait.Done() } return true }