Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(grpcrouter): add hybrid query handler #17921

Merged
merged 10 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 86 additions & 43 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package baseapp

import (
"context"
"fmt"

abci "github.com/cometbft/cometbft/abci/types"
gogogrpc "github.com/cosmos/gogoproto/grpc"
"github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/runtime/protoiface"

"github.com/cosmos/cosmos-sdk/baseapp/internal/protocompat"
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
Expand All @@ -16,9 +21,11 @@ import (

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
cdc encoding.Codec
serviceData []serviceData
routes map[string]GRPCQueryHandler
handlerByMessageName map[string][]func(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error)
sdkCodec codec.BinaryCodec
cdc encoding.Codec
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
serviceData []serviceData
}

// serviceData represents a gRPC service, along with its handler.
Expand All @@ -32,7 +39,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
routes: map[string]GRPCQueryHandler{},
handlerByMessageName: map[string][]func(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error){},
}
}

Expand All @@ -58,46 +66,13 @@ func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler {
func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) {
// adds a top-level query handler based on the gRPC service name
for _, method := range sd.Methods {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
methodHandler := method.Handler

// Check that each service is only registered once. If a service is
// registered more than once, then we should error. Since we can't
// return an error (`Server.RegisterService` interface restriction) we
// panic (at startup).
_, found := qrt.routes[fqName]
if found {
panic(
fmt.Errorf(
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
"This usually means that there are conflicting modules registering the same gRPC query service",
fqName,
),
)
err := qrt.registerABCIQueryHandler(sd, method, handler)
if err != nil {
panic(err)
}

qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, ctx, func(i interface{}) error {
return qrt.cdc.Unmarshal(req.Data, i)
}, nil)
if err != nil {
return nil, err
}

// proto marshal the result bytes
var resBytes []byte
resBytes, err = qrt.cdc.Marshal(res)
if err != nil {
return nil, err
}

// return the result bytes as the response value
return &abci.ResponseQuery{
Height: req.Height,
Value: resBytes,
}, nil
err = qrt.registerHandlerByMessageName(sd, method, handler)
if err != nil {
panic(err)
}
}

Expand All @@ -107,9 +82,77 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
})
}

func (qrt *GRPCQueryRouter) registerABCIQueryHandler(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
methodHandler := method.Handler

// Check that each service is only registered once. If a service is
// registered more than once, then we should error. Since we can't
// return an error (`Server.RegisterService` interface restriction) we
// panic (at startup).
_, found := qrt.routes[fqName]
if found {
return fmt.Errorf(
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
"This usually means that there are conflicting modules registering the same gRPC query service",
fqName,
)
}

qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, ctx, func(i interface{}) error {
return qrt.cdc.Unmarshal(req.Data, i)
}, nil)
if err != nil {
return nil, err
}

// proto marshal the result bytes
var resBytes []byte
resBytes, err = qrt.cdc.Marshal(res)
if err != nil {
return nil, err
}

// return the result bytes as the response value
return &abci.ResponseQuery{
Height: req.Height,
Value: resBytes,
}, nil
}
return nil
}

func (qrt *GRPCQueryRouter) HandlersByRequestName(name string) []func(ctx context.Context, req protoiface.MessageV1) (resp protoiface.MessageV1, err error) {
return qrt.handlerByMessageName[name]
}

func (qrt *GRPCQueryRouter) registerHandlerByMessageName(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
// extract message name from method descriptor
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
desc, err := proto.HybridResolver.FindDescriptorByName(methodFullName)
if err != nil {
return fmt.Errorf("cannot find method descriptor %s", methodFullName)
}
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return fmt.Errorf("invalid method descriptor %s", methodFullName)
}
inputName := methodDesc.Input().FullName()
methodHandler, err := protocompat.MakeHybridHandler(qrt.sdkCodec, sd, method, handler)
if err != nil {
return err
}
qrt.handlerByMessageName[string(inputName)] = append(qrt.handlerByMessageName[string(inputName)], methodHandler)
return nil
}

// SetInterfaceRegistry sets the interface registry for the router. This will
// also register the interface reflection gRPC service.
func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.InterfaceRegistry) {
qrt.sdkCodec = codec.NewProtoCodec(interfaceRegistry)
// instantiate the codec
qrt.cdc = codec.NewProtoCodec(interfaceRegistry).GRPCCodec()
// Once we have an interface registry, we can register the interface
Expand Down
48 changes: 48 additions & 0 deletions baseapp/grpcrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,54 @@ func TestGRPCQueryRouter(t *testing.T) {
require.Equal(t, spot, res3.HasAnimal.Animal.GetCachedValue())
}

func TestGRPCRouterHybridHandlers(t *testing.T) {
assertRouterBehaviour := func(helper *baseapp.QueryServiceTestHelper) {
// test getting the handler by name
handlers := helper.GRPCQueryRouter.HandlersByRequestName("testpb.EchoRequest")
require.NotNil(t, handlers)
require.Len(t, handlers, 1)
handler := handlers[0]
// sending a protov2 message should work, and return a protov2 message
resp, err := handler(helper.Ctx, &testdata_pulsar.EchoRequest{Message: "hello"})
require.Nil(t, err)
require.NotNil(t, resp)
protov2Resp, ok := resp.(*testdata_pulsar.EchoResponse)
require.True(t, ok)
require.Equal(t, "hello", protov2Resp.Message)
// also sending a protov1 message should work, and return a gogoproto message
resp, err = handler(helper.Ctx, &testdata.EchoRequest{Message: "hello"})
require.Nil(t, err)
require.NotNil(t, resp)
gogoprotoResp, ok := resp.(*testdata.EchoResponse)
require.True(t, ok)
require.Equal(t, "hello", gogoprotoResp.Message)
}

t.Run("protov2 server", func(t *testing.T) {
qr := baseapp.NewGRPCQueryRouter()
interfaceRegistry := testdata.NewTestInterfaceRegistry()
qr.SetInterfaceRegistry(interfaceRegistry)
testdata_pulsar.RegisterQueryServer(qr, testdata_pulsar.QueryImpl{})
helper := &baseapp.QueryServiceTestHelper{
GRPCQueryRouter: qr,
Ctx: sdk.Context{}.WithContext(context.Background()),
}
assertRouterBehaviour(helper)
})

t.Run("gogoproto server", func(t *testing.T) {
qr := baseapp.NewGRPCQueryRouter()
interfaceRegistry := testdata.NewTestInterfaceRegistry()
qr.SetInterfaceRegistry(interfaceRegistry)
testdata.RegisterQueryServer(qr, testdata.QueryImpl{})
helper := &baseapp.QueryServiceTestHelper{
GRPCQueryRouter: qr,
Ctx: sdk.Context{}.WithContext(context.Background()),
}
assertRouterBehaviour(helper)
})
}

func TestRegisterQueryServiceTwice(t *testing.T) {
// Setup baseapp.
var appBuilder *runtime.AppBuilder
Expand Down
Loading
Loading