From 2f267b4110a8d26bb450c1fd90cf354e645ef693 Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Tue, 28 May 2024 14:29:02 -0400 Subject: [PATCH] fix: the deadlock of the client storing (#433) --- network/client.go | 25 +++++++++++++++++++++---- network/client_test.go | 3 ++- rpcclient/arbitrum/batch_fetcher.go | 7 ++++++- rpcclient/arbitrum/client.go | 7 +++++-- rpcclient/mantle/client.go | 4 +++- rpcclient/mock/client.go | 3 ++- rpcclient/optimism/batch_fetcher.go | 7 ++++++- rpcclient/optimism/client.go | 6 ++++-- rpcclient/types/interface.go | 2 +- 9 files changed, 50 insertions(+), 14 deletions(-) diff --git a/network/client.go b/network/client.go index c86c53ad..3542095c 100644 --- a/network/client.go +++ b/network/client.go @@ -66,6 +66,7 @@ type Client struct { pullInterval time.Duration committeeCache *lru.Cache[uint64, *committee.ILagrangeCommitteeCommitteeData] openL1BlockNumber atomic.Uint64 + isSetBeginBlockNumber atomic.Bool db *goleveldb.DB chErr chan error @@ -348,7 +349,10 @@ func (c *Client) initBeginBlockNumber(blockNumber uint64) error { blockNumber = lastStoredBlockNumber } } - c.rpcClient.SetBeginBlockNumber(blockNumber) + + res := c.rpcClient.SetBeginBlockNumber(blockNumber) + c.isSetBeginBlockNumber.Store(res) + return nil } @@ -361,13 +365,26 @@ func (c *Client) startBatchFetching() { c.chErr <- err return } - telemetry.SetGauge(float64(batch.L1BlockNumber), "client", "get_batch_l1_block_number") - logger.Infof("got the batch the L1 block number %d", batch.L1BlockNumber) + telemetry.SetGauge(float64(batch.L1BlockNumber), "client", "fetch_batch_l1_block_number") + logger.Infof("fetch the batch with L1 block number %d", batch.L1BlockNumber) // block the writeBatchHeader if the batch is too far from the current block for openBlockNumber := c.openL1BlockNumber.Load(); openBlockNumber > 0 && openBlockNumber+PruningBlocks/4 < batch.L1BlockNumber; openBlockNumber = c.openL1BlockNumber.Load() { + if c.isSetBeginBlockNumber.Load() { + break + } time.Sleep(1 * time.Second) } + openL1BlockNumber := c.openL1BlockNumber.Load() + if openL1BlockNumber > 0 && openL1BlockNumber+PruningBlocks/4 < batch.L1BlockNumber { + logger.Infof("Rolling back the batch fetching to the block number %d", c.openL1BlockNumber.Load()) + } else if openL1BlockNumber > 0 && openL1BlockNumber+PruningBlocks/2 < batch.L1BlockNumber { + logger.Warnf("The batch %d fetching is too far from the current block number %d", batch.L1BlockNumber, openL1BlockNumber) + continue + } else if openL1BlockNumber > 0 && openL1BlockNumber+PruningBlocks/4 > batch.L1BlockNumber { + c.isSetBeginBlockNumber.Store(false) + } + if err := c.writeBatchHeader(batch); err != nil { logger.Errorf("failed to write the batch header: %v", err) c.chErr <- err @@ -483,7 +500,7 @@ func (c *Client) TryGetBatch() (*sequencerv2types.Batch, error) { fromBlockNumber := batch.BatchHeader.FromBlockNumber() toBlockNumber := batch.BatchHeader.ToBlockNumber() c.openL1BlockNumber.Store(batch.L1BlockNumber()) - logger.Infof("got the batch the block number from %d to %d", fromBlockNumber, toBlockNumber) + logger.Infof("get the batch with L2 block number from %d to %d", fromBlockNumber, toBlockNumber) // verify the L1 block number batchHeader, err := c.getBatchHeader(batch.L1BlockNumber(), fromBlockNumber) diff --git a/network/client_test.go b/network/client_test.go index ed553513..1d166cb4 100644 --- a/network/client_test.go +++ b/network/client_test.go @@ -62,8 +62,9 @@ func (m *mockRPC) GetChainID() (uint32, error) { return 0, nil } -func (m *mockRPC) SetBeginBlockNumber(l1BlockNumber uint64) { +func (m *mockRPC) SetBeginBlockNumber(l1BlockNumber uint64) bool { m.chBeginBlockNumber <- l1BlockNumber + return true } func (m *mockRPC) NextBatch() (*sequencerv2types.BatchHeader, error) { diff --git a/rpcclient/arbitrum/batch_fetcher.go b/rpcclient/arbitrum/batch_fetcher.go index 2bcbca1e..b710e476 100644 --- a/rpcclient/arbitrum/batch_fetcher.go +++ b/rpcclient/arbitrum/batch_fetcher.go @@ -243,11 +243,16 @@ func (f *Fetcher) StopFetch() { f.lastSyncedL1BlockNumber.Store(0) // close L1 fetcher f.cancel() - // drain channel + // drain channel, if the `batchHeaders` channel is full, it will block the fetcher + // and the fetcher will not stop. for len(f.batchHeaders) > 0 { <-f.batchHeaders } <-f.done // wait for the fetcher to finish + // drain channel to clean up the batches while stopping the fetcher + for len(f.batchHeaders) > 0 { + <-f.batchHeaders + } f.cancel = nil f.ctx = nil diff --git a/rpcclient/arbitrum/client.go b/rpcclient/arbitrum/client.go index 9b36a3c8..71935ee6 100644 --- a/rpcclient/arbitrum/client.go +++ b/rpcclient/arbitrum/client.go @@ -35,10 +35,11 @@ func NewClient(cfg *Config) (*Client, error) { } // SetBeginBlockNumber sets the begin L1 & L2 block number. -func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) { +func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool { lastSyncedL1BlockNumber := c.fetcher.GetFetchedBlockNumber() + // TODO: if `l1BlockNumber` is within channel range, we don't need to fetch again. if lastSyncedL1BlockNumber > 0 && lastSyncedL1BlockNumber+ParallelBlocks > l1BlockNumber && l1BlockNumber > lastSyncedL1BlockNumber-ParallelBlocks { - return + return false } c.fetcher.StopFetch() logger.Infof("last synced L1 block number: %d, begin L1 block number: %d", lastSyncedL1BlockNumber, l1BlockNumber) @@ -51,6 +52,8 @@ func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) { c.fetcher.Stop() } }() + + return true } // NextBatch returns the next batch header after SetBeginBlockNumber. diff --git a/rpcclient/mantle/client.go b/rpcclient/mantle/client.go index a9460876..aea3209d 100644 --- a/rpcclient/mantle/client.go +++ b/rpcclient/mantle/client.go @@ -100,7 +100,9 @@ func (c *Client) GetFinalizedBlockNumber() (uint64, error) { } // SetBeginBlockNumber sets the begin L1 & L2 block number. -func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) {} +func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool { + return true +} // NextBatch returns the next batch after SetBeginBlockNumber. func (c *Client) NextBatch() (*sequencerv2types.BatchHeader, error) { diff --git a/rpcclient/mock/client.go b/rpcclient/mock/client.go index 722b7a2b..fa32d9ac 100644 --- a/rpcclient/mock/client.go +++ b/rpcclient/mock/client.go @@ -63,8 +63,9 @@ func (c *Client) GetFinalizedBlockNumber() (uint64, error) { } // SetBeginBlockNumber sets the begin L1 & L2 block number. -func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) { +func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool { c.fromL1BlockNumber = l1BlockNumber + return true } // NextBatch returns the next batch after SetBeginBlockNumber. diff --git a/rpcclient/optimism/batch_fetcher.go b/rpcclient/optimism/batch_fetcher.go index 24ad11ad..5bc88bae 100644 --- a/rpcclient/optimism/batch_fetcher.go +++ b/rpcclient/optimism/batch_fetcher.go @@ -289,11 +289,16 @@ func (f *Fetcher) StopFetch() { for range f.chFramesRef { } <-f.done // wait for the batch decoder to finish - // drain channel + // drain channel, if the `batchHeaders` channel is full, it will block the fetcher + // and the fetcher will not stop. for len(f.batchHeaders) > 0 { <-f.batchHeaders } <-f.done // wait for the fetcher to finish + // drain channel to clean up the batches while stopping the fetcher + for len(f.batchHeaders) > 0 { + <-f.batchHeaders + } f.cancel = nil f.ctx = nil diff --git a/rpcclient/optimism/client.go b/rpcclient/optimism/client.go index d347df0f..7f0007b8 100644 --- a/rpcclient/optimism/client.go +++ b/rpcclient/optimism/client.go @@ -35,10 +35,10 @@ func NewClient(cfg *Config) (*Client, error) { } // SetBeginBlockNumber sets the begin L1 & L2 block number. -func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) { +func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) bool { lastSyncedL1BlockNumber := c.fetcher.GetFetchedBlockNumber() if lastSyncedL1BlockNumber > 0 && lastSyncedL1BlockNumber+ParallelBlocks > l1BlockNumber && l1BlockNumber > lastSyncedL1BlockNumber-ParallelBlocks { - return + return false } c.fetcher.StopFetch() logger.Infof("last synced L1 block number: %d, begin L1 block number: %d", lastSyncedL1BlockNumber, l1BlockNumber) @@ -51,6 +51,8 @@ func (c *Client) SetBeginBlockNumber(l1BlockNumber uint64) { c.fetcher.Stop() } }() + + return true } // NextBatch returns the next batch header after SetBeginBlockNumber. diff --git a/rpcclient/types/interface.go b/rpcclient/types/interface.go index b1b321c5..dae92b5f 100644 --- a/rpcclient/types/interface.go +++ b/rpcclient/types/interface.go @@ -27,7 +27,7 @@ type RpcClient interface { // GetChainID returns the chain ID. GetChainID() (uint32, error) // SetBeginBlockNumber sets the begin L1 block number. - SetBeginBlockNumber(l1BlockNumber uint64) + SetBeginBlockNumber(l1BlockNumber uint64) bool // NextBatch returns the next batch after SetBeginBlockNumber. NextBatch() (*sequencerv2types.BatchHeader, error) }