diff --git a/Makefile b/Makefile index f5bdea63..262d7b8f 100644 --- a/Makefile +++ b/Makefile @@ -53,7 +53,7 @@ test: stop docker compose -f docker-compose.yml up -d simavs-sync sleep 3 docker ps -a - trap '$(STOP)' EXIT; go test ./... --timeout=10m + trap '$(STOP)' EXIT; go test ./... --timeout=10m --race .PHONY: test run-db-mongo: diff --git a/rpcclient/arbitrum/batch_fetcher.go b/rpcclient/arbitrum/batch_fetcher.go index 217162f7..2bcbca1e 100644 --- a/rpcclient/arbitrum/batch_fetcher.go +++ b/rpcclient/arbitrum/batch_fetcher.go @@ -152,7 +152,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error { for _, batch := range batches { var rawMsg []byte if batch.serialized[0] == BlobHashesHeaderFlag { - rawMsg, err = f.fetchBlock(f.ctx, batch.BlockNumber, batch.TxHash) + rawMsg, err = f.fetchBlock(batch.BlockNumber, batch.TxHash) if err != nil { return err } @@ -163,8 +163,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error { if err != nil { return err } - _, err := f.sequencerInbox.parseL2Transactions(batch) - if err != nil { + if _, err := f.sequencerInbox.parseL2Transactions(batch); err != nil { return err } batchesRef, err := f.getBatchRef(batch) @@ -244,11 +243,11 @@ func (f *Fetcher) StopFetch() { f.lastSyncedL1BlockNumber.Store(0) // close L1 fetcher f.cancel() - <-f.done // drain channel for len(f.batchHeaders) > 0 { <-f.batchHeaders } + <-f.done // wait for the fetcher to finish f.cancel = nil f.ctx = nil @@ -256,7 +255,7 @@ func (f *Fetcher) StopFetch() { // fetchBlock fetches the given block and analyzes the transactions // which are sent to the BatchInbox EOA. -func (f *Fetcher) fetchBlock(ctx context.Context, blockNumber uint64, txHash common.Hash) ([]byte, error) { +func (f *Fetcher) fetchBlock(blockNumber uint64, txHash common.Hash) ([]byte, error) { block, err := f.l1Client.GetBlockByNumber(blockNumber) if err != nil { return nil, err diff --git a/rpcclient/arbitrum/client_test.go b/rpcclient/arbitrum/client_test.go new file mode 100644 index 00000000..6e19586c --- /dev/null +++ b/rpcclient/arbitrum/client_test.go @@ -0,0 +1,33 @@ +package arbitrum + +import ( + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestErrorHandling(t *testing.T) { + cfg := &Config{ + RPCURL: "http://localhost:8545", + L1RPCURL: "http://localhost:8545", + BeaconURL: "http://localhost:8545", + BatchInbox: common.Address{}.Hex(), + ConcurrentFetchers: 4, + } + client, err := NewClient(cfg) + require.NoError(t, err) + + client.SetBeginBlockNumber(10) + time.Sleep(1 * time.Second) + client.fetcher.StopFetch() + + // check if able to restart + client.SetBeginBlockNumber(50) + time.Sleep(1 * time.Second) + client.fetcher.Stop() + // check error propagation + _, err = client.NextBatch() + require.Error(t, err) +} diff --git a/rpcclient/optimism/batch_fetcher.go b/rpcclient/optimism/batch_fetcher.go index f2cdbdd2..24ad11ad 100644 --- a/rpcclient/optimism/batch_fetcher.go +++ b/rpcclient/optimism/batch_fetcher.go @@ -24,6 +24,7 @@ import ( "github.com/Lagrange-Labs/lagrange-node/rpcclient/types" sequencerv2types "github.com/Lagrange-Labs/lagrange-node/sequencer/types/v2" "github.com/Lagrange-Labs/lagrange-node/telemetry" + "github.com/Lagrange-Labs/lagrange-node/utils" ) const ( @@ -184,7 +185,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error { number := i g.Go(func() error { - res, err := f.fetchBlock(ctx, number) + res, err := f.fetchBlock(number) if err != nil { return err } @@ -283,16 +284,16 @@ func (f *Fetcher) StopFetch() { f.lastSyncedL1BlockNumber.Store(0) // close L1 fetcher f.cancel() - <-f.done // close batch decoder close(f.chFramesRef) for range f.chFramesRef { } - <-f.done + <-f.done // wait for the batch decoder to finish // drain channel for len(f.batchHeaders) > 0 { <-f.batchHeaders } + <-f.done // wait for the fetcher to finish f.cancel = nil f.ctx = nil @@ -300,7 +301,7 @@ func (f *Fetcher) StopFetch() { // fetchBlock fetches the given block and analyzes the transactions // which are sent to the BatchInbox EOA. -func (f *Fetcher) fetchBlock(ctx context.Context, blockNumber uint64) ([]*FramesRef, error) { +func (f *Fetcher) fetchBlock(blockNumber uint64) ([]*FramesRef, error) { block, err := f.l1Client.GetBlockByNumber(blockNumber) if err != nil { return nil, err @@ -352,7 +353,7 @@ func (f *Fetcher) fetchBlock(ctx context.Context, blockNumber uint64) ([]*Frames Time: block.Time(), } ti := time.Now() - blobs, err := f.l1BlobFetcher.GetBlobs(ctx, blockRef, hashes) + blobs, err := f.l1BlobFetcher.GetBlobs(utils.GetContext(), blockRef, hashes) if err != nil { logger.Errorf("failed to get blobs: %v", err) return nil, err diff --git a/rpcclient/optimism/client_test.go b/rpcclient/optimism/client_test.go index d4cf14b8..3c24ff9c 100644 --- a/rpcclient/optimism/client_test.go +++ b/rpcclient/optimism/client_test.go @@ -1,7 +1,6 @@ package optimism import ( - "sync" "testing" "time" @@ -22,26 +21,13 @@ func TestErrorHandling(t *testing.T) { require.NoError(t, err) client.SetBeginBlockNumber(10) - mtx := sync.Mutex{} - mtx.Lock() - go func() { - defer mtx.Unlock() - time.Sleep(2 * time.Second) - client.fetcher.StopFetch() - }() - mtx.Lock() + time.Sleep(1 * time.Second) + client.fetcher.StopFetch() // check if able to restart client.SetBeginBlockNumber(50) - mtx.Unlock() - mtx.Lock() - go func() { - defer mtx.Unlock() - time.Sleep(2 * time.Second) - client.fetcher.Stop() - }() - mtx.Lock() - defer mtx.Unlock() + time.Sleep(1 * time.Second) + client.fetcher.Stop() // check error propagation _, err = client.NextBatch() require.Error(t, err)