From 9603407407201045913ede81125e030dda3dd3f0 Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Tue, 9 Nov 2021 19:39:58 +0800 Subject: [PATCH] fix useful difflayer item in cache been prune issue (#527) add logs fix useful difflayer item in cache been prune issue fix too many open files --- cmd/utils/flags.go | 4 +- core/blockchain.go | 42 +++++-------------- core/blockchain_diff_test.go | 2 +- core/state_processor.go | 13 +++--- eth/downloader/downloader.go | 68 +++++++++++++++++++------------ eth/downloader/downloader_test.go | 6 +-- eth/downloader/queue.go | 6 ++- eth/downloader/resultstore.go | 4 +- eth/ethconfig/config.go | 2 +- eth/handler_diff.go | 1 - eth/protocols/diff/handler.go | 2 +- node/node.go | 10 ++++- 12 files changed, 80 insertions(+), 80 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c4dbd84911..49ea0d1de1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -444,8 +444,8 @@ var ( } DiffBlockFlag = cli.Uint64Flag{ Name: "diffblock", - Usage: "The number of blocks should be persisted in db (default = 864000 )", - Value: uint64(864000), + Usage: "The number of blocks should be persisted in db (default = 86400)", + Value: uint64(86400), } // Miner settings MiningEnabledFlag = cli.BoolFlag{ diff --git a/core/blockchain.go b/core/blockchain.go index 22fe3e5998..4562061337 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -478,6 +478,9 @@ func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts) { } func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer) { + if bc.diffLayerCache.Len() >= diffLayerCacheLimit { + bc.diffLayerCache.RemoveOldest() + } bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer) if bc.db.DiffStore() != nil { // push to priority queue before persisting @@ -2618,34 +2621,6 @@ func (bc *BlockChain) removeDiffLayers(diffHash common.Hash) { } } -func (bc *BlockChain) RemoveDiffPeer(pid string) { - bc.diffMux.Lock() - defer bc.diffMux.Unlock() - if invaliDiffHashes := bc.diffPeersToDiffHashes[pid]; invaliDiffHashes != nil { - for invalidDiffHash := range invaliDiffHashes { - lastDiffHash := false - if peers, ok := bc.diffHashToPeers[invalidDiffHash]; ok { - delete(peers, pid) - if len(peers) == 0 { - lastDiffHash = true - delete(bc.diffHashToPeers, invalidDiffHash) - } - } - if lastDiffHash { - affectedBlockHash := bc.diffHashToBlockHash[invalidDiffHash] - if diffs, exist := bc.blockHashToDiffLayers[affectedBlockHash]; exist { - delete(diffs, invalidDiffHash) - if len(diffs) == 0 { - delete(bc.blockHashToDiffLayers, affectedBlockHash) - } - } - delete(bc.diffHashToBlockHash, invalidDiffHash) - } - } - delete(bc.diffPeersToDiffHashes, pid) - } -} - func (bc *BlockChain) untrustedDiffLayerPruneLoop() { recheck := time.NewTicker(diffLayerPruneRecheckInterval) bc.wg.Add(1) @@ -2713,24 +2688,27 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu // Basic check currentHeight := bc.CurrentBlock().NumberU64() if diffLayer.Number > currentHeight && diffLayer.Number-currentHeight > maxDiffQueueDist { - log.Error("diff layers too new from current", "pid", pid) + log.Debug("diff layers too new from current", "pid", pid) return nil } if diffLayer.Number < currentHeight && currentHeight-diffLayer.Number > maxDiffForkDist { - log.Error("diff layers too old from current", "pid", pid) + log.Debug("diff layers too old from current", "pid", pid) return nil } bc.diffMux.Lock() defer bc.diffMux.Unlock() + if blockHash, exist := bc.diffHashToBlockHash[diffLayer.DiffHash]; exist && blockHash == diffLayer.BlockHash { + return nil + } if !fulfilled && len(bc.diffPeersToDiffHashes[pid]) > maxDiffLimitForBroadcast { - log.Error("too many accumulated diffLayers", "pid", pid) + log.Debug("too many accumulated diffLayers", "pid", pid) return nil } if len(bc.diffPeersToDiffHashes[pid]) > maxDiffLimit { - log.Error("too many accumulated diffLayers", "pid", pid) + log.Debug("too many accumulated diffLayers", "pid", pid) return nil } if _, exist := bc.diffPeersToDiffHashes[pid]; exist { diff --git a/core/blockchain_diff_test.go b/core/blockchain_diff_test.go index 67affcff75..8e04363d0c 100644 --- a/core/blockchain_diff_test.go +++ b/core/blockchain_diff_test.go @@ -322,7 +322,7 @@ func TestPruneDiffLayer(t *testing.T) { if len(fullBackend.chain.diffNumToBlockHashes) != maxDiffForkDist { t.Error("unexpected size of diffNumToBlockHashes") } - if len(fullBackend.chain.diffPeersToDiffHashes) != 2 { + if len(fullBackend.chain.diffPeersToDiffHashes) != 1 { t.Error("unexpected size of diffPeersToDiffHashes") } if len(fullBackend.chain.blockHashToDiffLayers) != maxDiffForkDist { diff --git a/core/state_processor.go b/core/state_processor.go index b529082063..9f2a09bd36 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -43,8 +43,8 @@ import ( const ( fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly - recentTime = 2048 * 3 - recentDiffLayerTimeout = 20 + recentTime = 1024 * 3 + recentDiffLayerTimeout = 5 farDiffLayerTimeout = 2 ) @@ -68,15 +68,16 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen } type LightStateProcessor struct { - randomGenerator *rand.Rand + check int64 StateProcessor } func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor { randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) + check := randomGenerator.Int63n(fullProcessCheck) return &LightStateProcessor{ - randomGenerator: randomGenerator, - StateProcessor: *NewStateProcessor(config, bc, engine), + check: check, + StateProcessor: *NewStateProcessor(config, bc, engine), } } @@ -86,7 +87,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB allowLightProcess = posa.AllowLightProcess(p.bc, block.Header()) } // random fallback to full process - if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 { + if allowLightProcess && block.NumberU64()%fullProcessCheck != uint64(p.check) && len(block.Transactions()) != 0 { var pid string if peer, ok := block.ReceivedFrom.(PeerIDer); ok { pid = peer.ID() diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 5da80c9d3a..0009f41786 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -53,6 +53,9 @@ var ( ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts + diffFetchTick = 10 * time.Millisecond + diffFetchLimit = 5 + qosTuningPeers = 5 // Number of peers to tune based on (best peers) qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value @@ -161,10 +164,10 @@ type Downloader struct { quitLock sync.Mutex // Lock to prevent double closes // Testing hooks - syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run - bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch - receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch - chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) + syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run + bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch + receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch + chainInsertHook func([]*fetchResult, chan struct{}) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } // LightChain encapsulates functions required to synchronise a light chain. @@ -232,27 +235,35 @@ type IPeerSet interface { func EnableDiffFetchOp(peers IPeerSet) DownloadOption { return func(dl *Downloader) *Downloader { - var hook = func(headers []*types.Header, args ...interface{}) { - if len(args) < 2 { - return - } - peerID, ok := args[1].(string) - if !ok { - return - } - mode, ok := args[0].(SyncMode) - if !ok { - return - } - if ep := peers.GetDiffPeer(peerID); mode == FullSync && ep != nil { - hashes := make([]common.Hash, 0, len(headers)) - for _, header := range headers { - hashes = append(hashes, header.Hash()) - } - ep.RequestDiffLayers(hashes) + var hook = func(results []*fetchResult, stop chan struct{}) { + if dl.getMode() == FullSync { + go func() { + ticker := time.NewTicker(diffFetchTick) + defer ticker.Stop() + for _, r := range results { + Wait: + for { + select { + case <-stop: + return + case <-ticker.C: + if dl.blockchain.CurrentHeader().Number.Int64()+int64(diffFetchLimit) > r.Header.Number.Int64() { + break Wait + } + } + } + if ep := peers.GetDiffPeer(r.pid); ep != nil { + // It turns out a diff layer is 5x larger than block, we just request one diffLayer each time + err := ep.RequestDiffLayers([]common.Hash{r.Header.Hash()}) + if err != nil { + return + } + } + } + }() } } - dl.bodyFetchHook = hook + dl.chainInsertHook = hook return dl } } @@ -1405,7 +1416,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // - kind: textual label of the type being downloaded to display in log messages func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool), - fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, + fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error { // Create a ticker to detect expired retrieval tasks @@ -1554,7 +1565,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { - fetchHook(request.Headers, d.getMode(), peer.id) + fetchHook(request.Headers) } if err := fetch(peer, request); err != nil { // Although we could try and make an attempt to fix this, this error really @@ -1759,12 +1770,15 @@ func (d *Downloader) processFullSyncContent() error { if len(results) == 0 { return nil } + stop := make(chan struct{}) if d.chainInsertHook != nil { - d.chainInsertHook(results) + d.chainInsertHook(results, stop) } if err := d.importBlockResults(results); err != nil { + close(stop) return err } + close(stop) } } @@ -1850,7 +1864,7 @@ func (d *Downloader) processFastSyncContent() error { } } if d.chainInsertHook != nil { - d.chainInsertHook(results) + d.chainInsertHook(results, nil) } // If we haven't downloaded the pivot block yet, check pivot staleness // notifications from the header downloader diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 66f6872025..4b76b6a5db 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -565,7 +565,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { // Wrap the importer to allow stepping blocked, proceed := uint32(0), make(chan struct{}) - tester.downloader.chainInsertHook = func(results []*fetchResult) { + tester.downloader.chainInsertHook = func(results []*fetchResult, _ chan struct{}) { atomic.StoreUint32(&blocked, uint32(len(results))) <-proceed } @@ -921,10 +921,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { // Instrument the downloader to signal body requests bodiesHave, receiptsHave := int32(0), int32(0) - tester.downloader.bodyFetchHook = func(headers []*types.Header, _ ...interface{}) { + tester.downloader.bodyFetchHook = func(headers []*types.Header) { atomic.AddInt32(&bodiesHave, int32(len(headers))) } - tester.downloader.receiptFetchHook = func(headers []*types.Header, _ ...interface{}) { + tester.downloader.receiptFetchHook = func(headers []*types.Header) { atomic.AddInt32(&receiptsHave, int32(len(headers))) } // Synchronise with the peer and make sure all blocks were retrieved diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index ac7edc2c68..3b01984da5 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -63,6 +63,7 @@ type fetchRequest struct { // all outstanding pieces complete and the result as a whole can be processed. type fetchResult struct { pending int32 // Flag telling what deliveries are outstanding + pid string Header *types.Header Uncles []*types.Header @@ -70,9 +71,10 @@ type fetchResult struct { Receipts types.Receipts } -func newFetchResult(header *types.Header, fastSync bool) *fetchResult { +func newFetchResult(header *types.Header, fastSync bool, pid string) *fetchResult { item := &fetchResult{ Header: header, + pid: pid, } if !header.EmptyBody() { item.pending |= (1 << bodyType) @@ -503,7 +505,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common // we can ask the resultcache if this header is within the // "prioritized" segment of blocks. If it is not, we need to throttle - stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync) + stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync, p.id) if stale { // Don't put back in the task queue, this item has already been // delivered upstream diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go index 21928c2a00..98ce75593b 100644 --- a/eth/downloader/resultstore.go +++ b/eth/downloader/resultstore.go @@ -75,7 +75,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 { // throttled - if true, the store is at capacity, this particular header is not prio now // item - the result to store data into // err - any error that occurred -func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) { +func (r *resultStore) AddFetch(header *types.Header, fastSync bool, pid string) (stale, throttled bool, item *fetchResult, err error) { r.lock.Lock() defer r.lock.Unlock() @@ -85,7 +85,7 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro return stale, throttled, item, err } if item == nil { - item = newFetchResult(header, fastSync) + item = newFetchResult(header, fastSync, pid) r.items[index] = item } return stale, throttled, item, err diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 83db7fc998..010a1fb923 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -80,7 +80,7 @@ var Defaults = Config{ TrieTimeout: 60 * time.Minute, TriesInMemory: 128, SnapshotCache: 102, - DiffBlock: uint64(864000), + DiffBlock: uint64(86400), Miner: miner.Config{ GasFloor: 8000000, GasCeil: 8000000, diff --git a/eth/handler_diff.go b/eth/handler_diff.go index a5fbfdb0a8..6d5436bf9a 100644 --- a/eth/handler_diff.go +++ b/eth/handler_diff.go @@ -47,7 +47,6 @@ func (h *diffHandler) RunPeer(peer *diff.Peer, hand diff.Handler) error { ps.lock.Unlock() return err } - defer h.chain.RemoveDiffPeer(peer.ID()) return (*handler)(h).runDiffExtension(peer, hand) } diff --git a/eth/protocols/diff/handler.go b/eth/protocols/diff/handler.go index 8678ff7f65..18ec4e2541 100644 --- a/eth/protocols/diff/handler.go +++ b/eth/protocols/diff/handler.go @@ -17,7 +17,7 @@ const ( softResponseLimit = 2 * 1024 * 1024 // maxDiffLayerServe is the maximum number of diff layers to serve. - maxDiffLayerServe = 1024 + maxDiffLayerServe = 128 ) var requestTracker = NewTracker(time.Minute) diff --git a/node/node.go b/node/node.go index f1564bea23..c122dad32c 100644 --- a/node/node.go +++ b/node/node.go @@ -68,6 +68,8 @@ const ( closedState ) +const chainDataHandlesPercentage = 80 + // New creates a new P2P node, ready for protocol registration. func New(conf *Config) (*Node, error) { // Copy config and resolve the datadir so future changes to the current @@ -580,12 +582,16 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r } func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, diff, namespace string, readonly, persistDiff bool) (ethdb.Database, error) { - chainDB, err := n.OpenDatabaseWithFreezer(name, cache, handles, freezer, namespace, readonly) + chainDataHandles := handles + if persistDiff { + chainDataHandles = handles * chainDataHandlesPercentage / 100 + } + chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly) if err != nil { return nil, err } if persistDiff { - diffStore, err := n.OpenDiffDatabase(name, handles, diff, namespace, readonly) + diffStore, err := n.OpenDiffDatabase(name, handles-chainDataHandles, diff, namespace, readonly) if err != nil { chainDB.Close() return nil, err