
Commit

Update
xiaodino committed Mar 17, 2024
1 parent 4a3dfa2 commit caf3cff
Showing 2 changed files with 24 additions and 10 deletions.
7 changes: 3 additions & 4 deletions packages/relayer/indexer/handle_event.go
@@ -28,8 +28,9 @@ func (i *Indexer) handleEvent(
 	chainID *big.Int,
 	event *bridge.BridgeMessageSent,
 ) error {
-	// slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex())
+	slog.Info("event found for msgHash", "msgHash", common.Hash(event.MsgHash).Hex(), "txHash", event.Raw.TxHash.Hex())
 	// if the destination chain doesn't match, we don't process it in this indexer.
+
 	if new(big.Int).SetUint64(event.Message.DestChainId).Cmp(i.destChainId) != 0 {
 		slog.Info("skipping event, wrong chainID",
 			"messageDestChainID",
@@ -92,8 +93,6 @@ func (i *Indexer) handleEvent(
 		return errors.Wrap(err, "eventTypeAmountAndCanonicalTokenFromEvent(event)")
 	}
 
-	// TODO(xiaodino): Change to batch query
-
 	// check if we have an existing event already. this is most likely only true
 	// in the case of us crawling past blocks.
 	existingEvent, err := i.eventRepo.FirstByEventAndMsgHash(
@@ -142,7 +141,7 @@ func (i *Indexer) handleEvent(
 	if i.watchMode == CrawlPastBlocks && eventStatus == existingEvent.Status {
 		// If the status from contract matches the existing event status,
 		// we can return early as this message has been processed as expected.
-		// slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status)
+		slog.Info("crawler returning early", "eventStatus", eventStatus, "existingEvent.Status", existingEvent.Status)
		return nil
 	}

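A note on the first hunk above: the indexer promotes the event's uint64 destination chain ID to a *big.Int and compares it to its own configured chain ID with Cmp, skipping anything bound for another chain, and this commit turns the previously commented-out structured slog.Info logging back on. Below is a minimal, self-contained sketch of that guard and logging style, with made-up chain IDs rather than anything taken from this repository:

package main

import (
	"log/slog"
	"math/big"
)

func main() {
	// Hypothetical values standing in for event.Message.DestChainId and i.destChainId.
	var eventDestChainID uint64 = 167000
	indexerDestChainID := big.NewInt(167001)

	// Same shape as the guard in handleEvent: promote the uint64 to a *big.Int
	// and compare with Cmp, which returns 0 only when the two values are equal.
	if new(big.Int).SetUint64(eventDestChainID).Cmp(indexerDestChainID) != 0 {
		// Structured key/value logging via log/slog, the style the commit
		// re-enables for "event found for msgHash" and "crawler returning early".
		slog.Info("skipping event, wrong chainID",
			"messageDestChainID", eventDestChainID,
			"indexerDestChainID", indexerDestChainID.Uint64(),
		)

		return
	}

	slog.Info("destination chain matches, event would be processed")
}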
27 changes: 21 additions & 6 deletions packages/relayer/indexer/indexer.go
@@ -218,24 +218,38 @@ func (i *Indexer) Start() error
 	i.wg.Add(1)
 
 	go func() {
-		if err := i.filter(i.ctx); err != nil {
-			slog.Error("error filtering blocks", "error", err.Error())
+		if err := backoff.Retry(func() error {
+			err := i.filter(i.ctx)
+			if err != nil {
+				slog.Error("filter failed, will retry", "error", err)
+			}
+			return err
+		}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
+			slog.Error("error after retrying filter with backoff", "error", err)
 		}
 	}()
 
 	go func() {
 		if err := backoff.Retry(func() error {
-			return scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg)
+			err := scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg)
+			if err != nil {
+				slog.Error("scanBlocks failed, will retry", "error", err)
+			}
+			return err
 		}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
 			slog.Error("scan blocks backoff retry", "error", err)
 		}
 	}()
 
 	go func() {
 		if err := backoff.Retry(func() error {
-			return i.queue.Notify(i.ctx, i.wg)
+			err := i.queue.Notify(i.ctx, i.wg)
+			if err != nil {
+				slog.Error("i.queue.Notify failed, will retry", "error", err)
+			}
+			return err
 		}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
-			slog.Error("queue notify backoff retry", "error", err)
+			slog.Error("i.queue.Notify backoff retry", "error", err)
 		}
 	}()

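All three goroutines above follow the same pattern: the long-running call is wrapped in backoff.Retry with a constant 5-second backoff, each failed attempt is logged inside the operation before the error is returned so the retry fires, and a separate log covers the case where Retry itself gives up. Below is a minimal sketch of that pattern in isolation; it assumes the github.com/cenkalti/backoff/v4 package, since the exact import path is not visible in this diff, and flakyWork is a stand-in for i.filter, scanBlocks, or i.queue.Notify:

package main

import (
	"errors"
	"log/slog"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// flakyWork returns an operation that fails twice before succeeding,
// standing in for i.filter, scanBlocks, or i.queue.Notify.
func flakyWork() func() error {
	attempts := 0
	return func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
}

func main() {
	work := flakyWork()

	// Same shape as the commit: log inside the operation so every failed
	// attempt is visible, return the error so backoff.Retry keeps retrying,
	// and log again if Retry itself returns an error (with a constant
	// backoff that only happens for a permanent, non-retryable failure).
	if err := backoff.Retry(func() error {
		err := work()
		if err != nil {
			slog.Error("work failed, will retry", "error", err)
		}
		return err
	}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
		slog.Error("error after retrying with backoff", "error", err)
	}

	slog.Info("work eventually succeeded")
}

For a quick local run you would likely want a shorter interval than 5*time.Second; the constant is kept here only to mirror the commit.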
@@ -349,6 +363,8 @@ func (i *Indexer) filter(ctx context.Context) error {
 			relayer.ErrorEvents.Inc()
 			// log error but always return nil to keep other goroutines active
 			slog.Error("error handling event", "err", err.Error())
+		} else {
+			slog.Info("handled event successfully")
 		}
 
 		return nil
@@ -364,7 +380,6 @@ func (i *Indexer) filter(ctx context.Context) error {
 		// loop
 		if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil {
 			return errors.Wrap(err, "i.handleNoEventsInBatch")
-
 		}
 
 		break
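The last two hunks touch the per-event path inside filter. The existing comment spells out the design choice: log the failure (and bump the error metric) but return nil, so one bad event does not cancel the other in-flight handlers; the commit adds a matching success log on the happy path. The sketch below illustrates that choice; the errgroup scaffolding around it is an assumption made only to produce a runnable example, since the surrounding loop is not shown in this diff:

package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"

	"golang.org/x/sync/errgroup"
)

// handleEvent is a stand-in for i.handleEvent; it fails for one event ID.
func handleEvent(_ context.Context, id int) error {
	if id == 2 {
		return errors.New("bad event")
	}
	return nil
}

func main() {
	group, ctx := errgroup.WithContext(context.Background())

	for _, id := range []int{1, 2, 3} {
		id := id
		group.Go(func() error {
			if err := handleEvent(ctx, id); err != nil {
				// Log (and, in the relayer, increment an error counter),
				// but return nil so sibling handlers keep running.
				slog.Error("error handling event", "err", err.Error())
			} else {
				slog.Info("handled event successfully")
			}

			return nil
		})
	}

	if err := group.Wait(); err != nil {
		fmt.Println("unexpected:", err)
	}
}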
