Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GRPC client for the client package. #378

Merged
merged 3 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions integration-tests/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integrationtests

import (
"reflect"
"strings"

cosmosclient "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
Expand All @@ -10,6 +11,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/google/uuid"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/CoreumFoundation/coreum/app"
"github.com/CoreumFoundation/coreum/pkg/client"
Expand Down Expand Up @@ -151,7 +153,7 @@ func (c ChainContext) ComputeNeededBalanceFromOptions(options BalancesOptions) s

// ChainConfig defines the config arguments required for the test chain initialisation.
type ChainConfig struct {
RPCAddress string
GRPCAddress string
NetworkConfig config.NetworkConfig
FundingMnemonic string
StakerMnemonics []string
Expand All @@ -166,16 +168,26 @@ type Chain struct {

// NewChain creates an instance of the new Chain.
func NewChain(cfg ChainConfig) Chain {
rpcClient, err := cosmosclient.NewClientFromNode(cfg.RPCAddress)
if err != nil {
panic(err)
}
clientCtx := client.NewContext(client.DefaultContextConfig(), app.ModuleBasics).
WithChainID(string(cfg.NetworkConfig.ChainID)).
WithClient(rpcClient).
WithKeyring(newConcurrentSafeKeyring(keyring.NewInMemory())).
WithBroadcastMode(flags.BroadcastBlock)

// TODO(dhil) remove switch once crust is updated
if strings.HasPrefix(cfg.GRPCAddress, "tcp") {
rpcClient, err := cosmosclient.NewClientFromNode(cfg.GRPCAddress)
if err != nil {
panic(err)
}
clientCtx = clientCtx.WithRPCClient(rpcClient)
} else {
grpcClient, err := grpc.Dial(cfg.GRPCAddress, grpc.WithInsecure())
if err != nil {
panic(err)
}
clientCtx = clientCtx.WithGRPCClient(grpcClient)
}

chainCtx := NewChainContext(clientCtx, cfg.NetworkConfig)
governance := NewGovernance(chainCtx, cfg.StakerMnemonics)

Expand Down
8 changes: 5 additions & 3 deletions integration-tests/gov.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
sdk "github.com/cosmos/cosmos-sdk/types"
govtypes "github.com/cosmos/cosmos-sdk/x/gov/types"
paramproposal "github.com/cosmos/cosmos-sdk/x/params/types/proposal"
Expand Down Expand Up @@ -227,12 +228,13 @@ func (g Governance) WaitForVotingToFinalize(ctx context.Context, proposalID uint
return proposal.Status, err
}

block, err := g.chainCtx.ClientContext.Client().Block(ctx, nil)
tmQueryClient := tmservice.NewServiceClient(g.chainCtx.ClientContext)
blockRes, err := tmQueryClient.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{})
if err != nil {
return proposal.Status, errors.WithStack(err)
}
if block.Block.Time.Before(proposal.VotingEndTime) {
waitCtx, waitCancel := context.WithTimeout(ctx, proposal.VotingEndTime.Sub(block.Block.Time))
if blockRes.Block.Header.Time.Before(proposal.VotingEndTime) {
waitCtx, waitCancel := context.WithTimeout(ctx, proposal.VotingEndTime.Sub(blockRes.Block.Header.Time))
defer waitCancel()

<-waitCtx.Done()
Expand Down
8 changes: 4 additions & 4 deletions integration-tests/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (m *stringsFlag) Set(val string) error {
}

type testingConfig struct {
RPCAddress string
GRPCAddress string
NetworkConfig config.NetworkConfig
FundingMnemonic string
StakerMnemonics []string
Expand All @@ -45,7 +45,7 @@ func init() {
stakerMnemonics stringsFlag
)

flag.StringVar(&coredAddress, "cored-address", "tcp://localhost:26657", "Address of cored node started by znet")
flag.StringVar(&coredAddress, "cored-address", "localhost:9090", "Address of cored node started by znet")
flag.StringVar(&fundingMnemonic, "funding-mnemonic", "pitch basic bundle cause toe sound warm love town crucial divorce shell olympic convince scene middle garment glimpse narrow during fix fruit suffer honey", "Funding account mnemonic required by tests")
flag.Var(&stakerMnemonics, "staker-mnemonic", "Staker account mnemonics required by tests, supports multiple")
flag.StringVar(&logFormat, "log-format", string(logger.ToolDefaultConfig.Format), "Format of logs produced by tests")
Expand All @@ -69,7 +69,7 @@ func init() {
panic(fmt.Sprintf("can't create network config for the integration tests: %s", err))
}
cfg = testingConfig{
RPCAddress: coredAddress,
GRPCAddress: coredAddress,
NetworkConfig: networkConfig,
FundingMnemonic: fundingMnemonic,
StakerMnemonics: stakerMnemonics,
Expand All @@ -81,7 +81,7 @@ func init() {
config.NewNetwork(cfg.NetworkConfig).SetSDKConfig()

chain = NewChain(ChainConfig{
RPCAddress: cfg.RPCAddress,
GRPCAddress: cfg.GRPCAddress,
NetworkConfig: cfg.NetworkConfig,
FundingMnemonic: cfg.FundingMnemonic,
StakerMnemonics: cfg.StakerMnemonics,
Expand Down
49 changes: 26 additions & 23 deletions integration-tests/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package upgrade

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
govtypes "github.com/cosmos/cosmos-sdk/x/gov/types"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/rpc/client"
"go.uber.org/zap"

"github.com/CoreumFoundation/coreum-tools/pkg/logger"
Expand All @@ -35,10 +35,15 @@ func TestUpgrade(t *testing.T) {
requireT.NoError(err)
requireT.Nil(currentPlan.Plan)

infoBefore, err := info(ctx, chain.ClientContext.Client())
tmQueryClient := tmservice.NewServiceClient(chain.ClientContext)
infoBeforeRes, err := tmQueryClient.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{})
requireT.NoError(err)
require.False(t, strings.HasSuffix(infoBefore.Version, "-upgrade"))
upgradeHeight := infoBefore.LastBlockHeight + 30
require.False(t, strings.HasSuffix(infoBeforeRes.ApplicationVersion.Version, "-upgrade"))

latestBlockRes, err := tmQueryClient.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{})
requireT.NoError(err)

upgradeHeight := latestBlockRes.Block.Header.Height + 30

// Create new proposer.
proposer := chain.GenAccount()
Expand Down Expand Up @@ -85,23 +90,24 @@ func TestUpgrade(t *testing.T) {
assert.Equal(t, "upgrade", currentPlan.Plan.Name)
assert.Equal(t, upgradeHeight, currentPlan.Plan.Height)

infoWaiting, err := info(ctx, chain.ClientContext.Client())
infoWaitingBlockRes, err := tmQueryClient.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{})
requireT.NoError(err)
log.Info("Waiting for upgrade", zap.Int64("upgradeHeight", upgradeHeight), zap.Int64("currentHeight", infoWaiting.LastBlockHeight))

retryCtx, cancel := context.WithTimeout(ctx, 3*time.Second*time.Duration(upgradeHeight-infoWaiting.LastBlockHeight))
retryCtx, cancel := context.WithTimeout(ctx, 3*time.Second*time.Duration(upgradeHeight-infoWaitingBlockRes.Block.Header.Height))
defer cancel()
var infoAfter abci.ResponseInfo
log.Info("Waiting for upgrade", zap.Int64("upgradeHeight", upgradeHeight), zap.Int64("currentHeight", infoWaitingBlockRes.Block.Header.Height))
err = retry.Do(retryCtx, time.Second, func() error {
requestCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
var err error
infoAfter, err = info(ctx, chain.ClientContext.Client())
infoAfterBlockRes, err := tmQueryClient.GetLatestBlock(requestCtx, &tmservice.GetLatestBlockRequest{})
if err != nil {
return retry.Retryable(err)
}
if infoAfter.LastBlockHeight >= upgradeHeight {
if infoAfterBlockRes.Block.Header.Height >= upgradeHeight {
return nil
}
return retry.Retryable(errors.Errorf("waiting for upgraded block %d, current block: %d", upgradeHeight, infoAfter.LastBlockHeight))
return retry.Retryable(errors.Errorf("waiting for upgraded block %d, current block: %d", upgradeHeight, infoAfterBlockRes.Block.Header.Height))
})
requireT.NoError(err)

Expand All @@ -112,16 +118,13 @@ func TestUpgrade(t *testing.T) {
requireT.NoError(err)
assert.Equal(t, upgradeHeight, appliedPlan.Height)

// Verify that node was restarted by cosmovisor and new version is running.
assert.Equal(t, infoBefore.Version+"-upgrade", infoAfter.Version)
}
log.Info(fmt.Sprintf("Upgrade passed, applied plan height: %d", appliedPlan.Height))

func info(ctx context.Context, client client.Client) (abci.ResponseInfo, error) {
requestCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
i, err := client.ABCIInfo(requestCtx)
if err != nil {
return abci.ResponseInfo{}, errors.WithStack(err)
}
return i.Response, nil
infoAfterRes, err := tmQueryClient.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{})
requireT.NoError(err)

log.Info(fmt.Sprintf("New binary version: %s", infoAfterRes.ApplicationVersion.Version))

// Verify that node was restarted by cosmovisor and new version is running.
assert.Equal(t, infoBeforeRes.ApplicationVersion.Version+"-upgrade", infoAfterRes.ApplicationVersion.Version)
}
47 changes: 40 additions & 7 deletions pkg/client/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/module"
protobufgrpc "github.com/gogo/protobuf/grpc"
gogoproto "github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -91,8 +92,9 @@ func NewContext(contextConfig ContextConfig, modules module.BasicManager) Contex
// Context exposes the functionality of SDK context in a way where we may intercept GRPC-related method (Invoke)
// to provide better implementation.
type Context struct {
config ContextConfig
clientCtx client.Context
config ContextConfig
clientCtx client.Context
grpcClient protobufgrpc.ClientConn
}

// ChainID returns chain ID.
Expand All @@ -116,13 +118,19 @@ func (c Context) GasPriceAdjustment() sdk.Dec {
return c.config.GasConfig.GasPriceAdjustment
}

// WithClient returns a copy of the context with an updated RPC client
// WithRPCClient returns a copy of the context with an updated RPC client
// instance.
func (c Context) WithClient(client rpcclient.Client) Context {
func (c Context) WithRPCClient(client rpcclient.Client) Context {
c.clientCtx = c.clientCtx.WithClient(client)
return c
}

// WithGRPCClient returns a copy of the context with an updated GRPCClient client.
func (c Context) WithGRPCClient(grpcClient protobufgrpc.ClientConn) Context {
c.grpcClient = grpcClient
return c
}

// WithBroadcastMode returns a copy of the context with an updated broadcast
// mode.
func (c Context) WithBroadcastMode(mode string) Context {
Expand Down Expand Up @@ -156,7 +164,15 @@ func (c Context) WithFeeGranterAddress(addr sdk.AccAddress) Context {

// NewStream implements the grpc ClientConn.NewStream method.
func (c Context) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, errors.New("streaming rpc not supported")
if c.RPCClient() != nil {
return nil, errors.New("streaming rpc not supported")
}

if c.GRPCClient() != nil {
return c.grpcClient.NewStream(ctx, desc, method, opts...)
}

return nil, errors.New("neither RPC nor GRPC client is set")
}

// FeeGranterAddress returns the fee granter address from the context.
Expand All @@ -179,11 +195,16 @@ func (c Context) BroadcastMode() string {
return c.clientCtx.BroadcastMode
}

// Client returns RPC client.
func (c Context) Client() rpcclient.Client {
// RPCClient returns RPC client.
func (c Context) RPCClient() rpcclient.Client {
return c.clientCtx.Client
}

// GRPCClient returns GRPCClient client.
func (c Context) GRPCClient() protobufgrpc.ClientConn {
return c.grpcClient
}

// InterfaceRegistry returns interface registry of SDK context.
func (c Context) InterfaceRegistry() codectypes.InterfaceRegistry {
return c.clientCtx.InterfaceRegistry
Expand Down Expand Up @@ -343,6 +364,18 @@ func (c Context) PrintProto(toPrint gogoproto.Message) error {

// Invoke invokes GRPC method.
func (c Context) Invoke(ctx context.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
if c.RPCClient() != nil {
return c.invokeRPC(ctx, method, req, reply, opts)
}

if c.GRPCClient() != nil {
return c.GRPCClient().Invoke(ctx, method, req, reply, opts...)
}

return errors.New("neither RPC nor GRPC client is set")
}

func (c Context) invokeRPC(ctx context.Context, method string, req, reply interface{}, opts []grpc.CallOption) error {
if reflect.ValueOf(req).IsNil() {
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
}
Expand Down
Loading