diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index d13646dba8..0b5e286049 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -72,10 +72,11 @@ func (p *Proxy) Implement(v common.RPCService) { makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { return func(in []reflect.Value) []reflect.Value { var ( - err error - inv *invocation_impl.RPCInvocation - inArr []interface{} - reply reflect.Value + err error + inv *invocation_impl.RPCInvocation + inIArr []interface{} + inVArr []reflect.Value + reply reflect.Value ) if methodName == "Echo" { methodName = "$echo" @@ -104,21 +105,25 @@ func (p *Proxy) Implement(v common.RPCService) { } if end-start <= 0 { - inArr = []interface{}{} + inIArr = []interface{}{} + inVArr = []reflect.Value{} } else if v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 { - inArr = v + inIArr = v + inVArr = []reflect.Value{in[start]} } else { - inArr = make([]interface{}, end-start) + inIArr = make([]interface{}, end-start) + inVArr = make([]reflect.Value, end-start) index := 0 for i := start; i < end; i++ { - inArr[index] = in[i].Interface() + inIArr[index] = in[i].Interface() + inVArr[index] = in[i] index++ } } inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName), - invocation_impl.WithArguments(inArr), invocation_impl.WithReply(reply.Interface()), - invocation_impl.WithCallBack(p.callBack)) + invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()), + invocation_impl.WithCallBack(p.callBack), invocation_impl.WithParameterValues(inVArr)) for k, value := range p.attachments { inv.SetAttachments(k, value) diff --git a/config/config_loader.go b/config/config_loader.go index 414bb47902..43237be94b 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -91,7 +91,7 @@ func Load() { continue } ref.id = key - ref.Refer() + ref.Refer(rpcService) ref.Implement(rpcService) } //wait for invoker is available, if wait over default 3s, then panic diff --git a/config/reference_config.go b/config/reference_config.go index fde3f7daa5..0009dc87c9 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -89,8 +89,12 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro return nil } -func (refconfig *ReferenceConfig) Refer() { - url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap())) +func (refconfig *ReferenceConfig) Refer(impl interface{}) { + url := common.NewURLWithOptions(common.WithPath(refconfig.id), + common.WithProtocol(refconfig.Protocol), + common.WithParams(refconfig.getUrlMap()), + common.WithParamsValue(constant.BEAN_NAME_KEY, refconfig.id), + ) //1. user specified URL, could be peer-to-peer address, or register center's address. if refconfig.Url != "" { @@ -214,7 +218,7 @@ func (refconfig *ReferenceConfig) GenericLoad(id string) { genericService := NewGenericService(refconfig.id) SetConsumerService(genericService) refconfig.id = id - refconfig.Refer() + refconfig.Refer(genericService) refconfig.Implement(genericService) return } diff --git a/config/reference_config_test.go b/config/reference_config_test.go index e689c471ed..7a65e55f09 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -184,7 +184,7 @@ func Test_ReferMultireg(t *testing.T) { extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -197,7 +197,7 @@ func Test_Refer(t *testing.T) { extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.Equal(t, "soa.mock", reference.Params["serviceid"]) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) @@ -211,7 +211,7 @@ func Test_ReferAsync(t *testing.T) { extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.Equal(t, "soa.mock", reference.Params["serviceid"]) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) @@ -227,7 +227,7 @@ func Test_ReferP2P(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -241,7 +241,7 @@ func Test_ReferMultiP2P(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000;dubbo://127.0.0.2:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -256,7 +256,7 @@ func Test_ReferMultiP2PWithReg(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -268,7 +268,7 @@ func Test_Implement(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) reference.Implement(&MockService{}) assert.NotNil(t, reference.GetRPCService()) @@ -284,7 +284,7 @@ func Test_Forking(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) forks := int(reference.invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS)) assert.Equal(t, 5, forks) assert.NotNil(t, reference.pxy) @@ -301,7 +301,7 @@ func Test_Sticky(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000" reference := consumerConfig.References["MockService"] - reference.Refer() + reference.Refer(nil) referenceSticky := reference.invoker.GetUrl().GetParam(constant.STICKY_KEY, "false") assert.Equal(t, "false", referenceSticky) diff --git a/go.mod b/go.mod index 4d65602d9d..d8142c0bb6 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/go-errors/errors v1.0.1 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 + github.com/golang/protobuf v1.3.2 github.com/google/btree v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 7d60090e2d..8a032d0ca9 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -40,8 +40,8 @@ func TestDubboInvoker_Invoke(t *testing.T) { pendingResponses: new(sync.Map), conf: *clientConf, opts: Options{ - ConnectTimeout: 3e9, - RequestTimeout: 6e9, + ConnectTimeout: 3 * time.Second, + RequestTimeout: 6 * time.Second, }, } c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go new file mode 100644 index 0000000000..126f3774e6 --- /dev/null +++ b/protocol/grpc/client.go @@ -0,0 +1,61 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "reflect" +) + +import ( + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/config" +) + +type Client struct { + *grpc.ClientConn + invoker reflect.Value +} + +func NewClient(url common.URL) *Client { + conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + panic(err) + } + + key := url.GetParam(constant.BEAN_NAME_KEY, "") + impl := config.GetConsumerService(key) + invoker := getInvoker(impl, conn) + + return &Client{ + ClientConn: conn, + invoker: reflect.ValueOf(invoker), + } +} + +func getInvoker(impl interface{}, conn *grpc.ClientConn) interface{} { + in := []reflect.Value{} + in = append(in, reflect.ValueOf(conn)) + method := reflect.ValueOf(impl).MethodByName("GetDubboStub") + res := method.Call(in) + return res[0].Interface() +} diff --git a/protocol/grpc/client_test.go b/protocol/grpc/client_test.go new file mode 100644 index 0000000000..7d96402782 --- /dev/null +++ b/protocol/grpc/client_test.go @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "context" + "reflect" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol/grpc/internal" +) + +func TestGetInvoker(t *testing.T) { + var conn *grpc.ClientConn + var impl *internal.GrpcGreeterImpl + invoker := getInvoker(impl, conn) + + i := reflect.TypeOf(invoker) + expected := reflect.TypeOf(internal.NewGreeterClient(nil)) + assert.Equal(t, i, expected) +} + +func TestNewClient(t *testing.T) { + go internal.InitGrpcServer() + defer internal.ShutdownGrpcServer() + + url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown®istry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider×tamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!") + assert.Nil(t, err) + cli := NewClient(url) + assert.NotNil(t, cli) +} diff --git a/protocol/grpc/common_test.go b/protocol/grpc/common_test.go new file mode 100644 index 0000000000..7f78bdc40d --- /dev/null +++ b/protocol/grpc/common_test.go @@ -0,0 +1,112 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "context" + "fmt" +) + +import ( + native_grpc "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/grpc/internal" + "github.com/apache/dubbo-go/protocol/invocation" +) + +// userd grpc-dubbo biz service +func addService() { + config.SetProviderService(newGreeterProvider()) +} + +type greeterProvider struct { + *greeterProviderBase +} + +func newGreeterProvider() *greeterProvider { + return &greeterProvider{ + greeterProviderBase: &greeterProviderBase{}, + } +} + +func (g *greeterProvider) SayHello(ctx context.Context, req *internal.HelloRequest) (reply *internal.HelloReply, err error) { + fmt.Printf("req: %v", req) + return &internal.HelloReply{Message: "this is message from reply"}, nil +} + +func (g *greeterProvider) Reference() string { + return "GrpcGreeterImpl" +} + +// code generated by greeter.go +type greeterProviderBase struct { + proxyImpl protocol.Invoker +} + +func (g *greeterProviderBase) SetProxyImpl(impl protocol.Invoker) { + g.proxyImpl = impl +} + +func (g *greeterProviderBase) GetProxyImpl() protocol.Invoker { + return g.proxyImpl +} + +func (g *greeterProviderBase) ServiceDesc() *native_grpc.ServiceDesc { + return &native_grpc.ServiceDesc{ + ServiceName: "helloworld.Greeter", + HandlerType: (*internal.GreeterServer)(nil), + Methods: []native_grpc.MethodDesc{ + { + MethodName: "SayHello", + Handler: _DUBBO_Greeter_SayHello_Handler, + }, + }, + Streams: []native_grpc.StreamDesc{}, + Metadata: "helloworld.proto", + } +} + +func _DUBBO_Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor native_grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(internal.HelloRequest) + if err := dec(in); err != nil { + return nil, err + } + base := srv.(DubboGrpcService) + + args := []interface{}{} + args = append(args, in) + invo := invocation.NewRPCInvocation("SayHello", args, nil) + + if interceptor == nil { + result := base.GetProxyImpl().Invoke(invo) + return result.Result(), result.Error() + } + info := &native_grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/helloworld.Greeter/SayHello", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + result := base.GetProxyImpl().Invoke(invo) + return result.Result(), result.Error() + } + return interceptor(ctx, in, info, handler) +} diff --git a/protocol/grpc/grpc_exporter.go b/protocol/grpc/grpc_exporter.go new file mode 100644 index 0000000000..8446d319f1 --- /dev/null +++ b/protocol/grpc/grpc_exporter.go @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +type GrpcExporter struct { + *protocol.BaseExporter +} + +func NewGrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *GrpcExporter { + return &GrpcExporter{ + BaseExporter: protocol.NewBaseExporter(key, invoker, exporterMap), + } +} + +func (gg *GrpcExporter) Unexport() { + serviceId := gg.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") + gg.BaseExporter.Unexport() + err := common.ServiceMap.UnRegister(GRPC, serviceId) + if err != nil { + logger.Errorf("[GrpcExporter.Unexport] error: %v", err) + } +} diff --git a/protocol/grpc/grpc_invoker.go b/protocol/grpc/grpc_invoker.go new file mode 100644 index 0000000000..b74612b896 --- /dev/null +++ b/protocol/grpc/grpc_invoker.go @@ -0,0 +1,97 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "context" + "reflect" + "sync" +) + +import ( + "github.com/pkg/errors" + "google.golang.org/grpc/connectivity" +) + +import ( + hessian2 "github.com/apache/dubbo-go-hessian2" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +var ErrNoReply = errors.New("request need @response") + +type GrpcInvoker struct { + protocol.BaseInvoker + quitOnce sync.Once + client *Client +} + +func NewGrpcInvoker(url common.URL, client *Client) *GrpcInvoker { + return &GrpcInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + client: client, + } +} + +func (gi *GrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + var ( + result protocol.RPCResult + ) + + if invocation.Reply() == nil { + result.Err = ErrNoReply + } + + in := []reflect.Value{} + in = append(in, reflect.ValueOf(context.Background())) + in = append(in, invocation.ParameterValues()...) + + methodName := invocation.MethodName() + method := gi.client.invoker.MethodByName(methodName) + res := method.Call(in) + + result.Rest = res[0] + // check err + if !res[1].IsNil() { + result.Err = res[1].Interface().(error) + } else { + _ = hessian2.ReflectResponse(res[0], invocation.Reply()) + } + + return &result +} + +func (gi *GrpcInvoker) IsAvailable() bool { + return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown +} + +func (gi *GrpcInvoker) IsDestroyed() bool { + return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown +} + +func (gi *GrpcInvoker) Destroy() { + gi.quitOnce.Do(func() { + gi.BaseInvoker.Destroy() + + if gi.client != nil { + _ = gi.client.Close() + } + }) +} diff --git a/protocol/grpc/grpc_invoker_test.go b/protocol/grpc/grpc_invoker_test.go new file mode 100644 index 0000000000..4f97e10631 --- /dev/null +++ b/protocol/grpc/grpc_invoker_test.go @@ -0,0 +1,56 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "context" + "reflect" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol/grpc/internal" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestInvoke(t *testing.T) { + go internal.InitGrpcServer() + defer internal.ShutdownGrpcServer() + + url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown®istry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider×tamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!") + assert.Nil(t, err) + + cli := NewClient(url) + + invoker := NewGrpcInvoker(url, cli) + + args := []reflect.Value{} + args = append(args, reflect.ValueOf(&internal.HelloRequest{Name: "request name"})) + bizReply := &internal.HelloReply{} + invo := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("SayHello"), + invocation.WithParameterValues(args), invocation.WithReply(bizReply)) + res := invoker.Invoke(invo) + assert.Nil(t, res.Error()) + assert.NotNil(t, res.Result()) + assert.Equal(t, "Hello request name", bizReply.Message) +} diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go new file mode 100644 index 0000000000..cad75752ad --- /dev/null +++ b/protocol/grpc/grpc_protocol.go @@ -0,0 +1,104 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const GRPC = "grpc" + +func init() { + extension.SetProtocol(GRPC, GetProtocol) +} + +var grpcProtocol *GrpcProtocol + +type GrpcProtocol struct { + protocol.BaseProtocol + serverMap map[string]*Server + serverLock sync.Mutex +} + +func NewGRPCProtocol() *GrpcProtocol { + return &GrpcProtocol{ + BaseProtocol: protocol.NewBaseProtocol(), + serverMap: make(map[string]*Server), + } +} + +func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + url := invoker.GetUrl() + serviceKey := url.ServiceKey() + exporter := NewGrpcExporter(serviceKey, invoker, gp.ExporterMap()) + gp.SetExporterMap(serviceKey, exporter) + logger.Infof("Export service: %s", url.String()) + gp.openServer(url) + return exporter +} + +func (gp *GrpcProtocol) openServer(url common.URL) { + _, ok := gp.serverMap[url.Location] + if !ok { + _, ok := gp.ExporterMap().Load(url.ServiceKey()) + if !ok { + panic("[GrpcProtocol]" + url.Key() + "is not existing") + } + + gp.serverLock.Lock() + _, ok = gp.serverMap[url.Location] + if !ok { + srv := NewServer() + gp.serverMap[url.Location] = srv + srv.Start(url) + } + gp.serverLock.Unlock() + } +} + +func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker { + invoker := NewGrpcInvoker(url, NewClient(url)) + gp.SetInvokers(invoker) + logger.Infof("Refer service: %s", url.String()) + return invoker +} + +func (gp *GrpcProtocol) Destroy() { + logger.Infof("GrpcProtocol destroy.") + + gp.BaseProtocol.Destroy() + + for key, server := range gp.serverMap { + delete(gp.serverMap, key) + server.Stop() + } +} + +func GetProtocol() protocol.Protocol { + if grpcProtocol == nil { + grpcProtocol = NewGRPCProtocol() + } + return grpcProtocol +} diff --git a/protocol/grpc/grpc_protocol_test.go b/protocol/grpc/grpc_protocol_test.go new file mode 100644 index 0000000000..e4629499b7 --- /dev/null +++ b/protocol/grpc/grpc_protocol_test.go @@ -0,0 +1,85 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "context" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/grpc/internal" +) + +func TestGrpcProtocol_Export(t *testing.T) { + // Export + addService() + + proto := GetProtocol() + url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:40000/GrpcGreeterImpl?accesslog=&app.version=0.0.1&application=BDTService&bean.name=GrpcGreeterImpl&cluster=failover&environment=dev&execute.limit=&execute.limit.rejected.handler=&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX®istry.role=3&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown×tamp=1576923717&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100") + assert.NoError(t, err) + exporter := proto.Export(protocol.NewBaseInvoker(url)) + time.Sleep(time.Second) + + // make sure url + eq := exporter.GetInvoker().GetUrl().URLEqual(url) + assert.True(t, eq) + + // make sure exporterMap after 'Unexport' + _, ok := proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey()) + assert.True(t, ok) + exporter.Unexport() + _, ok = proto.(*GrpcProtocol).ExporterMap().Load(url.ServiceKey()) + assert.False(t, ok) + + // make sure serverMap after 'Destroy' + _, ok = proto.(*GrpcProtocol).serverMap[url.Location] + assert.True(t, ok) + proto.Destroy() + _, ok = proto.(*GrpcProtocol).serverMap[url.Location] + assert.False(t, ok) +} + +func TestGrpcProtocol_Refer(t *testing.T) { + go internal.InitGrpcServer() + defer internal.ShutdownGrpcServer() + time.Sleep(time.Second) + + proto := GetProtocol() + url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown®istry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider×tamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!") + assert.NoError(t, err) + invoker := proto.Refer(url) + + // make sure url + eq := invoker.GetUrl().URLEqual(url) + assert.True(t, eq) + + // make sure invokers after 'Destroy' + invokersLen := len(proto.(*GrpcProtocol).Invokers()) + assert.Equal(t, 1, invokersLen) + proto.Destroy() + invokersLen = len(proto.(*GrpcProtocol).Invokers()) + assert.Equal(t, 0, invokersLen) +} diff --git a/protocol/grpc/internal/client.go b/protocol/grpc/internal/client.go new file mode 100644 index 0000000000..bac3ce9488 --- /dev/null +++ b/protocol/grpc/internal/client.go @@ -0,0 +1,47 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" +) + +import ( + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/config" +) + +func init() { + config.SetConsumerService(&GrpcGreeterImpl{}) +} + +// used for dubbo-grpc biz client +type GrpcGreeterImpl struct { + SayHello func(ctx context.Context, in *HelloRequest, out *HelloReply) error +} + +func (u *GrpcGreeterImpl) Reference() string { + return "GrpcGreeterImpl" +} + +func (u *GrpcGreeterImpl) GetDubboStub(cc *grpc.ClientConn) GreeterClient { + return NewGreeterClient(cc) +} diff --git a/protocol/grpc/internal/doc.go b/protocol/grpc/internal/doc.go new file mode 100644 index 0000000000..f2ef2ebd5e --- /dev/null +++ b/protocol/grpc/internal/doc.go @@ -0,0 +1,19 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// just for test, never use internal for production. +package internal diff --git a/protocol/grpc/internal/helloworld.pb.go b/protocol/grpc/internal/helloworld.pb.go new file mode 100644 index 0000000000..79b74ac650 --- /dev/null +++ b/protocol/grpc/internal/helloworld.pb.go @@ -0,0 +1,227 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: helloworld.proto + +package internal + +import ( + "context" + "fmt" + "math" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// The request message containing the user's name. +type HelloRequest struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HelloRequest) Reset() { *m = HelloRequest{} } +func (m *HelloRequest) String() string { return proto.CompactTextString(m) } +func (*HelloRequest) ProtoMessage() {} +func (*HelloRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_17b8c58d586b62f2, []int{0} +} + +func (m *HelloRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HelloRequest.Unmarshal(m, b) +} +func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HelloRequest.Marshal(b, m, deterministic) +} +func (m *HelloRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_HelloRequest.Merge(m, src) +} +func (m *HelloRequest) XXX_Size() int { + return xxx_messageInfo_HelloRequest.Size(m) +} +func (m *HelloRequest) XXX_DiscardUnknown() { + xxx_messageInfo_HelloRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_HelloRequest proto.InternalMessageInfo + +func (m *HelloRequest) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +// The response message containing the greetings +type HelloReply struct { + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HelloReply) Reset() { *m = HelloReply{} } +func (m *HelloReply) String() string { return proto.CompactTextString(m) } +func (*HelloReply) ProtoMessage() {} +func (*HelloReply) Descriptor() ([]byte, []int) { + return fileDescriptor_17b8c58d586b62f2, []int{1} +} + +func (m *HelloReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HelloReply.Unmarshal(m, b) +} +func (m *HelloReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HelloReply.Marshal(b, m, deterministic) +} +func (m *HelloReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_HelloReply.Merge(m, src) +} +func (m *HelloReply) XXX_Size() int { + return xxx_messageInfo_HelloReply.Size(m) +} +func (m *HelloReply) XXX_DiscardUnknown() { + xxx_messageInfo_HelloReply.DiscardUnknown(m) +} + +var xxx_messageInfo_HelloReply proto.InternalMessageInfo + +func (m *HelloReply) GetMessage() string { + if m != nil { + return m.Message + } + return "" +} + +func init() { + proto.RegisterType((*HelloRequest)(nil), "helloworld.HelloRequest") + proto.RegisterType((*HelloReply)(nil), "helloworld.HelloReply") +} + +func init() { proto.RegisterFile("helloworld.proto", fileDescriptor_17b8c58d586b62f2) } + +var fileDescriptor_17b8c58d586b62f2 = []byte{ + // 175 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9, + 0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88, + 0x28, 0x29, 0x71, 0xf1, 0x78, 0x80, 0x78, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x42, + 0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60, 0xb6, 0x92, + 0x1a, 0x17, 0x17, 0x54, 0x4d, 0x41, 0x4e, 0xa5, 0x90, 0x04, 0x17, 0x7b, 0x6e, 0x6a, 0x71, 0x71, + 0x62, 0x3a, 0x4c, 0x11, 0x8c, 0x6b, 0xe4, 0xc9, 0xc5, 0xee, 0x5e, 0x94, 0x9a, 0x5a, 0x92, 0x5a, + 0x24, 0x64, 0xc7, 0xc5, 0x11, 0x9c, 0x58, 0x09, 0xd6, 0x25, 0x24, 0xa1, 0x87, 0xe4, 0x02, 0x64, + 0xcb, 0xa4, 0xc4, 0xb0, 0xc8, 0x14, 0xe4, 0x54, 0x2a, 0x31, 0x38, 0x19, 0x70, 0x49, 0x67, 0xe6, + 0xeb, 0xa5, 0x17, 0x15, 0x24, 0xeb, 0xa5, 0x56, 0x24, 0xe6, 0x16, 0xe4, 0xa4, 0x16, 0x23, 0xa9, + 0x75, 0xe2, 0x07, 0x2b, 0x0e, 0x07, 0xb1, 0x03, 0x40, 0x5e, 0x0a, 0x60, 0x4c, 0x62, 0x03, 0xfb, + 0xcd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0f, 0xb7, 0xcd, 0xf2, 0xef, 0x00, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// GreeterClient is the client API for Greeter service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type GreeterClient interface { + // Sends a greeting + SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) +} + +type greeterClient struct { + cc *grpc.ClientConn +} + +func NewGreeterClient(cc *grpc.ClientConn) GreeterClient { + return &greeterClient{cc} +} + +func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { + out := new(HelloReply) + err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// GreeterServer is the server API for Greeter service. +type GreeterServer interface { + // Sends a greeting + SayHello(context.Context, *HelloRequest) (*HelloReply, error) +} + +// UnimplementedGreeterServer can be embedded to have forward compatible implementations. +type UnimplementedGreeterServer struct { +} + +func (*UnimplementedGreeterServer) SayHello(ctx context.Context, req *HelloRequest) (*HelloReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented") +} + +func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { + s.RegisterService(&_Greeter_serviceDesc, srv) +} + +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HelloRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GreeterServer).SayHello(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/helloworld.Greeter/SayHello", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Greeter_serviceDesc = grpc.ServiceDesc{ + ServiceName: "helloworld.Greeter", + HandlerType: (*GreeterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SayHello", + Handler: _Greeter_SayHello_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "helloworld.proto", +} diff --git a/protocol/grpc/internal/server.go b/protocol/grpc/internal/server.go new file mode 100644 index 0000000000..6491a5c218 --- /dev/null +++ b/protocol/grpc/internal/server.go @@ -0,0 +1,64 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "log" + "net" +) + +import ( + "google.golang.org/grpc" +) + +var ( + s *grpc.Server +) + +// server is used to implement helloworld.GreeterServer. +type server struct { + UnimplementedGreeterServer +} + +// SayHello implements helloworld.GreeterServer +func (s *server) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, error) { + log.Printf("Received: %v", in.GetName()) + return &HelloReply{Message: "Hello " + in.GetName()}, nil +} + +func InitGrpcServer() { + port := ":30000" + + lis, err := net.Listen("tcp", port) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s = grpc.NewServer() + RegisterGreeterServer(s, &server{}) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} + +func ShutdownGrpcServer() { + if s == nil { + return + } + s.GracefulStop() +} diff --git a/protocol/grpc/protoc-gen-dubbo/examples/Makefile b/protocol/grpc/protoc-gen-dubbo/examples/Makefile new file mode 100644 index 0000000000..7893bbc51a --- /dev/null +++ b/protocol/grpc/protoc-gen-dubbo/examples/Makefile @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +grpc-gen: + protoc -I ./ helloworld.proto --go_out=plugins=grpc:. +dubbo-gen: + protoc -I ./ helloworld.proto --dubbo_out=plugins=grpc+dubbo:. diff --git a/protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go b/protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go new file mode 100644 index 0000000000..4ed55ab761 --- /dev/null +++ b/protocol/grpc/protoc-gen-dubbo/examples/helloworld.pb.go @@ -0,0 +1,301 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: helloworld.proto + +package main + +import ( + "context" + "fmt" + "math" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +import ( + "github.com/apache/dubbo-go/protocol" + dgrpc "github.com/apache/dubbo-go/protocol/grpc" + "github.com/apache/dubbo-go/protocol/invocation" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// The request message containing the user's name. +type HelloRequest struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HelloRequest) Reset() { *m = HelloRequest{} } +func (m *HelloRequest) String() string { return proto.CompactTextString(m) } +func (*HelloRequest) ProtoMessage() {} +func (*HelloRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_17b8c58d586b62f2, []int{0} +} + +func (m *HelloRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HelloRequest.Unmarshal(m, b) +} +func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HelloRequest.Marshal(b, m, deterministic) +} +func (m *HelloRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_HelloRequest.Merge(m, src) +} +func (m *HelloRequest) XXX_Size() int { + return xxx_messageInfo_HelloRequest.Size(m) +} +func (m *HelloRequest) XXX_DiscardUnknown() { + xxx_messageInfo_HelloRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_HelloRequest proto.InternalMessageInfo + +func (m *HelloRequest) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +// The response message containing the greetings +type HelloReply struct { + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HelloReply) Reset() { *m = HelloReply{} } +func (m *HelloReply) String() string { return proto.CompactTextString(m) } +func (*HelloReply) ProtoMessage() {} +func (*HelloReply) Descriptor() ([]byte, []int) { + return fileDescriptor_17b8c58d586b62f2, []int{1} +} + +func (m *HelloReply) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HelloReply.Unmarshal(m, b) +} +func (m *HelloReply) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HelloReply.Marshal(b, m, deterministic) +} +func (m *HelloReply) XXX_Merge(src proto.Message) { + xxx_messageInfo_HelloReply.Merge(m, src) +} +func (m *HelloReply) XXX_Size() int { + return xxx_messageInfo_HelloReply.Size(m) +} +func (m *HelloReply) XXX_DiscardUnknown() { + xxx_messageInfo_HelloReply.DiscardUnknown(m) +} + +var xxx_messageInfo_HelloReply proto.InternalMessageInfo + +func (m *HelloReply) GetMessage() string { + if m != nil { + return m.Message + } + return "" +} + +func init() { + proto.RegisterType((*HelloRequest)(nil), "main.HelloRequest") + proto.RegisterType((*HelloReply)(nil), "main.HelloReply") +} + +func init() { proto.RegisterFile("helloworld.proto", fileDescriptor_17b8c58d586b62f2) } + +var fileDescriptor_17b8c58d586b62f2 = []byte{ + // 185 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0xcd, 0xc9, + 0xc9, 0x2f, 0xcf, 0x2f, 0xca, 0x49, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, + 0xcc, 0xcc, 0x53, 0x52, 0xe2, 0xe2, 0xf1, 0x00, 0xc9, 0x04, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97, + 0x08, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, + 0xd9, 0x4a, 0x6a, 0x5c, 0x5c, 0x50, 0x35, 0x05, 0x39, 0x95, 0x42, 0x12, 0x5c, 0xec, 0xb9, 0xa9, + 0xc5, 0xc5, 0x89, 0xe9, 0x30, 0x45, 0x30, 0xae, 0x91, 0x2d, 0x17, 0xbb, 0x7b, 0x51, 0x6a, 0x6a, + 0x49, 0x6a, 0x91, 0x90, 0x11, 0x17, 0x47, 0x70, 0x62, 0x25, 0x58, 0x97, 0x90, 0x90, 0x1e, 0xc8, + 0x26, 0x3d, 0x64, 0x6b, 0xa4, 0x04, 0x50, 0xc4, 0x0a, 0x72, 0x2a, 0x95, 0x18, 0x9c, 0xcc, 0xb8, + 0xa4, 0x33, 0xf3, 0xf5, 0xd2, 0x8b, 0x0a, 0x92, 0xf5, 0x52, 0x2b, 0x12, 0x73, 0x0b, 0x72, 0x52, + 0x8b, 0xf5, 0x10, 0xae, 0x76, 0xe2, 0x07, 0x2b, 0x0e, 0x07, 0xb1, 0x03, 0x40, 0x1e, 0x08, 0x60, + 0x5c, 0xc4, 0xc4, 0xec, 0xe1, 0x13, 0x9e, 0xc4, 0x06, 0xf6, 0x8f, 0x31, 0x20, 0x00, 0x00, 0xff, + 0xff, 0xd2, 0x16, 0x5f, 0x34, 0xe3, 0x00, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// GreeterClient is the client API for Greeter service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type GreeterClient interface { + // Sends a greeting + SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) +} + +type greeterClient struct { + cc *grpc.ClientConn +} + +func NewGreeterClient(cc *grpc.ClientConn) GreeterClient { + return &greeterClient{cc} +} + +func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { + out := new(HelloReply) + err := c.cc.Invoke(ctx, "/main.Greeter/SayHello", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// GreeterServer is the server API for Greeter service. +type GreeterServer interface { + // Sends a greeting + SayHello(context.Context, *HelloRequest) (*HelloReply, error) +} + +// UnimplementedGreeterServer can be embedded to have forward compatible implementations. +type UnimplementedGreeterServer struct { +} + +func (*UnimplementedGreeterServer) SayHello(ctx context.Context, req *HelloRequest) (*HelloReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented") +} + +func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { + s.RegisterService(&_Greeter_serviceDesc, srv) +} + +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HelloRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GreeterServer).SayHello(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/main.Greeter/SayHello", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Greeter_serviceDesc = grpc.ServiceDesc{ + ServiceName: "main.Greeter", + HandlerType: (*GreeterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SayHello", + Handler: _Greeter_SayHello_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "helloworld.proto", +} + +// GreeterClientImpl is the client API for Greeter service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type GreeterClientImpl struct { + // Sends a greeting + SayHello func(ctx context.Context, in *HelloRequest, out *HelloReply) error +} + +func (c *GreeterClientImpl) Reference() string { + return "greeterImpl" +} + +func (c *GreeterClientImpl) GetDubboStub(cc *grpc.ClientConn) GreeterClient { + return NewGreeterClient(cc) +} + +type GreeterProviderBase struct { + proxyImpl protocol.Invoker +} + +func (s *GreeterProviderBase) SetProxyImpl(impl protocol.Invoker) { + s.proxyImpl = impl +} + +func (s *GreeterProviderBase) GetProxyImpl() protocol.Invoker { + return s.proxyImpl +} + +func _DUBBO_Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HelloRequest) + if err := dec(in); err != nil { + return nil, err + } + base := srv.(dgrpc.DubboGrpcService) + args := []interface{}{} + args = append(args, in) + invo := invocation.NewRPCInvocation("SayHello", args, nil) + if interceptor == nil { + result := base.GetProxyImpl().Invoke(invo) + return result.Result(), result.Error() + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/main.Greeter/SayHello", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + result := base.GetProxyImpl().Invoke(invo) + return result.Result(), result.Error() + } + return interceptor(ctx, in, info, handler) +} + +func (s *GreeterProviderBase) ServiceDesc() *grpc.ServiceDesc { + return &grpc.ServiceDesc{ + ServiceName: "main.Greeter", + HandlerType: (*GreeterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SayHello", + Handler: _DUBBO_Greeter_SayHello_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "helloworld.proto", + } +} diff --git a/protocol/grpc/protoc-gen-dubbo/examples/helloworld.proto b/protocol/grpc/protoc-gen-dubbo/examples/helloworld.proto new file mode 100644 index 0000000000..d68e1dd37b --- /dev/null +++ b/protocol/grpc/protoc-gen-dubbo/examples/helloworld.proto @@ -0,0 +1,37 @@ +// Copyright 2015 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package main; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/protocol/grpc/protoc-gen-dubbo/main.go b/protocol/grpc/protoc-gen-dubbo/main.go new file mode 100644 index 0000000000..b2f0e82f74 --- /dev/null +++ b/protocol/grpc/protoc-gen-dubbo/main.go @@ -0,0 +1,74 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "io/ioutil" + "os" +) + +import ( + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/protoc-gen-go/generator" + _ "github.com/golang/protobuf/protoc-gen-go/grpc" +) + +import ( + _ "github.com/apache/dubbo-go/protocol/grpc/protoc-gen-dubbo/plugin/dubbo" +) + +func main() { + // Begin by allocating a generate. The request and response structures are stored there + // so we can do error handling easily - the response structure contains the field to + // report failure. + g := generator.New() + + data, err := ioutil.ReadAll(os.Stdin) + if err != nil { + g.Error(err, "reading input") + } + + if err := proto.Unmarshal(data, g.Request); err != nil { + g.Error(err, "parsing input proto") + } + + if len(g.Request.FileToGenerate) == 0 { + g.Fail("no files to generate") + } + + g.CommandLineParameters(g.Request.GetParameter()) + + // Create a wrapped version of the Descriptors and EnumDescriptors that + // point to the file that defines them. + g.WrapTypes() + + g.SetPackageNames() + g.BuildTypeNameMap() + + g.GenerateAllFiles() + + // Send back the results. + data, err = proto.Marshal(g.Response) + if err != nil { + g.Error(err, "failed to marshal output proto") + } + _, err = os.Stdout.Write(data) + if err != nil { + g.Error(err, "failed to write output proto") + } +} diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/doc.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/doc.go new file mode 100644 index 0000000000..90799f3b4a --- /dev/null +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/doc.go @@ -0,0 +1,19 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// dubbo plugin for protobuf. +package dubbo diff --git a/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go new file mode 100644 index 0000000000..e84a7d0cc9 --- /dev/null +++ b/protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go @@ -0,0 +1,346 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dubbo + +import ( + "fmt" + "strconv" + "strings" +) + +import ( + pb "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/golang/protobuf/protoc-gen-go/generator" +) + +// generatedCodeVersion indicates a version of the generated code. +// It is incremented whenever an incompatibility between the generated code and +// the grpc package is introduced; the generated code references +// a constant, grpc.SupportPackageIsVersionN (where N is generatedCodeVersion). +const generatedCodeVersion = 4 + +// Paths for packages used by code generated in this file, +// relative to the import_prefix of the generator.Generator. +const ( + contextPkgPath = "context" + grpcPkgPath = "google.golang.org/grpc" + codePkgPath = "google.golang.org/grpc/codes" + statusPkgPath = "google.golang.org/grpc/status" +) + +func init() { + generator.RegisterPlugin(new(dubboGrpc)) +} + +// grpc is an implementation of the Go protocol buffer compiler's +// plugin architecture. It generates bindings for gRPC-dubbo support. +type dubboGrpc struct { + gen *generator.Generator +} + +// Name returns the name of this plugin, "grpc". +func (g *dubboGrpc) Name() string { + return "dubbo" +} + +// The names for packages imported in the generated code. +// They may vary from the final path component of the import path +// if the name is used by other packages. +var ( + contextPkg string + grpcPkg string +) + +// Init initializes the plugin. +func (g *dubboGrpc) Init(gen *generator.Generator) { + g.gen = gen +} + +// Given a type name defined in a .proto, return its object. +// Also record that we're using it, to guarantee the associated import. +func (g *dubboGrpc) objectNamed(name string) generator.Object { + g.gen.RecordTypeUse(name) + return g.gen.ObjectNamed(name) +} + +// Given a type name defined in a .proto, return its name as we will print it. +func (g *dubboGrpc) typeName(str string) string { + return g.gen.TypeName(g.objectNamed(str)) +} + +// P forwards to g.gen.P. +func (g *dubboGrpc) P(args ...interface{}) { g.gen.P(args...) } + +// Generate generates code for the services in the given file. +// be consistent with grpc plugin +func (g *dubboGrpc) Generate(file *generator.FileDescriptor) { + if len(file.FileDescriptorProto.Service) == 0 { + return + } + + contextPkg = string(g.gen.AddImport(contextPkgPath)) + grpcPkg = string(g.gen.AddImport(grpcPkgPath)) + + for i, service := range file.FileDescriptorProto.Service { + g.generateService(file, service, i) + } +} + +// GenerateImports generates the import declaration for this file. +func (g *dubboGrpc) GenerateImports(file *generator.FileDescriptor) { + g.P("import (") + g.P(`dgrpc "github.com/apache/dubbo-go/protocol/grpc"`) + g.P(`"github.com/apache/dubbo-go/protocol/invocation"`) + g.P(`"github.com/apache/dubbo-go/protocol"`) + g.P(`"github.com/apache/dubbo-go/config"`) + g.P(` ) `) +} + +func unexport(s string) string { return strings.ToLower(s[:1]) + s[1:] } + +// deprecationComment is the standard comment added to deprecated +// messages, fields, enums, and enum values. +var deprecationComment = "// Deprecated: Do not use." + +// generateService generates all the code for the named service. +func (g *dubboGrpc) generateService(file *generator.FileDescriptor, service *pb.ServiceDescriptorProto, index int) { + path := fmt.Sprintf("6,%d", index) // 6 means service. + + origServName := service.GetName() + fullServName := origServName + if pkg := file.GetPackage(); pkg != "" { + fullServName = pkg + "." + fullServName + } + servName := generator.CamelCase(origServName) + deprecated := service.GetOptions().GetDeprecated() + + g.P() + g.P(fmt.Sprintf(`// %sClientImpl is the client API for %s service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.`, servName, servName)) + + // Client interface. + if deprecated { + g.P("//") + g.P(deprecationComment) + } + dubboSrvName := servName + "ClientImpl" + g.P("type ", dubboSrvName, " struct {") + for i, method := range service.Method { + g.gen.PrintComments(fmt.Sprintf("%s,2,%d", path, i)) // 2 means method in a service. + if method.GetOptions().GetDeprecated() { + g.P("//") + g.P(deprecationComment) + } + g.P(g.generateClientSignature(servName, method)) + } + g.P("}") + g.P() + + // NewClient factory. + if deprecated { + g.P(deprecationComment) + } + + // add Reference method + //func (u *GrpcGreeterImpl) Reference() string { + // return "GrpcGreeterImpl" + //} + g.P("func (c *", dubboSrvName, ") ", " Reference() string ", "{") + g.P(`return "`, unexport(servName), `Impl"`) + g.P("}") + g.P() + + // add GetDubboStub method + // func (u *GrpcGreeterImpl) GetDubboStub(cc *grpc.ClientConn) GreeterClient { + // return NewGreeterClient(cc) + //} + g.P("func (c *", dubboSrvName, ") ", " GetDubboStub(cc *grpc.ClientConn) ", servName, "Client {") + g.P(`return New`, servName, `Client(cc)`) + g.P("}") + g.P() + + // Server interface. + serverType := servName + "ProviderBase" + g.P("type ", serverType, " struct {") + g.P("proxyImpl protocol.Invoker") + g.P("}") + g.P() + + // add set method + //func (g *GreeterProviderBase) SetProxyImpl(impl protocol.Invoker) { + // g.proxyImpl = impl + //} + g.P("func (s *", serverType, ") SetProxyImpl(impl protocol.Invoker) {") + g.P(`s.proxyImpl = impl`) + g.P("}") + g.P() + + // return get method + g.P("func (s *", serverType, ") GetProxyImpl() protocol.Invoker {") + g.P(`return s.proxyImpl`) + g.P("}") + g.P() + + // add handler + var handlerNames []string + for _, method := range service.Method { + hname := g.generateServerMethod(servName, fullServName, method) + handlerNames = append(handlerNames, hname) + } + + grpcserverType := servName + "Server" + // return service desc + g.P("func (s *", serverType, ") ServiceDesc() *grpc.ServiceDesc {") + g.P(`return &grpc.ServiceDesc{`) + g.P("ServiceName: ", strconv.Quote(fullServName), ",") + g.P("HandlerType: (*", grpcserverType, ")(nil),") + g.P("Methods: []", grpcPkg, ".MethodDesc{") + for i, method := range service.Method { + if method.GetServerStreaming() || method.GetClientStreaming() { + continue + } + g.P("{") + g.P("MethodName: ", strconv.Quote(method.GetName()), ",") + g.P("Handler: ", handlerNames[i], ",") + g.P("},") + } + g.P("},") + g.P("Streams: []", grpcPkg, ".StreamDesc{},") + g.P("Metadata: \"", file.GetName(), "\",") + g.P("}") + g.P("}") + g.P() +} + +// generateClientSignature returns the client-side signature for a method. +func (g *dubboGrpc) generateClientSignature(servName string, method *pb.MethodDescriptorProto) string { + origMethName := method.GetName() + methName := generator.CamelCase(origMethName) + //if reservedClientName[methName] { + // methName += "_" + //} + reqArg := ", in *" + g.typeName(method.GetInputType()) + if method.GetClientStreaming() { + reqArg = "" + } + respName := "out *" + g.typeName(method.GetOutputType()) + if method.GetServerStreaming() || method.GetClientStreaming() { + respName = servName + "_" + generator.CamelCase(origMethName) + "Client" + } + return fmt.Sprintf("%s func(ctx %s.Context%s, %s) error", methName, contextPkg, reqArg, respName) +} + +func (g *dubboGrpc) generateClientMethod(servName, fullServName, serviceDescVar string, method *pb.MethodDescriptorProto, descExpr string) { +} + +func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method *pb.MethodDescriptorProto) string { + methName := generator.CamelCase(method.GetName()) + hname := fmt.Sprintf("_DUBBO_%s_%s_Handler", servName, methName) + inType := g.typeName(method.GetInputType()) + outType := g.typeName(method.GetOutputType()) + + if !method.GetServerStreaming() && !method.GetClientStreaming() { + g.P("func ", hname, "(srv interface{}, ctx ", contextPkg, ".Context, dec func(interface{}) error, interceptor ", grpcPkg, ".UnaryServerInterceptor) (interface{}, error) {") + g.P("in := new(", inType, ")") + g.P("if err := dec(in); err != nil { return nil, err }") + + g.P("base := srv.(dgrpc.DubboGrpcService)") + g.P("args := []interface{}{}") + g.P("args = append(args, in)") + g.P(`invo := invocation.NewRPCInvocation("`, methName, `", args, nil)`) + + g.P("if interceptor == nil {") + g.P("result := base.GetProxyImpl().Invoke(invo)") + g.P("return result.Result(), result.Error()") + g.P("}") + + g.P("info := &", grpcPkg, ".UnaryServerInfo{") + g.P("Server: srv,") + g.P("FullMethod: ", strconv.Quote(fmt.Sprintf("/%s/%s", fullServName, methName)), ",") + g.P("}") + + g.P("handler := func(ctx ", contextPkg, ".Context, req interface{}) (interface{}, error) {") + g.P("result := base.GetProxyImpl().Invoke(invo)") + g.P("return result.Result(), result.Error()") + g.P("}") + + g.P("return interceptor(ctx, in, info, handler)") + g.P("}") + g.P() + return hname + } + streamType := unexport(servName) + methName + "Server" + g.P("func ", hname, "(srv interface{}, stream ", grpcPkg, ".ServerStream) error {") + if !method.GetClientStreaming() { + g.P("m := new(", inType, ")") + g.P("if err := stream.RecvMsg(m); err != nil { return err }") + g.P("return srv.(", servName, "Server).", methName, "(m, &", streamType, "{stream})") + } else { + g.P("return srv.(", servName, "Server).", methName, "(&", streamType, "{stream})") + } + g.P("}") + g.P() + + genSend := method.GetServerStreaming() + genSendAndClose := !method.GetServerStreaming() + genRecv := method.GetClientStreaming() + + // Stream auxiliary types and methods. + g.P("type ", servName, "_", methName, "Server interface {") + if genSend { + g.P("Send(*", outType, ") error") + } + if genSendAndClose { + g.P("SendAndClose(*", outType, ") error") + } + if genRecv { + g.P("Recv() (*", inType, ", error)") + } + g.P(grpcPkg, ".ServerStream") + g.P("}") + g.P() + + g.P("type ", streamType, " struct {") + g.P(grpcPkg, ".ServerStream") + g.P("}") + g.P() + + if genSend { + g.P("func (x *", streamType, ") Send(m *", outType, ") error {") + g.P("return x.ServerStream.SendMsg(m)") + g.P("}") + g.P() + } + if genSendAndClose { + g.P("func (x *", streamType, ") SendAndClose(m *", outType, ") error {") + g.P("return x.ServerStream.SendMsg(m)") + g.P("}") + g.P() + } + if genRecv { + g.P("func (x *", streamType, ") Recv() (*", inType, ", error) {") + g.P("m := new(", inType, ")") + g.P("if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err }") + g.P("return m, nil") + g.P("}") + g.P() + } + + return hname +} diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go new file mode 100644 index 0000000000..0777c09bb4 --- /dev/null +++ b/protocol/grpc/server.go @@ -0,0 +1,101 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "fmt" + "net" + "reflect" +) + +import ( + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" +) + +type Server struct { + grpcServer *grpc.Server +} + +func NewServer() *Server { + return &Server{} +} + +type DubboGrpcService interface { + SetProxyImpl(impl protocol.Invoker) + GetProxyImpl() protocol.Invoker + ServiceDesc() *grpc.ServiceDesc +} + +func (s *Server) Start(url common.URL) { + var ( + addr string + err error + ) + addr = url.Location + lis, err := net.Listen("tcp", addr) + if err != nil { + panic(err) + } + server := grpc.NewServer() + + key := url.GetParam(constant.BEAN_NAME_KEY, "") + service := config.GetProviderService(key) + + ds, ok := service.(DubboGrpcService) + if !ok { + panic("illegal service type registered") + } + + m, ok := reflect.TypeOf(service).MethodByName("SetProxyImpl") + if !ok { + panic("method SetProxyImpl is necessary for grpc service") + } + + exporter, _ := grpcProtocol.ExporterMap().Load(url.ServiceKey()) + if exporter == nil { + panic(fmt.Sprintf("no exporter found for servicekey: %v", url.ServiceKey())) + } + invoker := exporter.(protocol.Exporter).GetInvoker() + if invoker == nil { + panic(fmt.Sprintf("no invoker found for servicekey: %v", url.ServiceKey())) + } + in := []reflect.Value{reflect.ValueOf(service)} + in = append(in, reflect.ValueOf(invoker)) + m.Func.Call(in) + + server.RegisterService(ds.ServiceDesc(), service) + + s.grpcServer = server + go func() { + if err = server.Serve(lis); err != nil { + logger.Errorf("server serve failed with err: %v", err) + } + }() +} + +func (s *Server) Stop() { + s.grpcServer.Stop() +} diff --git a/protocol/invocation.go b/protocol/invocation.go index 055e7a4cd1..b0ccab39e8 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -24,6 +24,7 @@ import ( type Invocation interface { MethodName() string ParameterTypes() []reflect.Type + ParameterValues() []reflect.Value Arguments() []interface{} Reply() interface{} Attachments() map[string]string diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index bddd83b5db..a9b8695da9 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -31,14 +31,15 @@ import ( ///////////////////////////// // todo: is it necessary to separate fields of consumer(provider) from RPCInvocation type RPCInvocation struct { - methodName string - parameterTypes []reflect.Type - arguments []interface{} - reply interface{} - callBack interface{} - attachments map[string]string - invoker protocol.Invoker - lock sync.RWMutex + methodName string + parameterTypes []reflect.Type + parameterValues []reflect.Value + arguments []interface{} + reply interface{} + callBack interface{} + attachments map[string]string + invoker protocol.Invoker + lock sync.RWMutex } func NewRPCInvocation(methodName string, arguments []interface{}, attachments map[string]string) *RPCInvocation { @@ -65,6 +66,10 @@ func (r *RPCInvocation) ParameterTypes() []reflect.Type { return r.parameterTypes } +func (r *RPCInvocation) ParameterValues() []reflect.Value { + return r.parameterValues +} + func (r *RPCInvocation) Arguments() []interface{} { return r.arguments } @@ -137,6 +142,12 @@ func WithParameterTypes(parameterTypes []reflect.Type) option { } } +func WithParameterValues(parameterValues []reflect.Value) option { + return func(invo *RPCInvocation) { + invo.parameterValues = parameterValues + } +} + func WithArguments(arguments []interface{}) option { return func(invo *RPCInvocation) { invo.arguments = arguments diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 1defedc28a..24c4158e8c 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -370,6 +370,7 @@ func (r *zkRegistry) register(c common.URL) error { return perrors.Errorf("@c{%v} type is not referencer or provider", c) } + dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") err = r.registerTempZookeeperNode(dubboPath, encodedURL) if err != nil { diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index a7fc568f56..19d65291e9 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -290,6 +290,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { z.Lock() a := z.eventRegistry[zkPath] a = append(a, event) + z.eventRegistry[zkPath] = a logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event) z.Unlock() diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 9521ea7490..0b9db5e09d 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -19,6 +19,7 @@ package zookeeper import ( "path" + "strings" "sync" "time" ) @@ -273,6 +274,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da children []string ) + zkPath = strings.ReplaceAll(zkPath, "$", "%24") l.pathMapLock.Lock() _, ok := l.pathMap[zkPath] l.pathMapLock.Unlock()