Skip to content

Commit

Permalink
fix: deadlock in StopFetch (#424)
Browse files Browse the repository at this point in the history
(cherry picked from commit 57906c2)
  • Loading branch information
cool-develope authored and mergify[bot] committed May 22, 2024
1 parent 93f42b1 commit ea77576
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ test: stop
docker compose -f docker-compose.yml up -d simavs-sync
sleep 3
docker ps -a
trap '$(STOP)' EXIT; go test ./... --timeout=10m
trap '$(STOP)' EXIT; go test ./... --timeout=10m --race
.PHONY: test

run-db-mongo:
Expand Down
9 changes: 4 additions & 5 deletions rpcclient/arbitrum/batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error {
for _, batch := range batches {
var rawMsg []byte
if batch.serialized[0] == BlobHashesHeaderFlag {
rawMsg, err = f.fetchBlock(f.ctx, batch.BlockNumber, batch.TxHash)
rawMsg, err = f.fetchBlock(batch.BlockNumber, batch.TxHash)
if err != nil {
return err
}
Expand All @@ -163,8 +163,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error {
if err != nil {
return err
}
_, err := f.sequencerInbox.parseL2Transactions(batch)
if err != nil {
if _, err := f.sequencerInbox.parseL2Transactions(batch); err != nil {
return err
}
batchesRef, err := f.getBatchRef(batch)
Expand Down Expand Up @@ -244,19 +243,19 @@ func (f *Fetcher) StopFetch() {
f.lastSyncedL1BlockNumber.Store(0)
// close L1 fetcher
f.cancel()
<-f.done
// drain channel
for len(f.batchHeaders) > 0 {
<-f.batchHeaders
}
<-f.done // wait for the fetcher to finish

f.cancel = nil
f.ctx = nil
}

// fetchBlock fetches the given block and analyzes the transactions
// which are sent to the BatchInbox EOA.
func (f *Fetcher) fetchBlock(ctx context.Context, blockNumber uint64, txHash common.Hash) ([]byte, error) {
func (f *Fetcher) fetchBlock(blockNumber uint64, txHash common.Hash) ([]byte, error) {
block, err := f.l1Client.GetBlockByNumber(blockNumber)
if err != nil {
return nil, err
Expand Down
33 changes: 33 additions & 0 deletions rpcclient/arbitrum/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package arbitrum

import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)

func TestErrorHandling(t *testing.T) {
cfg := &Config{
RPCURL: "http://localhost:8545",
L1RPCURL: "http://localhost:8545",
BeaconURL: "http://localhost:8545",
BatchInbox: common.Address{}.Hex(),
ConcurrentFetchers: 4,
}
client, err := NewClient(cfg)
require.NoError(t, err)

client.SetBeginBlockNumber(10)
time.Sleep(1 * time.Second)
client.fetcher.StopFetch()

// check if able to restart
client.SetBeginBlockNumber(50)
time.Sleep(1 * time.Second)
client.fetcher.Stop()
// check error propagation
_, err = client.NextBatch()
require.Error(t, err)
}
11 changes: 6 additions & 5 deletions rpcclient/optimism/batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/Lagrange-Labs/lagrange-node/rpcclient/types"
sequencerv2types "github.com/Lagrange-Labs/lagrange-node/sequencer/types/v2"
"github.com/Lagrange-Labs/lagrange-node/telemetry"
"github.com/Lagrange-Labs/lagrange-node/utils"
)

const (
Expand Down Expand Up @@ -184,7 +185,7 @@ func (f *Fetcher) Fetch(l1BeginBlockNumber uint64) error {

number := i
g.Go(func() error {
res, err := f.fetchBlock(ctx, number)
res, err := f.fetchBlock(number)
if err != nil {
return err
}
Expand Down Expand Up @@ -283,24 +284,24 @@ func (f *Fetcher) StopFetch() {
f.lastSyncedL1BlockNumber.Store(0)
// close L1 fetcher
f.cancel()
<-f.done
// close batch decoder
close(f.chFramesRef)
for range f.chFramesRef {
}
<-f.done
<-f.done // wait for the batch decoder to finish
// drain channel
for len(f.batchHeaders) > 0 {
<-f.batchHeaders
}
<-f.done // wait for the fetcher to finish

f.cancel = nil
f.ctx = nil
}

// fetchBlock fetches the given block and analyzes the transactions
// which are sent to the BatchInbox EOA.
func (f *Fetcher) fetchBlock(ctx context.Context, blockNumber uint64) ([]*FramesRef, error) {
func (f *Fetcher) fetchBlock(blockNumber uint64) ([]*FramesRef, error) {
block, err := f.l1Client.GetBlockByNumber(blockNumber)
if err != nil {
return nil, err
Expand Down Expand Up @@ -352,7 +353,7 @@ func (f *Fetcher) fetchBlock(ctx context.Context, blockNumber uint64) ([]*Frames
Time: block.Time(),
}
ti := time.Now()
blobs, err := f.l1BlobFetcher.GetBlobs(ctx, blockRef, hashes)
blobs, err := f.l1BlobFetcher.GetBlobs(utils.GetContext(), blockRef, hashes)
if err != nil {
logger.Errorf("failed to get blobs: %v", err)
return nil, err
Expand Down
22 changes: 4 additions & 18 deletions rpcclient/optimism/client_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package optimism

import (
"sync"
"testing"
"time"

Expand All @@ -22,26 +21,13 @@ func TestErrorHandling(t *testing.T) {
require.NoError(t, err)

client.SetBeginBlockNumber(10)
mtx := sync.Mutex{}
mtx.Lock()
go func() {
defer mtx.Unlock()
time.Sleep(2 * time.Second)
client.fetcher.StopFetch()
}()
mtx.Lock()
time.Sleep(1 * time.Second)
client.fetcher.StopFetch()

// check if able to restart
client.SetBeginBlockNumber(50)
mtx.Unlock()
mtx.Lock()
go func() {
defer mtx.Unlock()
time.Sleep(2 * time.Second)
client.fetcher.Stop()
}()
mtx.Lock()
defer mtx.Unlock()
time.Sleep(1 * time.Second)
client.fetcher.Stop()
// check error propagation
_, err = client.NextBatch()
require.Error(t, err)
Expand Down

0 comments on commit ea77576

Please sign in to comment.