Skip to content

Commit

Permalink
fix: the deadlock of the client storing (#433)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored May 28, 2024
1 parent 8503f55 commit 2f267b4
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 14 deletions.
25 changes: 21 additions & 4 deletions network/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Client struct {
pullInterval time.Duration
committeeCache *lru.Cache[uint64, *committee.ILagrangeCommitteeCommitteeData]
openL1BlockNumber atomic.Uint64
isSetBeginBlockNumber atomic.Bool

db *goleveldb.DB
chErr chan error
Expand Down Expand Up @@ -348,7 +349,10 @@ func (c *Client) initBeginBlockNumber(blockNumber uint64) error {
blockNumber = lastStoredBlockNumber
}
}
c.rpcClient.SetBeginBlockNumber(blockNumber)

res := c.rpcClient.SetBeginBlockNumber(blockNumber)
c.isSetBeginBlockNumber.Store(res)

return nil
}

Expand All @@ -361,13 +365,26 @@ func (c *Client) startBatchFetching() {
c.chErr <- err
return
}
telemetry.SetGauge(float64(batch.L1BlockNumber), "client", "get_batch_l1_block_number")
logger.Infof("got the batch the L1 block number %d", batch.L1BlockNumber)
telemetry.SetGauge(float64(batch.L1BlockNumber), "client", "fetch_batch_l1_block_number")
logger.Infof("fetch the batch with L1 block number %d", batch.L1BlockNumber)

// block the writeBatchHeader if the batch is too far from the current block
for openBlockNumber := c.openL1BlockNumber.Load(); openBlockNumber > 0 && openBlockNumber+PruningBlocks/4 < batch.L1BlockNumber; openBlockNumber = c.openL1BlockNumber.Load() {
if c.isSetBeginBlockNumber.Load() {
break
}
time.Sleep(1 * time.Second)
}
openL1BlockNumber := c.openL1BlockNumber.Load()
if openL1BlockNumber > 0 && openL1BlockNumber+PruningBlocks/4 < batch.L1BlockNumber {
logger.Infof("Rolling back the batch fetching to the block number %d", c.openL1BlockNumber.Load())
} else if openL1BlockNumber > 0 && openL1BlockNumber+PruningBlocks/2 < batch.L1BlockNumber {
logger.Warnf("The batch %d fetching is too far from the current block number %d", batch.L1BlockNumber, openL1BlockNumber)
continue
} else if openL1BlockNumber > 0 && openL1BlockNumber+PruningBlocks/4 > batch.L1BlockNumber {
c.isSetBeginBlockNumber.Store(false)
}

