Skip to content

Commit

Permalink
feat(grpcrouter): add hybrid query handler (#17921)
Browse files Browse the repository at this point in the history
Co-authored-by: unknown unknown <unknown@unknown>
  • Loading branch information
testinginprod and unknown unknown authored Oct 11, 2023
1 parent 44934e3 commit ecf14b7
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 42 deletions.
133 changes: 91 additions & 42 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package baseapp

import (
"context"
"fmt"

abci "github.com/cometbft/cometbft/abci/types"
gogogrpc "github.com/cosmos/gogoproto/grpc"
"github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/runtime/protoiface"

"github.com/cosmos/cosmos-sdk/baseapp/internal/protocompat"
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
Expand All @@ -16,8 +21,16 @@ import (

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
cdc encoding.Codec
// routes maps query handlers used in ABCIQuery.
routes map[string]GRPCQueryHandler
// handlerByMessageName maps the request name to the handler. It is a hybrid handler which seamlessly
// handles both gogo and protov2 messages.
handlerByMessageName map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error
// binaryCodec is used to encode/decode binary protobuf messages.
binaryCodec codec.BinaryCodec
// cdc is the gRPC codec used by the router to correctly unmarshal messages.
cdc encoding.Codec
// serviceData contains the gRPC services and their handlers.
serviceData []serviceData
}

Expand All @@ -32,7 +45,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
routes: map[string]GRPCQueryHandler{},
handlerByMessageName: map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error{},
}
}

Expand All @@ -58,46 +72,13 @@ func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler {
func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) {
// adds a top-level query handler based on the gRPC service name
for _, method := range sd.Methods {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
methodHandler := method.Handler

// Check that each service is only registered once. If a service is
// registered more than once, then we should error. Since we can't
// return an error (`Server.RegisterService` interface restriction) we
// panic (at startup).
_, found := qrt.routes[fqName]
if found {
panic(
fmt.Errorf(
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
"This usually means that there are conflicting modules registering the same gRPC query service",
fqName,
),
)
err := qrt.registerABCIQueryHandler(sd, method, handler)
if err != nil {
panic(err)
}

qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, ctx, func(i interface{}) error {
return qrt.cdc.Unmarshal(req.Data, i)
}, nil)
if err != nil {
return nil, err
}

// proto marshal the result bytes
var resBytes []byte
resBytes, err = qrt.cdc.Marshal(res)
if err != nil {
return nil, err
}

// return the result bytes as the response value
return &abci.ResponseQuery{
Height: req.Height,
Value: resBytes,
}, nil
err = qrt.registerHandlerByMessageName(sd, method, handler)
if err != nil {
panic(err)
}
}

Expand All @@ -107,9 +88,77 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
})
}

func (qrt *GRPCQueryRouter) registerABCIQueryHandler(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
methodHandler := method.Handler

// Check that each service is only registered once. If a service is
// registered more than once, then we should error. Since we can't
// return an error (`Server.RegisterService` interface restriction) we
// panic (at startup).
_, found := qrt.routes[fqName]
if found {
return fmt.Errorf(
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
"This usually means that there are conflicting modules registering the same gRPC query service",
fqName,
)
}

qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, ctx, func(i interface{}) error {
return qrt.cdc.Unmarshal(req.Data, i)
}, nil)
if err != nil {
return nil, err
}

// proto marshal the result bytes
var resBytes []byte
resBytes, err = qrt.cdc.Marshal(res)
if err != nil {
return nil, err
}

// return the result bytes as the response value
return &abci.ResponseQuery{
Height: req.Height,
Value: resBytes,
}, nil
}
return nil
}

func (qrt *GRPCQueryRouter) HandlersByRequestName(name string) []func(ctx context.Context, req, resp protoiface.MessageV1) error {
return qrt.handlerByMessageName[name]
}

func (qrt *GRPCQueryRouter) registerHandlerByMessageName(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
// extract message name from method descriptor
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
desc, err := proto.HybridResolver.FindDescriptorByName(methodFullName)
if err != nil {
return fmt.Errorf("cannot find method descriptor %s", methodFullName)
}
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return fmt.Errorf("invalid method descriptor %s", methodFullName)
}
inputName := methodDesc.Input().FullName()
methodHandler, err := protocompat.MakeHybridHandler(qrt.binaryCodec, sd, method, handler)
if err != nil {
return err
}
qrt.handlerByMessageName[string(inputName)] = append(qrt.handlerByMessageName[string(inputName)], methodHandler)
return nil
}

// SetInterfaceRegistry sets the interface registry for the router. This will
// also register the interface reflection gRPC service.
func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.InterfaceRegistry) {
qrt.binaryCodec = codec.NewProtoCodec(interfaceRegistry)
// instantiate the codec
qrt.cdc = codec.NewProtoCodec(interfaceRegistry).GRPCCodec()
// Once we have an interface registry, we can register the interface
Expand Down
44 changes: 44 additions & 0 deletions baseapp/grpcrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,50 @@ func TestGRPCQueryRouter(t *testing.T) {
require.Equal(t, spot, res3.HasAnimal.Animal.GetCachedValue())
}

func TestGRPCRouterHybridHandlers(t *testing.T) {
assertRouterBehaviour := func(helper *baseapp.QueryServiceTestHelper) {
// test getting the handler by name
handlers := helper.GRPCQueryRouter.HandlersByRequestName("testpb.EchoRequest")
require.NotNil(t, handlers)
require.Len(t, handlers, 1)
handler := handlers[0]
// sending a protov2 message should work, and return a protov2 message
v2Resp := new(testdata_pulsar.EchoResponse)
err := handler(helper.Ctx, &testdata_pulsar.EchoRequest{Message: "hello"}, v2Resp)
require.Nil(t, err)
require.Equal(t, "hello", v2Resp.Message)
// also sending a protov1 message should work, and return a gogoproto message
gogoResp := new(testdata.EchoResponse)
err = handler(helper.Ctx, &testdata.EchoRequest{Message: "hello"}, gogoResp)
require.NoError(t, err)
require.Equal(t, "hello", gogoResp.Message)
}

t.Run("protov2 server", func(t *testing.T) {
qr := baseapp.NewGRPCQueryRouter()
interfaceRegistry := testdata.NewTestInterfaceRegistry()
qr.SetInterfaceRegistry(interfaceRegistry)
testdata_pulsar.RegisterQueryServer(qr, testdata_pulsar.QueryImpl{})
helper := &baseapp.QueryServiceTestHelper{
GRPCQueryRouter: qr,
Ctx: sdk.Context{}.WithContext(context.Background()),
}
assertRouterBehaviour(helper)
})

t.Run("gogoproto server", func(t *testing.T) {
qr := baseapp.NewGRPCQueryRouter()
interfaceRegistry := testdata.NewTestInterfaceRegistry()
qr.SetInterfaceRegistry(interfaceRegistry)
testdata.RegisterQueryServer(qr, testdata.QueryImpl{})
helper := &baseapp.QueryServiceTestHelper{
GRPCQueryRouter: qr,
Ctx: sdk.Context{}.WithContext(context.Background()),
}
assertRouterBehaviour(helper)
})
}

func TestRegisterQueryServiceTwice(t *testing.T) {
// Setup baseapp.
var appBuilder *runtime.AppBuilder
Expand Down
Loading

0 comments on commit ecf14b7

Please sign in to comment.