From 7059b5ce2a1ca24a195d32c1a2c90b7e8006ecfb Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Mon, 6 Feb 2023 12:28:41 +0300 Subject: [PATCH] GRPC client for the client package. --- integration-tests/chain.go | 24 ++- integration-tests/gov.go | 8 +- integration-tests/init.go | 8 +- integration-tests/upgrade/upgrade_test.go | 49 +++--- pkg/client/context.go | 47 ++++- pkg/client/tx.go | 203 +++++++++++++--------- x/asset/nft/keeper/keeper.go | 4 +- 7 files changed, 218 insertions(+), 125 deletions(-) diff --git a/integration-tests/chain.go b/integration-tests/chain.go index 20196f7da..bbdd17982 100644 --- a/integration-tests/chain.go +++ b/integration-tests/chain.go @@ -2,6 +2,7 @@ package integrationtests import ( "reflect" + "strings" cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" @@ -10,6 +11,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/google/uuid" "github.com/pkg/errors" + "google.golang.org/grpc" "github.com/CoreumFoundation/coreum/app" "github.com/CoreumFoundation/coreum/pkg/client" @@ -151,7 +153,7 @@ func (c ChainContext) ComputeNeededBalanceFromOptions(options BalancesOptions) s // ChainConfig defines the config arguments required for the test chain initialisation. type ChainConfig struct { - RPCAddress string + GRPCAddress string NetworkConfig config.NetworkConfig FundingMnemonic string StakerMnemonics []string @@ -166,16 +168,26 @@ type Chain struct { // NewChain creates an instance of the new Chain. func NewChain(cfg ChainConfig) Chain { - rpcClient, err := cosmosclient.NewClientFromNode(cfg.RPCAddress) - if err != nil { - panic(err) - } clientCtx := client.NewContext(client.DefaultContextConfig(), app.ModuleBasics). WithChainID(string(cfg.NetworkConfig.ChainID)). - WithClient(rpcClient). WithKeyring(newConcurrentSafeKeyring(keyring.NewInMemory())). WithBroadcastMode(flags.BroadcastBlock) + // TODO(dhil) remove switch once crust is updated + if strings.HasPrefix(cfg.GRPCAddress, "tcp") { + rpcClient, err := cosmosclient.NewClientFromNode(cfg.GRPCAddress) + if err != nil { + panic(err) + } + clientCtx = clientCtx.WithRPCClient(rpcClient) + } else { + grpcClient, err := grpc.Dial(cfg.GRPCAddress, grpc.WithInsecure()) + if err != nil { + panic(err) + } + clientCtx = clientCtx.WithGRPCClient(grpcClient) + } + chainCtx := NewChainContext(clientCtx, cfg.NetworkConfig) governance := NewGovernance(chainCtx, cfg.StakerMnemonics) diff --git a/integration-tests/gov.go b/integration-tests/gov.go index 6cb933040..ad01f52a9 100644 --- a/integration-tests/gov.go +++ b/integration-tests/gov.go @@ -5,6 +5,7 @@ import ( "time" "github.com/cosmos/cosmos-sdk/client/flags" + "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" sdk "github.com/cosmos/cosmos-sdk/types" govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" paramproposal "github.com/cosmos/cosmos-sdk/x/params/types/proposal" @@ -227,12 +228,13 @@ func (g Governance) WaitForVotingToFinalize(ctx context.Context, proposalID uint return proposal.Status, err } - block, err := g.chainCtx.ClientContext.Client().Block(ctx, nil) + tmQueryClient := tmservice.NewServiceClient(g.chainCtx.ClientContext) + blockRes, err := tmQueryClient.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{}) if err != nil { return proposal.Status, errors.WithStack(err) } - if block.Block.Time.Before(proposal.VotingEndTime) { - waitCtx, waitCancel := context.WithTimeout(ctx, proposal.VotingEndTime.Sub(block.Block.Time)) + if blockRes.Block.Header.Time.Before(proposal.VotingEndTime) { + waitCtx, waitCancel := context.WithTimeout(ctx, proposal.VotingEndTime.Sub(blockRes.Block.Header.Time)) defer waitCancel() <-waitCtx.Done() diff --git a/integration-tests/init.go b/integration-tests/init.go index dba01fe5c..879bcb211 100644 --- a/integration-tests/init.go +++ b/integration-tests/init.go @@ -26,7 +26,7 @@ func (m *stringsFlag) Set(val string) error { } type testingConfig struct { - RPCAddress string + GRPCAddress string NetworkConfig config.NetworkConfig FundingMnemonic string StakerMnemonics []string @@ -45,7 +45,7 @@ func init() { stakerMnemonics stringsFlag ) - flag.StringVar(&coredAddress, "cored-address", "tcp://localhost:26657", "Address of cored node started by znet") + flag.StringVar(&coredAddress, "cored-address", "localhost:9090", "Address of cored node started by znet") flag.StringVar(&fundingMnemonic, "funding-mnemonic", "pitch basic bundle cause toe sound warm love town crucial divorce shell olympic convince scene middle garment glimpse narrow during fix fruit suffer honey", "Funding account mnemonic required by tests") flag.Var(&stakerMnemonics, "staker-mnemonic", "Staker account mnemonics required by tests, supports multiple") flag.StringVar(&logFormat, "log-format", string(logger.ToolDefaultConfig.Format), "Format of logs produced by tests") @@ -69,7 +69,7 @@ func init() { panic(fmt.Sprintf("can't create network config for the integration tests: %s", err)) } cfg = testingConfig{ - RPCAddress: coredAddress, + GRPCAddress: coredAddress, NetworkConfig: networkConfig, FundingMnemonic: fundingMnemonic, StakerMnemonics: stakerMnemonics, @@ -81,7 +81,7 @@ func init() { config.NewNetwork(cfg.NetworkConfig).SetSDKConfig() chain = NewChain(ChainConfig{ - RPCAddress: cfg.RPCAddress, + GRPCAddress: cfg.GRPCAddress, NetworkConfig: cfg.NetworkConfig, FundingMnemonic: cfg.FundingMnemonic, StakerMnemonics: cfg.StakerMnemonics, diff --git a/integration-tests/upgrade/upgrade_test.go b/integration-tests/upgrade/upgrade_test.go index a751da134..130ddd0e3 100644 --- a/integration-tests/upgrade/upgrade_test.go +++ b/integration-tests/upgrade/upgrade_test.go @@ -4,17 +4,17 @@ package upgrade import ( "context" + "fmt" "strings" "testing" "time" + "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/rpc/client" "go.uber.org/zap" "github.com/CoreumFoundation/coreum-tools/pkg/logger" @@ -35,10 +35,15 @@ func TestUpgrade(t *testing.T) { requireT.NoError(err) requireT.Nil(currentPlan.Plan) - infoBefore, err := info(ctx, chain.ClientContext.Client()) + tmQueryClient := tmservice.NewServiceClient(chain.ClientContext) + infoBeforeRes, err := tmQueryClient.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{}) requireT.NoError(err) - require.False(t, strings.HasSuffix(infoBefore.Version, "-upgrade")) - upgradeHeight := infoBefore.LastBlockHeight + 30 + require.False(t, strings.HasSuffix(infoBeforeRes.ApplicationVersion.Version, "-upgrade")) + + latestBlockRes, err := tmQueryClient.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{}) + requireT.NoError(err) + + upgradeHeight := latestBlockRes.Block.Header.Height + 30 // Create new proposer. proposer := chain.GenAccount() @@ -85,23 +90,24 @@ func TestUpgrade(t *testing.T) { assert.Equal(t, "upgrade", currentPlan.Plan.Name) assert.Equal(t, upgradeHeight, currentPlan.Plan.Height) - infoWaiting, err := info(ctx, chain.ClientContext.Client()) + infoWaitingBlockRes, err := tmQueryClient.GetLatestBlock(ctx, &tmservice.GetLatestBlockRequest{}) requireT.NoError(err) - log.Info("Waiting for upgrade", zap.Int64("upgradeHeight", upgradeHeight), zap.Int64("currentHeight", infoWaiting.LastBlockHeight)) - retryCtx, cancel := context.WithTimeout(ctx, 3*time.Second*time.Duration(upgradeHeight-infoWaiting.LastBlockHeight)) + retryCtx, cancel := context.WithTimeout(ctx, 3*time.Second*time.Duration(upgradeHeight-infoWaitingBlockRes.Block.Header.Height)) defer cancel() - var infoAfter abci.ResponseInfo + log.Info("Waiting for upgrade", zap.Int64("upgradeHeight", upgradeHeight), zap.Int64("currentHeight", infoWaitingBlockRes.Block.Header.Height)) err = retry.Do(retryCtx, time.Second, func() error { + requestCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() var err error - infoAfter, err = info(ctx, chain.ClientContext.Client()) + infoAfterBlockRes, err := tmQueryClient.GetLatestBlock(requestCtx, &tmservice.GetLatestBlockRequest{}) if err != nil { return retry.Retryable(err) } - if infoAfter.LastBlockHeight >= upgradeHeight { + if infoAfterBlockRes.Block.Header.Height >= upgradeHeight { return nil } - return retry.Retryable(errors.Errorf("waiting for upgraded block %d, current block: %d", upgradeHeight, infoAfter.LastBlockHeight)) + return retry.Retryable(errors.Errorf("waiting for upgraded block %d, current block: %d", upgradeHeight, infoAfterBlockRes.Block.Header.Height)) }) requireT.NoError(err) @@ -112,16 +118,13 @@ func TestUpgrade(t *testing.T) { requireT.NoError(err) assert.Equal(t, upgradeHeight, appliedPlan.Height) - // Verify that node was restarted by cosmovisor and new version is running. - assert.Equal(t, infoBefore.Version+"-upgrade", infoAfter.Version) -} + log.Info(fmt.Sprintf("Upgrade passed, applied plan height: %d", appliedPlan.Height)) -func info(ctx context.Context, client client.Client) (abci.ResponseInfo, error) { - requestCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - i, err := client.ABCIInfo(requestCtx) - if err != nil { - return abci.ResponseInfo{}, errors.WithStack(err) - } - return i.Response, nil + infoAfterRes, err := tmQueryClient.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{}) + requireT.NoError(err) + + log.Info(fmt.Sprintf("New binary version: %s", infoAfterRes.ApplicationVersion.Version)) + + // Verify that node was restarted by cosmovisor and new version is running. + assert.Equal(t, infoBeforeRes.ApplicationVersion.Version+"-upgrade", infoAfterRes.ApplicationVersion.Version) } diff --git a/pkg/client/context.go b/pkg/client/context.go index 86788e249..ee9619b08 100644 --- a/pkg/client/context.go +++ b/pkg/client/context.go @@ -21,6 +21,7 @@ import ( sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" "github.com/cosmos/cosmos-sdk/types/module" + protobufgrpc "github.com/gogo/protobuf/grpc" gogoproto "github.com/gogo/protobuf/proto" "github.com/pkg/errors" abci "github.com/tendermint/tendermint/abci/types" @@ -91,8 +92,9 @@ func NewContext(contextConfig ContextConfig, modules module.BasicManager) Contex // Context exposes the functionality of SDK context in a way where we may intercept GRPC-related method (Invoke) // to provide better implementation. type Context struct { - config ContextConfig - clientCtx client.Context + config ContextConfig + clientCtx client.Context + grpcClient protobufgrpc.ClientConn } // ChainID returns chain ID. @@ -116,13 +118,19 @@ func (c Context) GasPriceAdjustment() sdk.Dec { return c.config.GasConfig.GasPriceAdjustment } -// WithClient returns a copy of the context with an updated RPC client +// WithRPCClient returns a copy of the context with an updated RPC client // instance. -func (c Context) WithClient(client rpcclient.Client) Context { +func (c Context) WithRPCClient(client rpcclient.Client) Context { c.clientCtx = c.clientCtx.WithClient(client) return c } +// WithGRPCClient returns a copy of the context with an updated GRPCClient client. +func (c Context) WithGRPCClient(grpcClient protobufgrpc.ClientConn) Context { + c.grpcClient = grpcClient + return c +} + // WithBroadcastMode returns a copy of the context with an updated broadcast // mode. func (c Context) WithBroadcastMode(mode string) Context { @@ -156,7 +164,15 @@ func (c Context) WithFeeGranterAddress(addr sdk.AccAddress) Context { // NewStream implements the grpc ClientConn.NewStream method. func (c Context) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { - return nil, errors.New("streaming rpc not supported") + if c.RPCClient() != nil { + return nil, errors.New("streaming rpc not supported") + } + + if c.GRPCClient() != nil { + return c.grpcClient.NewStream(ctx, desc, method, opts...) + } + + return nil, errors.New("neither RPC nor GRPC client is set") } // FeeGranterAddress returns the fee granter address from the context. @@ -179,11 +195,16 @@ func (c Context) BroadcastMode() string { return c.clientCtx.BroadcastMode } -// Client returns RPC client. -func (c Context) Client() rpcclient.Client { +// RPCClient returns RPC client. +func (c Context) RPCClient() rpcclient.Client { return c.clientCtx.Client } +// GRPCClient returns GRPCClient client. +func (c Context) GRPCClient() protobufgrpc.ClientConn { + return c.grpcClient +} + // InterfaceRegistry returns interface registry of SDK context. func (c Context) InterfaceRegistry() codectypes.InterfaceRegistry { return c.clientCtx.InterfaceRegistry @@ -343,6 +364,18 @@ func (c Context) PrintProto(toPrint gogoproto.Message) error { // Invoke invokes GRPC method. func (c Context) Invoke(ctx context.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) { + if c.RPCClient() != nil { + return c.invokeRPC(ctx, method, req, reply, opts) + } + + if c.GRPCClient() != nil { + return c.GRPCClient().Invoke(ctx, method, req, reply, opts...) + } + + return errors.New("neither RPC nor GRPC client is set") +} + +func (c Context) invokeRPC(ctx context.Context, method string, req, reply interface{}, opts []grpc.CallOption) error { if reflect.ValueOf(req).IsNil() { return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil") } diff --git a/pkg/client/tx.go b/pkg/client/tx.go index dd7b83a22..f1637b3b9 100644 --- a/pkg/client/tx.go +++ b/pkg/client/tx.go @@ -4,15 +4,15 @@ package client // Blocking broadcast mode was reimplemented to use polling instead of subscription to eliminate the case when // transaction execution is missed due to broken websocket connection. // For other broadcast modes we just call original Cosmos implementation. -// For more details check BroadcastRawTx & broadcastTxCommit. +// For more details check BroadcastRawTx & broadcastTxBlock. import ( "context" - "encoding/hex" "fmt" "strings" "github.com/cosmos/cosmos-sdk/client/flags" + "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" "github.com/cosmos/cosmos-sdk/client/tx" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" @@ -20,7 +20,6 @@ import ( authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" "github.com/pkg/errors" "github.com/tendermint/tendermint/mempool" - coretypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" "github.com/CoreumFoundation/coreum-tools/pkg/retry" @@ -123,78 +122,21 @@ func CalculateGas(ctx context.Context, clientCtx Context, txf Factory, msgs ...s // BroadcastRawTx broadcast the txBytes using the clientCtx and set BroadcastMode. func BroadcastRawTx(ctx context.Context, clientCtx Context, txBytes []byte) (*sdk.TxResponse, error) { - // broadcast to a Tendermint node switch clientCtx.BroadcastMode() { case flags.BroadcastSync: - res, err := clientCtx.Client().BroadcastTxSync(ctx, txBytes) - if err != nil { - return nil, err - } - - if res.Code != 0 { - return nil, errors.Wrapf(sdkerrors.ABCIError(res.Codespace, res.Code, res.Log), - "transaction '%s' failed", res.Hash.String()) - } - - return sdk.NewResponseFormatBroadcastTx(res), nil + return broadcastTxSync(ctx, clientCtx, txBytes) case flags.BroadcastAsync: - res, err := clientCtx.Client().BroadcastTxAsync(ctx, txBytes) - if err != nil { - return nil, err - } - return sdk.NewResponseFormatBroadcastTx(res), nil + return broadcastTxAsync(ctx, clientCtx, txBytes) case flags.BroadcastBlock: - return broadcastTxCommit(ctx, clientCtx, txBytes) + return broadcastTxBlock(ctx, clientCtx, txBytes) default: return nil, errors.Errorf("unsupported broadcast mode %s; supported types: sync, async, block", clientCtx.BroadcastMode()) } } -// broadcastTxCommit broadcasts encoded transaction, waits until it is included in a block. -func broadcastTxCommit(ctx context.Context, clientCtx Context, encodedTx []byte) (*sdk.TxResponse, error) { - requestCtx, cancel := context.WithTimeout(ctx, clientCtx.config.TimeoutConfig.RequestTimeout) - defer cancel() - - txHash := fmt.Sprintf("%X", tmtypes.Tx(encodedTx).Hash()) - res, err := clientCtx.Client().BroadcastTxSync(requestCtx, encodedTx) - if err != nil { - if errors.Is(err, requestCtx.Err()) { - return nil, errors.WithStack(err) - } - - if err := convertTendermintError(err); !sdkerrors.ErrTxInMempoolCache.Is(err) { - return nil, errors.WithStack(err) - } - } else if res.Code != 0 { - return nil, errors.Wrapf(sdkerrors.ABCIError(res.Codespace, res.Code, res.Log), - "transaction '%s' failed", txHash) - } - - awaitRes, err := AwaitTx(ctx, clientCtx, txHash) - if err != nil { - return nil, errors.WithStack(err) - } - - return sdk.NewResponseResultTx(awaitRes, nil, ""), nil -} - -func prepareFactory(ctx context.Context, clientCtx Context, txf tx.Factory) (tx.Factory, error) { - if txf.AccountNumber() == 0 && txf.Sequence() == 0 { - acc, err := GetAccountInfo(ctx, clientCtx, clientCtx.FromAddress()) - if err != nil { - return txf, err - } - txf = txf. - WithAccountNumber(acc.GetAccountNumber()). - WithSequence(acc.GetSequence()) - } - - return txf, nil -} - // GetAccountInfo returns account number and account sequence for provided address. func GetAccountInfo( ctx context.Context, @@ -218,17 +160,13 @@ func GetAccountInfo( return acc, nil } -// AwaitTx awaits until a signed transaction is included in a block, returning the result. +// AwaitTx waits until a signed transaction is included in a block, returning the result. func AwaitTx( ctx context.Context, clientCtx Context, txHash string, -) (resultTx *coretypes.ResultTx, err error) { - txHashBytes, err := hex.DecodeString(txHash) - if err != nil { - return nil, errors.Wrap(err, "tx hash is not a valid hex") - } - +) (txResponse *sdk.TxResponse, err error) { + txSvcClient := sdktx.NewServiceClient(clientCtx) timeoutCtx, cancel := context.WithTimeout(ctx, clientCtx.config.TimeoutConfig.TxTimeout) defer cancel() @@ -236,18 +174,20 @@ func AwaitTx( requestCtx, cancel := context.WithTimeout(ctx, clientCtx.config.TimeoutConfig.RequestTimeout) defer cancel() - var err error - resultTx, err = clientCtx.Client().Tx(requestCtx, txHashBytes, false) + res, err := txSvcClient.GetTx(requestCtx, &sdktx.GetTxRequest{ + Hash: txHash, + }) if err != nil { return retry.Retryable(errors.WithStack(err)) } - if resultTx.TxResult.Code != 0 { - res := resultTx.TxResult - return errors.Wrapf(sdkerrors.ABCIError(res.Codespace, res.Code, res.Log), "transaction '%s' failed", txHash) + txResponse = res.TxResponse + if txResponse.Code != 0 { + return errors.Wrapf(sdkerrors.ABCIError(txResponse.Codespace, txResponse.Code, txResponse.Logs.String()), + "transaction '%s' failed", txResponse.TxHash) } - if resultTx.Height == 0 { + if txResponse.Height == 0 { return retry.Retryable(errors.Errorf("transaction '%s' hasn't been included in a block yet", txHash)) } @@ -256,7 +196,7 @@ func AwaitTx( return nil, err } - return resultTx, nil + return txResponse, nil } // AwaitNextBlocks waits for next blocks. @@ -265,6 +205,7 @@ func AwaitNextBlocks( clientCtx Context, nextBlocks int64, ) error { + tmQueryClient := tmservice.NewServiceClient(clientCtx) timeoutCtx, cancel := context.WithTimeout(ctx, clientCtx.config.TimeoutConfig.TxNextBlocksTimeout) defer cancel() @@ -273,12 +214,12 @@ func AwaitNextBlocks( requestCtx, cancel := context.WithTimeout(ctx, clientCtx.config.TimeoutConfig.RequestTimeout) defer cancel() - res, err := clientCtx.Client().Status(requestCtx) + res, err := tmQueryClient.GetLatestBlock(requestCtx, &tmservice.GetLatestBlockRequest{}) if err != nil { return retry.Retryable(errors.WithStack(err)) } - currentHeight := res.SyncInfo.LatestBlockHeight + currentHeight := res.Block.Header.Height if heightToStart == 0 { heightToStart = currentHeight } @@ -306,6 +247,110 @@ func GetGasPrice( return res.GetMinGasPrice(), nil } +func broadcastTxAsync(ctx context.Context, clientCtx Context, txBytes []byte) (*sdk.TxResponse, error) { + requestCtx, cancel := context.WithTimeout(ctx, clientCtx.config.TimeoutConfig.RequestTimeout) + defer cancel() + + // rpc client + if clientCtx.RPCClient() != nil { + res, err := clientCtx.RPCClient().BroadcastTxAsync(requestCtx, txBytes) + if err != nil { + return nil, err + } + return sdk.NewResponseFormatBroadcastTx(res), nil + } + // grpc client + txSvcClient := sdktx.NewServiceClient(clientCtx) + res, err := txSvcClient.BroadcastTx(requestCtx, &sdktx.BroadcastTxRequest{ + TxBytes: txBytes, + Mode: sdktx.BroadcastMode_BROADCAST_MODE_ASYNC, + }) + if err != nil { + return nil, err + } + + return res.TxResponse, nil +} + +// broadcastTxBlock broadcasts encoded transaction, waits until it is included in a block. +func broadcastTxBlock(ctx context.Context, clientCtx Context, txBytes []byte) (*sdk.TxResponse, error) { + txRes, err := broadcastTxSync(ctx, clientCtx, txBytes) + if err != nil { + return nil, err + } + + awaitRes, err := AwaitTx(ctx, clientCtx, txRes.TxHash) + if err != nil { + return nil, errors.WithStack(err) + } + + return awaitRes, nil +} + +func broadcastTxSync(ctx context.Context, clientCtx Context, txBytes []byte) (*sdk.TxResponse, error) { + requestCtx, cancel := context.WithTimeout(ctx, clientCtx.config.TimeoutConfig.RequestTimeout) + defer cancel() + + // rpc client + txHash := fmt.Sprintf("%X", tmtypes.Tx(txBytes).Hash()) + if clientCtx.RPCClient() != nil { + res, err := clientCtx.RPCClient().BroadcastTxSync(requestCtx, txBytes) + if err != nil { + if err := processBroadcastBlockTxCommitError(requestCtx, err); err != nil { + return nil, err + } + } else if res.Code != 0 { + return nil, errors.Wrapf(sdkerrors.ABCIError(res.Codespace, res.Code, res.Log), + "transaction '%s' failed", txHash) + } + + return sdk.NewResponseFormatBroadcastTx(res), nil + } + + // grpc client + txSvcClient := sdktx.NewServiceClient(clientCtx) + res, err := txSvcClient.BroadcastTx(requestCtx, &sdktx.BroadcastTxRequest{ + TxBytes: txBytes, + Mode: sdktx.BroadcastMode_BROADCAST_MODE_SYNC, + }) + if err != nil { + if err := processBroadcastBlockTxCommitError(requestCtx, err); err != nil { + return nil, err + } + } else if res.TxResponse.Code != 0 { + return nil, errors.Wrapf(sdkerrors.ABCIError(res.TxResponse.Codespace, res.TxResponse.Code, res.TxResponse.Logs.String()), + "transaction '%s' failed", res.TxResponse.TxHash) + } + + return res.TxResponse, nil +} + +func processBroadcastBlockTxCommitError(ctx context.Context, err error) error { + if errors.Is(err, ctx.Err()) { + return errors.WithStack(err) + } + + if err := convertTendermintError(err); !sdkerrors.ErrTxInMempoolCache.Is(err) { + return errors.WithStack(err) + } + + return nil +} + +func prepareFactory(ctx context.Context, clientCtx Context, txf tx.Factory) (tx.Factory, error) { + if txf.AccountNumber() == 0 && txf.Sequence() == 0 { + acc, err := GetAccountInfo(ctx, clientCtx, clientCtx.FromAddress()) + if err != nil { + return txf, err + } + txf = txf. + WithAccountNumber(acc.GetAccountNumber()). + WithSequence(acc.GetSequence()) + } + + return txf, nil +} + // the idea behind this function is to map it similarly to how cosmos sdk does it in the link below // so the users can match against cosmos sdk error types. // https://github.com/cosmos/cosmos-sdk/blob/v0.45.2/client/broadcast.go#L49 diff --git a/x/asset/nft/keeper/keeper.go b/x/asset/nft/keeper/keeper.go index 561eb5f16..739948c4d 100644 --- a/x/asset/nft/keeper/keeper.go +++ b/x/asset/nft/keeper/keeper.go @@ -14,9 +14,7 @@ import ( "github.com/CoreumFoundation/coreum/x/nft" ) -var ( - frozenNFTStoreValue = []byte{0x01} -) +var frozenNFTStoreValue = []byte{0x01} // ParamSubspace represents a subscope of methods exposed by param module to store and retrieve parameters. type ParamSubspace interface {