diff --git a/cluster/cluster_impl/available_cluster.go b/cluster/cluster_impl/available_cluster.go new file mode 100644 index 0000000000..7e748cd938 --- /dev/null +++ b/cluster/cluster_impl/available_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type availableCluster struct{} + +const available = "available" + +func init() { + extension.SetCluster(available, NewAvailableCluster) +} + +func NewAvailableCluster() cluster.Cluster { + return &availableCluster{} +} + +func (cluser *availableCluster) Join(directory cluster.Directory) protocol.Invoker { + return NewAvailableClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/available_cluster_invoker.go b/cluster/cluster_impl/available_cluster_invoker.go new file mode 100644 index 0000000000..c59c0702c2 --- /dev/null +++ b/cluster/cluster_impl/available_cluster_invoker.go @@ -0,0 +1,61 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "fmt" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/protocol" +) + +type availableClusterInvoker struct { + baseClusterInvoker +} + +func NewAvailableClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &availableClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *availableClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + for _, ivk := range invokers { + if ivk.IsAvailable() { + return ivk.Invoke(invocation) + } + } + return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))} +} diff --git a/cluster/cluster_impl/available_cluster_invoker_test.go b/cluster/cluster_impl/available_cluster_invoker_test.go new file mode 100644 index 0000000000..04032a7f24 --- /dev/null +++ b/cluster/cluster_impl/available_cluster_invoker_test.go @@ -0,0 +1,88 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "strings" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + availableUrl, _ = common.NewURL(context.Background(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +func registerAvailable(t *testing.T, invoker *mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + availableCluster := NewAvailableCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + invoker.EXPECT().GetUrl().Return(availableUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := availableCluster.Join(staticDir) + return clusterInvoker +} + +func TestAvailableClusterInvokerSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerAvailable(t, invoker) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().IsAvailable().Return(true) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.Equal(t, mockResult, result) +} + +func TestAvailableClusterInvokerNoAvail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := mock.NewMockInvoker(ctrl) + clusterInvoker := registerAvailable(t, invoker) + + invoker.EXPECT().IsAvailable().Return(false) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NotNil(t, result.Error()) + assert.True(t, strings.Contains(result.Error().Error(), "no provider available")) + assert.Nil(t, result.Result()) +}