Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(relayer): fix some carps #17681

Merged
merged 3 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions packages/relayer/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type API struct {
srv *http.Server
httpPort uint64
ctx context.Context
wg *sync.WaitGroup
wg sync.WaitGroup
srcEthClient *ethclient.Client
}

Expand Down Expand Up @@ -84,7 +84,6 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) (err error) {
api.srv = srv
api.httpPort = cfg.HTTPPort
api.ctx = ctx
api.wg = &sync.WaitGroup{}
api.srcEthClient = srcEthClient

return nil
Expand Down Expand Up @@ -112,7 +111,7 @@ func (api *API) Start() error {

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(api.ctx, api.srcEthClient, api.wg)
return utils.ScanBlocks(api.ctx, api.srcEthClient, &api.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand Down
7 changes: 1 addition & 6 deletions packages/relayer/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@ type Bridge struct {
srcBridge relayer.Bridge
destBridge relayer.Bridge

mu *sync.Mutex

addr common.Address

backOffRetryInterval time.Duration
backOffMaxRetries uint64
ethClientTimeout time.Duration

wg *sync.WaitGroup
wg sync.WaitGroup

srcChainId *big.Int
destChainId *big.Int
Expand Down Expand Up @@ -121,9 +119,6 @@ func InitFromConfig(ctx context.Context, b *Bridge, cfg *Config) error {
b.srcChainId = srcChainID
b.destChainId = destChainID

b.wg = &sync.WaitGroup{}
b.mu = &sync.Mutex{}

b.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second
b.backOffMaxRetries = cfg.BackOffMaxRetrys
b.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second
Expand Down
28 changes: 0 additions & 28 deletions packages/relayer/db.go

This file was deleted.

18 changes: 4 additions & 14 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type Indexer struct {

ethClientTimeout time.Duration

wg *sync.WaitGroup
wg sync.WaitGroup

numLatestBlocksEndWhenCrawling uint64
numLatestBlocksStartWhenCrawling uint64
Expand All @@ -123,8 +123,6 @@ type Indexer struct {

ctx context.Context

mu *sync.Mutex

eventName string

cfg *Config
Expand Down Expand Up @@ -230,17 +228,13 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {
i.syncMode = cfg.SyncMode
i.watchMode = cfg.WatchMode

i.wg = &sync.WaitGroup{}

i.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second

i.numLatestBlocksEndWhenCrawling = cfg.NumLatestBlocksEndWhenCrawling
i.numLatestBlocksStartWhenCrawling = cfg.NumLatestBlocksStartWhenCrawling

i.targetBlockNumber = cfg.TargetBlockNumber

i.mu = &sync.Mutex{}

i.eventName = cfg.EventName

i.cfg = cfg
Expand Down Expand Up @@ -280,13 +274,11 @@ func (i *Indexer) Start() error {
return errors.Wrap(err, "i.setInitialIndexingBlockByMode")
}

i.wg.Add(1)

go i.eventLoop(i.ctx, i.latestIndexedBlockNumber)

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(i.ctx, i.srcEthClient, i.wg)
return utils.ScanBlocks(i.ctx, i.srcEthClient, &i.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand All @@ -296,12 +288,10 @@ func (i *Indexer) Start() error {
}

func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) {
i.wg.Add(1)
defer i.wg.Done()

var d time.Duration = 10 * time.Second

t := time.NewTicker(d)

t := time.NewTicker(10 * time.Second)
defer t.Stop()

for {
Expand Down
4 changes: 0 additions & 4 deletions packages/relayer/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package indexer

import (
"context"
"sync"
"time"

"github.com/taikoxyz/taiko-mono/packages/relayer"
Expand All @@ -28,15 +27,12 @@ func newTestService(syncMode SyncMode, watchMode WatchMode) (*Indexer, relayer.B
syncMode: syncMode,
watchMode: watchMode,

wg: &sync.WaitGroup{},

ctx: context.Background(),

srcChainId: mock.MockChainID,
destChainId: mock.MockChainID,

ethClientTimeout: 10 * time.Second,
mu: &sync.Mutex{},
eventName: relayer.EventNameMessageSent,
}, b
}
5 changes: 1 addition & 4 deletions packages/relayer/pkg/utils/scan_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ type headSubscriber interface {

func ScanBlocks(ctx context.Context, ethClient headSubscriber, wg *sync.WaitGroup) error {
wg.Add(1)

defer func() {
wg.Done()
}()
defer wg.Done()

headers := make(chan *types.Header)

Expand Down
20 changes: 6 additions & 14 deletions packages/relayer/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ type Processor struct {

prover *proof.Prover

mu *sync.Mutex

relayerAddr common.Address
srcSignalServiceAddress common.Address

Expand All @@ -121,7 +119,7 @@ type Processor struct {

msgCh chan queue.Message

wg *sync.WaitGroup
wg sync.WaitGroup

srcChainId *big.Int
destChainId *big.Int
Expand All @@ -137,7 +135,7 @@ type Processor struct {
maxMessageRetries uint64

processingTxHashes map[common.Hash]bool
processingTxHashMu *sync.Mutex
processingTxHashMu sync.Mutex
}

// InitFromCli creates a new processor from a cli context
Expand Down Expand Up @@ -363,8 +361,6 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error {
p.srcSignalServiceAddress = cfg.SrcSignalServiceAddress

p.msgCh = make(chan queue.Message)
p.wg = &sync.WaitGroup{}
p.mu = &sync.Mutex{}
p.srcCaller = srcRpcClient

p.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second
Expand All @@ -376,7 +372,6 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error {
p.maxMessageRetries = cfg.MaxMessageRetries

p.processingTxHashes = make(map[common.Hash]bool, 0)
p.processingTxHashMu = &sync.Mutex{}

return nil
}
Expand Down Expand Up @@ -418,7 +413,7 @@ func (p *Processor) Start() error {
go func() {
if err := backoff.Retry(func() error {
slog.Info("attempting backoff queue subscription")
if err := p.queue.Subscribe(ctx, p.msgCh, p.wg); err != nil {
if err := p.queue.Subscribe(ctx, p.msgCh, &p.wg); err != nil {
slog.Error("processor queue subscription error", "err", err.Error())
return err
}
Expand All @@ -429,13 +424,11 @@ func (p *Processor) Start() error {
}
}()

p.wg.Add(1)

go p.eventLoop(ctx)

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(ctx, p.srcEthClient, p.wg)
return utils.ScanBlocks(ctx, p.srcEthClient, &p.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand All @@ -451,9 +444,8 @@ func (p *Processor) queueName() string {
// eventLoop is the main event loop of a Processor which should read
// messages from a queue and then process them.
func (p *Processor) eventLoop(ctx context.Context) {
defer func() {
p.wg.Done()
}()
p.wg.Add(1)
defer p.wg.Done()

for {
select {
Expand Down
3 changes: 0 additions & 3 deletions packages/relayer/processor/processor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package processor

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -28,7 +27,6 @@ func newTestProcessor(profitableOnly bool) *Processor {
destEthClient: &mock.EthClient{},
destERC20Vault: &mock.TokenVault{},
srcSignalService: &mock.SignalService{},
mu: &sync.Mutex{},
ecdsaKey: privateKey,
prover: prover,
srcCaller: &mock.Caller{},
Expand All @@ -49,6 +47,5 @@ func newTestProcessor(profitableOnly bool) *Processor {
maxMessageRetries: 5,
destQuotaManager: &mock.QuotaManager{},
processingTxHashes: make(map[common.Hash]bool, 0),
processingTxHashMu: &sync.Mutex{},
}
}
2 changes: 1 addition & 1 deletion packages/relayer/scripts/abigen.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#/bin/sh
#!/bin/sh

if [ ! -d "../protocol/out" ]; then
echo "ABI not generated in protocol package yet. Please run npm install && npx hardhat compile in ../protocol"
Expand Down
4 changes: 2 additions & 2 deletions packages/relayer/scripts/swagger.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#/bin/sh
#!/bin/sh

swag init -g server.go -d pkg/http --parseDependency
swag init -g server.go -d pkg/http --parseDependency
11 changes: 0 additions & 11 deletions packages/relayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ var (
ZeroAddress = common.HexToAddress("0x0000000000000000000000000000000000000000")
)

// IsInSlice determines whether v is in slice s
func IsInSlice[T comparable](v T, s []T) bool {
for _, e := range s {
if v == e {
return true
}
}

return false
}

type confirmer interface {
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
BlockNumber(ctx context.Context) (uint64, error)
Expand Down
10 changes: 0 additions & 10 deletions packages/relayer/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_IsInSlice(t *testing.T) {
if IsInSlice("fake", []string{}) {
t.Fatal()
}

if !IsInSlice("real", []string{"real"}) {
t.Fatal()
}
}

type mockConfirmer struct {
}

Expand Down
10 changes: 3 additions & 7 deletions packages/relayer/watchdog/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ type Watchdog struct {
srcBridge relayer.Bridge
destBridge relayer.Bridge

mu *sync.Mutex

watchdogAddr common.Address

confirmations uint64
Expand All @@ -80,7 +78,7 @@ type Watchdog struct {

msgCh chan queue.Message

wg *sync.WaitGroup
wg sync.WaitGroup

srcChainId *big.Int
destChainId *big.Int
Expand Down Expand Up @@ -194,8 +192,6 @@ func InitFromConfig(ctx context.Context, w *Watchdog, cfg *Config) error {
w.confirmations = cfg.Confirmations

w.msgCh = make(chan queue.Message)
w.wg = &sync.WaitGroup{}
w.mu = &sync.Mutex{}

w.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second
w.backOffMaxRetries = cfg.BackOffMaxRetrys
Expand Down Expand Up @@ -230,7 +226,7 @@ func (w *Watchdog) Start() error {
go func() {
if err := backoff.Retry(func() error {
slog.Info("attempting backoff queue subscription")
if err := w.queue.Subscribe(ctx, w.msgCh, w.wg); err != nil {
if err := w.queue.Subscribe(ctx, w.msgCh, &w.wg); err != nil {
slog.Error("processor queue subscription error", "err", err.Error())
return err
}
Expand All @@ -247,7 +243,7 @@ func (w *Watchdog) Start() error {

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(ctx, w.srcEthClient, w.wg)
return utils.ScanBlocks(ctx, w.srcEthClient, &w.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand Down
Loading