diff --git a/.travis.yml b/.travis.yml index 707e644814..62de363aa2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,10 +10,7 @@ install: true script: - go fmt ./... && [[ -z `git status -s` ]] - - mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar registry/zookeeper/zookeeper-4unittest/contrib/fatjar - - wget -P "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar" https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar - - cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/ - - cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/ + - chmod u+x before_ut.sh && ./before_ut.sh - go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic after_success: diff --git a/before_ut.bat b/before_ut.bat index 5296d0f876..fcf4ef2df6 100644 --- a/before_ut.bat +++ b/before_ut.bat @@ -15,7 +15,17 @@ :: limitations under the License. set zkJar=zookeeper-3.4.9-fatjar.jar -md remoting\zookeeper\zookeeper-4unittest\contrib\fatjar config_center\zookeeper\zookeeper-4unittest\contrib\fatjar registry\zookeeper\zookeeper-4unittest\contrib\fatjar +md remoting\zookeeper\zookeeper-4unittest\contrib\fatjar curl -L https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/%zkJar% -o remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar% + +md config_center\zookeeper\zookeeper-4unittest\contrib\fatjar xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/" -xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "registry/zookeeper/zookeeper-4unittest/contrib/fatjar/" \ No newline at end of file + +md registry\zookeeper\zookeeper-4unittest\contrib\fatjar +xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "registry/zookeeper/zookeeper-4unittest/contrib/fatjar/" + +md cluster\router\chain\zookeeper-4unittest\contrib\fatjar +xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "cluster/router/chain/zookeeper-4unittest/contrib/fatjar/" + +md cluster\router\condition\zookeeper-4unittest\contrib\fatjar +xcopy /f "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/%zkJar%" "cluster/router/condition/zookeeper-4unittest/contrib/fatjar/" \ No newline at end of file diff --git a/before_ut.sh b/before_ut.sh old mode 100644 new mode 100755 index 323173bcc6..c6559cba31 --- a/before_ut.sh +++ b/before_ut.sh @@ -15,7 +15,17 @@ # limitations under the License. -mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar registry/zookeeper/zookeeper-4unittest/contrib/fatjar +mkdir -p remoting/zookeeper/zookeeper-4unittest/contrib/fatjar wget -P "remoting/zookeeper/zookeeper-4unittest/contrib/fatjar" https://github.com/dubbogo/resources/raw/master/zookeeper-4unitest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar + +mkdir -p config_center/zookeeper/zookeeper-4unittest/contrib/fatjar cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar config_center/zookeeper/zookeeper-4unittest/contrib/fatjar/ -cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/ \ No newline at end of file + +mkdir -p registry/zookeeper/zookeeper-4unittest/contrib/fatjar +cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar registry/zookeeper/zookeeper-4unittest/contrib/fatjar/ + +mkdir -p cluster/router/chain/zookeeper-4unittest/contrib/fatjar +cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar cluster/router/chain/zookeeper-4unittest/contrib/fatjar + +mkdir -p cluster/router/condition/zookeeper-4unittest/contrib/fatjar +cp remoting/zookeeper/zookeeper-4unittest/contrib/fatjar/zookeeper-3.4.9-fatjar.jar cluster/router/condition/zookeeper-4unittest/contrib/fatjar \ No newline at end of file diff --git a/cluster/directory/base_directory.go b/cluster/directory/base_directory.go index e1a38c4c82..75d9ef2656 100644 --- a/cluster/directory/base_directory.go +++ b/cluster/directory/base_directory.go @@ -20,39 +20,94 @@ package directory import ( "sync" ) + import ( + "github.com/dubbogo/gost/container/set" "go.uber.org/atomic" ) + import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/chain" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" ) -// BaseDirectory ... +var routerURLSet = gxset.NewSet() + +// BaseDirectory Abstract implementation of Directory: Invoker list returned from this Directory's list method have been filtered by Routers type BaseDirectory struct { url *common.URL destroyed *atomic.Bool - mutex sync.Mutex + // this mutex for change the properties in BaseDirectory, like routerChain , destroyed etc + mutex sync.Mutex + routerChain router.Chain } -// NewBaseDirectory ... +// NewBaseDirectory Create BaseDirectory with URL func NewBaseDirectory(url *common.URL) BaseDirectory { return BaseDirectory{ - url: url, - destroyed: atomic.NewBool(false), + url: url, + destroyed: atomic.NewBool(false), + routerChain: &chain.RouterChain{}, } } -// GetUrl ... +// RouterChain Return router chain in directory +func (dir *BaseDirectory) RouterChain() router.Chain { + return dir.routerChain +} + +// SetRouterChain Set router chain in directory +func (dir *BaseDirectory) SetRouterChain(routerChain router.Chain) { + dir.mutex.Lock() + defer dir.mutex.Unlock() + dir.routerChain = routerChain +} + +// GetUrl Get URL func (dir *BaseDirectory) GetUrl() common.URL { return *dir.url } -// GetDirectoryUrl ... +// GetDirectoryUrl Get URL instance func (dir *BaseDirectory) GetDirectoryUrl() *common.URL { return dir.url } -// Destroy ... +// SetRouters Convert url to routers and add them into dir.routerChain +func (dir *BaseDirectory) SetRouters(urls []*common.URL) { + if len(urls) == 0 { + return + } + + routers := make([]router.Router, 0, len(urls)) + + for _, url := range urls { + routerKey := url.GetParam(constant.ROUTER_KEY, "") + + if len(routerKey) > 0 { + factory := extension.GetRouterFactory(url.Protocol) + r, err := factory.NewRouter(url) + if err != nil { + logger.Errorf("Create router fail. router key: %s, error: %v", routerKey, url.Service(), err) + return + } + routers = append(routers, r) + } + } + + logger.Infof("Init file condition router success, size: %v", len(routers)) + dir.mutex.Lock() + rc := dir.routerChain + dir.mutex.Unlock() + + rc.AddRouters(routers) +} + +// Destroy Destroy func (dir *BaseDirectory) Destroy(doDestroy func()) { if dir.destroyed.CAS(false, true) { dir.mutex.Lock() @@ -61,7 +116,18 @@ func (dir *BaseDirectory) Destroy(doDestroy func()) { } } -// IsAvailable ... +// IsAvailable Once directory init finish, it will change to true func (dir *BaseDirectory) IsAvailable() bool { return !dir.destroyed.Load() } + +// GetRouterURLSet Return router URL +func GetRouterURLSet() *gxset.HashSet { + return routerURLSet +} + +// AddRouterURLSet Add router URL +// Router URL will init in config/config_loader.go +func AddRouterURLSet(url *common.URL) { + routerURLSet.Add(url) +} diff --git a/cluster/directory/base_directory_test.go b/cluster/directory/base_directory_test.go new file mode 100644 index 0000000000..d5993959f1 --- /dev/null +++ b/cluster/directory/base_directory_test.go @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package directory + +import ( + "encoding/base64" + "fmt" + "testing" +) + +import ( + gxnet "github.com/dubbogo/gost/net" + "github.com/stretchr/testify/assert" +) + +import ( + _ "github.com/apache/dubbo-go/cluster/router/condition" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +func TestNewBaseDirectory(t *testing.T) { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")) + directory := NewBaseDirectory(&url) + + assert.NotNil(t, directory) + + assert.Equal(t, url, directory.GetUrl()) + assert.Equal(t, &url, directory.GetDirectoryUrl()) + +} + +func TestBuildRouterChain(t *testing.T) { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")) + directory := NewBaseDirectory(&url) + + assert.NotNil(t, directory) + + localIP, _ := gxnet.GetLocalIP() + rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) + routeURL := getRouteUrl(rule) + routerURLs := make([]*common.URL, 0) + routerURLs = append(routerURLs, routeURL) + directory.SetRouters(routerURLs) + chain := directory.RouterChain() + + assert.NotNil(t, chain) +} + +func getRouteUrl(rule string) *common.URL { + url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url.AddParam("rule", rule) + url.AddParam("force", "true") + url.AddParam(constant.ROUTER_KEY, "router") + return &url +} diff --git a/cluster/directory/static_directory.go b/cluster/directory/static_directory.go index 7d2d5490b0..9f600fedc4 100644 --- a/cluster/directory/static_directory.go +++ b/cluster/directory/static_directory.go @@ -18,6 +18,11 @@ package directory import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/cluster/router/chain" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" ) @@ -27,7 +32,7 @@ type staticDirectory struct { invokers []protocol.Invoker } -// NewStaticDirectory ... +// NewStaticDirectory Create a new staticDirectory with invokers func NewStaticDirectory(invokers []protocol.Invoker) *staticDirectory { var url common.URL @@ -53,11 +58,21 @@ func (dir *staticDirectory) IsAvailable() bool { return true } +// List List invokers func (dir *staticDirectory) List(invocation protocol.Invocation) []protocol.Invoker { - //TODO:Here should add router - return dir.invokers + l := len(dir.invokers) + invokers := make([]protocol.Invoker, l, l) + copy(invokers, dir.invokers) + routerChain := dir.RouterChain() + + if routerChain == nil { + return invokers + } + dirUrl := dir.GetUrl() + return routerChain.Route(invokers, &dirUrl, invocation) } +// Destroy Destroy func (dir *staticDirectory) Destroy() { dir.BaseDirectory.Destroy(func() { for _, ivk := range dir.invokers { @@ -66,3 +81,17 @@ func (dir *staticDirectory) Destroy() { dir.invokers = []protocol.Invoker{} }) } + +// BuildRouterChain build router chain by invokers +func (dir *staticDirectory) BuildRouterChain(invokers []protocol.Invoker) error { + if len(invokers) == 0 { + return perrors.Errorf("invokers == null") + } + url := invokers[0].GetUrl() + routerChain, e := chain.NewRouterChain(&url) + if e != nil { + return e + } + dir.SetRouterChain(routerChain) + return nil +} diff --git a/cluster/directory/static_directory_test.go b/cluster/directory/static_directory_test.go index 42ef1bcd0b..c50c9a4063 100644 --- a/cluster/directory/static_directory_test.go +++ b/cluster/directory/static_directory_test.go @@ -40,7 +40,9 @@ func Test_StaticDirList(t *testing.T) { } staticDir := NewStaticDirectory(invokers) - assert.Len(t, staticDir.List(&invocation.RPCInvocation{}), 10) + list := staticDir.List(&invocation.RPCInvocation{}) + + assert.Len(t, list, 10) } func Test_StaticDirDestroy(t *testing.T) { diff --git a/cluster/router/chain/chain.go b/cluster/router/chain/chain.go new file mode 100644 index 0000000000..d48a837eba --- /dev/null +++ b/cluster/router/chain/chain.go @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package chain + +import ( + "math" + "sort" + "sync" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +// RouterChain Router chain +type RouterChain struct { + // Full list of addresses from registry, classified by method name. + invokers []protocol.Invoker + // Containing all routers, reconstruct every time 'route://' urls change. + routers []router.Router + // Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the + // instance will never delete or recreate. + builtinRouters []router.Router + + mutex sync.RWMutex +} + +// Route Loop routers in RouterChain and call Route method to determine the target invokers list. +func (c *RouterChain) Route(invoker []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + finalInvokers := invoker + l := len(c.routers) + rs := make([]router.Router, l, int(math.Ceil(float64(l)*1.2))) + c.mutex.RLock() + copy(rs, c.routers) + c.mutex.RUnlock() + + for _, r := range rs { + finalInvokers = r.Route(finalInvokers, url, invocation) + } + return finalInvokers +} + +// AddRouters Add routers to router chain +// New a array add builtinRouters which is not sorted in RouterChain and routers +// Sort the array +// Replace router array in RouterChain +func (c *RouterChain) AddRouters(routers []router.Router) { + newRouters := make([]router.Router, 0, len(c.builtinRouters)+len(routers)) + newRouters = append(newRouters, c.builtinRouters...) + newRouters = append(newRouters, routers...) + sortRouter(newRouters) + c.mutex.Lock() + defer c.mutex.Unlock() + c.routers = newRouters +} + +// NewRouterChain Use url to init router chain +// Loop routerFactories and call NewRouter method +func NewRouterChain(url *common.URL) (*RouterChain, error) { + routerFactories := extension.GetRouterFactories() + if len(routerFactories) == 0 { + return nil, perrors.Errorf("No routerFactory exits , create one please") + } + routers := make([]router.Router, 0, len(routerFactories)) + for key, routerFactory := range routerFactories { + r, err := routerFactory().NewRouter(url) + if r == nil || err != nil { + logger.Errorf("router chain build router fail! routerFactories key:%s error:%s", key, err.Error()) + continue + } + routers = append(routers, r) + } + + newRouters := make([]router.Router, len(routers)) + copy(newRouters, routers) + + sortRouter(newRouters) + + chain := &RouterChain{ + builtinRouters: routers, + routers: newRouters, + } + + return chain, nil +} + +// sortRouter Sort router instance by priority with stable algorithm +func sortRouter(routers []router.Router) { + sort.Stable(byPriority(routers)) +} + +// byPriority Sort by priority +type byPriority []router.Router + +func (a byPriority) Len() int { return len(a) } +func (a byPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byPriority) Less(i, j int) bool { return a[i].Priority() < a[j].Priority() } diff --git a/cluster/router/chain/chain_test.go b/cluster/router/chain/chain_test.go new file mode 100644 index 0000000000..0cb47c4a18 --- /dev/null +++ b/cluster/router/chain/chain_test.go @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package chain + +import ( + "encoding/base64" + "fmt" + "strconv" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/cluster/router/condition" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + _ "github.com/apache/dubbo-go/config_center/zookeeper" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/remoting/zookeeper" +) + +func TestNewRouterChain(t *testing.T) { + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + assert.NoError(t, err) + + testyml := `enabled: true +force: true +runtime: false +conditions: + - => host != 172.22.3.91 +` + + _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + assert.NoError(t, err) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + assert.Nil(t, err) + assert.NotNil(t, configuration) + + chain, err := NewRouterChain(getRouteUrl("test-condition")) + assert.Nil(t, err) + assert.Equal(t, 1, len(chain.routers)) + appRouter := chain.routers[0].(*condition.AppRouter) + + assert.NotNil(t, appRouter) + assert.NotNil(t, appRouter.RouterRule()) + rule := appRouter.RouterRule() + assert.Equal(t, "", rule.Scope) + assert.True(t, rule.Force) + assert.True(t, rule.Enabled) + assert.True(t, rule.Valid) + + assert.Equal(t, testyml, rule.RawRule) + assert.Equal(t, false, rule.Runtime) + assert.Equal(t, false, rule.Dynamic) + assert.Equal(t, "", rule.Key) +} + +func TestNewRouterChainURLNil(t *testing.T) { + chain, err := NewRouterChain(nil) + assert.NoError(t, err) + assert.NotNil(t, chain) +} + +func TestRouterChain_AddRouters(t *testing.T) { + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + assert.NoError(t, err) + + testyml := `enabled: true +force: true +runtime: false +conditions: + - => host != 172.22.3.91 +` + + _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + assert.NoError(t, err) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + chain, err := NewRouterChain(getConditionRouteUrl("test-condition")) + assert.Nil(t, err) + assert.Equal(t, 2, len(chain.routers)) + + url := getConditionRouteUrl("test-condition") + assert.NotNil(t, url) + factory := extension.GetRouterFactory(url.Protocol) + r, err := factory.NewRouter(url) + assert.Nil(t, err) + assert.NotNil(t, r) + + routers := make([]router.Router, 0) + routers = append(routers, r) + chain.AddRouters(routers) + assert.Equal(t, 3, len(chain.routers)) +} + +func TestRouterChain_Route(t *testing.T) { + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + chain, err := NewRouterChain(getConditionRouteUrl("test-condition")) + assert.Nil(t, err) + assert.Equal(t, 1, len(chain.routers)) + + url := getConditionRouteUrl("test-condition") + assert.NotNil(t, url) + + invokers := []protocol.Invoker{} + dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService")) + invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) + + targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService")) + inv := &invocation.RPCInvocation{} + finalInvokers := chain.Route(invokers, &targetURL, inv) + + assert.Equal(t, 1, len(finalInvokers)) +} + +func TestRouterChain_Route_AppRouter(t *testing.T) { + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + assert.NoError(t, err) + + testyml := `enabled: true +force: true +runtime: false +conditions: + - => host = 1.1.1.1 => host != 1.2.3.4 +` + + _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testyml), 0) + assert.NoError(t, err) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + chain, err := NewRouterChain(getConditionRouteUrl("test-condition")) + assert.Nil(t, err) + assert.Equal(t, 2, len(chain.routers)) + + invokers := []protocol.Invoker{} + dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService")) + invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) + + targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService")) + inv := &invocation.RPCInvocation{} + finalInvokers := chain.Route(invokers, &targetURL, inv) + + assert.Equal(t, 0, len(finalInvokers)) +} + +func TestRouterChain_Route_NoRoute(t *testing.T) { + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + chain, err := NewRouterChain(getConditionNoRouteUrl("test-condition")) + assert.Nil(t, err) + assert.Equal(t, 1, len(chain.routers)) + + url := getConditionRouteUrl("test-condition") + assert.NotNil(t, url) + + invokers := []protocol.Invoker{} + dubboURL, _ := common.NewURL(fmt.Sprintf("dubbo://1.2.3.4:20000/com.foo.BarService")) + invokers = append(invokers, protocol.NewBaseInvoker(dubboURL)) + + targetURL, _ := common.NewURL(fmt.Sprintf("consumer://1.1.1.1/com.foo.BarService")) + inv := &invocation.RPCInvocation{} + finalInvokers := chain.Route(invokers, &targetURL, inv) + + assert.Equal(t, 0, len(finalInvokers)) +} + +func getConditionNoRouteUrl(applicationKey string) *common.URL { + url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url.AddParam("application", applicationKey) + url.AddParam("force", "true") + rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host != 1.2.3.4")) + url.AddParam(constant.RULE_KEY, rule) + return &url +} + +func getConditionRouteUrl(applicationKey string) *common.URL { + url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url.AddParam("application", applicationKey) + url.AddParam("force", "true") + rule := base64.URLEncoding.EncodeToString([]byte("host = 1.1.1.1 => host = 1.2.3.4")) + url.AddParam(constant.RULE_KEY, rule) + return &url +} + +func getRouteUrl(applicationKey string) *common.URL { + url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url.AddParam("application", applicationKey) + url.AddParam("force", "true") + return &url +} diff --git a/cluster/router/router_factory.go b/cluster/router/condition/app_router.go similarity index 55% rename from cluster/router/router_factory.go rename to cluster/router/condition/app_router.go index 723050939e..056e32851c 100644 --- a/cluster/router/router_factory.go +++ b/cluster/router/condition/app_router.go @@ -15,27 +15,36 @@ * limitations under the License. */ -package router +package condition import ( - "github.com/apache/dubbo-go/cluster" - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/extension" + perrors "github.com/pkg/errors" ) -func init() { - extension.SetRouterFactory("condition", NewConditionRouterFactory) -} +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) -// ConditionRouterFactory ... -type ConditionRouterFactory struct{} +const ( + // Default priority for application router + appRouterDefaultPriority = int64(150) +) -// NewConditionRouterFactory ... -func NewConditionRouterFactory() cluster.RouterFactory { - return ConditionRouterFactory{} +// AppRouter For listen application router with config center +type AppRouter struct { + listenableRouter } -// Router ... -func (c ConditionRouterFactory) Router(url *common.URL) (cluster.Router, error) { - return newConditionRouter(url) +// NewAppRouter Init AppRouter by url +func NewAppRouter(url *common.URL) (*AppRouter, error) { + if url == nil { + return nil, perrors.Errorf("No route URL for create app router!") + } + appRouter, err := newListenableRouter(url, url.GetParam(constant.APPLICATION_KEY, "")) + if err != nil { + return nil, err + } + appRouter.priority = appRouterDefaultPriority + return appRouter, nil } diff --git a/cluster/router/condition/app_router_test.go b/cluster/router/condition/app_router_test.go new file mode 100644 index 0000000000..bd817af36c --- /dev/null +++ b/cluster/router/condition/app_router_test.go @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "strconv" + "testing" + "time" +) + +import ( + _ "github.com/apache/dubbo-go/config_center/zookeeper" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" + "github.com/apache/dubbo-go/remoting/zookeeper" +) + +func TestNewAppRouter(t *testing.T) { + + testYML := `enabled: true +force: true +runtime: false +conditions: + - => host != 172.22.3.91 +` + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + assert.NoError(t, err) + + _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + assert.NoError(t, err) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + assert.Nil(t, err) + assert.NotNil(t, configuration) + + appRouteURL := getAppRouteURL("test-condition") + appRouter, err := NewAppRouter(appRouteURL) + assert.Nil(t, err) + assert.NotNil(t, appRouter) + + assert.NotNil(t, appRouter) + assert.NotNil(t, appRouter.RouterRule()) + rule := appRouter.RouterRule() + assert.Equal(t, "", rule.Scope) + assert.True(t, rule.Force) + assert.True(t, rule.Enabled) + assert.True(t, rule.Valid) + + assert.Equal(t, testYML, rule.RawRule) + assert.Equal(t, false, rule.Runtime) + assert.Equal(t, false, rule.Dynamic) + assert.Equal(t, "", rule.Key) + assert.Equal(t, 0, rule.Priority) +} + +func TestGenerateConditions(t *testing.T) { + + testYML := `enabled: true +force: true +runtime: false +conditions: + - => host != 172.22.3.91 + - host = 192.168.199.208 => host = 192.168.199.208 +` + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + assert.NoError(t, err) + + _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + assert.NoError(t, err) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + assert.Nil(t, err) + assert.NotNil(t, configuration) + + appRouteURL := getAppRouteURL("test-condition") + appRouter, err := NewAppRouter(appRouteURL) + assert.Nil(t, err) + assert.NotNil(t, appRouter) + + rule, err := Parse(testYML) + assert.Nil(t, err) + appRouter.generateConditions(rule) + + assert.Equal(t, 2, len(appRouter.conditionRouters)) +} + +func TestProcess(t *testing.T) { + + testYML := `enabled: true +force: true +runtime: false +conditions: + - => host != 172.22.3.91 +` + ts, z, _, err := zookeeper.NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + err = z.Create("/dubbo/config/dubbo/test-condition.condition-router") + assert.NoError(t, err) + + _, err = z.Conn.Set("/dubbo/config/dubbo/test-condition.condition-router", []byte(testYML), 0) + assert.NoError(t, err) + defer ts.Stop() + defer z.Close() + + zkUrl, _ := common.NewURL("zookeeper://127.0.0.1:" + strconv.Itoa(ts.Servers[0].Port)) + configuration, err := extension.GetConfigCenterFactory("zookeeper").GetDynamicConfiguration(&zkUrl) + config.GetEnvInstance().SetDynamicConfiguration(configuration) + + assert.Nil(t, err) + assert.NotNil(t, configuration) + + appRouteURL := getAppRouteURL("test-condition") + appRouter, err := NewAppRouter(appRouteURL) + assert.Nil(t, err) + assert.NotNil(t, appRouter) + + assert.Equal(t, 1, len(appRouter.conditionRouters)) + + testNewYML := ` +enabled: true +force: true +runtime: false +conditions: + - => host != 172.22.3.91 + - host = 192.168.199.208 => host = 192.168.199.208 +` + + appRouter.Process(&config_center.ConfigChangeEvent{ConfigType: remoting.EventTypeDel}) + + assert.Equal(t, 0, len(appRouter.conditionRouters)) + + appRouter.Process(&config_center.ConfigChangeEvent{Value: testNewYML, ConfigType: remoting.EventTypeAdd}) + + assert.Equal(t, 2, len(appRouter.conditionRouters)) +} + +func getAppRouteURL(applicationKey string) *common.URL { + url, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService") + url.AddParam("application", applicationKey) + url.AddParam("force", "true") + return &url +} diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go new file mode 100644 index 0000000000..66512a1387 --- /dev/null +++ b/cluster/router/condition/factory.go @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.ConditionRouterName, newConditionRouterFactory) + extension.SetRouterFactory(constant.ConditionAppRouterName, newAppRouterFactory) +} + +// ConditionRouterFactory Condition router factory +type ConditionRouterFactory struct{} + +func newConditionRouterFactory() router.RouterFactory { + return &ConditionRouterFactory{} +} + +// NewRouter Create ConditionRouterFactory by URL +func (c *ConditionRouterFactory) NewRouter(url *common.URL) (router.Router, error) { + return NewConditionRouter(url) +} + +// NewRouter Create FileRouterFactory by Content +func (c *ConditionRouterFactory) NewFileRouter(content []byte) (router.Router, error) { + return NewFileConditionRouter(content) +} + +// AppRouterFactory Application router factory +type AppRouterFactory struct{} + +func newAppRouterFactory() router.RouterFactory { + return &AppRouterFactory{} +} + +// NewRouter Create AppRouterFactory by URL +func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) { + return NewAppRouter(url) +} diff --git a/cluster/router/condition_router_test.go b/cluster/router/condition/factory_test.go similarity index 74% rename from cluster/router/condition_router_test.go rename to cluster/router/condition/factory_test.go index fc639b9fc7..99cec34096 100644 --- a/cluster/router/condition_router_test.go +++ b/cluster/router/condition/factory_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package router +package condition import ( "context" @@ -119,33 +119,33 @@ func (bi *MockInvoker) Destroy() { func TestRoute_matchWhen(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("=> host = 1.2.3.4")) - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) cUrl, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService") - matchWhen, _ := router.(*ConditionRouter).MatchWhen(cUrl, inv) + matchWhen := router.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen) rule1 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) - matchWhen1, _ := router1.(*ConditionRouter).MatchWhen(cUrl, inv) + router1, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule1)) + matchWhen1 := router1.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen1) rule2 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.1,3.3.3.3 & host !=1.1.1.1 => host = 1.2.3.4")) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - matchWhen2, _ := router2.(*ConditionRouter).MatchWhen(cUrl, inv) + router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2)) + matchWhen2 := router2.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, false, matchWhen2) rule3 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - matchWhen3, _ := router3.(*ConditionRouter).MatchWhen(cUrl, inv) + router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3)) + matchWhen3 := router3.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen3) rule4 := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) - matchWhen4, _ := router4.(*ConditionRouter).MatchWhen(cUrl, inv) + router4, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule4)) + matchWhen4 := router4.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen4) rule5 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.1 => host = 1.2.3.4")) - router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) - matchWhen5, _ := router5.(*ConditionRouter).MatchWhen(cUrl, inv) + router5, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule5)) + matchWhen5 := router5.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, false, matchWhen5) rule6 := base64.URLEncoding.EncodeToString([]byte("host = 2.2.2.2,1.1.1.*,3.3.3.3 & host != 1.1.1.2 => host = 1.2.3.4")) - router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) - matchWhen6, _ := router6.(*ConditionRouter).MatchWhen(cUrl, inv) + router6, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule6)) + matchWhen6 := router6.(*ConditionRouter).MatchWhen(&cUrl, inv) assert.Equal(t, true, matchWhen6) } @@ -162,19 +162,19 @@ func TestRoute_matchFilter(t *testing.T) { rule4 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 10.20.3.2,10.20.3.3,10.20.3.4")) rule5 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host != 10.20.3.3")) rule6 := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " serialization = fastjson")) - router1, _ := NewConditionRouterFactory().Router(getRouteUrl(rule1)) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - router4, _ := NewConditionRouterFactory().Router(getRouteUrl(rule4)) - router5, _ := NewConditionRouterFactory().Router(getRouteUrl(rule5)) - router6, _ := NewConditionRouterFactory().Router(getRouteUrl(rule6)) + router1, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule1)) + router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2)) + router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3)) + router4, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule4)) + router5, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule5)) + router6, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule6)) cUrl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - fileredInvokers1 := router1.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers2 := router2.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers3 := router3.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers4 := router4.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers5 := router5.Route(invokers, cUrl, &invocation.RPCInvocation{}) - fileredInvokers6 := router6.Route(invokers, cUrl, &invocation.RPCInvocation{}) + fileredInvokers1 := router1.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers2 := router2.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers3 := router3.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers4 := router4.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers5 := router5.Route(invokers, &cUrl, &invocation.RPCInvocation{}) + fileredInvokers6 := router6.Route(invokers, &cUrl, &invocation.RPCInvocation{}) assert.Equal(t, 1, len(fileredInvokers1)) assert.Equal(t, 0, len(fileredInvokers2)) assert.Equal(t, 0, len(fileredInvokers3)) @@ -187,22 +187,22 @@ func TestRoute_matchFilter(t *testing.T) { func TestRoute_methodRoute(t *testing.T) { inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getFoo"), invocation.WithParameterTypes([]reflect.Type{}), invocation.WithArguments([]interface{}{})) rule := base64.URLEncoding.EncodeToString([]byte("host !=4.4.4.* & host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4")) - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) url, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=setFoo,getFoo,findFoo") - matchWhen, _ := router.(*ConditionRouter).MatchWhen(url, inv) + matchWhen := router.(*ConditionRouter).MatchWhen(&url, inv) assert.Equal(t, true, matchWhen) url1, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo") - matchWhen, _ = router.(*ConditionRouter).MatchWhen(url1, inv) + matchWhen = router.(*ConditionRouter).MatchWhen(&url1, inv) assert.Equal(t, true, matchWhen) url2, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo") rule2 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host!=1.1.1.1 => host = 1.2.3.4")) - router2, _ := NewConditionRouterFactory().Router(getRouteUrl(rule2)) - matchWhen, _ = router2.(*ConditionRouter).MatchWhen(url2, inv) + router2, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule2)) + matchWhen = router2.(*ConditionRouter).MatchWhen(&url2, inv) assert.Equal(t, false, matchWhen) url3, _ := common.NewURL("consumer://1.1.1.1/com.foo.BarService?methods=getFoo") rule3 := base64.URLEncoding.EncodeToString([]byte("methods=getFoo & host=1.1.1.1 => host = 1.2.3.4")) - router3, _ := NewConditionRouterFactory().Router(getRouteUrl(rule3)) - matchWhen, _ = router3.(*ConditionRouter).MatchWhen(url3, inv) + router3, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule3)) + matchWhen = router3.(*ConditionRouter).MatchWhen(&url3, inv) assert.Equal(t, true, matchWhen) } @@ -214,8 +214,8 @@ func TestRoute_ReturnFalse(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false")) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } @@ -226,19 +226,24 @@ func TestRoute_ReturnEmpty(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => ")) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } func TestRoute_ReturnAll(t *testing.T) { localIP, _ := gxnet.GetLocalIP() - invokers := []protocol.Invoker{&MockInvoker{}, &MockInvoker{}, &MockInvoker{}} + urlString := "dubbo://" + localIP + "/com.foo.BarService" + dubboURL, _ := common.NewURL(urlString) + mockInvoker1 := NewMockInvoker(dubboURL, 1) + mockInvoker2 := NewMockInvoker(dubboURL, 1) + mockInvoker3 := NewMockInvoker(dubboURL, 1) + invokers := []protocol.Invoker{mockInvoker1, mockInvoker2, mockInvoker3} inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, invokers, fileredInvokers) } @@ -254,8 +259,8 @@ func TestRoute_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP)) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -273,8 +278,8 @@ func TestRoute_Empty_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte(" => " + " host = " + localIP)) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -292,8 +297,8 @@ func TestRoute_False_HostFilter(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("true => " + " host = " + localIP)) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -311,8 +316,8 @@ func TestRoute_Placeholder(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = $host")) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrl(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrl(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 2, len(fileredInvokers)) assert.Equal(t, invoker2, fileredInvokers[0]) assert.Equal(t, invoker3, fileredInvokers[1]) @@ -330,8 +335,8 @@ func TestRoute_NoForce(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrlWithNoForce(rule)) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrlWithNoForce(rule)) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, invokers, fileredInvokers) } @@ -347,7 +352,17 @@ func TestRoute_Force(t *testing.T) { inv := &invocation.RPCInvocation{} rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = 1.2.3.4")) curl, _ := common.NewURL("consumer://" + localIP + "/com.foo.BarService") - router, _ := NewConditionRouterFactory().Router(getRouteUrlWithForce(rule, "true")) - fileredInvokers := router.(*ConditionRouter).Route(invokers, curl, inv) + router, _ := newConditionRouterFactory().NewRouter(getRouteUrlWithForce(rule, "true")) + fileredInvokers := router.(*ConditionRouter).Route(invokers, &curl, inv) assert.Equal(t, 0, len(fileredInvokers)) } + +func TestNewConditionRouterFactory(t *testing.T) { + factory := newConditionRouterFactory() + assert.NotNil(t, factory) +} + +func TestNewAppRouterFactory(t *testing.T) { + factory := newAppRouterFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/condition/file.go b/cluster/router/condition/file.go new file mode 100644 index 0000000000..efeec53efc --- /dev/null +++ b/cluster/router/condition/file.go @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "encoding/base64" + "net/url" + "strconv" + "strings" + "sync" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" +) + +// FileConditionRouter Use for parse config file of condition router +type FileConditionRouter struct { + listenableRouter + parseOnce sync.Once + url common.URL +} + +// NewFileConditionRouter Create file condition router instance with content ( from config file) +func NewFileConditionRouter(content []byte) (*FileConditionRouter, error) { + fileRouter := &FileConditionRouter{} + rule, err := Parse(string(content)) + if err != nil { + return nil, perrors.Errorf("yaml.Unmarshal() failed , error:%v", perrors.WithStack(err)) + } + + if !rule.Valid { + return nil, perrors.Errorf("rule content is not verify for condition router , error:%v", perrors.WithStack(err)) + } + + fileRouter.generateConditions(rule) + + return fileRouter, nil +} + +// URL Return URL in file condition router n +func (f *FileConditionRouter) URL() common.URL { + f.parseOnce.Do(func() { + routerRule := f.routerRule + rule := parseCondition(routerRule.Conditions) + f.url = *common.NewURLWithOptions( + common.WithProtocol(constant.CONDITION_ROUTE_PROTOCOL), + common.WithIp(constant.ANYHOST_VALUE), + common.WithParams(url.Values{}), + common.WithParamsValue(constant.RouterForce, strconv.FormatBool(routerRule.Force)), + common.WithParamsValue(constant.RouterPriority, strconv.Itoa(routerRule.Priority)), + common.WithParamsValue(constant.RULE_KEY, base64.URLEncoding.EncodeToString([]byte(rule))), + common.WithParamsValue(constant.ROUTER_KEY, constant.CONDITION_ROUTE_PROTOCOL), + common.WithParamsValue(constant.CATEGORY_KEY, constant.ROUTERS_CATEGORY)) + }) + return f.url +} + +func parseCondition(conditions []string) string { + var ( + when string + then string + ) + for _, condition := range conditions { + condition = strings.Trim(condition, " ") + if strings.Contains(condition, "=>") { + array := strings.SplitN(condition, "=>", 2) + consumer := strings.Trim(array[0], " ") + provider := strings.Trim(array[1], " ") + if len(consumer) != 0 { + if len(when) != 0 { + when = strings.Join([]string{when, consumer}, " & ") + } else { + when = consumer + } + } + if len(provider) != 0 { + if len(then) != 0 { + then = strings.Join([]string{then, provider}, " & ") + } else { + then = provider + } + } + + } + + } + + return strings.Join([]string{when, then}, " => ") +} diff --git a/cluster/router/condition/file_test.go b/cluster/router/condition/file_test.go new file mode 100644 index 0000000000..3092b12ff8 --- /dev/null +++ b/cluster/router/condition/file_test.go @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestLoadYmlConfig(t *testing.T) { + router, e := NewFileConditionRouter([]byte(`priority: 1 +force: true +conditions : + - "a => b" + - "c => d"`)) + assert.Nil(t, e) + assert.NotNil(t, router) + assert.Equal(t, router.routerRule.Priority, 1) + assert.Equal(t, router.routerRule.Force, true) + assert.Equal(t, len(router.routerRule.Conditions), 2) +} + +func TestParseCondition(t *testing.T) { + s := make([]string, 2) + s = append(s, "a => b") + s = append(s, "c => d") + condition := parseCondition(s) + assert.Equal(t, "a & c => b & d", condition) +} + +func TestFileRouterURL(t *testing.T) { + router, e := NewFileConditionRouter([]byte(`priority: 1 +force: true +conditions : + - "a => b" + - "c => d"`)) + assert.Nil(t, e) + assert.NotNil(t, router) + assert.Equal(t, "condition://0.0.0.0:?category=routers&force=true&priority=1&router=condition&rule=YSAmIGMgPT4gYiAmIGQ%3D", router.URL().String()) +} diff --git a/cluster/router/condition/listenable_router.go b/cluster/router/condition/listenable_router.go new file mode 100644 index 0000000000..ba2fbb0eb2 --- /dev/null +++ b/cluster/router/condition/listenable_router.go @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "fmt" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/config" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/remoting" +) + +const ( + // Default priority for listenable router, use the maximum int64 value + listenableRouterDefaultPriority = ^int64(0) +) + +// listenableRouter Abstract router which listens to dynamic configuration +type listenableRouter struct { + conditionRouters []*ConditionRouter + routerRule *RouterRule + url *common.URL + force bool + priority int64 +} + +// RouterRule Get RouterRule instance from listenableRouter +func (l *listenableRouter) RouterRule() *RouterRule { + return l.routerRule +} + +func newListenableRouter(url *common.URL, ruleKey string) (*AppRouter, error) { + if ruleKey == "" { + return nil, perrors.Errorf("NewListenableRouter ruleKey is nil, can't create Listenable router") + } + l := &AppRouter{} + + l.url = url + l.priority = listenableRouterDefaultPriority + + routerKey := ruleKey + constant.ConditionRouterRuleSuffix + //add listener + dynamicConfiguration := config.GetEnvInstance().GetDynamicConfiguration() + if dynamicConfiguration == nil { + return nil, perrors.Errorf("Get dynamicConfiguration fail, dynamicConfiguration is nil, init config center plugin please") + } + + dynamicConfiguration.AddListener(routerKey, l) + //get rule + rule, err := dynamicConfiguration.GetRule(routerKey, config_center.WithGroup(config_center.DEFAULT_GROUP)) + if len(rule) == 0 || err != nil { + return nil, perrors.Errorf("Get rule fail, config rule{%s}, error{%v}", rule, err) + } + l.Process(&config_center.ConfigChangeEvent{ + Key: routerKey, + Value: rule, + ConfigType: remoting.EventTypeUpdate}) + + logger.Info("Init app router success") + return l, nil +} + +// Process Process config change event , generate routers and set them to the listenableRouter instance +func (l *listenableRouter) Process(event *config_center.ConfigChangeEvent) { + logger.Infof("Notification of condition rule, change type is:[%s] , raw rule is:[%v]", event.ConfigType, event.Value) + if remoting.EventTypeDel == event.ConfigType { + l.routerRule = nil + if l.conditionRouters != nil { + l.conditionRouters = l.conditionRouters[:0] + } + return + } + content, ok := event.Value.(string) + if !ok { + msg := fmt.Sprintf("Convert event content fail,raw content:[%s] ", event.Value) + logger.Error(msg) + return + } + + routerRule, err := Parse(content) + if err != nil { + logger.Errorf("Parse condition router rule fail,error:[%s] ", err) + return + } + l.generateConditions(routerRule) +} + +func (l *listenableRouter) generateConditions(rule *RouterRule) { + if rule == nil || !rule.Valid { + return + } + l.conditionRouters = make([]*ConditionRouter, 0, len(rule.Conditions)) + l.routerRule = rule + for _, c := range rule.Conditions { + router, e := NewConditionRouterWithRule(c) + if e != nil { + logger.Errorf("Create condition router with rule fail,raw rule:[%s] ", c) + continue + } + router.Force = rule.Force + router.enabled = rule.Enabled + l.conditionRouters = append(l.conditionRouters, router) + } +} + +// Route Determine the target invokers list. +func (l *listenableRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if len(invokers) == 0 || len(l.conditionRouters) == 0 { + return invokers + } + //We will check enabled status inside each router. + for _, r := range l.conditionRouters { + invokers = r.Route(invokers, url, invocation) + } + return invokers +} + +// Priority Return Priority in listenable router +func (l *listenableRouter) Priority() int64 { + return l.priority +} + +// URL Return URL in listenable router +func (l *listenableRouter) URL() common.URL { + return *l.url +} diff --git a/cluster/router/condition_router.go b/cluster/router/condition/router.go similarity index 63% rename from cluster/router/condition_router.go rename to cluster/router/condition/router.go index c38e9718cc..c5d46444bd 100644 --- a/cluster/router/condition_router.go +++ b/cluster/router/condition/router.go @@ -15,57 +15,55 @@ * limitations under the License. */ -package router +package condition import ( - "reflect" "regexp" "strings" ) import ( - gxset "github.com/dubbogo/gost/container/set" - gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" ) import ( + matcher "github.com/apache/dubbo-go/cluster/router/match" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" + "github.com/dubbogo/gost/container/set" + "github.com/dubbogo/gost/net" ) const ( - //ROUTE_PATTERN route pattern regex - ROUTE_PATTERN = `([&!=,]*)\\s*([^&!=,\\s]+)` - // FORCE ... - FORCE = "force" - // PRIORITY ... - PRIORITY = "priority" + //pattern route pattern regex + pattern = `([&!=,]*)\\s*([^&!=,\\s]+)` ) -//ConditionRouter condition router struct +var ( + routerPatternReg = regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`) +) + +// ConditionRouter Condition router struct type ConditionRouter struct { Pattern string - Url *common.URL - Priority int64 + url *common.URL + priority int64 Force bool + enabled bool WhenCondition map[string]MatchPair ThenCondition map[string]MatchPair } -func newConditionRouter(url *common.URL) (*ConditionRouter, error) { +// NewConditionRouterWithRule Init condition router by raw rule +func NewConditionRouterWithRule(rule string) (*ConditionRouter, error) { var ( whenRule string thenRule string when map[string]MatchPair then map[string]MatchPair ) - rule, err := url.GetParamAndDecoded(constant.RULE_KEY) - if err != nil || len(rule) == 0 { - return nil, perrors.Errorf("Illegal route rule!") - } rule = strings.Replace(rule, "consumer.", "", -1) rule = strings.Replace(rule, "provider.", "", -1) i := strings.Index(rule, "=>") @@ -98,31 +96,61 @@ func newConditionRouter(url *common.URL) (*ConditionRouter, error) { then = t } return &ConditionRouter{ - ROUTE_PATTERN, - url, - url.GetParamInt(PRIORITY, 0), - url.GetParamBool(FORCE, false), - when, - then, + Pattern: pattern, + WhenCondition: when, + ThenCondition: then, }, nil } -// Route -// Router determine the target server list. -func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, invocation protocol.Invocation) []protocol.Invoker { - if len(invokers) == 0 { - return invokers +// NewConditionRouter Init condition router by URL +func NewConditionRouter(url *common.URL) (*ConditionRouter, error) { + if url == nil { + return nil, perrors.Errorf("Illegal route URL!") + } + rule, err := url.GetParamAndDecoded(constant.RULE_KEY) + if err != nil || len(rule) == 0 { + return nil, perrors.Errorf("Illegal route rule!") } - isMatchWhen, err := c.MatchWhen(url, invocation) + + router, err := NewConditionRouterWithRule(rule) if err != nil { + return nil, err + } - var urls []string - for _, invo := range invokers { - urls = append(urls, reflect.TypeOf(invo).String()) - } - logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err) + router.url = url + router.priority = url.GetParamInt(constant.RouterPriority, 0) + router.Force = url.GetParamBool(constant.RouterForce, false) + router.enabled = url.GetParamBool(constant.RouterEnabled, true) + + return router, nil +} + +// Priority Return Priority in condition router +func (c *ConditionRouter) Priority() int64 { + return c.priority +} + +// URL Return URL in condition router +func (c *ConditionRouter) URL() common.URL { + return *c.url +} + +// Enabled Return is condition router is enabled +// true: enabled +// false: disabled +func (c *ConditionRouter) Enabled() bool { + return c.enabled +} + +// Route Determine the target invokers list. +func (c *ConditionRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if !c.Enabled() { + return invokers + } + if len(invokers) == 0 { return invokers } + isMatchWhen := c.MatchWhen(url, invocation) if !isMatchWhen { return invokers } @@ -130,17 +158,9 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv if len(c.ThenCondition) == 0 { return result } - localIP, _ := gxnet.GetLocalIP() for _, invoker := range invokers { - isMatchThen, err := c.MatchThen(invoker.GetUrl(), url) - if err != nil { - var urls []string - for _, invo := range invokers { - urls = append(urls, reflect.TypeOf(invo).String()) - } - logger.Warnf("Failed to execute condition router rule: %s , invokers: [%s], cause: %v", c.Url.String(), strings.Join(urls, ","), err) - return invokers - } + invokerUrl := invoker.GetUrl() + isMatchThen := c.MatchThen(&invokerUrl, url) if isMatchThen { result = append(result, invoker) } @@ -149,6 +169,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv return result } else if c.Force { rule, _ := url.GetParamAndDecoded(constant.RULE_KEY) + localIP, _ := gxnet.GetLocalIP() logger.Warnf("The route result is empty and force execute. consumer: %s, service: %s, router: %s", localIP, url.Service(), rule) return result } @@ -162,15 +183,10 @@ func parseRule(rule string) (map[string]MatchPair, error) { } var ( - pair MatchPair - startIndex int + pair MatchPair ) values := gxset.NewSet() - reg := regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`) - if indexTuple := reg.FindIndex([]byte(rule)); len(indexTuple) > 0 { - startIndex = indexTuple[0] - } - matches := reg.FindAllSubmatch([]byte(rule), -1) + matches := routerPatternReg.FindAllSubmatch([]byte(rule), -1) for _, groups := range matches { separator := string(groups[1]) content := string(groups[2]) @@ -193,22 +209,26 @@ func parseRule(rule string) (map[string]MatchPair, error) { } case "=": if &pair == nil { + var startIndex = getStartIndex(rule) return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex) } values = pair.Matches values.Add(content) case "!=": if &pair == nil { + var startIndex = getStartIndex(rule) return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex) } values = pair.Mismatches values.Add(content) case ",": if values.Empty() { + var startIndex = getStartIndex(rule) return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex) } values.Add(content) default: + var startIndex = getStartIndex(rule) return nil, perrors.Errorf("Illegal route rule \"%s\", The error char '%s' at index %d before \"%d\".", rule, separator, startIndex, startIndex) } @@ -216,23 +236,31 @@ func parseRule(rule string) (map[string]MatchPair, error) { return condition, nil } -//MatchWhen MatchWhen -func (c *ConditionRouter) MatchWhen(url common.URL, invocation protocol.Invocation) (bool, error) { - condition, err := MatchCondition(c.WhenCondition, &url, nil, invocation) - return len(c.WhenCondition) == 0 || condition, err +func getStartIndex(rule string) int { + if indexTuple := routerPatternReg.FindIndex([]byte(rule)); len(indexTuple) > 0 { + return indexTuple[0] + } + return -1 } -//MatchThen MatchThen -func (c *ConditionRouter) MatchThen(url common.URL, param common.URL) (bool, error) { - condition, err := MatchCondition(c.ThenCondition, &url, ¶m, nil) - return len(c.ThenCondition) > 0 && condition, err +// MatchWhen MatchWhen +func (c *ConditionRouter) MatchWhen(url *common.URL, invocation protocol.Invocation) bool { + condition := matchCondition(c.WhenCondition, url, nil, invocation) + return len(c.WhenCondition) == 0 || condition } -//MatchCondition MatchCondition -func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.URL, invocation protocol.Invocation) (bool, error) { +// MatchThen MatchThen +func (c *ConditionRouter) MatchThen(url *common.URL, param *common.URL) bool { + condition := matchCondition(c.ThenCondition, url, param, nil) + return len(c.ThenCondition) > 0 && condition +} + +// MatchCondition MatchCondition +func matchCondition(pairs map[string]MatchPair, url *common.URL, param *common.URL, invocation protocol.Invocation) bool { sample := url.ToMap() if sample == nil { - return true, perrors.Errorf("url is not allowed be nil") + // because url.ToMap() may return nil, but it should continue to process make condition + sample = make(map[string]string) } var result bool for key, matchPair := range pairs { @@ -248,22 +276,22 @@ func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U } if len(sampleValue) > 0 { if !matchPair.isMatch(sampleValue, param) { - return false, nil + return false } result = true } else { if !(matchPair.Matches.Empty()) { - return false, nil + return false } result = true } } - return result, nil + return result } -// MatchPair ... +// MatchPair Match key pair , condition process type MatchPair struct { Matches *gxset.HashSet Mismatches *gxset.HashSet @@ -273,7 +301,7 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool { if !pair.Matches.Empty() && pair.Mismatches.Empty() { for match := range pair.Matches.Items { - if isMatchGlobPattern(match.(string), value, param) { + if matcher.IsMatchGlobalPattern(match.(string), value, param) { return true } } @@ -282,20 +310,21 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool { if !pair.Mismatches.Empty() && pair.Matches.Empty() { for mismatch := range pair.Mismatches.Items { - if isMatchGlobPattern(mismatch.(string), value, param) { + if matcher.IsMatchGlobalPattern(mismatch.(string), value, param) { return false } } return true } if !pair.Mismatches.Empty() && !pair.Matches.Empty() { + //when both mismatches and matches contain the same value, then using mismatches first for mismatch := range pair.Mismatches.Items { - if isMatchGlobPattern(mismatch.(string), value, param) { + if matcher.IsMatchGlobalPattern(mismatch.(string), value, param) { return false } } for match := range pair.Matches.Items { - if isMatchGlobPattern(match.(string), value, param) { + if matcher.IsMatchGlobalPattern(match.(string), value, param) { return true } } @@ -303,31 +332,3 @@ func (pair MatchPair) isMatch(value string, param *common.URL) bool { } return false } - -func isMatchGlobPattern(pattern string, value string, param *common.URL) bool { - if param != nil && strings.HasPrefix(pattern, "$") { - pattern = param.GetRawParam(pattern[1:]) - } - if "*" == pattern { - return true - } - if len(pattern) == 0 && len(value) == 0 { - return true - } - if len(pattern) == 0 || len(value) == 0 { - return false - } - i := strings.LastIndex(pattern, "*") - switch i { - case -1: - return value == pattern - case len(pattern) - 1: - return strings.HasPrefix(value, pattern[0:i]) - case 0: - return strings.HasSuffix(value, pattern[:i+1]) - default: - prefix := pattern[0:1] - suffix := pattern[i+1:] - return strings.HasPrefix(value, prefix) && strings.HasSuffix(value, suffix) - } -} diff --git a/cluster/router/condition/router_rule.go b/cluster/router/condition/router_rule.go new file mode 100644 index 0000000000..1374cf9de2 --- /dev/null +++ b/cluster/router/condition/router_rule.go @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "gopkg.in/yaml.v2" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" +) + +// RouterRule RouterRule config read from config file or config center +type RouterRule struct { + router.BaseRouterRule `yaml:",inline""` + Conditions []string +} + +/* Parse Router raw rule parser + * example : + * scope: application + * runtime: true + * force: false + * conditions: + * - > + * method!=sayHello => + * - > + * ip=127.0.0.1 + * => + * 1.1.1.1 + */ +func Parse(rawRule string) (*RouterRule, error) { + r := &RouterRule{} + err := yaml.Unmarshal([]byte(rawRule), r) + if err != nil { + return r, err + } + r.RawRule = rawRule + if len(r.Conditions) != 0 { + r.Valid = true + } + + return r, nil +} diff --git a/cluster/router/condition/router_rule_test.go b/cluster/router/condition/router_rule_test.go new file mode 100644 index 0000000000..5acc728391 --- /dev/null +++ b/cluster/router/condition/router_rule_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package condition + +import ( + "testing" +) +import ( + "github.com/stretchr/testify/assert" +) + +func TestParse(t *testing.T) { + testyml := ` +scope: application +runtime: true +force: false +conditions: + - > + method!=sayHello => + - > + ip=127.0.0.1 + => + 1.1.1.1` + rule, e := Parse(testyml) + + assert.Nil(t, e) + assert.NotNil(t, rule) + assert.Equal(t, 2, len(rule.Conditions)) + assert.Equal(t, "application", rule.Scope) + assert.True(t, rule.Runtime) + assert.Equal(t, false, rule.Force) + assert.Equal(t, testyml, rule.RawRule) + assert.True(t, true, rule.Valid) + assert.Equal(t, false, rule.Enabled) + assert.Equal(t, false, rule.Dynamic) + assert.Equal(t, "", rule.Key) +} diff --git a/cluster/router/match/match_utils.go b/cluster/router/match/match_utils.go new file mode 100644 index 0000000000..28fe7151c5 --- /dev/null +++ b/cluster/router/match/match_utils.go @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package match + +import ( + "strings" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +// IsMatchGlobalPattern Match value to param content by pattern +func IsMatchGlobalPattern(pattern string, value string, param *common.URL) bool { + if param != nil && strings.HasPrefix(pattern, "$") { + pattern = param.GetRawParam(pattern[1:]) + } + return isMatchInternalPattern(pattern, value) +} + +func isMatchInternalPattern(pattern string, value string) bool { + if "*" == pattern { + return true + } + if len(pattern) == 0 && len(value) == 0 { + return true + } + if len(pattern) == 0 || len(value) == 0 { + return false + } + i := strings.LastIndex(pattern, "*") + switch i { + case -1: + // doesn't find "*" + return value == pattern + case len(pattern) - 1: + // "*" is at the end + return strings.HasPrefix(value, pattern[0:i]) + case 0: + // "*" is at the beginning + return strings.HasSuffix(value, pattern[i+1:]) + default: + // "*" is in the middle + prefix := pattern[0:1] + suffix := pattern[i+1:] + return strings.HasPrefix(value, prefix) && strings.HasSuffix(value, suffix) + } +} diff --git a/cluster/router/match/match_utils_test.go b/cluster/router/match/match_utils_test.go new file mode 100644 index 0000000000..f16480f1d3 --- /dev/null +++ b/cluster/router/match/match_utils_test.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package match + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +func TestIsMatchInternalPattern(t *testing.T) { + assert.Equal(t, true, isMatchInternalPattern("*", "value")) + assert.Equal(t, true, isMatchInternalPattern("", "")) + assert.Equal(t, false, isMatchInternalPattern("", "value")) + assert.Equal(t, true, isMatchInternalPattern("value", "value")) + assert.Equal(t, true, isMatchInternalPattern("v*", "value")) + assert.Equal(t, true, isMatchInternalPattern("*ue", "value")) + assert.Equal(t, true, isMatchInternalPattern("*e", "value")) + assert.Equal(t, true, isMatchInternalPattern("v*e", "value")) +} + +func TestIsMatchGlobPattern(t *testing.T) { + url, _ := common.NewURL("dubbo://localhost:8080/Foo?key=v*e") + assert.Equal(t, true, IsMatchGlobalPattern("$key", "value", &url)) +} diff --git a/cluster/router/router.go b/cluster/router/router.go new file mode 100644 index 0000000000..a28002a09e --- /dev/null +++ b/cluster/router/router.go @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package router + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +// Extension - Router + +// RouterFactory Router create factory +type RouterFactory interface { + // NewRouter Create router instance with URL + NewRouter(*common.URL) (Router, error) +} + +// RouterFactory Router create factory use for parse config file +type FIleRouterFactory interface { + // NewFileRouters Create file router with config file + NewFileRouter([]byte) (Router, error) +} + +// Router +type Router interface { + // Route Determine the target invokers list. + Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker + // Priority Return Priority in router + // 0 to ^int(0) is better + Priority() int64 + // URL Return URL in router + URL() common.URL +} + +// Chain +type Chain interface { + // Route Determine the target invokers list with chain. + Route([]protocol.Invoker, *common.URL, protocol.Invocation) []protocol.Invoker + // AddRouters Add routers + AddRouters([]Router) +} diff --git a/cluster/router.go b/cluster/router/rule.go similarity index 63% rename from cluster/router.go rename to cluster/router/rule.go index 589eb9a269..42c08a7009 100644 --- a/cluster/router.go +++ b/cluster/router/rule.go @@ -15,31 +15,17 @@ * limitations under the License. */ -package cluster - -import ( - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/protocol" -) - -// Extension - Router - -// RouterFactory ... -type RouterFactory interface { - Router(*common.URL) (Router, error) -} - -// Router ... -type Router interface { - Route([]protocol.Invoker, common.URL, protocol.Invocation) []protocol.Invoker -} - -// RouterChain ... -type RouterChain struct { - routers []Router -} - -// NewRouterChain ... -func NewRouterChain(url common.URL) { - +package router + +// BaseRouterRule +type BaseRouterRule struct { + RawRule string + Runtime bool + Force bool + Valid bool + Enabled bool + Priority int + Dynamic bool + Scope string + Key string } diff --git a/common/constant/env.go b/common/constant/env.go index cb5394bb82..5376323328 100644 --- a/common/constant/env.go +++ b/common/constant/env.go @@ -24,4 +24,6 @@ const ( CONF_PROVIDER_FILE_PATH = "CONF_PROVIDER_FILE_PATH" // APP_LOG_CONF_FILE ... APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" + // CONF_ROUTER_FILE_PATH Specify Path variable of router config file + CONF_ROUTER_FILE_PATH = "CONF_ROUTER_FILE_PATH" ) diff --git a/common/constant/key.go b/common/constant/key.go index eff704371c..4536d945c3 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -40,6 +40,7 @@ const ( TOKEN_KEY = "token" LOCAL_ADDR = "local-addr" REMOTE_ADDR = "remote-addr" + PATH_SEPARATOR = "/" ) const ( @@ -89,16 +90,23 @@ const ( ) const ( - APPLICATION_KEY = "application" - ORGANIZATION_KEY = "organization" - NAME_KEY = "name" - MODULE_KEY = "module" - APP_VERSION_KEY = "app.version" - OWNER_KEY = "owner" - ENVIRONMENT_KEY = "environment" - METHOD_KEY = "method" - METHOD_KEYS = "methods" - RULE_KEY = "rule" + APPLICATION_KEY = "application" + ORGANIZATION_KEY = "organization" + NAME_KEY = "name" + MODULE_KEY = "module" + APP_VERSION_KEY = "app.version" + OWNER_KEY = "owner" + ENVIRONMENT_KEY = "environment" + METHOD_KEY = "method" + METHOD_KEYS = "methods" + RULE_KEY = "rule" + RUNTIME_KEY = "runtime" + BACKUP_KEY = "backup" + ROUTERS_CATEGORY = "routers" + ROUTE_PROTOCOL = "route" + CONDITION_ROUTE_PROTOCOL = "condition" + PROVIDERS_CATEGORY = "providers" + ROUTER_KEY = "router" ) const ( @@ -120,6 +128,7 @@ const ( ProviderConfigPrefix = "dubbo.provider." ConsumerConfigPrefix = "dubbo.consumer." ShutdownConfigPrefix = "dubbo.shutdown." + RouterConfigPrefix = "dubbo.router." ) const ( @@ -142,6 +151,26 @@ const ( TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx" ) +// Use for router module +const ( + // ConditionRouterName Specify file condition router name + ConditionRouterName = "condition" + // ConditionAppRouterName Specify listenable application router name + ConditionAppRouterName = "app" + // ListenableRouterName Specify listenable router name + ListenableRouterName = "listenable" + + // ConditionRouterRuleSuffix Specify condition router suffix + ConditionRouterRuleSuffix = ".condition-router" + + // Force Force key in router module + RouterForce = "force" + // Enabled Enabled key in router module + RouterEnabled = "enabled" + // Priority Priority key in router module + RouterPriority = "priority" +) + const ( CONSUMER_SIGN_FILTER = "sign" PROVIDER_AUTH_FILTER = "auth" diff --git a/common/extension/router_factory.go b/common/extension/router_factory.go index c77cc29136..70d71dfa85 100644 --- a/common/extension/router_factory.go +++ b/common/extension/router_factory.go @@ -18,23 +18,50 @@ package extension import ( - "github.com/apache/dubbo-go/cluster" + "sync" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" ) var ( - routers = make(map[string]func() cluster.RouterFactory) + routers = make(map[string]func() router.RouterFactory) + fileRouterFactoryOnce sync.Once + fileRouterFactories = make(map[string]router.FIleRouterFactory) ) -// SetRouterFactory ... -func SetRouterFactory(name string, fun func() cluster.RouterFactory) { +// SetRouterFactory Set create router factory function by name +func SetRouterFactory(name string, fun func() router.RouterFactory) { routers[name] = fun } -// GetRouterFactory ... -func GetRouterFactory(name string) cluster.RouterFactory { +// GetRouterFactory Get create router factory function by name +func GetRouterFactory(name string) router.RouterFactory { if routers[name] == nil { panic("router_factory for " + name + " is not existing, make sure you have import the package.") } return routers[name]() +} +// GetRouterFactories Get all create router factory function +func GetRouterFactories() map[string]func() router.RouterFactory { + return routers +} + +// GetFileRouterFactories Get all create file router factory instance +func GetFileRouterFactories() map[string]router.FIleRouterFactory { + l := len(routers) + if l == 0 { + return nil + } + fileRouterFactoryOnce.Do(func() { + for k := range routers { + factory := GetRouterFactory(k) + if fr, ok := factory.(router.FIleRouterFactory); ok { + fileRouterFactories[k] = fr + } + } + }) + return fileRouterFactories } diff --git a/common/url.go b/common/url.go index 360f0aa4ca..7ede3d94fb 100644 --- a/common/url.go +++ b/common/url.go @@ -59,8 +59,8 @@ const ( var ( // DubboNodes ... DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} - // DubboRole ... - DubboRole = [...]string{"consumer", "", "", "provider"} + // DubboRole Dubbo service role + DubboRole = [...]string{"consumer", "", "routers", "provider"} ) // RoleType ... @@ -240,7 +240,7 @@ func NewURL(urlString string, opts ...option) (URL, error) { if strings.Contains(s.Location, ":") { s.Ip, s.Port, err = net.SplitHostPort(s.Location) if err != nil { - return s, perrors.Errorf("net.SplitHostPort(Url.Host{%s}), error{%v}", s.Location, err) + return s, perrors.Errorf("net.SplitHostPort(url.Host{%s}), error{%v}", s.Location, err) } } for _, opt := range opts { @@ -373,7 +373,7 @@ func (c URL) Service() string { return service } else if c.SubURL != nil { service = c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/")) - if service != "" { //if url.path is "" then return suburl's path, special for registry Url + if service != "" { //if url.path is "" then return suburl's path, special for registry url return service } } diff --git a/config/base_config.go b/config/base_config.go index 942e966eb3..a52dc756e7 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -18,6 +18,8 @@ package config import ( + "io/ioutil" + "path" "reflect" "strconv" "strings" @@ -360,3 +362,16 @@ func initializeStruct(t reflect.Type, v reflect.Value) { } } + +// loadYmlConfig Load yml config byte from file +func loadYmlConfig(confRouterFile string) ([]byte, error) { + if len(confRouterFile) == 0 { + return nil, perrors.Errorf("application configure(provider) file name is nil") + } + + if path.Ext(confRouterFile) != ".yml" { + return nil, perrors.Errorf("application configure file name{%v} suffix must be .yml", confRouterFile) + } + + return ioutil.ReadFile(confRouterFile) +} diff --git a/config/condition_router_config.go b/config/condition_router_config.go new file mode 100644 index 0000000000..20d096a373 --- /dev/null +++ b/config/condition_router_config.go @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" +) + +//RouterInit Load config file to init router config +func RouterInit(confRouterFile string) error { + fileRouterFactories := extension.GetFileRouterFactories() + bytes, err := loadYmlConfig(confRouterFile) + if err != nil { + return perrors.Errorf("ioutil.ReadFile(file:%s) = error:%v", confRouterFile, perrors.WithStack(err)) + } + for k, factory := range fileRouterFactories { + r, e := factory.NewFileRouter(bytes) + if e == nil { + url := r.URL() + directory.AddRouterURLSet(&url) + return nil + } + logger.Warnf("router config type %s create fail \n", k) + } + return perrors.Errorf("no file router exists for parse %s , implement router.FIleRouterFactory please.", confRouterFile) +} diff --git a/config/condition_router_config_test.go b/config/condition_router_config_test.go new file mode 100644 index 0000000000..2f0a38b2fd --- /dev/null +++ b/config/condition_router_config_test.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "strings" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + _ "github.com/apache/dubbo-go/cluster/router/condition" +) + +const testYML = "testdata/router_config.yml" +const errorTestYML = "testdata/router_config_error.yml" + +func TestString(t *testing.T) { + + s := "a1=>a2" + s1 := "=>a2" + s2 := "a1=>" + + n := strings.SplitN(s, "=>", 2) + n1 := strings.SplitN(s1, "=>", 2) + n2 := strings.SplitN(s2, "=>", 2) + + assert.Equal(t, n[0], "a1") + assert.Equal(t, n[1], "a2") + + assert.Equal(t, n1[0], "") + assert.Equal(t, n1[1], "a2") + + assert.Equal(t, n2[0], "a1") + assert.Equal(t, n2[1], "") +} + +func TestRouterInit(t *testing.T) { + errPro := RouterInit(errorTestYML) + assert.Error(t, errPro) + + assert.Equal(t, 0, directory.GetRouterURLSet().Size()) + + errPro = RouterInit(testYML) + assert.NoError(t, errPro) + + assert.Equal(t, 1, directory.GetRouterURLSet().Size()) +} diff --git a/config/config_loader.go b/config/config_loader.go index 875d1f6ddb..437f4d7323 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -36,21 +36,26 @@ var ( metricConfig *MetricConfig applicationConfig *ApplicationConfig maxWait = 3 + confRouterFile string ) // loaded consumer & provider config from xxx.yml, and log config from xxx.xml // Namely: dubbo.consumer.xml & dubbo.provider.xml in java dubbo func init() { var ( - confConFile, confProFile string + confConFile string + confProFile string ) confConFile = os.Getenv(constant.CONF_CONSUMER_FILE_PATH) confProFile = os.Getenv(constant.CONF_PROVIDER_FILE_PATH) + confRouterFile = os.Getenv(constant.CONF_ROUTER_FILE_PATH) + if errCon := ConsumerInit(confConFile); errCon != nil { log.Printf("[consumerInit] %#v", errCon) consumerConfig = nil } + if errPro := ProviderInit(confProFile); errPro != nil { log.Printf("[providerInit] %#v", errPro) providerConfig = nil @@ -73,6 +78,13 @@ func checkApplicationName(config *ApplicationConfig) { // Load Dubbo Init func Load() { + // init router + if confRouterFile != "" { + if errPro := RouterInit(confRouterFile); errPro != nil { + log.Printf("[routerConfig init] %#v", errPro) + } + } + // reference config if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") @@ -100,6 +112,7 @@ func Load() { ref.Refer(rpcService) ref.Implement(rpcService) } + //wait for invoker is available, if wait over default 3s, then panic var count int checkok := true diff --git a/config/testdata/router_config.yml b/config/testdata/router_config.yml new file mode 100644 index 0000000000..f6b91f5da7 --- /dev/null +++ b/config/testdata/router_config.yml @@ -0,0 +1,6 @@ +# dubbo router yaml configure file +priority: 1 +force: true +conditions : + - "a => b" + - "c => d" \ No newline at end of file diff --git a/config/testdata/router_config_error.yml b/config/testdata/router_config_error.yml new file mode 100644 index 0000000000..37894ac964 --- /dev/null +++ b/config/testdata/router_config_error.yml @@ -0,0 +1,6 @@ +# dubbo router yaml configure file +priority: 1 +force: true +noConditions : + - "a => b" + - "c => d" \ No newline at end of file diff --git a/config_center/dynamic_configuration.go b/config_center/dynamic_configuration.go index 90cd3bbb1d..d6c3b06b32 100644 --- a/config_center/dynamic_configuration.go +++ b/config_center/dynamic_configuration.go @@ -22,6 +22,7 @@ import ( ) import ( + "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/config_center/parser" ) @@ -73,3 +74,8 @@ func WithTimeout(time time.Duration) Option { opt.Timeout = time } } + +//GetRuleKey The format is '{interfaceName}:[version]:[group]' +func GetRuleKey(url common.URL) string { + return url.ColonSeparatedKey() +} diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 1c6d15534e..c06e883ae5 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -24,6 +24,7 @@ import ( import ( perrors "github.com/pkg/errors" + "go.uber.org/atomic" ) import ( @@ -61,6 +62,8 @@ type registryDirectory struct { consumerConfigurationListener *consumerConfigurationListener referenceConfigurationListener *referenceConfigurationListener Options + serviceKey string + forbidden atomic.Bool } // NewRegistryDirectory ... @@ -124,12 +127,22 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { } else if url.Protocol == constant.ROUTER_PROTOCOL || //2.for router url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY { url = nil - //TODO: router } switch res.Action { case remoting.EventTypeAdd, remoting.EventTypeUpdate: logger.Infof("selector add service url{%s}", res.Service) - oldInvoker = dir.cacheInvoker(url) + var urls []*common.URL + + for _, v := range directory.GetRouterURLSet().Values() { + urls = append(urls, v.(*common.URL)) + } + + if len(urls) > 0 { + dir.SetRouters(urls) + } + + //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) + dir.cacheInvoker(url) case remoting.EventTypeDel: oldInvoker = dir.uncacheInvoker(url) logger.Infof("selector delete service url{%s}", res.Service) @@ -178,6 +191,7 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { for _, invokers := range groupInvokersMap { staticDir := directory.NewStaticDirectory(invokers) cluster := extension.GetCluster(dir.GetUrl().SubURL.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER)) + staticDir.BuildRouterChain(invokers) groupInvokersList = append(groupInvokersList, cluster.Join(staticDir)) } } @@ -233,8 +247,13 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { //select the protocol invokers from the directory func (dir *registryDirectory) List(invocation protocol.Invocation) []protocol.Invoker { - //TODO:router - return dir.cacheInvokers + invokers := dir.cacheInvokers + routerChain := dir.RouterChain() + + if routerChain == nil { + return invokers + } + return routerChain.Route(invokers, dir.cacheOriginUrl, invocation) } func (dir *registryDirectory) IsAvailable() bool { diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 8ebd130d7d..5c5304463b 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -30,6 +30,8 @@ import ( import ( "github.com/apache/dubbo-go/cluster/cluster_impl" + _ "github.com/apache/dubbo-go/cluster/router" + _ "github.com/apache/dubbo-go/cluster/router/condition" "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" @@ -104,7 +106,7 @@ func TestSubscribe_Group(t *testing.T) { func Test_Destroy(t *testing.T) { registryDirectory, _ := normalRegistryDir() - time.Sleep(1e9) + time.Sleep(3e9) assert.Len(t, registryDirectory.cacheInvokers, 3) assert.Equal(t, true, registryDirectory.IsAvailable()) @@ -116,7 +118,7 @@ func Test_Destroy(t *testing.T) { func Test_List(t *testing.T) { registryDirectory, _ := normalRegistryDir() - time.Sleep(1e9) + time.Sleep(4e9) assert.Len(t, registryDirectory.List(&invocation.RPCInvocation{}), 3) assert.Equal(t, true, registryDirectory.IsAvailable())