diff --git a/cluster/cluster_impl/base_cluster_invoker.go b/cluster/cluster_impl/base_cluster_invoker.go index bbdfa715d7..ced5b15cb9 100644 --- a/cluster/cluster_impl/base_cluster_invoker.go +++ b/cluster/cluster_impl/base_cluster_invoker.go @@ -17,6 +17,10 @@ package cluster_impl +import ( + "context" +) + import ( gxnet "github.com/dubbogo/gost/net" perrors "github.com/pkg/errors" @@ -36,6 +40,7 @@ type baseClusterInvoker struct { availablecheck bool destroyed *atomic.Bool stickyInvoker protocol.Invoker + interceptor cluster.ClusterInterceptor } func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker { @@ -146,6 +151,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc return selectedInvoker } +func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + if invoker.interceptor != nil { + invoker.interceptor.BeforeInvoker(ctx, invocation) + + result := invoker.interceptor.DoInvoke(ctx, invocation) + + invoker.interceptor.AfterInvoker(ctx, invocation) + + return result + } + + return nil +} + func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool { for _, i := range invoked { if i == selectedInvoker { diff --git a/cluster/cluster_impl/failover_cluster.go b/cluster/cluster_impl/failover_cluster.go index 254cc097e3..4c09fd16d3 100644 --- a/cluster/cluster_impl/failover_cluster.go +++ b/cluster/cluster_impl/failover_cluster.go @@ -19,16 +19,15 @@ package cluster_impl import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" ) type failoverCluster struct{} -const name = "failover" - func init() { - extension.SetCluster(name, NewFailoverCluster) + extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster) } // NewFailoverCluster returns a failover cluster instance diff --git a/cluster/cluster_impl/registry_aware_cluster_invoker.go b/cluster/cluster_impl/registry_aware_cluster_invoker.go deleted file mode 100644 index 7840da5218..0000000000 --- a/cluster/cluster_impl/registry_aware_cluster_invoker.go +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cluster_impl - -import ( - "context" -) -import ( - "github.com/apache/dubbo-go/cluster" - "github.com/apache/dubbo-go/common/constant" - "github.com/apache/dubbo-go/protocol" -) - -type registryAwareClusterInvoker struct { - baseClusterInvoker -} - -func newRegistryAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { - return ®istryAwareClusterInvoker{ - baseClusterInvoker: newBaseClusterInvoker(directory), - } -} - -// nolint -func (invoker *registryAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - invokers := invoker.directory.List(invocation) - //First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. - for _, invoker := range invokers { - if invoker.IsAvailable() && invoker.GetUrl().GetParam(constant.REGISTRY_DEFAULT_KEY, "false") == "true" { - return invoker.Invoke(ctx, invocation) - } - } - - //If none of the invokers has a local signal, pick the first one available. - for _, invoker := range invokers { - if invoker.IsAvailable() { - return invoker.Invoke(ctx, invocation) - } - } - return nil -} diff --git a/cluster/cluster_impl/registry_aware_cluster_test.go b/cluster/cluster_impl/registry_aware_cluster_test.go deleted file mode 100644 index 74584b4480..0000000000 --- a/cluster/cluster_impl/registry_aware_cluster_test.go +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cluster_impl - -import ( - "context" - "fmt" - "testing" -) -import ( - "github.com/stretchr/testify/assert" -) - -import ( - "github.com/apache/dubbo-go/cluster/directory" - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/protocol" - "github.com/apache/dubbo-go/protocol/invocation" -) - -func TestRegAwareInvokeSuccess(t *testing.T) { - - regAwareCluster := NewRegistryAwareCluster() - - invokers := []protocol.Invoker{} - for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) - invokers = append(invokers, NewMockInvoker(url, 1)) - } - - staticDir := directory.NewStaticDirectory(invokers) - clusterInvoker := regAwareCluster.Join(staticDir) - result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) - assert.NoError(t, result.Error()) - count = 0 -} - -func TestDestroy(t *testing.T) { - regAwareCluster := NewRegistryAwareCluster() - - invokers := []protocol.Invoker{} - for i := 0; i < 10; i++ { - url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) - invokers = append(invokers, NewMockInvoker(url, 1)) - } - - staticDir := directory.NewStaticDirectory(invokers) - clusterInvoker := regAwareCluster.Join(staticDir) - assert.Equal(t, true, clusterInvoker.IsAvailable()) - result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) - assert.NoError(t, result.Error()) - count = 0 - clusterInvoker.Destroy() - assert.Equal(t, false, clusterInvoker.IsAvailable()) - -} diff --git a/cluster/cluster_impl/registry_aware_cluster.go b/cluster/cluster_impl/zone_aware_cluster.go similarity index 63% rename from cluster/cluster_impl/registry_aware_cluster.go rename to cluster/cluster_impl/zone_aware_cluster.go index f4c0897371..7439db2d37 100644 --- a/cluster/cluster_impl/registry_aware_cluster.go +++ b/cluster/cluster_impl/zone_aware_cluster.go @@ -19,22 +19,26 @@ package cluster_impl import ( "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/protocol" ) -type registryAwareCluster struct{} +type zoneAwareCluster struct{} func init() { - extension.SetCluster("registryAware", NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster) } -// NewRegistryAwareCluster returns a registry aware cluster instance -func NewRegistryAwareCluster() cluster.Cluster { - return ®istryAwareCluster{} +// NewZoneAwareCluster returns a zoneaware cluster instance. +// +// More than one registry for subscription. +// Usually it is used for choose between registries. +func NewZoneAwareCluster() cluster.Cluster { + return &zoneAwareCluster{} } -// nolint -func (cluster *registryAwareCluster) Join(directory cluster.Directory) protocol.Invoker { - return newRegistryAwareClusterInvoker(directory) +// Join returns a zoneAwareClusterInvoker instance +func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker { + return newZoneAwareClusterInvoker(directory) } diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker.go b/cluster/cluster_impl/zone_aware_cluster_invoker.go new file mode 100644 index 0000000000..0f52b0442c --- /dev/null +++ b/cluster/cluster_impl/zone_aware_cluster_invoker.go @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "context" + "fmt" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" +) + +// When there're more than one registry for subscription. +// +// This extension provides a strategy to decide how to distribute traffics among them: +// 1. registry marked as 'preferred=true' has the highest priority. +// 2. check the zone the current request belongs, pick the registry that has the same zone first. +// 3. Evenly balance traffic between all registries based on each registry's weight. +// 4. Pick anyone that's available. +type zoneAwareClusterInvoker struct { + baseClusterInvoker +} + +func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker { + invoke := &zoneAwareClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } + // add self to interceptor + invoke.interceptor = invoke + return invoke +} + +// nolint +func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + // First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'preferred' key. + for _, invoker := range invokers { + key := constant.REGISTRY_KEY + "." + constant.PREFERRED_KEY + if invoker.IsAvailable() && matchParam("true", key, "false", invoker) { + return invoker.Invoke(ctx, invocation) + } + } + + // providers in the registry with the same zone + key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY + zone := invocation.AttachmentsByKey(key, "") + if "" != zone { + for _, invoker := range invokers { + if invoker.IsAvailable() && matchParam(zone, key, "", invoker) { + return invoker.Invoke(ctx, invocation) + } + } + + force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "") + if "true" == force { + return &protocol.RPCResult{ + Err: fmt.Errorf("no registry instance in zone or "+ + "no available providers in the registry, zone: %v, "+ + " registries: %v", zone, invoker.GetUrl()), + } + } + } + + // load balance among all registries, with registry weight count in. + loadBalance := getLoadBalance(invokers[0], invocation) + ivk := invoker.doSelect(loadBalance, invocation, invokers, nil) + if ivk != nil && ivk.IsAvailable() { + return ivk.Invoke(ctx, invocation) + } + + // If none of the invokers has a preferred signal or is picked by the loadBalancer, pick the first one available. + for _, invoker := range invokers { + if invoker.IsAvailable() { + return invoker.Invoke(ctx, invocation) + } + } + + return &protocol.RPCResult{ + Err: fmt.Errorf("no provider available in %v", invokers), + } +} + +func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) { + key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY + force := ctx.Value(key) + + if force != nil { + switch value := force.(type) { + case bool: + if value { + invocation.SetAttachments(key, "true") + } + case string: + if "true" == value { + invocation.SetAttachments(key, "true") + } + default: + // ignore + } + } +} + +func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) { + +} + +func matchParam(target, key, def string, invoker protocol.Invoker) bool { + return target == invoker.GetUrl().GetParam(key, def) +} diff --git a/cluster/cluster_impl/zone_aware_cluster_invoker_test.go b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go new file mode 100644 index 0000000000..cd201a42c7 --- /dev/null +++ b/cluster/cluster_impl/zone_aware_cluster_invoker_test.go @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster_impl + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + mockResult := &protocol.RPCResult{ + Attrs: map[string]string{constant.PREFERRED_KEY: "true"}, + Rest: rest{tried: 0, success: true}} + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + if 0 == i { + url.SetParam(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, "true") + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return mockResult + }) + } else { + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{} + }) + } + + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) + + assert.Equal(t, mockResult, result) +} + +func TestZoneWareInvokerWithWeightSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + w1 := "50" + w2 := "200" + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + url.SetParam(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, "true") + if 1 == i { + url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w1) + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{ + Attrs: map[string]string{constant.WEIGHT_KEY: w1}, + Rest: rest{tried: 0, success: true}} + }).MaxTimes(100) + } else { + url.SetParam(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, w2) + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{ + Attrs: map[string]string{constant.WEIGHT_KEY: w2}, + Rest: rest{tried: 0, success: true}} + }).MaxTimes(100) + } + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + var w2Count, w1Count int + loop := 50 + for i := 0; i < loop; i++ { + result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{}) + if w2 == result.Attachment(constant.WEIGHT_KEY, "0") { + w2Count++ + } + if w1 == result.Attachment(constant.WEIGHT_KEY, "0") { + w1Count++ + } + assert.NoError(t, result.Error()) + } + t.Logf("loop count : %d, w1 height : %s | count : %d, w2 height : %s | count : %d", loop, + w1, w1Count, w2, w2Count) +} + +func TestZoneWareInvokerWithZoneSuccess(t *testing.T) { + var zoneArray = []string{"hangzhou", "shanghai"} + + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + zoneValue := zoneArray[i] + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + url.SetParam(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, zoneValue) + + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn( + func(invocation protocol.Invocation) protocol.Result { + return &protocol.RPCResult{ + Attrs: map[string]string{constant.ZONE_KEY: zoneValue}, + Rest: rest{tried: 0, success: true}} + }) + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + inv := &invocation.RPCInvocation{} + // zone hangzhou + hz := zoneArray[0] + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, hz) + + result := clusterInvoker.Invoke(context.Background(), inv) + + assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, "")) +} + +func TestZoneWareInvokerWithZoneForceFail(t *testing.T) { + ctrl := gomock.NewController(t) + // In Go versions 1.14+, if you pass a *testing.T + // into gomock.NewController(t) you no longer need to call ctrl.Finish(). + //defer ctrl.Finish() + + var invokers []protocol.Invoker + for i := 0; i < 2; i++ { + url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i)) + + invoker := mock.NewMockInvoker(ctrl) + invoker.EXPECT().IsAvailable().Return(true).AnyTimes() + invoker.EXPECT().GetUrl().Return(url).AnyTimes() + invokers = append(invokers, invoker) + } + + zoneAwareCluster := NewZoneAwareCluster() + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := zoneAwareCluster.Join(staticDir) + + inv := &invocation.RPCInvocation{} + // zone hangzhou + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "hangzhou") + // zone force + inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "true") + + result := clusterInvoker.Invoke(context.Background(), inv) + + assert.NotNil(t, result.Error()) +} diff --git a/cluster/cluster_interceptor.go b/cluster/cluster_interceptor.go new file mode 100644 index 0000000000..a627e81365 --- /dev/null +++ b/cluster/cluster_interceptor.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cluster + +import ( + "context" +) + +import ( + "github.com/apache/dubbo-go/protocol" +) + +// ClusterInterceptor +// Extension - ClusterInterceptor +type ClusterInterceptor interface { + // Before DoInvoke method + BeforeInvoker(ctx context.Context, invocation protocol.Invocation) + + // After DoInvoke method + AfterInvoker(ctx context.Context, invocation protocol.Invocation) + + // Corresponding cluster invoke + DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result +} diff --git a/cluster/loadbalance/util.go b/cluster/loadbalance/util.go index b6c013852b..684ffe11a7 100644 --- a/cluster/loadbalance/util.go +++ b/cluster/loadbalance/util.go @@ -28,23 +28,35 @@ import ( // GetWeight gets weight for load balance strategy func GetWeight(invoker protocol.Invoker, invocation protocol.Invocation) int64 { + var weight int64 url := invoker.GetUrl() - weight := url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) + // Multiple registry scenario, load balance among multiple registries. + isRegIvk := url.GetParamBool(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, false) + if isRegIvk { + weight = url.GetParamInt(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) + } else { + weight = url.GetMethodParamInt64(invocation.MethodName(), constant.WEIGHT_KEY, constant.DEFAULT_WEIGHT) - if weight > 0 { - //get service register time an do warm up time - now := time.Now().Unix() - timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) - if uptime := now - timestamp; uptime > 0 { - warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) - if uptime < warmup { - if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { - weight = 1 - } else if int64(ww) <= weight { - weight = int64(ww) + if weight > 0 { + //get service register time an do warm up time + now := time.Now().Unix() + timestamp := url.GetParamInt(constant.REMOTE_TIMESTAMP_KEY, now) + if uptime := now - timestamp; uptime > 0 { + warmup := url.GetParamInt(constant.WARMUP_KEY, constant.DEFAULT_WARMUP) + if uptime < warmup { + if ww := float64(uptime) / float64(warmup) / float64(weight); ww < 1 { + weight = 1 + } else if int64(ww) <= weight { + weight = int64(ww) + } } } } } + + if weight < 0 { + weight = 0 + } + return weight } diff --git a/common/constant/cluster.go b/common/constant/cluster.go new file mode 100644 index 0000000000..6894f3595e --- /dev/null +++ b/common/constant/cluster.go @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constant + +// nolint +const ( + FAILOVER_CLUSTER_NAME = "failover" + ZONEAWARE_CLUSTER_NAME = "zoneAware" +) diff --git a/common/constant/key.go b/common/constant/key.go index 72072ddb15..5c4b0ad669 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -97,6 +97,10 @@ const ( ROLE_KEY = "registry.role" REGISTRY_DEFAULT_KEY = "registry.default" REGISTRY_TIMEOUT_KEY = "registry.timeout" + REGISTRY_LABEL_KEY = "label" + PREFERRED_KEY = "preferred" + ZONE_KEY = "zone" + ZONE_FORCE_KEY = "zone.force" REGISTRY_TTL_KEY = "registry.ttl" ) diff --git a/config/config_loader_test.go b/config/config_loader_test.go index 01d2ca812a..a219b9f465 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -72,7 +72,7 @@ func TestLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -101,7 +101,7 @@ func TestLoadWithSingleReg(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() @@ -130,7 +130,7 @@ func TestWithNoRegLoad(t *testing.T) { SetProviderService(ms) extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory) Load() diff --git a/config/reference_config.go b/config/reference_config.go index e9a895d57a..bbc875192c 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -147,13 +147,26 @@ func (c *ReferenceConfig) Refer(_ interface{}) { } // TODO(decouple from directory, config should not depend on directory module) + var hitClu string if regUrl != nil { - cluster := extension.GetCluster("registryAware") - c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + // for multi-subscription scenario, use 'zone-aware' policy by default + hitClu = constant.ZONEAWARE_CLUSTER_NAME } else { - cluster := extension.GetCluster(c.Cluster) - c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) + // not a registry url, must be direct invoke. + hitClu = constant.FAILOVER_CLUSTER_NAME + if len(invokers) > 0 { + u := invokers[0].GetUrl() + if nil != &u { + hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME) + } + } } + + cluster := extension.GetCluster(hitClu) + // If 'zone-aware' policy select, the invoker wrap sequence would be: + // ZoneAwareClusterInvoker(StaticDirectory) -> + // FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker + c.invoker = cluster.Join(directory.NewStaticDirectory(invokers)) } // create proxy diff --git a/config/reference_config_test.go b/config/reference_config_test.go index 45cdb2dfac..e457801596 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -187,10 +187,10 @@ func doInitConsumerWithSingleRegistry() { } } -func TestReferMultireg(t *testing.T) { +func TestReferMultiReg(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -203,7 +203,7 @@ func TestReferMultireg(t *testing.T) { func TestRefer(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -217,7 +217,7 @@ func TestRefer(t *testing.T) { func TestReferAsync(t *testing.T) { doInitConsumerAsync() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) @@ -275,7 +275,7 @@ func TestReferMultiP2PWithReg(t *testing.T) { func TestImplement(t *testing.T) { doInitConsumer() extension.SetProtocol("registry", GetProtocol) - extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) + extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster) for _, reference := range consumerConfig.References { reference.Refer(nil) reference.Implement(&MockService{}) diff --git a/config/registry_config.go b/config/registry_config.go index 703606b836..89566c428e 100644 --- a/config/registry_config.go +++ b/config/registry_config.go @@ -41,11 +41,20 @@ type RegistryConfig struct { Group string `yaml:"group" json:"group,omitempty" property:"group"` TTL string `yaml:"ttl" default:"10m" json:"ttl,omitempty" property:"ttl"` // unit: minute // for registry - Address string `yaml:"address" json:"address,omitempty" property:"address"` - Username string `yaml:"username" json:"username,omitempty" property:"username"` - Password string `yaml:"password" json:"password,omitempty" property:"password"` - Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` - Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` + Address string `yaml:"address" json:"address,omitempty" property:"address"` + Username string `yaml:"username" json:"username,omitempty" property:"username"` + Password string `yaml:"password" json:"password,omitempty" property:"password"` + Simplified bool `yaml:"simplified" json:"simplified,omitempty" property:"simplified"` + // Always use this registry first if set to true, useful when subscribe to multiple registries + Preferred bool `yaml:"preferred" json:"preferred,omitempty" property:"preferred"` + // The region where the registry belongs, usually used to isolate traffics + Zone string `yaml:"zone" json:"zone,omitempty" property:"zone"` + //// Force must user the region, property zone is specified. + //ZoneForce bool `yaml:"zoneForce" json:"zoneForce,omitempty" property:"zoneForce"` + // Affects traffic distribution among registries, + // useful when subscribe to multiple registries Take effect only when no preferred registry is specified. + Weight int64 `yaml:"weight" json:"params,omitempty" property:"weight"` + Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` } // UnmarshalYAML unmarshals the RegistryConfig by @unmarshal function @@ -119,6 +128,12 @@ func (c *RegistryConfig) getUrlMap(roleType common.RoleType) url.Values { urlMap.Set(constant.ROLE_KEY, strconv.Itoa(int(roleType))) urlMap.Set(constant.REGISTRY_KEY, c.Protocol) urlMap.Set(constant.REGISTRY_TIMEOUT_KEY, c.TimeoutStr) + // multi registry invoker weight label for load balance + urlMap.Set(constant.REGISTRY_KEY+"."+constant.REGISTRY_LABEL_KEY, strconv.FormatBool(true)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.PREFERRED_KEY, strconv.FormatBool(c.Preferred)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, c.Zone) + //urlMap.Set(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, strconv.FormatBool(c.ZoneForce)) + urlMap.Set(constant.REGISTRY_KEY+"."+constant.WEIGHT_KEY, strconv.FormatInt(c.Weight, 10)) urlMap.Set(constant.REGISTRY_TTL_KEY, c.TTL) for k, v := range c.Params { urlMap.Set(k, v) diff --git a/protocol/invocation.go b/protocol/invocation.go index ba5949794c..296ec0540c 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -41,6 +41,8 @@ type Invocation interface { Attributes() map[string]interface{} // AttributeByKey gets attribute by key , if nil then return default value AttributeByKey(string, interface{}) interface{} + // SetAttachments sets attribute by @key and @value. + SetAttachments(key string, value string) // Invoker gets the invoker in current context. Invoker() Invoker }