diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index eb789a91713..191a65b40f7 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -320,8 +320,8 @@ func (i *Indexer) filter(ctx context.Context) error { return errors.Wrap(err, "bridge.FilterMessageStatusChanged") } - // we dont need to do anything with msgStatus events except save them to the DB. - // we dont need to process them. they are for exposing via the API. + // we don't need to do anything with msgStatus events except save them to the DB. + // we don't need to process them. they are for exposing via the API. err = i.saveMessageStatusChangedEvents(ctx, i.srcChainId, messageStatusChangedEvents) if err != nil { @@ -333,21 +333,10 @@ func (i *Indexer) filter(ctx context.Context) error { return errors.Wrap(err, "bridge.FilterMessageSent") } - if !messageSentEvents.Next() || messageSentEvents.Event == nil { - // use "end" not "filterEnd" here, because it will be used as the start - // of the next batch. - if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { - return errors.Wrap(err, "i.handleNoEventsInBatch") - } - - continue - } - group, groupCtx := errgroup.WithContext(ctx) - group.SetLimit(i.numGoroutines) - for { + for messageSentEvents.Next() { event := messageSentEvents.Event group.Go(func() error { @@ -359,24 +348,19 @@ func (i *Indexer) filter(ctx context.Context) error { } else { slog.Info("handled event successfully") } - return nil }) + } - // if there are no more events - if !messageSentEvents.Next() { - // wait for the last of the goroutines to finish - if err := group.Wait(); err != nil { - return errors.Wrap(err, "group.Wait") - } - // handle no events remaining, saving the processing block and restarting the for - // loop - if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { - return errors.Wrap(err, "i.handleNoEventsInBatch") - } + // wait for the last of the goroutines to finish + if err := group.Wait(); err != nil { + return errors.Wrap(err, "group.Wait") + } - break - } + // handle no events remaining, saving the processing block and restarting the for + // loop + if err := i.handleNoEventsInBatch(ctx, i.srcChainId, int64(end)); err != nil { + return errors.Wrap(err, "i.handleNoEventsInBatch") } } diff --git a/packages/relayer/processor/processor.go b/packages/relayer/processor/processor.go index beeba0cf4e3..30cc9515058 100644 --- a/packages/relayer/processor/processor.go +++ b/packages/relayer/processor/processor.go @@ -372,6 +372,7 @@ func (p *Processor) Start() error { }() p.wg.Add(1) + go p.eventLoop(ctx) return nil