Skip to content

Commit

Permalink
fix useful difflayer item in cache been prune issue (ethereum#527)
Browse files Browse the repository at this point in the history
add logs

fix useful difflayer item in cache been prune issue

fix too many open files
  • Loading branch information
unclezoro authored Nov 9, 2021
1 parent 176407a commit 9603407
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 80 deletions.
4 changes: 2 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ var (
}
DiffBlockFlag = cli.Uint64Flag{
Name: "diffblock",
Usage: "The number of blocks should be persisted in db (default = 864000 )",
Value: uint64(864000),
Usage: "The number of blocks should be persisted in db (default = 86400)",
Value: uint64(86400),
}
// Miner settings
MiningEnabledFlag = cli.BoolFlag{
Expand Down
42 changes: 10 additions & 32 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ func (bc *BlockChain) cacheReceipts(hash common.Hash, receipts types.Receipts) {
}

func (bc *BlockChain) cacheDiffLayer(diffLayer *types.DiffLayer) {
if bc.diffLayerCache.Len() >= diffLayerCacheLimit {
bc.diffLayerCache.RemoveOldest()
}
bc.diffLayerCache.Add(diffLayer.BlockHash, diffLayer)
if bc.db.DiffStore() != nil {
// push to priority queue before persisting
Expand Down Expand Up @@ -2618,34 +2621,6 @@ func (bc *BlockChain) removeDiffLayers(diffHash common.Hash) {
}
}

func (bc *BlockChain) RemoveDiffPeer(pid string) {
bc.diffMux.Lock()
defer bc.diffMux.Unlock()
if invaliDiffHashes := bc.diffPeersToDiffHashes[pid]; invaliDiffHashes != nil {
for invalidDiffHash := range invaliDiffHashes {
lastDiffHash := false
if peers, ok := bc.diffHashToPeers[invalidDiffHash]; ok {
delete(peers, pid)
if len(peers) == 0 {
lastDiffHash = true
delete(bc.diffHashToPeers, invalidDiffHash)
}
}
if lastDiffHash {
affectedBlockHash := bc.diffHashToBlockHash[invalidDiffHash]
if diffs, exist := bc.blockHashToDiffLayers[affectedBlockHash]; exist {
delete(diffs, invalidDiffHash)
if len(diffs) == 0 {
delete(bc.blockHashToDiffLayers, affectedBlockHash)
}
}
delete(bc.diffHashToBlockHash, invalidDiffHash)
}
}
delete(bc.diffPeersToDiffHashes, pid)
}
}

func (bc *BlockChain) untrustedDiffLayerPruneLoop() {
recheck := time.NewTicker(diffLayerPruneRecheckInterval)
bc.wg.Add(1)
Expand Down Expand Up @@ -2713,24 +2688,27 @@ func (bc *BlockChain) HandleDiffLayer(diffLayer *types.DiffLayer, pid string, fu
// Basic check
currentHeight := bc.CurrentBlock().NumberU64()
if diffLayer.Number > currentHeight && diffLayer.Number-currentHeight > maxDiffQueueDist {
log.Error("diff layers too new from current", "pid", pid)
log.Debug("diff layers too new from current", "pid", pid)
return nil
}
if diffLayer.Number < currentHeight && currentHeight-diffLayer.Number > maxDiffForkDist {
log.Error("diff layers too old from current", "pid", pid)
log.Debug("diff layers too old from current", "pid", pid)
return nil
}

bc.diffMux.Lock()
defer bc.diffMux.Unlock()
if blockHash, exist := bc.diffHashToBlockHash[diffLayer.DiffHash]; exist && blockHash == diffLayer.BlockHash {
return nil
}

if !fulfilled && len(bc.diffPeersToDiffHashes[pid]) > maxDiffLimitForBroadcast {
log.Error("too many accumulated diffLayers", "pid", pid)
log.Debug("too many accumulated diffLayers", "pid", pid)
return nil
}

if len(bc.diffPeersToDiffHashes[pid]) > maxDiffLimit {
log.Error("too many accumulated diffLayers", "pid", pid)
log.Debug("too many accumulated diffLayers", "pid", pid)
return nil
}
if _, exist := bc.diffPeersToDiffHashes[pid]; exist {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestPruneDiffLayer(t *testing.T) {
if len(fullBackend.chain.diffNumToBlockHashes) != maxDiffForkDist {
t.Error("unexpected size of diffNumToBlockHashes")
}
if len(fullBackend.chain.diffPeersToDiffHashes) != 2 {
if len(fullBackend.chain.diffPeersToDiffHashes) != 1 {
t.Error("unexpected size of diffPeersToDiffHashes")
}
if len(fullBackend.chain.blockHashToDiffLayers) != maxDiffForkDist {
Expand Down
13 changes: 7 additions & 6 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import (

const (
fullProcessCheck = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
recentTime = 2048 * 3
recentDiffLayerTimeout = 20
recentTime = 1024 * 3
recentDiffLayerTimeout = 5
farDiffLayerTimeout = 2
)

Expand All @@ -68,15 +68,16 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
}

type LightStateProcessor struct {
randomGenerator *rand.Rand
check int64
StateProcessor
}

func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor {
randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
check := randomGenerator.Int63n(fullProcessCheck)
return &LightStateProcessor{
randomGenerator: randomGenerator,
StateProcessor: *NewStateProcessor(config, bc, engine),
check: check,
StateProcessor: *NewStateProcessor(config, bc, engine),
}
}

Expand All @@ -86,7 +87,7 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB
allowLightProcess = posa.AllowLightProcess(p.bc, block.Header())
}
// random fallback to full process
if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 {
if allowLightProcess && block.NumberU64()%fullProcessCheck != uint64(p.check) && len(block.Transactions()) != 0 {
var pid string
if peer, ok := block.ReceivedFrom.(PeerIDer); ok {
pid = peer.ID()
Expand Down
68 changes: 41 additions & 27 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ var (
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts

diffFetchTick = 10 * time.Millisecond
diffFetchLimit = 5

qosTuningPeers = 5 // Number of peers to tune based on (best peers)
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
Expand Down Expand Up @@ -161,10 +164,10 @@ type Downloader struct {
quitLock sync.Mutex // Lock to prevent double closes

// Testing hooks
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header, ...interface{}) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult, chan struct{}) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -232,27 +235,35 @@ type IPeerSet interface {

func EnableDiffFetchOp(peers IPeerSet) DownloadOption {
return func(dl *Downloader) *Downloader {
var hook = func(headers []*types.Header, args ...interface{}) {
if len(args) < 2 {
return
}
peerID, ok := args[1].(string)
if !ok {
return
}
mode, ok := args[0].(SyncMode)
if !ok {
return
}
if ep := peers.GetDiffPeer(peerID); mode == FullSync && ep != nil {
hashes := make([]common.Hash, 0, len(headers))
for _, header := range headers {
hashes = append(hashes, header.Hash())
}
ep.RequestDiffLayers(hashes)
var hook = func(results []*fetchResult, stop chan struct{}) {
if dl.getMode() == FullSync {
go func() {
ticker := time.NewTicker(diffFetchTick)
defer ticker.Stop()
for _, r := range results {
Wait:
for {
select {
case <-stop:
return
case <-ticker.C:
if dl.blockchain.CurrentHeader().Number.Int64()+int64(diffFetchLimit) > r.Header.Number.Int64() {
break Wait
}
}
}
if ep := peers.GetDiffPeer(r.pid); ep != nil {
// It turns out a diff layer is 5x larger than block, we just request one diffLayer each time
err := ep.RequestDiffLayers([]common.Hash{r.Header.Hash()})
if err != nil {
return
}
}
}
}()
}
}
dl.bodyFetchHook = hook
dl.chainInsertHook = hook
return dl
}
}
Expand Down Expand Up @@ -1405,7 +1416,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - kind: textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
fetchHook func([]*types.Header, ...interface{}), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
Expand Down Expand Up @@ -1554,7 +1565,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
fetchHook(request.Headers, d.getMode(), peer.id)
fetchHook(request.Headers)
}
if err := fetch(peer, request); err != nil {
// Although we could try and make an attempt to fix this, this error really
Expand Down Expand Up @@ -1759,12 +1770,15 @@ func (d *Downloader) processFullSyncContent() error {
if len(results) == 0 {
return nil
}
stop := make(chan struct{})
if d.chainInsertHook != nil {
d.chainInsertHook(results)
d.chainInsertHook(results, stop)
}
if err := d.importBlockResults(results); err != nil {
close(stop)
return err
}
close(stop)
}
}

Expand Down Expand Up @@ -1850,7 +1864,7 @@ func (d *Downloader) processFastSyncContent() error {
}
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
d.chainInsertHook(results, nil)
}
// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {

// Wrap the importer to allow stepping
blocked, proceed := uint32(0), make(chan struct{})
tester.downloader.chainInsertHook = func(results []*fetchResult) {
tester.downloader.chainInsertHook = func(results []*fetchResult, _ chan struct{}) {
atomic.StoreUint32(&blocked, uint32(len(results)))
<-proceed
}
Expand Down Expand Up @@ -921,10 +921,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {

// Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0)
tester.downloader.bodyFetchHook = func(headers []*types.Header, _ ...interface{}) {
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&bodiesHave, int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header, _ ...interface{}) {
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&receiptsHave, int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
Expand Down
6 changes: 4 additions & 2 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,18 @@ type fetchRequest struct {
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
pending int32 // Flag telling what deliveries are outstanding
pid string

Header *types.Header
Uncles []*types.Header
Transactions types.Transactions
Receipts types.Receipts
}

func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
func newFetchResult(header *types.Header, fastSync bool, pid string) *fetchResult {
item := &fetchResult{
Header: header,
pid: pid,
}
if !header.EmptyBody() {
item.pending |= (1 << bodyType)
Expand Down Expand Up @@ -503,7 +505,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
// we can ask the resultcache if this header is within the
// "prioritized" segment of blocks. If it is not, we need to throttle

stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync)
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync, p.id)
if stale {
// Don't put back in the task queue, this item has already been
// delivered upstream
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/resultstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
func (r *resultStore) AddFetch(header *types.Header, fastSync bool, pid string) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()

Expand All @@ -85,7 +85,7 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, fastSync)
item = newFetchResult(header, fastSync, pid)
r.items[index] = item
}
return stale, throttled, item, err
Expand Down
2 changes: 1 addition & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var Defaults = Config{
TrieTimeout: 60 * time.Minute,
TriesInMemory: 128,
SnapshotCache: 102,
DiffBlock: uint64(864000),
DiffBlock: uint64(86400),
Miner: miner.Config{
GasFloor: 8000000,
GasCeil: 8000000,
Expand Down
1 change: 0 additions & 1 deletion eth/handler_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (h *diffHandler) RunPeer(peer *diff.Peer, hand diff.Handler) error {
ps.lock.Unlock()
return err
}
defer h.chain.RemoveDiffPeer(peer.ID())
return (*handler)(h).runDiffExtension(peer, hand)
}

Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/diff/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
softResponseLimit = 2 * 1024 * 1024

// maxDiffLayerServe is the maximum number of diff layers to serve.
maxDiffLayerServe = 1024
maxDiffLayerServe = 128
)

var requestTracker = NewTracker(time.Minute)
Expand Down
10 changes: 8 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ const (
closedState
)

const chainDataHandlesPercentage = 80

// New creates a new P2P node, ready for protocol registration.
func New(conf *Config) (*Node, error) {
// Copy config and resolve the datadir so future changes to the current
Expand Down Expand Up @@ -580,12 +582,16 @@ func (n *Node) OpenDatabase(name string, cache, handles int, namespace string, r
}

func (n *Node) OpenAndMergeDatabase(name string, cache, handles int, freezer, diff, namespace string, readonly, persistDiff bool) (ethdb.Database, error) {
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, handles, freezer, namespace, readonly)
chainDataHandles := handles
if persistDiff {
chainDataHandles = handles * chainDataHandlesPercentage / 100
}
chainDB, err := n.OpenDatabaseWithFreezer(name, cache, chainDataHandles, freezer, namespace, readonly)
if err != nil {
return nil, err
}
if persistDiff {
diffStore, err := n.OpenDiffDatabase(name, handles, diff, namespace, readonly)
diffStore, err := n.OpenDiffDatabase(name, handles-chainDataHandles, diff, namespace, readonly)
if err != nil {
chainDB.Close()
return nil, err
Expand Down

0 comments on commit 9603407

Please sign in to comment.