Skip to content

Commit

Permalink
Merge pull request #317 from fangyincheng/feature/grpc
Browse files Browse the repository at this point in the history
Mod: modidfy Refer params and add licence
  • Loading branch information
hxmhlt authored Jan 13, 2020
2 parents 78818e8 + 2d6f7f3 commit 1c0e422
Show file tree
Hide file tree
Showing 24 changed files with 96 additions and 38 deletions.
10 changes: 7 additions & 3 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro
}

func (refconfig *ReferenceConfig) Refer(impl interface{}) {
url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
url := common.NewURLWithOptions(common.WithPath(refconfig.id),
common.WithProtocol(refconfig.Protocol),
common.WithParams(refconfig.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, refconfig.id),
)

//1. user specified URL, could be peer-to-peer address, or register center's address.
if refconfig.Url != "" {
Expand Down Expand Up @@ -123,12 +127,12 @@ func (refconfig *ReferenceConfig) Refer(impl interface{}) {
}
}
if len(refconfig.urls) == 1 {
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0], impl)
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0])
} else {
invokers := []protocol.Invoker{}
var regUrl *common.URL
for _, u := range refconfig.urls {
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u, impl))
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u))
if u.Protocol == constant.REGISTRY_PROTOCOL {
regUrl = u
}
Expand Down
2 changes: 1 addition & 1 deletion config/reference_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func newRegistryProtocol() protocol.Protocol {

type mockRegistryProtocol struct{}

func (*mockRegistryProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (*mockRegistryProtocol) Refer(url common.URL) protocol.Invoker {
return protocol.NewBaseInvoker(url)
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}

func (dp *DubboProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout

Expand Down
2 changes: 1 addition & 1 deletion protocol/dubbo/dubbo_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestDubboProtocol_Refer(t *testing.T) {
"side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err)
clientConf = &ClientConfig{}
invoker := proto.Refer(url, nil)
invoker := proto.Refer(url)

// make sure url
eq := invoker.GetUrl().URLEqual(url)
Expand Down
6 changes: 5 additions & 1 deletion protocol/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,23 @@ import (

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config"
)

type Client struct {
*grpc.ClientConn
invoker reflect.Value
}

func NewClient(impl interface{}, url common.URL) *Client {
func NewClient(url common.URL) *Client {
conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
panic(err)
}

key := url.GetParam(constant.BEAN_NAME_KEY, "")
impl := config.GetConsumerService(key)
invoker := getInvoker(impl, conn)

return &Client{
Expand Down
3 changes: 1 addition & 2 deletions protocol/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ func TestNewClient(t *testing.T) {
go internal.InitGrpcServer()
defer internal.ShutdownGrpcServer()

var impl *internal.GrpcGreeterImpl
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)
cli := NewClient(impl, url)
cli := NewClient(url)
assert.NotNil(t, cli)
}
3 changes: 1 addition & 2 deletions protocol/grpc/grpc_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ func TestInvoke(t *testing.T) {
go internal.InitGrpcServer()
defer internal.ShutdownGrpcServer()

var impl *internal.GrpcGreeterImpl
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)

cli := NewClient(impl, url)
cli := NewClient(url)

invoker := NewGrpcInvoker(url, cli)

Expand Down
4 changes: 2 additions & 2 deletions protocol/grpc/grpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func (gp *GrpcProtocol) openServer(url common.URL) {
}
}

func (gp *GrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(impl, url))
func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(url))
gp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
Expand Down
3 changes: 1 addition & 2 deletions protocol/grpc/grpc_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ func TestGrpcProtocol_Refer(t *testing.T) {
proto := GetProtocol()
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.NoError(t, err)
var impl *internal.GrpcGreeterImpl
invoker := proto.Refer(url, impl)
invoker := proto.Refer(url)

// make sure url
eq := invoker.GetUrl().URLEqual(url)
Expand Down
8 changes: 8 additions & 0 deletions protocol/grpc/internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import (
"google.golang.org/grpc"
)

import (
"github.com/apache/dubbo-go/config"
)

func init() {
config.SetConsumerService(&GrpcGreeterImpl{})
}

// used for dubbo-grpc biz client
type GrpcGreeterImpl struct {
SayHello func(ctx context.Context, in *HelloRequest, out *HelloReply) error
Expand Down
17 changes: 17 additions & 0 deletions protocol/grpc/internal/helloworld.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions protocol/grpc/protoc-gen-dubbo/examples/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

grpc-gen:
protoc -I ./ helloworld.proto --go_out=plugins=grpc:.
dubbo-gen:
Expand Down
17 changes: 17 additions & 0 deletions protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protocol/jsonrpc/jsonrpc_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}

func (jp *JsonrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout

Expand Down
2 changes: 1 addition & 1 deletion protocol/jsonrpc/jsonrpc_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestJsonrpcProtocol_Refer(t *testing.T) {
RequestTimeout: 5 * time.Second,
}
config.SetConsumerConfig(con)
invoker := proto.Refer(url, nil)
invoker := proto.Refer(url)

// make sure url
eq := invoker.GetUrl().URLEqual(url)
Expand Down
4 changes: 2 additions & 2 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// Extension - protocol
type Protocol interface {
Export(invoker Invoker) Exporter
Refer(url common.URL, impl interface{}) Invoker
Refer(url common.URL) Invoker
Destroy()
}

Expand Down Expand Up @@ -74,7 +74,7 @@ func (bp *BaseProtocol) Export(invoker Invoker) Exporter {
return NewBaseExporter("base", invoker, bp.exporterMap)
}

func (bp *BaseProtocol) Refer(url common.URL, impl interface{}) Invoker {
func (bp *BaseProtocol) Refer(url common.URL) Invoker {
return NewBaseInvoker(url)
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/protocolwrapper/mock_protocol_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporte
return protocol.NewBaseExporter("key", invoker, &sync.Map{})
}

func (pfw *mockProtocolFilter) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (pfw *mockProtocolFilter) Refer(url common.URL) protocol.Invoker {
return protocol.NewBaseInvoker(url)
}

Expand Down
4 changes: 2 additions & 2 deletions protocol/protocolwrapper/protocol_filter_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo
return pfw.protocol.Export(invoker)
}

func (pfw *ProtocolFilterWrapper) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker {
if pfw.protocol == nil {
pfw.protocol = extension.GetProtocol(url.Protocol)
}
return buildInvokerChain(pfw.protocol.Refer(url, impl), constant.REFERENCE_FILTER_KEY)
return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY)
}

func (pfw *ProtocolFilterWrapper) Destroy() {
Expand Down
2 changes: 1 addition & 1 deletion protocol/protocolwrapper/protocol_filter_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestProtocolFilterWrapper_Refer(t *testing.T) {
u := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.REFERENCE_FILTER_KEY, "echo"))
invoker := filtProto.Refer(*u, nil)
invoker := filtProto.Refer(*u)
_, ok := invoker.(*FilterInvoker)
assert.True(t, ok)
}
Expand Down
8 changes: 3 additions & 5 deletions registry/directory/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ type registryDirectory struct {
configurators []config_center.Configurator
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
impl interface{}
Options
}

func NewRegistryDirectory(url *common.URL, impl interface{}, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
Expand All @@ -80,7 +79,6 @@ func NewRegistryDirectory(url *common.URL, impl interface{}, registry registry.R
serviceType: url.SubURL.Service(),
registry: registry,
Options: options,
impl: impl,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
return dir, nil
Expand Down Expand Up @@ -200,13 +198,13 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Infof("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
logger.Infof("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy()
Expand Down
6 changes: 3 additions & 3 deletions registry/directory/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestSubscribe(t *testing.T) {
func TestSubscribe_InvalidUrl(t *testing.T) {
url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111")
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
_, err := NewRegistryDirectory(&url, nil, mockRegistry)
_, err := NewRegistryDirectory(&url, mockRegistry)
assert.Error(t, err)
}

Expand All @@ -77,7 +77,7 @@ func TestSubscribe_Group(t *testing.T) {
suburl.SetParam(constant.CLUSTER_KEY, "mock")
regurl.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
registryDirectory, _ := NewRegistryDirectory(&regurl, nil, mockRegistry)
registryDirectory, _ := NewRegistryDirectory(&regurl, mockRegistry)

go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice")))

Expand Down Expand Up @@ -183,7 +183,7 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR
)
url.SubURL = &suburl
mockRegistry, _ := registry.NewMockRegistry(&common.URL{})
registryDirectory, _ := NewRegistryDirectory(&url, nil, mockRegistry)
registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry)

go registryDirectory.Subscribe(&suburl)
if len(noMockEvent) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (proto *registryProtocol) initConfigurationListeners() {
proto.serviceConfigurationListeners = &sync.Map{}
proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners)
}
func (proto *registryProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {

var registryUrl = url
var serviceUrl = registryUrl.SubURL
Expand All @@ -108,7 +108,7 @@ func (proto *registryProtocol) Refer(url common.URL, impl interface{}) protocol.
}

//new registry directory for store service url from registry
directory, err := directory2.NewRegistryDirectory(&registryUrl, impl, reg)
directory, err := directory2.NewRegistryDirectory(&registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
Expand Down
6 changes: 3 additions & 3 deletions registry/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func referNormal(t *testing.T, regProtocol *registryProtocol) {

url.SubURL = &suburl

invoker := regProtocol.Refer(url, nil)
invoker := regProtocol.Refer(url)
assert.IsType(t, &protocol.BaseInvoker{}, invoker)
assert.Equal(t, invoker.GetUrl().String(), url.String())
}
Expand All @@ -85,7 +85,7 @@ func TestMultiRegRefer(t *testing.T) {

url2.SubURL = &suburl2

regProtocol.Refer(url2, nil)
regProtocol.Refer(url2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
Expand All @@ -107,7 +107,7 @@ func TestOneRegRefer(t *testing.T) {

url2.SubURL = &suburl2

regProtocol.Refer(url2, nil)
regProtocol.Refer(url2)
var count int
regProtocol.registries.Range(func(key, value interface{}) bool {
count++
Expand Down
2 changes: 0 additions & 2 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package zookeeper

import (
"fmt"
"path"
"strings"
"sync"
Expand Down Expand Up @@ -241,7 +240,6 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi

//listen sub path recursive
go func(zkPath string, listener remoting.DataListener) {
fmt.Printf("zkpath: %v \n", zkPath)
l.listenDirEvent(zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)
Expand Down

0 comments on commit 1c0e422

Please sign in to comment.