Skip to content

Commit

Permalink
Merge pull request #632 from georgehao/fix/issue-615-enhancement-cluster
Browse files Browse the repository at this point in the history
fix: enhancement cluster code analysis
  • Loading branch information
AlexStocks authored Jun 29, 2020
2 parents 89e3bee + edabe97 commit a28a34d
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 245 deletions.
15 changes: 10 additions & 5 deletions cluster/cluster_impl/base_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)

const (
baseClusterInvokerMethodName = "getUser"
baseClusterInvokerFormat = "dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider"
)

func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
Expand All @@ -45,7 +50,7 @@ func TestStickyNormal(t *testing.T) {
invoked := []protocol.Invoker{}

tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil)
tmpInvocation := invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1)
Expand All @@ -54,16 +59,16 @@ func TestStickyNormal(t *testing.T) {
func TestStickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(baseClusterInvokerFormat, i))
url.SetParam("sticky", "true")
invokers = append(invokers, NewMockInvoker(url, 1))
}
base := &baseClusterInvoker{}
base.availablecheck = true

invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
invoked = append(invoked, result)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation(baseClusterInvokerMethodName, nil, nil), invokers, invoked)
assert.NotEqual(t, result, result1)
}
51 changes: 23 additions & 28 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
return invoker
}

func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)

retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}

func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
Expand All @@ -91,25 +104,11 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) {
}

// ignore return. the get must success.
_, err = invoker.taskList.Get(1)
if err != nil {
if _, err = invoker.taskList.Get(1); err != nil {
logger.Warnf("get task found err: %v\n", err)
break
}

go func(retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)

retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)

go invoker.tryTimerTaskProc(ctx, retryTask)
}
}
}
Expand All @@ -129,29 +128,26 @@ func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err

func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
if err := invoker.checkInvokers(invokers, invocation); err != nil {
logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
}
url := invokers[0].GetUrl()
methodName := invocation.MethodName()

//Get the service loadbalance config
url := invokers[0].GetUrl()
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)

//Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)

loadBalance := extension.GetLoadbalance(lb)
invoked := make([]protocol.Invoker, 0, len(invokers))
var result protocol.Result

ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(ctx, invocation)
result := ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
Expand All @@ -164,15 +160,14 @@ func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation pr
return &protocol.RPCResult{}
}

timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
timerTask := newRetryTimerTask(loadBalance, invocation, invokers, ivk)
invoker.taskList.Put(timerTask)

logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}

return result
}

Expand Down
33 changes: 23 additions & 10 deletions cluster/loadbalance/consistent_hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package loadbalance

import (
"fmt"
"testing"
)

Expand All @@ -32,6 +33,19 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)

const (
ip = "192.168.1.0"
port8080 = 8080
port8082 = 8082

url8080Short = "dubbo://192.168.1.0:8080"
url8081Short = "dubbo://192.168.1.0:8081"
url20000 = "dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
url8080 = "dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
url8081 = "dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
url8082 = "dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1"
)

