Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imp: modify codes by code review of gitee #590

Merged
merged 6 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cluster/cluster_impl/available_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cluster_impl

import (
"context"
"fmt"
"strings"
"testing"
)
Expand All @@ -32,14 +33,16 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)

var (
availableUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
availableUrl, _ = common.NewURL(fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider",
constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)

func registerAvailable(invoker *mock.MockInvoker) protocol.Invoker {
Expand Down
14 changes: 8 additions & 6 deletions cluster/cluster_impl/base_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {

func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
var selectedInvoker protocol.Invoker
if len(invokers) <= 0 {
return selectedInvoker
}

url := invokers[0].GetUrl()
sticky := url.GetParamBool(constant.STICKY_KEY, false)
//Get the service method sticky config if have
Expand All @@ -97,19 +101,17 @@ func (invoker *baseClusterInvoker) doSelect(lb cluster.LoadBalance, invocation p
invoker.stickyInvoker = nil
}

if sticky && invoker.stickyInvoker != nil && (invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
if invoker.availablecheck && invoker.stickyInvoker.IsAvailable() {
return invoker.stickyInvoker
}
if sticky && invoker.availablecheck &&
invoker.stickyInvoker != nil && invoker.stickyInvoker.IsAvailable() &&
(invoked == nil || !isInvoked(invoker.stickyInvoker, invoked)) {
return invoker.stickyInvoker
}

selectedInvoker = invoker.doSelectInvoker(lb, invocation, invokers, invoked)

if sticky {
invoker.stickyInvoker = selectedInvoker
}
return selectedInvoker

}

func (invoker *baseClusterInvoker) doSelectInvoker(lb cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, invoked []protocol.Invoker) protocol.Invoker {
Expand Down
11 changes: 7 additions & 4 deletions cluster/cluster_impl/base_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/apache/dubbo-go/protocol/invocation"
)

func Test_StickyNormal(t *testing.T) {
func TestStickyNormal(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
Expand All @@ -43,12 +43,15 @@ func Test_StickyNormal(t *testing.T) {
base := &baseClusterInvoker{}
base.availablecheck = true
invoked := []protocol.Invoker{}
result := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)
result1 := base.doSelect(loadbalance.NewRandomLoadBalance(), invocation.NewRPCInvocation("getUser", nil, nil), invokers, invoked)

tmpRandomBalance := loadbalance.NewRandomLoadBalance()
tmpInvocation := invocation.NewRPCInvocation("getUser", nil, nil)
result := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
result1 := base.doSelect(tmpRandomBalance, tmpInvocation, invokers, invoked)
assert.Equal(t, result, result1)
}

func Test_StickyNormalWhenError(t *testing.T) {
func TestStickyNormalWhenError(t *testing.T) {
invokers := []protocol.Invoker{}
for i := 0; i < 10; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i))
Expand Down
15 changes: 9 additions & 6 deletions cluster/cluster_impl/broadcast_cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cluster_impl
import (
"context"
"errors"
"fmt"
"testing"
)

Expand All @@ -32,17 +33,19 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)

var (
broadcastUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
broadcastUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)

func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker {
func registerBroadcast(mockInvokers ...*mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)

invokers := []protocol.Invoker{}
Expand All @@ -59,7 +62,7 @@ func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol
return clusterInvoker
}

func Test_BroadcastInvokeSuccess(t *testing.T) {
func TestBroadcastInvokeSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -72,13 +75,13 @@ func Test_BroadcastInvokeSuccess(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}

clusterInvoker := registerBroadcast(t, invokers...)
clusterInvoker := registerBroadcast(invokers...)

result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}

func Test_BroadcastInvokeFailed(t *testing.T) {
func TestBroadcastInvokeFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -102,7 +105,7 @@ func Test_BroadcastInvokeFailed(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
}

clusterInvoker := registerBroadcast(t, invokers...)
clusterInvoker := registerBroadcast(invokers...)

result := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
assert.Equal(t, mockFailedResult.Err, result.Error())
Expand Down
35 changes: 17 additions & 18 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {

func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
for {
for {
select {
case <-invoker.ticker.C:
value, err := invoker.taskList.Peek()
if err == queue.ErrDisposed {
return
Expand All @@ -91,26 +91,25 @@ func (invoker *failbackClusterInvoker) process(ctx context.Context) {
}

// ignore return. the get must success.
_, err = invoker.taskList.Get(1)
if err != nil {
if _, err = invoker.taskList.Get(1); err != nil {
logger.Warnf("get task found err: %v\n", err)
break
}
go invoker.tryTimerTaskProc(ctx, retryTask)
}
}
}

go func(retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)

retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)
func (invoker *failbackClusterInvoker) tryTimerTaskProc(ctx context.Context, retryTask *retryTimerTask) {
var invoked []protocol.Invoker
invoked = append(invoked, retryTask.lastInvoker)

}
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}

Expand Down
33 changes: 18 additions & 15 deletions cluster/cluster_impl/failback_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cluster_impl

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -34,18 +35,20 @@ import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
"github.com/apache/dubbo-go/protocol/mock"
)

var (
failbackUrl, _ = common.NewURL("dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
failbackUrl, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
)

// registerFailback register failbackCluster to cluster extension.
func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker {
func registerFailback(invoker *mock.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failbackCluster := NewFailbackCluster()

Expand All @@ -60,12 +63,12 @@ func registerFailback(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker
}

// success firstly, failback should return origin invoke result.
func Test_FailbackSuceess(t *testing.T) {
func TestFailbackSuceess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

Expand All @@ -77,12 +80,12 @@ func Test_FailbackSuceess(t *testing.T) {
}

// failed firstly, success later after one retry.
func Test_FailbackRetryOneSuccess(t *testing.T) {
func TestFailbackRetryOneSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

Expand All @@ -95,7 +98,7 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
wg.Add(1)
now := time.Now()
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func() protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
Expand All @@ -120,12 +123,12 @@ func Test_FailbackRetryOneSuccess(t *testing.T) {
}

// failed firstly, and failed again after ech retry time.
func Test_FailbackRetryFailed(t *testing.T) {
func TestFailbackRetryFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

Expand All @@ -141,7 +144,7 @@ func Test_FailbackRetryFailed(t *testing.T) {
// add retry call that eventually failed.
for i := 0; i < retries; i++ {
j := i + 1
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func() protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= int64(5*j))
wg.Done()
Expand All @@ -166,12 +169,12 @@ func Test_FailbackRetryFailed(t *testing.T) {
}

// add 10 tasks but all failed firstly, and failed again with one retry.
func Test_FailbackRetryFailed10Times(t *testing.T) {
func TestFailbackRetryFailed10Times(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.maxRetries = 10

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
Expand All @@ -184,7 +187,7 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
var wg sync.WaitGroup
wg.Add(10)
now := time.Now()
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func(invocation protocol.Invocation) protocol.Result {
invoker.EXPECT().Invoke(gomock.Any()).DoAndReturn(func() protocol.Result {
delta := time.Since(now).Nanoseconds() / int64(time.Second)
assert.True(t, delta >= 5)
wg.Done()
Expand All @@ -208,12 +211,12 @@ func Test_FailbackRetryFailed10Times(t *testing.T) {
assert.Equal(t, int64(0), clusterInvoker.taskList.Len())
}

func Test_FailbackOutOfLimit(t *testing.T) {
func TestFailbackOutOfLimit(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerFailback(t, invoker).(*failbackClusterInvoker)
clusterInvoker := registerFailback(invoker).(*failbackClusterInvoker)
clusterInvoker.failbackTasks = 1

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
Expand Down
Loading