Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: pending tx comes after get mined #488

Merged
merged 8 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (rpc) [#473](https://github.com/crypto-org-chain/ethermint/pull/473) Avoid panic on invalid elasticity_multiplier.
* (rpc) [#474](https://github.com/crypto-org-chain/ethermint/pull/474), [#476](https://github.com/crypto-org-chain/ethermint/pull/441) Align genesis related cmd.
* (rpc) [#480](https://github.com/crypto-org-chain/ethermint/pull/480), [#482](https://github.com/crypto-org-chain/ethermint/pull/482) Fix parsed logs from old events.
* (rpc) [#488](https://github.com/crypto-org-chain/ethermint/pull/488) Fix handling of pending transactions related APIs.

### Improvements

Expand Down
19 changes: 19 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ var (

type GenesisState map[string]json.RawMessage

type PendingTxListener func([]byte)

// var _ server.Application (*EthermintApp)(nil)

// EthermintApp implements an extended ABCI application. It is an application
Expand All @@ -205,6 +207,8 @@ type EthermintApp struct {

invCheckPeriod uint

pendingTxListeners []PendingTxListener

// keys to access the substores
keys map[string]*storetypes.KVStoreKey
tkeys map[string]*storetypes.TransientStoreKey
Expand Down Expand Up @@ -1063,6 +1067,21 @@ func (app *EthermintApp) GetStoreKey(name string) storetypes.StoreKey {
return app.okeys[name]
}

// RegisterPendingTxListener is used by json-rpc server to listen to pending transactions in CheckTx.
func (app *EthermintApp) RegisterPendingTxListener(listener PendingTxListener) {
app.pendingTxListeners = append(app.pendingTxListeners, listener)
}

func (app *EthermintApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
res, err := app.BaseApp.CheckTx(req)
if err == nil && res.Code == 0 && req.Type == abci.CheckTxType_New {
for _, listener := range app.pendingTxListeners {
listener(req.Tx)
}
}
return res, err
}

// RegisterSwaggerAPI registers swagger route with API Server
func RegisterSwaggerAPI(_ client.Context, rtr *mux.Router) {
root, err := fs.Sub(docs.SwaggerUI, "swagger-ui")
Expand Down
4 changes: 2 additions & 2 deletions rpc/namespaces/ethereum/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
}

id := rpc.NewID()
_, offset := api.events.TxStream().ReadNonBlocking(-1)
_, offset := api.events.PendingTxStream().ReadNonBlocking(-1)
api.filters[id] = &filter{
typ: filters.PendingTransactionsSubscription,
deadline: time.NewTimer(deadline),
Expand Down Expand Up @@ -321,7 +321,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
switch f.typ {
case filters.PendingTransactionsSubscription:
var hashes []common.Hash
hashes, f.offset = api.events.TxStream().ReadAllNonBlocking(f.offset)
hashes, f.offset = api.events.PendingTxStream().ReadAllNonBlocking(f.offset)
return returnHashes(hashes), nil
case filters.BlocksSubscription:
var headers []stream.RPCHeader
Expand Down
77 changes: 32 additions & 45 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
)

var (
txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String()
evmEvents = cmtquery.MustCompile(fmt.Sprintf("%s='%s' AND %s.%s='%s'",
tmtypes.EventTypeKey,
tmtypes.EventTx,
Expand All @@ -51,22 +50,26 @@
logger log.Logger
txDecoder sdk.TxDecoder

headerStream *Stream[RPCHeader]
txStream *Stream[common.Hash]
logStream *Stream[*ethtypes.Log]
headerStream *Stream[RPCHeader]
pendingTxStream *Stream[common.Hash]
logStream *Stream[*ethtypes.Log]

wg sync.WaitGroup
}

func NewRPCStreams(evtClient rpcclient.EventsClient, logger log.Logger, txDecoder sdk.TxDecoder) (*RPCStream, error) {
func NewRPCStreams(
evtClient rpcclient.EventsClient,
logger log.Logger,
txDecoder sdk.TxDecoder,
) (*RPCStream, error) {
s := &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
txStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
}

ctx := context.Background()
Expand All @@ -76,14 +79,6 @@
return nil, err
}

chTx, err := s.evtClient.Subscribe(ctx, streamSubscriberName, txEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(ctx, streamSubscriberName); err != nil {
s.logger.Error("failed to unsubscribe", "err", err)
}
return nil, err
}

chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
Expand All @@ -92,7 +87,7 @@
return nil, err
}

go s.start(&s.wg, chBlocks, chTx, chLogs)
go s.start(&s.wg, chBlocks, chLogs)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

return s, nil
}
Expand All @@ -109,18 +104,34 @@
return s.headerStream
}

func (s *RPCStream) TxStream() *Stream[common.Hash] {
return s.txStream
func (s *RPCStream) PendingTxStream() *Stream[common.Hash] {
return s.pendingTxStream
}

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] {
return s.logStream
}

// ListenPendingTx is a callback passed to application to listen for pending transactions in CheckTx.
func (s *RPCStream) ListenPendingTx(bytes []byte) {
tx, err := s.txDecoder(bytes)
if err != nil {
s.logger.Error("fail to decode tx", "error", err.Error())
return
}

var hashes []common.Hash
for _, msg := range tx.GetMsgs() {
if ethTx, ok := msg.(*evmtypes.MsgEthereumTx); ok {
hashes = append(hashes, ethTx.AsTransaction().Hash())
}
}
s.pendingTxStream.Add(hashes...)
}

func (s *RPCStream) start(
wg *sync.WaitGroup,
chBlocks <-chan coretypes.ResultEvent,
chTx <-chan coretypes.ResultEvent,
chLogs <-chan coretypes.ResultEvent,
) {
wg.Add(1)
Expand Down Expand Up @@ -150,31 +161,7 @@
// TODO: fetch bloom from events
header := types.EthHeaderFromTendermint(data.Block.Header, ethtypes.Bloom{}, baseFee)
s.headerStream.Add(RPCHeader{EthHeader: header, Hash: common.BytesToHash(data.Block.Header.Hash())})
case ev, ok := <-chTx:
if !ok {
chTx = nil
break
}

data, ok := ev.Data.(tmtypes.EventDataTx)
if !ok {
s.logger.Error("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data))
continue
}

tx, err := s.txDecoder(data.Tx)
if err != nil {
s.logger.Error("fail to decode tx", "error", err.Error())
continue
}

var hashes []common.Hash
for _, msg := range tx.GetMsgs() {
if ethTx, ok := msg.(*evmtypes.MsgEthereumTx); ok {
hashes = append(hashes, ethTx.AsTransaction().Hash())
}
}
s.txStream.Add(hashes...)
case ev, ok := <-chLogs:
if !ok {
chLogs = nil
Expand All @@ -201,7 +188,7 @@
s.logStream.Add(txLogs...)
}

if chBlocks == nil && chTx == nil && chLogs == nil {
if chBlocks == nil && chLogs == nil {
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,32 +562,32 @@
func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
//nolint: errcheck
go api.events.TxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
for _, hash := range items {
// write to ws conn
res := &SubscriptionNotification{
Jsonrpc: "2.0",
Method: "eth_subscription",
Params: &SubscriptionResult{
Subscription: subID,
Result: hash,
},
}

err := wsConn.WriteJSON(res)
if err != nil {
api.logger.Debug("error writing header, will drop peer", "error", err.Error())

try(func() {
if err != websocket.ErrCloseSent {
_ = wsConn.Close()
}
}, api.logger, "closing websocket peer sub")
return err
}
}
return nil
})

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

return cancel, nil
}
Expand Down
7 changes: 7 additions & 0 deletions server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,17 @@ const (
MaxRetry = 6
)

type AppWithPendingTxStream interface {
RegisterPendingTxListener(listener func([]byte))
}

// StartJSONRPC starts the JSON-RPC server
func StartJSONRPC(srvCtx *server.Context,
clientCtx client.Context,
g *errgroup.Group,
config *config.Config,
indexer ethermint.EVMTxIndexer,
app AppWithPendingTxStream,
) (*http.Server, chan struct{}, error) {
logger := srvCtx.Logger.With("module", "geth")

Expand All @@ -69,6 +74,8 @@ func StartJSONRPC(srvCtx *server.Context,
return nil, nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
}

app.RegisterPendingTxListener(rpcStream.ListenPendingTx)

ethlog.Root().SetHandler(ethlog.FuncHandler(func(r *ethlog.Record) error {
switch r.Lvl {
case ethlog.LvlTrace, ethlog.LvlDebug:
Expand Down
10 changes: 8 additions & 2 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
defer apiSrv.Close()
}

clientCtx, httpSrv, httpSrvDone, err := startJSONRPCServer(svrCtx, clientCtx, g, config, genDocProvider, idxer)
clientCtx, httpSrv, httpSrvDone, err := startJSONRPCServer(svrCtx, clientCtx, g, config, genDocProvider, idxer, app)
if httpSrv != nil {
defer func() {
shutdownCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -655,20 +655,26 @@ func startJSONRPCServer(
config config.Config,
genDocProvider node.GenesisDocProvider,
idxer ethermint.EVMTxIndexer,
app types.Application,
) (ctx client.Context, httpSrv *http.Server, httpSrvDone chan struct{}, err error) {
ctx = clientCtx
if !config.JSONRPC.Enable {
return
}

txApp, ok := app.(AppWithPendingTxStream)
if !ok {
return ctx, httpSrv, httpSrvDone, fmt.Errorf("json-rpc server requires AppWithPendingTxStream")
}

genDoc, err := genDocProvider()
if err != nil {
return ctx, httpSrv, httpSrvDone, err
}

ctx = clientCtx.WithChainID(genDoc.ChainID)
g.Go(func() error {
httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, g, &config, idxer)
httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, g, &config, idxer, txApp)
return err
})
return
Expand Down
2 changes: 1 addition & 1 deletion testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@
return fmt.Errorf("validator %s context is nil", val.Moniker)
}

val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig, nil)
val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig, nil, app.(server.AppWithPendingTxStream))

Check failure on line 146 in testutil/network/util.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

line is 152 characters (lll)
if err != nil {
return err
}
Expand Down
Loading