Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(relayer): restart indexer and crawler when they exit due to failed API requests #16465

Merged
merged 12 commits into from
Mar 18, 2024
31 changes: 22 additions & 9 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,28 +218,38 @@ func (i *Indexer) Start() error {
i.wg.Add(1)

go func() {
defer func() {
i.wg.Done()
}()

if err := i.filter(i.ctx); err != nil {
slog.Error("error filtering blocks", "error", err.Error())
if err := backoff.Retry(func() error {
err := i.filter(i.ctx)
if err != nil {
slog.Error("filter failed, will retry", "error", err)
}
return err
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
mask-pp marked this conversation as resolved.
Show resolved Hide resolved
slog.Error("error after retrying filter with backoff", "error", err)
}
}()

go func() {
if err := backoff.Retry(func() error {
return scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg)
err := scanBlocks(i.ctx, i.srcEthClient, i.srcChainId, i.wg)
if err != nil {
slog.Error("scanBlocks failed, will retry", "error", err)
}
return err
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
}()

go func() {
if err := backoff.Retry(func() error {
return i.queue.Notify(i.ctx, i.wg)
err := i.queue.Notify(i.ctx, i.wg)
if err != nil {
slog.Error("i.queue.Notify failed, will retry", "error", err)
}
return err
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
xiaodino marked this conversation as resolved.
Show resolved Hide resolved
slog.Error("queue notify backoff retry", "error", err)
slog.Error("i.queue.Notify backoff retry", "error", err)
}
}()

Expand Down Expand Up @@ -383,6 +393,9 @@ func (i *Indexer) filter(ctx context.Context) error {

if i.watchMode == CrawlPastBlocks {
slog.Info("restarting filtering from genesis")

i.processingBlockHeight = 0

return i.filter(ctx)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ func (i *Indexer) setInitialProcessingBlockByMode(

return nil
case Resync:
if i.watchMode == CrawlPastBlocks && i.processingBlockHeight > startingBlock {
return nil
}

i.processingBlockHeight = startingBlock

return nil
default:
return relayer.ErrInvalidMode
Expand Down
Loading