Skip to content

Commit

Permalink
Problem: no command to patch missing transactions between v0.7.0 and …
Browse files Browse the repository at this point in the history
…v0.7.1 (crypto-org-chain#513)

* Problem: no command to patch missing transactions between v0.7.0 and v0.7.1

Solution:
- add fix-unlucky-tx command to patch txs post v0.7.0 upgrade.

* fix lint

* add min-block-height safty guard

* Update cmd/cronosd/cmd/fix-unlucky-tx.go

* fix db open

* add integration tests

* fix integration test
  • Loading branch information
yihuang committed May 31, 2022
1 parent fa61791 commit 5de290b
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Improvements

- [cronos#489](https://github.com/crypto-org-chain/cronos/pull/489) Enable jemalloc memory allocator, and update rocksdb src to `v6.29.5`.
- [#513](https://github.com/crypto-org-chain/cronos/pull/513) Add `fix-unlucky-tx` command to patch txs post v0.7.0 upgrade.

*May 3, 2022*

Expand Down
223 changes: 223 additions & 0 deletions cmd/cronosd/cmd/fix-unlucky-tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package cmd

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"strconv"
"strings"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/server"
"github.com/spf13/cobra"
abci "github.com/tendermint/tendermint/abci/types"
tmcfg "github.com/tendermint/tendermint/config"
tmnode "github.com/tendermint/tendermint/node"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
tmstore "github.com/tendermint/tendermint/store"
evmtypes "github.com/tharsis/ethermint/x/evm/types"
)

const (
FlagMinBlockHeight = "min-block-height"

ExceedBlockGasLimitError = "out of gas in location: block gas meter; gasWanted:"
)

// FixUnluckyTxCmd update the tx execution result of false-failed tx in tendermint db
func FixUnluckyTxCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "fix-unlucky-tx [blocks-file]",
Short: "Fix tx execution result of false-failed tx after v0.7.0 upgrade, \"-\" means stdin.",
Long: "Fix tx execution result of false-failed tx after v0.7.0 upgrade.\nWARNING: don't use this command to patch blocks generated before v0.7.0 upgrade",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := server.GetServerContextFromCmd(cmd)
clientCtx := client.GetClientContextFromCmd(cmd)

minBlockHeight, err := cmd.Flags().GetInt(FlagMinBlockHeight)
if err != nil {
return err
}

chainID, err := cmd.Flags().GetString(flags.FlagChainID)
if err != nil {
return err
}

var blocksFile io.Reader
if args[0] == "-" {
blocksFile = os.Stdin
} else {
fp, err := os.Open(args[0])
if err != nil {
return err
}
defer fp.Close()
blocksFile = fp
}

tmDB, err := openTMDB(ctx.Config, chainID)
if err != nil {
return err
}

scanner := bufio.NewScanner(blocksFile)
for scanner.Scan() {
blockNumber, err := strconv.ParseInt(scanner.Text(), 10, 64)
if err != nil {
return err
}

if blockNumber < int64(minBlockHeight) {
return fmt.Errorf("block number is generated before v0.7.0 upgrade: %d", blockNumber)
}

// load results
blockResults, err := tmDB.stateStore.LoadABCIResponses(blockNumber)
if err != nil {
return err
}

// find unlucky tx
var txIndex int64
for i, txResult := range blockResults.DeliverTxs {
if TxExceedsBlockGasLimit(txResult) {
if len(txResult.Events) > 0 && txResult.Events[len(txResult.Events)-1].Type == evmtypes.TypeMsgEthereumTx {
// already patched
break
}

// load raw tx
blk := tmDB.blockStore.LoadBlock(blockNumber)
if blk == nil {
return fmt.Errorf("block not found: %d", blockNumber)
}

tx, err := clientCtx.TxConfig.TxDecoder()(blk.Txs[i])
if err != nil {
return err
}

txIndex++
for msgIndex, msg := range tx.GetMsgs() {
ethTxIndex := txIndex + int64(msgIndex)
ethTx, ok := msg.(*evmtypes.MsgEthereumTx)
if !ok {
continue
}
evt := abci.Event{
Type: evmtypes.TypeMsgEthereumTx,
Attributes: []abci.EventAttribute{
{Key: []byte(evmtypes.AttributeKeyEthereumTxHash), Value: []byte(ethTx.Hash), Index: true},
{Key: []byte(evmtypes.AttributeKeyTxIndex), Value: []byte(strconv.FormatInt(ethTxIndex, 10)), Index: true},
},
}
txResult.Events = append(txResult.Events, evt)
}
if err := tmDB.stateStore.SaveABCIResponses(blockNumber, blockResults); err != nil {
return err
}
if err := tmDB.txIndexer.Index(&abci.TxResult{
Height: blockNumber,
Index: uint32(i),
Tx: blk.Txs[i],
Result: *txResult,
}); err != nil {
return err
}
for _, msg := range tx.GetMsgs() {
fmt.Println("patched", blockNumber, msg.(*evmtypes.MsgEthereumTx).Hash)
}
break
} else if txResult.Code == 0 {
// find the correct tx index
for _, evt := range txResult.Events {
if evt.Type == evmtypes.TypeMsgEthereumTx {
for _, attr := range evt.Attributes {
if bytes.Equal(attr.Key, []byte(evmtypes.AttributeKeyTxIndex)) {
txIndex, err = strconv.ParseInt(string(attr.Value), 10, 64)
if err != nil {
return err
}
}
}
}
}
}
}
}

return nil
},
}
cmd.Flags().String(flags.FlagChainID, "cronosmainnet_25-1", "network chain ID, only useful for psql tx indexer backend")
cmd.Flags().Int(FlagMinBlockHeight, 2693800, "The block height v0.7.0 upgrade executed, will reject block heights smaller than this.")

