diff --git a/packages/relayer/api/api.go b/packages/relayer/api/api.go index e7b72bd1477..ab7b3c1b0e8 100644 --- a/packages/relayer/api/api.go +++ b/packages/relayer/api/api.go @@ -29,7 +29,7 @@ type API struct { srv *http.Server httpPort uint64 ctx context.Context - wg *sync.WaitGroup + wg sync.WaitGroup srcEthClient *ethclient.Client } @@ -84,7 +84,6 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) (err error) { api.srv = srv api.httpPort = cfg.HTTPPort api.ctx = ctx - api.wg = &sync.WaitGroup{} api.srcEthClient = srcEthClient return nil @@ -112,7 +111,7 @@ func (api *API) Start() error { go func() { if err := backoff.Retry(func() error { - return utils.ScanBlocks(api.ctx, api.srcEthClient, api.wg) + return utils.ScanBlocks(api.ctx, api.srcEthClient, &api.wg) }, backoff.NewConstantBackOff(5*time.Second)); err != nil { slog.Error("scan blocks backoff retry", "error", err) } diff --git a/packages/relayer/bridge/bridge.go b/packages/relayer/bridge/bridge.go index bcfc22e603d..7c60ace3337 100644 --- a/packages/relayer/bridge/bridge.go +++ b/packages/relayer/bridge/bridge.go @@ -45,15 +45,13 @@ type Bridge struct { srcBridge relayer.Bridge destBridge relayer.Bridge - mu *sync.Mutex - addr common.Address backOffRetryInterval time.Duration backOffMaxRetries uint64 ethClientTimeout time.Duration - wg *sync.WaitGroup + wg sync.WaitGroup srcChainId *big.Int destChainId *big.Int @@ -121,9 +119,6 @@ func InitFromConfig(ctx context.Context, b *Bridge, cfg *Config) error { b.srcChainId = srcChainID b.destChainId = destChainID - b.wg = &sync.WaitGroup{} - b.mu = &sync.Mutex{} - b.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second b.backOffMaxRetries = cfg.BackOffMaxRetrys b.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second diff --git a/packages/relayer/db.go b/packages/relayer/db.go deleted file mode 100644 index 02798e0e236..00000000000 --- a/packages/relayer/db.go +++ /dev/null @@ -1,28 +0,0 @@ -package relayer - -import ( - "database/sql" - - "github.com/cyberhorsey/errors" - "gorm.io/gorm" -) - -var ( - ErrNoDB = errors.Validation.NewWithKeyAndDetail("ERR_NO_DB", "DB is required") -) - -type DBConnectionOpts struct { - Name string - Password string - Host string - Database string - MaxIdleConns uint64 - MaxOpenConns uint64 - MaxConnLifetime uint64 - OpenFunc func(dsn string) (DB, error) -} - -type DB interface { - DB() (*sql.DB, error) - GormDB() *gorm.DB -} diff --git a/packages/relayer/indexer/indexer.go b/packages/relayer/indexer/indexer.go index dc6683351a2..410fde629ae 100644 --- a/packages/relayer/indexer/indexer.go +++ b/packages/relayer/indexer/indexer.go @@ -114,7 +114,7 @@ type Indexer struct { ethClientTimeout time.Duration - wg *sync.WaitGroup + wg sync.WaitGroup numLatestBlocksEndWhenCrawling uint64 numLatestBlocksStartWhenCrawling uint64 @@ -123,8 +123,6 @@ type Indexer struct { ctx context.Context - mu *sync.Mutex - eventName string cfg *Config @@ -230,8 +228,6 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) { i.syncMode = cfg.SyncMode i.watchMode = cfg.WatchMode - i.wg = &sync.WaitGroup{} - i.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second i.numLatestBlocksEndWhenCrawling = cfg.NumLatestBlocksEndWhenCrawling @@ -239,8 +235,6 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) { i.targetBlockNumber = cfg.TargetBlockNumber - i.mu = &sync.Mutex{} - i.eventName = cfg.EventName i.cfg = cfg @@ -280,13 +274,11 @@ func (i *Indexer) Start() error { return errors.Wrap(err, "i.setInitialIndexingBlockByMode") } - i.wg.Add(1) - go i.eventLoop(i.ctx, i.latestIndexedBlockNumber) go func() { if err := backoff.Retry(func() error { - return utils.ScanBlocks(i.ctx, i.srcEthClient, i.wg) + return utils.ScanBlocks(i.ctx, i.srcEthClient, &i.wg) }, backoff.NewConstantBackOff(5*time.Second)); err != nil { slog.Error("scan blocks backoff retry", "error", err) } @@ -296,12 +288,10 @@ func (i *Indexer) Start() error { } func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) { + i.wg.Add(1) defer i.wg.Done() - var d time.Duration = 10 * time.Second - - t := time.NewTicker(d) - + t := time.NewTicker(10 * time.Second) defer t.Stop() for { diff --git a/packages/relayer/indexer/indexer_test.go b/packages/relayer/indexer/indexer_test.go index ca5a150ab36..8af5555ca1d 100644 --- a/packages/relayer/indexer/indexer_test.go +++ b/packages/relayer/indexer/indexer_test.go @@ -2,7 +2,6 @@ package indexer import ( "context" - "sync" "time" "github.com/taikoxyz/taiko-mono/packages/relayer" @@ -28,15 +27,12 @@ func newTestService(syncMode SyncMode, watchMode WatchMode) (*Indexer, relayer.B syncMode: syncMode, watchMode: watchMode, - wg: &sync.WaitGroup{}, - ctx: context.Background(), srcChainId: mock.MockChainID, destChainId: mock.MockChainID, ethClientTimeout: 10 * time.Second, - mu: &sync.Mutex{}, eventName: relayer.EventNameMessageSent, }, b } diff --git a/packages/relayer/pkg/utils/scan_blocks.go b/packages/relayer/pkg/utils/scan_blocks.go index ec62c24a724..d6da257ebc5 100644 --- a/packages/relayer/pkg/utils/scan_blocks.go +++ b/packages/relayer/pkg/utils/scan_blocks.go @@ -15,10 +15,7 @@ type headSubscriber interface { func ScanBlocks(ctx context.Context, ethClient headSubscriber, wg *sync.WaitGroup) error { wg.Add(1) - - defer func() { - wg.Done() - }() + defer wg.Done() headers := make(chan *types.Header) diff --git a/packages/relayer/processor/processor.go b/packages/relayer/processor/processor.go index 43302583357..64747d32eab 100644 --- a/packages/relayer/processor/processor.go +++ b/packages/relayer/processor/processor.go @@ -103,8 +103,6 @@ type Processor struct { prover *proof.Prover - mu *sync.Mutex - relayerAddr common.Address srcSignalServiceAddress common.Address @@ -121,7 +119,7 @@ type Processor struct { msgCh chan queue.Message - wg *sync.WaitGroup + wg sync.WaitGroup srcChainId *big.Int destChainId *big.Int @@ -137,7 +135,7 @@ type Processor struct { maxMessageRetries uint64 processingTxHashes map[common.Hash]bool - processingTxHashMu *sync.Mutex + processingTxHashMu sync.Mutex } // InitFromCli creates a new processor from a cli context @@ -363,8 +361,6 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error { p.srcSignalServiceAddress = cfg.SrcSignalServiceAddress p.msgCh = make(chan queue.Message) - p.wg = &sync.WaitGroup{} - p.mu = &sync.Mutex{} p.srcCaller = srcRpcClient p.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second @@ -376,7 +372,6 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error { p.maxMessageRetries = cfg.MaxMessageRetries p.processingTxHashes = make(map[common.Hash]bool, 0) - p.processingTxHashMu = &sync.Mutex{} return nil } @@ -418,7 +413,7 @@ func (p *Processor) Start() error { go func() { if err := backoff.Retry(func() error { slog.Info("attempting backoff queue subscription") - if err := p.queue.Subscribe(ctx, p.msgCh, p.wg); err != nil { + if err := p.queue.Subscribe(ctx, p.msgCh, &p.wg); err != nil { slog.Error("processor queue subscription error", "err", err.Error()) return err } @@ -429,13 +424,11 @@ func (p *Processor) Start() error { } }() - p.wg.Add(1) - go p.eventLoop(ctx) go func() { if err := backoff.Retry(func() error { - return utils.ScanBlocks(ctx, p.srcEthClient, p.wg) + return utils.ScanBlocks(ctx, p.srcEthClient, &p.wg) }, backoff.NewConstantBackOff(5*time.Second)); err != nil { slog.Error("scan blocks backoff retry", "error", err) } @@ -451,9 +444,8 @@ func (p *Processor) queueName() string { // eventLoop is the main event loop of a Processor which should read // messages from a queue and then process them. func (p *Processor) eventLoop(ctx context.Context) { - defer func() { - p.wg.Done() - }() + p.wg.Add(1) + defer p.wg.Done() for { select { diff --git a/packages/relayer/processor/processor_test.go b/packages/relayer/processor/processor_test.go index 5b4582c1a39..2eb87863ea8 100644 --- a/packages/relayer/processor/processor_test.go +++ b/packages/relayer/processor/processor_test.go @@ -1,7 +1,6 @@ package processor import ( - "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -28,7 +27,6 @@ func newTestProcessor(profitableOnly bool) *Processor { destEthClient: &mock.EthClient{}, destERC20Vault: &mock.TokenVault{}, srcSignalService: &mock.SignalService{}, - mu: &sync.Mutex{}, ecdsaKey: privateKey, prover: prover, srcCaller: &mock.Caller{}, @@ -49,6 +47,5 @@ func newTestProcessor(profitableOnly bool) *Processor { maxMessageRetries: 5, destQuotaManager: &mock.QuotaManager{}, processingTxHashes: make(map[common.Hash]bool, 0), - processingTxHashMu: &sync.Mutex{}, } } diff --git a/packages/relayer/scripts/abigen.sh b/packages/relayer/scripts/abigen.sh index 2a25191ffee..d6e14be717f 100755 --- a/packages/relayer/scripts/abigen.sh +++ b/packages/relayer/scripts/abigen.sh @@ -1,4 +1,4 @@ -#/bin/sh +#!/bin/sh if [ ! -d "../protocol/out" ]; then echo "ABI not generated in protocol package yet. Please run npm install && npx hardhat compile in ../protocol" diff --git a/packages/relayer/scripts/swagger.sh b/packages/relayer/scripts/swagger.sh index 2ba0cac2c3a..3a45b8beaac 100755 --- a/packages/relayer/scripts/swagger.sh +++ b/packages/relayer/scripts/swagger.sh @@ -1,3 +1,3 @@ -#/bin/sh +#!/bin/sh -swag init -g server.go -d pkg/http --parseDependency \ No newline at end of file +swag init -g server.go -d pkg/http --parseDependency diff --git a/packages/relayer/types.go b/packages/relayer/types.go index 6867bb05775..657d22d63d9 100644 --- a/packages/relayer/types.go +++ b/packages/relayer/types.go @@ -22,17 +22,6 @@ var ( ZeroAddress = common.HexToAddress("0x0000000000000000000000000000000000000000") ) -// IsInSlice determines whether v is in slice s -func IsInSlice[T comparable](v T, s []T) bool { - for _, e := range s { - if v == e { - return true - } - } - - return false -} - type confirmer interface { TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) BlockNumber(ctx context.Context) (uint64, error) diff --git a/packages/relayer/types_test.go b/packages/relayer/types_test.go index 0444a7d7148..069f1229512 100644 --- a/packages/relayer/types_test.go +++ b/packages/relayer/types_test.go @@ -13,16 +13,6 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_IsInSlice(t *testing.T) { - if IsInSlice("fake", []string{}) { - t.Fatal() - } - - if !IsInSlice("real", []string{"real"}) { - t.Fatal() - } -} - type mockConfirmer struct { } diff --git a/packages/relayer/watchdog/watchdog.go b/packages/relayer/watchdog/watchdog.go index 8de4f9df6c5..7a8c7826dbb 100644 --- a/packages/relayer/watchdog/watchdog.go +++ b/packages/relayer/watchdog/watchdog.go @@ -66,8 +66,6 @@ type Watchdog struct { srcBridge relayer.Bridge destBridge relayer.Bridge - mu *sync.Mutex - watchdogAddr common.Address confirmations uint64 @@ -80,7 +78,7 @@ type Watchdog struct { msgCh chan queue.Message - wg *sync.WaitGroup + wg sync.WaitGroup srcChainId *big.Int destChainId *big.Int @@ -194,8 +192,6 @@ func InitFromConfig(ctx context.Context, w *Watchdog, cfg *Config) error { w.confirmations = cfg.Confirmations w.msgCh = make(chan queue.Message) - w.wg = &sync.WaitGroup{} - w.mu = &sync.Mutex{} w.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second w.backOffMaxRetries = cfg.BackOffMaxRetrys @@ -230,7 +226,7 @@ func (w *Watchdog) Start() error { go func() { if err := backoff.Retry(func() error { slog.Info("attempting backoff queue subscription") - if err := w.queue.Subscribe(ctx, w.msgCh, w.wg); err != nil { + if err := w.queue.Subscribe(ctx, w.msgCh, &w.wg); err != nil { slog.Error("processor queue subscription error", "err", err.Error()) return err } @@ -247,7 +243,7 @@ func (w *Watchdog) Start() error { go func() { if err := backoff.Retry(func() error { - return utils.ScanBlocks(ctx, w.srcEthClient, w.wg) + return utils.ScanBlocks(ctx, w.srcEthClient, &w.wg) }, backoff.NewConstantBackOff(5*time.Second)); err != nil { slog.Error("scan blocks backoff retry", "error", err) }