@@ -82,13 +82,16 @@ var (
82
82
const (
83
83
bodyCacheLimit = 256
84
84
blockCacheLimit = 256
85
+ diffLayerCacheLimit = 1024
85
86
receiptsCacheLimit = 10000
86
87
txLookupCacheLimit = 1024
87
88
maxFutureBlocks = 256
88
89
maxTimeFutureBlocks = 30
89
- badBlockLimit = 10
90
90
maxBeyondBlocks = 2048
91
91
92
+ diffLayerfreezerRecheckInterval = 3 * time .Second
93
+ diffLayerfreezerBlockLimit = 864000 // The number of blocks that should be kept in disk.
94
+
92
95
// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
93
96
//
94
97
// Changelog:
@@ -188,13 +191,15 @@ type BlockChain struct {
188
191
currentBlock atomic.Value // Current head of the block chain
189
192
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
190
193
191
- stateCache state.Database // State database to reuse between imports (contains state cache)
192
- bodyCache * lru.Cache // Cache for the most recent block bodies
193
- bodyRLPCache * lru.Cache // Cache for the most recent block bodies in RLP encoded format
194
- receiptsCache * lru.Cache // Cache for the most recent receipts per block
195
- blockCache * lru.Cache // Cache for the most recent entire blocks
196
- txLookupCache * lru.Cache // Cache for the most recent transaction lookup data.
197
- futureBlocks * lru.Cache // future blocks are blocks added for later processing
194
+ stateCache state.Database // State database to reuse between imports (contains state cache)
195
+ bodyCache * lru.Cache // Cache for the most recent block bodies
196
+ bodyRLPCache * lru.Cache // Cache for the most recent block bodies in RLP encoded format
197
+ receiptsCache * lru.Cache // Cache for the most recent receipts per block
198
+ blockCache * lru.Cache // Cache for the most recent entire blocks
199
+ txLookupCache * lru.Cache // Cache for the most recent transaction lookup data.
200
+ futureBlocks * lru.Cache // future blocks are blocks added for later processing
201
+ diffLayerCache * lru.Cache // Cache for the diffLayers
202
+ diffQueue * prque.Prque // A Priority queue to store recent diff layer
198
203
199
204
quit chan struct {} // blockchain quit channel
200
205
wg sync.WaitGroup // chain processing wait group for shutting down
@@ -226,6 +231,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
226
231
blockCache , _ := lru .New (blockCacheLimit )
227
232
txLookupCache , _ := lru .New (txLookupCacheLimit )
228
233
futureBlocks , _ := lru .New (maxFutureBlocks )
234
+ diffLayerCache , _ := lru .New (diffLayerCacheLimit )
229
235
230
236
bc := & BlockChain {
231
237
chainConfig : chainConfig ,
@@ -244,10 +250,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
244
250
bodyRLPCache : bodyRLPCache ,
245
251
receiptsCache : receiptsCache ,
246
252
blockCache : blockCache ,
253
+ diffLayerCache : diffLayerCache ,
247
254
txLookupCache : txLookupCache ,
248
255
futureBlocks : futureBlocks ,
249
256
engine : engine ,
250
257
vmConfig : vmConfig ,
258
+ diffQueue : prque .New (nil ),
251
259
}
252
260
bc .validator = NewBlockValidator (chainConfig , bc , engine )
253
261
bc .processor = NewStateProcessor (chainConfig , bc , engine )
@@ -396,6 +404,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
396
404
triedb .SaveCachePeriodically (bc .cacheConfig .TrieCleanJournal , bc .cacheConfig .TrieCleanRejournal , bc .quit )
397
405
}()
398
406
}
407
+ // Need persist and prune diff layer
408
+ if bc .db .DiffStore () != nil {
409
+ go bc .diffLayerFreeze ()
410
+ }
399
411
return bc , nil
400
412
}
401
413
@@ -408,6 +420,14 @@ func (bc *BlockChain) CacheReceipts(hash common.Hash, receipts types.Receipts) {
408
420
bc .receiptsCache .Add (hash , receipts )
409
421
}
410
422
423
+ func (bc * BlockChain ) CacheDiffLayer (hash common.Hash , num uint64 , diffLayer * types.DiffLayer ) {
424
+ bc .diffLayerCache .Add (hash , diffLayer )
425
+ if bc .db .DiffStore () != nil {
426
+ // push to priority queue before persisting
427
+ bc .diffQueue .Push (diffLayer , - (int64 (num )))
428
+ }
429
+ }
430
+
411
431
func (bc * BlockChain ) CacheBlock (hash common.Hash , block * types.Block ) {
412
432
bc .blockCache .Add (hash , block )
413
433
}
@@ -1506,10 +1526,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
1506
1526
wg .Done ()
1507
1527
}()
1508
1528
// Commit all cached state changes into underlying memory database.
1509
- root , err := state .Commit (bc .chainConfig .IsEIP158 (block .Number ()))
1529
+ root , diffLayer , err := state .Commit (bc .chainConfig .IsEIP158 (block .Number ()))
1510
1530
if err != nil {
1511
1531
return NonStatTy , err
1512
1532
}
1533
+
1534
+ // Ensure no empty block body
1535
+ if diffLayer != nil && block .Header ().TxHash != types .EmptyRootHash {
1536
+ // Filling necessary field
1537
+ diffLayer .Receipts = receipts
1538
+ diffLayer .StateRoot = root
1539
+ diffLayer .Hash = block .Hash ()
1540
+ bc .CacheDiffLayer (diffLayer .Hash , block .Number ().Uint64 (), diffLayer )
1541
+ }
1513
1542
triedb := bc .stateCache .TrieDB ()
1514
1543
1515
1544
// If we're running an archive node, always flush
@@ -1895,8 +1924,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
1895
1924
bc .reportBlock (block , receipts , err )
1896
1925
return it .index , err
1897
1926
}
1898
- bc .CacheReceipts (block .Hash (), receipts )
1899
- bc .CacheBlock (block .Hash (), block )
1900
1927
// Update the metrics touched during block processing
1901
1928
accountReadTimer .Update (statedb .AccountReads ) // Account reads are complete, we can mark them
1902
1929
storageReadTimer .Update (statedb .StorageReads ) // Storage reads are complete, we can mark them
@@ -1916,6 +1943,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
1916
1943
log .Error ("validate state failed" , "error" , err )
1917
1944
return it .index , err
1918
1945
}
1946
+ bc .CacheReceipts (block .Hash (), receipts )
1947
+ bc .CacheBlock (block .Hash (), block )
1919
1948
proctime := time .Since (start )
1920
1949
1921
1950
// Update the metrics touched during block validation
@@ -2292,6 +2321,77 @@ func (bc *BlockChain) update() {
2292
2321
}
2293
2322
}
2294
2323
2324
+ func (bc * BlockChain ) diffLayerFreeze () {
2325
+ recheck := time .Tick (diffLayerfreezerRecheckInterval )
2326
+ for {
2327
+ select {
2328
+ case <- bc .quit :
2329
+ // Persist all diffLayers when shutdown, it will introduce redundant storage, but it is acceptable.
2330
+ // If the client been ungracefully shutdown, it will missing all cached diff layers, it is acceptable as well.
2331
+ var batch ethdb.Batch
2332
+ for ! bc .diffQueue .Empty () {
2333
+ diff , _ := bc .diffQueue .Pop ()
2334
+ diffLayer := diff .(* types.DiffLayer )
2335
+ if batch == nil {
2336
+ batch = bc .db .DiffStore ().NewBatch ()
2337
+ }
2338
+ rawdb .WriteDiffLayer (batch , diffLayer .Hash , diffLayer )
2339
+ if batch .ValueSize () > ethdb .IdealBatchSize {
2340
+ if err := batch .Write (); err != nil {
2341
+ log .Error ("Failed to write diff layer" , "err" , err )
2342
+ return
2343
+ }
2344
+ batch .Reset ()
2345
+ }
2346
+ }
2347
+ if batch != nil {
2348
+ if err := batch .Write (); err != nil {
2349
+ log .Error ("Failed to write diff layer" , "err" , err )
2350
+ return
2351
+ }
2352
+ batch .Reset ()
2353
+ }
2354
+ return
2355
+ case <- recheck :
2356
+ currentHeight := bc .CurrentBlock ().NumberU64 ()
2357
+ var batch ethdb.Batch
2358
+ for ! bc .diffQueue .Empty () {
2359
+ diff , prio := bc .diffQueue .Pop ()
2360
+ diffLayer := diff .(* types.DiffLayer )
2361
+
2362
+ // if the block old enough
2363
+ if int64 (currentHeight )+ prio > int64 (bc .triesInMemory ) {
2364
+ canonicalHash := bc .GetCanonicalHash (uint64 (- prio ))
2365
+ // on the canonical chain
2366
+ if canonicalHash == diffLayer .Hash {
2367
+ if batch == nil {
2368
+ batch = bc .db .DiffStore ().NewBatch ()
2369
+ }
2370
+ rawdb .WriteDiffLayer (batch , diffLayer .Hash , diffLayer )
2371
+ staleHash := bc .GetCanonicalHash (uint64 (- prio ) - diffLayerfreezerBlockLimit )
2372
+ rawdb .DeleteDiffLayer (batch , staleHash )
2373
+ }
2374
+ } else {
2375
+ bc .diffQueue .Push (diffLayer , prio )
2376
+ break
2377
+ }
2378
+ if batch != nil && batch .ValueSize () > ethdb .IdealBatchSize {
2379
+ if err := batch .Write (); err != nil {
2380
+ panic (fmt .Sprintf ("Failed to write diff layer, error %v" , err ))
2381
+ }
2382
+ batch .Reset ()
2383
+ }
2384
+ }
2385
+ if batch != nil {
2386
+ if err := batch .Write (); err != nil {
2387
+ panic (fmt .Sprintf ("Failed to write diff layer, error %v" , err ))
2388
+ }
2389
+ batch .Reset ()
2390
+ }
2391
+ }
2392
+ }
2393
+ }
2394
+
2295
2395
// maintainTxIndex is responsible for the construction and deletion of the
2296
2396
// transaction index.
2297
2397
//
0 commit comments