return cmd
}

type tmDB struct {
blockStore *tmstore.BlockStore
stateStore sm.Store
txIndexer txindex.TxIndexer
}

func openTMDB(cfg *tmcfg.Config, chainID string) (*tmDB, error) {
// open tendermint db
tmdb, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return nil, err
}
blockStore := tmstore.NewBlockStore(tmdb)

stateDB, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "state", Config: cfg})
if err != nil {
return nil, err
}
stateStore := sm.NewStore(stateDB)

txIndexer, err := newTxIndexer(cfg, chainID)
if err != nil {
return nil, err
}

return &tmDB{
blockStore, stateStore, txIndexer,
}, nil
}

func newTxIndexer(config *tmcfg.Config, chainID string) (txindex.TxIndexer, error) {
switch config.TxIndex.Indexer {
case "kv":
store, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "tx_index", Config: config})
if err != nil {
return nil, err
}

return kv.NewTxIndex(store), nil
case "psql":
if config.TxIndex.PsqlConn == "" {
return nil, errors.New(`no psql-conn is set for the "psql" indexer`)
}
es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
if err != nil {
return nil, fmt.Errorf("creating psql indexer: %w", err)
}
return es.TxIndexer(), nil
default:
return nil, fmt.Errorf("unsupported tx indexer backend %s", config.TxIndex.Indexer)
}
}

// TxExceedsBlockGasLimit returns true if tx's execution exceeds block gas limit
func TxExceedsBlockGasLimit(result *abci.ResponseDeliverTx) bool {
return result.Code == 11 && strings.Contains(result.Log, ExceedBlockGasLimitError)
}
1 change: 1 addition & 0 deletions cmd/cronosd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
queryCommand(),
txCommand(),
ethermintclient.KeyCommands(app.DefaultNodeHome),
FixUnluckyTxCmd(),
)

// add rosetta
Expand Down
19 changes: 19 additions & 0 deletions integration_tests/cosmoscli.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ def account(self, addr):
)
)

def txs(self, events: str):
"/tx_search"
return json.loads(
self.raw("query", "txs", events=events, output="json", node=self.node_rpc)
)

def total_supply(self):
return json.loads(
self.raw("query", "bank", "total", output="json", node=self.node_rpc)
Expand Down Expand Up @@ -1050,3 +1056,16 @@ def transfer_tokens(self, from_, to, amount, **kwargs):
**kwargs,
)
)

