From caf3cff4b4c3f19b6e6e641b8c4c3e13a5882455 Mon Sep 17 00:00:00 2001 From: xiaodino Date: Sat, 16 Mar 2024 22:38:28 -0700 Subject: [PATCH] Update --- packages/relayer/indexer/handle_event.go | 7 +++--- packages/relayer/indexer/indexer.go | 27 ++++++++++++++++++------ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index d976ce3a93..8e048e3282 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -28,8 +28,9 @@ func (i *Indexer) handleEvent( chainID *big.Int, event *bridge.BridgeMessageSent, ) error { - // slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex()) + slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex()) // if the destinatio chain doesnt match, we dont process it in this indexer. + if new(big.Int).SetUint64(event.Message.DestChainId).Cmp(i.destChainId) != 0 { slog.Info("skipping event, wrong chainID", "messageDestChainID", @@ -92,8 +93,6 @@ func (i *Indexer) handleEvent( return errors.Wrap(err, "eventTypeAmountAndCanonicalTokenFromEvent(event)") } - // TODO(xiaodino): Change to batch query - // check if we have an existing event already. this is mostly likely only true // in the case of us crawling past blocks. existingEvent, err := i.eventRepo.FirstByEventAndMsgHash( @@ -142,7 +141,7 @@ func (i *Indexer) handleEvent( if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status { // If the status from contract matches the existing event status, // we can return early as this message has been processed as expected. - // slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) + slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status) return nil } diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index cec315f60b..d41c937773 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -218,14 +218,24 @@ func (i *Indexer) Start() error { i.wg.Add(1) go func() { - if err := i.filter(i.ctx); err != nil { - slog.Error("error filtering blocks", "error", err.Error()) + if err := backoff.Retry(func() error { + err := i.filter(i.ctx) + if err != nil { + slog.Error("filter failed, will retry", "error", err) + } + return err + }, backoff.NewConstantBackOff(5*time.Second)); err != nil { + slog.Error("error after retrying filter with backoff", "error", err) } }() go func() { if err := backoff.Retry(func() error { - return scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg) + err := scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg) + if err != nil { + slog.Error("scanBlocks failed, will retry", "error", err) + } + return err }, backoff.NewConstantBackOff(5*time.Second)); err != nil { slog.Error("scan blocks backoff retry", "error", err) } @@ -233,9 +243,13 @@ func (i *Indexer) Start() error { go func() { if err := backoff.Retry(func() error { - return i.queue.Notify(i.ctx, i.wg) + err := i.queue.Notify(i.ctx, i.wg) + if err != nil { + slog.Error("i.queue.Notify failed, will retry", "error", err) + } + return err }, backoff.NewConstantBackOff(5*time.Second)); err != nil { - slog.Error("queue notify backoff retry", "error", err) + slog.Error("i.queue.Notify backoff retry", "error", err) } }() @@ -349,6 +363,8 @@ func (i *Indexer) filter(ctx context.Context) error { relayer.ErrorEvents.Inc() // log error but always return nil to keep other goroutines active slog.Error("error handling event", "err", err.Error()) + } else { + slog.Info("handled event successfully") } return nil @@ -364,7 +380,6 @@ func (i *Indexer) filter(ctx context.Context) error { // loop if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { return errors.Wrap(err, "i.handleNoEventsInBatch") - } break