diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index c4690f6b3..5525d2c81 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -8,6 +8,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" abci "github.com/cometbft/cometbft/abci/types" @@ -52,14 +53,55 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, return nil, errors.New("limit must greater than 0") } - res, err := cc.RPCClient.TxSearch(ctx, query, true, &page, &limit, "") - if err != nil { - return nil, err - } - var ibcMsgs []ibcMessage + var eg errgroup.Group chainID := cc.ChainId() - for _, tx := range res.Txs { - ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, base64Encoded)...) + var ibcMsgs []ibcMessage + var mu sync.Mutex + + eg.Go(func() error { + res, err := cc.RPCClient.BlockSearch(ctx, query, &page, &limit, "") + if err != nil { + return err + } + + var nestedEg errgroup.Group + + for _, b := range res.Blocks { + b := b + nestedEg.Go(func() error { + block, err := cc.RPCClient.BlockResults(ctx, &b.Block.Height) + if err != nil { + return err + } + + mu.Lock() + defer mu.Unlock() + ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, block.BeginBlockEvents, chainID, 0, base64Encoded)...) + ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, block.EndBlockEvents, chainID, 0, base64Encoded)...) + + return nil + }) + } + return nestedEg.Wait() + }) + + eg.Go(func() error { + res, err := cc.RPCClient.TxSearch(ctx, query, true, &page, &limit, "") + if err != nil { + return err + } + + mu.Lock() + defer mu.Unlock() + for _, tx := range res.Txs { + ibcMsgs = append(ibcMsgs, ibcMessagesFromEvents(log, tx.TxResult.Events, chainID, 0, base64Encoded)...) + } + + return nil + }) + + if err := eg.Wait(); err != nil { + return nil, err } return ibcMsgs, nil diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 317764d0a..3fb8f6b95 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -95,11 +95,7 @@ func NewPathProcessor( clientUpdateThresholdTime time.Duration, flushInterval time.Duration, ) *PathProcessor { - if flushInterval == 0 { - // "disable" periodic flushing by using a large value. - flushInterval = 200 * 24 * 365 * time.Hour - } - return &PathProcessor{ + pp := &PathProcessor{ log: log, pathEnd1: newPathEndRuntime(log, pathEnd1, metrics), pathEnd2: newPathEndRuntime(log, pathEnd2, metrics), @@ -109,10 +105,33 @@ func NewPathProcessor( flushInterval: flushInterval, metrics: metrics, } + if flushInterval == 0 { + pp.disablePeriodicFlush() + } + return pp +} + +// disablePeriodicFlush will "disable" periodic flushing by using a large value. +func (pp *PathProcessor) disablePeriodicFlush() { + pp.flushInterval = 200 * 24 * 365 * time.Hour } func (pp *PathProcessor) SetMessageLifecycle(messageLifecycle MessageLifecycle) { pp.messageLifecycle = messageLifecycle + if !pp.shouldFlush() { + // disable flushing when termination conditions are set, e.g. connection/channel handshakes + pp.disablePeriodicFlush() + } +} + +func (pp *PathProcessor) shouldFlush() bool { + if pp.messageLifecycle == nil { + return true + } + if _, ok := pp.messageLifecycle.(*FlushLifecycle); ok { + return true + } + return false } // TEST USE ONLY @@ -299,7 +318,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { continue } - if !pp.initialFlushComplete { + if pp.shouldFlush() && !pp.initialFlushComplete { pp.flush(ctx) pp.initialFlushComplete = true } else if pp.shouldTerminateForFlushComplete(ctx, cancel) { diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index f588a44dc..15a4e896e 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -781,6 +781,9 @@ func queryPacketCommitments( for i, p := range c.Commitments { commitments[k][i] = p.Sequence } + sort.SliceStable(commitments[k], func(i, j int) bool { + return commitments[k][i] < commitments[k][j] + }) return nil } }