Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relayer): add numLatestBlocksStartWhenCrawling in crawler #17599

Merged
merged 9 commits into from
Jun 25, 2024
24 changes: 18 additions & 6 deletions packages/relayer/cmd/flags/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,23 @@ var (
Category: indexerCategory,
EnvVars: []string{"SRC_TAIKO_ADDRESS"},
}
NumLatestBlocksToIgnoreWhenCrawling = &cli.Uint64Flag{
Name: "numLatestBlocksToIgnoreWhenCrawling",
Usage: "Number of blocks to ignore when crawling chain, should be higher for L2-L1 indexing due to delay",
Value: 1000,
NumLatestBlocksEndWhenCrawling = &cli.Uint64Flag{
Name: "numLatestBlocksEndWhenCrawling",
Usage: `Number of blocks to ignore from the end when crawling chain,
should be higher for L2-L1 indexing due to delay
`,
Value: 300,
Category: indexerCategory,
EnvVars: []string{"NUM_LATEST_BLOCKS_END_WHEN_CRAWLING"},
}
NumLatestBlocksStartWhenCrawling = &cli.Uint64Flag{
Name: "numLatestBlocksStartWhenCrawling",
Usage: `Number of latest blocks to index from the start when crawling chain.
The default value is to cover past 7 days.
`,
Value: 50400,
Category: indexerCategory,
EnvVars: []string{"NUM_LATEST_BLOCKS_TO_IGNORE_WHEN_CRAWLING"},
EnvVars: []string{"NUM_LATEST_BLOCKS_START_WHEN_CRAWLING"},
}
EventName = &cli.StringFlag{
Name: "event",
Expand All @@ -101,7 +112,8 @@ var IndexerFlags = MergeFlags(CommonFlags, QueueFlags, []cli.Flag{
SubscriptionBackoff,
SyncMode,
WatchMode,
NumLatestBlocksToIgnoreWhenCrawling,
NumLatestBlocksEndWhenCrawling,
NumLatestBlocksStartWhenCrawling,
EventName,
TargetBlockNumber,
})
86 changes: 44 additions & 42 deletions packages/relayer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,53 +36,55 @@ type Config struct {
QueueHost string
QueuePort uint64
// rpc configs
SrcRPCUrl string
DestRPCUrl string
ETHClientTimeout uint64
BlockBatchSize uint64
NumGoroutines uint64
SubscriptionBackoff uint64
SyncMode SyncMode
WatchMode WatchMode
NumLatestBlocksToIgnoreWhenCrawling uint64
EventName string
TargetBlockNumber *uint64
BackOffRetryInterval time.Duration
BackOffMaxRetries uint64
OpenQueueFunc func() (queue.Queue, error)
OpenDBFunc func() (DB, error)
SrcRPCUrl string
DestRPCUrl string
ETHClientTimeout uint64
BlockBatchSize uint64
NumGoroutines uint64
SubscriptionBackoff uint64
SyncMode SyncMode
WatchMode WatchMode
NumLatestBlocksEndWhenCrawling uint64
NumLatestBlocksStartWhenCrawling uint64
EventName string
TargetBlockNumber *uint64
BackOffRetryInterval time.Duration
BackOffMaxRetries uint64
OpenQueueFunc func() (queue.Queue, error)
OpenDBFunc func() (DB, error)
}

// NewConfigFromCliContext creates a new config instance from command line flags.
func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
return &Config{
SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)),
SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)),
SrcSignalServiceAddress: common.HexToAddress(c.String(flags.SrcSignalServiceAddress.Name)),
DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)),
DatabaseUsername: c.String(flags.DatabaseUsername.Name),
DatabasePassword: c.String(flags.DatabasePassword.Name),
DatabaseName: c.String(flags.DatabaseName.Name),
DatabaseHost: c.String(flags.DatabaseHost.Name),
DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name),
DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name),
DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name),
QueueUsername: c.String(flags.QueueUsername.Name),
QueuePassword: c.String(flags.QueuePassword.Name),
QueuePort: c.Uint64(flags.QueuePort.Name),
QueueHost: c.String(flags.QueueHost.Name),
SrcRPCUrl: c.String(flags.SrcRPCUrl.Name),
DestRPCUrl: c.String(flags.DestRPCUrl.Name),
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
WatchMode: WatchMode(c.String(flags.WatchMode.Name)),
SyncMode: SyncMode(c.String(flags.SyncMode.Name)),
ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name),
NumLatestBlocksToIgnoreWhenCrawling: c.Uint64(flags.NumLatestBlocksToIgnoreWhenCrawling.Name),
EventName: c.String(flags.EventName.Name),
BackOffMaxRetries: c.Uint64(flags.BackOffMaxRetrys.Name),
BackOffRetryInterval: c.Duration(flags.BackOffRetryInterval.Name),
SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)),
SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)),
SrcSignalServiceAddress: common.HexToAddress(c.String(flags.SrcSignalServiceAddress.Name)),
DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)),
DatabaseUsername: c.String(flags.DatabaseUsername.Name),
DatabasePassword: c.String(flags.DatabasePassword.Name),
DatabaseName: c.String(flags.DatabaseName.Name),
DatabaseHost: c.String(flags.DatabaseHost.Name),
DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name),
DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name),
DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name),
QueueUsername: c.String(flags.QueueUsername.Name),
QueuePassword: c.String(flags.QueuePassword.Name),
QueuePort: c.Uint64(flags.QueuePort.Name),
QueueHost: c.String(flags.QueueHost.Name),
SrcRPCUrl: c.String(flags.SrcRPCUrl.Name),
DestRPCUrl: c.String(flags.DestRPCUrl.Name),
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
WatchMode: WatchMode(c.String(flags.WatchMode.Name)),
SyncMode: SyncMode(c.String(flags.SyncMode.Name)),
ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name),
NumLatestBlocksEndWhenCrawling: c.Uint64(flags.NumLatestBlocksEndWhenCrawling.Name),
NumLatestBlocksStartWhenCrawling: c.Uint64(flags.NumLatestBlocksStartWhenCrawling.Name),
EventName: c.String(flags.EventName.Name),
BackOffMaxRetries: c.Uint64(flags.BackOffMaxRetrys.Name),
BackOffRetryInterval: c.Duration(flags.BackOffRetryInterval.Name),
TargetBlockNumber: func() *uint64 {
if c.IsSet(flags.TargetBlockNumber.Name) {
value := c.Uint64(flags.TargetBlockNumber.Name)
Expand Down
8 changes: 5 additions & 3 deletions packages/relayer/indexer/handle_chain_data_synced_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexer
import (
"context"
"encoding/json"
"math/big"

"log/slog"

Expand All @@ -16,7 +15,6 @@ import (
// handleChainDataSyncedEvent handles an individual ChainDataSynced event
func (i *Indexer) handleChainDataSyncedEvent(
ctx context.Context,
chainID *big.Int,
event *signalservice.SignalServiceChainDataSynced,
waitForConfirmations bool,
) error {
Expand Down Expand Up @@ -74,7 +72,11 @@ func (i *Indexer) handleChainDataSyncedEvent(
return errors.Wrap(err, "i.eventRepo.Save")
}

slog.Info("chainDataSynced event saved")
slog.Info("chainDataSynced event saved",
"srcChainId", i.srcChainId,
"destChainId", i.destChainId,
"SyncedChainID", event.ChainId,
)

relayer.ChainDataSyncedEventsIndexed.Inc()

Expand Down
33 changes: 22 additions & 11 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ type Indexer struct {

wg *sync.WaitGroup

numLatestBlocksToIgnoreWhenCrawling uint64
numLatestBlocksEndWhenCrawling uint64
numLatestBlocksStartWhenCrawling uint64

targetBlockNumber *uint64

Expand Down Expand Up @@ -233,7 +234,8 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {

i.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second

i.numLatestBlocksToIgnoreWhenCrawling = cfg.NumLatestBlocksToIgnoreWhenCrawling
i.numLatestBlocksEndWhenCrawling = cfg.NumLatestBlocksEndWhenCrawling
i.numLatestBlocksStartWhenCrawling = cfg.NumLatestBlocksStartWhenCrawling

i.targetBlockNumber = cfg.TargetBlockNumber

Expand Down Expand Up @@ -274,7 +276,7 @@ func (i *Indexer) Start() error {
}

// set the initial processing block, which will vary by sync mode.
if err := i.setInitialIndexingBlockByMode(i.ctx, i.syncMode, i.srcChainId); err != nil {
if err := i.setInitialIndexingBlockByMode(i.syncMode, i.srcChainId); err != nil {
return errors.Wrap(err, "i.setInitialIndexingBlockByMode")
}

Expand All @@ -298,10 +300,6 @@ func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) {

var d time.Duration = 10 * time.Second

if i.watchMode == CrawlPastBlocks {
d = 10 * time.Minute
}

t := time.NewTicker(d)

defer t.Stop()
Expand Down Expand Up @@ -333,6 +331,15 @@ func (i *Indexer) filter(ctx context.Context) error {
// ignore latest N blocks if we are crawling past blocks, they are probably in queue already
// and are not "missed", have just not been processed.
if i.watchMode == CrawlPastBlocks {
if i.numLatestBlocksEndWhenCrawling > i.numLatestBlocksStartWhenCrawling {
cyberhorsey marked this conversation as resolved.
Show resolved Hide resolved
slog.Error("Invalid configuration",
"numLatestBlocksEndWhenCrawling", i.numLatestBlocksEndWhenCrawling,
"numLatestBlocksStartWhenCrawling", i.numLatestBlocksStartWhenCrawling,
)

return errors.New("numLatestBlocksStartWhenCrawling must be greater than numLatestBlocksEndWhenCrawling")
}

// if targetBlockNumber is not nil, we are just going to process a singular block.
if i.targetBlockNumber != nil {
slog.Info("targetBlockNumber is set", "targetBlockNumber", *i.targetBlockNumber)
Expand All @@ -342,15 +349,19 @@ func (i *Indexer) filter(ctx context.Context) error {
endBlockID = i.latestIndexedBlockNumber + 1
} else {
// set the initial processing block back to either 0 or the genesis block again.
if err := i.setInitialIndexingBlockByMode(i.ctx, i.syncMode, i.srcChainId); err != nil {
if err := i.setInitialIndexingBlockByMode(i.syncMode, i.srcChainId); err != nil {
return errors.Wrap(err, "i.setInitialIndexingBlockByMode")
}

if endBlockID > i.numLatestBlocksToIgnoreWhenCrawling {
if i.latestIndexedBlockNumber < endBlockID-i.numLatestBlocksStartWhenCrawling {
i.latestIndexedBlockNumber = endBlockID - i.numLatestBlocksStartWhenCrawling
}

if endBlockID > i.numLatestBlocksEndWhenCrawling {
// otherwise, we need to set the endBlockID as the greater of the two:
// either the endBlockID minus the number of latest blocks to ignore,
// or endBlockID.
endBlockID -= i.numLatestBlocksToIgnoreWhenCrawling
endBlockID -= i.numLatestBlocksEndWhenCrawling
}
}
}
Expand Down Expand Up @@ -594,7 +605,7 @@ func (i *Indexer) indexChainDataSyncedEvents(ctx context.Context,
event := chainDataSyncedEvents.Event

group.Go(func() error {
err := i.handleChainDataSyncedEvent(ctx, i.srcChainId, event, true)
err := i.handleChainDataSyncedEvent(ctx, event, true)
if err != nil {
relayer.MessageStatusChangedEventsIndexingErrors.Inc()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package indexer

import (
"context"
"math/big"

"github.com/pkg/errors"
Expand All @@ -11,7 +10,6 @@ import (
// setInitialIndexingBlockByMode takes in a SyncMode and determines how we should
// start our indexing
func (i *Indexer) setInitialIndexingBlockByMode(
ctx context.Context,
mode SyncMode,
chainID *big.Int,
) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package indexer

import (
"context"
"math/big"
"testing"

Expand Down Expand Up @@ -51,7 +50,6 @@ func Test_setInitialIndexingBlockByMode(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
svc, _ := newTestService(tt.mode, FilterAndSubscribe)
err := svc.setInitialIndexingBlockByMode(
context.Background(),
tt.mode,
tt.chainID,
)
Expand Down
Loading