From 4d8d9d717c839ad856b4d85ba75d2a11611c3051 Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Wed, 6 Mar 2024 02:18:01 -0800 Subject: [PATCH] feat(relayer): document indexer with comments, fix broken crawler logic, add new metrics (#16330) Co-authored-by: Roger <50648015+RogerLamTd@users.noreply.github.com> --- packages/relayer/indexer/config.go | 2 + .../indexer/detect_and_handle_reorg.go | 2 + .../indexer/handle_chain_data_synced_event.go | 2 + .../indexer/handle_message_sent_event.go | 2 + ...=> handle_message_status_changed_event.go} | 39 +--- .../indexer/handle_no_events_in_batch.go | 4 +- packages/relayer/indexer/indexer.go | 215 +++++++++++++----- packages/relayer/indexer/save_event_to_db.go | 1 + packages/relayer/indexer/scan_blocks.go | 2 + .../set_initial_processing_block_by_mode.go | 2 + packages/relayer/indexer/subscribe.go | 49 +++- .../relayer/pkg/encoding/hop_proof_test.go | 23 +- packages/relayer/processor/process_message.go | 2 +- packages/relayer/prometheus.go | 34 ++- 14 files changed, 259 insertions(+), 120 deletions(-) rename packages/relayer/indexer/{save_message_status_changed_events.go => handle_message_status_changed_event.go} (60%) diff --git a/packages/relayer/indexer/config.go b/packages/relayer/indexer/config.go index abc6f71a59..c8d7878c5b 100644 --- a/packages/relayer/indexer/config.go +++ b/packages/relayer/indexer/config.go @@ -12,6 +12,8 @@ import ( "gorm.io/gorm/logger" ) +// Config is a struct which should be created from the cli or environment variables, populated, +// and used to create a new Indexer. type Config struct { // address configs SrcBridgeAddress common.Address diff --git a/packages/relayer/indexer/detect_and_handle_reorg.go b/packages/relayer/indexer/detect_and_handle_reorg.go index 30d5efa35d..754b9745ec 100644 --- a/packages/relayer/indexer/detect_and_handle_reorg.go +++ b/packages/relayer/indexer/detect_and_handle_reorg.go @@ -8,6 +8,8 @@ import ( "log/slog" ) +// detectAndHandleReorg will look up an event in the database to see if we have seen this event +// before. If we have , we need to delete it before re-indexing. func (i *Indexer) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error { // dont check on crawling past blocks, it will be a secondary indexer. // we expect to see duplicates in this mode. diff --git a/packages/relayer/indexer/handle_chain_data_synced_event.go b/packages/relayer/indexer/handle_chain_data_synced_event.go index 2c30d6662a..ef403a7aed 100644 --- a/packages/relayer/indexer/handle_chain_data_synced_event.go +++ b/packages/relayer/indexer/handle_chain_data_synced_event.go @@ -74,5 +74,7 @@ func (i *Indexer) handleChainDataSyncedEvent( slog.Info("chainDataSynced event saved") + relayer.ChainDataSyncedEventsIndexed.Inc() + return nil } diff --git a/packages/relayer/indexer/handle_message_sent_event.go b/packages/relayer/indexer/handle_message_sent_event.go index c4037922da..390eec897b 100644 --- a/packages/relayer/indexer/handle_message_sent_event.go +++ b/packages/relayer/indexer/handle_message_sent_event.go @@ -121,6 +121,8 @@ func (i *Indexer) handleMessageSentEvent( return errors.Wrap(err, "i.queue.Publish") } + relayer.MessageSentEventsIndexed.Inc() + return nil } diff --git a/packages/relayer/indexer/save_message_status_changed_events.go b/packages/relayer/indexer/handle_message_status_changed_event.go similarity index 60% rename from packages/relayer/indexer/save_message_status_changed_events.go rename to packages/relayer/indexer/handle_message_status_changed_event.go index ef8914b31e..36acbd49ce 100644 --- a/packages/relayer/indexer/save_message_status_changed_events.go +++ b/packages/relayer/indexer/handle_message_status_changed_event.go @@ -5,48 +5,13 @@ import ( "encoding/json" "math/big" - "log/slog" - "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/relayer" "github.com/taikoxyz/taiko-mono/packages/relayer/bindings/bridge" ) -func (i *Indexer) saveMessageStatusChangedEvents( - ctx context.Context, - chainID *big.Int, - events *bridge.BridgeMessageStatusChangedIterator, -) error { - if !events.Next() || events.Event == nil { - slog.Info("no messageStatusChanged events") - return nil - } - - for { - event := events.Event - - slog.Info("messageStatusChanged", "msgHash", common.Hash(event.MsgHash).Hex()) - - if err := i.detectAndHandleReorg( - ctx, - relayer.EventNameMessageStatusChanged, - common.Hash(event.MsgHash).Hex(), - ); err != nil { - return errors.Wrap(err, "i.detectAndHandleReorg") - } - - if err := i.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil { - return errors.Wrap(err, "i.saveMessageStatusChangedEvent") - } - - if !events.Next() { - return nil - } - } -} - -func (i *Indexer) saveMessageStatusChangedEvent( +func (i *Indexer) handleMessageStatusChangedEvent( ctx context.Context, chainID *big.Int, event *bridge.BridgeMessageStatusChanged, @@ -81,5 +46,7 @@ func (i *Indexer) saveMessageStatusChangedEvent( return errors.Wrap(err, "i.eventRepo.Save") } + relayer.MessageStatusChangedEventsIndexed.Inc() + return nil } diff --git a/packages/relayer/indexer/handle_no_events_in_batch.go b/packages/relayer/indexer/handle_no_events_in_batch.go index 73f0440738..8dbd622a8e 100644 --- a/packages/relayer/indexer/handle_no_events_in_batch.go +++ b/packages/relayer/indexer/handle_no_events_in_batch.go @@ -10,8 +10,8 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer" ) -// handleNoEventsInBatch is used when an entire batch call has no events in the entire response, -// and we need to update the latest block processed +// handleNoEventsInBatch is used when there are no events remaining in a batch, or +// the batch itself contained no events. func (i *Indexer) handleNoEventsInBatch( ctx context.Context, chainID *big.Int, diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index 390568c212..2447eefe0e 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/taikoxyz/taiko-mono/packages/relayer" "github.com/taikoxyz/taiko-mono/packages/relayer/bindings/bridge" @@ -31,24 +32,38 @@ var ( ZeroAddress = common.HexToAddress("0x0000000000000000000000000000000000000000") ) +// WatchMode is a type that determines how the indexer will operate. type WatchMode string var ( - Filter WatchMode = "filter" - Subscribe WatchMode = "subscribe" + // Filter will filter past blocks, but when catches up to latest block, + // will stop. + Filter WatchMode = "filter" + // Subscribe ignores all past blocks, only subscibes to new events from latest block. + Subscribe WatchMode = "subscribe" + // FilterAndSubscribe filters up til latest block, then subscribes to new events. This is the + // default mode. FilterAndSubscribe WatchMode = "filter-and-subscribe" - CrawlPastBlocks WatchMode = "crawl-past-blocks" - WatchModes = []WatchMode{Filter, Subscribe, FilterAndSubscribe, CrawlPastBlocks} + // CrawlPastBlocks filters through the past N blocks on a loop, when it reaches `latestBlock - N`, + // it will recursively start the loop again, filtering for missed events, or ones the + // processor failed to process. + CrawlPastBlocks WatchMode = "crawl-past-blocks" + WatchModes = []WatchMode{Filter, Subscribe, FilterAndSubscribe, CrawlPastBlocks} ) +// SyncMode is a type which determines how the indexer will start indexing. type SyncMode string var ( - Sync SyncMode = "sync" + // Sync grabs the latest processed block in the DB and starts from there. + Sync SyncMode = "sync" + // Resync starts from genesis, ignoring the database. Resync SyncMode = "resync" Modes = []SyncMode{Sync, Resync} ) +// ethClient is a local interface that lets us narrow the large ethclient.Client type +// from go-ethereum down to a mockable interface for testing. type ethClient interface { ChainID(ctx context.Context) (*big.Int, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) @@ -57,11 +72,19 @@ type ethClient interface { TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) } +// DB is a local interface that lets us narrow down a database type for testing. type DB interface { DB() (*sql.DB, error) GormDB() *gorm.DB } +// Indexer is the main struct of this package, containing all dependencies necessary for indexing +// relayer-related chain data. All database repositories, contract implementations, +// and configurations will be injected here. +// An indexer should be configured and deployed for all possible combinations of bridging. +// IE: an indexer for L1-L2, and another for L2-L1. an L1-L2 indexer will have the L1 configurations +// as its source, and vice versa for the L2-L1 indexer. They will add messages to a queue +// specifically for a processor of the same configuration. type Indexer struct { eventRepo relayer.EventRepository blockRepo relayer.BlockRepository @@ -103,6 +126,7 @@ type Indexer struct { eventName string } +// InitFromCli inits a new Indexer from command line or environment variables. func (i *Indexer) InitFromCli(ctx context.Context, c *cli.Context) error { cfg, err := NewConfigFromCliContext(c) if err != nil { @@ -112,6 +136,7 @@ func (i *Indexer) InitFromCli(ctx context.Context, c *cli.Context) error { return InitFromConfig(ctx, i, cfg) } +// InitFromConfig inits a new Indexer from a provided Config struct func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) { db, err := cfg.OpenDBFunc() if err != nil { @@ -153,6 +178,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) { return errors.Wrap(err, "bridge.NewBridge") } + // taikoL1 will only be set when initializing a L1 - L2 indexer var taikoL1 *taikol1.TaikoL1 if cfg.SrcTaikoAddress != ZeroAddress { taikoL1, err = taikol1.NewTaikoL1(cfg.SrcTaikoAddress, srcEthClient) @@ -215,14 +241,19 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) { return nil } +// Name implements the SubcommandAction interface func (i *Indexer) Name() string { return "indexer" } +// Close waits for the wait groups internally to be stopped ,which will be done when the +// context is stopped externally by cmd/main.go shutdown. func (i *Indexer) Close(ctx context.Context) { i.wg.Wait() } +// Start starts the indexer, which should initialize the queue, add to wait groups, +// and start filtering or subscribing depending on the WatchMode provided. // nolint: funlen func (i *Indexer) Start() error { i.ctx = context.Background() @@ -262,8 +293,10 @@ func (i *Indexer) Start() error { return nil } +// filter is the main function run by Start in the indexer, which should filter on a loop, +// then if desired to subscribe, start subscriptions to events when done filtering. func (i *Indexer) filter(ctx context.Context) error { - // if subscribing to new events, skip filtering and subscribe + // if subscribing to new events, skip filtering and subscribe only. if i.watchMode == Subscribe { return i.subscribe(ctx, i.srcChainId, i.destChainId) } @@ -275,31 +308,49 @@ func (i *Indexer) filter(ctx context.Context) error { syncMode = Resync } + // set the initial processing block, which will vary by sync mode. if err := i.setInitialProcessingBlockByMode(ctx, syncMode, i.srcChainId); err != nil { return errors.Wrap(err, "i.setInitialProcessingBlockByMode") } + // get the latest header header, err := i.srcEthClient.HeaderByNumber(ctx, nil) if err != nil { return errors.Wrap(err, "i.srcEthClient.HeaderByNumber") } + // compare it to the processing block height, set above. if its equal to the latest block, + // we dont actually need to filter - we should just subscribe, given our watch mode is not "Filter". if i.processingBlockHeight == header.Number.Uint64() { + if i.watchMode == Filter { + slog.Info("indexing caught up", "chainID", i.srcChainId.Uint64()) + + return nil + } + slog.Info("indexing caught up, subscribing to new incoming events", "chainID", i.srcChainId.Uint64()) + return i.subscribe(ctx, i.srcChainId, i.destChainId) } + // the end block is the latest header. endBlockID := header.Number.Uint64() - // ignore latest N blocks, they are probably in queue already - // and are not "missed". + // ignore latest N blocks if we are crawling past blocks, they are probably in queue already + // and are not "missed", have just not been processed. if i.watchMode == CrawlPastBlocks { + // if targetBlockNumber is not nil, we are just going to process a singular block. if i.targetBlockNumber != nil { slog.Info("targetBlockNumber is set", "targetBlockNumber", *i.targetBlockNumber) + i.processingBlockHeight = *i.targetBlockNumber + endBlockID = i.processingBlockHeight + 1 - } else { - endBlockID = i.numLatestBlocksToIgnoreWhenCrawling + } else if endBlockID > i.numLatestBlocksToIgnoreWhenCrawling { + // otherwise, we need to set the endBlockID as the greater of the two: + // either the endBlockID minus the number of latest blocks to ignore, + // or endBlockID. + endBlockID -= i.numLatestBlocksToIgnoreWhenCrawling } } @@ -311,6 +362,8 @@ func (i *Indexer) filter(ctx context.Context) error { "watchMode", i.watchMode, ) + // iterate through from the starting block (i.processingBlockHeight) through the + // latest block (endBlockID) in batches of i.blockBatchSize until we are finished. for j := i.processingBlockHeight; j < endBlockID; j += i.blockBatchSize { end := i.processingBlockHeight + i.blockBatchSize // if the end of the batch is greater than the latest block number, set end @@ -337,18 +390,29 @@ func (i *Indexer) filter(ctx context.Context) error { if err := i.indexMessageSentEvents(ctx, filterOpts); err != nil { return errors.Wrap(err, "i.indexMessageSentEvents") } + + // we dont want to watch for message status changed events + // when crawling past blocks on a loop. but otherwise, + // we want to index all three event types when indexing MessageSent events, + // since they are related. + if i.watchMode != CrawlPastBlocks { + if err := i.indexMessageStatusChangedEvents(ctx, filterOpts); err != nil { + return errors.Wrap(err, "i.indexMessageStatusChangedEvents") + } + + // we also want to index chain data synced events. + if err := i.indexChainDataSyncedEvents(ctx, filterOpts); err != nil { + return errors.Wrap(err, "i.indexChainDataSyncedEvents") + } + } case relayer.EventNameMessageReceived: if err := i.indexMessageReceivedEvents(ctx, filterOpts); err != nil { return errors.Wrap(err, "i.indexMessageReceivedEvents") } - case relayer.EventNameChainDataSynced: - if err := i.indexChainDataSyncedEvents(ctx, filterOpts); err != nil { - return errors.Wrap(err, "i.indexChainDataSyncedEvents") - } } - // handle no events remaining, saving the processing block and restarting the for - // loop + // handle no events remaining, saving the processing block and continuing on + // to the next batch. if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { return errors.Wrap(err, "i.handleNoEventsInBatch") } @@ -358,6 +422,8 @@ func (i *Indexer) filter(ctx context.Context) error { "indexer fully caught up", ) + // if we are crawling past blocks, we dont want to continue, we want to repeat the loop above + // recursively. if i.watchMode == CrawlPastBlocks { slog.Info("restarting filtering from genesis") return i.filter(ctx) @@ -370,57 +436,34 @@ func (i *Indexer) filter(ctx context.Context) error { return errors.Wrap(err, "i.srcEthClient.HeaderByNumber") } - latestBlockIDToCompare := latestBlock.Number.Uint64() - - if i.watchMode == CrawlPastBlocks && latestBlockIDToCompare > i.numLatestBlocksToIgnoreWhenCrawling { - latestBlockIDToCompare -= i.numLatestBlocksToIgnoreWhenCrawling - } - - if i.processingBlockHeight < latestBlockIDToCompare { + if i.processingBlockHeight < latestBlock.Number.Uint64() { slog.Info("header has advanced", "processingBlockHeight", i.processingBlockHeight, - "latestBlock", latestBlockIDToCompare, + "latestBlock", latestBlock.Number.Uint64(), ) return i.filter(ctx) } - // we are caught up and specified not to subscribe, we can return now + // we are caught up and specified not to subscribe, we can return now and the indexer + // is finished it's job. if i.watchMode == Filter { return nil } + // otherwise, we subscribe to new events + slog.Info("processing is caught up to latest block, subscribing to new blocks") return i.subscribe(ctx, i.srcChainId, i.destChainId) } +// indexMessageSentEvents indexes `MessageSent` events on the bridge contract +// and stores them to the database, and adds the message to the queue if it has not been +// seen before. func (i *Indexer) indexMessageSentEvents(ctx context.Context, - filterOpts *bind.FilterOpts, -) error { - // we dont want to watch for message status changed events - // when crawling past blocks on a loop. - if i.watchMode != CrawlPastBlocks { - messageStatusChangedEvents, err := i.bridge.FilterMessageStatusChanged(filterOpts, nil) - if err != nil { - return errors.Wrap(err, "bridge.FilterMessageStatusChanged") - } - - // we don't need to do anything with msgStatus events except save them to the DB. - // we don't need to process them. they are for exposing via the API. - - err = i.saveMessageStatusChangedEvents(ctx, i.srcChainId, messageStatusChangedEvents) - if err != nil { - return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents") - } - - // we also want to index chain data synced events. - if err := i.indexChainDataSyncedEvents(ctx, filterOpts); err != nil { - return errors.Wrap(err, "i.indexChainDataSyncedEvents") - } - } - - messageSentEvents, err := i.bridge.FilterMessageSent(filterOpts, nil) + filterOpts *bind.FilterOpts) error { + events, err := i.bridge.FilterMessageSent(filterOpts, nil) if err != nil { return errors.Wrap(err, "bridge.FilterMessageSent") } @@ -428,8 +471,8 @@ func (i *Indexer) indexMessageSentEvents(ctx context.Context, group, groupCtx := errgroup.WithContext(ctx) group.SetLimit(i.numGoroutines) - for messageSentEvents.Next() { - event := messageSentEvents.Event + for events.Next() { + event := events.Event group.Go(func() error { err := i.handleMessageSentEvent(groupCtx, i.srcChainId, event, false) @@ -453,24 +496,27 @@ func (i *Indexer) indexMessageSentEvents(ctx context.Context, return nil } +// indexMessageReceivedEvents indexes `MessageReceived` events on the bridge contract +// and stores them to the database, and adds the message to the queue if it has not been +// seen before. func (i *Indexer) indexMessageReceivedEvents(ctx context.Context, filterOpts *bind.FilterOpts, ) error { - messageSentEvents, err := i.bridge.FilterMessageReceived(filterOpts, nil) + events, err := i.bridge.FilterMessageReceived(filterOpts, nil) if err != nil { - return errors.Wrap(err, "bridge.FilterMessageSent") + return errors.Wrap(err, "bridge.FilterMessageReceived") } group, groupCtx := errgroup.WithContext(ctx) group.SetLimit(i.numGoroutines) - for messageSentEvents.Next() { - event := messageSentEvents.Event + for events.Next() { + event := events.Event group.Go(func() error { err := i.handleMessageReceivedEvent(groupCtx, i.srcChainId, event, false) if err != nil { - relayer.ErrorEvents.Inc() + relayer.MessageReceivedEventsIndexingErrors.Inc() // log error but always return nil to keep other goroutines active slog.Error("error handling event", "err", err.Error()) } else { @@ -489,6 +535,50 @@ func (i *Indexer) indexMessageReceivedEvents(ctx context.Context, return nil } +// indexMessageStatusChangedEvents indexes `MessageStatusChanged` events on the bridge contract +// and stores them to the database. It does not add them to any queue. +func (i *Indexer) indexMessageStatusChangedEvents(ctx context.Context, + filterOpts *bind.FilterOpts) error { + slog.Info("indexing messageStatusChanged events") + + events, err := i.bridge.FilterMessageStatusChanged(filterOpts, nil) + if err != nil { + return errors.Wrap(err, "bridge.FilterMessageStatusChanged") + } + + group, groupCtx := errgroup.WithContext(ctx) + group.SetLimit(i.numGoroutines) + + for events.Next() { + event := events.Event + + group.Go(func() error { + err := i.handleMessageStatusChangedEvent(groupCtx, i.srcChainId, event) + if err != nil { + relayer.MessageStatusChangedEventsIndexingErrors.Inc() + // log error but always return nil to keep other goroutines active + slog.Error("error handling messageStatusChanged", "err", err.Error()) + } else { + slog.Info("handled messageStatusChanged event successfully") + } + + return nil + }) + } + + // wait for the last of the goroutines to finish + if err := group.Wait(); err != nil { + return errors.Wrap(err, "group.Wait") + } + + slog.Info("done indexing messageStatusChanged events") + + return nil +} + +// indexChainDataSyncedEvents indexes `ChainDataSynced` events on the bridge contract +// and stores them to the database. It does not add them to any queue. It only indexes +// the "STATE_ROOT" kind, not the "SIGNAL_ROOT" kind. func (i *Indexer) indexChainDataSyncedEvents(ctx context.Context, filterOpts *bind.FilterOpts, ) error { @@ -496,12 +586,12 @@ func (i *Indexer) indexChainDataSyncedEvents(ctx context.Context, chainDataSyncedEvents, err := i.signalService.FilterChainDataSynced( filterOpts, - []uint64{i.destChainId.Uint64()}, - nil, + []uint64{i.destChainId.Uint64()}, // only index intended events destination chain nil, + [][32]byte{crypto.Keccak256Hash([]byte("STATE_ROOT"))}, // only index state root ) if err != nil { - return errors.Wrap(err, "bridge.FilterMessageSent") + return errors.Wrap(err, "bridge.FilterChainDataSynced") } group, groupCtx := errgroup.WithContext(ctx) @@ -513,7 +603,8 @@ func (i *Indexer) indexChainDataSyncedEvents(ctx context.Context, group.Go(func() error { err := i.handleChainDataSyncedEvent(groupCtx, i.srcChainId, event, false) if err != nil { - relayer.ErrorEvents.Inc() + relayer.MessageStatusChangedEventsIndexingErrors.Inc() + // log error but always return nil to keep other goroutines active slog.Error("error handling chainDataSynced", "err", err.Error()) } else { @@ -534,6 +625,8 @@ func (i *Indexer) indexChainDataSyncedEvents(ctx context.Context, return nil } +// queueName builds out the name of a queue, in the format the processor will also +// use to listen to events. func (i *Indexer) queueName() string { return fmt.Sprintf("%v-%v-%v-queue", i.srcChainId.String(), i.destChainId.String(), i.eventName) } diff --git a/packages/relayer/indexer/save_event_to_db.go b/packages/relayer/indexer/save_event_to_db.go index 644e565660..c7226e5ca1 100644 --- a/packages/relayer/indexer/save_event_to_db.go +++ b/packages/relayer/indexer/save_event_to_db.go @@ -9,6 +9,7 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer" ) +// saveEventToDB is used to save any type of event to the database func (i *Indexer) saveEventToDB( ctx context.Context, marshalledEvent []byte, diff --git a/packages/relayer/indexer/scan_blocks.go b/packages/relayer/indexer/scan_blocks.go index 3c331d3362..9624ca5aa1 100644 --- a/packages/relayer/indexer/scan_blocks.go +++ b/packages/relayer/indexer/scan_blocks.go @@ -9,6 +9,8 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer" ) +// scanBlocks is used as a liveness to make sure we are seeing new blocks, unrelated +// to events coming in. func scanBlocks(ctx context.Context, ethClient ethClient, chainID *big.Int, wg *sync.WaitGroup) error { wg.Add(1) diff --git a/packages/relayer/indexer/set_initial_processing_block_by_mode.go b/packages/relayer/indexer/set_initial_processing_block_by_mode.go index 3b4f25121a..1039df72a2 100644 --- a/packages/relayer/indexer/set_initial_processing_block_by_mode.go +++ b/packages/relayer/indexer/set_initial_processing_block_by_mode.go @@ -8,6 +8,8 @@ import ( "github.com/taikoxyz/taiko-mono/packages/relayer" ) +// setInitialProcessingBlockByMode takes in a SyncMode and determines how we should +// start our indexing func (i *Indexer) setInitialProcessingBlockByMode( ctx context.Context, mode SyncMode, diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index b4ec3c1324..e140866899 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" "github.com/pkg/errors" "github.com/taikoxyz/taiko-mono/packages/relayer" @@ -21,6 +22,8 @@ func (i *Indexer) subscribe(ctx context.Context, chainID *big.Int, destChainID * errChan := make(chan error) + // we want to subscribe to all 3 events related to MessageSent + // if thats our desired event. if i.eventName == relayer.EventNameMessageSent { go i.subscribeMessageSent(ctx, chainID, destChainID, errChan) @@ -28,6 +31,8 @@ func (i *Indexer) subscribe(ctx context.Context, chainID *big.Int, destChainID * go i.subscribeChainDataSynced(ctx, chainID, destChainID, errChan) } else if i.eventName == relayer.EventNameMessageReceived { + // otherwise, we are running as a watchdog for MessageReceived, and only + // care about that one. go i.subscribeMessageReceived(ctx, chainID, destChainID, errChan) } @@ -38,7 +43,7 @@ func (i *Indexer) subscribe(ctx context.Context, chainID *big.Int, destChainID * slog.Info("context finished") return nil case err := <-errChan: - slog.Info("error encountered durign subscription", "error", err) + slog.Info("error encountered during subscription", "error", err) relayer.ErrorsEncounteredDuringSubscription.Inc() @@ -79,10 +84,12 @@ func (i *Indexer) subscribeMessageSent( case event := <-sink: go func() { slog.Info("new message sent event", "msgHash", common.Hash(event.MsgHash).Hex(), "chainID", chainID.String()) - err := i.handleMessageSentEvent(ctx, chainID, event, true) - if err != nil { + if err := i.handleMessageSentEvent(ctx, chainID, event, true); err != nil { slog.Error("i.subscribe, i.handleMessageSentEvent", "error", err) + + relayer.MessageSentEventsIndexingErrors.Inc() + return } @@ -97,19 +104,24 @@ func (i *Indexer) subscribeMessageSent( ) if err != nil { slog.Error("i.subscribe, blockRepo.GetLatestBlockProcessedForEvent", "error", err) + + relayer.MessageSentEventsIndexingErrors.Inc() + return } if block.Height < event.Raw.BlockNumber { - err = i.blockRepo.Save(relayer.SaveBlockOpts{ + if err := i.blockRepo.Save(relayer.SaveBlockOpts{ Height: event.Raw.BlockNumber, Hash: event.Raw.BlockHash, ChainID: chainID, DestChainID: destChainID, EventName: relayer.EventNameMessageSent, - }) - if err != nil { + }); err != nil { slog.Error("i.subscribe, i.blockRepo.Save", "error", err) + + relayer.MessageSentEventsIndexingErrors.Inc() + return } @@ -155,7 +167,10 @@ func (i *Indexer) subscribeMessageReceived( err := i.handleMessageReceivedEvent(ctx, chainID, event, true) if err != nil { + relayer.MessageReceivedEventsIndexingErrors.Inc() + slog.Error("i.subscribe, i.handleMessageReceived", "error", err) + return } @@ -169,7 +184,10 @@ func (i *Indexer) subscribeMessageReceived( destChainID, ) if err != nil { + relayer.MessageReceivedEventsIndexingErrors.Inc() + slog.Error("i.subscribe, blockRepo.GetLatestBlockProcessedForEvent", "error", err) + return } @@ -182,7 +200,10 @@ func (i *Indexer) subscribeMessageReceived( EventName: relayer.EventNameMessageReceived, }) if err != nil { + relayer.MessageReceivedEventsIndexingErrors.Inc() + slog.Error("i.subscribe, i.blockRepo.Save", "error", err) + return } @@ -227,8 +248,10 @@ func (i *Indexer) subscribeMessageStatusChanged( "chainID", chainID.String(), ) - if err := i.saveMessageStatusChangedEvent(ctx, chainID, event); err != nil { - slog.Error("i.subscribe, i.saveMessageStatusChangedEvent", "error", err) + if err := i.handleMessageStatusChangedEvent(ctx, chainID, event); err != nil { + slog.Error("i.subscribe, i.handleMessageStatusChangedEvent", "error", err) + + relayer.MessageSentEventsIndexingErrors.Inc() } } } @@ -250,7 +273,10 @@ func (i *Indexer) subscribeChainDataSynced( return i.signalService.WatchChainDataSynced(&bind.WatchOpts{ Context: ctx, - }, sink, []uint64{destChainID.Uint64()}, nil, nil) + }, sink, + []uint64{destChainID.Uint64()}, + nil, + [][32]byte{crypto.Keccak256Hash([]byte("STATE_ROOT"))}) }) defer sub.Unsubscribe() @@ -272,10 +298,9 @@ func (i *Indexer) subscribeChainDataSynced( if err := i.handleChainDataSyncedEvent(ctx, i.srcChainId, event, true); err != nil { slog.Error("error handling chainDataSynced event", "error", err) - continue - } - slog.Info("chainDataSynced event saved") + relayer.ChainDataSyncedEventsIndexingErrors.Inc() + } } } } diff --git a/packages/relayer/pkg/encoding/hop_proof_test.go b/packages/relayer/pkg/encoding/hop_proof_test.go index dbf0ccee74..dd81d0bf68 100644 --- a/packages/relayer/pkg/encoding/hop_proof_test.go +++ b/packages/relayer/pkg/encoding/hop_proof_test.go @@ -1,27 +1,40 @@ package encoding import ( + "encoding/hex" "testing" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" "gopkg.in/go-playground/assert.v1" ) func Test_EncodeABISignalProof(t *testing.T) { + // nolint: lll + accProof1, _ := hex.DecodeString("f90211a0f776494ecffe03ad2af2426ee4fb5ae66c012c03ea3955d5b40a52fe7c6b8df6a0ef4a67411e4accae9bbb05bbcc17d036432de4e3ee0b0d02a04ee6a67db9c1a5a06c4c8f2b3206553ec83f1081a6742edff2f7b7830d3ba4166872c4f3f32bae0ba0aa2e8d96490616498fa142ba7c6a628e2839b9b182c9995992ecf1c1d346784fa0f8d7262d0abdf5d084338a640322c4dd45b1484a630331922afa60256917058aa0e49099e467972084cd70025874c6ae0a6841159aec02f39384de2d9ae7460986a02ad2a355fe840ae1bfe7843616d6bb027c1fd478918cc956e0ca7b61ec0044d0a061ec8e714d951772d74765ae997bd25f3982ea8a0c4dda8baa1e47adbd5a3975a039e5f7e8298126da1e7d6f42b1f20846d68ff40dc857b0dd251d2791c0345589a065f4d771a35d862a10b2a2da6d7d6c1e082cd59280cbab1c8e6aaf89069e72dca07851c1d2a801228a1adb24b61a5b4e1d4e12c5043a0af1b1790cd79b281c43a0a031acf4e7bd8cfb52c19cb512fb2578f416cbcf28cd3b4387d7b6107a17d4ff31a09e8def26ad9cf07bfbba2cf4a3fad0fe4075030b9a67acbecbfa5b7bdc8620cca07174a59e7f5b71a1a20f8c4a232100c1303378a7eb5dd523904d5305e20ee7dca0199ae8cd6d2b09717d59b5106ef31b451b30d777296b4afb43db9327117471eca09193465aefe93e209d05f892f77002dd5c2e74d4eed502ebae3acb7db1e33acc80") + + // nolint: lll + accProof2, _ := hex.DecodeString("f8f1a0b4df476cf7a24306eb6b96d0a8fe974b88e035d93e6716cf56e3f6bacee15c88808080a09265834cbd374b79cc4f97cdab16770f4434bce18ff9724970bff1c91d54f2f1a0e13b8847e8cfc7fd465a61a64b732d940d8f87ed65828e1004e46fb4256aba48a05a7b35dc9c135fc4df95c4b02179719a3505d201fb590212cf41c31eae1ceb6980a07c46a7878ac08b149792973e8a17982311f3c997624aed0fde76ba45d9a1ba41a0ebfe5b284e549d79c976f878d93266d238d9ea3254d0ef9c199d2c5a12067de48080a05d5e60bc5b531718ba65199b06b68087c640472ef4b3afb7e4b857280edbf2e580808080") + + // nolint: lll + sProof1, _ := hex.DecodeString("f90211a062880ba06fb396ad4e16f01d22ca7a1ae677e3fe428817928db30e2cae96b97ca0aa57a805600be2304ffdd527bd3dc9e91161233dc59afb71c1aab542eafe70caa03bc6c86a4c6b49305b95b3812a162f7e6bec891b287ef291e9c468071ef0c4ada08ac85ec9872d9e6f5b4c6e72154158df88d44163457cf0bbf2134e385c871a4ea0f35f3c83fbd9da738bbfea1bc87b073d3b64abdecb6294b61cf3eb535eabefdea0905c9b0e1755d389f306f026ccb71f8f7e54cd68720cc1c76682504eeb7bceaea06867477d77649657f698380e66844a7ed14818e8aad9f4ac5748963ede640e0aa0caa272deb3227cb8b0765a718ac85bbc7ee93a04bc0a2cb9c5509c9394470eb3a01689508cc26d870b0183c13bee39ed5342bf32464813f872d7ea4e5bc5f79845a0b578886ee673adcdf7b219cd13d9d641f8e15dd9ec6c9345435e7968bc6bcc82a0fbd86d32d6c60089268373be401211c3b606efeb550659b9e43458007dce2eb6a035d73d30ad77c85ef247ab8473f71314a9d175c1e9a0ce73a78a103a3766f54ca0c08386bed5af43c7cadb42d9df7ba3b94886f483e8a2e66aaef7391a15ab51cba002ce1e689b6193a6d3a8c822b0b0076dfdf732fd045a4dc122ec3879fe3de70ea0db27c27a802c40acbde50e5c357e5327a91242e6550fe461eec10ac136ddddcea0ad6d871b4c62042c68f0ecfdb986a24ea7d850563bbd3d27f6916bc3ddd170a480") + + // nolint: lll + sProof2, _ := hex.DecodeString("f90211a05c9b8f83e3c03e07271225e2ebce1cbe9e7db3b14d2724ec6efe9cf8fce6fc06a0dbd4cd41e027eefe208271111ea3e99cb39b4645e7e166d084d62f427a9313ada0cc65078735257beecceb9c74985901fa16e8e9fb228ce6aaa62aedb282a1795fa012f4c2ae88c8f0396048da6a095d0fa2c8b86398651cd37a72d68d88d25ff19ea037cda349771733bba3681eda450fee72f5e3dcbb6b8f2acf4a2bd145d0bfad6da0ef1359be1a9f658e580c968b92029dbf62ce7a56932c10acce28b25bf7206665a037d9790673a2be78a1555bee7d37ab10d1b8d94d1f12bb011b7cc7257bf13004a0dd9b4774c203afaaeb098ab623ce32f1df6f8ff0ac1bbcb78e358b7a242cd19aa0dde51d1f37baae98d02b2e35c81030f17407fc31304ab72cf999bb2c7e8abff3a0f8672c12a366e074d6f42c2c7b0c5cc010bc4ec703c65e3b58c4fbfee18e89c2a057ba424e40bd1c6a8e7d494703f392e834d8ca7696759e2c0216ebd18bcf662fa01eafd299e8a772c056e6919eeb67bf7e1098129855234e942cfc18aaf364d39ea0df6b60bdf553e1511f445fdcf1fb7aadc23bf390eeb11145c9e2742552c2ed6da02e79f5afb8c177c40737cea4aed39fe3c0269f5a8989e02c07a0135594b83bb1a035535dac85afa0e4848c0186cc8687bc7d2de0215b97ea43e65c8e4da0a52517a08ce682327123eb41b4d49ef283ffe11d1da1b9d7163e892b775a63dd31072ec080") + s := []HopProof{ { ChainID: 1, BlockID: 1, - RootHash: common.BigToHash(common.Big256), + RootHash: crypto.Keccak256Hash([]byte("ROOT_HASH")), CacheOption: 0, - AccountProof: [][]byte{{}}, - StorageProof: [][]byte{}, + AccountProof: [][]byte{accProof1, accProof2}, + StorageProof: [][]byte{sProof1, sProof2}, }, } // nolint: lll - want := "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000001200000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" + want := "0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001da52c4b137f7596e26ee4d4d6f70b13c851cdbe59d4365e4b7aa16e8c893ca63000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000004800000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000002800000000000000000000000000000000000000000000000000000000000000214f90211a0f776494ecffe03ad2af2426ee4fb5ae66c012c03ea3955d5b40a52fe7c6b8df6a0ef4a67411e4accae9bbb05bbcc17d036432de4e3ee0b0d02a04ee6a67db9c1a5a06c4c8f2b3206553ec83f1081a6742edff2f7b7830d3ba4166872c4f3f32bae0ba0aa2e8d96490616498fa142ba7c6a628e2839b9b182c9995992ecf1c1d346784fa0f8d7262d0abdf5d084338a640322c4dd45b1484a630331922afa60256917058aa0e49099e467972084cd70025874c6ae0a6841159aec02f39384de2d9ae7460986a02ad2a355fe840ae1bfe7843616d6bb027c1fd478918cc956e0ca7b61ec0044d0a061ec8e714d951772d74765ae997bd25f3982ea8a0c4dda8baa1e47adbd5a3975a039e5f7e8298126da1e7d6f42b1f20846d68ff40dc857b0dd251d2791c0345589a065f4d771a35d862a10b2a2da6d7d6c1e082cd59280cbab1c8e6aaf89069e72dca07851c1d2a801228a1adb24b61a5b4e1d4e12c5043a0af1b1790cd79b281c43a0a031acf4e7bd8cfb52c19cb512fb2578f416cbcf28cd3b4387d7b6107a17d4ff31a09e8def26ad9cf07bfbba2cf4a3fad0fe4075030b9a67acbecbfa5b7bdc8620cca07174a59e7f5b71a1a20f8c4a232100c1303378a7eb5dd523904d5305e20ee7dca0199ae8cd6d2b09717d59b5106ef31b451b30d777296b4afb43db9327117471eca09193465aefe93e209d05f892f77002dd5c2e74d4eed502ebae3acb7db1e33acc8000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f3f8f1a0b4df476cf7a24306eb6b96d0a8fe974b88e035d93e6716cf56e3f6bacee15c88808080a09265834cbd374b79cc4f97cdab16770f4434bce18ff9724970bff1c91d54f2f1a0e13b8847e8cfc7fd465a61a64b732d940d8f87ed65828e1004e46fb4256aba48a05a7b35dc9c135fc4df95c4b02179719a3505d201fb590212cf41c31eae1ceb6980a07c46a7878ac08b149792973e8a17982311f3c997624aed0fde76ba45d9a1ba41a0ebfe5b284e549d79c976f878d93266d238d9ea3254d0ef9c199d2c5a12067de48080a05d5e60bc5b531718ba65199b06b68087c640472ef4b3afb7e4b857280edbf2e580808080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000002800000000000000000000000000000000000000000000000000000000000000214f90211a062880ba06fb396ad4e16f01d22ca7a1ae677e3fe428817928db30e2cae96b97ca0aa57a805600be2304ffdd527bd3dc9e91161233dc59afb71c1aab542eafe70caa03bc6c86a4c6b49305b95b3812a162f7e6bec891b287ef291e9c468071ef0c4ada08ac85ec9872d9e6f5b4c6e72154158df88d44163457cf0bbf2134e385c871a4ea0f35f3c83fbd9da738bbfea1bc87b073d3b64abdecb6294b61cf3eb535eabefdea0905c9b0e1755d389f306f026ccb71f8f7e54cd68720cc1c76682504eeb7bceaea06867477d77649657f698380e66844a7ed14818e8aad9f4ac5748963ede640e0aa0caa272deb3227cb8b0765a718ac85bbc7ee93a04bc0a2cb9c5509c9394470eb3a01689508cc26d870b0183c13bee39ed5342bf32464813f872d7ea4e5bc5f79845a0b578886ee673adcdf7b219cd13d9d641f8e15dd9ec6c9345435e7968bc6bcc82a0fbd86d32d6c60089268373be401211c3b606efeb550659b9e43458007dce2eb6a035d73d30ad77c85ef247ab8473f71314a9d175c1e9a0ce73a78a103a3766f54ca0c08386bed5af43c7cadb42d9df7ba3b94886f483e8a2e66aaef7391a15ab51cba002ce1e689b6193a6d3a8c822b0b0076dfdf732fd045a4dc122ec3879fe3de70ea0db27c27a802c40acbde50e5c357e5327a91242e6550fe461eec10ac136ddddcea0ad6d871b4c62042c68f0ecfdb986a24ea7d850563bbd3d27f6916bc3ddd170a4800000000000000000000000000000000000000000000000000000000000000000000000000000000000000214f90211a05c9b8f83e3c03e07271225e2ebce1cbe9e7db3b14d2724ec6efe9cf8fce6fc06a0dbd4cd41e027eefe208271111ea3e99cb39b4645e7e166d084d62f427a9313ada0cc65078735257beecceb9c74985901fa16e8e9fb228ce6aaa62aedb282a1795fa012f4c2ae88c8f0396048da6a095d0fa2c8b86398651cd37a72d68d88d25ff19ea037cda349771733bba3681eda450fee72f5e3dcbb6b8f2acf4a2bd145d0bfad6da0ef1359be1a9f658e580c968b92029dbf62ce7a56932c10acce28b25bf7206665a037d9790673a2be78a1555bee7d37ab10d1b8d94d1f12bb011b7cc7257bf13004a0dd9b4774c203afaaeb098ab623ce32f1df6f8ff0ac1bbcb78e358b7a242cd19aa0dde51d1f37baae98d02b2e35c81030f17407fc31304ab72cf999bb2c7e8abff3a0f8672c12a366e074d6f42c2c7b0c5cc010bc4ec703c65e3b58c4fbfee18e89c2a057ba424e40bd1c6a8e7d494703f392e834d8ca7696759e2c0216ebd18bcf662fa01eafd299e8a772c056e6919eeb67bf7e1098129855234e942cfc18aaf364d39ea0df6b60bdf553e1511f445fdcf1fb7aadc23bf390eeb11145c9e2742552c2ed6da02e79f5afb8c177c40737cea4aed39fe3c0269f5a8989e02c07a0135594b83bb1a035535dac85afa0e4848c0186cc8687bc7d2de0215b97ea43e65c8e4da0a52517a08ce682327123eb41b4d49ef283ffe11d1da1b9d7163e892b775a63dd31072ec080000000000000000000000000" proof, err := EncodeHopProofs(s) assert.Equal(t, nil, err) assert.Equal(t, hexutil.Encode(proof), want) diff --git a/packages/relayer/processor/process_message.go b/packages/relayer/processor/process_message.go index 24b00a861b..4964c6350b 100644 --- a/packages/relayer/processor/process_message.go +++ b/packages/relayer/processor/process_message.go @@ -231,7 +231,7 @@ func (p *Processor) sendProcessMessageAndWaitForReceipt( return nil, err } - relayer.EventsProcessed.Inc() + relayer.MessageSentEventsProcessed.Inc() ctx, cancel := context.WithTimeout(ctx, 4*time.Minute) diff --git a/packages/relayer/prometheus.go b/packages/relayer/prometheus.go index 1758109f55..d9bbd61bf0 100644 --- a/packages/relayer/prometheus.go +++ b/packages/relayer/prometheus.go @@ -6,9 +6,37 @@ import ( ) var ( - EventsProcessed = promauto.NewCounter(prometheus.CounterOpts{ - Name: "events_processed_ops_total", - Help: "The total number of processed events", + ChainDataSyncedEventsIndexed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "chain_data_synced_events_indexed_ops_total", + Help: "The total number of ChainDataSynced indexed events", + }) + MessageSentEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "message_sent_events_processed_ops_total", + Help: "The total number of MessageSent processed events", + }) + MessageSentEventsIndexed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "message_sent_events_indexed_ops_total", + Help: "The total number of MessageSent indexed events", + }) + MessageSentEventsIndexingErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "message_sent_events_indexing_errors_ops_total", + Help: "The total number of errors indexing MessageSent events", + }) + MessageReceivedEventsIndexingErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "message_received_events_indexing_errors_ops_total", + Help: "The total number of errors indexing MessageReceived events", + }) + MessageStatusChangedEventsIndexed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "message_status_changed_events_indexed_ops_total", + Help: "The total number of MessageStatusChanged indexed events", + }) + MessageStatusChangedEventsIndexingErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "message_status_changed_events_indexing_errors_ops_total", + Help: "The total number of errors indexing MessageStatusChanged events", + }) + ChainDataSyncedEventsIndexingErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "chain_data_synced_events_indexing_errors_ops_total", + Help: "The total number of errors indexing ChainDataSynced events", }) BlocksProcessed = promauto.NewCounter(prometheus.CounterOpts{ Name: "blocks_processed_ops_total",