Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ftr: Nearest first for multiple registry #659

Merged
merged 13 commits into from
Aug 9, 2020
16 changes: 16 additions & 0 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cluster_impl

import (
"context"
cityiron marked this conversation as resolved.
Show resolved Hide resolved
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
Expand All @@ -36,6 +37,7 @@ type baseClusterInvoker struct {
availablecheck bool
destroyed *atomic.Bool
stickyInvoker protocol.Invoker
interceptor cluster.ClusterInterceptor
}

func newBaseClusterInvoker(directory cluster.Directory) baseClusterInvoker {
Expand Down Expand Up @@ -146,6 +148,20 @@ func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invoc
return selectedInvoker
}

func (invoker *baseClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
if invoker.interceptor != nil {
invoker.interceptor.BeforeInvoker(ctx, invocation)

result := invoker.interceptor.DoInvoke(ctx, invocation)

invoker.interceptor.AfterInvoker(ctx, invocation)

return result
}

return nil
}

func isInvoked(selectedInvoker protocol.Invoker, invoked []protocol.Invoker) bool {
for _, i := range invoked {
if i == selectedInvoker {
Expand Down
9 changes: 2 additions & 7 deletions cluster/cluster_impl/failover_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ package cluster_impl

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)

type failoverCluster struct{}

const name = "failover"

func init() {
extension.SetCluster(name, NewFailoverCluster)
extension.SetCluster(constant.FAILOVER_CLUSTER_NAME, NewFailoverCluster)
}

// NewFailoverCluster returns a failover cluster instance
Expand All @@ -44,7 +43,3 @@ func NewFailoverCluster() cluster.Cluster {
func (cluster *failoverCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailoverClusterInvoker(directory)
}

func GetFailoverName() string {
return name
}
10 changes: 2 additions & 8 deletions cluster/cluster_impl/zone_aware_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ package cluster_impl

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)

type zoneAwareCluster struct{}

const zoneAware = "zoneAware"

func init() {
extension.SetCluster(zoneAware, NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, NewZoneAwareCluster)
}

// NewZoneAwareCluster returns a zoneaware cluster instance.
Expand All @@ -43,8 +42,3 @@ func NewZoneAwareCluster() cluster.Cluster {
func (cluster *zoneAwareCluster) Join(directory cluster.Directory) protocol.Invoker {
cityiron marked this conversation as resolved.
Show resolved Hide resolved
return newZoneAwareClusterInvoker(directory)
}

// GetZoneAwareName get cluster name
func GetZoneAwareName() string {
return zoneAware
}
37 changes: 32 additions & 5 deletions cluster/cluster_impl/zone_aware_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ type zoneAwareClusterInvoker struct {
}

func newZoneAwareClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &zoneAwareClusterInvoker{
invoke := &zoneAwareClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
// add self to interceptor
invoke.interceptor = invoke
return invoke
}

// nolint
func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
func (invoker *zoneAwareClusterInvoker) DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)

err := invoker.checkInvokers(invokers, invocation)
Expand All @@ -63,16 +66,16 @@ func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation p
}

// providers in the registry with the same zone
zone := invocation.AttachmentsByKey(constant.REGISTRY_ZONE, "")
key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY
zone := invocation.AttachmentsByKey(key, "")
if "" != zone {
for _, invoker := range invokers {
key := constant.REGISTRY_KEY + "." + constant.ZONE_KEY
if invoker.IsAvailable() && matchParam(zone, key, "", invoker) {
return invoker.Invoke(ctx, invocation)
}
}
cityiron marked this conversation as resolved.
Show resolved Hide resolved

force := invocation.AttachmentsByKey(constant.REGISTRY_ZONE_FORCE, "")
force := invocation.AttachmentsByKey(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "")
if "true" == force {
return &protocol.RPCResult{
Err: fmt.Errorf("no registry instance in zone or "+
Expand Down Expand Up @@ -101,6 +104,30 @@ func (invoker *zoneAwareClusterInvoker) Invoke(ctx context.Context, invocation p
}
}

func (invoker *zoneAwareClusterInvoker) BeforeInvoker(ctx context.Context, invocation protocol.Invocation) {
key := constant.REGISTRY_KEY + "." + constant.ZONE_FORCE_KEY
force := ctx.Value(key)

if force != nil {
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
}
default:
// ignore
}
}
}

func (invoker *zoneAwareClusterInvoker) AfterInvoker(ctx context.Context, invocation protocol.Invocation) {

}

func matchParam(target, key, def string, invoker protocol.Invoker) bool {
return target == invoker.GetUrl().GetParam(key, def)
}
15 changes: 8 additions & 7 deletions cluster/cluster_impl/zone_aware_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/apache/dubbo-go/protocol/mock"
)

func Test_ZoneWareInvokerWithPreferredSuccess(t *testing.T) {
func TestZoneWareInvokerWithPreferredSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
Expand Down Expand Up @@ -78,7 +78,7 @@ func Test_ZoneWareInvokerWithPreferredSuccess(t *testing.T) {
assert.Equal(t, mockResult, result)
}

func Test_ZoneWareInvokerWithWeightSuccess(t *testing.T) {
func TestZoneWareInvokerWithWeightSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
Expand Down Expand Up @@ -134,7 +134,7 @@ func Test_ZoneWareInvokerWithWeightSuccess(t *testing.T) {
w1, w1Count, w2, w2Count)
}

func Test_ZoneWareInvokerWithZoneSuccess(t *testing.T) {
func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
var zoneArray = []string{"hangzhou", "shanghai"}

ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -167,14 +167,14 @@ func Test_ZoneWareInvokerWithZoneSuccess(t *testing.T) {
inv := &invocation.RPCInvocation{}
// zone hangzhou
hz := zoneArray[0]
inv.SetAttachments(constant.REGISTRY_ZONE, hz)
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, hz)

result := clusterInvoker.Invoke(context.Background(), inv)

assert.Equal(t, hz, result.Attachment(constant.ZONE_KEY, ""))
}

func Test_ZoneWareInvokerWithZoneForceFail(t *testing.T) {
func TestZoneWareInvokerWithZoneForceFail(t *testing.T) {
ctrl := gomock.NewController(t)
// In Go versions 1.14+, if you pass a *testing.T
// into gomock.NewController(t) you no longer need to call ctrl.Finish().
Expand All @@ -196,8 +196,9 @@ func Test_ZoneWareInvokerWithZoneForceFail(t *testing.T) {

inv := &invocation.RPCInvocation{}
// zone hangzhou
inv.SetAttachments(constant.REGISTRY_ZONE, "hangzhou")
inv.SetAttachments(constant.REGISTRY_ZONE_FORCE, "true")
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_KEY, "hangzhou")
// zone force
inv.SetAttachments(constant.REGISTRY_KEY+"."+constant.ZONE_FORCE_KEY, "true")

result := clusterInvoker.Invoke(context.Background(), inv)

Expand Down
36 changes: 36 additions & 0 deletions cluster/cluster_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cluster

import (
"context"
"github.com/apache/dubbo-go/protocol"
cityiron marked this conversation as resolved.
Show resolved Hide resolved
)

// ClusterInterceptor
// Extension - ClusterInterceptor
type ClusterInterceptor interface {
// Before DoInvoke method
BeforeInvoker(ctx context.Context, invocation protocol.Invocation)

// After DoInvoke method
AfterInvoker(ctx context.Context, invocation protocol.Invocation)

// Corresponding cluster invoke
DoInvoke(ctx context.Context, invocation protocol.Invocation) protocol.Result
}
24 changes: 24 additions & 0 deletions common/constant/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constant

// nolint
const (
FAILOVER_CLUSTER_NAME = "failover"
ZONEAWARE_CLUSTER_NAME = "zoneAware"
)
3 changes: 1 addition & 2 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ const (
REGISTRY_LABEL_KEY = "label"
PREFERRED_KEY = "preferred"
ZONE_KEY = "zone"
REGISTRY_ZONE = "registry_zone"
REGISTRY_ZONE_FORCE = "registry_zone_force"
ZONE_FORCE_KEY = "zone.force"
REGISTRY_TTL_KEY = "registry.ttl"
)

Expand Down
6 changes: 3 additions & 3 deletions config/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestLoad(t *testing.T) {
SetProviderService(ms)

extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)

Load()
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestLoadWithSingleReg(t *testing.T) {
SetProviderService(ms)

extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)

Load()
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestWithNoRegLoad(t *testing.T) {
SetProviderService(ms)

extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)

Load()
Expand Down
21 changes: 10 additions & 11 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
)

import (
"github.com/apache/dubbo-go/cluster/cluster_impl"
"github.com/creasty/defaults"
gxstrings "github.com/dubbogo/gost/strings"
)
Expand Down Expand Up @@ -148,26 +147,26 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
}

// TODO(decouple from directory, config should not depend on directory module)
var hitClu string
if regUrl != nil {
// for multi-subscription scenario, use 'zone-aware' policy by default
cluster := extension.GetCluster(cluster_impl.GetZoneAwareName())
// The invoker wrap sequence would be:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
hitClu = constant.ZONEAWARE_CLUSTER_NAME
} else {
// not a registry url, must be direct invoke.
clu := cluster_impl.GetFailoverName()
hitClu = constant.FAILOVER_CLUSTER_NAME
if len(invokers) > 0 {
u := invokers[0].GetUrl()
if nil != &u {
clu = u.GetParam(constant.CLUSTER_KEY, cluster_impl.GetZoneAwareName())
hitClu = u.GetParam(constant.CLUSTER_KEY, constant.ZONEAWARE_CLUSTER_NAME)
}
}

cluster := extension.GetCluster(clu)
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}

cluster := extension.GetCluster(hitClu)
// If 'zone-aware' policy select, the invoker wrap sequence would be:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}

// create proxy
Expand Down
8 changes: 4 additions & 4 deletions config/reference_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func doInitConsumerWithSingleRegistry() {
func TestReferMultiReg(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)

for _, reference := range consumerConfig.References {
reference.Refer(nil)
Expand All @@ -203,7 +203,7 @@ func TestReferMultiReg(t *testing.T) {
func TestRefer(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)

for _, reference := range consumerConfig.References {
reference.Refer(nil)
Expand All @@ -217,7 +217,7 @@ func TestRefer(t *testing.T) {
func TestReferAsync(t *testing.T) {
doInitConsumerAsync()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)

for _, reference := range consumerConfig.References {
reference.Refer(nil)
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestReferMultiP2PWithReg(t *testing.T) {
func TestImplement(t *testing.T) {
doInitConsumer()
extension.SetProtocol("registry", GetProtocol)
extension.SetCluster(cluster_impl.GetZoneAwareName(), cluster_impl.NewZoneAwareCluster)
extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
for _, reference := range consumerConfig.References {
reference.Refer(nil)
reference.Implement(&MockService{})
Expand Down
Loading