def fix_unlucky_tx(self, start_block, end_block):
with tempfile.NamedTemporaryFile("w") as fp:
for i in range(start_block, end_block + 1):
print(i, file=fp)
fp.flush()
output = self.raw(
"fix-unlucky-tx",
fp.name,
min_block_height=1,
home=self.data_dir,
).decode()
return [tuple(line.split()[1:]) for line in output.split("\n")]
41 changes: 38 additions & 3 deletions integration_tests/test_replay_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

import pytest
import web3
from pystarport import ports
from web3._utils.method_formatters import receipt_formatter
from web3.datastructures import AttributeDict

from .network import setup_custom_cronos
from .utils import ADDRS, CONTRACTS, KEYS, deploy_contract, sign_transaction
from .utils import (
ADDRS,
CONTRACTS,
KEYS,
deploy_contract,
sign_transaction,
supervisorctl,
wait_for_port,
)


@pytest.fixture(scope="module")
Expand All @@ -19,6 +28,8 @@ def custom_cronos(tmp_path_factory):

def test_replay_block(custom_cronos):
w3: web3.Web3 = custom_cronos.w3
cli = custom_cronos.cosmos_cli()
begin_height = cli.block_height()
contract = deploy_contract(
w3,
CONTRACTS["TestMessageCall"],
Expand Down Expand Up @@ -71,17 +82,24 @@ def test_replay_block(custom_cronos):
assert "error" not in rsp, rsp["error"]
assert 2 == len(rsp["result"])

# gas used by the second tx
exp_gas_used2 = 758376

# check the replay receipts are the same
replay_receipts = [AttributeDict(receipt_formatter(item)) for item in rsp["result"]]
assert replay_receipts[0].gasUsed == replay_receipts[1].gasUsed == receipt1.gasUsed
assert replay_receipts[0].gasUsed == receipt1.gasUsed
assert replay_receipts[1].gasUsed == exp_gas_used2
assert replay_receipts[0].status == replay_receipts[1].status == receipt1.status
assert (
replay_receipts[0].logsBloom
== replay_receipts[1].logsBloom
== receipt1.logsBloom
)
assert replay_receipts[0].cumulativeGasUsed == receipt1.cumulativeGasUsed
assert replay_receipts[1].cumulativeGasUsed == receipt1.cumulativeGasUsed * 2
assert (
replay_receipts[1].cumulativeGasUsed
== receipt1.cumulativeGasUsed + exp_gas_used2
)

# check the postUpgrade mode
rsp = w3.provider.make_request(
Expand All @@ -92,3 +110,20 @@ def test_replay_block(custom_cronos):
replay_receipts = [AttributeDict(receipt_formatter(item)) for item in rsp["result"]]
assert replay_receipts[1].status == 0
assert replay_receipts[1].gasUsed == gas_limit

# patch the unlucky tx with the new cli command
# stop the node0
end_height = cli.block_height()
supervisorctl(custom_cronos.base_dir / "../tasks.ini", "stop", "cronos_777-1-node0")
cli = custom_cronos.cosmos_cli()
results = cli.fix_unlucky_tx(begin_height, end_height)
# the second tx is patched
assert results[0][1] == txhashes[1].hex()
# start the node0 again
supervisorctl(
custom_cronos.base_dir / "../tasks.ini", "start", "cronos_777-1-node0"
)
# wait for tm-rpc port
wait_for_port(ports.rpc_port(custom_cronos.base_port(0)))
# check the tx indexer
assert len(cli.txs(f"ethereum_tx.ethereumTxHash={txhashes[1].hex()}")["txs"]) == 1
12 changes: 12 additions & 0 deletions scripts/find-unlucky-txs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import json
import sys

result = json.load(sys.stdin)["result"]
for tx in result["txs_results"] or []:
if (
tx["code"] == 11
and "out of gas in location: block gas meter; gasWanted:" in tx["log"]
and not any(evt["type"] == "ethereum_tx" for evt in tx["events"])
):
print(result["height"])
break

0 comments on commit 5de290b

Please sign in to comment.