if err := c.writeBatchHeader(batch); err != nil {
logger.Errorf("failed to write the batch header: %v", err)
c.chErr <- err
Expand Down Expand Up @@ -483,7 +500,7 @@ func (c *Client) TryGetBatch() (*sequencerv2types.Batch, error) {
fromBlockNumber := batch.BatchHeader.FromBlockNumber()
toBlockNumber := batch.BatchHeader.ToBlockNumber()
c.openL1BlockNumber.Store(batch.L1BlockNumber())
logger.Infof("got the batch the block number from %d to %d", fromBlockNumber, toBlockNumber)
logger.Infof("get the batch with L2 block number from %d to %d", fromBlockNumber, toBlockNumber)

// verify the L1 block number
batchHeader, err := c.getBatchHeader(batch.L1BlockNumber(), fromBlockNumber)
Expand Down
3 changes: 2 additions & 1 deletion network/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func (m *mockRPC) GetChainID() (uint32, error) {
return 0, nil
}

func (m *mockRPC) SetBeginBlockNumber(l1BlockNumber uint64) {
func (m *mockRPC) SetBeginBlockNumber(l1BlockNumber uint64) bool {
m.chBeginBlockNumber <- l1BlockNumber
return true
}

func (m *mockRPC) NextBatch() (*sequencerv2types.BatchHeader, error) {
Expand Down
7 changes: 6 additions & 1 deletion rpcclient/arbitrum/batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,16 @@ func (f *Fetcher) StopFetch() {
f.lastSyncedL1BlockNumber.Store(0)
// close L1 fetcher
f.cancel()
// drain channel
// drain the channel first; if the `batchHeaders` channel is full, it blocks the fetcher
// and the fetcher will not stop.
for len(f.batchHeaders) > 0 {
<-f.batchHeaders
}
<-f.done // wait for the fetcher to finish
// drain the channel again to clean up any batches queued while the fetcher was stopping
for len(f.batchHeaders) > 0 {
<-f.batchHeaders
}

f.cancel = nil
f.ctx = nil
Expand Down
7 changes: 5 additions & 2 deletions rpcclient/arbitrum/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ func NewClient(cfg *Config) (*Client, error) {
}

// SetBeginBlockNumber sets the begin L1 & L2 block number.
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) {
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool {
lastSyncedL1BlockNumber := c.fetcher.GetFetchedBlockNumber()
// TODO: if `l1BlockNumber` is within channel range, we don't need to fetch again.
if lastSyncedL1BlockNumber > 0 && lastSyncedL1BlockNumber+ParallelBlocks > l1BlockNumber && l1BlockNumber > lastSyncedL1BlockNumber-ParallelBlocks {
return
return false
}
c.fetcher.StopFetch()
logger.Infof("last synced L1 block number: %d, begin L1 block number: %d", lastSyncedL1BlockNumber, l1BlockNumber)
Expand All @@ -51,6 +52,8 @@ func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) {
c.fetcher.Stop()
}
}()

return true
}

// NextBatch returns the next batch header after SetBeginBlockNumber.
Expand Down
4 changes: 3 additions & 1 deletion rpcclient/mantle/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func (c *Client) GetFinalizedBlockNumber() (uint64, error) {
}

// SetBeginBlockNumber sets the begin L1 & L2 block number.
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) {}
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool {
return true
}

// NextBatch returns the next batch after SetBeginBlockNumber.
func (c *Client) NextBatch() (*sequencerv2types.BatchHeader, error) {
Expand Down
3 changes: 2 additions & 1 deletion rpcclient/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func (c *Client) GetFinalizedBlockNumber() (uint64, error) {
}

// SetBeginBlockNumber sets the begin L1 & L2 block number.
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) {
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool {
c.fromL1BlockNumber = l1BlockNumber
return true
}

// NextBatch returns the next batch after SetBeginBlockNumber.
Expand Down
7 changes: 6 additions & 1 deletion rpcclient/optimism/batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,16 @@ func (f *Fetcher) StopFetch() {
for range f.chFramesRef {
}
<-f.done // wait for the batch decoder to finish
// drain channel
// drain the channel first; if the `batchHeaders` channel is full, it blocks the fetcher
// and the fetcher will not stop.
for len(f.batchHeaders) > 0 {
<-f.batchHeaders
}
<-f.done // wait for the fetcher to finish
// drain the channel again to clean up any batches queued while the fetcher was stopping
for len(f.batchHeaders) > 0 {
<-f.batchHeaders
}

f.cancel = nil
f.ctx = nil
Expand Down
6 changes: 4 additions & 2 deletions rpcclient/optimism/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func NewClient(cfg *Config) (*Client, error) {
}

// SetBeginBlockNumber sets the begin L1 & L2 block number.
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) {
func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool {
lastSyncedL1BlockNumber := c.fetcher.GetFetchedBlockNumber()
if lastSyncedL1BlockNumber > 0 && lastSyncedL1BlockNumber+ParallelBlocks > l1BlockNumber && l1BlockNumber > lastSyncedL1BlockNumber-ParallelBlocks {
return
return false
}
c.fetcher.StopFetch()
logger.Infof("last synced L1 block number: %d, begin L1 block number: %d", lastSyncedL1BlockNumber, l1BlockNumber)
Expand All @@ -51,6 +51,8 @@ func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) {
c.fetcher.Stop()
}
}()

return true
}

// NextBatch returns the next batch header after SetBeginBlockNumber.
Expand Down
2 changes: 1 addition & 1 deletion rpcclient/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type RpcClient interface {
// GetChainID returns the chain ID.
GetChainID() (uint32, error)
// SetBeginBlockNumber sets the begin L1 block number.
SetBeginBlockNumber(l1BlockNumber uint64)
SetBeginBlockNumber(l1BlockNumber uint64) bool
// NextBatch returns the next batch after SetBeginBlockNumber.
NextBatch() (*sequencerv2types.BatchHeader, error)
}
Expand Down

0 comments on commit 2f267b4

Please sign in to comment.