func TestConsistentHashSelectorSuite(t *testing.T) {
suite.Run(t, new(consistentHashSelectorSuite))
}
Expand All @@ -43,8 +57,7 @@ type consistentHashSelectorSuite struct {

func (s *consistentHashSelectorSuite) SetupTest() {
var invokers []protocol.Invoker
url, _ := common.NewURL(
"dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
url, _ := common.NewURL(url20000)
invokers = append(invokers, protocol.NewBaseInvoker(url))
s.selector = newConsistentHashSelector(invokers, "echo", 999944)
}
Expand All @@ -55,14 +68,14 @@ func (s *consistentHashSelectorSuite) TestToKey() {
}

func (s *consistentHashSelectorSuite) TestSelectForKey() {
url1, _ := common.NewURL("dubbo://192.168.1.0:8080")
url2, _ := common.NewURL("dubbo://192.168.1.0:8081")
url1, _ := common.NewURL(url8080Short)
url2, _ := common.NewURL(url8081Short)
s.selector.virtualInvokers = make(map[uint32]protocol.Invoker)
s.selector.virtualInvokers[99874] = protocol.NewBaseInvoker(url1)
s.selector.virtualInvokers[9999945] = protocol.NewBaseInvoker(url2)
s.selector.keys = []uint32{99874, 9999945}
result := s.selector.selectForKey(9999944)
s.Equal(result.GetUrl().String(), "dubbo://192.168.1.0:8081?")
s.Equal(result.GetUrl().String(), url8081Short+"?")
}

func TestConsistentHashLoadBalanceSuite(t *testing.T) {
Expand All @@ -83,11 +96,11 @@ type consistentHashLoadBalanceSuite struct {

func (s *consistentHashLoadBalanceSuite) SetupTest() {
var err error
s.url1, err = common.NewURL("dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.url1, err = common.NewURL(url8080)
s.NoError(err)
s.url2, err = common.NewURL("dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.url2, err = common.NewURL(url8081)
s.NoError(err)
s.url3, err = common.NewURL("dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.url3, err = common.NewURL(url8082)
s.NoError(err)

s.invoker1 = protocol.NewBaseInvoker(s.url1)
Expand All @@ -101,9 +114,9 @@ func (s *consistentHashLoadBalanceSuite) SetupTest() {
func (s *consistentHashLoadBalanceSuite) TestSelect() {
args := []interface{}{"name", "password", "age"}
invoker := s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8080")
s.Equal(invoker.GetUrl().Location, fmt.Sprintf("%s:%d", ip, port8080))

args = []interface{}{"ok", "abc"}
invoker = s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8082")
s.Equal(invoker.GetUrl().Location, fmt.Sprintf("%s:%d", ip, port8082))
}
22 changes: 14 additions & 8 deletions cluster/loadbalance/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,24 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)

const (
tmpUrl = "dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"
tmpUrlFormat = "dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider"
tmpIp = "192.168.1.100"
)

func TestRandomlbSelect(t *testing.T) {
randomlb := NewRandomLoadBalance()

invokers := []protocol.Invoker{}

url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", 0))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, 0))
invokers = append(invokers, protocol.NewBaseInvoker(url))
i := randomlb.Select(invokers, &invocation.RPCInvocation{})
assert.True(t, i.GetUrl().URLEqual(url))

for i := 1; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
randomlb.Select(invokers, &invocation.RPCInvocation{})
Expand All @@ -58,21 +64,21 @@ func TestRandomlbSelectWeight(t *testing.T) {

invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}

urlParams := url.Values{}
urlParams.Set("methods.test."+constant.WEIGHT_KEY, "10000000000000")
urll, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams))
urll, _ := common.NewURL(tmpUrl, common.WithParams(urlParams))
invokers = append(invokers, protocol.NewBaseInvoker(urll))
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))

var selectedInvoker []protocol.Invoker
var selected float64
for i := 0; i < 10000; i++ {
s := randomlb.Select(invokers, ivc)
if s.GetUrl().Ip == "192.168.1.100" {
if s.GetUrl().Ip == tmpIp {
selected++
}
selectedInvoker = append(selectedInvoker, s)
Expand All @@ -89,21 +95,21 @@ func TestRandomlbSelectWarmup(t *testing.T) {

invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
url, _ := common.NewURL(fmt.Sprintf(tmpUrlFormat, i))
invokers = append(invokers, protocol.NewBaseInvoker(url))
}

urlParams := url.Values{}
urlParams.Set(constant.REMOTE_TIMESTAMP_KEY, strconv.FormatInt(time.Now().Add(time.Minute*(-9)).Unix(), 10))
urll, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams))
urll, _ := common.NewURL(tmpUrl, common.WithParams(urlParams))
invokers = append(invokers, protocol.NewBaseInvoker(urll))
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))

var selectedInvoker []protocol.Invoker
var selected float64
for i := 0; i < 10000; i++ {
s := randomlb.Select(invokers, ivc)
if s.GetUrl().Ip == "192.168.1.100" {
if s.GetUrl().Ip == tmpIp {
selected++
}
selectedInvoker = append(selectedInvoker, s)
Expand Down
Loading

0 comments on commit a28a34d

Please sign in to comment.